diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 7325ee0cd19e..3cd4e7ab4535 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -326,6 +326,7 @@ export type EnvVar = | 'VALIDATOR_HA_POLLING_INTERVAL_MS' | 'VALIDATOR_HA_SIGNING_TIMEOUT_MS' | 'VALIDATOR_HA_MAX_STUCK_DUTIES_AGE_MS' + | 'VALIDATOR_HA_OLD_DUTIES_MAX_AGE_H' | 'VALIDATOR_HA_DATABASE_URL' | 'VALIDATOR_HA_RUN_MIGRATIONS' | 'VALIDATOR_HA_POOL_MAX' diff --git a/yarn-project/validator-client/src/key_store/ha_key_store.ts b/yarn-project/validator-client/src/key_store/ha_key_store.ts index 1327ee980155..a6fd29bcd9d3 100644 --- a/yarn-project/validator-client/src/key_store/ha_key_store.ts +++ b/yarn-project/validator-client/src/key_store/ha_key_store.ts @@ -256,8 +256,8 @@ export class HAKeyStore implements ExtendedValidatorKeyStore { /** * Start the high-availability key store */ - public start(): Promise { - return Promise.resolve(this.haSigner.start()); + public async start() { + await this.haSigner.start(); } /** diff --git a/yarn-project/validator-ha-signer/src/config.ts b/yarn-project/validator-ha-signer/src/config.ts index fb18fecc116a..a766fac15e99 100644 --- a/yarn-project/validator-ha-signer/src/config.ts +++ b/yarn-project/validator-ha-signer/src/config.ts @@ -31,6 +31,8 @@ export interface ValidatorHASignerConfig { signingTimeoutMs: number; /** Maximum age of a stuck duty in ms (defaults to 2x hardcoded Aztec slot duration if not set) */ maxStuckDutiesAgeMs?: number; + /** Optional: clean up old duties after this many hours (disabled if not set) */ + cleanupOldDutiesAfterHours?: number; /** * PostgreSQL connection string * Format: postgresql://user:password@host:port/database @@ -84,6 +86,11 @@ export const validatorHASignerConfigMappings: ConfigMappingsType { } }); }); + + describe('cleanupOutdatedRollupDuties', () => { + const CURRENT_ROLLUP_ADDRESS = EthAddress.random(); + const OLD_ROLLUP_ADDRESS_1 = EthAddress.random(); + const OLD_ROLLUP_ADDRESS_2 = EthAddress.random(); + const VALIDATOR_ADDRESS = EthAddress.random(); + const NODE_ID = 'node-1'; + + beforeEach(async () => { + for (const statement of SCHEMA_SETUP) { + await pglite.query(statement); + } + await pglite.query(INSERT_SCHEMA_VERSION, [SCHEMA_VERSION]); + }); + + it('should clean up duties with outdated rollup addresses', async () => { + const spDb = new PostgresSlashingProtectionDatabase(pool); + + // Create duties for old rollup addresses + for (let i = 0; i < 3; i++) { + await spDb.tryInsertOrGetExisting({ + rollupAddress: OLD_ROLLUP_ADDRESS_1, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(100 + i), + blockNumber: BlockNumber(50 + i), + blockIndexWithinCheckpoint: IndexWithinCheckpoint(0), + dutyType: DutyType.BLOCK_PROPOSAL, + messageHash: Buffer32.random().toString(), + nodeId: NODE_ID, + }); + } + + for (let i = 0; i < 2; i++) { + await spDb.tryInsertOrGetExisting({ + rollupAddress: OLD_ROLLUP_ADDRESS_2, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(200 + i), + blockNumber: BlockNumber(150 + i), + blockIndexWithinCheckpoint: IndexWithinCheckpoint(0), + dutyType: DutyType.BLOCK_PROPOSAL, + messageHash: Buffer32.random().toString(), + nodeId: NODE_ID, + }); + } + + // Create duties for current rollup address + for (let i = 0; i < 2; i++) { + await spDb.tryInsertOrGetExisting({ + rollupAddress: CURRENT_ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(300 + i), + blockNumber: BlockNumber(250 + i), + blockIndexWithinCheckpoint: IndexWithinCheckpoint(0), + dutyType: DutyType.BLOCK_PROPOSAL, + messageHash: Buffer32.random().toString(), + nodeId: NODE_ID, + }); + } + + // Clean up outdated rollup duties + const numCleaned = await spDb.cleanupOutdatedRollupDuties(CURRENT_ROLLUP_ADDRESS); + expect(numCleaned).toBe(5); // 3 from OLD_ROLLUP_ADDRESS_1 + 2 from OLD_ROLLUP_ADDRESS_2 + + // Verify old rollup duties are gone + const oldDuties = await pglite.query(`SELECT * FROM validator_duties WHERE rollup_address != $1`, [ + CURRENT_ROLLUP_ADDRESS.toString(), + ]); + expect(oldDuties.rows.length).toBe(0); + + // Verify current rollup duties still exist + const currentDuties = await pglite.query(`SELECT * FROM validator_duties WHERE rollup_address = $1`, [ + CURRENT_ROLLUP_ADDRESS.toString(), + ]); + expect(currentDuties.rows.length).toBe(2); + }); + + it('should return 0 when no outdated duties exist', async () => { + const spDb = new PostgresSlashingProtectionDatabase(pool); + + // Create duties only for current rollup + await spDb.tryInsertOrGetExisting({ + rollupAddress: CURRENT_ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(100), + blockNumber: BlockNumber(50), + blockIndexWithinCheckpoint: IndexWithinCheckpoint(0), + dutyType: DutyType.BLOCK_PROPOSAL, + messageHash: Buffer32.random().toString(), + nodeId: NODE_ID, + }); + + const numCleaned = await spDb.cleanupOutdatedRollupDuties(CURRENT_ROLLUP_ADDRESS); + expect(numCleaned).toBe(0); + + // Verify duty still exists + const duties = await pglite.query(`SELECT * FROM validator_duties`); + expect(duties.rows.length).toBe(1); + }); + }); + + describe('cleanupOldDuties', () => { + const ROLLUP_ADDRESS = EthAddress.random(); + const VALIDATOR_ADDRESS = EthAddress.random(); + const NODE_ID = 'node-1'; + + beforeEach(async () => { + for (const statement of SCHEMA_SETUP) { + await pglite.query(statement); + } + await pglite.query(INSERT_SCHEMA_VERSION, [SCHEMA_VERSION]); + }); + + it('should only clean up old signed duties, not signing or recent duties', async () => { + const spDb = new PostgresSlashingProtectionDatabase(pool); + const oldTimestamp = new Date(Date.now() - 2 * 60 * 60 * 1000); // 2 hours ago + + // Insert old signed duties (should be cleaned up) + for (let i = 0; i < 2; i++) { + await pglite.query( + `INSERT INTO validator_duties ( + rollup_address, validator_address, slot, block_number, block_index_within_checkpoint, + duty_type, status, message_hash, signature, node_id, lock_token, started_at, completed_at + ) VALUES ($1, $2, $3, $4, $5, $6, 'signed', $7, '0xsignature', $8, 'token', $9, $9)`, + [ + ROLLUP_ADDRESS.toString(), + VALIDATOR_ADDRESS.toString(), + (100 + i).toString(), + (50 + i).toString(), + 0, + DutyType.BLOCK_PROPOSAL, + Buffer32.random().toString(), + NODE_ID, + oldTimestamp, + ], + ); + } + + // Insert old signing duties (should NOT be cleaned up) + for (let i = 0; i < 2; i++) { + await pglite.query( + `INSERT INTO validator_duties ( + rollup_address, validator_address, slot, block_number, block_index_within_checkpoint, + duty_type, status, message_hash, node_id, lock_token, started_at + ) VALUES ($1, $2, $3, $4, $5, $6, 'signing', $7, $8, 'token', $9)`, + [ + ROLLUP_ADDRESS.toString(), + VALIDATOR_ADDRESS.toString(), + (200 + i).toString(), + (150 + i).toString(), + 0, + DutyType.BLOCK_PROPOSAL, + Buffer32.random().toString(), + NODE_ID, + oldTimestamp, + ], + ); + } + + // Insert recent signed duty (should NOT be cleaned up) + const recentResult = await spDb.tryInsertOrGetExisting({ + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(300), + blockNumber: BlockNumber(250), + blockIndexWithinCheckpoint: IndexWithinCheckpoint(0), + dutyType: DutyType.BLOCK_PROPOSAL, + messageHash: Buffer32.random().toString(), + nodeId: NODE_ID, + }); + await spDb.updateDutySigned( + ROLLUP_ADDRESS, + VALIDATOR_ADDRESS, + SlotNumber(300), + DutyType.BLOCK_PROPOSAL, + '0xsignature', + recentResult.record.lockToken, + 0, + ); + + // Clean up duties older than 1 hour + const maxAgeMs = 60 * 60 * 1000; // 1 hour + const numCleaned = await spDb.cleanupOldDuties(maxAgeMs); + expect(numCleaned).toBe(2); // Only the 2 old signed duties + + // Verify old signed duties are gone + const oldSignedDuties = await pglite.query(`SELECT * FROM validator_duties WHERE slot >= 100 AND slot < 200`); + expect(oldSignedDuties.rows.length).toBe(0); + + // Verify old signing duties still exist (critical safety check) + const signingDuties = await pglite.query(`SELECT * FROM validator_duties WHERE status = 'signing'`); + expect(signingDuties.rows.length).toBe(2); + + // Verify recent signed duty still exists + const recentDuty = await pglite.query(`SELECT * FROM validator_duties WHERE slot = 300`); + expect(recentDuty.rows.length).toBe(1); + expect(recentDuty.rows[0].status).toBe('signed'); + }); + + it('should return 0 when no old signed duties exist', async () => { + const spDb = new PostgresSlashingProtectionDatabase(pool); + const numCleaned = await spDb.cleanupOldDuties(60 * 60 * 1000); + expect(numCleaned).toBe(0); + }); + }); }); diff --git a/yarn-project/validator-ha-signer/src/db/postgres.ts b/yarn-project/validator-ha-signer/src/db/postgres.ts index 4a2fd955ad2c..73aad3799b65 100644 --- a/yarn-project/validator-ha-signer/src/db/postgres.ts +++ b/yarn-project/validator-ha-signer/src/db/postgres.ts @@ -11,6 +11,8 @@ import type { QueryResult, QueryResultRow } from 'pg'; import type { SlashingProtectionDatabase, TryInsertOrGetResult } from '../types.js'; import { + CLEANUP_OLD_DUTIES, + CLEANUP_OUTDATED_ROLLUP_DUTIES, CLEANUP_OWN_STUCK_DUTIES, DELETE_DUTY, INSERT_OR_GET_DUTY, @@ -256,4 +258,27 @@ export class PostgresSlashingProtectionDatabase implements SlashingProtectionDat const result = await this.pool.query(CLEANUP_OWN_STUCK_DUTIES, [nodeId, cutoff]); return result.rowCount ?? 0; } + + /** + * Cleanup duties with outdated rollup address. + * Removes all duties where the rollup address doesn't match the current one. + * Used after a rollup upgrade to clean up duties for the old rollup. + * @returns the number of duties cleaned up + */ + async cleanupOutdatedRollupDuties(currentRollupAddress: EthAddress): Promise { + const result = await this.pool.query(CLEANUP_OUTDATED_ROLLUP_DUTIES, [currentRollupAddress.toString()]); + return result.rowCount ?? 0; + } + + /** + * Cleanup old signed duties. + * Removes only signed duties older than the specified age. + * Does not remove 'signing' duties as they may be in progress. + * @returns the number of duties cleaned up + */ + async cleanupOldDuties(maxAgeMs: number): Promise { + const cutoff = new Date(Date.now() - maxAgeMs); + const result = await this.pool.query(CLEANUP_OLD_DUTIES, [cutoff]); + return result.rowCount ?? 0; + } } diff --git a/yarn-project/validator-ha-signer/src/db/schema.ts b/yarn-project/validator-ha-signer/src/db/schema.ts index b8ec92ff2a86..92cd57c5e618 100644 --- a/yarn-project/validator-ha-signer/src/db/schema.ts +++ b/yarn-project/validator-ha-signer/src/db/schema.ts @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS validator_duties ( block_number BIGINT NOT NULL, block_index_within_checkpoint INTEGER NOT NULL DEFAULT 0, duty_type VARCHAR(30) NOT NULL CHECK (duty_type IN ('BLOCK_PROPOSAL', 'CHECKPOINT_PROPOSAL', 'ATTESTATION', 'ATTESTATIONS_AND_SIGNERS', 'GOVERNANCE_VOTE', 'SLASHING_VOTE')), - status VARCHAR(20) NOT NULL CHECK (status IN ('signing', 'signed', 'failed')), + status VARCHAR(20) NOT NULL CHECK (status IN ('signing', 'signed')), message_hash VARCHAR(66) NOT NULL, signature VARCHAR(132), node_id VARCHAR(255) NOT NULL, @@ -203,11 +203,11 @@ WHERE status = 'signed' /** * Query to clean up old duties (for maintenance) - * Removes duties older than a specified timestamp + * Removes SIGNED duties older than a specified timestamp */ export const CLEANUP_OLD_DUTIES = ` DELETE FROM validator_duties -WHERE status IN ('signing', 'signed', 'failed') +WHERE status = 'signed' AND started_at < $1; `; @@ -222,6 +222,16 @@ WHERE node_id = $1 AND started_at < $2; `; +/** + * Query to cleanup duties with outdated rollup address + * Removes all duties where the rollup address doesn't match the current one + * Used after a rollup upgrade to clean up duties for the old rollup + */ +export const CLEANUP_OUTDATED_ROLLUP_DUTIES = ` +DELETE FROM validator_duties +WHERE rollup_address != $1; +`; + /** * SQL to drop the validator_duties table */ diff --git a/yarn-project/validator-ha-signer/src/db/types.ts b/yarn-project/validator-ha-signer/src/db/types.ts index 3e32fdf1a6ae..5b9ea8f29a42 100644 --- a/yarn-project/validator-ha-signer/src/db/types.ts +++ b/yarn-project/validator-ha-signer/src/db/types.ts @@ -81,7 +81,7 @@ export interface ValidatorDutyRecord { startedAt: Date; /** When the duty signing was completed (success or failure) */ completedAt?: Date; - /** Error message if status is 'failed' */ + /** Error message (currently unused) */ errorMessage?: string; } diff --git a/yarn-project/validator-ha-signer/src/slashing_protection_service.test.ts b/yarn-project/validator-ha-signer/src/slashing_protection_service.test.ts index 54890a9c26f3..407c501cea8e 100644 --- a/yarn-project/validator-ha-signer/src/slashing_protection_service.test.ts +++ b/yarn-project/validator-ha-signer/src/slashing_protection_service.test.ts @@ -4,6 +4,7 @@ import { EthAddress } from '@aztec/foundation/eth-address'; import { sleep } from '@aztec/foundation/sleep'; import { PGlite } from '@electric-sql/pglite'; +import { jest } from '@jest/globals'; import { PostgresSlashingProtectionDatabase } from './db/postgres.js'; import { setupTestSchema } from './db/test_helper.js'; @@ -527,7 +528,7 @@ describe('SlashingProtectionService', () => { describe('lifecycle', () => { it('should start and stop without error', async () => { - service.start(); + await service.start(); await service.stop(); }); @@ -560,7 +561,7 @@ describe('SlashingProtectionService', () => { await sleep(10); // Start the new service - this should trigger immediate cleanup - newService.start(); + await newService.start(); // Give cleanup time to run await sleep(100); @@ -803,4 +804,292 @@ describe('SlashingProtectionService', () => { expect(deletedCorrect2).toBe(true); }); }); + + describe('cleanup methods', () => { + describe('cleanupOutdatedRollupDuties', () => { + it('cleans up outdated rollup duties at startup', async () => { + const oldRollupAddress = EthAddress.random(); + const newRollupAddress = EthAddress.random(); + + // Create duties for old rollup + for (let i = 0; i < 3; i++) { + const params: CheckAndRecordParams = { + rollupAddress: oldRollupAddress, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(100 + i), + blockNumber: BlockNumber(50 + i), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + await service.checkAndRecord(params); + } + + // Create duties for new rollup + for (let i = 0; i < 2; i++) { + const params: CheckAndRecordParams = { + rollupAddress: newRollupAddress, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(200 + i), + blockNumber: BlockNumber(150 + i), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + await service.checkAndRecord(params); + } + + // Create a new service with the new rollup address. + // Use default maxStuckDutiesAgeMs so background cleanup does not remove the new rollup duties + // (they are in 'signing' and would be treated as stuck if maxStuckDutiesAgeMs were 1ms). + const newService = new SlashingProtectionService(db, { + ...config, + l1Contracts: { rollupAddress: newRollupAddress }, + }); + + // Start the service - this should trigger cleanup at startup + await newService.start(); + await newService.stop(); + + // Old rollup duties should be gone + for (let i = 0; i < 3; i++) { + const params: CheckAndRecordParams = { + rollupAddress: oldRollupAddress, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(100 + i), + blockNumber: BlockNumber(50 + i), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + const result = await db.tryInsertOrGetExisting(params); + expect(result.isNew).toBe(true); + } + + // New rollup duties should still exist + for (let i = 0; i < 2; i++) { + const params: CheckAndRecordParams = { + rollupAddress: newRollupAddress, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(200 + i), + blockNumber: BlockNumber(150 + i), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + const result = await db.tryInsertOrGetExisting(params); + expect(result.isNew).toBe(false); + } + }); + }); + + describe('cleanupOldDuties', () => { + it('should only clean up old signed duties', async () => { + // Insert some old signed duties directly + const oldStartedAt = new Date(Date.now() - 60 * 60 * 1000); + for (let i = 0; i < 3; i++) { + await pool.query( + `INSERT INTO validator_duties ( + rollup_address, + validator_address, + slot, + block_number, + block_index_within_checkpoint, + duty_type, + status, + message_hash, + signature, + node_id, + lock_token, + started_at, + completed_at + ) VALUES ($1, $2, $3, $4, $5, $6, 'signed', $7, $8, $9, $10, $11, $12)`, + [ + ROLLUP_ADDRESS.toString(), + VALIDATOR_ADDRESS.toString(), + SlotNumber(100 + i), + BlockNumber(50 + i), + BLOCK_INDEX_WITHIN_CHECKPOINT, + DUTY_TYPE, + MESSAGE_HASH, + SIGNATURE, + NODE_ID, + `lock-${i}`, + oldStartedAt, + oldStartedAt, + ], + ); + } + + // Create a recent signed duty + const recentParams: CheckAndRecordParams = { + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(1000), + blockNumber: BlockNumber(900), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + const recentLockToken = await service.checkAndRecord(recentParams); + await service.recordSuccess({ + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(1000), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + signature: { toString: () => SIGNATURE } as any, + nodeId: NODE_ID, + lockToken: recentLockToken, + }); + + // Create a duty in signing status (not completed) + const signingParams: CheckAndRecordParams = { + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(500), + blockNumber: BlockNumber(250), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + await service.checkAndRecord(signingParams); + + // Run cleanup via the service (old signed duties should be deleted) + const cleanupService = new SlashingProtectionService(db, { + ...config, + cleanupOldDutiesAfterHours: 0.5, // 30 minutes + }); + await cleanupService.start(); + await sleep(50); + await cleanupService.stop(); + + // Verify old signed duties are gone + for (let i = 0; i < 3; i++) { + const params: CheckAndRecordParams = { + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SlotNumber(100 + i), + blockNumber: BlockNumber(50 + i), + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + const result = await db.tryInsertOrGetExisting(params); + expect(result.isNew).toBe(true); + } + + // Verify recent signed duty still exists + const recentResult = await db.tryInsertOrGetExisting(recentParams); + expect(recentResult.isNew).toBe(false); + + // Verify signing duty still exists and is still signing + const signingResult = await db.tryInsertOrGetExisting(signingParams); + expect(signingResult.isNew).toBe(false); + expect(signingResult.record.status).toBe(DutyStatus.SIGNING); + }); + + it('should be called during cleanup cycle when configured', async () => { + // Create and sign a duty + const params: CheckAndRecordParams = { + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SLOT, + blockNumber: BLOCK_NUMBER, + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + const lockToken = await service.checkAndRecord(params); + await service.recordSuccess({ + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SLOT, + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + signature: { toString: () => SIGNATURE } as any, + nodeId: NODE_ID, + lockToken, + }); + + // Wait a bit + await sleep(10); + + // Create a new service with cleanupOldDutiesAfterHours configured + const newService = new SlashingProtectionService(db, { + ...config, + maxStuckDutiesAgeMs: 1, + cleanupOldDutiesAfterHours: 0.000001, // ~3.6ms + }); + + // Start the service - this should trigger cleanup + await newService.start(); + await sleep(100); + await newService.stop(); + + // Duty should be gone + const result = await db.tryInsertOrGetExisting(params); + expect(result.isNew).toBe(true); + }); + + it('should not run cleanupOldDuties more often than its max age', async () => { + const cleanupSpy = jest.spyOn(db, 'cleanupOldDuties'); + + const newService = new SlashingProtectionService(db, { + ...config, + maxStuckDutiesAgeMs: 1, + cleanupOldDutiesAfterHours: 0.001, // ~3.6s + }); + + await newService.start(); + await sleep(50); // allow multiple cleanup cycles + await newService.stop(); + + expect(cleanupSpy).toHaveBeenCalledTimes(1); + }); + + it('should not cleanup when cleanupOldDutiesAfterHours is not configured', async () => { + // Create a duty + const params: CheckAndRecordParams = { + rollupAddress: ROLLUP_ADDRESS, + validatorAddress: VALIDATOR_ADDRESS, + slot: SLOT, + blockNumber: BLOCK_NUMBER, + blockIndexWithinCheckpoint: BLOCK_INDEX_WITHIN_CHECKPOINT, + dutyType: DUTY_TYPE, + messageHash: MESSAGE_HASH, + nodeId: NODE_ID, + }; + await service.checkAndRecord(params); + + // Wait a bit + await sleep(10); + + // Create a new service without cleanupOldDutiesAfterHours configured + const newService = new SlashingProtectionService(db, { + ...config, + maxStuckDutiesAgeMs: 1, + // cleanupOldDutiesAfterHours is undefined + }); + + // Start the service + await newService.start(); + await sleep(100); + await newService.stop(); + + // Duty should still exist (not cleaned up by old duties cleanup) + // But it should be cleaned up by stuck duties cleanup since maxStuckDutiesAgeMs is 1ms + const result = await db.tryInsertOrGetExisting(params); + expect(result.isNew).toBe(true); // Cleaned by stuck duties cleanup + }); + }); + }); }); diff --git a/yarn-project/validator-ha-signer/src/slashing_protection_service.ts b/yarn-project/validator-ha-signer/src/slashing_protection_service.ts index 170cb092ca9c..ffe8a2a8e30d 100644 --- a/yarn-project/validator-ha-signer/src/slashing_protection_service.ts +++ b/yarn-project/validator-ha-signer/src/slashing_protection_service.ts @@ -40,6 +40,7 @@ export class SlashingProtectionService { private readonly maxStuckDutiesAgeMs: number; private cleanupRunningPromise: RunningPromise; + private lastOldDutiesCleanupAtMs?: number; constructor( private readonly db: SlashingProtectionDatabase, @@ -51,11 +52,7 @@ export class SlashingProtectionService { // Default to 144s (2x 72s Aztec slot duration) if not explicitly configured this.maxStuckDutiesAgeMs = config.maxStuckDutiesAgeMs ?? 144_000; - this.cleanupRunningPromise = new RunningPromise( - this.cleanupStuckDuties.bind(this), - this.log, - this.maxStuckDutiesAgeMs, - ); + this.cleanupRunningPromise = new RunningPromise(this.cleanup.bind(this), this.log, this.maxStuckDutiesAgeMs); } /** @@ -67,7 +64,6 @@ export class SlashingProtectionService { * 2. If insert succeeds, we acquired the lock - return the lockToken * 3. If a record exists, handle based on status: * - SIGNED: Throw appropriate error (already signed or slashing protection) - * - FAILED: Delete the failed record * - SIGNING: Wait and poll until status changes, then handle result * * @returns The lockToken that must be used for recordSuccess/deleteDuty @@ -221,7 +217,19 @@ export class SlashingProtectionService { * Start running tasks. * Cleanup runs immediately on start to recover from any previous crashes. */ - start() { + /** + * Start the background cleanup task. + * Also performs one-time cleanup of duties with outdated rollup addresses. + */ + async start() { + // One-time cleanup at startup: remove duties from previous rollup versions + const numOutdatedRollupDuties = await this.db.cleanupOutdatedRollupDuties(this.config.l1Contracts.rollupAddress); + if (numOutdatedRollupDuties > 0) { + this.log.info(`Cleaned up ${numOutdatedRollupDuties} duties with outdated rollup address at startup`, { + currentRollupAddress: this.config.l1Contracts.rollupAddress.toString(), + }); + } + this.cleanupRunningPromise.start(); this.log.info('Slashing protection service started', { nodeId: this.config.nodeId }); } @@ -244,15 +252,36 @@ export class SlashingProtectionService { } /** - * Cleanup own stuck duties + * Periodic cleanup of stuck duties and optionally old signed duties. + * Runs in the background via RunningPromise. */ - private async cleanupStuckDuties() { - const numDuties = await this.db.cleanupOwnStuckDuties(this.config.nodeId, this.maxStuckDutiesAgeMs); - if (numDuties > 0) { - this.log.info(`Cleaned up ${numDuties} stuck duties`, { + private async cleanup() { + // 1. Clean up stuck duties (our own node's duties that got stuck in 'signing' status) + const numStuckDuties = await this.db.cleanupOwnStuckDuties(this.config.nodeId, this.maxStuckDutiesAgeMs); + if (numStuckDuties > 0) { + this.log.verbose(`Cleaned up ${numStuckDuties} stuck duties`, { nodeId: this.config.nodeId, maxStuckDutiesAgeMs: this.maxStuckDutiesAgeMs, }); } + + // 2. Clean up old signed duties if configured + // we shouldn't run this as often as stuck duty cleanup. + if (this.config.cleanupOldDutiesAfterHours !== undefined) { + const maxAgeMs = this.config.cleanupOldDutiesAfterHours * 60 * 60 * 1000; + const nowMs = Date.now(); + const shouldRun = + this.lastOldDutiesCleanupAtMs === undefined || nowMs - this.lastOldDutiesCleanupAtMs >= maxAgeMs; + if (shouldRun) { + const numOldDuties = await this.db.cleanupOldDuties(maxAgeMs); + this.lastOldDutiesCleanupAtMs = nowMs; + if (numOldDuties > 0) { + this.log.verbose(`Cleaned up ${numOldDuties} old signed duties`, { + cleanupOldDutiesAfterHours: this.config.cleanupOldDutiesAfterHours, + maxAgeMs, + }); + } + } + } } } diff --git a/yarn-project/validator-ha-signer/src/types.ts b/yarn-project/validator-ha-signer/src/types.ts index 719f6037865a..9232cf189566 100644 --- a/yarn-project/validator-ha-signer/src/types.ts +++ b/yarn-project/validator-ha-signer/src/types.ts @@ -201,6 +201,21 @@ export interface SlashingProtectionDatabase { */ cleanupOwnStuckDuties(nodeId: string, maxAgeMs: number): Promise; + /** + * Cleanup duties with outdated rollup address. + * Removes all duties where the rollup address doesn't match the current one. + * Used after a rollup upgrade to clean up duties for the old rollup. + * @returns the number of duties cleaned up + */ + cleanupOutdatedRollupDuties(currentRollupAddress: EthAddress): Promise; + + /** + * Cleanup old signed duties. + * Removes only signed duties older than the specified age. + * @returns the number of duties cleaned up + */ + cleanupOldDuties(maxAgeMs: number): Promise; + /** * Close the database connection. * Should be called during graceful shutdown. diff --git a/yarn-project/validator-ha-signer/src/validator_ha_signer.test.ts b/yarn-project/validator-ha-signer/src/validator_ha_signer.test.ts index 0284a0611cdb..10ed9b70b0cd 100644 --- a/yarn-project/validator-ha-signer/src/validator_ha_signer.test.ts +++ b/yarn-project/validator-ha-signer/src/validator_ha_signer.test.ts @@ -56,6 +56,11 @@ describe('ValidatorHASigner', () => { await pool.end(); }); + afterAll(async () => { + await db.close(); + await pglite.close(); + }); + describe('initialization', () => { it('should not initialize when nodeId is not explicitly set', () => { const defaultConfig = { @@ -81,7 +86,7 @@ describe('ValidatorHASigner', () => { describe('lifecycle', () => { it('should start and stop without error when enabled', async () => { const signer = new ValidatorHASigner(db, config); - signer.start(); + await signer.start(); await signer.stop(); }); }); @@ -90,9 +95,9 @@ describe('ValidatorHASigner', () => { let signer: ValidatorHASigner; let signFn: jest.Mock<(messageHash: Buffer32) => Promise>; - beforeEach(() => { + beforeEach(async () => { signer = new ValidatorHASigner(db, config); - signer.start(); + await signer.start(); signFn = jest.fn<(messageHash: Buffer32) => Promise>(); signFn.mockResolvedValue(mockSignature); }); @@ -772,7 +777,7 @@ describe('ValidatorHASigner', () => { const signers = nodeIds.map(nodeId => new ValidatorHASigner(db, { ...config, nodeId })); // Start all signers - signers.forEach(signer => signer.start()); + await Promise.all(signers.map(signer => signer.start())); try { // All signers try to sign the same duty for the same validator @@ -970,7 +975,7 @@ describe('ValidatorHASigner', () => { ...config, l1Contracts: { rollupAddress: oldRollupAddress }, }); - oldSigner.start(); + await oldSigner.start(); try { const signFn = jest.fn<(messageHash: Buffer32) => Promise>(); @@ -996,13 +1001,14 @@ describe('ValidatorHASigner', () => { ...config, l1Contracts: { rollupAddress: newRollupAddress }, }); - newSigner.start(); + // Starting the new signer will clean up duties with outdated rollup addresses + await newSigner.start(); try { const signFn2 = jest.fn<(messageHash: Buffer32) => Promise>(); signFn2.mockResolvedValue(mockSignature); - // Sign same slot with new rollup - should succeed (no conflict) + // Sign same slot with new rollup - should succeed (no conflict, old duty was cleaned up) await newSigner.signWithProtection( VALIDATOR_ADDRESS, MESSAGE_HASH, @@ -1017,7 +1023,7 @@ describe('ValidatorHASigner', () => { expect(signFn2).toHaveBeenCalledTimes(1); - // Verify both duties exist with different rollup addresses + // Verify old duty was cleaned up at startup const oldDuty = await db.tryInsertOrGetExisting({ rollupAddress: oldRollupAddress, validatorAddress: VALIDATOR_ADDRESS, @@ -1029,6 +1035,7 @@ describe('ValidatorHASigner', () => { nodeId: NODE_ID, }); + // Verify new duty exists const newDuty = await db.tryInsertOrGetExisting({ rollupAddress: newRollupAddress, validatorAddress: VALIDATOR_ADDRESS, @@ -1040,11 +1047,9 @@ describe('ValidatorHASigner', () => { nodeId: NODE_ID, }); - expect(oldDuty.isNew).toBe(false); - expect(newDuty.isNew).toBe(false); - expect(oldDuty.record.rollupAddress).toEqual(oldRollupAddress); + expect(oldDuty.isNew).toBe(true); // Old duty was cleaned up + expect(newDuty.isNew).toBe(false); // New duty exists expect(newDuty.record.rollupAddress).toEqual(newRollupAddress); - expect(oldDuty.record.status).toBe(DutyStatus.SIGNED); expect(newDuty.record.status).toBe(DutyStatus.SIGNED); } finally { await newSigner.stop(); diff --git a/yarn-project/validator-ha-signer/src/validator_ha_signer.ts b/yarn-project/validator-ha-signer/src/validator_ha_signer.ts index 30ddb7bbc68a..db93e47183c1 100644 --- a/yarn-project/validator-ha-signer/src/validator_ha_signer.ts +++ b/yarn-project/validator-ha-signer/src/validator_ha_signer.ts @@ -147,8 +147,8 @@ export class ValidatorHASigner { * Start the HA signer background tasks (cleanup of stuck duties). * Should be called after construction and before signing operations. */ - start() { - this.slashingProtection.start(); + async start() { + await this.slashingProtection.start(); } /**