From da33d997075d98b9b7b53ebf0d25cd10de9ce1ed Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Thu, 28 Aug 2025 13:42:53 +0100
Subject: [PATCH] fix(run-engine): prevent stalled SUSPENDED snapshots from endlessly retrying

---
 apps/webapp/app/env.server.ts                 |   8 +
 apps/webapp/app/v3/runEngine.server.ts        |   6 +
 .../run-engine/src/engine/index.ts            |  79 ++++++-
 .../engine/systems/executionSnapshotSystem.ts |  31 ++-
 .../src/engine/systems/waitpointSystem.ts     |  61 ++++-
 .../src/engine/tests/heartbeats.test.ts       | 212 ++++++++++++++++--
 .../run-engine/src/engine/types.ts            |   6 +
 .../run-engine/src/engine/workerCatalog.ts    |   1 +
 8 files changed, 354 insertions(+), 50 deletions(-)

diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 905edfb89d..a1e8854dbe 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -519,6 +519,14 @@ const EnvironmentSchema = z.object({
   RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
   RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),
 
+  RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT: z.coerce.number().int().default(12),
+  RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS: z.coerce
+    .number()
+    .int()
+    .default(60_000 * 60 * 6),
+  RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS: z.coerce.number().int().default(60_000),
+  RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),
+
   RUN_ENGINE_WORKER_REDIS_HOST: z
     .string()
     .optional()
diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts
index c3ddc89b92..ce2b38d94c 100644
--- a/apps/webapp/app/v3/runEngine.server.ts
+++ b/apps/webapp/app/v3/runEngine.server.ts
@@ -107,6 +107,12 @@ function createRunEngine() {
       EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
       SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
     },
+    suspendedHeartbeatRetriesConfig: {
+      maxCount: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT,
+      maxDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS,
+      initialDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS,
+      factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR,
+    },
     retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
     billing: {
       getCurrentPlan: async (orgId: string) => {
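
The four new environment variables populate suspendedHeartbeatRetriesConfig, which drives a capped exponential backoff for re-enqueued heartbeats of SUSPENDED runs. A minimal sketch of how the settings combine, using the same formula the engine applies in #handleStalledSnapshot below (the standalone helper is illustrative only; the engine computes this inline):

    type SuspendedHeartbeatRetriesConfig = {
      maxCount?: number; // stop rescheduling once the attempt counter reaches this (default 12)
      maxDelayMs?: number; // cap on any single delay (default 6 hours)
      initialDelayMs?: number; // delay before the first restart (default 60 seconds)
      factor?: number; // exponential growth factor (default 2)
    };

    function suspendedHeartbeatDelayMs(
      restartAttempt: number, // 1-based, like $restartAttempt in the engine
      config: SuspendedHeartbeatRetriesConfig
    ): number | null {
      const initialDelayMs = config.initialDelayMs ?? 60_000;
      const maxDelayMs = config.maxDelayMs ?? 60_000 * 60 * 6;
      const factor = config.factor ?? 2;
      const maxCount = config.maxCount ?? 12;

      if (restartAttempt >= maxCount) return null; // give up, mirroring the engine's break

      return Math.min(initialDelayMs * Math.pow(factor, restartAttempt - 1), maxDelayMs);
    }

    // With the defaults: 1m, 2m, 4m, 8m, 16m, 32m, ~1h4m, ~2h8m, ~4h16m, then capped at 6h,
    // and no further restarts once the counter reaches 12.
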
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 3e13b56d16..a551e6ec12 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -1161,10 +1161,12 @@ export class RunEngine {
   async #handleStalledSnapshot({
     runId,
     snapshotId,
+    restartAttempt,
     tx,
   }: {
     runId: string;
     snapshotId: string;
+    restartAttempt?: number;
     tx?: PrismaClientOrTransaction;
   }) {
     const prisma = tx ?? this.prisma;
@@ -1297,11 +1299,86 @@ export class RunEngine {
          snapshotId: latestSnapshot.id,
        });
 
-        switch (result) {
+        switch (result.status) {
          case "blocked": {
+            if (!this.options.suspendedHeartbeatRetriesConfig) {
+              break;
+            }
+
+            if (result.waitpoints.length === 0) {
+              this.logger.info("handleStalledSnapshot SUSPENDED blocked but no waitpoints", {
+                runId,
+                result,
+                snapshotId: latestSnapshot.id,
+              });
+              // If the run is blocked but there are no waitpoints, we don't restart the heartbeat
+              break;
+            }
+
+            const hasRunOrBatchWaitpoints = result.waitpoints.some(
+              (w) => w.type === "RUN" || w.type === "BATCH"
+            );
+
+            if (!hasRunOrBatchWaitpoints) {
+              this.logger.info(
+                "handleStalledSnapshot SUSPENDED blocked but no run or batch waitpoints",
+                {
+                  runId,
+                  result,
+                  snapshotId: latestSnapshot.id,
+                }
+              );
+              // If the run is blocked by waitpoints that are not RUN or BATCH, we don't restart the heartbeat
+              break;
+            }
+
+            const initialDelayMs =
+              this.options.suspendedHeartbeatRetriesConfig.initialDelayMs ?? 60_000;
+            const $restartAttempt = (restartAttempt ?? 0) + 1; // Start at 1
+            const maxDelayMs =
+              this.options.suspendedHeartbeatRetriesConfig.maxDelayMs ?? 60_000 * 60 * 6; // 6 hours
+            const factor = this.options.suspendedHeartbeatRetriesConfig.factor ?? 2;
+            const maxCount = this.options.suspendedHeartbeatRetriesConfig.maxCount ?? 12;
+
+            if ($restartAttempt >= maxCount) {
+              this.logger.info(
+                "handleStalledSnapshot SUSPENDED blocked with waitpoints, max retries reached",
+                {
+                  runId,
+                  result,
+                  snapshotId: latestSnapshot.id,
+                  restartAttempt: $restartAttempt,
+                  maxCount,
+                  config: this.options.suspendedHeartbeatRetriesConfig,
+                }
+              );
+
+              break;
+            }
+
+            // Calculate the delay based on the retry attempt
+            const delayMs = Math.min(
+              initialDelayMs * Math.pow(factor, $restartAttempt - 1),
+              maxDelayMs
+            );
+
+            this.logger.info(
+              "handleStalledSnapshot SUSPENDED blocked with waitpoints, restarting heartbeat",
+              {
+                runId,
+                result,
+                snapshotId: latestSnapshot.id,
+                delayMs,
+                restartAttempt: $restartAttempt,
+              }
+            );
+
            // Reschedule the heartbeat
            await this.executionSnapshotSystem.restartHeartbeatForRun({
              runId,
+              delayMs,
+              restartAttempt: $restartAttempt,
+              tx,
            });
            break;
          }
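
The retry path only engages when suspendedHeartbeatRetriesConfig is set and the stalled SUSPENDED run is still blocked by at least one RUN or BATCH waitpoint; runs held only by other waitpoint types (for example a manual waitpoint) are left alone. A condensed sketch of that gate, assuming a deliberately simplified waitpoint shape (the helper name is illustrative; the real checks live inline in the switch above):

    type BlockingWaitpoint = { type: string };

    function shouldRestartSuspendedHeartbeat(
      waitpoints: BlockingWaitpoint[],
      restartAttempt: number, // 1-based counter, derived from the previous heartbeat payload
      maxCount: number
    ): boolean {
      if (waitpoints.length === 0) return false; // nothing blocking: don't reschedule
      const hasRunOrBatchWaitpoints = waitpoints.some(
        (w) => w.type === "RUN" || w.type === "BATCH"
      );
      if (!hasRunOrBatchWaitpoints) return false; // e.g. only manual waitpoints: don't reschedule
      return restartAttempt < maxCount; // stop once the retry budget is exhausted
    }
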
diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
index 4e5628d2de..f1277faecb 100644
--- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
@@ -396,32 +396,31 @@ export class ExecutionSnapshotSystem {
   public async restartHeartbeatForRun({
     runId,
+    delayMs,
+    restartAttempt,
     tx,
   }: {
     runId: string;
+    delayMs: number;
+    restartAttempt: number;
     tx?: PrismaClientOrTransaction;
   }): Promise<ExecutionResult> {
     const prisma = tx ?? this.$.prisma;
 
     const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
 
-    //extending the heartbeat
-    const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
-
-    if (intervalMs !== null) {
-      this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
-        runId,
-        snapshotId: latestSnapshot.id,
-        intervalMs,
-      });
+    this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
+      runId,
+      snapshotId: latestSnapshot.id,
+      delayMs,
+    });
 
-      await this.$.worker.enqueue({
-        id: `heartbeatSnapshot.${runId}`,
-        job: "heartbeatSnapshot",
-        payload: { snapshotId: latestSnapshot.id, runId },
-        availableAt: new Date(Date.now() + intervalMs),
-      });
-    }
+    await this.$.worker.enqueue({
+      id: `heartbeatSnapshot.${runId}`,
+      job: "heartbeatSnapshot",
+      payload: { snapshotId: latestSnapshot.id, runId, restartAttempt },
+      availableAt: new Date(Date.now() + delayMs),
+    });
 
     return executionResultFromSnapshot(latestSnapshot);
   }
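
restartHeartbeatForRun no longer derives the wait from the snapshot's execution status; the caller now passes an explicit delayMs plus the restartAttempt counter, and the job is enqueued under the fixed id heartbeatSnapshot.${runId}, keeping one heartbeat entry per run. A sketch of the new call shape (the wrapper, run id and values are illustrative; the real caller is #handleStalledSnapshot, which derives delayMs from the backoff config):

    async function rescheduleSuspendedHeartbeat(executionSnapshotSystem: {
      restartHeartbeatForRun(args: {
        runId: string;
        delayMs: number;
        restartAttempt: number;
      }): Promise<unknown>;
    }) {
      await executionSnapshotSystem.restartHeartbeatForRun({
        runId: "run_1234", // hypothetical run id
        delayMs: 60_000, // explicit delay from the backoff calculation
        restartAttempt: 3, // echoed back in the heartbeatSnapshot payload for the next stall check
      });
    }
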
diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
index 236afe008c..4e23934b73 100644
--- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
@@ -21,6 +21,22 @@ export type WaitpointSystemOptions = {
   enqueueSystem: EnqueueSystem;
 };
 
+type WaitpointContinuationWaitpoint = Pick<Waitpoint, "id" | "status" | "type" | "completedAfter">;
+
+export type WaitpointContinuationResult =
+  | {
+      status: "unblocked";
+      waitpoints: Array<WaitpointContinuationWaitpoint>;
+    }
+  | {
+      status: "skipped";
+      reason: string;
+    }
+  | {
+      status: "blocked";
+      waitpoints: Array<WaitpointContinuationWaitpoint>;
+    };
+
 export class WaitpointSystem {
   private readonly $: SystemResources;
   private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
@@ -480,7 +496,7 @@ export class WaitpointSystem {
     runId,
   }: {
     runId: string;
-  }): Promise<"blocked" | "unblocked" | "skipped"> {
+  }): Promise<WaitpointContinuationResult> {
     this.$.logger.debug(`continueRunIfUnblocked: start`, {
       runId,
     });
@@ -496,7 +512,7 @@
         batchId: true,
         batchIndex: true,
         waitpoint: {
-          select: { id: true, status: true },
+          select: { id: true, status: true, type: true, completedAfter: true },
         },
       },
     });
@@ -507,7 +523,11 @@
         runId,
         blockingWaitpoints,
       });
-      return "blocked";
+
+      return {
+        status: "blocked",
+        waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
+      };
     }
 
     // 3. Get the run with environment
@@ -547,7 +567,10 @@
          executionStatus: snapshot.executionStatus,
        });
 
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is already executing",
+        };
      }
      case "QUEUED": {
        this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
@@ -556,7 +579,10 @@
          executionStatus: snapshot.executionStatus,
        });
 
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is already queued",
+        };
      }
      case "PENDING_EXECUTING": {
        this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
@@ -565,7 +591,10 @@
          executionStatus: snapshot.executionStatus,
        });
 
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is already pending executing",
+        };
      }
      case "QUEUED_EXECUTING": {
        this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
@@ -574,7 +603,10 @@
          executionStatus: snapshot.executionStatus,
        });
 
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is already queued executing",
+        };
      }
      case "EXECUTING": {
        this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
@@ -583,7 +615,10 @@
          executionStatus: snapshot.executionStatus,
        });
 
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is already executing",
+        };
      }
      case "PENDING_CANCEL":
      case "FINISHED": {
@@ -592,7 +627,10 @@
          snapshot,
          executionStatus: snapshot.executionStatus,
        });
-        return "skipped";
+        return {
+          status: "skipped",
+          reason: "run is finished",
+        };
      }
      case "EXECUTING_WITH_WAITPOINTS": {
        const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
@@ -693,7 +731,10 @@
        });
      }
 
-      return "unblocked";
+      return {
+        status: "unblocked",
+        waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
+      };
    }); // end of runlock
  }
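
continueRunIfUnblocked now reports a discriminated union instead of a bare string, so callers such as #handleStalledSnapshot can see which waitpoints are still blocking a run. A minimal consumer sketch (the local type mirrors the exported WaitpointContinuationResult; the function name is illustrative):

    // Narrowing on `status` exposes `waitpoints` only in the branches that carry them.
    type ContinuationResult =
      | { status: "unblocked"; waitpoints: Array<{ type: string }> }
      | { status: "blocked"; waitpoints: Array<{ type: string }> }
      | { status: "skipped"; reason: string };

    function describeContinuation(result: ContinuationResult): string {
      switch (result.status) {
        case "unblocked":
          return `continued past ${result.waitpoints.length} waitpoint(s)`;
        case "blocked":
          return `still blocked by: ${result.waitpoints.map((w) => w.type).join(", ")}`;
        case "skipped":
          return `skipped: ${result.reason}`;
      }
    }
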
diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
index c9654c612d..3fe05289c4 100644
--- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
@@ -490,7 +490,162 @@ describe("RunEngine heartbeats", () => {
     }
   });
 
-  containerTest("Suspended", async ({ prisma, redisOptions }) => {
+  containerTest(
+    "Suspended when blocked by a manual waitpoint",
+    async ({ prisma, redisOptions }) => {
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const heartbeatTimeout = 1000;
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+          masterQueueConsumersDisabled: true,
+          processWorkerQueueDebounceMs: 50,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0001,
+        },
+        heartbeatTimeoutsMs: {
+          SUSPENDED: heartbeatTimeout,
+        },
+        suspendedHeartbeatRetriesConfig: {
+          maxCount: 10,
+          maxDelayMs: 1000,
+          initialDelayMs: 1000,
+          factor: 2,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        //create background worker
+        const backgroundWorker = await setupBackgroundWorker(
+          engine,
+          authenticatedEnvironment,
+          taskIdentifier
+        );
+
+        //trigger the run
+        const run = await engine.trigger(
+          {
+            number: 1,
+            friendlyId: "run_1234",
+            environment: authenticatedEnvironment,
+            taskIdentifier,
+            payload: "{}",
+            payloadType: "application/json",
+            context: {},
+            traceContext: {},
+            traceId: "t12345",
+            spanId: "s12345",
+            workerQueue: "main",
+            queue: "task/test-task",
+            isTest: false,
+            tags: [],
+          },
+          prisma
+        );
+
+        await setTimeout(500);
+
+        //dequeue the run
+        const dequeued = await engine.dequeueFromWorkerQueue({
+          consumerId: "test_12345",
+          workerQueue: "main",
+        });
+
+        //create an attempt
+        await engine.startRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: dequeued[0].snapshot.id,
+        });
+
+        //cancel run
+        //create a manual waitpoint
+        const waitpointResult = await engine.createManualWaitpoint({
+          environmentId: authenticatedEnvironment.id,
+          projectId: authenticatedEnvironment.projectId,
+        });
+        expect(waitpointResult.waitpoint.status).toBe("PENDING");
+
+        //block the run
+        const blockedResult = await engine.blockRunWithWaitpoint({
+          runId: run.id,
+          waitpoints: waitpointResult.waitpoint.id,
+          projectId: authenticatedEnvironment.projectId,
+          organizationId: authenticatedEnvironment.organizationId,
+        });
+
+        const blockedExecutionData = await engine.getRunExecutionData({ runId: run.id });
+        expect(blockedExecutionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
+
+        // Create a checkpoint
+        const checkpointResult = await engine.createCheckpoint({
+          runId: run.id,
+          snapshotId: blockedResult.id,
+          checkpoint: {
+            type: "DOCKER",
+            reason: "TEST_CHECKPOINT",
+            location: "test-location",
+            imageRef: "test-image-ref",
+          },
+        });
+
+        expect(checkpointResult.ok).toBe(true);
+
+        const snapshot = checkpointResult.ok ? checkpointResult.snapshot : null;
+
+        assertNonNullable(snapshot);
+
+        // Verify checkpoint creation
+        expect(snapshot.executionStatus).toBe("SUSPENDED");
+
+        // Now wait for the heartbeat to timeout, but it should retry later
+        await setTimeout(heartbeatTimeout * 1.5);
+
+        // Simulate a suspended run without any blocking waitpoints by deleting any blocking task run waitpoints
+        await prisma.taskRunWaitpoint.deleteMany({
+          where: {
+            taskRunId: run.id,
+          },
+        });
+
+        // Now wait for the heartbeat to timeout again
+        await setTimeout(heartbeatTimeout * 2);
+
+        // We don't restart the heartbeat because there are no run or batch waitpoints
+        const executionData2 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData2);
+        expect(executionData2.snapshot.executionStatus).toBe("SUSPENDED");
+      } finally {
+        await engine.quit();
+      }
+    }
+  );
+
+  containerTest("Suspended when blocked by a run waitpoint", async ({ prisma, redisOptions }) => {
     const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
 
     const heartbeatTimeout = 1000;
@@ -526,6 +681,12 @@
       heartbeatTimeoutsMs: {
         SUSPENDED: heartbeatTimeout,
       },
+      suspendedHeartbeatRetriesConfig: {
+        maxCount: 10,
+        maxDelayMs: heartbeatTimeout,
+        initialDelayMs: heartbeatTimeout,
+        factor: 2,
+      },
       tracer: trace.getTracer("test", "0.0.0"),
     });
 
@@ -533,11 +694,10 @@
       const taskIdentifier = "test-task";
 
       //create background worker
-      const backgroundWorker = await setupBackgroundWorker(
-        engine,
-        authenticatedEnvironment,
-        taskIdentifier
-      );
+      const backgroundWorker = await setupBackgroundWorker(engine, authenticatedEnvironment, [
+        taskIdentifier,
+        "child-task",
+      ]);
 
       //trigger the run
       const run = await engine.trigger(
@@ -574,21 +734,27 @@
         snapshotId: dequeued[0].snapshot.id,
       });
 
-      //cancel run
-      //create a manual waitpoint
-      const waitpointResult = await engine.createManualWaitpoint({
-        environmentId: authenticatedEnvironment.id,
-        projectId: authenticatedEnvironment.projectId,
-      });
-      expect(waitpointResult.waitpoint.status).toBe("PENDING");
-
-      //block the run
-      const blockedResult = await engine.blockRunWithWaitpoint({
-        runId: run.id,
-        waitpoints: waitpointResult.waitpoint.id,
-        projectId: authenticatedEnvironment.projectId,
-        organizationId: authenticatedEnvironment.organizationId,
-      });
+      const childRun = await engine.trigger(
+        {
+          number: 1,
+          friendlyId: "run_c1234",
+          environment: authenticatedEnvironment,
+          taskIdentifier: "child-task",
+          payload: "{}",
+          payloadType: "application/json",
+          context: {},
+          traceContext: {},
+          traceId: "t12345",
+          spanId: "s12345",
+          queue: `task/child-task`,
+          isTest: false,
+          tags: [],
+          resumeParentOnCompletion: true,
+          parentTaskRunId: run.id,
+          workerQueue: "main",
+        },
+        prisma
+      );
 
       const blockedExecutionData = await engine.getRunExecutionData({ runId: run.id });
       expect(blockedExecutionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
@@ -596,7 +762,7 @@
       // Create a checkpoint
       const checkpointResult = await engine.createCheckpoint({
         runId: run.id,
-        snapshotId: blockedResult.id,
+        snapshotId: blockedExecutionData!.snapshot.id,
         checkpoint: {
           type: "DOCKER",
           reason: "TEST_CHECKPOINT",
@@ -627,7 +793,7 @@
       // Now wait for the heartbeat to timeout again
       await setTimeout(heartbeatTimeout * 2);
 
-      // Expect the run to be queued
+      // We don't restart the heartbeat because there are no run or batch waitpoints
       const executionData2 = await engine.getRunExecutionData({ runId: run.id });
       assertNonNullable(executionData2);
       expect(executionData2.snapshot.executionStatus).toBe("QUEUED");
diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts
index 502a9b318f..15a0322d39 100644
--- a/internal-packages/run-engine/src/engine/types.ts
+++ b/internal-packages/run-engine/src/engine/types.ts
@@ -71,6 +71,12 @@ export type RunEngineOptions = {
   /** If not set then checkpoints won't ever be used */
   retryWarmStartThresholdMs?: number;
   heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
+  suspendedHeartbeatRetriesConfig?: {
+    maxCount?: number;
+    maxDelayMs?: number;
+    initialDelayMs?: number;
+    factor?: number;
+  };
   queueRunsWaitingForWorkerBatchSize?: number;
   tracer: Tracer;
   meter?: Meter;
diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts
index f3900d0f8f..81918ac119 100644
--- a/internal-packages/run-engine/src/engine/workerCatalog.ts
+++ b/internal-packages/run-engine/src/engine/workerCatalog.ts
@@ -12,6 +12,7 @@ export const workerCatalog = {
     schema: z.object({
       runId: z.string(),
       snapshotId: z.string(),
+      restartAttempt: z.number().optional(),
     }),
    visibilityTimeoutMs: 30_000,
  },
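
The optional restartAttempt added to the heartbeatSnapshot payload is what lets the backoff counter survive the round trip through the worker: restartHeartbeatForRun writes it into the payload, and when the job fires again it is presumably handed back to #handleStalledSnapshot (that wiring sits outside this patch). A sketch of the payload as the catalog now validates it (ids are hypothetical):

    import { z } from "zod";

    // Same shape as the updated heartbeatSnapshot schema in workerCatalog.ts.
    const heartbeatSnapshotPayload = z.object({
      runId: z.string(),
      snapshotId: z.string(),
      restartAttempt: z.number().optional(), // absent on first heartbeats, set on backoff restarts
    });

    // e.g. the payload enqueued by restartHeartbeatForRun for the third backoff restart:
    const payload = heartbeatSnapshotPayload.parse({
      runId: "run_1234",
      snapshotId: "snapshot_5678",
      restartAttempt: 3,
    });
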