Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions yarn-project/archiver/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ export class BlockNotFoundError extends Error {
}
}

/** Thrown when logs are added for a tag whose last stored log has a higher block number than the new log. */
export class OutOfOrderLogInsertionError extends Error {
constructor(
public readonly logType: 'private' | 'public',
public readonly tag: string,
public readonly lastBlockNumber: number,
public readonly newBlockNumber: number,
) {
super(
`Out-of-order ${logType} log insertion for tag ${tag}: ` +
`last existing log is from block ${lastBlockNumber} but new log is from block ${newBlockNumber}`,
);
this.name = 'OutOfOrderLogInsertionError';
}
}

/** Thrown when a proposed block conflicts with an already checkpointed block (different content). */
export class CannotOverwriteCheckpointedBlockError extends Error {
constructor(
public readonly blockNumber: number,
Expand Down
11 changes: 8 additions & 3 deletions yarn-project/archiver/src/modules/data_source_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,21 @@ export abstract class ArchiverDataSourceBase
return (await this.store.getPendingChainValidationStatus()) ?? { valid: true };
}

public getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise<TxScopedL2Log[][]> {
return this.store.getPrivateLogsByTags(tags, page);
public getPrivateLogsByTags(
tags: SiloedTag[],
page?: number,
upToBlockNumber?: BlockNumber,
): Promise<TxScopedL2Log[][]> {
return this.store.getPrivateLogsByTags(tags, page, upToBlockNumber);
}

public getPublicLogsByTagsFromContract(
contractAddress: AztecAddress,
tags: Tag[],
page?: number,
upToBlockNumber?: BlockNumber,
): Promise<TxScopedL2Log[][]> {
return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page);
return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber);
}

public getPublicLogs(filter: LogFilter): Promise<GetPublicLogsResponse> {
Expand Down
177 changes: 177 additions & 0 deletions yarn-project/archiver/src/store/kv_archiver_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
CheckpointNumberNotSequentialError,
InitialBlockNumberNotSequentialError,
InitialCheckpointNumberNotSequentialError,
OutOfOrderLogInsertionError,
} from '../errors.js';
import { MessageStoreError } from '../store/message_store.js';
import type { InboxMessage } from '../structs/inbox_message.js';
Expand Down Expand Up @@ -2343,6 +2344,32 @@ describe('KVArchiverDataStore', () => {
]);
});

it('throws on out-of-order private log insertion', async () => {
const sharedTag = makePrivateLogTag(99, 0, 0);

// Create blocks 4 and 5 with the same shared tag
const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, {
previousArchive: prevArchive1,
numTxsPerBlock,
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
});
checkpoint4.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag);

const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive;
const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, {
previousArchive: prevArchive2,
numTxsPerBlock,
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
});
checkpoint5.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag);

// Store block 5's logs first (higher block number), then try to store block 4's logs
// (lower block number) — this should fail.
await store.addLogs([checkpoint5.checkpoint.blocks[0]]);
await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError);
});

it('is possible to request logs for non-existing tags and determine their position', async () => {
const tags = [makePrivateLogTag(99, 88, 77), makePrivateLogTag(1, 1, 1)];

Expand All @@ -2361,6 +2388,48 @@ describe('KVArchiverDataStore', () => {
]);
});

it('filters logs up to specified block number', async () => {
// Tags are unique per block, so create a shared tag across blocks by adding logs with the same tag
const sharedTag = makePrivateLogTag(1, 2, 1);

// Add extra blocks with logs sharing the same tag
for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) {
const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
const newCheckpoint = await makeCheckpointWithLogs(blockNum, {
previousArchive,
numTxsPerBlock,
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
});
const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1];
newLog.fields[0] = sharedTag.value;
newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1] = newLog;
await store.addCheckpoints([newCheckpoint]);
await store.addLogs([newCheckpoint.checkpoint.blocks[0]]);
logsCheckpoints.push(newCheckpoint);
}

// Without filter, should return logs from block 1 and the extra blocks
const allLogs = await store.getPrivateLogsByTags([sharedTag]);
expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true);

// With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1
const filteredLogs = await store.getPrivateLogsByTags([sharedTag], 0, BlockNumber(numBlocksForLogs));
expect(filteredLogs[0].length).toBeGreaterThan(0);
for (const log of filteredLogs[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs);
}
expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length);
});

it('returns all logs when upToBlockNumber is not set', async () => {
const tag = makePrivateLogTag(1, 2, 1);

const logsWithoutFilter = await store.getPrivateLogsByTags([tag]);
const logsWithUndefined = await store.getPrivateLogsByTags([tag], 0, undefined);

expect(logsWithoutFilter).toEqual(logsWithUndefined);
});

