Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions yarn-project/foundation/src/fifo/memory_fifo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ export class MemoryFifo<T> {

/**
* Process items from the queue using a provided handler function.
* The function iterates over items in the queue, invoking the handler for each item until the queue is empty or flushing.
* The function iterates over items in the queue, invoking the handler for each item until the queue is empty and flushing.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the get function above. the process loop here only breaks when we get null, which only happens if (this.items.length === 0 && this.flushing)

* If the handler throws an error, it will be caught and logged as 'Queue handler exception:', but the iteration will continue.
* The process function returns a promise that resolves when there are no more items in the queue or the queue is flushing.
* The process function returns a promise that resolves when there are no more items in the queue and the queue is flushing.
*
* @param handler - A function that takes an item of type T and returns a Promise<void> after processing the item.
* @returns A Promise<void> that resolves when the queue is finished processing.
Expand Down
63 changes: 42 additions & 21 deletions yarn-project/pxe/src/pxe_service/pxe_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import {
PublicCallRequest,
} from '@aztec/circuits.js';
import { computeCommitmentNonce, siloNullifier } from '@aztec/circuits.js/abis';
import { encodeArguments } from '@aztec/foundation/abi';
import { DecodedReturn, encodeArguments } from '@aztec/foundation/abi';
import { padArrayEnd } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/fields';
import { SerialQueue } from '@aztec/foundation/fifo';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { NoirWasmVersion } from '@aztec/noir-compiler/versions';
import {
Expand Down Expand Up @@ -70,6 +71,9 @@ export class PXEService implements PXE {
private simulator: AcirSimulator;
private log: DebugLogger;
private sandboxVersion: string;
// serialize synchronizer and calls to simulateTx.
// ensures that state is not changed while simulating
private jobQueue = new SerialQueue();

constructor(
private keyStore: KeyStore,
Expand All @@ -79,7 +83,7 @@ export class PXEService implements PXE {
logSuffix?: string,
) {
this.log = createDebugLogger(logSuffix ? `aztec:pxe_service_${logSuffix}` : `aztec:pxe_service`);
this.synchronizer = new Synchronizer(node, db, logSuffix);
this.synchronizer = new Synchronizer(node, db, this.jobQueue, logSuffix);
this.contractDataOracle = new ContractDataOracle(db, node);
this.simulator = getAcirSimulator(db, node, keyStore, this.contractDataOracle);

Expand All @@ -93,7 +97,11 @@ export class PXEService implements PXE {
*/
public async start() {
const { l2BlockPollingIntervalMS } = this.config;
await this.synchronizer.start(1, l2BlockPollingIntervalMS);
this.synchronizer.start(1, l2BlockPollingIntervalMS);
this.jobQueue.start();
this.log.info('Started Job Queue');
await this.jobQueue.syncPoint();
this.log.info('Synced Job Queue');
await this.restoreNoteProcessors();
const info = await this.getNodeInfo();
this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`);
Expand Down Expand Up @@ -121,8 +129,10 @@ export class PXEService implements PXE {
* @returns A Promise resolving once the server has been stopped successfully.
*/
public async stop() {
await this.jobQueue.cancel();
this.log.info('Cancelled Job Queue');
await this.synchronizer.stop();
this.log.info('Stopped');
this.log.info('Stopped Synchronizer');
}

/** Returns an estimate of the db size in bytes. */
Expand Down Expand Up @@ -336,18 +346,21 @@ export class PXEService implements PXE {
throw new Error(`Unspecified internal are not allowed`);
}

// We get the contract address from origin, since contract deployments are signalled as origin from their own address
// TODO: Is this ok? Should it be changed to be from ZERO?
const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined;
const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined;
// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// We get the contract address from origin, since contract deployments are signalled as origin from their own address
// TODO: Is this ok? Should it be changed to be from ZERO?
const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined;
const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined;

const tx = await this.#simulateAndProve(txRequest, newContract);
if (simulatePublic) {
await this.#simulatePublicCalls(tx);
}
this.log.info(`Executed local simulation for ${await tx.getTxHash()}`);
const tx = await this.#simulateAndProve(txRequest, newContract);
if (simulatePublic) {
await this.#simulatePublicCalls(tx);
}
this.log.info(`Executed local simulation for ${await tx.getTxHash()}`);

return tx;
return tx;
});
}

public async sendTx(tx: Tx): Promise<TxHash> {
Expand All @@ -360,13 +373,21 @@ export class PXEService implements PXE {
return txHash;
}

public async viewTx(functionName: string, args: any[], to: AztecAddress, _from?: AztecAddress) {
// TODO - Should check if `from` has the permission to call the view function.
const functionCall = await this.#getFunctionCall(functionName, args, to);
const executionResult = await this.#simulateUnconstrained(functionCall);

// TODO - Return typed result based on the function artifact.
return executionResult;
public async viewTx(
functionName: string,
args: any[],
to: AztecAddress,
_from?: AztecAddress,
): Promise<DecodedReturn> {
// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// TODO - Should check if `from` has the permission to call the view function.
const functionCall = await this.#getFunctionCall(functionName, args, to);
const executionResult = await this.#simulateUnconstrained(functionCall);

// TODO - Return typed result based on the function artifact.
return executionResult;
});
}

public async getTxReceipt(txHash: TxHash): Promise<TxReceipt> {
Expand Down
7 changes: 5 additions & 2 deletions yarn-project/pxe/src/synchronizer/synchronizer.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BlockHeader, CompleteAddress, EthAddress, Fr, GrumpkinScalar } from '@aztec/circuits.js';
import { Grumpkin } from '@aztec/circuits.js/barretenberg';
import { SerialQueue } from '@aztec/foundation/fifo';
import { TestKeyStore } from '@aztec/key-store';
import { AztecLmdbStore } from '@aztec/kv-store';
import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Block, MerkleTreeId } from '@aztec/types';
Expand All @@ -17,6 +18,7 @@ describe('Synchronizer', () => {
let synchronizer: TestSynchronizer;
let roots: Record<MerkleTreeId, Fr>;
let blockHeader: BlockHeader;
let jobQueue: SerialQueue;

beforeEach(async () => {
blockHeader = BlockHeader.random();
Expand All @@ -31,7 +33,8 @@ describe('Synchronizer', () => {

aztecNode = mock<AztecNode>();
database = new KVPxeDatabase(await AztecLmdbStore.create(EthAddress.random()));
synchronizer = new TestSynchronizer(aztecNode, database);
jobQueue = new SerialQueue();
synchronizer = new TestSynchronizer(aztecNode, database, jobQueue);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the job queue is only used when synchronizer.start is used. This doesn't happen in this test, nor does it happen in the pxe_service tests.

});

it('sets tree roots from aztec node on initial sync', async () => {
Expand Down Expand Up @@ -128,7 +131,7 @@ class TestSynchronizer extends Synchronizer {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<void> {
public workNoteProcessorCatchUp(): Promise<boolean> {
return super.workNoteProcessorCatchUp();
}
}
68 changes: 47 additions & 21 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AztecAddress, BlockHeader, Fr, PublicKey } from '@aztec/circuits.js';
import { computeGlobalsHash } from '@aztec/circuits.js/abis';
import { SerialQueue } from '@aztec/foundation/fifo';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { InterruptibleSleep } from '@aztec/foundation/sleep';
import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockContext, L2BlockL2Logs, LogType } from '@aztec/types';
Expand All @@ -24,7 +25,7 @@ export class Synchronizer {
private log: DebugLogger;
private noteProcessorsToCatchUp: NoteProcessor[] = [];

constructor(private node: AztecNode, private db: PxeDatabase, logSuffix = '') {
constructor(private node: AztecNode, private db: PxeDatabase, private jobQueue: SerialQueue, logSuffix = '') {
this.log = createDebugLogger(logSuffix ? `aztec:pxe_synchronizer_${logSuffix}` : 'aztec:pxe_synchronizer');
}

Expand All @@ -36,23 +37,35 @@ export class Synchronizer {
* @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @param retryInterval - The time interval (in ms) to wait before retrying if no data is available.
*/
public async start(limit = 1, retryInterval = 1000) {
public start(limit = 1, retryInterval = 1000) {
if (this.running) {
return;
}
this.running = true;

await this.initialSync();
this.jobQueue
.put(() => this.initialSync())
.catch(err => {
this.log.error(`Error in synchronizer initial sync`, err);
this.running = false;
throw err;
});

const run = async () => {
while (this.running) {
if (this.noteProcessorsToCatchUp.length > 0) {
// There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor.
await this.workNoteProcessorCatchUp(limit, retryInterval);
} else {
// No note processor needs to catch up. We continue with the normal flow.
await this.work(limit, retryInterval);
}
await this.jobQueue.put(async () => {
let moreWork = true;
while (moreWork && this.running) {
if (this.noteProcessorsToCatchUp.length > 0) {
// There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor.
moreWork = await this.workNoteProcessorCatchUp(limit);
} else {
// No note processor needs to catch up. We continue with the normal flow.
moreWork = await this.work(limit);
}
}
});
await this.interruptibleSleep.sleep(retryInterval);
}
};

Expand All @@ -70,26 +83,29 @@ export class Synchronizer {
await this.db.setBlockData(latestBlockNumber, latestBlockHeader);
}

protected async work(limit = 1, retryInterval = 1000): Promise<void> {
/**
* Fetches encrypted logs and blocks from the Aztec node and processes them for all note processors.
*
* @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
*/
protected async work(limit = 1): Promise<boolean> {
const from = this.getSynchedBlockNumber() + 1;
try {
let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED);
if (!encryptedLogs.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED);
if (!unencryptedLogs.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

// Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks.
const blocks = await this.node.getBlocks(from, encryptedLogs.length);
if (!blocks.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

if (blocks.length !== encryptedLogs.length) {
Expand Down Expand Up @@ -120,21 +136,30 @@ export class Synchronizer {
for (const noteProcessor of this.noteProcessors) {
await noteProcessor.process(blockContexts, encryptedLogs);
}
return true;
} catch (err) {
this.log.error(`Error in synchronizer work`, err);
await this.interruptibleSleep.sleep(retryInterval);
return false;
}
}

protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise<void> {
/**
* Catch up a note processor that is lagging behind the main sync,
* e.g. because we just added a new account.
*
* @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
*/
protected async workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const toBlockNumber = this.getSynchedBlockNumber();

if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor already synched, nothing to do
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
return;
// could be more work if there are more note processors to catch up
return true;
}

const from = noteProcessor.status.syncedToBlock + 1;
Expand Down Expand Up @@ -184,9 +209,10 @@ export class Synchronizer {
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
}
return true;
} catch (err) {
this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err);
await this.interruptibleSleep.sleep(retryInterval);
return false;
}
}

Expand Down