Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion yarn-project/pxe/src/tagging/get_all_logs_by_tags.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Fr } from '@aztec/foundation/curves/bn254';
import { AztecAddress } from '@aztec/stdlib/aztec-address';
import { BlockHash } from '@aztec/stdlib/block';
import { MAX_LOGS_PER_TAG } from '@aztec/stdlib/interfaces/api-limit';
import { MAX_LOGS_PER_TAG, MAX_RPC_LEN } from '@aztec/stdlib/interfaces/api-limit';
import type { AztecNode } from '@aztec/stdlib/interfaces/server';
import { SiloedTag, Tag } from '@aztec/stdlib/logs';
import { randomTxScopedPrivateL2Log } from '@aztec/stdlib/testing';
Expand Down Expand Up @@ -72,4 +72,79 @@ describe('getAllPrivateLogsByTags', () => {

expect(result).toEqual([]);
});

describe('batching when tags exceed MAX_RPC_LEN', () => {
  let manyTags: SiloedTag[];

  beforeAll(() => {
    // SiloedTag construction is synchronous, so the previous async/Promise.all
    // wrapper was redundant — build the fixture list directly.
    manyTags = Array.from({ length: MAX_RPC_LEN + 50 }, () => new SiloedTag(Fr.random()));
  });

  it('splits tags into batches and concatenates results', async () => {
    const batch1Tags = manyTags.slice(0, MAX_RPC_LEN);
    const batch2Tags = manyTags.slice(MAX_RPC_LEN);

    // Every queried tag resolves to exactly one log, regardless of batch.
    aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) =>
      Promise.resolve(tags.map(tag => [randomTxScopedPrivateL2Log({ tag: tag.value })])),
    );

    const result = await getAllPrivateLogsByTags(aztecNode, manyTags, MOCK_ANCHOR_BLOCK_HASH);

    expect(result).toHaveLength(MAX_RPC_LEN + 50);
    expect(result.every(logs => logs.length === 1)).toBe(true);

    // Should have been called twice: once per batch.
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenCalledTimes(2);
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenNthCalledWith(1, batch1Tags, 0, MOCK_ANCHOR_BLOCK_HASH);
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenNthCalledWith(2, batch2Tags, 0, MOCK_ANCHOR_BLOCK_HASH);
  });

  it('paginates within each batch', async () => {
    aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[], page: number) => {
      if (tags.length === MAX_RPC_LEN && page === 0) {
        // First batch, first page: first tag has MAX_LOGS_PER_TAG logs (triggers pagination).
        // Array.from (rather than Array().fill) makes each entry a distinct log object.
        return Promise.resolve(
          tags.map((tag, i) =>
            i === 0
              ? Array.from({ length: MAX_LOGS_PER_TAG }, () => randomTxScopedPrivateL2Log({ tag: tag.value }))
              : [randomTxScopedPrivateL2Log({ tag: tag.value })],
          ),
        );
      }
      if (tags.length === MAX_RPC_LEN && page === 1) {
        // First batch, second page: 3 more logs for first tag, nothing for the rest.
        return Promise.resolve(
          tags.map((tag, i) =>
            i === 0 ? Array.from({ length: 3 }, () => randomTxScopedPrivateL2Log({ tag: tag.value })) : [],
          ),
        );
      }
      // Second batch (50 tags), single page.
      return Promise.resolve(tags.map(tag => [randomTxScopedPrivateL2Log({ tag: tag.value })]));
    });

    const result = await getAllPrivateLogsByTags(aztecNode, manyTags, MOCK_ANCHOR_BLOCK_HASH);

    expect(result).toHaveLength(MAX_RPC_LEN + 50);
    // First tag in batch 1 got paginated: MAX_LOGS_PER_TAG + 3.
    expect(result[0]).toHaveLength(MAX_LOGS_PER_TAG + 3);
    // Other tags in batch 1 got 1 log each.
    expect(result[1]).toHaveLength(1);
    // Tags in batch 2 got 1 log each.
    expect(result[MAX_RPC_LEN]).toHaveLength(1);

    // 2 pages for first batch + 1 page for second batch = 3 calls.
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenCalledTimes(3);
  });

  it('does not batch when tags fit within MAX_RPC_LEN', async () => {
    const exactlyMaxTags = manyTags.slice(0, MAX_RPC_LEN);

    aztecNode.getPrivateLogsByTags.mockResolvedValue(exactlyMaxTags.map(() => []));

    const result = await getAllPrivateLogsByTags(aztecNode, exactlyMaxTags, MOCK_ANCHOR_BLOCK_HASH);

    expect(result).toHaveLength(MAX_RPC_LEN);
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenCalledTimes(1);
    expect(aztecNode.getPrivateLogsByTags).toHaveBeenCalledWith(exactlyMaxTags, 0, MOCK_ANCHOR_BLOCK_HASH);
  });
});
});
32 changes: 28 additions & 4 deletions yarn-project/pxe/src/tagging/get_all_logs_by_tags.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { AztecAddress } from '@aztec/stdlib/aztec-address';
import type { BlockHash } from '@aztec/stdlib/block';
import { MAX_LOGS_PER_TAG } from '@aztec/stdlib/interfaces/api-limit';
import { MAX_LOGS_PER_TAG, MAX_RPC_LEN } from '@aztec/stdlib/interfaces/api-limit';
import type { AztecNode } from '@aztec/stdlib/interfaces/client';
import type { SiloedTag, Tag, TxScopedL2Log } from '@aztec/stdlib/logs';

