Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 28 additions & 29 deletions src/data-layer/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Hourly tasks run every hour.
*/

import { retry, schedules } from "@trigger.dev/sdk/v3"
import { schedules, task } from "@trigger.dev/sdk/v3"

import { fetchApps } from "./fetchers/fetchApps"
import { fetchBeaconChain } from "./fetchers/fetchBeaconChain"
Expand Down Expand Up @@ -55,9 +55,9 @@ export const KEYS = {
} as const

// Task definition: storage key + fetch function
type Task = [string, () => Promise<unknown>]
type TaskDef = [string, () => Promise<unknown>]

const DAILY: Task[] = [
const DAILY: TaskDef[] = [
[KEYS.APPS, fetchApps],
[KEYS.CALENDAR_EVENTS, fetchCalendarEvents],
[KEYS.COMMUNITY_PICKS, fetchCommunityPicks],
Expand All @@ -73,7 +73,7 @@ const DAILY: Task[] = [
[KEYS.EVENTS, fetchEvents],
]

const HOURLY: Task[] = [
const HOURLY: TaskDef[] = [
[KEYS.BEACONCHAIN, fetchBeaconChain],
[KEYS.BLOBSCAN_STATS, fetchBlobscanStats],
[KEYS.ETHEREUM_MARKETCAP, fetchEthereumMarketcap],
Expand All @@ -84,42 +84,41 @@ const HOURLY: Task[] = [
[KEYS.STABLECOINS_DATA, fetchStablecoinsData],
]

async function runTasks(tasks: Task[]) {
const results = await Promise.allSettled(
tasks.map(async ([key, fetchFn]) => {
const data = await retry.onThrow(fetchFn, {
maxAttempts: 3,
minTimeoutInMs: 2000,
maxTimeoutInMs: 30000,
factor: 2,
randomize: true,
})
// ─── Dynamic task creation ───
function createDataTask([key, fetchFn]: TaskDef) {
return task({
id: key,
retry: {
maxAttempts: 3,
factor: 2,
minTimeoutInMs: 2000,
maxTimeoutInMs: 30000,
randomize: true,
},
run: async () => {
const data = await fetchFn()
await set(key, data)
console.log(`✓ ${key}`)
return key
})
)

const summary = results.map((r, i) => ({
key: tasks[i][0],
ok: r.status === "fulfilled",
error: r.status === "rejected" ? String(r.reason) : undefined,
}))
return { key }
},
})
}

const failed = summary.filter((s) => !s.ok)
failed.forEach((f) => console.error(`✗ ${f.key}: ${f.error}`))
const dailyFetchTasks = DAILY.map(createDataTask)
const hourlyFetchTasks = HOURLY.map(createDataTask)

return summary
}
// Must export for trigger.dev to discover
export const allFetchTasks = [...dailyFetchTasks, ...hourlyFetchTasks]

// ─── Scheduled orchestrators ───
export const dailyTask = schedules.task({
id: "daily-data-fetch",
cron: "0 0 * * *",
run: () => runTasks(DAILY),
run: () => Promise.all(dailyFetchTasks.map((t) => t.trigger())),
})

export const hourlyTask = schedules.task({
id: "hourly-data-fetch",
cron: "0 * * * *",
run: () => runTasks(HOURLY),
run: () => Promise.all(hourlyFetchTasks.map((t) => t.trigger())),
})