From e39206c3549d0f0916220390383e8be138f18d67 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:24:07 +0000 Subject: [PATCH 1/7] init --- yarn-project/kv-store/src/interfaces/map.ts | 8 + yarn-project/kv-store/src/interfaces/store.ts | 9 +- yarn-project/kv-store/src/lmdb/map.test.ts | 23 ++ yarn-project/kv-store/src/lmdb/map.ts | 85 +++++-- yarn-project/kv-store/src/lmdb/store.ts | 13 +- .../attestation_pool/attestation_pool.ts | 10 + .../attestation_pool_test_suite.ts | 207 +++++++++++++++++ .../kv_attestation_pool.test.ts | 19 ++ .../attestation_pool/kv_attestation_pool.ts | 209 ++++++++++++++++++ .../memory_attestation_pool.test.ts | 202 +---------------- .../reqresp/reqresp.integration.test.ts | 1 + 11 files changed, 572 insertions(+), 214 deletions(-) create mode 100644 yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts create mode 100644 yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts create mode 100644 yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts diff --git a/yarn-project/kv-store/src/interfaces/map.ts b/yarn-project/kv-store/src/interfaces/map.ts index 6ded76080db4..e85b0ba7908d 100644 --- a/yarn-project/kv-store/src/interfaces/map.ts +++ b/yarn-project/kv-store/src/interfaces/map.ts @@ -82,6 +82,14 @@ export interface AztecMultiMap extends AztecMap { deleteValue(key: K, val: V): Promise; } +export interface AztecMultiMapWithSize extends AztecMultiMap { + /** + * Gets the size of the map. + * @returns The size of the map + */ + size(): number; +} + /** * A map backed by a persistent store. */ diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index 81c4d956bed2..48e0fa91a27b 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,7 +1,7 @@ import { type AztecArray, type AztecAsyncArray } from './array.js'; import { type Key } from './common.js'; import { type AztecAsyncCounter, type AztecCounter } from './counter.js'; -import { type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; +import { AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; import { type AztecAsyncSet, type AztecSet } from './set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from './singleton.js'; @@ -29,6 +29,13 @@ export interface AztecKVStore { */ openMultiMap(name: string): AztecMultiMap; + /** + * Creates a new multi-map with size. + * @param name - The name of the multi-map + * @returns The multi-map + */ + openMultiMapWithSize(name: string): AztecMultiMapWithSize; + /** * Creates a new array. * @param name - The name of the array diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 224750df8e91..c691ca0041d0 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -6,3 +6,26 @@ describe('LMDBMap', () => { describeAztecMap('Async AztecMap', () => Promise.resolve(openTmpStore(true)), true); }); + +// TODO: add tests for the maps with size + +// describe('LmdbAztecMultiMapWithSize', () => { +// let db: Database; +// let map: LmdbAztecMultiMapWithSize; + +// beforeEach(() => { +// db = open({ dupSort: true } as any); +// map = new LmdbAztecMultiMapWithSize(db, 'test'); +// }); + +// it('should be able to delete values', async () => { +// await map.set('foo', 'bar'); +// await map.set('foo', 'baz'); + +// expect(map.size()).toEqual(2); + +// await map.deleteValue('foo', 'bar'); + +// expect(map.size()).toEqual(1); +// expect(map.get('foo')).toEqual('baz'); +// }); \ No newline at end of file diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index 38d87cf9c6b0..d3f4894bb837 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,7 +1,7 @@ import { type Database, type RangeOptions } from 'lmdb'; import { type Key, type Range } from '../interfaces/common.js'; -import { type AztecAsyncMultiMap, type AztecMultiMap } from '../interfaces/map.js'; +import { type AztecAsyncMultiMap, AztecMultiMapWithSize, type AztecMultiMap } from '../interfaces/map.js'; /** The slot where a key-value entry would be stored */ type MapValueSlot = ['map', string, 'slot', K]; @@ -32,7 +32,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } get(key: K): V | undefined { - return this.db.get(this.#slot(key))?.[1]; + return this.db.get(this.slot(key))?.[1]; } getAsync(key: K): Promise { @@ -40,7 +40,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } *getValues(key: K): IterableIterator { - const values = this.db.getValues(this.#slot(key)); + const values = this.db.getValues(this.slot(key)); for (const value of values) { yield value?.[1]; } @@ -53,7 +53,7 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } has(key: K): boolean { - return this.db.doesExist(this.#slot(key)); + return this.db.doesExist(this.slot(key)); } hasAsync(key: K): Promise { @@ -61,30 +61,30 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } async set(key: K, val: V): Promise { - await this.db.put(this.#slot(key), [key, val]); + await this.db.put(this.slot(key), [key, val]); } swap(key: K, fn: (val: V | undefined) => V): Promise { return this.db.childTransaction(() => { - const slot = this.#slot(key); + const slot = this.slot(key); const entry = this.db.get(slot); void this.db.put(slot, [key, fn(entry?.[1])]); }); } setIfNotExists(key: K, val: V): Promise { - const slot = this.#slot(key); + const slot = this.slot(key); return this.db.ifNoExists(slot, () => { void this.db.put(slot, [key, val]); }); } async delete(key: K): Promise { - await this.db.remove(this.#slot(key)); + await this.db.remove(this.slot(key)); } async deleteValue(key: K, val: V): Promise { - await this.db.remove(this.#slot(key), [key, val]); + await this.db.remove(this.slot(key), [key, val]); } *entries(range: Range = {}): IterableIterator<[K, V]> { @@ -93,18 +93,18 @@ export class LmdbAztecMap implements AztecMultiMap, Azte // in that case, we need to swap the start and end sentinels const start = reverse ? range.end - ? this.#slot(range.end) + ? this.slot(range.end) : this.#endSentinel : range.start - ? this.#slot(range.start) + ? this.slot(range.start) : this.#startSentinel; const end = reverse ? range.start - ? this.#slot(range.start) + ? this.slot(range.start) : this.#startSentinel : range.end - ? this.#slot(range.end) + ? this.slot(range.end) : this.#endSentinel; const lmdbRange: RangeOptions = { @@ -153,7 +153,64 @@ export class LmdbAztecMap implements AztecMultiMap, Azte } } - #slot(key: K): MapValueSlot { + protected slot(key: K): MapValueSlot { return ['map', this.name, 'slot', key]; } + +} + +export class LmdbAztecMultiMapWithSize extends LmdbAztecMap implements AztecMultiMapWithSize { + #sizeCache?: number; + + constructor(rootDb: Database, mapName: string) { + super(rootDb, mapName); + } + + override async set(key: K, val: V): Promise { + await this.db.childTransaction(() => { + const exists = this.db.doesExist(this.slot(key)); + this.db.putSync(this.slot(key), [key, val], { + appendDup: true + }); + if (!exists) { + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + override async delete(key: K): Promise { + await this.db.childTransaction(async () => { + const exists = this.db.doesExist(this.slot(key)); + if (exists) { + await this.db.remove(this.slot(key)); + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + override async deleteValue(key: K, val: V): Promise { + await this.db.childTransaction(async () => { + const exists = this.db.doesExist(this.slot(key)); + if (exists) { + await this.db.remove(this.slot(key), [key, val]); + this.#sizeCache = undefined; // Invalidate cache + } + }); + } + + /** + * Gets the size of the map by counting entries. + * @returns The number of entries in the map + */ + size(): number { + if (this.#sizeCache === undefined) { + this.#sizeCache = this.db.getCount(); + } + return this.#sizeCache; + } + + // Reset cache on clear/drop operations + clearCache() { + this.#sizeCache = undefined; + } } diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index f0f453a98ad6..8860b1c87c6f 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -9,13 +9,13 @@ import { join } from 'path'; import { type AztecArray, type AztecAsyncArray } from '../interfaces/array.js'; import { type Key } from '../interfaces/common.js'; import { type AztecAsyncCounter, type AztecCounter } from '../interfaces/counter.js'; -import { type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; +import { AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; import { type AztecAsyncSet, type AztecSet } from '../interfaces/set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from '../interfaces/singleton.js'; import { type AztecAsyncKVStore, type AztecKVStore } from '../interfaces/store.js'; import { LmdbAztecArray } from './array.js'; import { LmdbAztecCounter } from './counter.js'; -import { LmdbAztecMap } from './map.js'; +import { LmdbAztecMap, LmdbAztecMultiMapWithSize } from './map.js'; import { LmdbAztecSet } from './set.js'; import { LmdbAztecSingleton } from './singleton.js'; @@ -118,6 +118,15 @@ export class AztecLmdbStore implements AztecKVStore, AztecAsyncKVStore { openCounter(name: string): AztecCounter & AztecAsyncCounter { return new LmdbAztecCounter(this.#data, name); } + /** + * Creates a new AztecMultiMapWithSize in the store. A multi-map with size stores multiple values for a single key automatically. + * @param name - Name of the map + * @returns A new AztecMultiMapWithSize + */ + openMultiMapWithSize(name: string): AztecMultiMapWithSize { + return new LmdbAztecMultiMapWithSize(this.#multiMapData, name); + } + /** * Creates a new AztecArray in the store. diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts index bb7ecb5b7046..d7f3e434d095 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts @@ -39,6 +39,16 @@ export interface AttestationPool { */ deleteAttestationsForSlot(slot: bigint): Promise; + /** + * Delete Attestations for slot and proposal + * + * Removes all attestations associated with a slot and proposal + * + * @param slot - The slot to delete. + * @param proposalId - The proposal to delete. + */ + deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise; + /** * Get Attestations for slot * diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts new file mode 100644 index 000000000000..0e4d243cd30d --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -0,0 +1,207 @@ +import { type BlockAttestation, TxHash } from '@aztec/circuit-types'; +import { Secp256k1Signer } from '@aztec/foundation/crypto'; +import { Fr } from '@aztec/foundation/fields'; + +import { jest } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { type PoolInstrumentation } from '../instrumentation.js'; +import { mockAttestation } from './mocks.js'; +import { AttestationPool } from './attestation_pool.js'; + +const NUMBER_OF_SIGNERS_PER_TEST = 4; + +export function describeAttestationPool(getAttestationPool: () => AttestationPool) { + let ap: AttestationPool; + let signers: Secp256k1Signer[]; + + // Check that metrics are recorded correctly + let metricsMock: MockProxy>; + + beforeEach(() => { + + ap = getAttestationPool(); + signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, () => Secp256k1Signer.random()); + + metricsMock = mock>(); + // Can i overwrite this like this?? + (ap as any).metrics = metricsMock; + }); + + const createAttestationsForSlot = (slotNumber: number) => { + const archive = Fr.random(); + return signers.map(signer => mockAttestation(signer, slotNumber, archive)); + }; + + it('should add attestations to pool', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + + await ap.addAttestations(attestations); + + console.log('add passes'); + + // Check metrics have been updated. + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + console.log('get passes'); + + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + expect(retreivedAttestations).toEqual(attestations); + + // Delete by slot + await ap.deleteAttestationsForSlot(BigInt(slotNumber)); + + console.log('delete passes'); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + }); + + it('Should handle duplicate proposals in a slot', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const txs = [0, 1, 2, 3, 4, 5].map(() => TxHash.random()); + + // Use the same signer for all attestations + const attestations: BlockAttestation[] = []; + const signer = signers[0]; + for (let i = 0; i < NUMBER_OF_SIGNERS_PER_TEST; i++) { + attestations.push(mockAttestation(signer, slotNumber, archive, txs)); + } + + await ap.addAttestations(attestations); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0]).toEqual(attestations[0]); + expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); + expect(retreivedAttestations[0].getSender().toString()).toEqual(signer.address.toString()); + }); + + it('Should store attestations by differing slot', async () => { + const slotNumbers = [1, 2, 3, 4]; + const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i])); + + await ap.addAttestations(attestations); + + for (const attestation of attestations) { + const slot = attestation.payload.header.globalVariables.slotNumber; + const archive = attestation.archive.toString(); + + const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0]).toEqual(attestation); + expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); + } + }); + + it('Should store attestations by differing slot and archive', async () => { + const slotNumbers = [1, 2, 3, 4]; + const archives = [Fr.random(), Fr.random(), Fr.random(), Fr.random()]; + const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i], archives[i])); + + await ap.addAttestations(attestations); + + for (const attestation of attestations) { + const slot = attestation.payload.header.globalVariables.slotNumber; + const proposalId = attestation.archive.toString(); + + const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); + expect(retreivedAttestations.length).toBe(1); + expect(retreivedAttestations[0]).toEqual(attestation); + expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); + } + }); + + it('Should delete attestations', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + expect(retreivedAttestations).toEqual(attestations); + + await ap.deleteAttestations(attestations); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(gottenAfterDelete.length).toBe(0); + }); + + it('Should blanket delete attestations per slot', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive))); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + expect(retreivedAttestations).toEqual(attestations); + + await ap.deleteAttestationsForSlot(BigInt(slotNumber)); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + }); + + it('Should blanket delete attestations per slot and proposal', async () => { + const slotNumber = 420; + const archive = Fr.random(); + const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + expect(retreivedAttestations).toEqual(attestations); + + await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); + + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestationsAfterDelete.length).toBe(0); + }); + + it('Should delete attestations older than a given slot', async () => { + const slotNumbers = [1, 2, 3, 69, 72, 74, 88, 420]; + const attestations = slotNumbers.map(slotNumber => createAttestationsForSlot(slotNumber)).flat(); + const proposalId = attestations[0].archive.toString(); + + await ap.addAttestations(attestations); + + const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1.length).toBe(signers.length); + + const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); + + await ap.deleteAttestationsOlderThan(BigInt(73)); + + const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); + expect(attestationsForSlot1AfterDelete.length).toBe(0); + + expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(1)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(2)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(3)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(69)); + expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(72)); + }); +} diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts new file mode 100644 index 000000000000..77232fbcb74a --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts @@ -0,0 +1,19 @@ + +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; + +import { describeAttestationPool } from './attestation_pool_test_suite.js'; +import { KvAttestationPool } from './kv_attestation_pool.js'; +import { AztecKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/utils'; + +describe('KV Attestation Pool', () => { + let kvAttestationPool: KvAttestationPool; + let store: AztecKVStore; + + beforeEach(() => { + store = openTmpStore(); + kvAttestationPool = new KvAttestationPool(store, new NoopTelemetryClient()); + }); + + describeAttestationPool(() => kvAttestationPool); +}); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts new file mode 100644 index 000000000000..5a4f9f9dbbf8 --- /dev/null +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -0,0 +1,209 @@ +import { type BlockAttestation } from '@aztec/circuit-types'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { type TelemetryClient } from '@aztec/telemetry-client'; + +import { PoolInstrumentation, PoolName } from '../instrumentation.js'; +import { type AttestationPool } from './attestation_pool.js'; +import { AztecKVStore, AztecMap, AztecMultiMapWithSize } from '@aztec/kv-store'; + +export class KvAttestationPool implements AttestationPool { + private metrics: PoolInstrumentation; + + + // TODO: fix all of this up - this is a mess + private attestations: AztecMap; // Store slot -> slotMapKey + private slotMaps: Map>; // Cache of slot maps + private proposalMaps: Map>; // Cache of proposal maps + + constructor(private store: AztecKVStore, telemetry: TelemetryClient, private log = createDebugLogger('aztec:attestation_pool')) { + this.attestations = store.openMap('attestations'); + this.slotMaps = new Map(); + this.proposalMaps = new Map(); + this.metrics = new PoolInstrumentation(telemetry, PoolName.ATTESTATION_POOL); + } + + private getSlotMapKey(slot: string): string { + return `slot-${slot}`; + } + + private getProposalMapKey(proposalId: string): string { + return `proposal-${proposalId}`; + } + + private getSlotMap(slot: string): AztecMultiMapWithSize { + const mapKey = this.getSlotMapKey(slot); + if (!this.slotMaps.has(mapKey)) { + this.slotMaps.set(mapKey, this.store.openMultiMapWithSize(mapKey)); + } + return this.slotMaps.get(mapKey)!; + } + + private getProposalMap(proposalId: string): AztecMultiMapWithSize { + const mapKey = this.getProposalMapKey(proposalId); + if (!this.proposalMaps.has(mapKey)) { + this.proposalMaps.set(mapKey, this.store.openMultiMapWithSize(mapKey)); + } + return this.proposalMaps.get(mapKey)!; + } + + public async addAttestations(attestations: BlockAttestation[]): Promise { + for (const attestation of attestations) { + const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); + const proposalId = attestation.archive.toString(); + const address = attestation.getSender().toString(); + + // Get or create slot map + const slotMapKey = this.getSlotMapKey(slotNumber); + if (!this.attestations.has(slotNumber)) { + await this.attestations.set(slotNumber, slotMapKey); + } + + // Get slot map and store proposal reference + const slotMap = this.getSlotMap(slotNumber); + const proposalMapKey = this.getProposalMapKey(proposalId); + await slotMap.set(proposalId, proposalMapKey); + + // Store the actual attestation in the proposal map + const proposalMap = this.getProposalMap(proposalId); + await proposalMap.set(address, attestation); + + this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`); + } + + this.metrics.recordAddedObjects(attestations.length); + } + + public async getAttestationsForSlot(slot: bigint, proposalId: string): Promise { + const slotString = this.getSlotMapKey(slot.toString()); + if (!this.attestations.has(slotString)) { + return []; + } + + const slotMap = this.getSlotMap(slotString); + if (!slotMap.has(proposalId)) { + return []; + } + + const proposalMap = this.getProposalMap(proposalId); + return Array.from(proposalMap.values()); + } + + #getNumberOfAttestationsInSlot(slot: bigint): number { + let total = 0; + const slotMap = this.getSlotMap(slot.toString()); + + if (slotMap) { + for (const proposalAttestationMapName of slotMap.values() ?? []) { + const proposalMap = this.getProposalMap(proposalAttestationMapName); + total += proposalMap.size(); + } + } + return total; + } + + public async deleteAttestationsOlderThan(oldestSlot: bigint): Promise { + const olderThan = []; + + const slots = this.attestations.keys(); + for (const slot of slots) { + if (BigInt(slot) < oldestSlot) { + olderThan.push(slot); + } else { + break; + } + } + + for (const oldSlot of olderThan) { + await this.deleteAttestationsForSlot(BigInt(oldSlot)); + } + return Promise.resolve(); + } + + public deleteAttestationsForSlot(slot: bigint): Promise { + const numberOfAttestations = this.#getNumberOfAttestationsInSlot(slot); + this.attestations.delete(this.getSlotMapKey(slot.toString())); + + // TODO: delete from store + + this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot}`); + this.metrics.recordRemovedObjects(numberOfAttestations); + return Promise.resolve(); + } + + public async deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise { + const slotAttestationMap = await getSlotOrDefault(this.store, this.attestations, this.getSlotKey(slot)); + if (slotAttestationMap) { + if (slotAttestationMap.has(proposalId)) { + const numberOfAttestations = slotAttestationMap.get(proposalId)?.size() ?? 0; + + slotAttestationMap.delete(proposalId); + + this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot} and proposal ${proposalId}`); + this.metrics.recordRemovedObjects(numberOfAttestations); + } + } + return Promise.resolve(); + } + + public async deleteAttestations(attestations: BlockAttestation[]): Promise { + for (const attestation of attestations) { + const slotNumber = attestation.payload.header.globalVariables.slotNumber; + const slotAttestationMap = (slotNumber.toString()); + if (slotAttestationMap) { + const proposalId = attestation.archive.toString(); + const proposalAttestationMap = await getProposalOrDefault(this.store, slotAttestationMap, proposalId); + if (proposalAttestationMap) { + const address = attestation.getSender(); + proposalAttestationMap.delete(address.toString()); + this.log.debug(`Deleted attestation for slot ${slotNumber} from ${address}`); + } + } + } + this.metrics.recordRemovedObjects(attestations.length); + return Promise.resolve(); + } +} + +/** + * Get Slot or Default + * + * Fetch the slot mapping, if it does not exist, then create a mapping and return it + * @param store - The store to fetch from + * @param map - The map to fetch from + * @param slot - The slot to fetch + * @returns The slot mapping + */ +async function getSlotOrDefault( + store: AztecKVStore, + map: AztecMap>>, + slot: string, +): Promise>> { + if (!map.has(slot)) { + const newMap = store.openMultiMapWithSize>(`slot-${slot}`); + await map.set(slot, newMap); + return newMap; + } + return map.get(slot)!; +} + +/** + * Get Proposal or Default + * + * Fetch the proposal mapping, if it does not exist, then create a mapping and return it + * @param store - The store to fetch from + * @param map - The map to fetch from + * @param proposalId - The proposal id to fetch + * @returns The proposal mapping + */ +async function getProposalOrDefault( + store: AztecKVStore, + map: AztecMultiMapWithSize>, + proposalId: string, +): Promise> { + if (!map.has(proposalId)) { + const newMap = store.openMultiMapWithSize(`proposal-${proposalId}`); + await map.set(proposalId, newMap); + return newMap; + } + return map.get(proposalId)!; +} diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts index ef80dad21ec8..b76090ee116a 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts @@ -1,205 +1,13 @@ -import { type BlockAttestation, TxHash } from '@aztec/circuit-types'; -import { Secp256k1Signer } from '@aztec/foundation/crypto'; -import { Fr } from '@aztec/foundation/fields'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; -import { jest } from '@jest/globals'; -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { type PoolInstrumentation } from '../instrumentation.js'; import { InMemoryAttestationPool } from './memory_attestation_pool.js'; -import { mockAttestation } from './mocks.js'; - -const NUMBER_OF_SIGNERS_PER_TEST = 4; - -describe('MemoryAttestationPool', () => { - let ap: InMemoryAttestationPool; - let signers: Secp256k1Signer[]; - const telemetry = new NoopTelemetryClient(); - - // Check that metrics are recorded correctly - let metricsMock: MockProxy>; +import { describeAttestationPool } from './attestation_pool_test_suite.js'; +describe('In-Memory Attestation Pool', () => { + let inMemoryAttestationPool: InMemoryAttestationPool; beforeEach(() => { - // Use noop telemetry client while testing. - - ap = new InMemoryAttestationPool(telemetry); - signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, () => Secp256k1Signer.random()); - - metricsMock = mock>(); - // Can i overwrite this like this?? - (ap as any).metrics = metricsMock; - }); - - const createAttestationsForSlot = (slotNumber: number) => { - const archive = Fr.random(); - return signers.map(signer => mockAttestation(signer, slotNumber, archive)); - }; - - it('should add attestations to pool', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - - await ap.addAttestations(attestations); - - // Check metrics have been updated. - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - // Delete by slot - await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - expect(retreivedAttestationsAfterDelete.length).toBe(0); - }); - - it('Should handle duplicate proposals in a slot', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const txs = [0, 1, 2, 3, 4, 5].map(() => TxHash.random()); - - // Use the same signer for all attestations - const attestations: BlockAttestation[] = []; - const signer = signers[0]; - for (let i = 0; i < NUMBER_OF_SIGNERS_PER_TEST; i++) { - attestations.push(mockAttestation(signer, slotNumber, archive, txs)); - } - - await ap.addAttestations(attestations); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestations[0]); - expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); - expect(retreivedAttestations[0].getSender().toString()).toEqual(signer.address.toString()); - }); - - it('Should store attestations by differing slot', async () => { - const slotNumbers = [1, 2, 3, 4]; - const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i])); - - await ap.addAttestations(attestations); - - for (const attestation of attestations) { - const slot = attestation.payload.header.globalVariables.slotNumber; - const archive = attestation.archive.toString(); - - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); - expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); - } - }); - - it('Should store attestations by differing slot and archive', async () => { - const slotNumbers = [1, 2, 3, 4]; - const archives = [Fr.random(), Fr.random(), Fr.random(), Fr.random()]; - const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i], archives[i])); - - await ap.addAttestations(attestations); - - for (const attestation of attestations) { - const slot = attestation.payload.header.globalVariables.slotNumber; - const proposalId = attestation.archive.toString(); - - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); - expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); - expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); - } - }); - - it('Should delete attestations', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestations(attestations); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(gottenAfterDelete.length).toBe(0); - }); - - it('Should blanket delete attestations per slot', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive))); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestationsAfterDelete.length).toBe(0); + inMemoryAttestationPool = new InMemoryAttestationPool(new NoopTelemetryClient()); }); - it('Should blanket delete attestations per slot and proposal', async () => { - const slotNumber = 420; - const archive = Fr.random(); - const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); - - await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); - - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); - expect(retreivedAttestationsAfterDelete.length).toBe(0); - }); - - it('Should delete attestations older than a given slot', async () => { - const slotNumbers = [1, 2, 3, 69, 72, 74, 88, 420]; - const attestations = slotNumbers.map(slotNumber => createAttestationsForSlot(slotNumber)).flat(); - const proposalId = attestations[0].archive.toString(); - - await ap.addAttestations(attestations); - - const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); - expect(attestationsForSlot1.length).toBe(signers.length); - - const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); - - await ap.deleteAttestationsOlderThan(BigInt(73)); - - const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); - expect(attestationsForSlot1AfterDelete.length).toBe(0); - - expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(1)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(2)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(3)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(69)); - expect(deleteAttestationsSpy).toHaveBeenCalledWith(BigInt(72)); - }); + describeAttestationPool(() => inMemoryAttestationPool); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts index 6b11f5b01c63..d94075e288cf 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts @@ -69,6 +69,7 @@ const makeMockPools = () => { deleteAttestations: jest.fn(), deleteAttestationsForSlot: jest.fn(), deleteAttestationsOlderThan: jest.fn(), + deleteAttestationsForSlotAndProposal: jest.fn(), getAttestationsForSlot: jest.fn().mockReturnValue(undefined), }, epochProofQuotePool: { From 5fa6b38edad0f5527f33c33391ad942b3290ccbe Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:03:20 +0000 Subject: [PATCH 2/7] feat: kv attestation pool --- yarn-project/kv-store/src/interfaces/map.ts | 13 + yarn-project/kv-store/src/interfaces/store.ts | 9 +- yarn-project/kv-store/src/lmdb/map.ts | 19 +- yarn-project/kv-store/src/lmdb/store.ts | 14 +- .../attestation_pool_test_suite.ts | 33 +-- .../kv_attestation_pool.test.ts | 5 +- .../attestation_pool/kv_attestation_pool.ts | 236 +++++++----------- .../memory_attestation_pool.test.ts | 2 +- 8 files changed, 162 insertions(+), 169 deletions(-) diff --git a/yarn-project/kv-store/src/interfaces/map.ts b/yarn-project/kv-store/src/interfaces/map.ts index e85b0ba7908d..f63505dae9fc 100644 --- a/yarn-project/kv-store/src/interfaces/map.ts +++ b/yarn-project/kv-store/src/interfaces/map.ts @@ -62,6 +62,19 @@ export interface AztecMap extends AztecBaseMap { * @param range - The range of keys to iterate over */ keys(range?: Range): IterableIterator; + + /** + * Clears the map. + */ + clear(): Promise; +} + +export interface AztecMapWithSize extends AztecMap { + /** + * Gets the size of the map. + * @returns The size of the map + */ + size(): number; } /** diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index 48e0fa91a27b..56e8f0a7a017 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,7 +1,7 @@ import { type AztecArray, type AztecAsyncArray } from './array.js'; import { type Key } from './common.js'; import { type AztecAsyncCounter, type AztecCounter } from './counter.js'; -import { AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; +import { type AztecMapWithSize, type AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; import { type AztecAsyncSet, type AztecSet } from './set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from './singleton.js'; @@ -36,6 +36,13 @@ export interface AztecKVStore { */ openMultiMapWithSize(name: string): AztecMultiMapWithSize; + /** + * Creates a new map with size. + * @param name - The name of the map + * @returns The map + */ + openMapWithSize(name: string): AztecMapWithSize; + /** * Creates a new array. * @param name - The name of the array diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index d3f4894bb837..a65c3389212d 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,7 +1,7 @@ import { type Database, type RangeOptions } from 'lmdb'; import { type Key, type Range } from '../interfaces/common.js'; -import { type AztecAsyncMultiMap, AztecMultiMapWithSize, type AztecMultiMap } from '../interfaces/map.js'; +import { type AztecAsyncMultiMap, type AztecMultiMap, type AztecMapWithSize } from '../interfaces/map.js'; /** The slot where a key-value entry would be stored */ type MapValueSlot = ['map', string, 'slot', K]; @@ -157,9 +157,24 @@ export class LmdbAztecMap implements AztecMultiMap, Azte return ['map', this.name, 'slot', key]; } + async clear(): Promise { + const lmdbRange: RangeOptions = { + start: this.#startSentinel, + end: this.#endSentinel, + }; + + const iterator = this.db.getRange(lmdbRange); + + for (const { + key, + } of iterator) { + await this.db.remove(key); + } + } + } -export class LmdbAztecMultiMapWithSize extends LmdbAztecMap implements AztecMultiMapWithSize { +export class LmdbAztecMapWithSize extends LmdbAztecMap implements AztecMapWithSize, AztecAsyncMultiMap { #sizeCache?: number; constructor(rootDb: Database, mapName: string) { diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index 8860b1c87c6f..3240c76d95ee 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -9,13 +9,13 @@ import { join } from 'path'; import { type AztecArray, type AztecAsyncArray } from '../interfaces/array.js'; import { type Key } from '../interfaces/common.js'; import { type AztecAsyncCounter, type AztecCounter } from '../interfaces/counter.js'; -import { AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; +import { type AztecMapWithSize, type AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; import { type AztecAsyncSet, type AztecSet } from '../interfaces/set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from '../interfaces/singleton.js'; import { type AztecAsyncKVStore, type AztecKVStore } from '../interfaces/store.js'; import { LmdbAztecArray } from './array.js'; import { LmdbAztecCounter } from './counter.js'; -import { LmdbAztecMap, LmdbAztecMultiMapWithSize } from './map.js'; +import { LmdbAztecMap, LmdbAztecMapWithSize } from './map.js'; import { LmdbAztecSet } from './set.js'; import { LmdbAztecSingleton } from './singleton.js'; @@ -124,9 +124,17 @@ export class AztecLmdbStore implements AztecKVStore, AztecAsyncKVStore { * @returns A new AztecMultiMapWithSize */ openMultiMapWithSize(name: string): AztecMultiMapWithSize { - return new LmdbAztecMultiMapWithSize(this.#multiMapData, name); + return new LmdbAztecMapWithSize(this.#multiMapData, name); } + /** + * Creates a new AztecMapWithSize in the store. + * @param name - Name of the map + * @returns A new AztecMapWithSize + */ + openMapWithSize(name: string): AztecMapWithSize { + return new LmdbAztecMapWithSize(this.#data, name); + } /** * Creates a new AztecArray in the store. diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts index 0e4d243cd30d..55a2d8236aec 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -6,8 +6,8 @@ import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import { type PoolInstrumentation } from '../instrumentation.js'; +import { type AttestationPool } from './attestation_pool.js'; import { mockAttestation } from './mocks.js'; -import { AttestationPool } from './attestation_pool.js'; const NUMBER_OF_SIGNERS_PER_TEST = 4; @@ -19,7 +19,6 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo let metricsMock: MockProxy>; beforeEach(() => { - ap = getAttestationPool(); signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, () => Secp256k1Signer.random()); @@ -33,6 +32,16 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo return signers.map(signer => mockAttestation(signer, slotNumber, archive)); }; + // We compare buffers as the objects can have cached values attached to them which are not serialised + // using array containing as the kv store does not respect insertion order + // TODO(md): should i make a version which respects insertion order? + const compareAttestations = (a1: BlockAttestation[], a2: BlockAttestation[]) => { + const a1Buffer = a1.map(attestation => attestation.toBuffer()); + const a2Buffer = a2.map(attestation => attestation.toBuffer()); + expect(a1Buffer.length).toBe(a2Buffer.length); + expect(a1Buffer).toEqual(expect.arrayContaining(a2Buffer)); + }; + it('should add attestations to pool', async () => { const slotNumber = 420; const archive = Fr.random(); @@ -40,22 +49,18 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo await ap.addAttestations(attestations); - console.log('add passes'); - // Check metrics have been updated. expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); - console.log('get passes'); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); + + compareAttestations(retreivedAttestations, attestations); // Delete by slot await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - console.log('delete passes'); - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); @@ -78,7 +83,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestations[0]); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestations[0].toBuffer()); expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); expect(retreivedAttestations[0].getSender().toString()).toEqual(signer.address.toString()); }); @@ -95,7 +100,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); } }); @@ -113,7 +118,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); expect(retreivedAttestations.length).toBe(1); - expect(retreivedAttestations[0]).toEqual(attestation); + expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); } }); @@ -130,7 +135,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); + compareAttestations(retreivedAttestations, attestations); await ap.deleteAttestations(attestations); @@ -150,7 +155,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); + compareAttestations(retreivedAttestations, attestations); await ap.deleteAttestationsForSlot(BigInt(slotNumber)); @@ -170,7 +175,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); - expect(retreivedAttestations).toEqual(attestations); + compareAttestations(retreivedAttestations, attestations); await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts index 77232fbcb74a..2832694784ce 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.test.ts @@ -1,10 +1,9 @@ - +import { type AztecKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/lmdb'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { describeAttestationPool } from './attestation_pool_test_suite.js'; import { KvAttestationPool } from './kv_attestation_pool.js'; -import { AztecKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/utils'; describe('KV Attestation Pool', () => { let kvAttestationPool: KvAttestationPool; diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts index 5a4f9f9dbbf8..63c7049fab0e 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -1,104 +1,70 @@ -import { type BlockAttestation } from '@aztec/circuit-types'; -import { createDebugLogger } from '@aztec/foundation/log'; +import { BlockAttestation } from '@aztec/circuit-types'; +import { Fr } from '@aztec/foundation/fields'; +import { createLogger } from '@aztec/foundation/log'; +import { type AztecKVStore, type AztecMapWithSize, type AztecMultiMap } from '@aztec/kv-store'; import { type TelemetryClient } from '@aztec/telemetry-client'; import { PoolInstrumentation, PoolName } from '../instrumentation.js'; import { type AttestationPool } from './attestation_pool.js'; -import { AztecKVStore, AztecMap, AztecMultiMapWithSize } from '@aztec/kv-store'; export class KvAttestationPool implements AttestationPool { private metrics: PoolInstrumentation; + // Index of all proposal ids in a slot + private attestations: AztecMultiMap; - // TODO: fix all of this up - this is a mess - private attestations: AztecMap; // Store slot -> slotMapKey - private slotMaps: Map>; // Cache of slot maps - private proposalMaps: Map>; // Cache of proposal maps - - constructor(private store: AztecKVStore, telemetry: TelemetryClient, private log = createDebugLogger('aztec:attestation_pool')) { - this.attestations = store.openMap('attestations'); - this.slotMaps = new Map(); - this.proposalMaps = new Map(); + constructor( + private store: AztecKVStore, + telemetry: TelemetryClient, + private log = createLogger('aztec:attestation_pool'), + ) { + this.attestations = store.openMultiMap('attestations'); this.metrics = new PoolInstrumentation(telemetry, PoolName.ATTESTATION_POOL); } - private getSlotMapKey(slot: string): string { - return `slot-${slot}`; - } - - private getProposalMapKey(proposalId: string): string { - return `proposal-${proposalId}`; + private getProposalMapKey(slot: string, proposalId: string): string { + return `proposal-${slot}-${proposalId}`; } - private getSlotMap(slot: string): AztecMultiMapWithSize { - const mapKey = this.getSlotMapKey(slot); - if (!this.slotMaps.has(mapKey)) { - this.slotMaps.set(mapKey, this.store.openMultiMapWithSize(mapKey)); - } - return this.slotMaps.get(mapKey)!; - } - - private getProposalMap(proposalId: string): AztecMultiMapWithSize { - const mapKey = this.getProposalMapKey(proposalId); - if (!this.proposalMaps.has(mapKey)) { - this.proposalMaps.set(mapKey, this.store.openMultiMapWithSize(mapKey)); - } - return this.proposalMaps.get(mapKey)!; + /** + * Get the proposal map for a given slot and proposalId + * + * Essentially a nested mapping of address -> attestation + * + * @param slot - The slot to get the proposal map for + * @param proposalId - The proposalId to get the map for + * @returns The proposal map + */ + private getProposalMap(slot: string, proposalId: string): AztecMapWithSize { + const mapKey = this.getProposalMapKey(slot, proposalId); + return this.store.openMapWithSize(mapKey); } public async addAttestations(attestations: BlockAttestation[]): Promise { - for (const attestation of attestations) { - const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); - const proposalId = attestation.archive.toString(); - const address = attestation.getSender().toString(); - - // Get or create slot map - const slotMapKey = this.getSlotMapKey(slotNumber); - if (!this.attestations.has(slotNumber)) { - await this.attestations.set(slotNumber, slotMapKey); - } - - // Get slot map and store proposal reference - const slotMap = this.getSlotMap(slotNumber); - const proposalMapKey = this.getProposalMapKey(proposalId); - await slotMap.set(proposalId, proposalMapKey); - - // Store the actual attestation in the proposal map - const proposalMap = this.getProposalMap(proposalId); - await proposalMap.set(address, attestation); - - this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`); - } + for (const attestation of attestations) { + const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); + const proposalId = attestation.archive.toString(); + const address = attestation.getSender().toString(); - this.metrics.recordAddedObjects(attestations.length); - } + // Index the proposalId in the slot map + await this.attestations.set(slotNumber, proposalId); - public async getAttestationsForSlot(slot: bigint, proposalId: string): Promise { - const slotString = this.getSlotMapKey(slot.toString()); - if (!this.attestations.has(slotString)) { - return []; - } + // Store the actual attestation in the proposal map + const proposalMap = this.getProposalMap(slotNumber, proposalId); + await proposalMap.set(address, attestation.toBuffer()); - const slotMap = this.getSlotMap(slotString); - if (!slotMap.has(proposalId)) { - return []; + this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`); } - const proposalMap = this.getProposalMap(proposalId); - return Array.from(proposalMap.values()); + this.metrics.recordAddedObjects(attestations.length); } - #getNumberOfAttestationsInSlot(slot: bigint): number { - let total = 0; - const slotMap = this.getSlotMap(slot.toString()); - - if (slotMap) { - for (const proposalAttestationMapName of slotMap.values() ?? []) { - const proposalMap = this.getProposalMap(proposalAttestationMapName); - total += proposalMap.size(); - } - } - return total; + public getAttestationsForSlot(slot: bigint, proposalId: string): Promise { + const slotNumber = new Fr(slot).toString(); + const proposalMap = this.getProposalMap(slotNumber, proposalId); + const attestations = proposalMap.values(); + const attestationsArray = Array.from(attestations).map(attestation => BlockAttestation.fromBuffer(attestation)); + return Promise.resolve(attestationsArray); } public async deleteAttestationsOlderThan(oldestSlot: bigint): Promise { @@ -113,17 +79,26 @@ export class KvAttestationPool implements AttestationPool { } } - for (const oldSlot of olderThan) { - await this.deleteAttestationsForSlot(BigInt(oldSlot)); - } + await Promise.all(olderThan.map(oldSlot => this.deleteAttestationsForSlot(BigInt(oldSlot)))); return Promise.resolve(); } - public deleteAttestationsForSlot(slot: bigint): Promise { - const numberOfAttestations = this.#getNumberOfAttestationsInSlot(slot); - this.attestations.delete(this.getSlotMapKey(slot.toString())); + public async deleteAttestationsForSlot(slot: bigint): Promise { + const deletionPromises = []; - // TODO: delete from store + const slotString = new Fr(slot).toString(); + let numberOfAttestations = 0; + const proposalIds = this.attestations.getValues(slotString); + + if (proposalIds) { + for (const proposalId of proposalIds) { + const proposalMap = this.getProposalMap(slotString, proposalId); + numberOfAttestations += proposalMap.size(); + deletionPromises.push(proposalMap.clear()); + } + } + + await Promise.all(deletionPromises); this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot}`); this.metrics.recordRemovedObjects(numberOfAttestations); @@ -131,79 +106,50 @@ export class KvAttestationPool implements AttestationPool { } public async deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise { - const slotAttestationMap = await getSlotOrDefault(this.store, this.attestations, this.getSlotKey(slot)); + const deletionPromises = []; + + const slotString = new Fr(slot).toString(); + const slotAttestationMap = this.attestations.get(slotString); + if (slotAttestationMap) { - if (slotAttestationMap.has(proposalId)) { - const numberOfAttestations = slotAttestationMap.get(proposalId)?.size() ?? 0; + // Remove the proposalId from the slot index + deletionPromises.push(this.attestations.deleteValue(slotString, proposalId)); - slotAttestationMap.delete(proposalId); + // Delete all attestations for the proposalId + const proposalMap = this.getProposalMap(slotString, proposalId); + const numberOfAttestations = proposalMap.size(); + deletionPromises.push(proposalMap.clear()); - this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot} and proposal ${proposalId}`); - this.metrics.recordRemovedObjects(numberOfAttestations); - } + this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot} and proposal ${proposalId}`); + this.metrics.recordRemovedObjects(numberOfAttestations); } + + await Promise.all(deletionPromises); return Promise.resolve(); } public async deleteAttestations(attestations: BlockAttestation[]): Promise { + const deletionPromises = []; + for (const attestation of attestations) { - const slotNumber = attestation.payload.header.globalVariables.slotNumber; - const slotAttestationMap = (slotNumber.toString()); - if (slotAttestationMap) { - const proposalId = attestation.archive.toString(); - const proposalAttestationMap = await getProposalOrDefault(this.store, slotAttestationMap, proposalId); - if (proposalAttestationMap) { - const address = attestation.getSender(); - proposalAttestationMap.delete(address.toString()); - this.log.debug(`Deleted attestation for slot ${slotNumber} from ${address}`); - } + const slotNumber = attestation.payload.header.globalVariables.slotNumber.toString(); + const proposalId = attestation.archive.toString(); + const proposalMap = this.getProposalMap(slotNumber, proposalId); + + if (proposalMap) { + const address = attestation.getSender().toString(); + deletionPromises.push(proposalMap.delete(address)); + this.log.debug(`Deleted attestation for slot ${slotNumber} from ${address}`); + } + + if (proposalMap.size() === 0) { + deletionPromises.push(this.attestations.deleteValue(slotNumber, proposalId)); } } - this.metrics.recordRemovedObjects(attestations.length); - return Promise.resolve(); - } -} -/** - * Get Slot or Default - * - * Fetch the slot mapping, if it does not exist, then create a mapping and return it - * @param store - The store to fetch from - * @param map - The map to fetch from - * @param slot - The slot to fetch - * @returns The slot mapping - */ -async function getSlotOrDefault( - store: AztecKVStore, - map: AztecMap>>, - slot: string, -): Promise>> { - if (!map.has(slot)) { - const newMap = store.openMultiMapWithSize>(`slot-${slot}`); - await map.set(slot, newMap); - return newMap; - } - return map.get(slot)!; -} + await Promise.all(deletionPromises); -/** - * Get Proposal or Default - * - * Fetch the proposal mapping, if it does not exist, then create a mapping and return it - * @param store - The store to fetch from - * @param map - The map to fetch from - * @param proposalId - The proposal id to fetch - * @returns The proposal mapping - */ -async function getProposalOrDefault( - store: AztecKVStore, - map: AztecMultiMapWithSize>, - proposalId: string, -): Promise> { - if (!map.has(proposalId)) { - const newMap = store.openMultiMapWithSize(`proposal-${proposalId}`); - await map.set(proposalId, newMap); - return newMap; + this.metrics.recordRemovedObjects(attestations.length); + return Promise.resolve(); } - return map.get(proposalId)!; } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts index b76090ee116a..5d2cd81b625c 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.test.ts @@ -1,7 +1,7 @@ import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; -import { InMemoryAttestationPool } from './memory_attestation_pool.js'; import { describeAttestationPool } from './attestation_pool_test_suite.js'; +import { InMemoryAttestationPool } from './memory_attestation_pool.js'; describe('In-Memory Attestation Pool', () => { let inMemoryAttestationPool: InMemoryAttestationPool; From cc51fae006a10fe2d115fa29f0502b2b6bc4770b Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:03:46 +0000 Subject: [PATCH 3/7] fmt --- yarn-project/kv-store/src/interfaces/store.ts | 9 ++++++++- yarn-project/kv-store/src/lmdb/map.test.ts | 2 +- yarn-project/kv-store/src/lmdb/map.ts | 14 +++++++------- yarn-project/kv-store/src/lmdb/store.ts | 9 ++++++++- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index 56e8f0a7a017..bee1e2e0e8a0 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,7 +1,14 @@ import { type AztecArray, type AztecAsyncArray } from './array.js'; import { type Key } from './common.js'; import { type AztecAsyncCounter, type AztecCounter } from './counter.js'; -import { type AztecMapWithSize, type AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from './map.js'; +import { + type AztecAsyncMap, + type AztecAsyncMultiMap, + type AztecMap, + type AztecMapWithSize, + type AztecMultiMap, + type AztecMultiMapWithSize, +} from './map.js'; import { type AztecAsyncSet, type AztecSet } from './set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from './singleton.js'; diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index c691ca0041d0..f6bb8fb24cbd 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -28,4 +28,4 @@ describe('LMDBMap', () => { // expect(map.size()).toEqual(1); // expect(map.get('foo')).toEqual('baz'); -// }); \ No newline at end of file +// }); diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index a65c3389212d..844d82fbbe2f 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,7 +1,7 @@ import { type Database, type RangeOptions } from 'lmdb'; import { type Key, type Range } from '../interfaces/common.js'; -import { type AztecAsyncMultiMap, type AztecMultiMap, type AztecMapWithSize } from '../interfaces/map.js'; +import { type AztecAsyncMultiMap, type AztecMapWithSize, type AztecMultiMap } from '../interfaces/map.js'; /** The slot where a key-value entry would be stored */ type MapValueSlot = ['map', string, 'slot', K]; @@ -165,16 +165,16 @@ export class LmdbAztecMap implements AztecMultiMap, Azte const iterator = this.db.getRange(lmdbRange); - for (const { - key, - } of iterator) { + for (const { key } of iterator) { await this.db.remove(key); } } - } -export class LmdbAztecMapWithSize extends LmdbAztecMap implements AztecMapWithSize, AztecAsyncMultiMap { +export class LmdbAztecMapWithSize + extends LmdbAztecMap + implements AztecMapWithSize, AztecAsyncMultiMap +{ #sizeCache?: number; constructor(rootDb: Database, mapName: string) { @@ -185,7 +185,7 @@ export class LmdbAztecMapWithSize extends LmdbAztecMap i await this.db.childTransaction(() => { const exists = this.db.doesExist(this.slot(key)); this.db.putSync(this.slot(key), [key, val], { - appendDup: true + appendDup: true, }); if (!exists) { this.#sizeCache = undefined; // Invalidate cache diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index 3240c76d95ee..d78030ec373c 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -9,7 +9,14 @@ import { join } from 'path'; import { type AztecArray, type AztecAsyncArray } from '../interfaces/array.js'; import { type Key } from '../interfaces/common.js'; import { type AztecAsyncCounter, type AztecCounter } from '../interfaces/counter.js'; -import { type AztecMapWithSize, type AztecMultiMapWithSize, type AztecAsyncMap, type AztecAsyncMultiMap, type AztecMap, type AztecMultiMap } from '../interfaces/map.js'; +import { + type AztecAsyncMap, + type AztecAsyncMultiMap, + type AztecMap, + type AztecMapWithSize, + type AztecMultiMap, + type AztecMultiMapWithSize, +} from '../interfaces/map.js'; import { type AztecAsyncSet, type AztecSet } from '../interfaces/set.js'; import { type AztecAsyncSingleton, type AztecSingleton } from '../interfaces/singleton.js'; import { type AztecAsyncKVStore, type AztecKVStore } from '../interfaces/store.js'; From a6d218acafdfa096d887b4c8ad4069088f9908f3 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:15:26 +0000 Subject: [PATCH 4/7] fix: test --- yarn-project/kv-store/src/lmdb/map.test.ts | 53 +++++++++++++++------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index f6bb8fb24cbd..2688f1f71b66 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -1,5 +1,7 @@ import { describeAztecMap } from '../interfaces/map_test_suite.js'; import { openTmpStore } from './index.js'; +import { AztecMapWithSize, AztecMultiMapWithSize } from '../interfaces/map.js'; +import { expect } from 'chai'; describe('LMDBMap', () => { describeAztecMap('Sync AztecMap', () => openTmpStore(true)); @@ -7,25 +9,44 @@ describe('LMDBMap', () => { describeAztecMap('Async AztecMap', () => Promise.resolve(openTmpStore(true)), true); }); -// TODO: add tests for the maps with size +describe('AztecMultiMapWithSize', () => { + let map: AztecMultiMapWithSize; -// describe('LmdbAztecMultiMapWithSize', () => { -// let db: Database; -// let map: LmdbAztecMultiMapWithSize; + beforeEach(() => { + const store = openTmpStore(true); + map = store.openMultiMapWithSize('test'); + }); -// beforeEach(() => { -// db = open({ dupSort: true } as any); -// map = new LmdbAztecMultiMapWithSize(db, 'test'); -// }); + it('should be able to delete values', async () => { + await map.set('foo', 'bar'); + await map.set('foo', 'baz'); -// it('should be able to delete values', async () => { -// await map.set('foo', 'bar'); -// await map.set('foo', 'baz'); + expect(map.size()).to.equal(2); -// expect(map.size()).toEqual(2); + await map.deleteValue('foo', 'bar'); -// await map.deleteValue('foo', 'bar'); + expect(map.size()).to.equal(1); + expect(map.get('foo')).to.equal('baz'); + }); +}); + +describe('AztecMapWithSize', () => { + let map: AztecMapWithSize; + + beforeEach(() => { + const store = openTmpStore(true); + map = store.openMapWithSize('test'); + }); + + it('should be able to delete values', async () => { + await map.set('foo', 'bar'); + await map.set('fizz', 'buzz'); -// expect(map.size()).toEqual(1); -// expect(map.get('foo')).toEqual('baz'); -// }); + expect(map.size()).to.equal(2); + + await map.delete('foo'); + + expect(map.size()).to.equal(1); + expect(map.get('fizz')).to.equal('buzz'); + }); +}); From d55d21060458938c73a03a2eed20de23d13fd0a7 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:43:27 +0000 Subject: [PATCH 5/7] fix: make persistence default --- yarn-project/p2p/src/client/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index c2a01ff4294c..9c4cca388db4 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -14,7 +14,7 @@ import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { P2PClient } from '../client/p2p_client.js'; import { type P2PConfig } from '../config.js'; import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; -import { InMemoryAttestationPool } from '../mem_pools/attestation_pool/memory_attestation_pool.js'; +import { KvAttestationPool } from '../mem_pools/attestation_pool/kv_attestation_pool.js'; import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; import { MemoryEpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/memory_epoch_proof_quote_pool.js'; import { type MemPools } from '../mem_pools/interface.js'; @@ -51,7 +51,7 @@ export const createP2PClient = async ( epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry), attestationPool: clientType === P2PClientType.Full - ? ((deps.attestationPool ?? new InMemoryAttestationPool(telemetry)) as T extends P2PClientType.Full + ? ((deps.attestationPool ?? new KvAttestationPool(store, telemetry)) as T extends P2PClientType.Full ? AttestationPool : undefined) : undefined, From 0931b42dec759a97c83d8080df4767ff2ae1b638 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 13 Dec 2024 07:54:36 +0000 Subject: [PATCH 6/7] fmt --- yarn-project/kv-store/src/lmdb/map.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 2688f1f71b66..0ddc616123c7 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -1,7 +1,8 @@ +import { expect } from 'chai'; + +import { type AztecMapWithSize, type AztecMultiMapWithSize } from '../interfaces/map.js'; import { describeAztecMap } from '../interfaces/map_test_suite.js'; import { openTmpStore } from './index.js'; -import { AztecMapWithSize, AztecMultiMapWithSize } from '../interfaces/map.js'; -import { expect } from 'chai'; describe('LMDBMap', () => { describeAztecMap('Sync AztecMap', () => openTmpStore(true)); From badb0f1d3decab0dc3aa7be47f2390fa44d8fe7c Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:09:40 +0000 Subject: [PATCH 7/7] fix: incorrect size calculation --- yarn-project/kv-store/src/lmdb/map.test.ts | 8 +++++ yarn-project/kv-store/src/lmdb/map.ts | 25 +++++++++------- .../attestation_pool_test_suite.ts | 29 +++++++++++++++++-- .../attestation_pool/kv_attestation_pool.ts | 6 ++-- 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 0ddc616123c7..2f78d4aca632 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -12,22 +12,30 @@ describe('LMDBMap', () => { describe('AztecMultiMapWithSize', () => { let map: AztecMultiMapWithSize; + let map2: AztecMultiMapWithSize; beforeEach(() => { const store = openTmpStore(true); map = store.openMultiMapWithSize('test'); + map2 = store.openMultiMapWithSize('test2'); }); it('should be able to delete values', async () => { await map.set('foo', 'bar'); await map.set('foo', 'baz'); + await map2.set('foo', 'bar'); + await map2.set('foo', 'baz'); + expect(map.size()).to.equal(2); + expect(map2.size()).to.equal(2); await map.deleteValue('foo', 'bar'); expect(map.size()).to.equal(1); expect(map.get('foo')).to.equal('baz'); + + expect(map2.size()).to.equal(2); }); }); diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index 844d82fbbe2f..4458c3c3539d 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -13,8 +13,8 @@ export class LmdbAztecMap implements AztecMultiMap, Azte protected db: Database<[K, V], MapValueSlot>; protected name: string; - #startSentinel: MapValueSlot; - #endSentinel: MapValueSlot; + protected startSentinel: MapValueSlot; + protected endSentinel: MapValueSlot; constructor(rootDb: Database, mapName: string) { this.name = mapName; @@ -23,8 +23,8 @@ export class LmdbAztecMap implements AztecMultiMap, Azte // sentinels are used to define the start and end of the map // with LMDB's key encoding, no _primitive value_ can be "less than" an empty buffer or greater than Byte 255 // these will be used later to answer range queries - this.#startSentinel = ['map', this.name, 'slot', Buffer.from([])]; - this.#endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; + this.startSentinel = ['map', this.name, 'slot', Buffer.from([])]; + this.endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; } close(): Promise { @@ -94,18 +94,18 @@ export class LmdbAztecMap implements AztecMultiMap, Azte const start = reverse ? range.end ? this.slot(range.end) - : this.#endSentinel + : this.endSentinel : range.start ? this.slot(range.start) - : this.#startSentinel; + : this.startSentinel; const end = reverse ? range.start ? this.slot(range.start) - : this.#startSentinel + : this.startSentinel : range.end ? this.slot(range.end) - : this.#endSentinel; + : this.endSentinel; const lmdbRange: RangeOptions = { start, @@ -159,8 +159,8 @@ export class LmdbAztecMap implements AztecMultiMap, Azte async clear(): Promise { const lmdbRange: RangeOptions = { - start: this.#startSentinel, - end: this.#endSentinel, + start: this.startSentinel, + end: this.endSentinel, }; const iterator = this.db.getRange(lmdbRange); @@ -219,7 +219,10 @@ export class LmdbAztecMapWithSize */ size(): number { if (this.#sizeCache === undefined) { - this.#sizeCache = this.db.getCount(); + this.#sizeCache = this.db.getCount({ + start: this.startSentinel, + end: this.endSentinel, + }); } return this.#sizeCache; } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts index 55a2d8236aec..f8f838a08abd 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -34,7 +34,6 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // We compare buffers as the objects can have cached values attached to them which are not serialised // using array containing as the kv store does not respect insertion order - // TODO(md): should i make a version which respects insertion order? const compareAttestations = (a1: BlockAttestation[], a2: BlockAttestation[]) => { const a1Buffer = a1.map(attestation => attestation.toBuffer()); const a2Buffer = a2.map(attestation => attestation.toBuffer()); @@ -106,7 +105,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo }); it('Should store attestations by differing slot and archive', async () => { - const slotNumbers = [1, 2, 3, 4]; + const slotNumbers = [1, 1, 2, 3]; const archives = [Fr.random(), Fr.random(), Fr.random(), Fr.random()]; const attestations = signers.map((signer, i) => mockAttestation(signer, slotNumbers[i], archives[i])); @@ -169,9 +168,16 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const attestations = signers.map(signer => mockAttestation(signer, slotNumber, archive)); const proposalId = attestations[0].archive.toString(); + // Add another set of attestations with a different proposalId, yet the same slot + const archive2 = Fr.random(); + const attestations2 = signers.map(signer => mockAttestation(signer, slotNumber, archive2)); + const proposalId2 = attestations2[0].archive.toString(); + await ap.addAttestations(attestations); + await ap.addAttestations(attestations2); expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations2.length); const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); @@ -183,6 +189,25 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); expect(retreivedAttestationsAfterDelete.length).toBe(0); + + const retreivedAttestationsAfterDeleteForOtherProposal = await ap.getAttestationsForSlot( + BigInt(slotNumber), + proposalId2, + ); + expect(retreivedAttestationsAfterDeleteForOtherProposal.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + compareAttestations(retreivedAttestationsAfterDeleteForOtherProposal, attestations2); + }); + + it('Should blanket delete attestations per slot and proposal (does not perform db ops if there are no attestations)', async () => { + const slotNumber = 420; + const proposalId = 'proposalId'; + + const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + expect(retreivedAttestations.length).toBe(0); + + await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); + + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledTimes(0); }); it('Should delete attestations older than a given slot', async () => { diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts index 63c7049fab0e..8de98828eed0 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -74,8 +74,6 @@ export class KvAttestationPool implements AttestationPool { for (const slot of slots) { if (BigInt(slot) < oldestSlot) { olderThan.push(slot); - } else { - break; } } @@ -109,9 +107,9 @@ export class KvAttestationPool implements AttestationPool { const deletionPromises = []; const slotString = new Fr(slot).toString(); - const slotAttestationMap = this.attestations.get(slotString); + const exists = this.attestations.get(slotString); - if (slotAttestationMap) { + if (exists) { // Remove the proposalId from the slot index deletionPromises.push(this.attestations.deleteValue(slotString, proposalId));