Expand Down Expand Up @@ -31,6 +31,26 @@ async function getAllPages<T>(numTags: number, fetchPage: (page: number) => Prom
return allResultsPerTag;
}

/**
 * Splits tags into chunks of MAX_RPC_LEN, fetches logs for each chunk using the supplied fetcher, then stitches the
 * results back into a single array preserving the original tag order.
 *
 * Note: the type parameter is named `TTag` (not `Tag`) to avoid shadowing the `Tag` type imported from
 * `@aztec/stdlib/logs` in this file.
 *
 * @param tags - The full list of tags to query for.
 * @param fetchAllPagesForBatch - Fetches all pages of results for a single batch of at most MAX_RPC_LEN tags,
 *   returning one sub-array of results per tag in the batch.
 * @returns One array of results per tag, in the same order as `tags`.
 */
async function getAllPagesInBatches<TTag, T>(
  tags: TTag[],
  fetchAllPagesForBatch: (batch: TTag[]) => Promise<T[][]>,
): Promise<T[][]> {
  // Fast path: everything fits in one request, so skip the batching machinery.
  if (tags.length <= MAX_RPC_LEN) {
    return fetchAllPagesForBatch(tags);
  }

  const batches: TTag[][] = [];
  for (let i = 0; i < tags.length; i += MAX_RPC_LEN) {
    batches.push(tags.slice(i, i + MAX_RPC_LEN));
  }
  // Batches are fetched in parallel. Promise.all preserves batch order and each batch
  // yields one sub-array per tag, so a single flat() restores the original tag order.
  const batchResults = await Promise.all(batches.map(fetchAllPagesForBatch));
  return batchResults.flat();
}

/**
* Fetches all private logs for the given tags, automatically paginating through all pages.
* @param aztecNode - The Aztec node to query.
Expand All @@ -44,7 +64,9 @@ export function getAllPrivateLogsByTags(
tags: SiloedTag[],
anchorBlockHash: BlockHash,
): Promise<TxScopedL2Log[][]> {
return getAllPages(tags.length, page => aztecNode.getPrivateLogsByTags(tags, page, anchorBlockHash));
return getAllPagesInBatches(tags, batch =>
getAllPages(batch.length, page => aztecNode.getPrivateLogsByTags(batch, page, anchorBlockHash)),
);
}

/**
Expand All @@ -62,7 +84,9 @@ export function getAllPublicLogsByTagsFromContract(
tags: Tag[],
anchorBlockHash: BlockHash,
): Promise<TxScopedL2Log[][]> {
return getAllPages(tags.length, page =>
aztecNode.getPublicLogsByTagsFromContract(contractAddress, tags, page, anchorBlockHash),
return getAllPagesInBatches(tags, batch =>
getAllPages(batch.length, page =>
aztecNode.getPublicLogsByTagsFromContract(contractAddress, batch, page, anchorBlockHash),
),
);
}
Loading