Skip to content

Commit b0b0df6

Browse files
authored
fix(run-engine): prevent stalled SUSPENDED snapshots from endlessly retrying (#2448)
1 parent e629810 commit b0b0df6

File tree

8 files changed: 354 additions and 50 deletions.

apps/webapp/app/env.server.ts

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -519,6 +519,14 @@ const EnvironmentSchema = z.object({
519519
RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
520520
RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),
521521

522+
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT: z.coerce.number().int().default(12),
523+
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS: z.coerce
524+
.number()
525+
.int()
526+
.default(60_000 * 60 * 6),
527+
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS: z.coerce.number().int().default(60_000),
528+
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),
529+
522530
RUN_ENGINE_WORKER_REDIS_HOST: z
523531
.string()
524532
.optional()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,12 @@ function createRunEngine() {
107107
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
108108
SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
109109
},
110+
suspendedHeartbeatRetriesConfig: {
111+
maxCount: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT,
112+
maxDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS,
113+
initialDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS,
114+
factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR,
115+
},
110116
retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
111117
billing: {
112118
getCurrentPlan: async (orgId: string) => {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 78 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1161,10 +1161,12 @@ export class RunEngine {
11611161
async #handleStalledSnapshot({
11621162
runId,
11631163
snapshotId,
1164+
restartAttempt,
11641165
tx,
11651166
}: {
11661167
runId: string;
11671168
snapshotId: string;
1169+
restartAttempt?: number;
11681170
tx?: PrismaClientOrTransaction;
11691171
}) {
11701172
const prisma = tx ?? this.prisma;
@@ -1297,11 +1299,86 @@ export class RunEngine {
12971299
snapshotId: latestSnapshot.id,
12981300
});
12991301

1300-
switch (result) {
1302+
switch (result.status) {
13011303
case "blocked": {
1304+
if (!this.options.suspendedHeartbeatRetriesConfig) {
1305+
break;
1306+
}
1307+
1308+
if (result.waitpoints.length === 0) {
1309+
this.logger.info("handleStalledSnapshot SUSPENDED blocked but no waitpoints", {
1310+
runId,
1311+
result,
1312+
snapshotId: latestSnapshot.id,
1313+
});
1314+
// If the run is blocked but there are no waitpoints, we don't restart the heartbeat
1315+
break;
1316+
}
1317+
1318+
const hasRunOrBatchWaitpoints = result.waitpoints.some(
1319+
(w) => w.type === "RUN" || w.type === "BATCH"
1320+
);
1321+
1322+
if (!hasRunOrBatchWaitpoints) {
1323+
this.logger.info(
1324+
"handleStalledSnapshot SUSPENDED blocked but no run or batch waitpoints",
1325+
{
1326+
runId,
1327+
result,
1328+
snapshotId: latestSnapshot.id,
1329+
}
1330+
);
1331+
// If the run is blocked by waitpoints that are not RUN or BATCH, we don't restart the heartbeat
1332+
break;
1333+
}
1334+
1335+
const initialDelayMs =
1336+
this.options.suspendedHeartbeatRetriesConfig.initialDelayMs ?? 60_000;
1337+
const $restartAttempt = (restartAttempt ?? 0) + 1; // Start at 1
1338+
const maxDelayMs =
1339+
this.options.suspendedHeartbeatRetriesConfig.maxDelayMs ?? 60_000 * 60 * 6; // 6 hours
1340+
const factor = this.options.suspendedHeartbeatRetriesConfig.factor ?? 2;
1341+
const maxCount = this.options.suspendedHeartbeatRetriesConfig.maxCount ?? 12;
1342+
1343+
if ($restartAttempt >= maxCount) {
1344+
this.logger.info(
1345+
"handleStalledSnapshot SUSPENDED blocked with waitpoints, max retries reached",
1346+
{
1347+
runId,
1348+
result,
1349+
snapshotId: latestSnapshot.id,
1350+
restartAttempt: $restartAttempt,
1351+
maxCount,
1352+
config: this.options.suspendedHeartbeatRetriesConfig,
1353+
}
1354+
);
1355+
1356+
break;
1357+
}
1358+
1359+
// Calculate the delay based on the retry attempt
1360+
const delayMs = Math.min(
1361+
initialDelayMs * Math.pow(factor, $restartAttempt - 1),
1362+
maxDelayMs
1363+
);
1364+
1365+
this.logger.info(
1366+
"handleStalledSnapshot SUSPENDED blocked with waitpoints, restarting heartbeat",
1367+
{
1368+
runId,
1369+
result,
1370+
snapshotId: latestSnapshot.id,
1371+
delayMs,
1372+
restartAttempt: $restartAttempt,
1373+
}
1374+
);
1375+
13021376
// Reschedule the heartbeat
13031377
await this.executionSnapshotSystem.restartHeartbeatForRun({
13041378
runId,
1379+
delayMs,
1380+
restartAttempt: $restartAttempt,
1381+
tx,
13051382
});
13061383
break;
13071384
}

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 15 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -396,32 +396,31 @@ export class ExecutionSnapshotSystem {
396396

397397
public async restartHeartbeatForRun({
398398
runId,
399+
delayMs,
400+
restartAttempt,
399401
tx,
400402
}: {
401403
runId: string;
404+
delayMs: number;
405+
restartAttempt: number;
402406
tx?: PrismaClientOrTransaction;
403407
}): Promise<ExecutionResult> {
404408
const prisma = tx ?? this.$.prisma;
405409

406410
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
407411

408-
//extending the heartbeat
409-
const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
410-
411-
if (intervalMs !== null) {
412-
this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
413-
runId,
414-
snapshotId: latestSnapshot.id,
415-
intervalMs,
416-
});
412+
this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
413+
runId,
414+
snapshotId: latestSnapshot.id,
415+
delayMs,
416+
});
417417

418-
await this.$.worker.enqueue({
419-
id: `heartbeatSnapshot.${runId}`,
420-
job: "heartbeatSnapshot",
421-
payload: { snapshotId: latestSnapshot.id, runId },
422-
availableAt: new Date(Date.now() + intervalMs),
423-
});
424-
}
418+
await this.$.worker.enqueue({
419+
id: `heartbeatSnapshot.${runId}`,
420+
job: "heartbeatSnapshot",
421+
payload: { snapshotId: latestSnapshot.id, runId, restartAttempt },
422+
availableAt: new Date(Date.now() + delayMs),
423+
});
425424

426425
return executionResultFromSnapshot(latestSnapshot);
427426
}

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 51 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,22 @@ export type WaitpointSystemOptions = {
2121
enqueueSystem: EnqueueSystem;
2222
};
2323

24+
type WaitpointContinuationWaitpoint = Pick<Waitpoint, "id" | "type" | "completedAfter" | "status">;
25+
26+
export type WaitpointContinuationResult =
27+
| {
28+
status: "unblocked";
29+
waitpoints: Array<WaitpointContinuationWaitpoint>;
30+
}
31+
| {
32+
status: "skipped";
33+
reason: string;
34+
}
35+
| {
36+
status: "blocked";
37+
waitpoints: Array<WaitpointContinuationWaitpoint>;
38+
};
39+
2440
export class WaitpointSystem {
2541
private readonly $: SystemResources;
2642
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
@@ -480,7 +496,7 @@ export class WaitpointSystem {
480496
runId,
481497
}: {
482498
runId: string;
483-
}): Promise<"blocked" | "unblocked" | "skipped"> {
499+
}): Promise<WaitpointContinuationResult> {
484500
this.$.logger.debug(`continueRunIfUnblocked: start`, {
485501
runId,
486502
});
@@ -496,7 +512,7 @@ export class WaitpointSystem {
496512
batchId: true,
497513
batchIndex: true,
498514
waitpoint: {
499-
select: { id: true, status: true },
515+
select: { id: true, status: true, type: true, completedAfter: true },
500516
},
501517
},
502518
});
@@ -507,7 +523,11 @@ export class WaitpointSystem {
507523
runId,
508524
blockingWaitpoints,
509525
});
510-
return "blocked";
526+
527+
return {
528+
status: "blocked",
529+
waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
530+
};
511531
}
512532

