Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,16 +587,6 @@ WorldStateStatusFull WorldState::sync_block(const StateReference& block_state_re
const std::vector<crypto::merkle_tree::PublicDataLeafValue>& public_writes)
{
validate_trees_are_equally_synched();
WorldStateStatusFull status;
if (is_same_state_reference(WorldStateRevision::uncommitted(), block_state_ref) &&
is_archive_tip(WorldStateRevision::uncommitted(), block_header_hash)) {
std::pair<bool, std::string> result = commit(status);
if (!result.first) {
throw std::runtime_error(result.second);
}
populate_status_summary(status);
return status;
}
rollback();

Fork::SharedPtr fork = retrieve_fork(CANONICAL_FORK_ID);
Expand Down Expand Up @@ -658,22 +648,32 @@ WorldStateStatusFull WorldState::sync_block(const StateReference& block_state_re

signal.wait_for_level();

if (!success) {
throw std::runtime_error("Failed to sync block: " + err_message);
}
// Check resulting state and commit if successful
WorldStateStatusFull status;
try {
if (!success) {
throw std::runtime_error("Failed to sync block: " + err_message);
}

if (!is_archive_tip(WorldStateRevision::uncommitted(), block_header_hash)) {
throw std::runtime_error("Can't synch block: block header hash is not the tip of the archive tree");
}
if (!is_archive_tip(WorldStateRevision::uncommitted(), block_header_hash)) {
throw std::runtime_error("Can't synch block: block header hash is not the tip of the archive tree");
}

if (!is_same_state_reference(WorldStateRevision::uncommitted(), block_state_ref)) {
throw std::runtime_error("Can't synch block: block state does not match world state");
}
if (!is_same_state_reference(WorldStateRevision::uncommitted(), block_state_ref)) {
throw std::runtime_error("Can't synch block: block state does not match world state");
}

std::pair<bool, std::string> result = commit(status);
if (!result.first) {
throw std::runtime_error(result.second);
std::pair<bool, std::string> result = commit(status);
if (!result.first) {
throw std::runtime_error(result.second);
}
} catch (const std::exception& e) {
// We failed, rollback any uncommitted state before leaving
rollback();
throw;
}

// Success return the status
populate_status_summary(status);
return status;
}
Expand Down Expand Up @@ -726,6 +726,9 @@ WorldStateStatusSummary WorldState::set_finalized_blocks(const block_number_t& t
}
WorldStateStatusFull WorldState::unwind_blocks(const block_number_t& toBlockNumber)
{
// Ensure no uncommitted state
rollback();

WorldStateRevision revision{ .forkId = CANONICAL_FORK_ID, .blockNumber = 0, .includeUncommitted = false };
std::array<TreeMeta, NUM_TREES> responses;
get_all_tree_info(revision, responses);
Expand Down
8 changes: 5 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export function getConfigEnvVars(): AztecNodeConfig {

type ConfigRequiredToBuildKeyStore = TxSenderConfig & SequencerClientConfig & SharedNodeConfig & ValidatorClientConfig;

function createKeyStoreFromWeb3Signer(config: ConfigRequiredToBuildKeyStore) {
function createKeyStoreFromWeb3Signer(config: ConfigRequiredToBuildKeyStore): KeyStore | undefined {
const validatorKeyStores: ValidatorKeyStore[] = [];

if (
Expand Down Expand Up @@ -124,7 +124,7 @@ function createKeyStoreFromWeb3Signer(config: ConfigRequiredToBuildKeyStore) {
return keyStore;
}

function createKeyStoreFromPrivateKeys(config: ConfigRequiredToBuildKeyStore) {
function createKeyStoreFromPrivateKeys(config: ConfigRequiredToBuildKeyStore): KeyStore | undefined {
const validatorKeyStores: ValidatorKeyStore[] = [];
const ethPrivateKeys = config.validatorPrivateKeys
? config.validatorPrivateKeys.getValue().map(x => ethPrivateKeySchema.parse(x))
Expand Down Expand Up @@ -158,7 +158,9 @@ function createKeyStoreFromPrivateKeys(config: ConfigRequiredToBuildKeyStore) {
return keyStore;
}

export function createKeyStoreForValidator(config: TxSenderConfig & SequencerClientConfig & SharedNodeConfig) {
export function createKeyStoreForValidator(
config: TxSenderConfig & SequencerClientConfig & SharedNodeConfig,
): KeyStore | undefined {
if (config.web3SignerUrl !== undefined && config.web3SignerUrl.length > 0) {
return createKeyStoreFromWeb3Signer(config);
}
Expand Down
75 changes: 53 additions & 22 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ import {
getTelemetryClient,
trackSpan,
} from '@aztec/telemetry-client';
import { NodeKeystoreAdapter, ValidatorClient, createValidatorClient } from '@aztec/validator-client';
import {
NodeKeystoreAdapter,
ValidatorClient,
createBlockProposalHandler,
createValidatorClient,
} from '@aztec/validator-client';
import { createWorldStateSynchronizer } from '@aztec/world-state';

import { createPublicClient, fallback, http } from 'viem';
Expand Down Expand Up @@ -212,6 +217,8 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
}
}

await keyStoreManager?.validateSigners();

// If we are a validator, verify our configuration before doing too much more.
if (!config.disableValidator) {
if (keyStoreManager === undefined) {
Expand Down Expand Up @@ -300,12 +307,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
deps.p2pClientDeps,
);

// Start world state and wait for it to sync to the archiver.
await worldStateSynchronizer.start();

// Start p2p. Note that it depends on world state to be running.
await p2pClient.start();

// We should really not be modifying the config object
config.txPublicSetupAllowList = config.txPublicSetupAllowList ?? (await getDefaultAllowedSetupFunctions());

const blockBuilder = new BlockBuilder(
Expand All @@ -316,8 +318,52 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
telemetry,
);

// We'll accumulate sentinel watchers here
const watchers: Watcher[] = [];

// Create validator client if required
const validatorClient = createValidatorClient(config, {
p2pClient,
telemetry,
dateProvider,
epochCache,
blockBuilder,
blockSource: archiver,
l1ToL2MessageSource: archiver,
keyStoreManager,
});

// If we have a validator client, register it as a source of offenses for the slasher,
// and have it register callbacks on the p2p client *before* we start it, otherwise messages
// like attestations or auths will fail.
if (validatorClient) {
watchers.push(validatorClient);
if (!options.dontStartSequencer) {
await validatorClient.registerHandlers();
}
}

// If there's no validator client but alwaysReexecuteBlockProposals is enabled,
// create a BlockProposalHandler to reexecute block proposals for monitoring
if (!validatorClient && config.alwaysReexecuteBlockProposals) {
log.info('Setting up block proposal reexecution for monitoring');
createBlockProposalHandler(config, {
blockBuilder,
epochCache,
blockSource: archiver,
l1ToL2MessageSource: archiver,
p2pClient,
dateProvider,
telemetry,
}).registerForReexecution(p2pClient);
}

// Start world state and wait for it to sync to the archiver.
await worldStateSynchronizer.start();

// Start p2p. Note that it depends on world state to be running.
await p2pClient.start();

const validatorsSentinel = await createSentinel(epochCache, archiver, p2pClient, config);
if (validatorsSentinel) {
// we can run a sentinel without trying to slash.
Expand Down Expand Up @@ -349,21 +395,6 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
watchers.push(attestationsBlockWatcher);
}

const validatorClient = createValidatorClient(config, {
p2pClient,
telemetry,
dateProvider,
epochCache,
blockBuilder,
blockSource: archiver,
l1ToL2MessageSource: archiver,
keyStoreManager,
});

if (validatorClient) {
watchers.push(validatorClient);
}

log.verbose(`All Aztec Node subsystems synced`);

// Validator enabled, create/start relevant service
Expand Down
6 changes: 4 additions & 2 deletions yarn-project/aztec/src/bin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
import { injectCommands as injectBuilderCommands } from '@aztec/builder';
import { injectCommands as injectWalletCommands } from '@aztec/cli-wallet';
import { enrichEnvironmentWithChainConfig } from '@aztec/cli/config';
import { enrichEnvironmentWithChainConfig, enrichEnvironmentWithNetworkConfig } from '@aztec/cli/config';
import { injectCommands as injectContractCommands } from '@aztec/cli/contracts';
import { injectCommands as injectDevnetCommands } from '@aztec/cli/devnet';
import { injectCommands as injectInfrastructureCommands } from '@aztec/cli/infrastructure';
Expand Down Expand Up @@ -39,7 +39,9 @@ async function main() {
networkValue = args[networkIndex].split('=')[1] || args[networkIndex + 1];
}

await enrichEnvironmentWithChainConfig(getActiveNetworkName(networkValue));
const networkName = getActiveNetworkName(networkValue);
await enrichEnvironmentWithChainConfig(networkName);
await enrichEnvironmentWithNetworkConfig(networkName);

const cliVersion = getCliVersion();
let program = new Command('aztec');
Expand Down
7 changes: 6 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ export async function startProverAgent(

await preloadCrsDataForServerSideProving(config, userLog);

const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 10 })));
const fetch = makeTracedFetch(
// retry connections every 3s, up to 30s before giving up
[1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3],
false,
makeUndiciFetch(new Agent({ connections: 10 })),
);
const broker = createProvingJobBrokerClient(config.proverBrokerUrl, getVersions(), fetch);

const telemetry = initTelemetryClient(extractRelevantOptions(options, telemetryClientConfigMappings, 'tel'));
Expand Down
16 changes: 15 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_broker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { getL1Config } from '@aztec/cli/config';
import { getPublicClient } from '@aztec/ethereum';
import type { NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import type { LogFn } from '@aztec/foundation/log';
import {
Expand All @@ -11,7 +12,7 @@ import { getProverNodeBrokerConfigFromEnv } from '@aztec/prover-node';
import type { ProvingJobBroker } from '@aztec/stdlib/interfaces/server';
import { getConfigEnvVars as getTelemetryClientConfig, initTelemetryClient } from '@aztec/telemetry-client';

import { extractRelevantOptions } from '../util.js';
import { extractRelevantOptions, setupUpdateMonitor } from '../util.js';

export async function startProverBroker(
options: any,
Expand All @@ -33,6 +34,7 @@ export async function startProverBroker(
throw new Error('L1 registry address is required to start Aztec Node without --deploy-aztec-contracts option');
}

const followsCanonicalRollup = typeof config.rollupVersion !== 'number';
const { addresses, config: rollupConfig } = await getL1Config(
config.l1Contracts.registryAddress,
config.l1RpcUrls,
Expand All @@ -45,6 +47,18 @@ export async function startProverBroker(

const client = initTelemetryClient(getTelemetryClientConfig());
const broker = await createAndStartProvingBroker(config, client);

if (options.autoUpdate !== 'disabled' && options.autoUpdateUrl) {
await setupUpdateMonitor(
options.autoUpdate,
new URL(options.autoUpdateUrl),
followsCanonicalRollup,
getPublicClient(config),
config.l1Contracts.registryAddress,
signalHandlers,
);
}

services.proverBroker = [broker, ProvingJobBrokerSchema];
signalHandlers.push(() => broker.stop());

Expand Down
7 changes: 6 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ export async function startProverNode(
if (proverConfig.proverBrokerUrl) {
// at 1TPS we'd enqueue ~1k tube proofs and ~1k AVM proofs immediately
// set a lower connection limit such that we don't overload the server
const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 100 })));
// Keep retrying up to 30s
const fetch = makeTracedFetch(
[1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3],
false,
makeUndiciFetch(new Agent({ connections: 100 })),
);
broker = createProvingJobBrokerClient(proverConfig.proverBrokerUrl, getVersions(proverConfig), fetch);
} else if (options.proverBroker) {
({ broker } = await startProverBroker(options, signalHandlers, services, userLog));
Expand Down
67 changes: 67 additions & 0 deletions yarn-project/cli/src/config/cached_fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { createLogger } from '@aztec/aztec.js';

import { mkdir, readFile, stat, writeFile } from 'fs/promises';
import { dirname } from 'path';

export interface CachedFetchOptions {
  /** How long a previously cached response remains valid, in milliseconds. */
  cacheDurationMs: number;
  /** Path of the file used to cache responses on disk; caching is skipped when omitted. */
  cacheFile?: string;
}

/**
 * Fetches JSON data from a URL with optional file-based caching.
 *
 * If a cache file is configured and its contents are fresher than the configured
 * cache duration, the cached data is returned without hitting the network.
 * Otherwise the URL is fetched and, on success, the parsed response is written
 * back to the cache file. The cache write is best-effort: a failure to persist
 * is logged but does not fail the fetch.
 *
 * @param url - The URL to fetch from.
 * @param options - Cache duration and (optional) cache file path.
 * @param fetch - Fetch implementation to use; defaults to the global fetch. Injectable for testing.
 * @param log - Logger used to report cache and network issues.
 * @returns The fetched (or cached) parsed JSON data, or undefined if the fetch fails.
 */
export async function cachedFetch<T = any>(
  url: string,
  options: CachedFetchOptions,
  fetch = globalThis.fetch,
  log = createLogger('cached_fetch'),
): Promise<T | undefined> {
  const { cacheDurationMs, cacheFile } = options;

  // Serve from the cache when the file exists and has not expired.
  // Any read/parse error here simply falls through to a network fetch.
  try {
    if (cacheFile) {
      const info = await stat(cacheFile);
      if (info.mtimeMs + cacheDurationMs > Date.now()) {
        return JSON.parse(await readFile(cacheFile, 'utf-8')) as T;
      }
    }
  } catch {
    log.trace('Failed to read data from cache');
  }

  try {
    const response = await fetch(url);
    if (!response.ok) {
      log.warn(`Failed to fetch from ${url}: ${response.status} ${response.statusText}`);
      return undefined;
    }

    const data = (await response.json()) as T;

    // Best-effort cache write: never let a disk failure break a successful fetch.
    try {
      if (cacheFile) {
        await mkdir(dirname(cacheFile), { recursive: true });
        await writeFile(cacheFile, JSON.stringify(data), 'utf-8');
      }
    } catch (err) {
      log.warn('Failed to cache data on disk: ' + cacheFile, { cacheFile, err });
    }

    return data;
  } catch (err) {
    log.warn(`Failed to fetch from ${url}`, { err });
    return undefined;
  }
}
Loading
Loading