diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index e5cc85f214de..7e92ce8255c6 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -251,6 +251,12 @@ export type EnvVar = | 'TX_COLLECTION_FILE_STORE_URLS' | 'TX_COLLECTION_FILE_STORE_SLOW_DELAY_MS' | 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS' + | 'TX_COLLECTION_FILE_STORE_FAST_WORKER_COUNT' + | 'TX_COLLECTION_FILE_STORE_SLOW_WORKER_COUNT' + | 'TX_COLLECTION_FILE_STORE_FAST_BACKOFF_BASE_MS' + | 'TX_COLLECTION_FILE_STORE_SLOW_BACKOFF_BASE_MS' + | 'TX_COLLECTION_FILE_STORE_FAST_BACKOFF_MAX_MS' + | 'TX_COLLECTION_FILE_STORE_SLOW_BACKOFF_MAX_MS' | 'TX_FILE_STORE_URL' | 'TX_FILE_STORE_UPLOAD_CONCURRENCY' | 'TX_FILE_STORE_MAX_QUEUE_SIZE' diff --git a/yarn-project/p2p/src/services/tx_collection/config.ts b/yarn-project/p2p/src/services/tx_collection/config.ts index 2c3d821bb440..f86950af78e8 100644 --- a/yarn-project/p2p/src/services/tx_collection/config.ts +++ b/yarn-project/p2p/src/services/tx_collection/config.ts @@ -37,6 +37,18 @@ export type TxCollectionConfig = { txCollectionFileStoreSlowDelayMs: number; /** Delay in ms before file store collection starts after fast collection is triggered */ txCollectionFileStoreFastDelayMs: number; + /** Number of concurrent workers for fast file store collection */ + txCollectionFileStoreFastWorkerCount: number; + /** Number of concurrent workers for slow file store collection */ + txCollectionFileStoreSlowWorkerCount: number; + /** Base backoff time in ms for fast file store collection retries */ + txCollectionFileStoreFastBackoffBaseMs: number; + /** Base backoff time in ms for slow file store collection retries */ + txCollectionFileStoreSlowBackoffBaseMs: number; + /** Max backoff time in ms for fast file store collection retries */ + txCollectionFileStoreFastBackoffMaxMs: number; + /** Max backoff time in ms for slow file store collection retries */ + txCollectionFileStoreSlowBackoffMaxMs: number; }; export const txCollectionConfigMappings: ConfigMappingsType = { @@ -121,4 +133,34 @@ export const txCollectionConfigMappings: ConfigMappingsType description: 'Delay before file store collection starts after fast collection', ...numberConfigHelper(2_000), }, + txCollectionFileStoreFastWorkerCount: { + env: 'TX_COLLECTION_FILE_STORE_FAST_WORKER_COUNT', + description: 'Number of concurrent workers for fast file store collection', + ...numberConfigHelper(5), + }, + txCollectionFileStoreSlowWorkerCount: { + env: 'TX_COLLECTION_FILE_STORE_SLOW_WORKER_COUNT', + description: 'Number of concurrent workers for slow file store collection', + ...numberConfigHelper(2), + }, + txCollectionFileStoreFastBackoffBaseMs: { + env: 'TX_COLLECTION_FILE_STORE_FAST_BACKOFF_BASE_MS', + description: 'Base backoff time in ms for fast file store collection retries', + ...numberConfigHelper(1_000), + }, + txCollectionFileStoreSlowBackoffBaseMs: { + env: 'TX_COLLECTION_FILE_STORE_SLOW_BACKOFF_BASE_MS', + description: 'Base backoff time in ms for slow file store collection retries', + ...numberConfigHelper(5_000), + }, + txCollectionFileStoreFastBackoffMaxMs: { + env: 'TX_COLLECTION_FILE_STORE_FAST_BACKOFF_MAX_MS', + description: 'Max backoff time in ms for fast file store collection retries', + ...numberConfigHelper(5_000), + }, + txCollectionFileStoreSlowBackoffMaxMs: { + env: 'TX_COLLECTION_FILE_STORE_SLOW_BACKOFF_MAX_MS', + description: 'Max backoff time in ms for slow file store collection retries', + ...numberConfigHelper(30_000), + }, }; diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts index fc3aaad52284..578a95dc0af9 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts @@ -1,6 +1,7 @@ import { BlockNumber } from '@aztec/foundation/branded-types'; import { createLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { TestDateProvider } from '@aztec/foundation/timer'; import { L2Block } from '@aztec/stdlib/block'; import { Tx, TxHash } from '@aztec/stdlib/tx'; import { getTelemetryClient } from '@aztec/telemetry-client'; @@ -9,7 +10,7 @@ import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js'; -import { FileStoreTxCollection } from './file_store_tx_collection.js'; +import { type FileStoreCollectionConfig, FileStoreTxCollection } from './file_store_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; @@ -19,6 +20,9 @@ describe('FileStoreTxCollection', () => { let txCollectionSink: TxCollectionSink; let txPool: MockProxy; let context: TxAddContext; + let dateProvider: TestDateProvider; + let deadline: Date; + let config: FileStoreCollectionConfig; let txs: Tx[]; let txHashes: TxHash[]; @@ -37,9 +41,9 @@ describe('FileStoreTxCollection', () => { }; const setFileStoreTxs = (source: MockProxy, txs: Tx[]) => { - source.getTxsByHash.mockImplementation(hashes => { - return Promise.resolve(hashes.map(h => txs.find(tx => tx.getTxHash().equals(h)))); - }); + source.getTxsByHash.mockImplementation(hashes => + Promise.resolve(hashes.map(h => txs.find(tx => tx.getTxHash().equals(h)))), + ); }; /** Waits for the sink to emit txs-added events for the expected number of txs. */ @@ -57,22 +61,44 @@ describe('FileStoreTxCollection', () => { return promise; }; + /** Waits until the total number of getTxsByHash calls across all sources reaches the expected count. */ + const waitForSourceCalls = async (sources: MockProxy[], totalCalls: number) => { + const start = Date.now(); + while (Date.now() - start < 60_000) { + const total = sources.reduce((sum, s) => sum + s.getTxsByHash.mock.calls.length, 0); + if (total >= totalCalls) { + return; + } + await new Promise(resolve => setTimeout(resolve, 5)); + } + const total = sources.reduce((sum, s) => sum + s.getTxsByHash.mock.calls.length, 0); + throw new Error(`Timed out waiting for ${totalCalls} source calls (got ${total})`); + }; + beforeEach(async () => { txPool = mock(); jest.spyOn(Math, 'random').mockReturnValue(0); + dateProvider = new TestDateProvider(); const log = createLogger('test'); txCollectionSink = new TxCollectionSink(txPool, getTelemetryClient(), log); fileStoreSources = [makeFileStoreSource('store1'), makeFileStoreSource('store2')]; - fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, log); + config = { + workerCount: 5, + backoffBaseMs: 1000, + backoffMaxMs: 5000, + }; + + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); txs = await Promise.all([makeTx(), makeTx(), makeTx()]); txHashes = txs.map(tx => tx.getTxHash()); const block = await L2Block.random(BlockNumber(1)); context = { type: 'mined', block }; + deadline = new Date(dateProvider.now() + 60 * 60 * 1000); }); afterEach(async () => { @@ -80,121 +106,199 @@ describe('FileStoreTxCollection', () => { jest.restoreAllMocks(); }); - it('downloads txs immediately when startCollecting is called', async () => { + it('downloads txs when startCollecting is called', async () => { setFileStoreTxs(fileStoreSources[0], txs); fileStoreCollection.start(); - // Set up event listener before calling startCollecting const txsAddedPromise = waitForTxsAdded(txs.length); - - fileStoreCollection.startCollecting(txHashes, context); - - // Wait for all txs to be processed via events + fileStoreCollection.startCollecting(txHashes, context, deadline); await txsAddedPromise; expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); expect(txPool.addMinedTxs).toHaveBeenCalled(); }); - it('skips txs marked as found while queued', async () => { + it('skips txs marked as found', async () => { setFileStoreTxs(fileStoreSources[0], txs); fileStoreCollection.start(); - // Queue all txs, then mark the first as found before workers process it - fileStoreCollection.startCollecting(txHashes, context); + fileStoreCollection.startCollecting(txHashes, context, deadline); fileStoreCollection.foundTxs([txs[0]]); - // Set up event listener - only 2 txs should be downloaded const txsAddedPromise = waitForTxsAdded(2); - - // Wait for workers to process await txsAddedPromise; - // First tx should not have been requested from file store const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); const requestedHashes = allCalls.flat().flat(); expect(requestedHashes).not.toContainEqual(txHashes[0]); }); - it('stops tracking txs when foundTxs is called after queueing', async () => { - setFileStoreTxs(fileStoreSources[0], txs); - - fileStoreCollection.start(); - - // Queue all txs, then immediately mark first as found - fileStoreCollection.startCollecting(txHashes, context); - fileStoreCollection.foundTxs([txs[0]]); - - // Set up event listener - only 2 txs should be downloaded - const txsAddedPromise = waitForTxsAdded(2); - - // Wait for workers to process - await txsAddedPromise; - - // First tx should not have been requested from any file store - const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); - const requestedHashes = allCalls.flat().flat(); - expect(requestedHashes).not.toContainEqual(txHashes[0]); - - // Verify second and third tx were downloaded - expect(txPool.addMinedTxs).toHaveBeenCalled(); - }); - - it('tries multiple file stores when tx not found in first', async () => { + it('tries multiple file stores via round-robin', async () => { // Only second store has tx[0] setFileStoreTxs(fileStoreSources[1], [txs[0]]); - // Ensure we always start with source 0 so we can test the fallback to source 1 + // Pin random so we always start at source 0, ensuring we test the fallback to source 1 jest.spyOn(Math, 'random').mockReturnValue(0); fileStoreCollection.start(); - // Set up event listener const txsAddedPromise = waitForTxsAdded(1); - - fileStoreCollection.startCollecting([txHashes[0]], context); + fileStoreCollection.startCollecting([txHashes[0]], context, deadline); await txsAddedPromise; - // First store was tried but didn't have it + // Both stores should have been tried expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); - // Second store was tried and found it expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled(); expect(txPool.addMinedTxs).toHaveBeenCalled(); jest.restoreAllMocks(); }); - it('does not start workers if no file store sources are configured', async () => { + it('does not start workers if no file store sources are configured', () => { const log = createLogger('test'); - fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, log); + fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, config, dateProvider, log); fileStoreCollection.start(); - fileStoreCollection.startCollecting(txHashes, context); - - // Give some time for potential processing - await new Promise(resolve => setTimeout(resolve, 50)); + fileStoreCollection.startCollecting(txHashes, context, deadline); + // With no sources, start() is a no-op (no workers spawned) and startCollecting() returns + // immediately, so no calls should have been made synchronously. expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); }); it('does not re-queue txs that are already pending', async () => { - // Set txs on both sources so download count is deterministic regardless of random start index setFileStoreTxs(fileStoreSources[0], txs); setFileStoreTxs(fileStoreSources[1], txs); + // Use single worker for deterministic behavior + const log = createLogger('test'); + config = { workerCount: 1, backoffBaseMs: 1000, backoffMaxMs: 5000 }; + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); + fileStoreCollection.start(); - // Set up event listener const txsAddedPromise = waitForTxsAdded(txs.length); - fileStoreCollection.startCollecting(txHashes, context); - fileStoreCollection.startCollecting(txHashes, context); // Duplicate call + fileStoreCollection.startCollecting(txHashes, context, deadline); + fileStoreCollection.startCollecting(txHashes, context, deadline); // Duplicate call await txsAddedPromise; - // Each tx should only be downloaded once (one source call per tx) + // With 1 worker processing sequentially, each tx should be found on the first source. + // Duplicate startCollecting should not create extra entries. const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); expect(allCalls.length).toBe(txHashes.length); }); + + it('retries across sources when tx is not found initially', async () => { + // Use a single worker to make behavior deterministic + const log = createLogger('test'); + config = { workerCount: 1, backoffBaseMs: 100, backoffMaxMs: 500 }; + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); + + fileStoreCollection.start(); + + // Initially both sources return empty + fileStoreCollection.startCollecting([txHashes[0]], context, deadline); + + // Wait for first full cycle (2 sources = 2 calls) + await waitForSourceCalls(fileStoreSources, 2); + + // Now make second source return the tx + setFileStoreTxs(fileStoreSources[1], [txs[0]]); + + // Advance time past backoff so the worker retries + dateProvider.setTime(dateProvider.now() + 200); + + const txsAddedPromise = waitForTxsAdded(1); + await txsAddedPromise; + + expect(txPool.addMinedTxs).toHaveBeenCalled(); + }); + + it('expires entries past deadline', async () => { + const log = createLogger('test'); + config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 }; + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); + + // Set a very short deadline + const shortDeadline = new Date(dateProvider.now() + 100); + + fileStoreCollection.start(); + fileStoreCollection.startCollecting([txHashes[0]], context, shortDeadline); + + // Wait for first full cycle (2 sources = 2 calls) + await waitForSourceCalls(fileStoreSources, 2); + + // Advance time past the deadline + dateProvider.setTime(dateProvider.now() + 200); + + // Clear mocks so we can distinguish new calls from old ones + jest.clearAllMocks(); + + // Add a new entry with a valid deadline and set up source to return it. + // This proves the worker is alive and the expired entry was cleaned up. + setFileStoreTxs(fileStoreSources[0], [txs[1]]); + const txsAddedPromise = waitForTxsAdded(1); + fileStoreCollection.startCollecting([txHashes[1]], context, deadline); + await txsAddedPromise; + + // Only txHashes[1] should have been requested after clearing mocks + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + const requestedHashes = allCalls.flat().flat(); + expect(requestedHashes).not.toContainEqual(txHashes[0]); + expect(requestedHashes).toContainEqual(txHashes[1]); + }); + + it('does not start collecting if deadline is in the past', () => { + const pastDeadline = new Date(dateProvider.now() - 1000); + + fileStoreCollection.start(); + fileStoreCollection.startCollecting(txHashes, context, pastDeadline); + + // startCollecting returns immediately without adding entries when deadline is past + expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); + }); + + it('foundTxs stops retry for found txs', async () => { + const log = createLogger('test'); + config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 }; + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); + + setFileStoreTxs(fileStoreSources[0], [txs[1]]); + + fileStoreCollection.start(); + fileStoreCollection.startCollecting(txHashes, context, deadline); + + // Mark first tx as found + fileStoreCollection.foundTxs([txs[0]]); + + const txsAddedPromise = waitForTxsAdded(1); + await txsAddedPromise; + + // tx[0] should never have been attempted + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + const requestedHashes = allCalls.flat().flat(); + expect(requestedHashes).not.toContainEqual(txHashes[0]); + }); + + it('clearPending removes all entries', async () => { + fileStoreCollection.start(); + fileStoreCollection.startCollecting(txHashes, context, deadline); + fileStoreCollection.clearPending(); + + // Verify workers are alive but the cleared entries are gone by adding + // a new entry and confirming only it gets processed. + setFileStoreTxs(fileStoreSources[0], [txs[0]]); + const txsAddedPromise = waitForTxsAdded(1); + fileStoreCollection.startCollecting([txHashes[0]], context, deadline); + await txsAddedPromise; + + // Only the newly added tx[0] should have been requested, not all 3 original txs + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + const requestedHashes = allCalls.flat().flat(); + expect(requestedHashes).not.toContainEqual(txHashes[1]); + expect(requestedHashes).not.toContainEqual(txHashes[2]); + }); }); diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts index 709238344a5d..baa5879ea46e 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts @@ -1,152 +1,198 @@ import { type Logger, createLogger } from '@aztec/foundation/log'; -import { FifoMemoryQueue } from '@aztec/foundation/queue'; +import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; +import { sleep } from '@aztec/foundation/sleep'; +import { DateProvider } from '@aztec/foundation/timer'; import { Tx, TxHash } from '@aztec/stdlib/tx'; import type { FileStoreTxSource } from './file_store_tx_source.js'; import type { TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; -// Internal constants (not configurable by node operators) -const FILE_STORE_DOWNLOAD_CONCURRENCY = 5; // Max concurrent downloads +/** Configuration for a FileStoreTxCollection instance. */ +export type FileStoreCollectionConfig = { + workerCount: number; + backoffBaseMs: number; + backoffMaxMs: number; +}; + +type FileStoreTxEntry = { + txHash: string; + context: TxAddContext; + deadline: Date; + attempts: number; + lastAttemptTime: number; + nextSourceIndex: number; +}; /** * Collects txs from file stores as a fallback after P2P methods have been tried. - * Runs in parallel to slow/fast collection. The delay before starting file store - * collection is managed by the TxCollection orchestrator, not this class. + * Uses a shared worker pool that pulls entries with priority (fewest attempts first), + * retries with round-robin across sources, and applies exponential backoff between + * full cycles through all sources. */ export class FileStoreTxCollection { - /** Map from tx hash to add context for txs queued for download. */ - private pendingTxs = new Map(); + /** Map from tx hash string to entry for all pending downloads. */ + private entries = new Map(); - /** - * Tracks tx hashes found elsewhere, even before startCollecting is called. - * Needed because the orchestrator delays startCollecting via a real sleep, but foundTxs - * may arrive during that delay — before the hashes are added to pendingTxs. - */ - private foundTxHashes = new Set(); - - /** Queue of tx hashes to be downloaded. */ - private downloadQueue = new FifoMemoryQueue(); - - /** Worker promises for concurrent downloads. */ + /** Worker promises for the shared worker pool. */ private workers: Promise[] = []; - /** Whether the collection has been started. */ - private started = false; + /** Whether the worker pool is running. */ + private running = false; + + /** Signal used to wake sleeping workers when new entries arrive or stop is called. */ + private wakeSignal: PromiseWithResolvers; constructor( - private readonly fileStoreSources: FileStoreTxSource[], + private readonly sources: FileStoreTxSource[], private readonly txCollectionSink: TxCollectionSink, + private readonly config: FileStoreCollectionConfig, + private readonly dateProvider: DateProvider = new DateProvider(), private readonly log: Logger = createLogger('p2p:file_store_tx_collection'), - ) {} + ) { + this.wakeSignal = promiseWithResolvers(); + } - /** Starts the file store collection workers. */ - public start() { - if (this.fileStoreSources.length === 0) { - this.log.debug('No file store sources configured, skipping file store collection'); + /** Starts the shared worker pool. */ + public start(): void { + if (this.sources.length === 0) { + this.log.debug('No file store sources configured'); return; } - - this.started = true; - this.downloadQueue = new FifoMemoryQueue(); - - // Start concurrent download workers - for (let i = 0; i < FILE_STORE_DOWNLOAD_CONCURRENCY; i++) { - this.workers.push(this.downloadQueue.process(txHash => this.processDownload(txHash))); + this.running = true; + for (let i = 0; i < this.config.workerCount; i++) { + this.workers.push(this.workerLoop()); } - - this.log.info(`Started file store tx collection with ${this.fileStoreSources.length} sources`, { - sources: this.fileStoreSources.map(s => s.getInfo()), - concurrency: FILE_STORE_DOWNLOAD_CONCURRENCY, - }); } - /** Stops all collection activity. */ - public async stop() { - if (!this.started) { - return; - } - this.started = false; - this.downloadQueue.end(); + /** Stops all workers and clears state. */ + public async stop(): Promise { + this.running = false; + this.wake(); await Promise.all(this.workers); this.workers = []; - this.pendingTxs.clear(); - this.foundTxHashes.clear(); + this.entries.clear(); } - /** Remove the given tx hashes from pending. */ - public stopCollecting(txHashes: TxHash[]) { - for (const txHash of txHashes) { - const hashStr = txHash.toString(); - this.pendingTxs.delete(hashStr); + /** Adds entries to the shared map and wakes workers. */ + public startCollecting(txHashes: TxHash[], context: TxAddContext, deadline: Date): void { + if (this.sources.length === 0 || txHashes.length === 0) { + return; + } + if (+deadline <= this.dateProvider.now()) { + return; } - } - - /** Clears all pending state. Items already in the download queue will still be processed but won't be re-queued. */ - public clearPending() { - this.pendingTxs.clear(); - this.foundTxHashes.clear(); - } - /** Queue the given tx hashes for file store collection. */ - public startCollecting(txHashes: TxHash[], context: TxAddContext) { for (const txHash of txHashes) { const hashStr = txHash.toString(); - if (!this.pendingTxs.has(hashStr) && !this.foundTxHashes.has(hashStr)) { - this.pendingTxs.set(hashStr, context); - this.downloadQueue.put(txHash); + if (!this.entries.has(hashStr)) { + this.entries.set(hashStr, { + txHash: hashStr, + context, + deadline, + attempts: 0, + lastAttemptTime: 0, + nextSourceIndex: Math.floor(Math.random() * this.sources.length), + }); } } + this.wake(); } - /** Stop tracking txs that were found elsewhere. */ - public foundTxs(txs: Tx[]) { + /** Removes entries for txs that have been found elsewhere. */ + public foundTxs(txs: Tx[]): void { for (const tx of txs) { - const hashStr = tx.getTxHash().toString(); - this.pendingTxs.delete(hashStr); - this.foundTxHashes.add(hashStr); + this.entries.delete(tx.getTxHash().toString()); } } - /** Processes a single tx hash from the download queue. */ - private async processDownload(txHash: TxHash) { - const hashStr = txHash.toString(); - const context = this.pendingTxs.get(hashStr); - - // Skip if already found by another method - if (!context) { - return; - } - - await this.downloadTx(txHash, context); - this.pendingTxs.delete(hashStr); + /** Clears all pending entries. */ + public clearPending(): void { + this.entries.clear(); } - /** Attempt to download a tx from file stores (round-robin). */ - private async downloadTx(txHash: TxHash, context: TxAddContext) { - const startIndex = Math.floor(Math.random() * this.fileStoreSources.length); - for (let i = startIndex; i < startIndex + this.fileStoreSources.length; i++) { - const source = this.fileStoreSources[i % this.fileStoreSources.length]; + private async workerLoop(): Promise { + while (this.running) { + const action = this.getNextAction(); + if (action.type === 'sleep') { + await action.promise; + continue; + } + + const entry = action.entry; + const source = this.sources[entry.nextSourceIndex % this.sources.length]; + entry.nextSourceIndex++; + entry.attempts++; + entry.lastAttemptTime = this.dateProvider.now(); try { const result = await this.txCollectionSink.collect( hashes => source.getTxsByHash(hashes), - [txHash], - { - description: `file-store ${source.getInfo()}`, - method: 'file-store', - fileStore: source.getInfo(), - }, - context, + [TxHash.fromString(entry.txHash)], + { description: `file-store ${source.getInfo()}`, method: 'file-store', fileStore: source.getInfo() }, + entry.context, ); - if (result.txs.length > 0) { - return; + this.entries.delete(entry.txHash); } } catch (err) { - this.log.trace(`Failed to download tx ${txHash} from ${source.getInfo()}`, { err }); + this.log.trace(`Error downloading tx ${entry.txHash} from ${source.getInfo()}`, { err }); + } + } + } + + /** Single-pass scan: removes expired entries, finds the best ready entry, or computes sleep time. */ + private getNextAction(): { type: 'process'; entry: FileStoreTxEntry } | { type: 'sleep'; promise: Promise } { + const now = this.dateProvider.now(); + let best: FileStoreTxEntry | undefined; + let earliestReadyAt = Infinity; + + for (const [key, entry] of this.entries) { + if (+entry.deadline <= now) { + this.entries.delete(key); + continue; + } + const backoffMs = this.getBackoffMs(entry); + const readyAt = entry.lastAttemptTime + backoffMs; + if (readyAt > now) { + earliestReadyAt = Math.min(earliestReadyAt, readyAt); + continue; + } + if (!best || entry.attempts < best.attempts) { + best = entry; } } - this.log.trace(`Tx ${txHash} not found in any file store`); + if (best) { + return { type: 'process', entry: best }; + } + if (earliestReadyAt < Infinity) { + return { type: 'sleep', promise: this.sleepOrWake(earliestReadyAt - now) }; + } + return { type: 'sleep', promise: this.waitForWake() }; + } + + /** Computes backoff for an entry. Backoff applies after a full cycle through all sources. */ + private getBackoffMs(entry: FileStoreTxEntry): number { + const fullCycles = Math.floor(entry.attempts / this.sources.length); + if (fullCycles === 0) { + return 0; + } + return Math.min(this.config.backoffBaseMs * Math.pow(2, fullCycles - 1), this.config.backoffMaxMs); + } + + /** Resolves the current wake signal and creates a new one. */ + private wake(): void { + this.wakeSignal.resolve(); + this.wakeSignal = promiseWithResolvers(); + } + + /** Waits until the wake signal is resolved. */ + private async waitForWake(): Promise { + await this.wakeSignal.promise; + } + + /** Sleeps for the given duration or until the wake signal is resolved. */ + private async sleepOrWake(ms: number): Promise { + await Promise.race([sleep(ms), this.wakeSignal.promise]); } } diff --git a/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts index eb337e0a0778..6d264c0a8b5f 100644 --- a/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts @@ -254,9 +254,14 @@ export class SlowTxCollection { /** Computes the proof submission deadline for a given slot, a tx mined in this slot is no longer interesting after this deadline */ private getDeadlineForSlot(slotNumber: SlotNumber): Date { - const epoch = getEpochAtSlot(slotNumber, this.constants); - const submissionEndEpoch = EpochNumber(epoch + this.constants.proofSubmissionEpochs); - const submissionEndTimestamp = getTimestampRangeForEpoch(submissionEndEpoch, this.constants)[1]; - return new Date(Number(submissionEndTimestamp) * 1000); + return getProofDeadlineForSlot(slotNumber, this.constants); } } + +/** Computes the proof submission deadline for a given slot. A tx mined in this slot is no longer interesting after this deadline. */ +export function getProofDeadlineForSlot(slotNumber: SlotNumber, constants: L1RollupConstants): Date { + const epoch = getEpochAtSlot(slotNumber, constants); + const submissionEndEpoch = EpochNumber(epoch + constants.proofSubmissionEpochs); + const submissionEndTimestamp = getTimestampRangeForEpoch(submissionEndEpoch, constants)[1]; + return new Date(Number(submissionEndTimestamp) * 1000); +} diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts index abd6ae020af1..8cb04633dbb5 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts @@ -580,6 +580,7 @@ class TestFastTxCollection extends FastTxCollection { class TestTxCollection extends TxCollection { declare slowCollection: SlowTxCollection; declare fastCollection: TestFastTxCollection; - declare fileStoreCollection: TxCollection['fileStoreCollection']; + declare fileStoreSlowCollection: TxCollection['fileStoreSlowCollection']; + declare fileStoreFastCollection: TxCollection['fileStoreFastCollection']; declare handleTxsAddedToPool: TxPoolV2Events['txs-added']; } diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts index 6d142fbba25e..eae579e2a555 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts @@ -18,7 +18,7 @@ import type { TxCollectionConfig } from './config.js'; import { FastTxCollection } from './fast_tx_collection.js'; import { FileStoreTxCollection } from './file_store_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; -import { SlowTxCollection } from './slow_tx_collection.js'; +import { SlowTxCollection, getProofDeadlineForSlot } from './slow_tx_collection.js'; import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; import type { TxSource } from './tx_source.js'; @@ -56,8 +56,11 @@ export class TxCollection { /** Fast collection methods */ protected readonly fastCollection: FastTxCollection; - /** File store collection */ - protected readonly fileStoreCollection: FileStoreTxCollection; + /** File store collection for slow (mined block) path */ + protected readonly fileStoreSlowCollection: FileStoreTxCollection; + + /** File store collection for fast (proposal/proving) path */ + protected readonly fileStoreFastCollection: FileStoreTxCollection; /** Loop for periodically reconciling found transactions from the tx pool in case we missed some */ private readonly reconcileFoundTxsLoop: RunningPromise; @@ -111,7 +114,28 @@ export class TxCollection { ); this.hasFileStoreSources = fileStoreSources.length > 0; - this.fileStoreCollection = new FileStoreTxCollection(fileStoreSources, this.txCollectionSink, this.log); + this.fileStoreSlowCollection = new FileStoreTxCollection( + fileStoreSources, + this.txCollectionSink, + { + workerCount: config.txCollectionFileStoreSlowWorkerCount, + backoffBaseMs: config.txCollectionFileStoreSlowBackoffBaseMs, + backoffMaxMs: config.txCollectionFileStoreSlowBackoffMaxMs, + }, + this.dateProvider, + this.log, + ); + this.fileStoreFastCollection = new FileStoreTxCollection( + fileStoreSources, + this.txCollectionSink, + { + workerCount: config.txCollectionFileStoreFastWorkerCount, + backoffBaseMs: config.txCollectionFileStoreFastBackoffBaseMs, + backoffMaxMs: config.txCollectionFileStoreFastBackoffMaxMs, + }, + this.dateProvider, + this.log, + ); this.reconcileFoundTxsLoop = new RunningPromise( () => this.reconcileFoundTxsWithPool(), @@ -137,7 +161,8 @@ export class TxCollection { public start(): Promise { this.started = true; this.slowCollection.start(); - this.fileStoreCollection.start(); + this.fileStoreSlowCollection.start(); + this.fileStoreFastCollection.start(); this.reconcileFoundTxsLoop.start(); // TODO(palla/txs): Collect mined unproven tx hashes for txs we dont have in the pool and populate missingTxs on startup @@ -150,7 +175,8 @@ export class TxCollection { await Promise.all([ this.slowCollection.stop(), this.fastCollection.stop(), - this.fileStoreCollection.stop(), + this.fileStoreSlowCollection.stop(), + this.fileStoreFastCollection.stop(), this.reconcileFoundTxsLoop.stop(), ]); @@ -175,6 +201,7 @@ export class TxCollection { // Delay file store collection to give P2P methods time to find txs first if (this.hasFileStoreSources) { const context: TxAddContext = { type: 'mined', block }; + const deadline = getProofDeadlineForSlot(block.header.getSlot(), this.constants); sleep(this.config.txCollectionFileStoreSlowDelayMs) .then(() => { if (this.started) { @@ -182,7 +209,7 @@ export class TxCollection { const stillMissing = new Set(this.slowCollection.getMissingTxHashes().map(h => h.toString())); const remaining = txHashes.filter(h => stillMissing.has(h.toString())); if (remaining.length > 0) { - this.fileStoreCollection.startCollecting(remaining, context); + this.fileStoreSlowCollection.startCollecting(remaining, context, deadline); } } }) @@ -223,7 +250,7 @@ export class TxCollection { sleep(this.config.txCollectionFileStoreFastDelayMs) .then(() => { if (this.started) { - this.fileStoreCollection.startCollecting(hashes, context); + this.fileStoreFastCollection.startCollecting(hashes, context, opts.deadline); } }) .catch(err => this.log.error('Error in file store fast delay', err)); @@ -245,7 +272,8 @@ export class TxCollection { private foundTxs(txs: Tx[]) { this.slowCollection.foundTxs(txs); this.fastCollection.foundTxs(txs); - this.fileStoreCollection.foundTxs(txs); + this.fileStoreSlowCollection.foundTxs(txs); + this.fileStoreFastCollection.foundTxs(txs); } /** @@ -255,7 +283,8 @@ export class TxCollection { public stopCollectingForBlocksUpTo(blockNumber: BlockNumber): void { this.slowCollection.stopCollectingForBlocksUpTo(blockNumber); this.fastCollection.stopCollectingForBlocksUpTo(blockNumber); - this.fileStoreCollection.clearPending(); + this.fileStoreSlowCollection.clearPending(); + this.fileStoreFastCollection.clearPending(); } /** @@ -265,7 +294,8 @@ export class TxCollection { public stopCollectingForBlocksAfter(blockNumber: BlockNumber): void { this.slowCollection.stopCollectingForBlocksAfter(blockNumber); this.fastCollection.stopCollectingForBlocksAfter(blockNumber); - this.fileStoreCollection.clearPending(); + this.fileStoreSlowCollection.clearPending(); + this.fileStoreFastCollection.clearPending(); } /** Every now and then, check if the pool has received one of the txs we are looking for, just to catch any race conditions */