513533
// 3. Get the run with environment
@@ -547,7 +567,10 @@ export class WaitpointSystem {
547567
executionStatus: snapshot.executionStatus,
548568
});
549569

550-
return "skipped";
570+
return {
571+
status: "skipped",
572+
reason: "run is already executing",
573+
};
551574
}
552575
case "QUEUED": {
553576
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
@@ -556,7 +579,10 @@ export class WaitpointSystem {
556579
executionStatus: snapshot.executionStatus,
557580
});
558581

559-
return "skipped";
582+
return {
583+
status: "skipped",
584+
reason: "run is already queued",
585+
};
560586
}
561587
case "PENDING_EXECUTING": {
562588
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
@@ -565,7 +591,10 @@ export class WaitpointSystem {
565591
executionStatus: snapshot.executionStatus,
566592
});
567593

568-
return "skipped";
594+
return {
595+
status: "skipped",
596+
reason: "run is already pending executing",
597+
};
569598
}
570599
case "QUEUED_EXECUTING": {
571600
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
@@ -574,7 +603,10 @@ export class WaitpointSystem {
574603
executionStatus: snapshot.executionStatus,
575604
});
576605

577-
return "skipped";
606+
return {
607+
status: "skipped",
608+
reason: "run is already queued executing",
609+
};
578610
}
579611
case "EXECUTING": {
580612
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
@@ -583,7 +615,10 @@ export class WaitpointSystem {
583615
executionStatus: snapshot.executionStatus,
584616
});
585617

586-
return "skipped";
618+
return {
619+
status: "skipped",
620+
reason: "run is already executing",
621+
};
587622
}
588623
case "PENDING_CANCEL":
589624
case "FINISHED": {
@@ -592,7 +627,10 @@ export class WaitpointSystem {
592627
snapshot,
593628
executionStatus: snapshot.executionStatus,
594629
});
595-
return "skipped";
630+
return {
631+
status: "skipped",
632+
reason: "run is finished",
633+
};
596634
}
597635
case "EXECUTING_WITH_WAITPOINTS": {
598636
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
@@ -693,7 +731,10 @@ export class WaitpointSystem {
693731
});
694732
}
695733

696-
return "unblocked";
734+
return {
735+
status: "unblocked",
736+
waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
737+
};
697738
}); // end of runlock
698739
}
699740

0 commit comments

Comments
 (0)