大多数生产应用程序需要的工作不适合请求/响应循环:发送电子邮件、处理上传、运行 AI 管道、同步第三方数据、生成报告。传统的答案是一个队列(Redis、SQS、RabbitMQ)、一个 worker 集群、一个调度程序,以及一堆脆弱的胶水代码,它们会在每次部署时崩溃。
Trigger.dev 将该堆栈折叠成一个 TypeScript SDK。你编写函数,从任何地方调用它们,平台处理排队、重试、可观察性、调度和持久执行。Task 运行所需的时间 - 没有 10 秒的 serverless 超时,重新部署时不会丢失工作。
为什么选择 Trigger.dev
2026 年的转变是持久执行。工作流必须能够在重启、崩溃、部署和速率限制中存活。它们还必须实时将进度流式传输到 UI,并暂停以等待人工输入。Trigger.dev 在版本 3 中围绕这些要求进行了重建,并继续扩展其 AI 基础设施表面。
模型很简单:你将 task 定义为导出,SDK 拾取它们,平台在隔离的容器中调度并运行它们,run 状态被持久化,因此你可以恢复、重试和观察。
入门
初始化项目
npx trigger.dev@latest login npx trigger.dev@latest init
这会创建一个 trigger.config.ts 文件和一个带有示例 task 的 trigger/ 目录。配置文件是项目的真相来源:哪些目录包含 task、构建设置、生命周期 hook 和运行时选项。
// trigger.config.ts import { defineConfig } from "@trigger.dev/sdk"; export default defineConfig({ project: "proj_abc123", runtime: "node", logLevel: "log", maxDuration: 3600, retries: { enabledInDev: true, default: { maxAttempts: 3, factor: 2, minTimeoutInMs: 1000, maxTimeoutInMs: 30_000, }, }, dirs: ["./trigger"], });
在本地运行 task
npx trigger.dev@latest dev
dev 服务器连接到云端,注册你的 task,并通过你的本地代码流式传输 run。你在编辑器中设置断点,并在真实触发器上命中它们 - 与你在任何普通 Node.js 项目中使用的循环相同。
定义一个 Task
Task 是一个用唯一 id 和 run 函数导出的对象。SDK 检查 dirs 中的导出并自动注册它们。
// trigger/send-welcome-email.ts import { task } from "@trigger.dev/sdk"; import { Resend } from "resend"; const resend = new Resend(process.env.RESEND_API_KEY); export const sendWelcomeEmail = task({ id: "send-welcome-email", retry: { maxAttempts: 5, factor: 1.8, minTimeoutInMs: 500, maxTimeoutInMs: 30_000, }, run: async (payload: { email: string; name: string }) => { const { data, error } = await resend.emails.send({ from: "hello@spinny.dev", to: payload.email, subject: `Welcome, ${payload.name}`, html: `<p>Glad you are here, ${payload.name}.</p>`, }); if (error) throw error; return { messageId: data?.id }; }, });
需要注意三件事:
- run 体内没有超时。 平台通过配置中的
maxDuration管理执行时间,而不是在运行时。 - Throw 即重试。 SDK 捕获异常并根据
retry策略以指数退避重新运行。 - 返回值被持久化。 其他 task 和你的前端可以从任何地方读取
run.output。
触发 Task
你从后端、API 路由或另一个 task 调用 task。
import { sendWelcomeEmail } from "@/trigger/send-welcome-email"; const handle = await sendWelcomeEmail.trigger( { email: "user@example.com", name: "Alex" }, { idempotencyKey: `welcome-${userId}`, concurrencyKey: `tenant-${tenantId}`, queue: { name: "emails", concurrencyLimit: 50 }, delay: "30s", ttl: "10m", } ); console.log(handle.id); // run_xyz - 用它来跟踪或显示进度
这些选项在一次调用中解锁了大量行为:
idempotencyKey- 如果具有相同 key 的 run 已经存在,SDK 返回现有 handle 而不是重复工作。concurrencyKey- 序列化共享 key 的 run,以避免超出每租户的速率限制。queue.concurrencyLimit- 队列在所有 key 上的全局上限。delay- 安排 run 在未来时间运行。ttl- 如果 run 到那时还没有开始,自动让它过期。
批量触发
对于扇出工作负载,batchTrigger 每次调用最多接受 500 个项目,并为每个项目创建一个 run。
await sendWelcomeEmail.batchTrigger( newUsers.map((u) => ({ payload: { email: u.email, name: u.name }, options: { idempotencyKey: `welcome-${u.id}` }, })) );
计划任务
Cron 作业成为一等声明。schedule 本身是一个独立的对象,你可以多次将其附加到 task。
// trigger/daily-digest.ts import { schedules } from "@trigger.dev/sdk"; export const dailyDigest = schedules.task({ id: "daily-digest", cron: "0 9 * * *", run: async (payload) => { console.log("Scheduled at:", payload.timestamp); console.log("Last run:", payload.lastTimestamp); console.log("Timezone:", payload.timezone); console.log("Next 5 runs:", payload.upcoming); await sendDigestForDate(payload.timestamp); }, });
对于每租户的 schedule - 比如每个客户一个 cron - 你通过管理 API 动态创建它们。
import { schedules } from "@trigger.dev/sdk"; await schedules.create({ task: "daily-digest", cron: "0 9 * * *", timezone: "America/New_York", externalId: `customer_${customerId}`, deduplicationKey: `digest-${customerId}`, });
deduplicationKey 使调用幂等:在部署时重新运行相同的代码不会堆叠重复的 schedule。
队列、并发和幂等性
三个原语涵盖了大多数速率限制和排序需求。
一个常见模式:每个租户一个队列,每个 key 的小并发,以尊重供应商的速率限制,加上一个幂等性 key 以使重试安全。
await syncShopifyOrders.trigger( { shopId }, { queue: { name: `shopify-${shopId}`, concurrencyLimit: 2 }, concurrencyKey: shopId, idempotencyKey: `sync-${shopId}-${Date.now() / 60_000 | 0}`, } );
等待和长时间运行的工作
Task 可以暂停而无需保持连接或消耗计算。平台持久化状态,并在等待完成时恢复函数。
import { wait } from "@trigger.dev/sdk"; export const onboarding = task({ id: "onboarding", run: async (payload: { userId: string }) => { await sendWelcomeEmail.triggerAndWait({ userId: payload.userId }); await wait.for({ days: 1 }); await sendTipsEmail.trigger({ userId: payload.userId }); await wait.until({ date: oneWeekFromSignup(payload.userId) }); await sendUpgradeOffer.trigger({ userId: payload.userId }); }, });
triggerAndWait 是杀手级特性:它触发一个子 task 并暂停父级直到子级完成。你像 async 函数一样组合 task,但编排在数天或数周内持久运行。
使用 wait.forToken 实现 Human-in-the-loop
对于审批流程和 AI 门控,wait.forToken 暂停直到你的应用程序回调结果。
import { task, wait } from "@trigger.dev/sdk"; export const publishPost = task({ id: "publish-post", run: async (payload: { draftId: string }) => { const draft = await generateAIContent(payload.draftId); const token = await wait.createToken({ timeout: "7d" }); await notifyEditor({ draftId: draft.id, token: token.id }); const decision = await wait.forToken<{ approved: boolean; notes?: string }>( token.id ); if (decision.approved) { return await publish(draft); } return await applyFeedback(draft, decision.notes); }, });
编辑打开 UI,审查草稿,单击批准,你的后端完成 token。Task 从中断的地方继续 - 即使已经过了几个小时或几天。
生命周期 Hook
你可以将 init、onStart、onSuccess 和 onFailure 附加到 task 或全局附加到 trigger.config.ts。将它们用于追踪、错误报告和共享设置。
// trigger.config.ts export default defineConfig({ // ... init: async () => { Sentry.init({ dsn: process.env.SENTRY_DSN }); }, onFailure: async ({ error, ctx }) => { Sentry.captureException(error, { tags: { taskId: ctx.task.id, runId: ctx.run.id }, }); }, });
init 在每个 worker 容器启动时运行一次,而不是每次 run 运行一次,所以它是设置客户端和池的正确位置。
前端实时更新
Trigger.dev 通过流式 API 发布 run 状态变化 - 状态、元数据、输出。React hook 订阅该流并自动重新渲染。
// trigger/process-video.ts import { task, metadata } from "@trigger.dev/sdk"; export const processVideo = task({ id: "process-video", run: async (payload: { videoId: string }) => { metadata.set("stage", "transcoding"); await transcode(payload.videoId); metadata.set("stage", "thumbnails"); await generateThumbnails(payload.videoId); metadata.set("stage", "uploading"); const url = await uploadToCDN(payload.videoId); return { url }; }, });
// components/VideoStatus.tsx "use client"; import { useRealtimeRun } from "@trigger.dev/react-hooks"; import type { processVideo } from "@/trigger/process-video"; export function VideoStatus({ runId, publicAccessToken, }: { runId: string; publicAccessToken: string; }) { const { run, error } = useRealtimeRun<typeof processVideo>(runId, { accessToken: publicAccessToken, }); if (error) return <p>Error: {error.message}</p>; if (!run) return <p>Loading...</p>; return ( <div> <p>Status: {run.status}</p> <p>Stage: {String(run.metadata?.stage ?? "queued")}</p> {run.output?.url && <video src={run.output.url} controls />} </div> ); }
你在服务器端生成公共访问 token,作用域为特定的 run,并将其发送到客户端。Hook 处理身份验证、重新连接和增量更新。
对于一步触发并订阅:
import { useRealtimeTaskTrigger } from "@trigger.dev/react-hooks"; const { submit, run, isLoading } = useRealtimeTaskTrigger<typeof processVideo>( "process-video", { accessToken: publicAccessToken } ); <button onClick={() => submit({ videoId })} disabled={isLoading}> Process video </button>;
AI 代理和流式传输
Trigger.dev 已成为 AI 代理的流行运行时,因为相同的原语 - 持久执行、重试、等待、实时元数据、human-in-the-loop - 正是代理所需要的。在 run 进行时将来自模型提供商的 token 流式传输到 metadata,前端实时渲染它们,run 在长时间运行的工具调用中存活,无需消耗 serverless 超时。
import { task, metadata } from "@trigger.dev/sdk"; import { streamText } from "ai"; import { anthropic } from "@ai-sdk/anthropic"; export const researchAgent = task({ id: "research-agent", maxDuration: 1800, run: async (payload: { question: string }) => { const result = streamText({ model: anthropic("claude-opus-4-7"), system: "You are a research assistant. Use the web.", prompt: payload.question, tools: { webSearch }, }); let fullText = ""; for await (const chunk of result.textStream) { fullText += chunk; metadata.set("partial", fullText); } return { answer: fullText, usage: await result.usage }; }, });
前端使用 useRealtimeRun 并读取 run.metadata.partial 来渲染流式响应,与渲染聊天补全的方式相同 - 只是这个会在整个页面重新加载后存活下来。
部署
部署将你的 task 编译成版本化的 bundle,构建容器,并以原子方式切换流量。旧的进行中的 run 继续使用先前版本。
npx trigger.dev@latest deploy --env prod
在 CI 中,你通常将其连接到发布你的应用的同一个工作流中:
# .github/workflows/deploy.yml - name: Deploy Trigger.dev env: TRIGGER_ACCESS_TOKEN: ${{ secrets.TRIGGER_ACCESS_TOKEN }} run: npx trigger.dev@latest deploy --env prod
对于预览环境,传递 --env preview --branch ${{ github.head_ref }},Trigger.dev 会为每个分支创建一个隔离的环境,反映了 Vercel 处理预览部署的方式。
自托管 vs 云
Trigger.dev 在 Apache 2.0 许可证下是开源的。你可以在任何容器平台(Docker Compose、Kubernetes、Fly.io)上自托管,或在 trigger.dev 上使用托管的云。
| 方面 | 云 | 自托管 |
|---|---|---|
| 设置 | 注册,运行 init | 运行 docker-compose 或 Helm chart |
| 扩展 | 自动 | 你的责任 |
| 定价 | 按 run + 按计算 | 仅基础设施成本 |
| 合规 | SOC 2 | 你的环境提供的任何东西 |
| 最适合 | 大多数团队 | 严格的数据驻留、自定义基础设施 |
SDK 和 CLI 在两种模式之间是相同的 - 你更改一个配置标志并指向你自己的实例。
最佳实践
1. 保持 payload 小且可序列化
传递 ID 和引用,而不是完整对象。在 task 内部拉取数据。这样保持队列小,payload 记录便宜,并允许你更改数据源而无需重新触发。
2. 在每个外部调用上使用幂等性 key
将 task 触发器上的 idempotencyKey 与你的供应商 API(Stripe、OpenAI 等)的幂等性 key 结合使用。重试将在端到端上是安全的。
3. 使用 triggerAndWait 进行编排,而不是 Promise.all 的触发器
调用 triggerAndWait 的父级持久组合子 task。一个触发并立即解析的父级失去了链的可观察性。
4. 标记 run
将 tags 添加到触发器(tags: ["user:123", "feature:onboarding"]),这样你就可以按业务维度过滤仪表板和管理 API。
5. 保持 init 幂等
它在每次冷启动时运行。避免在那里进行迁移或一次性副作用。
结论
Trigger.dev 消除了过去需要从头构建作业系统的工作类别。你编写 async TypeScript,从任何地方调用它,平台开箱即用地为你提供持久执行、调度、队列、重试、实时更新和 human-in-the-loop 模式。
驱动夜间 cron 的相同表面是驱动多步 AI 代理(流式传输到前端并暂停以审查)的表面。这种融合使该框架在 2026 年值得认真考虑,无论你是在管理需要可靠后台工作的 SaaS,还是发布在 serverless 超时之外存活的 AI 功能。
入门检查清单:
- 在 trigger.dev 注册或运行自托管的 Docker 堆栈
- 在你的项目中运行
npx trigger.dev@latest init- 用
task({ id, run })定义你的第一个 task- 从你的 API 触发它,并在仪表板中查看 run
- 添加
idempotencyKey和concurrencyKey以确保生产安全- 将
useRealtimeRun连接到状态组件- 从 CI 使用
trigger.dev deploy --env prod部署