From f4a86395ee2cfdc1a49888e79eec3e286736f4c9 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 31 Jan 2025 16:44:23 +0000 Subject: [PATCH 1/5] refactor: ensure new kv-store is used on the server --- .../aztec/src/cli/cmds/start_archiver.ts | 2 +- .../aztec/src/cli/cmds/start_p2p_bootstrap.ts | 2 +- .../src/blobstore/blob_store_test_suite.ts | 6 +- .../src/blobstore/disk_blob_store.test.ts | 4 +- .../src/blobstore/disk_blob_store.ts | 17 +++-- .../src/blobstore/memory_blob_store.test.ts | 2 +- yarn-project/blob-sink/src/server/factory.ts | 6 +- yarn-project/blob-sink/src/server/server.ts | 4 +- .../kv-store/src/stores/l2_tips_store.test.ts | 6 +- yarn-project/p2p-bootstrap/src/index.ts | 2 +- yarn-project/p2p/src/client/factory.ts | 10 +-- yarn-project/p2p/src/mocks/index.ts | 4 +- .../reqresp/reqresp.integration.test.ts | 10 +-- yarn-project/p2p/src/utils.test.ts | 12 ++-- .../prover-node/src/prover-node.test.ts | 4 +- .../sequencer-client/src/slasher/factory.ts | 6 +- .../src/slasher/slasher_client.test.ts | 15 ++-- .../src/slasher/slasher_client.ts | 71 +++++++++++-------- 18 files changed, 93 insertions(+), 90 deletions(-) diff --git a/yarn-project/aztec/src/cli/cmds/start_archiver.ts b/yarn-project/aztec/src/cli/cmds/start_archiver.ts index 82c5cee86b52..99a3a180b45d 100644 --- a/yarn-project/aztec/src/cli/cmds/start_archiver.ts +++ b/yarn-project/aztec/src/cli/cmds/start_archiver.ts @@ -10,7 +10,7 @@ import { createBlobSinkClient } from '@aztec/blob-sink/client'; import { ArchiverApiSchema } from '@aztec/circuit-types'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type DataStoreConfig, dataConfigMappings } from '@aztec/kv-store/config'; -import { createStore } from '@aztec/kv-store/lmdb'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { getConfigEnvVars as getTelemetryClientConfig, initTelemetryClient } from '@aztec/telemetry-client'; import { extractRelevantOptions } from '../util.js'; diff --git a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts index ae9181f68417..f312b59fa8d5 100644 --- a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts +++ b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts @@ -1,7 +1,7 @@ import { P2PBootstrapApiSchema } from '@aztec/circuit-types'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn, createLogger } from '@aztec/foundation/log'; -import { createStore } from '@aztec/kv-store/lmdb'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { type BootnodeConfig, BootstrapNode, bootnodeConfigMappings } from '@aztec/p2p'; import { getConfigEnvVars as getTelemetryClientConfig, initTelemetryClient } from '@aztec/telemetry-client'; diff --git a/yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts b/yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts index d6182de269ab..5fc5f5aa5591 100644 --- a/yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts +++ b/yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts @@ -4,11 +4,11 @@ import { Fr } from '@aztec/foundation/fields'; import { BlobWithIndex } from '../types/index.js'; import { type BlobStore } from './interface.js'; -export function describeBlobStore(getBlobStore: () => BlobStore) { +export function describeBlobStore(getBlobStore: () => Promise) { let blobStore: BlobStore; - beforeEach(() => { - blobStore = getBlobStore(); + beforeEach(async () => { + blobStore = await getBlobStore(); }); it('should store and retrieve a blob', async () => { diff --git a/yarn-project/blob-sink/src/blobstore/disk_blob_store.test.ts b/yarn-project/blob-sink/src/blobstore/disk_blob_store.test.ts index 8b523dbaef14..5d28902f47be 100644 --- a/yarn-project/blob-sink/src/blobstore/disk_blob_store.test.ts +++ b/yarn-project/blob-sink/src/blobstore/disk_blob_store.test.ts @@ -1,8 +1,8 @@ -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { describeBlobStore } from './blob_store_test_suite.js'; import { DiskBlobStore } from './disk_blob_store.js'; describe('DiskBlobStore', () => { - describeBlobStore(() => new DiskBlobStore(openTmpStore())); + describeBlobStore(async () => new DiskBlobStore(await openTmpStore('test'))); }); diff --git a/yarn-project/blob-sink/src/blobstore/disk_blob_store.ts b/yarn-project/blob-sink/src/blobstore/disk_blob_store.ts index 63e4dc10ab6e..0da855c197dd 100644 --- a/yarn-project/blob-sink/src/blobstore/disk_blob_store.ts +++ b/yarn-project/blob-sink/src/blobstore/disk_blob_store.ts @@ -1,32 +1,31 @@ -import { type AztecKVStore, type AztecMap } from '@aztec/kv-store'; +import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import { type BlobWithIndex, BlobsWithIndexes } from '../types/index.js'; import { type BlobStore } from './interface.js'; export class DiskBlobStore implements BlobStore { - blobs: AztecMap; + blobs: AztecAsyncMap; - constructor(store: AztecKVStore) { + constructor(store: AztecAsyncKVStore) { this.blobs = store.openMap('blobs'); } - public getBlobSidecars(blockId: string, indices?: number[]): Promise { - const blobBuffer = this.blobs.get(`${blockId}`); + public async getBlobSidecars(blockId: string, indices?: number[]): Promise { + const blobBuffer = await this.blobs.getAsync(`${blockId}`); if (!blobBuffer) { - return Promise.resolve(undefined); + return undefined; } const blobsWithIndexes = BlobsWithIndexes.fromBuffer(blobBuffer); if (indices) { // If indices are provided, return the blobs at the specified indices - return Promise.resolve(blobsWithIndexes.getBlobsFromIndices(indices)); + return blobsWithIndexes.getBlobsFromIndices(indices); } // If no indices are provided, return all blobs - return Promise.resolve(blobsWithIndexes.blobs); + return blobsWithIndexes.blobs; } public async addBlobSidecars(blockId: string, blobSidecars: BlobWithIndex[]): Promise { await this.blobs.set(blockId, new BlobsWithIndexes(blobSidecars).toBuffer()); - return Promise.resolve(); } } diff --git a/yarn-project/blob-sink/src/blobstore/memory_blob_store.test.ts b/yarn-project/blob-sink/src/blobstore/memory_blob_store.test.ts index 2f13926cd1a4..43151631f06e 100644 --- a/yarn-project/blob-sink/src/blobstore/memory_blob_store.test.ts +++ b/yarn-project/blob-sink/src/blobstore/memory_blob_store.test.ts @@ -2,5 +2,5 @@ import { describeBlobStore } from './blob_store_test_suite.js'; import { MemoryBlobStore } from './memory_blob_store.js'; describe('MemoryBlobStore', () => { - describeBlobStore(() => new MemoryBlobStore()); + describeBlobStore(() => Promise.resolve(new MemoryBlobStore())); }); diff --git a/yarn-project/blob-sink/src/server/factory.ts b/yarn-project/blob-sink/src/server/factory.ts index 43a0df8e6c38..8e33f9335d23 100644 --- a/yarn-project/blob-sink/src/server/factory.ts +++ b/yarn-project/blob-sink/src/server/factory.ts @@ -1,5 +1,5 @@ -import { type AztecKVStore } from '@aztec/kv-store'; -import { createStore } from '@aztec/kv-store/lmdb'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { type TelemetryClient } from '@aztec/telemetry-client'; import { type BlobSinkConfig } from './config.js'; @@ -7,7 +7,7 @@ import { BlobSinkServer } from './server.js'; // If data store settings are provided, the store is created and returned. // Otherwise, undefined is returned and an in memory store will be used. -async function getDataStoreConfig(config?: BlobSinkConfig): Promise { +async function getDataStoreConfig(config?: BlobSinkConfig): Promise { if (!config?.dataStoreConfig) { return undefined; } diff --git a/yarn-project/blob-sink/src/server/server.ts b/yarn-project/blob-sink/src/server/server.ts index 27382c4773eb..60c4b5d6b7cc 100644 --- a/yarn-project/blob-sink/src/server/server.ts +++ b/yarn-project/blob-sink/src/server/server.ts @@ -1,6 +1,6 @@ import { Blob } from '@aztec/foundation/blob'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { type AztecKVStore } from '@aztec/kv-store'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import express, { type Express, type Request, type Response, json } from 'express'; @@ -32,7 +32,7 @@ export class BlobSinkServer { private metrics: BlobSinkMetrics; private log: Logger = createLogger('aztec:blob-sink'); - constructor(config?: BlobSinkConfig, store?: AztecKVStore, telemetry: TelemetryClient = getTelemetryClient()) { + constructor(config?: BlobSinkConfig, store?: AztecAsyncKVStore, telemetry: TelemetryClient = getTelemetryClient()) { this.port = config?.port ?? 5052; // 5052 is beacon chain default http port this.app = express(); diff --git a/yarn-project/kv-store/src/stores/l2_tips_store.test.ts b/yarn-project/kv-store/src/stores/l2_tips_store.test.ts index f78b8c1810f4..5e9fe1a868c0 100644 --- a/yarn-project/kv-store/src/stores/l2_tips_store.test.ts +++ b/yarn-project/kv-store/src/stores/l2_tips_store.test.ts @@ -2,7 +2,7 @@ import { type L2Block } from '@aztec/circuit-types'; import { type BlockHeader, Fr } from '@aztec/circuits.js'; import { times } from '@aztec/foundation/collection'; import { type AztecAsyncKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { expect } from 'chai'; @@ -12,8 +12,8 @@ describe('L2TipsStore', () => { let kvStore: AztecAsyncKVStore; let tipsStore: L2TipsStore; - beforeEach(() => { - kvStore = openTmpStore(true); + beforeEach(async () => { + kvStore = await openTmpStore('test', true); tipsStore = new L2TipsStore(kvStore, 'test'); }); diff --git a/yarn-project/p2p-bootstrap/src/index.ts b/yarn-project/p2p-bootstrap/src/index.ts index 104a396960e8..4246fd2164d1 100644 --- a/yarn-project/p2p-bootstrap/src/index.ts +++ b/yarn-project/p2p-bootstrap/src/index.ts @@ -1,5 +1,5 @@ import { createLogger } from '@aztec/foundation/log'; -import { createStore } from '@aztec/kv-store/lmdb'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { type BootnodeConfig, BootstrapNode } from '@aztec/p2p'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index c6de223d919a..d319f746c1b0 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -6,9 +6,9 @@ import { } from '@aztec/circuit-types'; import { type EpochCache } from '@aztec/epoch-cache'; import { createLogger } from '@aztec/foundation/log'; -import { type AztecKVStore } from '@aztec/kv-store'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; import { type DataStoreConfig } from '@aztec/kv-store/config'; -import { createStore as createStoreV2 } from '@aztec/kv-store/lmdb-v2'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import { P2PClient } from '../client/p2p_client.js'; @@ -26,7 +26,7 @@ import { configureP2PClientAddresses, createLibP2PPeerIdFromPrivateKey, getPeerI type P2PClientDeps = { txPool?: TxPool; - store?: AztecKVStore; + store?: AztecAsyncKVStore; attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined; epochProofQuotePool?: EpochProofQuotePool; }; @@ -43,8 +43,8 @@ export const createP2PClient = async ( ) => { let config = { ..._config }; const logger = createLogger('p2p'); - const store = await createStoreV2('p2p-v2', config, createLogger('p2p:lmdb-v2')); - const archive = await createStoreV2('p2p-archive', config, createLogger('p2p-archive:lmdb-v2')); + const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb-v2'))); + const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb-v2')); const mempools: MemPools = { txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit), diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index d55e44c9be06..41120411f8e9 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -8,7 +8,7 @@ import { import { type EpochCache } from '@aztec/epoch-cache'; import { timesParallel } from '@aztec/foundation/collection'; import { type DataStoreConfig } from '@aztec/kv-store/config'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import { gossipsub } from '@chainsafe/libp2p-gossipsub'; @@ -263,7 +263,7 @@ export async function createBootstrapNode( async function startBootstrapNode(config: BootnodeConfig, telemetry: TelemetryClient) { // Open an ephemeral store that will only exist in memory - const store = openTmpStore(true); + const store = await openTmpStore('bootstrap-node', true); const bootstrapNode = new BootstrapNode(store, telemetry); await bootstrapNode.start(config); return bootstrapNode; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts index 334aaaecfe92..44a951f66b58 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts @@ -11,9 +11,9 @@ import { import { type EpochCache } from '@aztec/epoch-cache'; import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import { type AztecKVStore } from '@aztec/kv-store'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; import { type DataStoreConfig } from '@aztec/kv-store/config'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { SignableENR } from '@chainsafe/enr'; import { describe, expect, it, jest } from '@jest/globals'; @@ -38,7 +38,7 @@ function generatePeerIdPrivateKeys(numberOfPeers: number): string[] { const peerIdPrivateKeys: string[] = []; for (let i = 0; i < numberOfPeers; i++) { // magic number is multiaddr prefix: https://multiformats.io/multiaddr/ - peerIdPrivateKeys.push('08021220' + generatePrivateKey().substr(2, 66)); + peerIdPrivateKeys.push('08021220' + generatePrivateKey().slice(2, 68)); } return peerIdPrivateKeys; } @@ -51,7 +51,7 @@ describe('Req Resp p2p client integration', () => { let epochProofQuotePool: MockProxy; let epochCache: MockProxy; let l2BlockSource: MockL2BlockSource; - let kvStore: AztecKVStore; + let kvStore: AztecAsyncKVStore; let worldState: WorldStateSynchronizer; let proofVerifier: ClientProtocolCircuitVerifier; const logger = createLogger('p2p:test:client-integration'); @@ -120,7 +120,7 @@ describe('Req Resp p2p client integration', () => { await l2BlockSource.createBlocks(100); proofVerifier = alwaysTrueVerifier ? new AlwaysTrueCircuitVerifier() : new AlwaysFalseCircuitVerifier(); - kvStore = openTmpStore(); + kvStore = await openTmpStore('test'); const deps = { txPool: txPool as unknown as TxPool, attestationPool: attestationPool as unknown as AttestationPool, diff --git a/yarn-project/p2p/src/utils.test.ts b/yarn-project/p2p/src/utils.test.ts index 89e9174b577e..50ecbf73a225 100644 --- a/yarn-project/p2p/src/utils.test.ts +++ b/yarn-project/p2p/src/utils.test.ts @@ -1,5 +1,5 @@ -import { type AztecKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { generateKeyPair, marshalPrivateKey } from '@libp2p/crypto/keys'; import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; @@ -19,13 +19,13 @@ describe('p2p utils', () => { expect(reconstructedPeerId.publicKey).toEqual(peerId.publicKey); }); - const readFromSingleton = async (store: AztecKVStore) => { + const readFromSingleton = async (store: AztecAsyncKVStore) => { const peerIdPrivateKeySingleton = store.openSingleton('peerIdPrivateKey'); - return await peerIdPrivateKeySingleton.get(); + return await peerIdPrivateKeySingleton.getAsync(); }; it('If nothing is provided, it should create a new peer id private key, and persist it', async () => { - const store = openTmpStore(); + const store = await openTmpStore('test'); const config = {} as P2PConfig; const peerIdPrivateKey = await getPeerIdPrivateKey(config, store); @@ -45,7 +45,7 @@ describe('p2p utils', () => { }); it('If a value is provided in the config, it should use and persist that value', async () => { - const store = openTmpStore(); + const store = await openTmpStore('test'); const newPeerIdPrivateKey = await generateKeyPair('secp256k1'); const privateKeyString = Buffer.from(marshalPrivateKey(newPeerIdPrivateKey)).toString('hex'); diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 76cb629d0532..c959cd0daf7f 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -21,7 +21,7 @@ import { times, timesParallel } from '@aztec/foundation/collection'; import { Signature } from '@aztec/foundation/eth-signature'; import { makeBackoff, retry } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { type BootstrapNode, InMemoryTxPool, MemoryEpochProofQuotePool, P2PClient } from '@aztec/p2p'; import { createBootstrapNode, createTestLibP2PService } from '@aztec/p2p/mocks'; import { type PublicProcessorFactory } from '@aztec/simulator/server'; @@ -385,7 +385,7 @@ describe('prover-node', () => { getTelemetryClient(), port, ); - const kvStore = openTmpStore(); + const kvStore = await openTmpStore('test'); return new P2PClient(P2PClientType.Prover, kvStore, l2BlockSource, mempools, libp2pService); }; diff --git a/yarn-project/sequencer-client/src/slasher/factory.ts b/yarn-project/sequencer-client/src/slasher/factory.ts index c6fe4c624e04..7a38e96c25e8 100644 --- a/yarn-project/sequencer-client/src/slasher/factory.ts +++ b/yarn-project/sequencer-client/src/slasher/factory.ts @@ -1,9 +1,9 @@ import type { L2BlockSource } from '@aztec/circuit-types'; import { type L1ContractsConfig, type L1ReaderConfig } from '@aztec/ethereum'; import { createLogger } from '@aztec/foundation/log'; -import { type AztecKVStore } from '@aztec/kv-store'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; import { type DataStoreConfig } from '@aztec/kv-store/config'; -import { createStore } from '@aztec/kv-store/lmdb'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import { SlasherClient } from './slasher_client.js'; @@ -13,7 +13,7 @@ export const createSlasherClient = async ( _config: SlasherConfig & DataStoreConfig & L1ContractsConfig & L1ReaderConfig, l2BlockSource: L2BlockSource, telemetry: TelemetryClient = getTelemetryClient(), - deps: { store?: AztecKVStore } = {}, + deps: { store?: AztecAsyncKVStore } = {}, ) => { const config = { ..._config }; const store = deps.store ?? (await createStore('slasher', config, createLogger('slasher:lmdb'))); diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts index e379bcadd93f..2cfca24c6386 100644 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts +++ b/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts @@ -9,8 +9,8 @@ import { import { EthAddress } from '@aztec/foundation/eth-address'; import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; -import { type AztecKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/lmdb'; +import { type AztecAsyncKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { expect } from '@jest/globals'; @@ -19,7 +19,7 @@ import { SlasherClient, type SlasherConfig } from './slasher_client.js'; // Most of this test are directly copied from the P2P client test. describe('In-Memory Slasher Client', () => { let blockSource: MockL2BlockSource; - let kvStore: AztecKVStore; + let kvStore: AztecAsyncKVStore; let client: SlasherClient; let config: SlasherConfig & L1ContractsConfig & L1ReaderConfig; @@ -42,19 +42,14 @@ describe('In-Memory Slasher Client', () => { viemPollingIntervalMS: 1000, }; - kvStore = openTmpStore(); + kvStore = await openTmpStore('test'); client = new SlasherClient(config, kvStore, blockSource); }); const advanceToProvenBlock = async (getProvenBlockNumber: number, provenEpochNumber = getProvenBlockNumber) => { blockSource.setProvenBlockNumber(getProvenBlockNumber); blockSource.setProvenEpochNumber(provenEpochNumber); - await retryUntil( - () => Promise.resolve(client.getSyncedProvenBlockNum() >= getProvenBlockNumber), - 'synced', - 10, - 0.1, - ); + await retryUntil(async () => (await client.getSyncedProvenBlockNum()) >= getProvenBlockNumber, 'synced', 10, 0.1); }; afterEach(async () => { diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.ts index 5c1eeb560d37..a92dae9fe3af 100644 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.ts +++ b/yarn-project/sequencer-client/src/slasher/slasher_client.ts @@ -10,7 +10,7 @@ import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants'; import { type L1ContractsConfig, type L1ReaderConfig, createEthereumChain } from '@aztec/ethereum'; import { EthAddress } from '@aztec/foundation/eth-address'; import { createLogger } from '@aztec/foundation/log'; -import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store'; +import type { AztecAsyncKVStore, AztecAsyncMap, AztecAsyncSingleton } from '@aztec/kv-store'; import { SlashFactoryAbi } from '@aztec/l1-artifacts'; import { type TelemetryClient, WithTracer, getTelemetryClient } from '@aztec/telemetry-client'; @@ -92,9 +92,9 @@ export class SlasherClient extends WithTracer { private latestBlockNumberAtStart = -1; private provenBlockNumberAtStart = -1; - private synchedBlockHashes: AztecMap; - private synchedLatestBlockNumber: AztecSingleton; - private synchedProvenBlockNumber: AztecSingleton; + private synchedBlockHashes: AztecAsyncMap; + private synchedLatestBlockNumber: AztecAsyncSingleton; + private synchedProvenBlockNumber: AztecAsyncSingleton; private blockStream; @@ -110,7 +110,7 @@ export class SlasherClient extends WithTracer { constructor( private config: SlasherConfig & L1ContractsConfig & L1ReaderConfig, - private store: AztecKVStore, + private store: AztecAsyncKVStore, private l2BlockSource: L2BlockSource, telemetry: TelemetryClient = getTelemetryClient(), private log = createLogger('slasher'), @@ -178,17 +178,17 @@ export class SlasherClient extends WithTracer { } public getL2BlockHash(number: number): Promise { - return Promise.resolve(this.synchedBlockHashes.get(number)); + return this.synchedBlockHashes.getAsync(number); } - public getL2Tips(): Promise { - const latestBlockNumber = this.getSyncedLatestBlockNum(); + public async getL2Tips(): Promise { + const latestBlockNumber = await this.getSyncedLatestBlockNum(); let latestBlockHash: string | undefined; - const provenBlockNumber = this.getSyncedProvenBlockNum(); + const provenBlockNumber = await this.getSyncedProvenBlockNum(); let provenBlockHash: string | undefined; if (latestBlockNumber > 0) { - latestBlockHash = this.synchedBlockHashes.get(latestBlockNumber); + latestBlockHash = await this.synchedBlockHashes.getAsync(latestBlockNumber); if (typeof latestBlockHash === 'undefined') { this.log.warn(`Block hash for latest block ${latestBlockNumber} not found`); throw new Error(); @@ -196,7 +196,7 @@ export class SlasherClient extends WithTracer { } if (provenBlockNumber > 0) { - provenBlockHash = this.synchedBlockHashes.get(provenBlockNumber); + provenBlockHash = await this.synchedBlockHashes.getAsync(provenBlockNumber); if (typeof provenBlockHash === 'undefined') { this.log.warn(`Block hash for proven block ${provenBlockNumber} not found`); throw new Error(); @@ -220,7 +220,7 @@ export class SlasherClient extends WithTracer { // TODO (alexg): I think we can prune the block hashes map here break; case 'chain-proven': { - const from = this.getSyncedProvenBlockNum() + 1; + const from = (await this.getSyncedProvenBlockNum()) + 1; const limit = event.blockNumber - from + 1; await this.handleProvenL2Blocks(await this.l2BlockSource.getBlocks(from, limit)); break; @@ -247,8 +247,8 @@ export class SlasherClient extends WithTracer { this.latestBlockNumberAtStart = await this.l2BlockSource.getBlockNumber(); this.provenBlockNumberAtStart = await this.l2BlockSource.getProvenBlockNumber(); - const syncedLatestBlock = this.getSyncedLatestBlockNum() + 1; - const syncedProvenBlock = this.getSyncedProvenBlockNum() + 1; + const syncedLatestBlock = (await this.getSyncedLatestBlockNum()) + 1; + const syncedProvenBlock = (await this.getSyncedProvenBlockNum()) + 1; // if there are blocks to be retrieved, go to a synching state if (syncedLatestBlock <= this.latestBlockNumberAtStart || syncedProvenBlock <= this.provenBlockNumberAtStart) { @@ -278,6 +278,8 @@ export class SlasherClient extends WithTracer { this.log.debug('Stopping Slasher client...'); await this.blockStream.stop(); this.log.debug('Stopped block downloader'); + await this.store.close(); + this.log.debug('Stopped slasher store'); this.setCurrentState(SlasherClientState.STOPPED); this.log.info('Slasher client stopped.'); } @@ -294,16 +296,16 @@ export class SlasherClient extends WithTracer { * Public function to check the latest block number that the slasher client is synced to. * @returns Block number of latest L2 Block we've synced with. */ - public getSyncedLatestBlockNum() { - return this.synchedLatestBlockNumber.get() ?? INITIAL_L2_BLOCK_NUM - 1; + public async getSyncedLatestBlockNum(): Promise { + return (await this.synchedLatestBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; } /** * Public function to check the latest proven block number that the slasher client is synced to. * @returns Block number of latest proven L2 Block we've synced with. */ - public getSyncedProvenBlockNum() { - return this.synchedProvenBlockNumber.get() ?? INITIAL_L2_BLOCK_NUM - 1; + public async getSyncedProvenBlockNum(): Promise { + return (await this.synchedProvenBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; } /** @@ -311,7 +313,7 @@ export class SlasherClient extends WithTracer { * @returns Information about slasher client status: state & syncedToBlockNum. */ public async getStatus(): Promise { - const blockNumber = this.getSyncedLatestBlockNum(); + const blockNumber = await this.getSyncedLatestBlockNum(); const blockHash = blockNumber == 0 ? '' @@ -332,16 +334,19 @@ export class SlasherClient extends WithTracer { */ private async handleLatestL2Blocks(blocks: L2Block[]): Promise { if (!blocks.length) { - return Promise.resolve(); + return; } - const lastBlockNum = blocks[blocks.length - 1].number; - await Promise.all( - blocks.map(async block => this.synchedBlockHashes.set(block.number, (await block.hash()).toString())), - ); - await this.synchedLatestBlockNumber.set(lastBlockNum); - this.log.debug(`Synched to latest block ${lastBlockNum}`); - this.startServiceIfSynched(); + await this.store.transactionAsync(async () => { + for (const block of blocks) { + await this.synchedBlockHashes.set(block.number, (await block.hash()).toString()); + } + + const lastBlockNum = blocks[blocks.length - 1].number; + await this.synchedLatestBlockNumber.set(lastBlockNum); + }); + + await this.startServiceIfSynched(); } /** @@ -357,7 +362,7 @@ export class SlasherClient extends WithTracer { await this.synchedProvenBlockNumber.set(lastBlockNum); this.log.debug(`Synched to proven block ${lastBlockNum}`); - this.startServiceIfSynched(); + await this.startServiceIfSynched(); } private async handlePruneL2Blocks(latestBlock: number): Promise { @@ -381,11 +386,15 @@ export class SlasherClient extends WithTracer { await this.synchedLatestBlockNumber.set(latestBlock); } - private startServiceIfSynched() { + private async startServiceIfSynched() { + const [latestBlock, provenBlock] = await Promise.all([ + this.getSyncedLatestBlockNum(), + this.getSyncedProvenBlockNum(), + ]); if ( this.currentState === SlasherClientState.SYNCHING && - this.getSyncedLatestBlockNum() >= this.latestBlockNumberAtStart && - this.getSyncedProvenBlockNum() >= this.provenBlockNumberAtStart + latestBlock >= this.latestBlockNumberAtStart && + provenBlock >= this.provenBlockNumberAtStart ) { this.log.debug(`Synched to blocks at start`); this.setCurrentState(SlasherClientState.RUNNING); From d3c60dfe3aa5d72801b93e0f0a9683a0bc33c8df Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Sun, 2 Feb 2025 16:18:40 +0000 Subject: [PATCH 2/5] fix: catch messages sent to closed store --- .../lmdb_store/lmdb_store_wrapper.cpp | 1 + .../nodejs_module/util/message_processor.hpp | 7 ++++- yarn-project/kv-store/src/lmdb-v2/store.ts | 26 +++++++++++++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp index e93b5902f816..993a57d21fce 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp @@ -248,6 +248,7 @@ StatsResponse LMDBStoreWrapper::get_stats() BoolResponse LMDBStoreWrapper::close() { + _msg_processor.close(); _store.reset(nullptr); return { true }; } diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/util/message_processor.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/util/message_processor.hpp index d6dd84c2846e..3a646c71c7e1 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/util/message_processor.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/util/message_processor.hpp @@ -56,7 +56,9 @@ class AsyncMessageProcessor { // complete on an separate thread auto deferred = std::make_shared(env); - if (info.Length() < 1) { + if (!open) { + deferred->Reject(Napi::TypeError::New(env, "Message processor is closed").Value()); + } else if (info.Length() < 1) { deferred->Reject(Napi::TypeError::New(env, "Wrong number of arguments").Value()); } else if (!info[0].IsBuffer()) { deferred->Reject(Napi::TypeError::New(env, "Argument must be a buffer").Value()); @@ -82,8 +84,11 @@ class AsyncMessageProcessor { return deferred->Promise(); } + void close() { open = false; } + private: bb::messaging::MessageDispatcher dispatcher; + bool open = true; template void _register_handler(uint32_t msgType, const std::function& fn) diff --git a/yarn-project/kv-store/src/lmdb-v2/store.ts b/yarn-project/kv-store/src/lmdb-v2/store.ts index aff65119d8c9..5dfd8bab63e5 100644 --- a/yarn-project/kv-store/src/lmdb-v2/store.ts +++ b/yarn-project/kv-store/src/lmdb-v2/store.ts @@ -25,6 +25,7 @@ import { LMDBSingleValue } from './singleton.js'; import { WriteTransaction } from './write_transaction.js'; export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { + private open = false; private channel: MsgpackChannel; private writerCtx = new AsyncLocalStorage(); private writerQueue = new SerialQueue(); @@ -46,15 +47,17 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { private async start() { this.writerQueue.start(); - await this.sendMessage(LMDBMessageType.OPEN_DATABASE, { + await this.channel.sendMessage(LMDBMessageType.OPEN_DATABASE, { db: Database.DATA, uniqueKeys: true, }); - await this.sendMessage(LMDBMessageType.OPEN_DATABASE, { + await this.channel.sendMessage(LMDBMessageType.OPEN_DATABASE, { db: Database.INDEX, uniqueKeys: false, }); + + this.open = true; } public static async new( @@ -70,10 +73,16 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { } public getReadTx(): ReadTransaction { + if (!this.open) { + throw new Error('Store is closed'); + } return new ReadTransaction(this); } public getCurrentWriteTx(): WriteTransaction | undefined { + if (!this.open) { + throw new Error('Store is closed'); + } const currentWrite = this.writerCtx.getStore(); return currentWrite; } @@ -105,6 +114,10 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { async transactionAsync>>( callback: (tx: WriteTransaction) => Promise, ): Promise { + if (!this.open) { + throw new Error('Store is closed'); + } + // transactionAsync might be called recursively // send any writes to the parent tx, but don't close it // if the callback throws then the parent tx will rollback automatically @@ -144,6 +157,11 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { } async close() { + if (!this.open) { + // already closed + return; + } + this.open = false; await this.writerQueue.cancel(); await this.sendMessage(LMDBMessageType.CLOSE, undefined); } @@ -152,6 +170,10 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { msgType: T, body: LMDBRequestBody[T], ): Promise { + if (!this.open) { + throw new Error('Store is closed'); + } + if (msgType === LMDBMessageType.START_CURSOR) { await this.availableCursors.acquire(); } From 1246cbefcb6e0bdc2530bf01504a9a1d55ece3ad Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Sun, 2 Feb 2025 16:26:02 +0000 Subject: [PATCH 3/5] fix: close all open cursors on close --- .../nodejs_module/lmdb_store/lmdb_store_wrapper.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp index 993a57d21fce..3538da8caa8f 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp @@ -248,8 +248,18 @@ StatsResponse LMDBStoreWrapper::get_stats() BoolResponse LMDBStoreWrapper::close() { + // prevent this store from receiving further messages _msg_processor.close(); + + { + // close all of the open read cursors + std::lock_guard cursors(_cursor_mutex); + _cursors.clear(); + } + + // and finally close the database handle _store.reset(nullptr); + return { true }; } From 4d34c9212ea3312b838d337ddc8ab01077924e87 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Sun, 2 Feb 2025 21:15:38 +0000 Subject: [PATCH 4/5] fix: reopen data store in slasher test suite --- yarn-project/kv-store/src/lmdb-v2/factory.ts | 10 ++++++++++ yarn-project/kv-store/src/lmdb-v2/store.ts | 4 ++++ .../src/slasher/slasher_client.test.ts | 18 ++++++++++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/yarn-project/kv-store/src/lmdb-v2/factory.ts b/yarn-project/kv-store/src/lmdb-v2/factory.ts index 29994263ad6e..2c20c562269f 100644 --- a/yarn-project/kv-store/src/lmdb-v2/factory.ts +++ b/yarn-project/kv-store/src/lmdb-v2/factory.ts @@ -77,3 +77,13 @@ export async function openTmpStore( return AztecLMDBStoreV2.new(dataDir, dbMapSizeKb, maxReaders, cleanup, log); } + +export function openStoreAt( + dataDir: string, + dbMapSizeKb = 10 * 1_024 * 1_024, // 10GB + maxReaders = MAX_READERS, + log: Logger = createLogger('kv-store:lmdb-v2'), +): Promise { + log.debug(`Opening data store at: ${dataDir} with size: ${dbMapSizeKb} KB (LMDB v2)`); + return AztecLMDBStoreV2.new(dataDir, dbMapSizeKb, maxReaders, undefined, log); +} diff --git a/yarn-project/kv-store/src/lmdb-v2/store.ts b/yarn-project/kv-store/src/lmdb-v2/store.ts index 5dfd8bab63e5..2fb282eee115 100644 --- a/yarn-project/kv-store/src/lmdb-v2/store.ts +++ b/yarn-project/kv-store/src/lmdb-v2/store.ts @@ -44,6 +44,10 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { this.availableCursors = new Semaphore(maxReaders - 1); } + public get dataDirectory(): string { + return this.dataDir; + } + private async start() { this.writerQueue.start(); diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts index 2cfca24c6386..65e4a4a2b7d1 100644 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts +++ b/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts @@ -10,9 +10,10 @@ import { EthAddress } from '@aztec/foundation/eth-address'; import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; import { type AztecAsyncKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import { openStoreAt, openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { expect } from '@jest/globals'; +import { rm } from 'fs/promises'; import { SlasherClient, type SlasherConfig } from './slasher_client.js'; @@ -22,6 +23,7 @@ describe('In-Memory Slasher Client', () => { let kvStore: AztecAsyncKVStore; let client: SlasherClient; let config: SlasherConfig & L1ContractsConfig & L1ReaderConfig; + let tmpDir: string; beforeEach(async () => { blockSource = new MockL2BlockSource(); @@ -42,7 +44,10 @@ describe('In-Memory Slasher Client', () => { viemPollingIntervalMS: 1000, }; - kvStore = await openTmpStore('test'); + // ephemeral false so that we can close and re-open during tests + const store = await openTmpStore('test', false); + kvStore = store; + tmpDir = store.dataDirectory; client = new SlasherClient(config, kvStore, blockSource); }); @@ -56,6 +61,8 @@ describe('In-Memory Slasher Client', () => { if (client.isReady()) { await client.stop(); } + + await rm(tmpDir, { recursive: true, force: true }); }); it('can start & stop', async () => { @@ -70,10 +77,13 @@ describe('In-Memory Slasher Client', () => { it('restores the previous block number it was at', async () => { await client.start(); + const synchedBlock = await client.getSyncedLatestBlockNum(); await client.stop(); - const client2 = new SlasherClient(config, kvStore, blockSource); - expect(client2.getSyncedLatestBlockNum()).toEqual(client.getSyncedLatestBlockNum()); + const reopenedStore = await openStoreAt(tmpDir); + const client2 = new SlasherClient(config, reopenedStore, blockSource); + expect(await client2.getSyncedLatestBlockNum()).toEqual(synchedBlock); + await client2.stop(); }); describe('Chain prunes', () => { From dbaf56cf9220885dc35260a6aa31c7b85253c12c Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Mon, 3 Feb 2025 17:45:05 +0000 Subject: [PATCH 5/5] fix: closed store --- yarn-project/kv-store/src/lmdb-v2/store.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/kv-store/src/lmdb-v2/store.ts b/yarn-project/kv-store/src/lmdb-v2/store.ts index 2fb282eee115..11ab859061e7 100644 --- a/yarn-project/kv-store/src/lmdb-v2/store.ts +++ b/yarn-project/kv-store/src/lmdb-v2/store.ts @@ -167,7 +167,7 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { } this.open = false; await this.writerQueue.cancel(); - await this.sendMessage(LMDBMessageType.CLOSE, undefined); + await this.channel.sendMessage(LMDBMessageType.CLOSE, undefined); } public async sendMessage(