From 69739733205f39dfb87bc1cea704740994a859d1 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 4 Feb 2026 16:27:18 +0000 Subject: [PATCH 1/3] feat: download txs from file store as a last resort --- .../foundation/src/array/sorted_array.test.ts | 50 +++++ .../foundation/src/array/sorted_array.ts | 39 ++-- yarn-project/foundation/src/config/env_var.ts | 4 +- .../foundation/src/queue/base_memory_queue.ts | 2 +- .../foundation/src/timer/date.test.ts | 190 ++++++++++++++++-- yarn-project/foundation/src/timer/date.ts | 102 ++++++++++ yarn-project/p2p/src/client/factory.ts | 12 ++ .../p2p/src/services/tx_collection/config.ts | 26 +++ .../file_store_tx_collection.test.ts | 190 ++++++++++++++++++ .../tx_collection/file_store_tx_collection.ts | 150 ++++++++++++++ .../tx_collection/file_store_tx_source.ts | 70 +++++++ .../p2p/src/services/tx_collection/index.ts | 1 + .../tx_collection/tx_collection.test.ts | 89 +++++++- .../services/tx_collection/tx_collection.ts | 58 +++++- .../p2p/src/services/tx_file_store/config.ts | 6 - .../tx_file_store/tx_file_store.test.ts | 1 - 16 files changed, 936 insertions(+), 54 deletions(-) create mode 100644 yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts create mode 100644 yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts create mode 100644 yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts diff --git a/yarn-project/foundation/src/array/sorted_array.test.ts b/yarn-project/foundation/src/array/sorted_array.test.ts index 0e2417458e5b..0e6b0ad378a4 100644 --- a/yarn-project/foundation/src/array/sorted_array.test.ts +++ b/yarn-project/foundation/src/array/sorted_array.test.ts @@ -2,6 +2,7 @@ import { dedupeSortedArray, findInSortedArray, findIndexInSortedArray, + findInsertionIndexInSortedArray, insertIntoSortedArray, merge, removeAnyOf, @@ -125,6 +126,55 @@ describe('sorted_array', () => { } }); + describe('findInsertionIndexInSortedArray', () => { + it('returns 0 for empty array', () => { + expect(findInsertionIndexInSortedArray([], 1, cmp)).toBe(0); + }); + + it('returns count of elements <= needle', () => { + const tests: [number[], number, number][] = [ + [[5], 3, 0], + [[5], 5, 1], + [[5], 7, 1], + + [[1, 3, 5, 7], 0, 0], + [[1, 3, 5, 7], 1, 1], + [[1, 3, 5, 7], 2, 1], + [[1, 3, 5, 7], 3, 2], + [[1, 3, 5, 7], 4, 2], + [[1, 3, 5, 7], 5, 3], + [[1, 3, 5, 7], 6, 3], + [[1, 3, 5, 7], 7, 4], + [[1, 3, 5, 7], 8, 4], + ]; + for (const [arr, needle, expected] of tests) { + expect(findInsertionIndexInSortedArray(arr, needle, cmp)).toBe(expected); + } + }); + + it('handles duplicates by returning index after all equal elements', () => { + expect(findInsertionIndexInSortedArray([1, 2, 2, 2, 3], 2, cmp)).toBe(4); + expect(findInsertionIndexInSortedArray([2, 2, 2], 2, cmp)).toBe(3); + expect(findInsertionIndexInSortedArray([1, 1, 1, 2], 1, cmp)).toBe(3); + }); + + it('works with heterogeneous types', () => { + type Timer = { deadline: number; callback: () => void }; + const arr: Timer[] = [ + { deadline: 100, callback: () => {} }, + { deadline: 300, callback: () => {} }, + { deadline: 500, callback: () => {} }, + ]; + const cmpByDeadline = (timer: Timer, needle: { deadline: number }) => cmp(timer.deadline, needle.deadline); + + expect(findInsertionIndexInSortedArray(arr, { deadline: 0 }, cmpByDeadline)).toBe(0); + expect(findInsertionIndexInSortedArray(arr, { deadline: 100 }, cmpByDeadline)).toBe(1); + expect(findInsertionIndexInSortedArray(arr, { deadline: 200 }, cmpByDeadline)).toBe(1); + expect(findInsertionIndexInSortedArray(arr, { deadline: 300 }, cmpByDeadline)).toBe(2); + expect(findInsertionIndexInSortedArray(arr, { deadline: 600 }, cmpByDeadline)).toBe(3); + }); + }); + it('findIndexInSortedArray with duplicates returns any valid occurrence', () => { // Binary search doesn't guarantee first occurrence, just any valid occurrence const arr = [1, 2, 2, 2, 3]; diff --git a/yarn-project/foundation/src/array/sorted_array.ts b/yarn-project/foundation/src/array/sorted_array.ts index a622661024ae..d3ba1e678ce2 100644 --- a/yarn-project/foundation/src/array/sorted_array.ts +++ b/yarn-project/foundation/src/array/sorted_array.ts @@ -21,34 +21,39 @@ export function dedupeSortedArray(arr: T[], cmp: Cmp): void { } export function insertIntoSortedArray(arr: T[], item: T, cmp: Cmp, allowDuplicates = true): boolean { + const index = findInsertionIndexInSortedArray(arr, item, cmp); + + if (!allowDuplicates) { + // Check element before insertion point (upper bound returns index after equal elements) + if (index > 0 && cmp(arr[index - 1], item) === 0) { + return false; + } + } + + arr.splice(index, 0, item); + return true; +} + +/** + * Finds the index where needle would be inserted to maintain sorted order. + * Returns the count of elements less than or equal to needle. + */ +export function findInsertionIndexInSortedArray(values: T[], needle: N, cmp: (a: T, b: N) => number): number { let start = 0; - let end = arr.length; + let end = values.length; while (start < end) { const mid = start + (((end - start) / 2) | 0); - const comparison = cmp(arr[mid], item); + const comparison = cmp(values[mid], needle); - if (comparison < 0) { + if (comparison <= 0) { start = mid + 1; } else { end = mid; } } - if (!allowDuplicates) { - // Check element at insertion point - if (start < arr.length && cmp(arr[start], item) === 0) { - return false; - } - - // Check element before insertion point (in case we landed after duplicates) - if (start > 0 && cmp(arr[start - 1], item) === 0) { - return false; - } - } - - arr.splice(start, 0, item); - return true; + return start; } export function findIndexInSortedArray(values: T[], needle: N, cmp: (a: T, b: N) => number): number { diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index fbc42a161bdb..23d5e749c22d 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -246,8 +246,10 @@ export type EnvVar = | 'TX_COLLECTION_NODE_RPC_MAX_BATCH_SIZE' | 'TX_COLLECTION_NODE_RPC_URLS' | 'TX_COLLECTION_MISSING_TXS_COLLECTOR_TYPE' + | 'TX_COLLECTION_FILE_STORE_URLS' + | 'TX_COLLECTION_FILE_STORE_SLOW_DELAY_MS' + | 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS' | 'TX_FILE_STORE_URL' - | 'TX_FILE_STORE_DOWNLOAD_URL' | 'TX_FILE_STORE_UPLOAD_CONCURRENCY' | 'TX_FILE_STORE_MAX_QUEUE_SIZE' | 'TX_FILE_STORE_ENABLED' diff --git a/yarn-project/foundation/src/queue/base_memory_queue.ts b/yarn-project/foundation/src/queue/base_memory_queue.ts index 8446e9adf04a..6c99f7619abb 100644 --- a/yarn-project/foundation/src/queue/base_memory_queue.ts +++ b/yarn-project/foundation/src/queue/base_memory_queue.ts @@ -122,7 +122,7 @@ export abstract class BaseMemoryQueue { * @param handler - A function that takes an item of type T and returns a Promise after processing the item. * @returns A Promise that resolves when the queue is finished processing. */ - public async process(handler: (item: T) => Promise) { + public async process(handler: (item: T) => Promise | void) { try { while (true) { const item = await this.get(); diff --git a/yarn-project/foundation/src/timer/date.test.ts b/yarn-project/foundation/src/timer/date.test.ts index 55aeac96d550..0a150ad0eb85 100644 --- a/yarn-project/foundation/src/timer/date.test.ts +++ b/yarn-project/foundation/src/timer/date.test.ts @@ -3,31 +3,185 @@ import { TestDateProvider } from './date.js'; describe('TestDateProvider', () => { let dateProvider: TestDateProvider; + beforeEach(() => { dateProvider = new TestDateProvider(); }); - it('should return the current datetime', () => { - const currentTime = Date.now(); - const result = dateProvider.now(); - expect(result).toBeGreaterThanOrEqual(currentTime); - expect(result).toBeLessThan(currentTime + 100); + afterEach(() => { + dateProvider.clearPendingTimeouts(); + }); + + describe('now', () => { + it('should return the current datetime', () => { + const currentTime = Date.now(); + const result = dateProvider.now(); + expect(result).toBeGreaterThanOrEqual(currentTime); + expect(result).toBeLessThan(currentTime + 100); + }); + + it('should return the overridden datetime', () => { + const overriddenTime = Date.now() + 1000; + dateProvider.setTime(overriddenTime); + const result = dateProvider.now(); + expect(result).toBeGreaterThanOrEqual(overriddenTime); + expect(result).toBeLessThan(overriddenTime + 100); + }); + + it('should keep ticking after overriding', async () => { + const overriddenTime = Date.now() + 1000; + dateProvider.setTime(overriddenTime); + await sleep(510); + const result = dateProvider.now(); + expect(result).toBeGreaterThanOrEqual(overriddenTime + 500); + expect(result).toBeLessThan(overriddenTime + 600); + }); }); - it('should return the overridden datetime', () => { - const overriddenTime = Date.now() + 1000; - dateProvider.setTime(overriddenTime); - const result = dateProvider.now(); - expect(result).toBeGreaterThanOrEqual(overriddenTime); - expect(result).toBeLessThan(overriddenTime + 100); + describe('createTimeoutSignal', () => { + it('should not abort signal before deadline', () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const signal = dateProvider.createTimeoutSignal(1000); + + expect(signal.aborted).toBe(false); + }); + + it('should abort signal when setTime advances past deadline', () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const signal = dateProvider.createTimeoutSignal(1000); + expect(signal.aborted).toBe(false); + + // Advance time past the deadline + dateProvider.setTime(baseTime + 1001); + + expect(signal.aborted).toBe(true); + expect(signal.reason).toBeInstanceOf(DOMException); + expect(signal.reason.name).toBe('TimeoutError'); + }); + + it('should abort immediately when ms <= 0', () => { + const signal = dateProvider.createTimeoutSignal(0); + + expect(signal.aborted).toBe(true); + expect(signal.reason.name).toBe('TimeoutError'); + }); + + it('should abort multiple signals in deadline order when time advances', () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const signal1 = dateProvider.createTimeoutSignal(1000); + const signal2 = dateProvider.createTimeoutSignal(500); + const signal3 = dateProvider.createTimeoutSignal(2000); + + expect(signal1.aborted).toBe(false); + expect(signal2.aborted).toBe(false); + expect(signal3.aborted).toBe(false); + + // Advance past signal2's deadline only + dateProvider.setTime(baseTime + 600); + + expect(signal1.aborted).toBe(false); + expect(signal2.aborted).toBe(true); + expect(signal3.aborted).toBe(false); + + // Advance past signal1's deadline + dateProvider.setTime(baseTime + 1500); + + expect(signal1.aborted).toBe(true); + expect(signal3.aborted).toBe(false); + + // Advance past signal3's deadline + dateProvider.setTime(baseTime + 2500); + + expect(signal3.aborted).toBe(true); + }); }); - it('should keep ticking after overriding', async () => { - const overriddenTime = Date.now() + 1000; - dateProvider.setTime(overriddenTime); - await sleep(510); - const result = dateProvider.now(); - expect(result).toBeGreaterThanOrEqual(overriddenTime + 500); - expect(result).toBeLessThan(overriddenTime + 600); + describe('sleep', () => { + it('should resolve immediately when ms <= 0', async () => { + await expect(dateProvider.sleep(0)).resolves.toBeUndefined(); + }); + + it('should resolve when setTime advances past deadline', async () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const sleepPromise = dateProvider.sleep(1000); + + // Advance time past the deadline + dateProvider.setTime(baseTime + 1001); + + await expect(sleepPromise).resolves.toBeUndefined(); + }); + + it('should resolve multiple sleeps in deadline order when time advances', async () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const resolveOrder: number[] = []; + + const sleep1 = dateProvider.sleep(1000).then(() => resolveOrder.push(1)); + const sleep2 = dateProvider.sleep(500).then(() => resolveOrder.push(2)); + const sleep3 = dateProvider.sleep(2000).then(() => resolveOrder.push(3)); + + // Advance past all deadlines at once + dateProvider.setTime(baseTime + 3000); + + await Promise.all([sleep1, sleep2, sleep3]); + + // Should resolve in deadline order: sleep2 (500ms), sleep1 (1000ms), sleep3 (2000ms) + expect(resolveOrder).toEqual([2, 1, 3]); + }); + }); + + describe('clearPendingTimeouts', () => { + it('should clear pending timeouts so they never abort', () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const signal = dateProvider.createTimeoutSignal(1000); + expect(signal.aborted).toBe(false); + + dateProvider.clearPendingTimeouts(); + + // Advance time past the deadline + dateProvider.setTime(baseTime + 2000); + + // Signal should not have been aborted since we cleared pending timeouts + expect(signal.aborted).toBe(false); + }); + }); + + describe('combined timeout and sleep behavior', () => { + it('should handle interleaved timeouts and sleeps', async () => { + const baseTime = Date.now(); + dateProvider.setTime(baseTime); + + const signal1 = dateProvider.createTimeoutSignal(500); + const sleep1Promise = dateProvider.sleep(750); + const signal2 = dateProvider.createTimeoutSignal(1000); + + // Advance to 600ms - only signal1 should abort + dateProvider.setTime(baseTime + 600); + + expect(signal1.aborted).toBe(true); + expect(signal2.aborted).toBe(false); + + // Advance to 800ms - sleep1 should resolve + dateProvider.setTime(baseTime + 800); + await sleep1Promise; + + expect(signal2.aborted).toBe(false); + + // Advance to 1100ms - signal2 should abort + dateProvider.setTime(baseTime + 1100); + + expect(signal2.aborted).toBe(true); + }); }); }); diff --git a/yarn-project/foundation/src/timer/date.ts b/yarn-project/foundation/src/timer/date.ts index a34dfe4912ce..d53e80e12d66 100644 --- a/yarn-project/foundation/src/timer/date.ts +++ b/yarn-project/foundation/src/timer/date.ts @@ -1,4 +1,7 @@ +import { findInsertionIndexInSortedArray, insertIntoSortedArray } from '../array/sorted_array.js'; import { createLogger } from '../log/pino-logger.js'; +import { promiseWithResolvers } from '../promise/utils.js'; +import { sleep } from '../sleep/index.js'; /** Returns current datetime. */ export class DateProvider { @@ -13,11 +16,42 @@ export class DateProvider { public nowAsDate(): Date { return new Date(this.now()); } + + /** + * Creates an AbortSignal that aborts after the specified timeout. + * In production, this wraps AbortSignal.timeout(ms). + * TestDateProvider overrides this to respect manipulated time. + */ + public createTimeoutSignal(ms: number): AbortSignal { + return AbortSignal.timeout(ms); + } + + /** + * Sleeps for the specified duration. Supports AbortSignal for cancellation. + * TestDateProvider overrides this to resolve when setTime() advances past the deadline. + */ + public sleep(ms: number): Promise { + return sleep(ms); + } } +type TestTimeout = { deadline: number; controller: AbortController }; +type TestSleep = { + deadline: number; + resolve: () => void; + reject: (reason: unknown) => void; +}; + +const deadlineCmp = (a: { deadline: number }, b: { deadline: number }): -1 | 0 | 1 => + a.deadline < b.deadline ? -1 : a.deadline > b.deadline ? 1 : 0; + /** Returns current datetime and allows to override it. */ export class TestDateProvider extends DateProvider { private offset = 0; + // sorted TestTimeout instances by their deadline + private pendingTimeouts: TestTimeout[] = []; + // sorted TestSleep instances by their deadline + private pendingSleeps: TestSleep[] = []; constructor(private readonly logger = createLogger('foundation:test-date-provider')) { super(); @@ -30,6 +64,74 @@ export class TestDateProvider extends DateProvider { public setTime(timeMs: number) { this.offset = timeMs - Date.now(); this.logger.warn(`Time set to ${new Date(timeMs).toISOString()}`, { offset: this.offset, timeMs }); + this.handleTimeAdvance(); + } + + /** + * Creates an AbortSignal that aborts when setTime() advances past the deadline. + * Unlike the base DateProvider, this does NOT use real-time setTimeout. + */ + public override createTimeoutSignal(ms: number): AbortSignal { + const controller = new AbortController(); + const deadline = this.now() + ms; + + if (ms <= 0) { + controller.abort(new DOMException('TimeoutError', 'TimeoutError')); + return controller.signal; + } + + insertIntoSortedArray(this.pendingTimeouts, { deadline, controller }, deadlineCmp); + return controller.signal; + } + + /** + * Sleeps for the specified duration. Resolves when setTime() advances past the deadline. + * Unlike the base DateProvider, this does NOT use real-time setTimeout. + */ + public override sleep(ms: number): Promise { + const deadline = this.now() + ms; + + if (ms <= 0) { + return Promise.resolve(); + } + + const { promise, resolve, reject } = promiseWithResolvers(); + insertIntoSortedArray(this.pendingSleeps, { deadline, resolve, reject }, deadlineCmp); + + return promise; + } + + /** Check pending timeouts and sleeps, abort/resolve any that have expired. */ + private handleTimeAdvance() { + const deadline = { deadline: this.now() }; + + const timeoutIndex = findInsertionIndexInSortedArray(this.pendingTimeouts, deadline, deadlineCmp); + if (timeoutIndex > 0) { + const timeouts = this.pendingTimeouts.splice(0, timeoutIndex); + for (const { controller } of timeouts) { + setImmediate(() => controller.abort(new DOMException('TimeoutError', 'TimeoutError'))); + } + } + + const sleepIdx = findInsertionIndexInSortedArray(this.pendingSleeps, deadline, deadlineCmp); + if (sleepIdx > 0) { + const sleeps = this.pendingSleeps.splice(0, sleepIdx); + for (const { resolve } of sleeps) { + setImmediate(resolve); + } + } + } + + /** Clears all pending timeout and sleep timers. Call in afterEach to prevent Jest warnings. */ + public clearPendingTimeouts() { + for (const { controller } of this.pendingTimeouts) { + controller.abort(new DOMException('TimeoutError', 'TimeoutError')); + } + for (const { reject } of this.pendingSleeps) { + reject(new Error('TestDateProvider cleared')); + } + this.pendingTimeouts = []; + this.pendingSleeps = []; } /** Advances the time by the given number of seconds. */ diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 14c10d1ee77e..a1646d91549f 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -18,6 +18,7 @@ import type { MemPools } from '../mem_pools/interface.js'; import { AztecKVTxPool, type TxPool } from '../mem_pools/tx_pool/index.js'; import { DummyP2PService } from '../services/dummy_service.js'; import { LibP2PService } from '../services/index.js'; +import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js'; import { TxCollection } from '../services/tx_collection/tx_collection.js'; import { type TxSource, createNodeRpcTxSources } from '../services/tx_collection/tx_source.js'; import { TxFileStore } from '../services/tx_file_store/tx_file_store.js'; @@ -105,12 +106,23 @@ export async function createP2PClient( }); } + const fileStoreSources = await createFileStoreTxSources( + config.txCollectionFileStoreUrls, + logger.createChild('file-store-tx-source'), + ); + if (fileStoreSources.length > 0) { + logger.info(`Using ${fileStoreSources.length} file store sources for tx collection.`, { + stores: fileStoreSources.map(s => s.getInfo()), + }); + } + const txCollection = new TxCollection( p2pService.getBatchTxRequesterService(), nodeSources, l1Constants, mempools.txPool, config, + fileStoreSources, dateProvider, telemetry, logger.createChild('tx-collection'), diff --git a/yarn-project/p2p/src/services/tx_collection/config.ts b/yarn-project/p2p/src/services/tx_collection/config.ts index 8811ef37ffda..2c3d821bb440 100644 --- a/yarn-project/p2p/src/services/tx_collection/config.ts +++ b/yarn-project/p2p/src/services/tx_collection/config.ts @@ -31,6 +31,12 @@ export type TxCollectionConfig = { txCollectionNodeRpcMaxBatchSize: number; /** Which collector implementation to use for missing txs collection */ txCollectionMissingTxsCollectorType: MissingTxsCollectorType; + /** A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection */ + txCollectionFileStoreUrls: string[]; + /** Delay in ms before file store collection starts after slow collection is triggered */ + txCollectionFileStoreSlowDelayMs: number; + /** Delay in ms before file store collection starts after fast collection is triggered */ + txCollectionFileStoreFastDelayMs: number; }; export const txCollectionConfigMappings: ConfigMappingsType = { @@ -95,4 +101,24 @@ export const txCollectionConfigMappings: ConfigMappingsType description: 'Which collector implementation to use for missing txs collection (new or old)', ...enumConfigHelper(['new', 'old'] as const, 'new'), }, + txCollectionFileStoreUrls: { + env: 'TX_COLLECTION_FILE_STORE_URLS', + description: 'A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection', + parseEnv: (val: string) => + val + .split(',') + .map(url => url.trim()) + .filter(url => url.length > 0), + defaultValue: [], + }, + txCollectionFileStoreSlowDelayMs: { + env: 'TX_COLLECTION_FILE_STORE_SLOW_DELAY_MS', + description: 'Delay before file store collection starts after slow collection', + ...numberConfigHelper(24_000), + }, + txCollectionFileStoreFastDelayMs: { + env: 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS', + description: 'Delay before file store collection starts after fast collection', + ...numberConfigHelper(2_000), + }, }; diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts new file mode 100644 index 000000000000..c4baf97b6df2 --- /dev/null +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts @@ -0,0 +1,190 @@ +import { createLogger } from '@aztec/foundation/log'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { Tx, TxHash } from '@aztec/stdlib/tx'; +import { getTelemetryClient } from '@aztec/telemetry-client'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import type { TxPool } from '../../mem_pools/index.js'; +import { FileStoreTxCollection } from './file_store_tx_collection.js'; +import type { FileStoreTxSource } from './file_store_tx_source.js'; +import { TxCollectionSink } from './tx_collection_sink.js'; + +describe('FileStoreTxCollection', () => { + let fileStoreCollection: FileStoreTxCollection; + let fileStoreSources: MockProxy[]; + let txCollectionSink: TxCollectionSink; + let txPool: MockProxy; + + let txs: Tx[]; + let txHashes: TxHash[]; + + const makeFileStoreSource = (name: string) => { + const source = mock(); + source.getInfo.mockReturnValue(name); + source.getTxsByHash.mockResolvedValue([]); + return source; + }; + + const makeTx = async () => { + const tx = Tx.random(); + await tx.recomputeHash(); + return tx; + }; + + const setFileStoreTxs = (source: MockProxy, txs: Tx[]) => { + source.getTxsByHash.mockImplementation(async hashes => { + return hashes.map(h => txs.find(tx => tx.getTxHash().equals(h))); + }); + }; + + /** Waits for the sink to emit txs-added events for the expected number of txs. */ + const waitForTxsAdded = (expectedCount: number) => { + const { promise, resolve } = promiseWithResolvers(); + let count = 0; + const handler = ({ txs }: { txs: Tx[] }) => { + count += txs.length; + if (count >= expectedCount) { + txCollectionSink.removeListener('txs-added', handler); + resolve(); + } + }; + txCollectionSink.on('txs-added', handler); + return promise; + }; + + beforeEach(async () => { + txPool = mock(); + txPool.addTxs.mockImplementation(async txs => txs.length); + + const log = createLogger('test'); + txCollectionSink = new TxCollectionSink(txPool, getTelemetryClient(), log); + + fileStoreSources = [makeFileStoreSource('store1'), makeFileStoreSource('store2')]; + + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, log); + + txs = await Promise.all([makeTx(), makeTx(), makeTx()]); + txHashes = txs.map(tx => tx.getTxHash()); + }); + + afterEach(async () => { + await fileStoreCollection.stop(); + }); + + it('downloads txs immediately when startCollecting is called', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + + fileStoreCollection.start(); + + // Set up event listener before calling startCollecting + const txsAddedPromise = waitForTxsAdded(txs.length); + + fileStoreCollection.startCollecting(txHashes); + + // Wait for all txs to be processed via events + await txsAddedPromise; + + expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); + expect(txPool.addTxs).toHaveBeenCalledWith(expect.arrayContaining([txs[0]]), { source: 'tx-collection' }); + expect(txPool.addTxs).toHaveBeenCalledWith(expect.arrayContaining([txs[1]]), { source: 'tx-collection' }); + expect(txPool.addTxs).toHaveBeenCalledWith(expect.arrayContaining([txs[2]]), { source: 'tx-collection' }); + }); + + it('skips txs marked as found while queued', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + + fileStoreCollection.start(); + + // Queue all txs, then mark the first as found before workers process it + fileStoreCollection.startCollecting(txHashes); + fileStoreCollection.foundTxs([txs[0]]); + + // Set up event listener - only 2 txs should be downloaded + const txsAddedPromise = waitForTxsAdded(2); + + // Wait for workers to process + await txsAddedPromise; + + // First tx should not have been requested from file store + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + const requestedHashes = allCalls.flat().flat(); + expect(requestedHashes).not.toContainEqual(txHashes[0]); + }); + + it('stops tracking txs when foundTxs is called', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + + fileStoreCollection.start(); + + // Mark first tx as found before queueing + fileStoreCollection.foundTxs([txs[0]]); + + // Set up event listener - only 2 txs should be downloaded + const txsAddedPromise = waitForTxsAdded(2); + + // Queue all txs - but first one was already found + fileStoreCollection.startCollecting(txHashes); + + // Wait for workers to process + await txsAddedPromise; + + // First tx should not have been requested from any file store + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + const requestedHashes = allCalls.flat().flat(); + expect(requestedHashes).not.toContainEqual(txHashes[0]); + + // Verify second and third tx were downloaded + expect(txPool.addTxs).toHaveBeenCalledWith(expect.arrayContaining([txs[1]]), { source: 'tx-collection' }); + expect(txPool.addTxs).toHaveBeenCalledWith(expect.arrayContaining([txs[2]]), { source: 'tx-collection' }); + }); + + it('tries multiple file stores when tx not found in first', async () => { + // Only second store has tx[0] + setFileStoreTxs(fileStoreSources[1], [txs[0]]); + + fileStoreCollection.start(); + + // Set up event listener + const txsAddedPromise = waitForTxsAdded(1); + + fileStoreCollection.startCollecting([txHashes[0]]); + await txsAddedPromise; + + // First store was tried but didn't have it + expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); + // Second store was tried and found it + expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled(); + expect(txPool.addTxs).toHaveBeenCalledWith([txs[0]], { source: 'tx-collection' }); + }); + + it('does not start workers if no file store sources are configured', async () => { + const log = createLogger('test'); + fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, log); + fileStoreCollection.start(); + fileStoreCollection.startCollecting(txHashes); + + // Give some time for potential processing + await new Promise(resolve => setTimeout(resolve, 50)); + + expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); + }); + + it('does not re-queue txs that are already pending', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + + fileStoreCollection.start(); + + // Set up event listener + const txsAddedPromise = waitForTxsAdded(txs.length); + + fileStoreCollection.startCollecting(txHashes); + fileStoreCollection.startCollecting(txHashes); // Duplicate call + + await txsAddedPromise; + + // Each tx should only be downloaded once + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + expect(allCalls.length).toBe(txHashes.length); + }); +}); diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts new file mode 100644 index 000000000000..f269c6a0b03e --- /dev/null +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts @@ -0,0 +1,150 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { FifoMemoryQueue } from '@aztec/foundation/queue'; +import { Tx, TxHash } from '@aztec/stdlib/tx'; + +import type { FileStoreTxSource } from './file_store_tx_source.js'; +import type { TxCollectionSink } from './tx_collection_sink.js'; + +// Internal constants (not configurable by node operators) +const FILE_STORE_DOWNLOAD_CONCURRENCY = 5; // Max concurrent downloads + +/** + * Collects txs from file stores as a fallback after P2P methods have been tried. + * Runs in parallel to slow/fast collection. The delay before starting file store + * collection is managed by the TxCollection orchestrator, not this class. + */ +export class FileStoreTxCollection { + /** Set of tx hashes that have been queued for download (prevents duplicate queueing). */ + private pendingTxs = new Set(); + + /** Set of tx hashes that were found elsewhere (prevents queueing txs already found via P2P). */ + private foundTxHashes = new Set(); + + /** Queue of tx hashes to be downloaded. */ + private downloadQueue = new FifoMemoryQueue(); + + /** Worker promises for concurrent downloads. */ + private workers: Promise[] = []; + + /** Round-robin index for distributing requests across file store sources. */ + private currentSourceIndex = 0; + + /** Whether the collection has been started. */ + private started = false; + + constructor( + private readonly fileStoreSources: FileStoreTxSource[], + private readonly txCollectionSink: TxCollectionSink, + private readonly log: Logger = createLogger('p2p:file_store_tx_collection'), + ) {} + + /** Starts the file store collection workers. */ + public start() { + if (this.fileStoreSources.length === 0) { + this.log.debug('No file store sources configured, skipping file store collection'); + return; + } + + this.started = true; + this.downloadQueue = new FifoMemoryQueue(); + + // Start concurrent download workers + for (let i = 0; i < FILE_STORE_DOWNLOAD_CONCURRENCY; i++) { + this.workers.push(this.downloadQueue.process(txHash => this.processDownload(txHash))); + } + + this.log.info(`Started file store tx collection with ${this.fileStoreSources.length} sources`, { + sources: this.fileStoreSources.map(s => s.getInfo()), + concurrency: FILE_STORE_DOWNLOAD_CONCURRENCY, + }); + } + + /** Stops all collection activity. */ + public async stop() { + if (!this.started) { + return; + } + this.started = false; + this.downloadQueue.end(); + await Promise.all(this.workers); + this.workers = []; + this.pendingTxs.clear(); + this.foundTxHashes.clear(); + } + + /** Remove the given tx hashes from pending and mark them as found. */ + public stopCollecting(txHashes: TxHash[]) { + for (const txHash of txHashes) { + const hashStr = txHash.toString(); + this.pendingTxs.delete(hashStr); + this.foundTxHashes.add(hashStr); + } + } + + /** Clears all pending state. Items already in the download queue will still be processed but won't be re-queued. */ + public clearPending() { + this.pendingTxs.clear(); + this.foundTxHashes.clear(); + } + + /** Queue the given tx hashes for file store collection. */ + public startCollecting(txHashes: TxHash[]) { + for (const txHash of txHashes) { + const hashStr = txHash.toString(); + if (!this.pendingTxs.has(hashStr) && !this.foundTxHashes.has(hashStr)) { + this.pendingTxs.add(hashStr); + this.downloadQueue.put(txHash); + } + } + } + + /** Stop tracking txs that were found elsewhere. */ + public foundTxs(txs: Tx[]) { + for (const tx of txs) { + const hashStr = tx.getTxHash().toString(); + this.pendingTxs.delete(hashStr); + this.foundTxHashes.add(hashStr); + } + } + + /** Processes a single tx hash from the download queue. */ + private async processDownload(txHash: TxHash) { + const hashStr = txHash.toString(); + + // Skip if already found by another method + if (this.foundTxHashes.has(hashStr)) { + this.pendingTxs.delete(hashStr); + return; + } + + await this.downloadTx(txHash); + this.pendingTxs.delete(hashStr); + } + + /** Attempt to download a tx from file stores (round-robin). */ + private async downloadTx(txHash: TxHash) { + // Try each source starting from current index + for (let i = 0; i < this.fileStoreSources.length; i++) { + const sourceIndex = (this.currentSourceIndex + i) % this.fileStoreSources.length; + const source = this.fileStoreSources[sourceIndex]; + + try { + const result = await this.txCollectionSink.collect(hashes => source.getTxsByHash(hashes), [txHash], { + description: `file-store ${source.getInfo()}`, + method: 'file-store', + fileStore: source.getInfo(), + }); + + if (result.txs.length > 0) { + // Found the tx, advance round-robin for next request + this.currentSourceIndex = (sourceIndex + 1) % this.fileStoreSources.length; + return; + } + } catch (err) { + this.log.trace(`Failed to download tx ${txHash} from ${source.getInfo()}`, { err }); + } + } + + this.log.trace(`Tx ${txHash} not found in any file store`); + } +} diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts new file mode 100644 index 000000000000..b88f6b028ede --- /dev/null +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts @@ -0,0 +1,70 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { type ReadOnlyFileStore, createReadOnlyFileStore } from '@aztec/stdlib/file-store'; +import { Tx, type TxHash } from '@aztec/stdlib/tx'; + +import type { TxSource } from './tx_source.js'; + +/** TxSource implementation that downloads txs from a file store. */ +export class FileStoreTxSource implements TxSource { + private constructor( + private readonly fileStore: ReadOnlyFileStore, + private readonly baseUrl: string, + private readonly log: Logger, + ) {} + + /** + * Creates a FileStoreTxSource from a URL. + * @param url - The file store URL (s3://, gs://, file://, http://, https://). + * @param log - Optional logger. + * @returns The FileStoreTxSource instance, or undefined if creation fails. + */ + public static async create( + url: string, + log: Logger = createLogger('p2p:file_store_tx_source'), + ): Promise { + try { + const fileStore = await createReadOnlyFileStore(url, log); + if (!fileStore) { + log.warn(`Failed to create file store for URL: ${url}`); + return undefined; + } + return new FileStoreTxSource(fileStore, url, log); + } catch (err) { + log.warn(`Error creating file store for URL: ${url}`, { error: err }); + return undefined; + } + } + + public getInfo(): string { + return `file-store:${this.baseUrl}`; + } + + public getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { + return Promise.all( + txHashes.map(async txHash => { + const path = `txs/${txHash.toString()}.bin`; + try { + const buffer = await this.fileStore.read(path); + return Tx.fromBuffer(buffer); + } catch { + // Tx not found or error reading - return undefined + return undefined; + } + }), + ); + } +} + +/** + * Creates FileStoreTxSource instances from URLs. + * @param urls - Array of file store URLs. + * @param log - Optional logger. + * @returns Array of successfully created FileStoreTxSource instances. + */ +export async function createFileStoreTxSources( + urls: string[], + log: Logger = createLogger('p2p:file_store_tx_source'), +): Promise { + const sources = await Promise.all(urls.map(url => FileStoreTxSource.create(url, log))); + return sources.filter((s): s is FileStoreTxSource => s !== undefined); +} diff --git a/yarn-project/p2p/src/services/tx_collection/index.ts b/yarn-project/p2p/src/services/tx_collection/index.ts index aa4a01d54e81..9349339ce29b 100644 --- a/yarn-project/p2p/src/services/tx_collection/index.ts +++ b/yarn-project/p2p/src/services/tx_collection/index.ts @@ -5,3 +5,4 @@ export { BatchTxRequesterCollector, SendBatchRequestCollector, } from './proposal_tx_collector.js'; +export { FileStoreTxSource, createFileStoreTxSources } from './file_store_tx_source.js'; diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts index 4453656890e8..dadcf9bf7fd4 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts @@ -22,6 +22,7 @@ import { chunkTxHashesRequest } from '../reqresp/protocols/tx.js'; import { ReqRespStatus } from '../reqresp/status.js'; import { type TxCollectionConfig, txCollectionConfigMappings } from './config.js'; import { FastTxCollection } from './fast_tx_collection.js'; +import type { FileStoreTxSource } from './file_store_tx_source.js'; import type { SlowTxCollection } from './slow_tx_collection.js'; import { type FastCollectionRequest, TxCollection } from './tx_collection.js'; import type { TxSource } from './tx_source.js'; @@ -50,8 +51,15 @@ describe('TxCollection', () => { return node; }; + const makeFileStoreSource = (name: string) => { + const source = mock(); + source.getInfo.mockReturnValue(name); + source.getTxsByHash.mockResolvedValue([]); + return source; + }; + const makeTx = async (txHash?: string | TxHash) => { - const tx = Tx.random({ txHash }) as Tx; + const tx = Tx.random({ txHash }); await tx.recomputeHash(); return tx; }; @@ -132,6 +140,8 @@ describe('TxCollection', () => { txCollectionFastMaxParallelRequestsPerNode: 2, txCollectionFastNodeIntervalMs: 100, txCollectionMissingTxsCollectorType: 'old', + txCollectionFileStoreSlowDelayMs: 100, + txCollectionFileStoreFastDelayMs: 100, }; txs = await Promise.all([makeTx(), makeTx(), makeTx()]); @@ -140,11 +150,12 @@ describe('TxCollection', () => { deadline = new Date(dateProvider.now() + 60 * 60 * 1000); mockP2PService.reqResp = reqResp; - txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, [], dateProvider); }); afterEach(async () => { await txCollection.stop(); + dateProvider.clearPendingTimeouts(); }); describe('slow collection', () => { @@ -230,7 +241,7 @@ describe('TxCollection', () => { }); it('collects missing txs directly via reqresp if there are no nodes configured', async () => { - txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, [], dateProvider); txCollection.startCollecting(block, txHashes); setReqRespTxs([txs[0]]); @@ -260,7 +271,7 @@ describe('TxCollection', () => { it('does not request missing txs being collected via fast collection', async () => { config = { ...config, txCollectionDisableSlowDuringFastRequests: false }; - txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, [], dateProvider); const innerCollectFastPromise = promiseWithResolvers(); jest.spyOn(txCollection.fastCollection, 'collectFast').mockImplementation(async request => { @@ -280,7 +291,7 @@ describe('TxCollection', () => { it('pauses slow collection if fast collection is ongoing', async () => { config = { ...config, txCollectionDisableSlowDuringFastRequests: true }; - txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, [], dateProvider); const innerCollectFastPromise = promiseWithResolvers(); jest.spyOn(txCollection.fastCollection, 'collectFast').mockImplementation(async request => { @@ -301,7 +312,7 @@ describe('TxCollection', () => { it('stops collecting a tx when found via fast collection', async () => { config = { ...config, txCollectionDisableSlowDuringFastRequests: true }; - txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, nodes, constants, txPool, config, [], dateProvider); setNodeTxs(nodes[0], txs); txCollection.startCollecting(block, txHashes); @@ -420,7 +431,7 @@ describe('TxCollection', () => { }); it('collects via reqresp if no nodes are configured', async () => { - txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, dateProvider); + txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, [], dateProvider); setReqRespTxs(txs); const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); expectReqRespToHaveBeenCalledWith(txHashes); @@ -499,6 +510,69 @@ describe('TxCollection', () => { expect(reqResp.sendBatchRequest).not.toHaveBeenCalled(); }); }); + + describe('file store collection', () => { + let fileStoreSources: MockProxy[]; + + const setFileStoreTxs = (source: MockProxy, txsToReturn: Tx[]) => { + source.getTxsByHash.mockImplementation(async hashes => { + return hashes.map(h => txsToReturn.find(tx => tx.txHash.equals(h))); + }); + }; + + beforeEach(() => { + fileStoreSources = [makeFileStoreSource('store1')]; + txCollection = new TestTxCollection( + mockP2PService, + nodes, + constants, + txPool, + config, + fileStoreSources, + dateProvider, + ); + }); + + it('collects txs from file store after slow delay', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + txPool.addTxs.mockImplementation(async addedTxs => addedTxs.length); + txPool.hasTx.mockResolvedValue(false); + + await txCollection.start(); + txCollection.startCollecting(block, txHashes); + + // File store should not have been called yet (delay hasn't elapsed) + expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); + + // Advance time past the 4s slow delay + dateProvider.setTime(dateProvider.now() + 200); + // Allow the async sleep resolution and worker processing to complete + await sleep(100); + + // File store should now have been called for each tx + expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); + }); + + it('does not download txs from file store if found via P2P before delay expires', async () => { + setFileStoreTxs(fileStoreSources[0], txs); + txPool.addTxs.mockImplementation(async addedTxs => addedTxs.length); + txPool.hasTx.mockResolvedValue(false); + + await txCollection.start(); + txCollection.startCollecting(block, txHashes); + + // Simulate all txs found via P2P before delay expires + await txCollection.handleTxsAddedToPool({ txs, source: 'test' }); + + // Now advance time past the delay + dateProvider.setTime(dateProvider.now() + 200); + await sleep(100); + + // File store should not have downloaded any txs because they were all found + const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); + expect(allCalls.length).toBe(0); + }); + }); }); class TestFastTxCollection extends FastTxCollection { @@ -513,5 +587,6 @@ class TestFastTxCollection extends FastTxCollection { class TestTxCollection extends TxCollection { declare slowCollection: SlowTxCollection; declare fastCollection: TestFastTxCollection; + declare fileStoreCollection: TxCollection['fileStoreCollection']; declare handleTxsAddedToPool: TxPoolEvents['txs-added']; } diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts index 1b2b7fb54cc0..38f305404562 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts @@ -16,11 +16,13 @@ import type { TxPoolEvents } from '../../mem_pools/tx_pool/tx_pool.js'; import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; import type { TxCollectionConfig } from './config.js'; import { FastTxCollection } from './fast_tx_collection.js'; +import { FileStoreTxCollection } from './file_store_tx_collection.js'; +import type { FileStoreTxSource } from './file_store_tx_source.js'; import { SlowTxCollection } from './slow_tx_collection.js'; import { TxCollectionSink } from './tx_collection_sink.js'; import type { TxSource } from './tx_source.js'; -export type CollectionMethod = 'fast-req-resp' | 'fast-node-rpc' | 'slow-req-resp' | 'slow-node-rpc'; +export type CollectionMethod = 'fast-req-resp' | 'fast-node-rpc' | 'slow-req-resp' | 'slow-node-rpc' | 'file-store'; export type MissingTxInfo = { blockNumber: BlockNumber; deadline: Date; readyForReqResp: boolean }; @@ -54,6 +56,9 @@ export class TxCollection { /** Fast collection methods */ protected readonly fastCollection: FastTxCollection; + /** File store collection */ + protected readonly fileStoreCollection: FileStoreTxCollection; + /** Loop for periodically reconciling found transactions from the tx pool in case we missed some */ private readonly reconcileFoundTxsLoop: RunningPromise; @@ -66,12 +71,19 @@ export class TxCollection { /** Handler for the txs-added event from the tx collection sink */ protected readonly handleTxsFound: TxPoolEvents['txs-added']; + /** Whether the service has been started. */ + private started = false; + + /** Whether file store sources are configured. */ + private readonly hasFileStoreSources: boolean; + constructor( private readonly p2pService: BatchTxRequesterLibP2PService, private readonly nodes: TxSource[], private readonly constants: L1RollupConstants, private readonly txPool: TxPool, private readonly config: TxCollectionConfig, + fileStoreSources: FileStoreTxSource[] = [], private readonly dateProvider: DateProvider = new DateProvider(), telemetryClient: TelemetryClient = getTelemetryClient(), private readonly log: Logger = createLogger('p2p:tx_collection_service'), @@ -98,6 +110,9 @@ export class TxCollection { this.log, ); + this.hasFileStoreSources = fileStoreSources.length > 0; + this.fileStoreCollection = new FileStoreTxCollection(fileStoreSources, this.txCollectionSink, this.log); + this.reconcileFoundTxsLoop = new RunningPromise( () => this.reconcileFoundTxsWithPool(), this.log, @@ -120,7 +135,9 @@ export class TxCollection { /** Starts all collection loops. */ public start(): Promise { + this.started = true; this.slowCollection.start(); + this.fileStoreCollection.start(); this.reconcileFoundTxsLoop.start(); // TODO(palla/txs): Collect mined unproven tx hashes for txs we dont have in the pool and populate missingTxs on startup @@ -129,7 +146,13 @@ export class TxCollection { /** Stops all activity. */ public async stop() { - await Promise.all([this.slowCollection.stop(), this.fastCollection.stop(), this.reconcileFoundTxsLoop.stop()]); + this.started = false; + await Promise.all([ + this.slowCollection.stop(), + this.fastCollection.stop(), + this.fileStoreCollection.stop(), + this.reconcileFoundTxsLoop.stop(), + ]); this.txPool.removeListener('txs-added', this.handleTxsAddedToPool); this.txCollectionSink.removeListener('txs-added', this.handleTxsFound); @@ -147,7 +170,19 @@ export class TxCollection { /** Starts collecting the given tx hashes for the given L2Block in the slow loop */ public startCollecting(block: L2Block, txHashes: TxHash[]) { - return this.slowCollection.startCollecting(block, txHashes); + this.slowCollection.startCollecting(block, txHashes); + + // Delay file store collection to give P2P methods time to find txs first + if (this.hasFileStoreSources) { + this.dateProvider + .sleep(this.config.txCollectionFileStoreSlowDelayMs) + .then(() => { + if (this.started) { + this.fileStoreCollection.startCollecting(txHashes); + } + }) + .catch(err => this.log.error('Error in file store slow delay', err)); + } } /** Collects the set of txs for the given block proposal as fast as possible */ @@ -175,6 +210,20 @@ export class TxCollection { txHashes: TxHash[] | string[], opts: { deadline: Date; pinnedPeer?: PeerId }, ) { + const hashes = txHashes.map(h => (typeof h === 'string' ? TxHash.fromString(h) : h)); + + // Delay file store collection to give P2P methods time to find txs first + if (this.hasFileStoreSources) { + this.dateProvider + .sleep(this.config.txCollectionFileStoreFastDelayMs) + .then(() => { + if (this.started) { + this.fileStoreCollection.startCollecting(hashes); + } + }) + .catch(err => this.log.error('Error in file store fast delay', err)); + } + return this.fastCollection.collectFastFor(input, txHashes, opts); } @@ -182,6 +231,7 @@ export class TxCollection { private foundTxs(txs: Tx[]) { this.slowCollection.foundTxs(txs); this.fastCollection.foundTxs(txs); + this.fileStoreCollection.foundTxs(txs); } /** @@ -191,6 +241,7 @@ export class TxCollection { public stopCollectingForBlocksUpTo(blockNumber: BlockNumber): void { this.slowCollection.stopCollectingForBlocksUpTo(blockNumber); this.fastCollection.stopCollectingForBlocksUpTo(blockNumber); + this.fileStoreCollection.clearPending(); } /** @@ -200,6 +251,7 @@ export class TxCollection { public stopCollectingForBlocksAfter(blockNumber: BlockNumber): void { this.slowCollection.stopCollectingForBlocksAfter(blockNumber); this.fastCollection.stopCollectingForBlocksAfter(blockNumber); + this.fileStoreCollection.clearPending(); } /** Every now and then, check if the pool has received one of the txs we are looking for, just to catch any race conditions */ diff --git a/yarn-project/p2p/src/services/tx_file_store/config.ts b/yarn-project/p2p/src/services/tx_file_store/config.ts index fa78d90331f0..edd1d9c90e46 100644 --- a/yarn-project/p2p/src/services/tx_file_store/config.ts +++ b/yarn-project/p2p/src/services/tx_file_store/config.ts @@ -6,8 +6,6 @@ import { type ConfigMappingsType, booleanConfigHelper, numberConfigHelper } from export type TxFileStoreConfig = { /** URL for uploading txs to file storage (s3://, gs://, file://) */ txFileStoreUrl?: string; - /** URL for downloading txs from file storage */ - txFileStoreDownloadUrl?: string; /** Max concurrent uploads */ txFileStoreUploadConcurrency: number; /** Max queue size to prevent unbounded memory growth */ @@ -21,10 +19,6 @@ export const txFileStoreConfigMappings: ConfigMappingsType = env: 'TX_FILE_STORE_URL', description: 'URL for uploading txs to file storage (s3://, gs://, file://)', }, - txFileStoreDownloadUrl: { - env: 'TX_FILE_STORE_DOWNLOAD_URL', - description: 'URL for downloading txs from file storage', - }, txFileStoreUploadConcurrency: { env: 'TX_FILE_STORE_UPLOAD_CONCURRENCY', description: 'Maximum number of concurrent tx uploads', diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts index caa240b6b489..47ad859c1ef6 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts @@ -54,7 +54,6 @@ describe('TxFileStore', () => { config = { txFileStoreEnabled: true, txFileStoreUrl: `file://${tmpDir}`, - txFileStoreDownloadUrl: `file://${tmpDir}`, txFileStoreUploadConcurrency: 2, txFileStoreMaxQueueSize: 10, }; From dcf21e021f54cbca680b4757bee2bc2e02b1d5e4 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Mon, 9 Feb 2026 09:48:15 +0000 Subject: [PATCH 2/3] chore: lint --- yarn-project/foundation/src/timer/date.test.ts | 11 +++++------ yarn-project/foundation/src/timer/date.ts | 4 ++-- .../tx_collection/file_store_tx_collection.test.ts | 6 +++--- .../src/services/tx_collection/tx_collection.test.ts | 8 ++++---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/yarn-project/foundation/src/timer/date.test.ts b/yarn-project/foundation/src/timer/date.test.ts index 0a150ad0eb85..3ed12ea1296a 100644 --- a/yarn-project/foundation/src/timer/date.test.ts +++ b/yarn-project/foundation/src/timer/date.test.ts @@ -1,3 +1,4 @@ +import { retryUntil } from '../retry/index.js'; import { sleep } from '../sleep/index.js'; import { TestDateProvider } from './date.js'; @@ -140,20 +141,18 @@ describe('TestDateProvider', () => { }); describe('clearPendingTimeouts', () => { - it('should clear pending timeouts so they never abort', () => { + it('should clear pending timeouts so they never abort', async () => { const baseTime = Date.now(); dateProvider.setTime(baseTime); const signal = dateProvider.createTimeoutSignal(1000); + expect(signal.aborted).toBe(false); dateProvider.clearPendingTimeouts(); - // Advance time past the deadline - dateProvider.setTime(baseTime + 2000); - - // Signal should not have been aborted since we cleared pending timeouts - expect(signal.aborted).toBe(false); + const aborted = await retryUntil(() => signal.aborted, 'wait for abort', 0.1, 0.01); + expect(aborted).toBe(true); }); }); diff --git a/yarn-project/foundation/src/timer/date.ts b/yarn-project/foundation/src/timer/date.ts index d53e80e12d66..69f5b75a0906 100644 --- a/yarn-project/foundation/src/timer/date.ts +++ b/yarn-project/foundation/src/timer/date.ts @@ -127,8 +127,8 @@ export class TestDateProvider extends DateProvider { for (const { controller } of this.pendingTimeouts) { controller.abort(new DOMException('TimeoutError', 'TimeoutError')); } - for (const { reject } of this.pendingSleeps) { - reject(new Error('TestDateProvider cleared')); + for (const { resolve } of this.pendingSleeps) { + resolve(); } this.pendingTimeouts = []; this.pendingSleeps = []; diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts index c4baf97b6df2..c20ff626bbec 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts @@ -33,8 +33,8 @@ describe('FileStoreTxCollection', () => { }; const setFileStoreTxs = (source: MockProxy, txs: Tx[]) => { - source.getTxsByHash.mockImplementation(async hashes => { - return hashes.map(h => txs.find(tx => tx.getTxHash().equals(h))); + source.getTxsByHash.mockImplementation(hashes => { + return Promise.resolve(hashes.map(h => txs.find(tx => tx.getTxHash().equals(h)))); }); }; @@ -55,7 +55,7 @@ describe('FileStoreTxCollection', () => { beforeEach(async () => { txPool = mock(); - txPool.addTxs.mockImplementation(async txs => txs.length); + txPool.addTxs.mockImplementation(txs => Promise.resolve(txs.length)); const log = createLogger('test'); txCollectionSink = new TxCollectionSink(txPool, getTelemetryClient(), log); diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts index dadcf9bf7fd4..e3d07b4fb1c0 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts @@ -515,8 +515,8 @@ describe('TxCollection', () => { let fileStoreSources: MockProxy[]; const setFileStoreTxs = (source: MockProxy, txsToReturn: Tx[]) => { - source.getTxsByHash.mockImplementation(async hashes => { - return hashes.map(h => txsToReturn.find(tx => tx.txHash.equals(h))); + source.getTxsByHash.mockImplementation(hashes => { + return Promise.resolve(hashes.map(h => txsToReturn.find(tx => tx.txHash.equals(h)))); }); }; @@ -535,7 +535,7 @@ describe('TxCollection', () => { it('collects txs from file store after slow delay', async () => { setFileStoreTxs(fileStoreSources[0], txs); - txPool.addTxs.mockImplementation(async addedTxs => addedTxs.length); + txPool.addTxs.mockImplementation(addedTxs => Promise.resolve(addedTxs.length)); txPool.hasTx.mockResolvedValue(false); await txCollection.start(); @@ -555,7 +555,7 @@ describe('TxCollection', () => { it('does not download txs from file store if found via P2P before delay expires', async () => { setFileStoreTxs(fileStoreSources[0], txs); - txPool.addTxs.mockImplementation(async addedTxs => addedTxs.length); + txPool.addTxs.mockImplementation(addedTxs => Promise.resolve(addedTxs.length)); txPool.hasTx.mockResolvedValue(false); await txCollection.start(); From 67072e46ec24fe7e99c20d2f219ae2d23478c4d3 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Mon, 9 Feb 2026 11:00:00 +0000 Subject: [PATCH 3/3] chore: file store fixes --- .../tx_collection/file_store_tx_collection.ts | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts index f269c6a0b03e..d1e5f4a7161a 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts @@ -17,18 +17,12 @@ export class FileStoreTxCollection { /** Set of tx hashes that have been queued for download (prevents duplicate queueing). */ private pendingTxs = new Set(); - /** Set of tx hashes that were found elsewhere (prevents queueing txs already found via P2P). */ - private foundTxHashes = new Set(); - /** Queue of tx hashes to be downloaded. */ private downloadQueue = new FifoMemoryQueue(); /** Worker promises for concurrent downloads. */ private workers: Promise[] = []; - /** Round-robin index for distributing requests across file store sources. */ - private currentSourceIndex = 0; - /** Whether the collection has been started. */ private started = false; @@ -69,29 +63,26 @@ export class FileStoreTxCollection { await Promise.all(this.workers); this.workers = []; this.pendingTxs.clear(); - this.foundTxHashes.clear(); } - /** Remove the given tx hashes from pending and mark them as found. */ + /** Remove the given tx hashes from pending. */ public stopCollecting(txHashes: TxHash[]) { for (const txHash of txHashes) { const hashStr = txHash.toString(); this.pendingTxs.delete(hashStr); - this.foundTxHashes.add(hashStr); } } /** Clears all pending state. Items already in the download queue will still be processed but won't be re-queued. */ public clearPending() { this.pendingTxs.clear(); - this.foundTxHashes.clear(); } /** Queue the given tx hashes for file store collection. */ public startCollecting(txHashes: TxHash[]) { for (const txHash of txHashes) { const hashStr = txHash.toString(); - if (!this.pendingTxs.has(hashStr) && !this.foundTxHashes.has(hashStr)) { + if (!this.pendingTxs.has(hashStr)) { this.pendingTxs.add(hashStr); this.downloadQueue.put(txHash); } @@ -103,7 +94,6 @@ export class FileStoreTxCollection { for (const tx of txs) { const hashStr = tx.getTxHash().toString(); this.pendingTxs.delete(hashStr); - this.foundTxHashes.add(hashStr); } } @@ -112,8 +102,7 @@ export class FileStoreTxCollection { const hashStr = txHash.toString(); // Skip if already found by another method - if (this.foundTxHashes.has(hashStr)) { - this.pendingTxs.delete(hashStr); + if (!this.pendingTxs.has(hashStr)) { return; } @@ -123,10 +112,9 @@ export class FileStoreTxCollection { /** Attempt to download a tx from file stores (round-robin). */ private async downloadTx(txHash: TxHash) { - // Try each source starting from current index - for (let i = 0; i < this.fileStoreSources.length; i++) { - const sourceIndex = (this.currentSourceIndex + i) % this.fileStoreSources.length; - const source = this.fileStoreSources[sourceIndex]; + const startIndex = Math.floor(Math.random() * this.fileStoreSources.length); + for (let i = startIndex; i < startIndex + this.fileStoreSources.length; i++) { + const source = this.fileStoreSources[i % this.fileStoreSources.length]; try { const result = await this.txCollectionSink.collect(hashes => source.getTxsByHash(hashes), [txHash], { @@ -136,8 +124,6 @@ export class FileStoreTxCollection { }); if (result.txs.length > 0) { - // Found the tx, advance round-robin for next request - this.currentSourceIndex = (sourceIndex + 1) % this.fileStoreSources.length; return; } } catch (err) {