spinny:~/writing $ vim trigger-dev-background-jobs-guide.md
1~2大多数生产应用程序需要的工作不适合请求/响应循环:发送电子邮件、处理上传、运行 AI 管道、同步第三方数据、生成报告。传统的答案是一个队列(Redis、SQS、RabbitMQ)、一个 worker 集群、一个调度程序,以及一堆脆弱的胶水代码,它们会在每次部署时崩溃。3~4[Trigger.dev](https://trigger.dev) 将该堆栈折叠成一个 TypeScript SDK。你编写函数,从任何地方调用它们,平台处理排队、重试、可观察性、调度和持久执行。Task 运行所需的时间 - 没有 10 秒的 serverless 超时,重新部署时不会丢失工作。5~6## 为什么选择 Trigger.dev7~82026 年的转变是持久执行。工作流必须能够在重启、崩溃、部署和速率限制中存活。它们还必须实时将进度流式传输到 UI,并暂停以等待人工输入。Trigger.dev 在版本 3 中围绕这些要求进行了重建,并继续扩展其 AI 基础设施表面。9~10```mermaid11graph LR12 App[你的应用] -->|trigger| API[Trigger.dev API]13 API --> Queue[持久队列]14 Queue --> Worker[Worker 容器]15 Worker -->|run task| Task[你的 Task 代码]16 Task -->|metadata| Realtime[实时流]17 Realtime --> UI[React UI]18 Worker --> Storage[Run 状态存储]19```20~21模型很简单:你将 task 定义为导出,SDK 拾取它们,平台在隔离的容器中调度并运行它们,run 状态被持久化,因此你可以恢复、重试和观察。22~23## 入门24~25### 初始化项目26~27```bash28npx trigger.dev@latest login29npx trigger.dev@latest init30```31~32这会创建一个 `trigger.config.ts` 文件和一个带有示例 task 的 `trigger/` 目录。配置文件是项目的真相来源:哪些目录包含 task、构建设置、生命周期 hook 和运行时选项。33~34```typescript35// trigger.config.ts36import { defineConfig } from "@trigger.dev/sdk";37~38export default defineConfig({39 project: "proj_abc123",40 runtime: "node",41 logLevel: "log",42 maxDuration: 3600,43 retries: {44 enabledInDev: true,45 default: {46 maxAttempts: 3,47 factor: 2,48 minTimeoutInMs: 1000,49 maxTimeoutInMs: 30_000,50 },51 },52 dirs: ["./trigger"],53});54```55~56### 在本地运行 task57~58```bash59npx trigger.dev@latest dev60```61~62dev 服务器连接到云端,注册你的 task,并通过你的本地代码流式传输 run。你在编辑器中设置断点,并在真实触发器上命中它们 - 与你在任何普通 Node.js 项目中使用的循环相同。63~64## 定义一个 Task65~66Task 是一个用唯一 `id` 和 `run` 函数导出的对象。SDK 检查 `dirs` 中的导出并自动注册它们。67~68```typescript69// trigger/send-welcome-email.ts70import { task } from "@trigger.dev/sdk";71import { Resend } from "resend";72~73const resend = new Resend(process.env.RESEND_API_KEY);74~75export const sendWelcomeEmail = task({76 id: "send-welcome-email",77 retry: {78 maxAttempts: 5,79 factor: 1.8,80 minTimeoutInMs: 500,81 maxTimeoutInMs: 30_000,82 },83 run: async (payload: { email: string; name: string }) => {84 const { data, error } = await resend.emails.send({85 from: "hello@spinny.dev",86 to: payload.email,87 subject: `Welcome, ${payload.name}`,88 html: `<p>Glad you are here, ${payload.name}.</p>`,89 });90~91 if (error) throw error;92 return { messageId: data?.id };93 },94});95```96~97需要注意三件事:98~991. **run 体内没有超时。** 平台通过配置中的 `maxDuration` 管理执行时间,而不是在运行时。1002. **Throw 即重试。** SDK 捕获异常并根据 `retry` 策略以指数退避重新运行。1013. **返回值被持久化。** 其他 task 和你的前端可以从任何地方读取 `run.output`。102~103## 触发 Task104~105你从后端、API 路由或另一个 task 调用 task。106~107```typescript108import { sendWelcomeEmail } from "@/trigger/send-welcome-email";109~110const handle = await sendWelcomeEmail.trigger(111 { email: "user@example.com", name: "Alex" },112 {113 idempotencyKey: `welcome-${userId}`,114 concurrencyKey: `tenant-${tenantId}`,115 queue: { name: "emails", concurrencyLimit: 50 },116 delay: "30s",117 ttl: "10m",118 }119);120~121console.log(handle.id); // run_xyz - 用它来跟踪或显示进度122```123~124这些选项在一次调用中解锁了大量行为:125~126- **`idempotencyKey`** - 如果具有相同 key 的 run 已经存在,SDK 返回现有 handle 而不是重复工作。127- **`concurrencyKey`** - 序列化共享 key 的 run,以避免超出每租户的速率限制。128- **`queue.concurrencyLimit`** - 队列在所有 key 上的全局上限。129- **`delay`** - 安排 run 在未来时间运行。130- **`ttl`** - 如果 run 到那时还没有开始,自动让它过期。131~132### 批量触发133~134对于扇出工作负载,`batchTrigger` 每次调用最多接受 500 个项目,并为每个项目创建一个 run。135~136```typescript137await sendWelcomeEmail.batchTrigger(138 newUsers.map((u) => ({139 payload: { email: u.email, name: u.name },140 options: { idempotencyKey: `welcome-${u.id}` },141 }))142);143```144~145## 计划任务146~147Cron 作业成为一等声明。schedule 本身是一个独立的对象,你可以多次将其附加到 task。148~149```typescript150// trigger/daily-digest.ts151import { schedules } from "@trigger.dev/sdk";152~153export const dailyDigest = schedules.task({154 id: "daily-digest",155 cron: "0 9 * * *",156 run: async (payload) => {157 console.log("Scheduled at:", payload.timestamp);158 console.log("Last run:", payload.lastTimestamp);159 console.log("Timezone:", payload.timezone);160 console.log("Next 5 runs:", payload.upcoming);161~162 await sendDigestForDate(payload.timestamp);163 },164});165```166~167对于每租户的 schedule - 比如每个客户一个 cron - 你通过管理 API 动态创建它们。168~169```typescript170import { schedules } from "@trigger.dev/sdk";171~172await schedules.create({173 task: "daily-digest",174 cron: "0 9 * * *",175 timezone: "America/New_York",176 externalId: `customer_${customerId}`,177 deduplicationKey: `digest-${customerId}`,178});179```180~181`deduplicationKey` 使调用幂等:在部署时重新运行相同的代码不会堆叠重复的 schedule。182~183## 队列、并发和幂等性184~185三个原语涵盖了大多数速率限制和排序需求。186~187```mermaid188graph TB189 Trigger[trigger payload] --> IK{idempotencyKey<br/>已见过?}190 IK -->|是| Reuse[返回现有 run]191 IK -->|否| CK[concurrencyKey 桶]192 CK --> Q[带<br/>concurrencyLimit 的队列]193 Q -->|槽位可用| Run[运行 task]194 Q -->|槽位已满| Wait[在队列中等待]195```196~197一个常见模式:每个租户一个队列,每个 key 的小并发,以尊重供应商的速率限制,加上一个幂等性 key 以使重试安全。198~199```typescript200await syncShopifyOrders.trigger(201 { shopId },202 {203 queue: { name: `shopify-${shopId}`, concurrencyLimit: 2 },204 concurrencyKey: shopId,205 idempotencyKey: `sync-${shopId}-${Date.now() / 60_000 | 0}`,206 }207);208```209~210## 等待和长时间运行的工作211~212Task 可以暂停而无需保持连接或消耗计算。平台持久化状态,并在等待完成时恢复函数。213~214```typescript215import { wait } from "@trigger.dev/sdk";216~217export const onboarding = task({218 id: "onboarding",219 run: async (payload: { userId: string }) => {220 await sendWelcomeEmail.triggerAndWait({ userId: payload.userId });221 await wait.for({ days: 1 });222 await sendTipsEmail.trigger({ userId: payload.userId });223 await wait.until({ date: oneWeekFromSignup(payload.userId) });224 await sendUpgradeOffer.trigger({ userId: payload.userId });225 },226});227```228~229`triggerAndWait` 是杀手级特性:它触发一个子 task 并暂停父级直到子级完成。你像 async 函数一样组合 task,但编排在数天或数周内持久运行。230~231### 使用 `wait.forToken` 实现 Human-in-the-loop232~233对于审批流程和 AI 门控,`wait.forToken` 暂停直到你的应用程序回调结果。234~235```typescript236import { task, wait } from "@trigger.dev/sdk";237~238export const publishPost = task({239 id: "publish-post",240 run: async (payload: { draftId: string }) => {241 const draft = await generateAIContent(payload.draftId);242~243 const token = await wait.createToken({ timeout: "7d" });244 await notifyEditor({ draftId: draft.id, token: token.id });245~246 const decision = await wait.forToken<{ approved: boolean; notes?: string }>(247 token.id248 );249~250 if (decision.approved) {251 return await publish(draft);252 }253 return await applyFeedback(draft, decision.notes);254 },255});256```257~258编辑打开 UI,审查草稿,单击批准,你的后端完成 token。Task 从中断的地方继续 - 即使已经过了几个小时或几天。259~260## 生命周期 Hook261~262你可以将 `init`、`onStart`、`onSuccess` 和 `onFailure` 附加到 task 或全局附加到 `trigger.config.ts`。将它们用于追踪、错误报告和共享设置。263~264```typescript265// trigger.config.ts266export default defineConfig({267 // ...268 init: async () => {269 Sentry.init({ dsn: process.env.SENTRY_DSN });270 },271 onFailure: async ({ error, ctx }) => {272 Sentry.captureException(error, {273 tags: { taskId: ctx.task.id, runId: ctx.run.id },274 });275 },276});277```278~279`init` 在每个 worker 容器启动时运行一次,而不是每次 run 运行一次,所以它是设置客户端和池的正确位置。280~281## 前端实时更新282~283Trigger.dev 通过流式 API 发布 run 状态变化 - 状态、元数据、输出。React hook 订阅该流并自动重新渲染。284~285```typescript286// trigger/process-video.ts287import { task, metadata } from "@trigger.dev/sdk";288~289export const processVideo = task({290 id: "process-video",291 run: async (payload: { videoId: string }) => {292 metadata.set("stage", "transcoding");293 await transcode(payload.videoId);294~295 metadata.set("stage", "thumbnails");296 await generateThumbnails(payload.videoId);297~298 metadata.set("stage", "uploading");299 const url = await uploadToCDN(payload.videoId);300~301 return { url };302 },303});304```305~306```tsx307// components/VideoStatus.tsx308"use client";309import { useRealtimeRun } from "@trigger.dev/react-hooks";310import type { processVideo } from "@/trigger/process-video";311~312export function VideoStatus({313 runId,314 publicAccessToken,315}: {316 runId: string;317 publicAccessToken: string;318}) {319 const { run, error } = useRealtimeRun<typeof processVideo>(runId, {320 accessToken: publicAccessToken,321 });322~323 if (error) return <p>Error: {error.message}</p>;324 if (!run) return <p>Loading...</p>;325~326 return (327 <div>328 <p>Status: {run.status}</p>329 <p>Stage: {String(run.metadata?.stage ?? "queued")}</p>330 {run.output?.url && <video src={run.output.url} controls />}331 </div>332 );333}334```335~336你在服务器端生成公共访问 token,作用域为特定的 run,并将其发送到客户端。Hook 处理身份验证、重新连接和增量更新。337~338对于一步触发并订阅:339~340```tsx341import { useRealtimeTaskTrigger } from "@trigger.dev/react-hooks";342~343const { submit, run, isLoading } = useRealtimeTaskTrigger<typeof processVideo>(344 "process-video",345 { accessToken: publicAccessToken }346);347~348<button onClick={() => submit({ videoId })} disabled={isLoading}>349 Process video350</button>;351```352~353## AI 代理和流式传输354~355Trigger.dev 已成为 AI 代理的流行运行时,因为相同的原语 - 持久执行、重试、等待、实时元数据、human-in-the-loop - 正是代理所需要的。在 run 进行时将来自模型提供商的 token 流式传输到 `metadata`,前端实时渲染它们,run 在长时间运行的工具调用中存活,无需消耗 serverless 超时。356~357```typescript358import { task, metadata } from "@trigger.dev/sdk";359import { streamText } from "ai";360import { anthropic } from "@ai-sdk/anthropic";361~362export const researchAgent = task({363 id: "research-agent",364 maxDuration: 1800,365 run: async (payload: { question: string }) => {366 const result = streamText({367 model: anthropic("claude-opus-4-7"),368 system: "You are a research assistant. Use the web.",369 prompt: payload.question,370 tools: { webSearch },371 });372~373 let fullText = "";374 for await (const chunk of result.textStream) {375 fullText += chunk;376 metadata.set("partial", fullText);377 }378~379 return { answer: fullText, usage: await result.usage };380 },381});382```383~384前端使用 `useRealtimeRun` 并读取 `run.metadata.partial` 来渲染流式响应,与渲染聊天补全的方式相同 - 只是这个会在整个页面重新加载后存活下来。385~386## 部署387~388部署将你的 task 编译成版本化的 bundle,构建容器,并以原子方式切换流量。旧的进行中的 run 继续使用先前版本。389~390```bash391npx trigger.dev@latest deploy --env prod392```393~394在 CI 中,你通常将其连接到发布你的应用的同一个工作流中:395~396```yaml397# .github/workflows/deploy.yml398- name: Deploy Trigger.dev399 env:400 TRIGGER_ACCESS_TOKEN: ${{ secrets.TRIGGER_ACCESS_TOKEN }}401 run: npx trigger.dev@latest deploy --env prod402```403~404对于预览环境,传递 `--env preview --branch ${{ github.head_ref }}`,Trigger.dev 会为每个分支创建一个隔离的环境,反映了 Vercel 处理预览部署的方式。405~406## 自托管 vs 云407~408Trigger.dev 在 Apache 2.0 许可证下是开源的。你可以在任何容器平台(Docker Compose、Kubernetes、Fly.io)上自托管,或在 trigger.dev 上使用托管的云。409~410| 方面 | 云 | 自托管 |411|--------|-------|-------------|412| **设置** | 注册,运行 `init` | 运行 docker-compose 或 Helm chart |413| **扩展** | 自动 | 你的责任 |414| **定价** | 按 run + 按计算 | 仅基础设施成本 |415| **合规** | SOC 2 | 你的环境提供的任何东西 |416| **最适合** | 大多数团队 | 严格的数据驻留、自定义基础设施 |417~418SDK 和 CLI 在两种模式之间是相同的 - 你更改一个配置标志并指向你自己的实例。419~420## 最佳实践421~422### 1. 保持 payload 小且可序列化423~424传递 ID 和引用,而不是完整对象。在 task 内部拉取数据。这样保持队列小,payload 记录便宜,并允许你更改数据源而无需重新触发。425~426### 2. 在每个外部调用上使用幂等性 key427~428将 task 触发器上的 `idempotencyKey` 与你的供应商 API(Stripe、OpenAI 等)的幂等性 key 结合使用。重试将在端到端上是安全的。429~430### 3. 使用 `triggerAndWait` 进行编排,而不是 `Promise.all` 的触发器431~432调用 `triggerAndWait` 的父级持久组合子 task。一个触发并立即解析的父级失去了链的可观察性。433~434### 4. 标记 run435~436将 `tags` 添加到触发器(`tags: ["user:123", "feature:onboarding"]`),这样你就可以按业务维度过滤仪表板和管理 API。437~438### 5. 保持 `init` 幂等439~440它在每次冷启动时运行。避免在那里进行迁移或一次性副作用。441~442## 结论443~444Trigger.dev 消除了过去需要从头构建作业系统的工作类别。你编写 async TypeScript,从任何地方调用它,平台开箱即用地为你提供持久执行、调度、队列、重试、实时更新和 human-in-the-loop 模式。445~446驱动夜间 cron 的相同表面是驱动多步 AI 代理(流式传输到前端并暂停以审查)的表面。这种融合使该框架在 2026 年值得认真考虑,无论你是在管理需要可靠后台工作的 SaaS,还是发布在 serverless 超时之外存活的 AI 功能。447~448> **入门检查清单:**449>450> - [x] 在 trigger.dev 注册或运行自托管的 Docker 堆栈451> - [x] 在你的项目中运行 `npx trigger.dev@latest init`452> - [x] 用 `task({ id, run })` 定义你的第一个 task453> - [x] 从你的 API 触发它,并在仪表板中查看 run454> - [x] 添加 `idempotencyKey` 和 `concurrencyKey` 以确保生产安全455> - [x] 将 `useRealtimeRun` 连接到状态组件456> - [x] 从 CI 使用 `trigger.dev deploy --env prod` 部署457~
NORMAL · trigger-dev-background-jobs-guide.md [readonly]457 lines · :q to close