From f7543812c0214ddbae07d215fc0a80fc13402e33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Venturo?= Date: Mon, 10 Mar 2025 16:16:34 +0000 Subject: [PATCH 1/5] Add job queue --- .../transfer_to_private.test.ts | 5 +- .../pxe/src/pxe_service/pxe_service.ts | 217 +++++++++++------- 2 files changed, 132 insertions(+), 90 deletions(-) diff --git a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts index c0f6c1368b2e..dff4de288591 100644 --- a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts +++ b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts @@ -21,7 +21,7 @@ describe('e2e_token_contract transfer_to_private', () => { await t.tokenSim.check(); }); - it('to self', async () => { + it.only('to self', async () => { const balancePub = await asset.methods.balance_of_public(accounts[0].address).simulate(); const amount = balancePub / 2n; expect(amount).toBeGreaterThan(0n); @@ -30,6 +30,9 @@ describe('e2e_token_contract transfer_to_private', () => { // Check that the result matches token sim tokenSim.transferToPrivate(accounts[0].address, accounts[0].address, amount); + + console.log('ABOUT TO CALL TOKEN SIM'); + await tokenSim.check(); }); diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index f10fb2300e1e..d436990a6590 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -1,6 +1,7 @@ import { L1_TO_L2_MSG_TREE_HEIGHT } from '@aztec/constants'; import { Fr, type Point } from '@aztec/foundation/fields'; import { type Logger, createLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; import { Timer } from '@aztec/foundation/timer'; import type { SiblingPath } from '@aztec/foundation/trees'; import { KeyStore } from '@aztec/key-store'; @@ -103,6 +104,7 @@ export class PXEService implements PXE { private proofCreator: PrivateKernelProver, private protocolContractsProvider: ProtocolContractsProvider, private log: Logger, + private jobQueue: SerialQueue, ) {} /** @@ -160,6 +162,8 @@ export class PXEService implements PXE { log, ); const simulator = new AcirSimulator(pxeOracleInterface, simulationProvider); + const jobQueue = new SerialQueue(); + jobQueue.start(); const pxeService = new PXEService( node, synchronizer, @@ -177,6 +181,7 @@ export class PXEService implements PXE { proofCreator, protocolContractsProvider, log, + jobQueue, ); await pxeService.#registerProtocolContracts(); const info = await pxeService.getNodeInfo(); @@ -456,24 +461,37 @@ export class PXEService implements PXE { return await this.node.getCurrentBaseFees(); } - public async proveTx( + public proveTx( txRequest: TxExecutionRequest, privateExecutionResult: PrivateExecutionResult, ): Promise { - try { - const { publicInputs, clientIvcProof } = await this.#prove(txRequest, this.proofCreator, privateExecutionResult, { - simulate: false, - skipFeeEnforcement: false, - profile: false, - }); - return new TxProvingResult(privateExecutionResult, publicInputs, clientIvcProof!); - } catch (err: any) { - throw this.contextualizeError(err, inspect(txRequest), inspect(privateExecutionResult)); + if (this.jobQueue.length() != 0) { + this.log.warn( + `PXE service job queue is not empty (${this.jobQueue.length()} entries) - concurrency is not supported, do not rely on it`, + ); } + + return this.jobQueue.put(async () => { + try { + const { publicInputs, clientIvcProof } = await this.#prove( + txRequest, + this.proofCreator, + privateExecutionResult, + { + simulate: false, + skipFeeEnforcement: false, + profile: false, + }, + ); + return new TxProvingResult(privateExecutionResult, publicInputs, clientIvcProof!); + } catch (err: any) { + throw this.contextualizeError(err, inspect(txRequest), inspect(privateExecutionResult)); + } + }); } // TODO(#7456) Prevent msgSender being defined here for the first call - public async simulateTx( + public simulateTx( txRequest: TxExecutionRequest, simulatePublic: boolean, msgSender: AztecAddress | undefined = undefined, @@ -482,74 +500,87 @@ export class PXEService implements PXE { profile: boolean = false, scopes?: AztecAddress[], ): Promise { - try { - const txInfo = { - origin: txRequest.origin, - functionSelector: txRequest.functionSelector, - simulatePublic, - msgSender, - chainId: txRequest.txContext.chainId, - version: txRequest.txContext.version, - authWitnesses: txRequest.authWitnesses.map(w => w.requestHash), - }; - this.log.info( - `Simulating transaction execution request to ${txRequest.functionSelector} at ${txRequest.origin}`, - txInfo, + if (this.jobQueue.length() != 0) { + this.log.warn( + `PXE service job queue already has ${this.jobQueue.length()} entries - PXE does not support concurrency and so you should not try to use it this way!`, ); - const timer = new Timer(); - await this.synchronizer.sync(); - const privateExecutionResult = await this.#executePrivate(txRequest, msgSender, scopes); - - const { publicInputs, profileResult } = await this.#prove(txRequest, this.proofCreator, privateExecutionResult, { - simulate: !profile, - skipFeeEnforcement, - profile, - }); - - const privateSimulationResult = new PrivateSimulationResult(privateExecutionResult, publicInputs); - const simulatedTx = privateSimulationResult.toSimulatedTx(); - let publicOutput: PublicSimulationOutput | undefined; - if (simulatePublic && publicInputs.forPublic) { - publicOutput = await this.#simulatePublicCalls(simulatedTx, skipFeeEnforcement); - } + } - if (!skipTxValidation) { - const validationResult = await this.node.isValidTx(simulatedTx, { isSimulation: true, skipFeeEnforcement }); - if (validationResult.result === 'invalid') { - throw new Error('The simulated transaction is unable to be added to state and is invalid.'); + return this.jobQueue.put(async () => { + try { + const txInfo = { + origin: txRequest.origin, + functionSelector: txRequest.functionSelector, + simulatePublic, + msgSender, + chainId: txRequest.txContext.chainId, + version: txRequest.txContext.version, + authWitnesses: txRequest.authWitnesses.map(w => w.requestHash), + }; + this.log.info( + `Simulating transaction execution request to ${txRequest.functionSelector} at ${txRequest.origin}`, + txInfo, + ); + const timer = new Timer(); + await this.synchronizer.sync(); + const privateExecutionResult = await this.#executePrivate(txRequest, msgSender, scopes); + + const { publicInputs, profileResult } = await this.#prove( + txRequest, + this.proofCreator, + privateExecutionResult, + { + simulate: !profile, + skipFeeEnforcement, + profile, + }, + ); + + const privateSimulationResult = new PrivateSimulationResult(privateExecutionResult, publicInputs); + const simulatedTx = privateSimulationResult.toSimulatedTx(); + let publicOutput: PublicSimulationOutput | undefined; + if (simulatePublic && publicInputs.forPublic) { + publicOutput = await this.#simulatePublicCalls(simulatedTx, skipFeeEnforcement); } - } - const txHash = await simulatedTx.getTxHash(); - this.log.info(`Simulation completed for ${txHash.toString()} in ${timer.ms()}ms`, { - txHash, - ...txInfo, - ...(profileResult ? { gateCounts: profileResult.gateCounts } : {}), - ...(publicOutput - ? { - gasUsed: publicOutput.gasUsed, - revertCode: publicOutput.txEffect.revertCode.getCode(), - revertReason: publicOutput.revertReason, - } - : {}), - }); + if (!skipTxValidation) { + const validationResult = await this.node.isValidTx(simulatedTx, { isSimulation: true, skipFeeEnforcement }); + if (validationResult.result === 'invalid') { + throw new Error('The simulated transaction is unable to be added to state and is invalid.'); + } + } - return TxSimulationResult.fromPrivateSimulationResultAndPublicOutput( - privateSimulationResult, - publicOutput, - profileResult, - ); - } catch (err: any) { - throw this.contextualizeError( - err, - inspect(txRequest), - `simulatePublic=${simulatePublic}`, - `msgSender=${msgSender?.toString() ?? 'undefined'}`, - `skipTxValidation=${skipTxValidation}`, - `profile=${profile}`, - `scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`, - ); - } + const txHash = await simulatedTx.getTxHash(); + this.log.info(`Simulation completed for ${txHash.toString()} in ${timer.ms()}ms`, { + txHash, + ...txInfo, + ...(profileResult ? { gateCounts: profileResult.gateCounts } : {}), + ...(publicOutput + ? { + gasUsed: publicOutput.gasUsed, + revertCode: publicOutput.txEffect.revertCode.getCode(), + revertReason: publicOutput.revertReason, + } + : {}), + }); + + return TxSimulationResult.fromPrivateSimulationResultAndPublicOutput( + privateSimulationResult, + publicOutput, + profileResult, + ); + } catch (err: any) { + throw this.contextualizeError( + err, + inspect(txRequest), + `simulatePublic=${simulatePublic}`, + `msgSender=${msgSender?.toString() ?? 'undefined'}`, + `skipTxValidation=${skipTxValidation}`, + `profile=${profile}`, + `scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`, + ); + } + }); } public async sendTx(tx: Tx): Promise { @@ -565,29 +596,37 @@ export class PXEService implements PXE { return txHash; } - public async simulateUnconstrained( + public simulateUnconstrained( functionName: string, args: any[], to: AztecAddress, _from?: AztecAddress, scopes?: AztecAddress[], ): Promise { - try { - await this.synchronizer.sync(); - // TODO - Should check if `from` has the permission to call the view function. - const functionCall = await this.#getFunctionCall(functionName, args, to); - const executionResult = await this.#simulateUnconstrained(functionCall, scopes); - - // TODO - Return typed result based on the function artifact. - return executionResult; - } catch (err: any) { - const stringifiedArgs = args.map(arg => arg.toString()).join(', '); - throw this.contextualizeError( - err, - `simulateUnconstrained ${to}:${functionName}(${stringifiedArgs})`, - `scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`, + if (this.jobQueue.length() != 0) { + this.log.warn( + `PXE service job queue already has ${this.jobQueue.length()} entries - PXE does not support concurrency and so you should not try to use it this way!`, ); } + + return this.jobQueue.put(async () => { + try { + await this.synchronizer.sync(); + // TODO - Should check if `from` has the permission to call the view function. + const functionCall = await this.#getFunctionCall(functionName, args, to); + const executionResult = await this.#simulateUnconstrained(functionCall, scopes); + + // TODO - Return typed result based on the function artifact. + return executionResult; + } catch (err: any) { + const stringifiedArgs = args.map(arg => arg.toString()).join(', '); + throw this.contextualizeError( + err, + `simulateUnconstrained ${to}:${functionName}(${stringifiedArgs})`, + `scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`, + ); + } + }); } public getTxReceipt(txHash: TxHash): Promise { From c9a46f525f0a25817ae92122bdd7a0a0e618a378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Venturo?= Date: Tue, 11 Mar 2025 14:03:08 +0000 Subject: [PATCH 2/5] feat!: disable PXE concurrency --- docs/docs/migration_notes.md | 4 + .../pxe/src/pxe_service/pxe_service.ts | 106 ++++++++++-------- 2 files changed, 64 insertions(+), 46 deletions(-) diff --git a/docs/docs/migration_notes.md b/docs/docs/migration_notes.md index 7714ef917640..665a741c42ef 100644 --- a/docs/docs/migration_notes.md +++ b/docs/docs/migration_notes.md @@ -8,6 +8,10 @@ Aztec is in full-speed development. Literally every version breaks compatibility ## TBD +### [PXE] Concurrent contract function simulation disabled + +PXE is no longer be able to execute contract functions concurrently (e.g. by collecting calls to `simulateTx` and then using `await Promise.all`). They will instead be put in a job queue and executed sequentially in order of arrival. + ### [aztec.js] Changes to `BatchCall` and `BaseContractInteraction` The constructor arguments of `BatchCall` have been updated to improve usability. Previously, it accepted an array of `FunctionCall`, requiring users to manually set additional data such as `authwit` and `capsules`. Now, `BatchCall` takes an array of `BaseContractInteraction`, which encapsulates all necessary information. diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index d436990a6590..6c4b505094ab 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -163,7 +163,7 @@ export class PXEService implements PXE { ); const simulator = new AcirSimulator(pxeOracleInterface, simulationProvider); const jobQueue = new SerialQueue(); - jobQueue.start(); + const pxeService = new PXEService( node, synchronizer, @@ -183,12 +183,32 @@ export class PXEService implements PXE { log, jobQueue, ); + + pxeService.jobQueue.start(); + await pxeService.#registerProtocolContracts(); const info = await pxeService.getNodeInfo(); log.info(`Started PXE connected to chain ${info.l1ChainId} version ${info.protocolVersion}`); return pxeService; } + /** + * Enqueues a job for execution once no other jobs are running. Returns a promise that will resolve once the job is + * complete. + * + * Useful for tasks that cannot run concurrently, such as contract function simulation. + */ + #putInJobQueue(fn: () => Promise): Promise { + // TODO(#12636): relax the conditions under which we forbid concurrency. + if (this.jobQueue.length() != 0) { + this.log.warn( + `PXE is already processing ${this.jobQueue.length()} jobs, concurrent execution is not supported. Will run once those are complete.`, + ); + } + + return this.jobQueue.put(fn); + } + isL1ToL2MessageSynced(l1ToL2Message: Fr): Promise { return this.node.isL1ToL2MessageSynced(l1ToL2Message); } @@ -369,35 +389,39 @@ export class PXEService implements PXE { ); } - public async updateContract(contractAddress: AztecAddress, artifact: ContractArtifact): Promise { - const currentInstance = await this.contractDataProvider.getContractInstance(contractAddress); - const contractClass = await getContractClassFromArtifact(artifact); - await this.synchronizer.sync(); + public updateContract(contractAddress: AztecAddress, artifact: ContractArtifact): Promise { + // We disable concurrently updating contracts to avoid concurrently syncing with the node, or changing a contract's + // class while we're simulating it. + return this.#putInJobQueue(async () => { + const currentInstance = await this.contractDataProvider.getContractInstance(contractAddress); + const contractClass = await getContractClassFromArtifact(artifact); + await this.synchronizer.sync(); - const header = await this.syncDataProvider.getBlockHeader(); + const header = await this.syncDataProvider.getBlockHeader(); - const currentClassId = await readCurrentClassId( - contractAddress, - currentInstance, - this.node, - header.globalVariables.blockNumber.toNumber(), - ); - if (!contractClass.id.equals(currentClassId)) { - throw new Error('Could not update contract to a class different from the current one.'); - } + const currentClassId = await readCurrentClassId( + contractAddress, + currentInstance, + this.node, + header.globalVariables.blockNumber.toNumber(), + ); + if (!contractClass.id.equals(currentClassId)) { + throw new Error('Could not update contract to a class different from the current one.'); + } - await this.contractDataProvider.addContractArtifact(contractClass.id, artifact); + await this.contractDataProvider.addContractArtifact(contractClass.id, artifact); - const publicFunctionSignatures = artifact.functions - .filter(fn => fn.functionType === FunctionType.PUBLIC) - .map(fn => decodeFunctionSignature(fn.name, fn.parameters)); - await this.node.registerContractFunctionSignatures(contractAddress, publicFunctionSignatures); + const publicFunctionSignatures = artifact.functions + .filter(fn => fn.functionType === FunctionType.PUBLIC) + .map(fn => decodeFunctionSignature(fn.name, fn.parameters)); + await this.node.registerContractFunctionSignatures(contractAddress, publicFunctionSignatures); - // TODO(#10007): Node should get public contract class from the registration event, not from PXE registration - await this.node.addContractClass({ ...contractClass, privateFunctions: [], unconstrainedFunctions: [] }); - currentInstance.currentContractClassId = contractClass.id; - await this.contractDataProvider.addContractInstance(currentInstance); - this.log.info(`Updated contract ${artifact.name} at ${contractAddress.toString()} to class ${contractClass.id}`); + // TODO(#10007): Node should get public contract class from the registration event, not from PXE registration + await this.node.addContractClass({ ...contractClass, privateFunctions: [], unconstrainedFunctions: [] }); + currentInstance.currentContractClassId = contractClass.id; + await this.contractDataProvider.addContractInstance(currentInstance); + this.log.info(`Updated contract ${artifact.name} at ${contractAddress.toString()} to class ${contractClass.id}`); + }); } public getContracts(): Promise { @@ -465,13 +489,9 @@ export class PXEService implements PXE { txRequest: TxExecutionRequest, privateExecutionResult: PrivateExecutionResult, ): Promise { - if (this.jobQueue.length() != 0) { - this.log.warn( - `PXE service job queue is not empty (${this.jobQueue.length()} entries) - concurrency is not supported, do not rely on it`, - ); - } - - return this.jobQueue.put(async () => { + // We disable proving concurrently mostly out of caution, since it accesses some of our stores. Proving is so + // computationally demanding that it'd be rare for someone to try to do it concurrently regardless. + return this.#putInJobQueue(async () => { try { const { publicInputs, clientIvcProof } = await this.#prove( txRequest, @@ -500,13 +520,10 @@ export class PXEService implements PXE { profile: boolean = false, scopes?: AztecAddress[], ): Promise { - if (this.jobQueue.length() != 0) { - this.log.warn( - `PXE service job queue already has ${this.jobQueue.length()} entries - PXE does not support concurrency and so you should not try to use it this way!`, - ); - } - - return this.jobQueue.put(async () => { + // We disable concurrent simulations since those might execute oracles which read and write to the PXE stores (e.g. + // to the capsules), and we need to prevent concurrent runs from interfering with one another (e.g. attempting to + // delete the same read value, or reading values that another simulation is currently modifying). + return this.#putInJobQueue(async () => { try { const txInfo = { origin: txRequest.origin, @@ -603,13 +620,10 @@ export class PXEService implements PXE { _from?: AztecAddress, scopes?: AztecAddress[], ): Promise { - if (this.jobQueue.length() != 0) { - this.log.warn( - `PXE service job queue already has ${this.jobQueue.length()} entries - PXE does not support concurrency and so you should not try to use it this way!`, - ); - } - - return this.jobQueue.put(async () => { + // We disable concurrent simulations since those might execute oracles which read and write to the PXE stores (e.g. + // to the capsules), and we need to prevent concurrent runs from interfering with one another (e.g. attempting to + // delete the same read value, or reading values that another simulation is currently modifying). + return this.#putInJobQueue(async () => { try { await this.synchronizer.sync(); // TODO - Should check if `from` has the permission to call the view function. From 4c2f7c1267a7590f28a739d468229b0f3fba08a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Venturo?= Date: Tue, 11 Mar 2025 14:04:26 +0000 Subject: [PATCH 3/5] Restore e2e test --- .../src/e2e_token_contract/transfer_to_private.test.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts index dff4de288591..57b85ec6250f 100644 --- a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts +++ b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts @@ -21,7 +21,7 @@ describe('e2e_token_contract transfer_to_private', () => { await t.tokenSim.check(); }); - it.only('to self', async () => { + it('to self', async () => { const balancePub = await asset.methods.balance_of_public(accounts[0].address).simulate(); const amount = balancePub / 2n; expect(amount).toBeGreaterThan(0n); @@ -31,8 +31,6 @@ describe('e2e_token_contract transfer_to_private', () => { // Check that the result matches token sim tokenSim.transferToPrivate(accounts[0].address, accounts[0].address, amount); - console.log('ABOUT TO CALL TOKEN SIM'); - await tokenSim.check(); }); From 1039a9d4fee00f3aa9577f46c8ef5ed7b1a01bf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Venturo?= Date: Tue, 11 Mar 2025 14:08:41 +0000 Subject: [PATCH 4/5] Remove extra whitespace --- .../src/e2e_token_contract/transfer_to_private.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts index 57b85ec6250f..5fd9623d50d0 100644 --- a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts +++ b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts @@ -21,7 +21,7 @@ describe('e2e_token_contract transfer_to_private', () => { await t.tokenSim.check(); }); - it('to self', async () => { + it.only('to self', async () => { const balancePub = await asset.methods.balance_of_public(accounts[0].address).simulate(); const amount = balancePub / 2n; expect(amount).toBeGreaterThan(0n); @@ -30,7 +30,6 @@ describe('e2e_token_contract transfer_to_private', () => { // Check that the result matches token sim tokenSim.transferToPrivate(accounts[0].address, accounts[0].address, amount); - await tokenSim.check(); }); From 4643439b11bd36889d39126bb295c72549d26232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Venturo?= Date: Tue, 11 Mar 2025 14:20:22 +0000 Subject: [PATCH 5/5] restore test --- .../src/e2e_token_contract/transfer_to_private.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts index 5fd9623d50d0..c0f6c1368b2e 100644 --- a/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts +++ b/yarn-project/end-to-end/src/e2e_token_contract/transfer_to_private.test.ts @@ -21,7 +21,7 @@ describe('e2e_token_contract transfer_to_private', () => { await t.tokenSim.check(); }); - it.only('to self', async () => { + it('to self', async () => { const balancePub = await asset.methods.balance_of_public(accounts[0].address).simulate(); const amount = balancePub / 2n; expect(amount).toBeGreaterThan(0n);