diff --git a/yarn-project/archiver/src/errors.ts b/yarn-project/archiver/src/errors.ts index 9ef345cf89e8..bdc128dc9db8 100644 --- a/yarn-project/archiver/src/errors.ts +++ b/yarn-project/archiver/src/errors.ts @@ -89,6 +89,23 @@ export class BlockNotFoundError extends Error { } } +/** Thrown when logs are added for a tag whose last stored log has a higher block number than the new log. */ +export class OutOfOrderLogInsertionError extends Error { + constructor( + public readonly logType: 'private' | 'public', + public readonly tag: string, + public readonly lastBlockNumber: number, + public readonly newBlockNumber: number, + ) { + super( + `Out-of-order ${logType} log insertion for tag ${tag}: ` + + `last existing log is from block ${lastBlockNumber} but new log is from block ${newBlockNumber}`, + ); + this.name = 'OutOfOrderLogInsertionError'; + } +} + +/** Thrown when a proposed block conflicts with an already checkpointed block (different content). */ export class CannotOverwriteCheckpointedBlockError extends Error { constructor( public readonly blockNumber: number, diff --git a/yarn-project/archiver/src/modules/data_source_base.ts b/yarn-project/archiver/src/modules/data_source_base.ts index 7bdb3e1faf99..228b0d69e18c 100644 --- a/yarn-project/archiver/src/modules/data_source_base.ts +++ b/yarn-project/archiver/src/modules/data_source_base.ts @@ -165,16 +165,21 @@ export abstract class ArchiverDataSourceBase return (await this.store.getPendingChainValidationStatus()) ?? { valid: true }; } - public getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise { - return this.store.getPrivateLogsByTags(tags, page); + public getPrivateLogsByTags( + tags: SiloedTag[], + page?: number, + upToBlockNumber?: BlockNumber, + ): Promise { + return this.store.getPrivateLogsByTags(tags, page, upToBlockNumber); } public getPublicLogsByTagsFromContract( contractAddress: AztecAddress, tags: Tag[], page?: number, + upToBlockNumber?: BlockNumber, ): Promise { - return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page); + return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber); } public getPublicLogs(filter: LogFilter): Promise { diff --git a/yarn-project/archiver/src/store/kv_archiver_store.test.ts b/yarn-project/archiver/src/store/kv_archiver_store.test.ts index d05044ded8d2..36010e3e6649 100644 --- a/yarn-project/archiver/src/store/kv_archiver_store.test.ts +++ b/yarn-project/archiver/src/store/kv_archiver_store.test.ts @@ -50,6 +50,7 @@ import { CheckpointNumberNotSequentialError, InitialBlockNumberNotSequentialError, InitialCheckpointNumberNotSequentialError, + OutOfOrderLogInsertionError, } from '../errors.js'; import { MessageStoreError } from '../store/message_store.js'; import type { InboxMessage } from '../structs/inbox_message.js'; @@ -2343,6 +2344,32 @@ describe('KVArchiverDataStore', () => { ]); }); + it('throws on out-of-order private log insertion', async () => { + const sharedTag = makePrivateLogTag(99, 0, 0); + + // Create blocks 4 and 5 with the same shared tag + const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive; + const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, { + previousArchive: prevArchive1, + numTxsPerBlock, + privateLogs: { numLogsPerTx: numPrivateLogsPerTx }, + }); + checkpoint4.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag); + + const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive; + const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, { + previousArchive: prevArchive2, + numTxsPerBlock, + privateLogs: { numLogsPerTx: numPrivateLogsPerTx }, + }); + checkpoint5.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag); + + // Store block 5's logs first (higher block number), then try to store block 4's logs + // (lower block number) — this should fail. + await store.addLogs([checkpoint5.checkpoint.blocks[0]]); + await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError); + }); + it('is possible to request logs for non-existing tags and determine their position', async () => { const tags = [makePrivateLogTag(99, 88, 77), makePrivateLogTag(1, 1, 1)]; @@ -2361,6 +2388,48 @@ describe('KVArchiverDataStore', () => { ]); }); + it('filters logs up to specified block number', async () => { + // Tags are unique per block, so create a shared tag across blocks by adding logs with the same tag + const sharedTag = makePrivateLogTag(1, 2, 1); + + // Add extra blocks with logs sharing the same tag + for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) { + const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive; + const newCheckpoint = await makeCheckpointWithLogs(blockNum, { + previousArchive, + numTxsPerBlock, + privateLogs: { numLogsPerTx: numPrivateLogsPerTx }, + }); + const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1]; + newLog.fields[0] = sharedTag.value; + newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1] = newLog; + await store.addCheckpoints([newCheckpoint]); + await store.addLogs([newCheckpoint.checkpoint.blocks[0]]); + logsCheckpoints.push(newCheckpoint); + } + + // Without filter, should return logs from block 1 and the extra blocks + const allLogs = await store.getPrivateLogsByTags([sharedTag]); + expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true); + + // With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1 + const filteredLogs = await store.getPrivateLogsByTags([sharedTag], 0, BlockNumber(numBlocksForLogs)); + expect(filteredLogs[0].length).toBeGreaterThan(0); + for (const log of filteredLogs[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs); + } + expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length); + }); + + it('returns all logs when upToBlockNumber is not set', async () => { + const tag = makePrivateLogTag(1, 2, 1); + + const logsWithoutFilter = await store.getPrivateLogsByTags([tag]); + const logsWithUndefined = await store.getPrivateLogsByTags([tag], 0, undefined); + + expect(logsWithoutFilter).toEqual(logsWithUndefined); + }); + describe('pagination', () => { const paginationTag = makePrivateLogTag(1, 2, 1); @@ -2382,6 +2451,20 @@ describe('KVArchiverDataStore', () => { } }); + it('pagination works correctly with upToBlockNumber', async () => { + // With a low upToBlockNumber, the filtered set should be smaller than MAX_LOGS_PER_TAG + const filteredPage0 = await store.getPrivateLogsByTags([paginationTag], 0, BlockNumber(5)); + for (const log of filteredPage0[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(5); + } + + // Page 1 with the same filter should only contain remaining filtered logs + const filteredPage1 = await store.getPrivateLogsByTags([paginationTag], 1, BlockNumber(5)); + for (const log of filteredPage1[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(5); + } + }); + it('returns first page of logs when page=0', async () => { const logsByTags = await store.getPrivateLogsByTags([paginationTag], 0); @@ -2549,6 +2632,32 @@ describe('KVArchiverDataStore', () => { ]); }); + it('throws on out-of-order public log insertion', async () => { + const sharedTag = makePublicLogTag(99, 0, 0); + + // Create blocks 4 and 5 with the same shared tag + const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive; + const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, { + previousArchive: prevArchive1, + numTxsPerBlock, + publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress }, + }); + checkpoint4.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress); + + const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive; + const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, { + previousArchive: prevArchive2, + numTxsPerBlock, + publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress }, + }); + checkpoint5.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress); + + // Store block 5's logs first (higher block number), then try to store block 4's logs + // (lower block number) — this should fail. + await store.addLogs([checkpoint5.checkpoint.blocks[0]]); + await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError); + }); + it('is possible to request logs for non-existing tags and determine their position', async () => { const tags = [makePublicLogTag(99, 88, 77), makePublicLogTag(1, 1, 0)]; @@ -2567,6 +2676,52 @@ describe('KVArchiverDataStore', () => { ]); }); + it('filters logs up to specified block number', async () => { + const sharedTag = makePublicLogTag(1, 2, 1); + + // Add extra blocks with logs sharing the same tag + for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) { + const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive; + const newCheckpoint = await makeCheckpointWithLogs(blockNum, { + previousArchive, + numTxsPerBlock, + publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress }, + }); + const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1]; + newLog.fields[0] = sharedTag.value; + newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1] = newLog; + await store.addCheckpoints([newCheckpoint]); + await store.addLogs([newCheckpoint.checkpoint.blocks[0]]); + logsCheckpoints.push(newCheckpoint); + } + + // Without filter, should return logs from block 1 and the extra blocks + const allLogs = await store.getPublicLogsByTagsFromContract(contractAddress, [sharedTag]); + expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true); + + // With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1 + const filteredLogs = await store.getPublicLogsByTagsFromContract( + contractAddress, + [sharedTag], + 0, + BlockNumber(numBlocksForLogs), + ); + expect(filteredLogs[0].length).toBeGreaterThan(0); + for (const log of filteredLogs[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs); + } + expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length); + }); + + it('returns all logs when upToBlockNumber is not set', async () => { + const tag = makePublicLogTag(1, 2, 1); + + const logsWithoutFilter = await store.getPublicLogsByTagsFromContract(contractAddress, [tag]); + const logsWithUndefined = await store.getPublicLogsByTagsFromContract(contractAddress, [tag], 0, undefined); + + expect(logsWithoutFilter).toEqual(logsWithUndefined); + }); + describe('pagination', () => { const paginationTag = makePublicLogTag(1, 2, 1); @@ -2588,6 +2743,28 @@ describe('KVArchiverDataStore', () => { } }); + it('pagination works correctly with upToBlockNumber', async () => { + const filteredPage0 = await store.getPublicLogsByTagsFromContract( + contractAddress, + [paginationTag], + 0, + BlockNumber(5), + ); + for (const log of filteredPage0[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(5); + } + + const filteredPage1 = await store.getPublicLogsByTagsFromContract( + contractAddress, + [paginationTag], + 1, + BlockNumber(5), + ); + for (const log of filteredPage1[0]) { + expect(log.blockNumber).toBeLessThanOrEqual(5); + } + }); + it('returns first page of logs when page=0', async () => { const logsByTags = await store.getPublicLogsByTagsFromContract(contractAddress, [paginationTag], 0); diff --git a/yarn-project/archiver/src/store/kv_archiver_store.ts b/yarn-project/archiver/src/store/kv_archiver_store.ts index d46075e2a588..e5bb12b28a3d 100644 --- a/yarn-project/archiver/src/store/kv_archiver_store.ts +++ b/yarn-project/archiver/src/store/kv_archiver_store.ts @@ -472,10 +472,11 @@ export class KVArchiverDataStore implements ContractDataSource { * array implies no logs match that tag. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. */ - getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise { + getPrivateLogsByTags(tags: SiloedTag[], page?: number, upToBlockNumber?: BlockNumber): Promise { try { - return this.#logStore.getPrivateLogsByTags(tags, page); + return this.#logStore.getPrivateLogsByTags(tags, page, upToBlockNumber); } catch (err) { return Promise.reject(err); } @@ -487,14 +488,16 @@ export class KVArchiverDataStore implements ContractDataSource { * @param contractAddress - The contract address to search logs for. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. */ getPublicLogsByTagsFromContract( contractAddress: AztecAddress, tags: Tag[], page?: number, + upToBlockNumber?: BlockNumber, ): Promise { try { - return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page); + return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber); } catch (err) { return Promise.reject(err); } diff --git a/yarn-project/archiver/src/store/log_store.ts b/yarn-project/archiver/src/store/log_store.ts index e389cba458e2..a591964b248d 100644 --- a/yarn-project/archiver/src/store/log_store.ts +++ b/yarn-project/archiver/src/store/log_store.ts @@ -22,6 +22,7 @@ import { } from '@aztec/stdlib/logs'; import { TxHash } from '@aztec/stdlib/tx'; +import { OutOfOrderLogInsertionError } from '../errors.js'; import type { BlockStore } from './block_store.js'; /** @@ -165,10 +166,21 @@ export class LogStore { for (const taggedLogBuffer of currentPrivateTaggedLogs) { if (taggedLogBuffer.logBuffers && taggedLogBuffer.logBuffers.length > 0) { - privateTaggedLogs.set( - taggedLogBuffer.tag, - taggedLogBuffer.logBuffers!.concat(privateTaggedLogs.get(taggedLogBuffer.tag)!), - ); + const newLogs = privateTaggedLogs.get(taggedLogBuffer.tag)!; + if (newLogs.length === 0) { + continue; + } + const lastExisting = TxScopedL2Log.fromBuffer(taggedLogBuffer.logBuffers.at(-1)!); + const firstNew = TxScopedL2Log.fromBuffer(newLogs[0]); + if (lastExisting.blockNumber > firstNew.blockNumber) { + throw new OutOfOrderLogInsertionError( + 'private', + taggedLogBuffer.tag, + lastExisting.blockNumber, + firstNew.blockNumber, + ); + } + privateTaggedLogs.set(taggedLogBuffer.tag, taggedLogBuffer.logBuffers.concat(newLogs)); } } @@ -200,10 +212,21 @@ export class LogStore { for (const taggedLogBuffer of currentPublicTaggedLogs) { if (taggedLogBuffer.logBuffers && taggedLogBuffer.logBuffers.length > 0) { - publicTaggedLogs.set( - taggedLogBuffer.tag, - taggedLogBuffer.logBuffers!.concat(publicTaggedLogs.get(taggedLogBuffer.tag)!), - ); + const newLogs = publicTaggedLogs.get(taggedLogBuffer.tag)!; + if (newLogs.length === 0) { + continue; + } + const lastExisting = TxScopedL2Log.fromBuffer(taggedLogBuffer.logBuffers.at(-1)!); + const firstNew = TxScopedL2Log.fromBuffer(newLogs[0]); + if (lastExisting.blockNumber > firstNew.blockNumber) { + throw new OutOfOrderLogInsertionError( + 'public', + taggedLogBuffer.tag, + lastExisting.blockNumber, + firstNew.blockNumber, + ); + } + publicTaggedLogs.set(taggedLogBuffer.tag, taggedLogBuffer.logBuffers.concat(newLogs)); } } @@ -322,17 +345,30 @@ export class LogStore { * array implies no logs match that tag. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. * @returns An array of log arrays, one per tag. Returns at most MAX_LOGS_PER_TAG logs per tag per page. If * MAX_LOGS_PER_TAG logs are returned for a tag, the caller should fetch the next page to check for more logs. */ - async getPrivateLogsByTags(tags: SiloedTag[], page: number = 0): Promise { + async getPrivateLogsByTags( + tags: SiloedTag[], + page: number = 0, + upToBlockNumber?: BlockNumber, + ): Promise { const logs = await Promise.all(tags.map(tag => this.#privateLogsByTag.getAsync(tag.toString()))); + const start = page * MAX_LOGS_PER_TAG; const end = start + MAX_LOGS_PER_TAG; - return logs.map( - logBuffers => logBuffers?.slice(start, end).map(logBuffer => TxScopedL2Log.fromBuffer(logBuffer)) ?? [], - ); + return logs.map(logBuffers => { + const deserialized = logBuffers?.slice(start, end).map(buf => TxScopedL2Log.fromBuffer(buf)) ?? []; + if (upToBlockNumber !== undefined) { + const cutoff = deserialized.findIndex(log => log.blockNumber > upToBlockNumber); + if (cutoff !== -1) { + return deserialized.slice(0, cutoff); + } + } + return deserialized; + }); } /** @@ -341,6 +377,7 @@ export class LogStore { * @param contractAddress - The contract address to search logs for. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. * @returns An array of log arrays, one per tag. Returns at most MAX_LOGS_PER_TAG logs per tag per page. If * MAX_LOGS_PER_TAG logs are returned for a tag, the caller should fetch the next page to check for more logs. */ @@ -348,6 +385,7 @@ export class LogStore { contractAddress: AztecAddress, tags: Tag[], page: number = 0, + upToBlockNumber?: BlockNumber, ): Promise { const logs = await Promise.all( tags.map(tag => { @@ -358,9 +396,16 @@ export class LogStore { const start = page * MAX_LOGS_PER_TAG; const end = start + MAX_LOGS_PER_TAG; - return logs.map( - logBuffers => logBuffers?.slice(start, end).map(logBuffer => TxScopedL2Log.fromBuffer(logBuffer)) ?? [], - ); + return logs.map(logBuffers => { + const deserialized = logBuffers?.slice(start, end).map(buf => TxScopedL2Log.fromBuffer(buf)) ?? []; + if (upToBlockNumber !== undefined) { + const cutoff = deserialized.findIndex(log => log.blockNumber > upToBlockNumber); + if (cutoff !== -1) { + return deserialized.slice(0, cutoff); + } + } + return deserialized; + }); } /** diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 242c8204f744..efa71db86bc6 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -793,18 +793,22 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { page?: number, referenceBlock?: BlockHash, ): Promise { + let upToBlockNumber: BlockNumber | undefined; if (referenceBlock) { const initialBlockHash = await this.#getInitialHeaderHash(); - if (!referenceBlock.equals(initialBlockHash)) { + if (referenceBlock.equals(initialBlockHash)) { + upToBlockNumber = BlockNumber(0); + } else { const header = await this.blockSource.getBlockHeaderByHash(referenceBlock); if (!header) { throw new Error( `Block ${referenceBlock.toString()} not found in the node. This might indicate a reorg has occurred.`, ); } + upToBlockNumber = header.globalVariables.blockNumber; } } - return this.logsSource.getPrivateLogsByTags(tags, page); + return this.logsSource.getPrivateLogsByTags(tags, page, upToBlockNumber); } public async getPublicLogsByTagsFromContract( @@ -813,18 +817,22 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { page?: number, referenceBlock?: BlockHash, ): Promise { + let upToBlockNumber: BlockNumber | undefined; if (referenceBlock) { const initialBlockHash = await this.#getInitialHeaderHash(); - if (!referenceBlock.equals(initialBlockHash)) { + if (referenceBlock.equals(initialBlockHash)) { + upToBlockNumber = BlockNumber(0); + } else { const header = await this.blockSource.getBlockHeaderByHash(referenceBlock); if (!header) { throw new Error( `Block ${referenceBlock.toString()} not found in the node. This might indicate a reorg has occurred.`, ); } + upToBlockNumber = header.globalVariables.blockNumber; } } - return this.logsSource.getPublicLogsByTagsFromContract(contractAddress, tags, page); + return this.logsSource.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber); } /** diff --git a/yarn-project/stdlib/src/interfaces/l2_logs_source.ts b/yarn-project/stdlib/src/interfaces/l2_logs_source.ts index 6db7b87c1093..8d66368c4a15 100644 --- a/yarn-project/stdlib/src/interfaces/l2_logs_source.ts +++ b/yarn-project/stdlib/src/interfaces/l2_logs_source.ts @@ -16,10 +16,11 @@ export interface L2LogsSource { * array implies no logs match that tag. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. * @returns An array of log arrays, one per tag. Returns at most 10 logs per tag per page. If 10 logs are returned * for a tag, the caller should fetch the next page to check for more logs. */ - getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise; + getPrivateLogsByTags(tags: SiloedTag[], page?: number, upToBlockNumber?: BlockNumber): Promise; /** * Gets public logs that match any of the `tags` from the specified contract. For each tag, an array of matching @@ -27,6 +28,7 @@ export interface L2LogsSource { * @param contractAddress - The contract address to search logs for. * @param tags - The tags to search for. * @param page - The page number (0-indexed) for pagination. + * @param upToBlockNumber - If set, only return logs from blocks up to and including this block number. * @returns An array of log arrays, one per tag. Returns at most 10 logs per tag per page. If 10 logs are returned * for a tag, the caller should fetch the next page to check for more logs. */ @@ -34,6 +36,7 @@ export interface L2LogsSource { contractAddress: AztecAddress, tags: Tag[], page?: number, + upToBlockNumber?: BlockNumber, ): Promise; /**