Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ export type EnvVar =
| 'VALIDATOR_HA_POLLING_INTERVAL_MS'
| 'VALIDATOR_HA_SIGNING_TIMEOUT_MS'
| 'VALIDATOR_HA_MAX_STUCK_DUTIES_AGE_MS'
| 'VALIDATOR_HA_OLD_DUTIES_MAX_AGE_H'
| 'VALIDATOR_HA_DATABASE_URL'
| 'VALIDATOR_HA_RUN_MIGRATIONS'
| 'VALIDATOR_HA_POOL_MAX'
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/validator-client/src/key_store/ha_key_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ export class HAKeyStore implements ExtendedValidatorKeyStore {
/**
* Start the high-availability key store
*/
public start(): Promise<void> {
return Promise.resolve(this.haSigner.start());
public async start() {
await this.haSigner.start();
}

/**
Expand Down
8 changes: 8 additions & 0 deletions yarn-project/validator-ha-signer/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export interface ValidatorHASignerConfig {
signingTimeoutMs: number;
/** Maximum age of a stuck duty in ms (defaults to 2x hardcoded Aztec slot duration if not set) */
maxStuckDutiesAgeMs?: number;
/** Optional: clean up old duties after this many hours (disabled if not set) */
cleanupOldDutiesAfterHours?: number;
/**
* PostgreSQL connection string
* Format: postgresql://user:password@host:port/database
Expand Down Expand Up @@ -84,6 +86,11 @@ export const validatorHASignerConfigMappings: ConfigMappingsType<ValidatorHASign
description: 'The maximum age of a stuck duty in ms (defaults to 2x Aztec slot duration)',
...optionalNumberConfigHelper(),
},
cleanupOldDutiesAfterHours: {
env: 'VALIDATOR_HA_OLD_DUTIES_MAX_AGE_H',
description: 'Optional: clean up old duties after this many hours (disabled if not set)',
...optionalNumberConfigHelper(),
},
databaseUrl: {
env: 'VALIDATOR_HA_DATABASE_URL',
description:
Expand Down Expand Up @@ -133,6 +140,7 @@ export const ValidatorHASignerConfigSchema = z.object({
pollingIntervalMs: z.number().min(0),
signingTimeoutMs: z.number().min(0),
maxStuckDutiesAgeMs: z.number().min(0).optional(),
cleanupOldDutiesAfterHours: z.number().min(0).optional(),
databaseUrl: z.string().optional(),
poolMaxCount: z.number().min(0).optional(),
poolMinCount: z.number().min(0).optional(),
Expand Down
204 changes: 204 additions & 0 deletions yarn-project/validator-ha-signer/src/db/postgres.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1362,4 +1362,208 @@ describe('PostgresSlashingProtectionDatabase', () => {
}
});
});

describe('cleanupOutdatedRollupDuties', () => {
const CURRENT_ROLLUP_ADDRESS = EthAddress.random();
const OLD_ROLLUP_ADDRESS_1 = EthAddress.random();
const OLD_ROLLUP_ADDRESS_2 = EthAddress.random();
const VALIDATOR_ADDRESS = EthAddress.random();
const NODE_ID = 'node-1';

beforeEach(async () => {
for (const statement of SCHEMA_SETUP) {
await pglite.query(statement);
}
await pglite.query(INSERT_SCHEMA_VERSION, [SCHEMA_VERSION]);
});

it('should clean up duties with outdated rollup addresses', async () => {
const spDb = new PostgresSlashingProtectionDatabase(pool);

// Create duties for old rollup addresses
for (let i = 0; i < 3; i++) {
await spDb.tryInsertOrGetExisting({
rollupAddress: OLD_ROLLUP_ADDRESS_1,
validatorAddress: VALIDATOR_ADDRESS,
slot: SlotNumber(100 + i),
blockNumber: BlockNumber(50 + i),
blockIndexWithinCheckpoint: IndexWithinCheckpoint(0),
dutyType: DutyType.BLOCK_PROPOSAL,
messageHash: Buffer32.random().toString(),
nodeId: NODE_ID,
});
}

for (let i = 0; i < 2; i++) {
await spDb.tryInsertOrGetExisting({
rollupAddress: OLD_ROLLUP_ADDRESS_2,
validatorAddress: VALIDATOR_ADDRESS,
slot: SlotNumber(200 + i),
blockNumber: BlockNumber(150 + i),
blockIndexWithinCheckpoint: IndexWithinCheckpoint(0),
dutyType: DutyType.BLOCK_PROPOSAL,
messageHash: Buffer32.random().toString(),
nodeId: NODE_ID,
});
}

// Create duties for current rollup address
for (let i = 0; i < 2; i++) {
await spDb.tryInsertOrGetExisting({
rollupAddress: CURRENT_ROLLUP_ADDRESS,
validatorAddress: VALIDATOR_ADDRESS,
slot: SlotNumber(300 + i),
blockNumber: BlockNumber(250 + i),
blockIndexWithinCheckpoint: IndexWithinCheckpoint(0),
dutyType: DutyType.BLOCK_PROPOSAL,
messageHash: Buffer32.random().toString(),
nodeId: NODE_ID,
});
}

// Clean up outdated rollup duties
const numCleaned = await spDb.cleanupOutdatedRollupDuties(CURRENT_ROLLUP_ADDRESS);
expect(numCleaned).toBe(5); // 3 from OLD_ROLLUP_ADDRESS_1 + 2 from OLD_ROLLUP_ADDRESS_2

// Verify old rollup duties are gone
const oldDuties = await pglite.query(`SELECT * FROM validator_duties WHERE rollup_address != $1`, [
CURRENT_ROLLUP_ADDRESS.toString(),
]);
expect(oldDuties.rows.length).toBe(0);

// Verify current rollup duties still exist
const currentDuties = await pglite.query(`SELECT * FROM validator_duties WHERE rollup_address = $1`, [
CURRENT_ROLLUP_ADDRESS.toString(),
]);
expect(currentDuties.rows.length).toBe(2);
});

it('should return 0 when no outdated duties exist', async () => {
const spDb = new PostgresSlashingProtectionDatabase(pool);

// Create duties only for current rollup
await spDb.tryInsertOrGetExisting({
rollupAddress: CURRENT_ROLLUP_ADDRESS,
validatorAddress: VALIDATOR_ADDRESS,
slot: SlotNumber(100),
blockNumber: BlockNumber(50),
blockIndexWithinCheckpoint: IndexWithinCheckpoint(0),
dutyType: DutyType.BLOCK_PROPOSAL,
messageHash: Buffer32.random().toString(),
nodeId: NODE_ID,
});

const numCleaned = await spDb.cleanupOutdatedRollupDuties(CURRENT_ROLLUP_ADDRESS);
expect(numCleaned).toBe(0);

// Verify duty still exists
const duties = await pglite.query(`SELECT * FROM validator_duties`);
expect(duties.rows.length).toBe(1);
});
});

describe('cleanupOldDuties', () => {
const ROLLUP_ADDRESS = EthAddress.random();
const VALIDATOR_ADDRESS = EthAddress.random();
const NODE_ID = 'node-1';

beforeEach(async () => {
for (const statement of SCHEMA_SETUP) {
await pglite.query(statement);
}
await pglite.query(INSERT_SCHEMA_VERSION, [SCHEMA_VERSION]);
});

it('should only clean up old signed duties, not signing or recent duties', async () => {
const spDb = new PostgresSlashingProtectionDatabase(pool);
const oldTimestamp = new Date(Date.now() - 2 * 60 * 60 * 1000); // 2 hours ago

// Insert old signed duties (should be cleaned up)
for (let i = 0; i < 2; i++) {
await pglite.query(
`INSERT INTO validator_duties (
rollup_address, validator_address, slot, block_number, block_index_within_checkpoint,
duty_type, status, message_hash, signature, node_id, lock_token, started_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, 'signed', $7, '0xsignature', $8, 'token', $9, $9)`,
[
ROLLUP_ADDRESS.toString(),
VALIDATOR_ADDRESS.toString(),
(100 + i).toString(),
(50 + i).toString(),
0,
DutyType.BLOCK_PROPOSAL,
Buffer32.random().toString(),
NODE_ID,
oldTimestamp,
],
);
}

// Insert old signing duties (should NOT be cleaned up)
for (let i = 0; i < 2; i++) {
await pglite.query(
`INSERT INTO validator_duties (
rollup_address, validator_address, slot, block_number, block_index_within_checkpoint,
duty_type, status, message_hash, node_id, lock_token, started_at
) VALUES ($1, $2, $3, $4, $5, $6, 'signing', $7, $8, 'token', $9)`,
[
ROLLUP_ADDRESS.toString(),
VALIDATOR_ADDRESS.toString(),
(200 + i).toString(),
(150 + i).toString(),
0,
DutyType.BLOCK_PROPOSAL,
Buffer32.random().toString(),
NODE_ID,
oldTimestamp,
],
);
}

// Insert recent signed duty (should NOT be cleaned up)
const recentResult = await spDb.tryInsertOrGetExisting({
rollupAddress: ROLLUP_ADDRESS,
validatorAddress: VALIDATOR_ADDRESS,
slot: SlotNumber(300),
blockNumber: BlockNumber(250),
blockIndexWithinCheckpoint: IndexWithinCheckpoint(0),
dutyType: DutyType.BLOCK_PROPOSAL,
messageHash: Buffer32.random().toString(),
nodeId: NODE_ID,
});
await spDb.updateDutySigned(
ROLLUP_ADDRESS,
VALIDATOR_ADDRESS,
SlotNumber(300),
DutyType.BLOCK_PROPOSAL,
'0xsignature',
recentResult.record.lockToken,
0,
);

// Clean up duties older than 1 hour
const maxAgeMs = 60 * 60 * 1000; // 1 hour
const numCleaned = await spDb.cleanupOldDuties(maxAgeMs);
expect(numCleaned).toBe(2); // Only the 2 old signed duties

// Verify old signed duties are gone
const oldSignedDuties = await pglite.query(`SELECT * FROM validator_duties WHERE slot >= 100 AND slot < 200`);
expect(oldSignedDuties.rows.length).toBe(0);

// Verify old signing duties still exist (critical safety check)
const signingDuties = await pglite.query(`SELECT * FROM validator_duties WHERE status = 'signing'`);
expect(signingDuties.rows.length).toBe(2);

// Verify recent signed duty still exists
const recentDuty = await pglite.query<DutyRow>(`SELECT * FROM validator_duties WHERE slot = 300`);
expect(recentDuty.rows.length).toBe(1);
expect(recentDuty.rows[0].status).toBe('signed');
});

it('should return 0 when no old signed duties exist', async () => {
const spDb = new PostgresSlashingProtectionDatabase(pool);
const numCleaned = await spDb.cleanupOldDuties(60 * 60 * 1000);
expect(numCleaned).toBe(0);
});
});
});
25 changes: 25 additions & 0 deletions yarn-project/validator-ha-signer/src/db/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import type { QueryResult, QueryResultRow } from 'pg';

import type { SlashingProtectionDatabase, TryInsertOrGetResult } from '../types.js';
import {
CLEANUP_OLD_DUTIES,
CLEANUP_OUTDATED_ROLLUP_DUTIES,
CLEANUP_OWN_STUCK_DUTIES,
DELETE_DUTY,
INSERT_OR_GET_DUTY,
Expand Down Expand Up @@ -256,4 +258,27 @@ export class PostgresSlashingProtectionDatabase implements SlashingProtectionDat
const result = await this.pool.query(CLEANUP_OWN_STUCK_DUTIES, [nodeId, cutoff]);
return result.rowCount ?? 0;
}

/**
* Cleanup duties with outdated rollup address.
* Removes all duties where the rollup address doesn't match the current one.
* Used after a rollup upgrade to clean up duties for the old rollup.
* @returns the number of duties cleaned up
*/
async cleanupOutdatedRollupDuties(currentRollupAddress: EthAddress): Promise<number> {
const result = await this.pool.query(CLEANUP_OUTDATED_ROLLUP_DUTIES, [currentRollupAddress.toString()]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just need to add a note to docs to inform node operators that they can not use the same database to provide slashing protection for nodes running on different rollup version (e.g. current rollup and old rollup) as they will delete each others duties (because of != check here). This is fine as is, just pointing out for docs

return result.rowCount ?? 0;
}

/**
* Cleanup old signed duties.
* Removes only signed duties older than the specified age.
* Does not remove 'signing' duties as they may be in progress.
* @returns the number of duties cleaned up
*/
async cleanupOldDuties(maxAgeMs: number): Promise<number> {
const cutoff = new Date(Date.now() - maxAgeMs);
const result = await this.pool.query(CLEANUP_OLD_DUTIES, [cutoff]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about timezone differences here. Does the pg client encode this date as an unix epoch?

return result.rowCount ?? 0;
}
}
16 changes: 13 additions & 3 deletions yarn-project/validator-ha-signer/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ CREATE TABLE IF NOT EXISTS validator_duties (
block_number BIGINT NOT NULL,
block_index_within_checkpoint INTEGER NOT NULL DEFAULT 0,
duty_type VARCHAR(30) NOT NULL CHECK (duty_type IN ('BLOCK_PROPOSAL', 'CHECKPOINT_PROPOSAL', 'ATTESTATION', 'ATTESTATIONS_AND_SIGNERS', 'GOVERNANCE_VOTE', 'SLASHING_VOTE')),
status VARCHAR(20) NOT NULL CHECK (status IN ('signing', 'signed', 'failed')),
status VARCHAR(20) NOT NULL CHECK (status IN ('signing', 'signed')),
message_hash VARCHAR(66) NOT NULL,
signature VARCHAR(132),
node_id VARCHAR(255) NOT NULL,
Expand Down Expand Up @@ -203,11 +203,11 @@ WHERE status = 'signed'

/**
* Query to clean up old duties (for maintenance)
* Removes duties older than a specified timestamp
* Removes SIGNED duties older than a specified timestamp
*/
export const CLEANUP_OLD_DUTIES = `
DELETE FROM validator_duties
WHERE status IN ('signing', 'signed', 'failed')
WHERE status = 'signed'
AND started_at < $1;
`;

Expand All @@ -222,6 +222,16 @@ WHERE node_id = $1
AND started_at < $2;
`;

/**
* Query to cleanup duties with outdated rollup address
* Removes all duties where the rollup address doesn't match the current one
* Used after a rollup upgrade to clean up duties for the old rollup
*/
export const CLEANUP_OUTDATED_ROLLUP_DUTIES = `
DELETE FROM validator_duties
WHERE rollup_address != $1;
`;

/**
* SQL to drop the validator_duties table
*/
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/validator-ha-signer/src/db/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export interface ValidatorDutyRecord {
startedAt: Date;
/** When the duty signing was completed (success or failure) */
completedAt?: Date;
/** Error message if status is 'failed' */
/** Error message (currently unused) */
errorMessage?: string;
}

Expand Down
Loading
Loading