describe('pagination', () => {
const paginationTag = makePrivateLogTag(1, 2, 1);

Expand All @@ -2382,6 +2451,20 @@ describe('KVArchiverDataStore', () => {
}
});

it('pagination works correctly with upToBlockNumber', async () => {
// With a low upToBlockNumber, the filtered set should be smaller than MAX_LOGS_PER_TAG
const filteredPage0 = await store.getPrivateLogsByTags([paginationTag], 0, BlockNumber(5));
for (const log of filteredPage0[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(5);
}

// Page 1 with the same filter should only contain remaining filtered logs
const filteredPage1 = await store.getPrivateLogsByTags([paginationTag], 1, BlockNumber(5));
for (const log of filteredPage1[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(5);
}
});

it('returns first page of logs when page=0', async () => {
const logsByTags = await store.getPrivateLogsByTags([paginationTag], 0);

Expand Down Expand Up @@ -2549,6 +2632,32 @@ describe('KVArchiverDataStore', () => {
]);
});

it('throws on out-of-order public log insertion', async () => {
const sharedTag = makePublicLogTag(99, 0, 0);

// Create blocks 4 and 5 with the same shared tag
const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, {
previousArchive: prevArchive1,
numTxsPerBlock,
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
});
checkpoint4.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress);

const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive;
const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, {
previousArchive: prevArchive2,
numTxsPerBlock,
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
});
checkpoint5.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress);

// Store block 5's logs first (higher block number), then try to store block 4's logs
// (lower block number) — this should fail.
await store.addLogs([checkpoint5.checkpoint.blocks[0]]);
await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError);
});

it('is possible to request logs for non-existing tags and determine their position', async () => {
const tags = [makePublicLogTag(99, 88, 77), makePublicLogTag(1, 1, 0)];

Expand All @@ -2567,6 +2676,52 @@ describe('KVArchiverDataStore', () => {
]);
});

it('filters logs up to specified block number', async () => {
const sharedTag = makePublicLogTag(1, 2, 1);

// Add extra blocks with logs sharing the same tag
for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) {
const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
const newCheckpoint = await makeCheckpointWithLogs(blockNum, {
previousArchive,
numTxsPerBlock,
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
});
const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1];
newLog.fields[0] = sharedTag.value;
newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1] = newLog;
await store.addCheckpoints([newCheckpoint]);
await store.addLogs([newCheckpoint.checkpoint.blocks[0]]);
logsCheckpoints.push(newCheckpoint);
}

// Without filter, should return logs from block 1 and the extra blocks
const allLogs = await store.getPublicLogsByTagsFromContract(contractAddress, [sharedTag]);
expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true);

// With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1
const filteredLogs = await store.getPublicLogsByTagsFromContract(
contractAddress,
[sharedTag],
0,
BlockNumber(numBlocksForLogs),
);
expect(filteredLogs[0].length).toBeGreaterThan(0);
for (const log of filteredLogs[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs);
}
expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length);
});

it('returns all logs when upToBlockNumber is not set', async () => {
const tag = makePublicLogTag(1, 2, 1);

const logsWithoutFilter = await store.getPublicLogsByTagsFromContract(contractAddress, [tag]);
const logsWithUndefined = await store.getPublicLogsByTagsFromContract(contractAddress, [tag], 0, undefined);

expect(logsWithoutFilter).toEqual(logsWithUndefined);
});

describe('pagination', () => {
const paginationTag = makePublicLogTag(1, 2, 1);

Expand All @@ -2588,6 +2743,28 @@ describe('KVArchiverDataStore', () => {
}
});

it('pagination works correctly with upToBlockNumber', async () => {
const filteredPage0 = await store.getPublicLogsByTagsFromContract(
contractAddress,
[paginationTag],
0,
BlockNumber(5),
);
for (const log of filteredPage0[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(5);
}

const filteredPage1 = await store.getPublicLogsByTagsFromContract(
contractAddress,
[paginationTag],
1,
BlockNumber(5),
);
for (const log of filteredPage1[0]) {
expect(log.blockNumber).toBeLessThanOrEqual(5);
}
});

it('returns first page of logs when page=0', async () => {
const logsByTags = await store.getPublicLogsByTagsFromContract(contractAddress, [paginationTag], 0);

Expand Down
9 changes: 6 additions & 3 deletions yarn-project/archiver/src/store/kv_archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,11 @@ export class KVArchiverDataStore implements ContractDataSource {
* array implies no logs match that tag.
* @param tags - The tags to search for.
* @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page.
* @param upToBlockNumber - If set, only return logs from blocks up to and including this block number.
*/
getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise<TxScopedL2Log[][]> {
getPrivateLogsByTags(tags: SiloedTag[], page?: number, upToBlockNumber?: BlockNumber): Promise<TxScopedL2Log[][]> {
try {
return this.#logStore.getPrivateLogsByTags(tags, page);
return this.#logStore.getPrivateLogsByTags(tags, page, upToBlockNumber);
} catch (err) {
return Promise.reject(err);
}
Expand All @@ -487,14 +488,16 @@ export class KVArchiverDataStore implements ContractDataSource {
* @param contractAddress - The contract address to search logs for.
* @param tags - The tags to search for.
* @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page.
* @param upToBlockNumber - If set, only return logs from blocks up to and including this block number.
*/
getPublicLogsByTagsFromContract(
contractAddress: AztecAddress,
tags: Tag[],
page?: number,
upToBlockNumber?: BlockNumber,
): Promise<TxScopedL2Log[][]> {
try {
return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page);
return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber);
} catch (err) {
return Promise.reject(err);
}
Expand Down
Loading
Loading