From 88e60a92e171fb39035963b10e6f3c23686a875b Mon Sep 17 00:00:00 2001 From: Pablo Pettinari Date: Mon, 2 Feb 2026 12:41:40 +0100 Subject: [PATCH] refactor(data-layer): convert fetchers to individual trigger.dev tasks Each fetcher is now a separate trigger.dev task with its own retry policy, enabling per-task visibility in the dashboard and independent failure tracking. The scheduled orchestrators trigger all tasks in parallel. DX preserved: developers still add sources as simple [key, fetchFn] tuples. --- src/data-layer/tasks.ts | 57 ++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/data-layer/tasks.ts b/src/data-layer/tasks.ts index 5d825d6a475..def277b642c 100644 --- a/src/data-layer/tasks.ts +++ b/src/data-layer/tasks.ts @@ -5,7 +5,7 @@ * Hourly tasks run every hour. */ -import { retry, schedules } from "@trigger.dev/sdk/v3" +import { schedules, task } from "@trigger.dev/sdk/v3" import { fetchApps } from "./fetchers/fetchApps" import { fetchBeaconChain } from "./fetchers/fetchBeaconChain" @@ -55,9 +55,9 @@ export const KEYS = { } as const // Task definition: storage key + fetch function -type Task = [string, () => Promise] +type TaskDef = [string, () => Promise] -const DAILY: Task[] = [ +const DAILY: TaskDef[] = [ [KEYS.APPS, fetchApps], [KEYS.CALENDAR_EVENTS, fetchCalendarEvents], [KEYS.COMMUNITY_PICKS, fetchCommunityPicks], @@ -73,7 +73,7 @@ const DAILY: Task[] = [ [KEYS.EVENTS, fetchEvents], ] -const HOURLY: Task[] = [ +const HOURLY: TaskDef[] = [ [KEYS.BEACONCHAIN, fetchBeaconChain], [KEYS.BLOBSCAN_STATS, fetchBlobscanStats], [KEYS.ETHEREUM_MARKETCAP, fetchEthereumMarketcap], @@ -84,42 +84,41 @@ const HOURLY: Task[] = [ [KEYS.STABLECOINS_DATA, fetchStablecoinsData], ] -async function runTasks(tasks: Task[]) { - const results = await Promise.allSettled( - tasks.map(async ([key, fetchFn]) => { - const data = await retry.onThrow(fetchFn, { - maxAttempts: 3, - minTimeoutInMs: 2000, - maxTimeoutInMs: 30000, - factor: 2, - randomize: true, - }) +// ─── Dynamic task creation ─── +function createDataTask([key, fetchFn]: TaskDef) { + return task({ + id: key, + retry: { + maxAttempts: 3, + factor: 2, + minTimeoutInMs: 2000, + maxTimeoutInMs: 30000, + randomize: true, + }, + run: async () => { + const data = await fetchFn() await set(key, data) console.log(`✓ ${key}`) - return key - }) - ) - - const summary = results.map((r, i) => ({ - key: tasks[i][0], - ok: r.status === "fulfilled", - error: r.status === "rejected" ? String(r.reason) : undefined, - })) + return { key } + }, + }) +} - const failed = summary.filter((s) => !s.ok) - failed.forEach((f) => console.error(`✗ ${f.key}: ${f.error}`)) +const dailyFetchTasks = DAILY.map(createDataTask) +const hourlyFetchTasks = HOURLY.map(createDataTask) - return summary -} +// Must export for trigger.dev to discover +export const allFetchTasks = [...dailyFetchTasks, ...hourlyFetchTasks] +// ─── Scheduled orchestrators ─── export const dailyTask = schedules.task({ id: "daily-data-fetch", cron: "0 0 * * *", - run: () => runTasks(DAILY), + run: () => Promise.all(dailyFetchTasks.map((t) => t.trigger())), }) export const hourlyTask = schedules.task({ id: "hourly-data-fetch", cron: "0 * * * *", - run: () => runTasks(HOURLY), + run: () => Promise.all(hourlyFetchTasks.map((t) => t.trigger())), })