diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.cpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.cpp index 98549b6552c5..8e49cf005c56 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.cpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.cpp @@ -89,6 +89,11 @@ LMDBTreeStore::LMDBTreeStore(std::string directory, std::string name, uint64_t m } } +const std::string& LMDBTreeStore::get_name() const +{ + return _name; +} + void LMDBTreeStore::get_stats(TreeDBStats& stats, ReadTransaction& tx) { stats.mapSize = _environment->get_map_size(); diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.hpp index 4fde37126b45..eaa06f3e18fd 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/lmdb_store/lmdb_tree_store.hpp @@ -171,6 +171,8 @@ class LMDBTreeStore : public LMDBStoreBase { LMDBTreeStore& operator=(LMDBTreeStore&& other) = delete; ~LMDBTreeStore() override = default; + const std::string& get_name() const; + void get_stats(TreeDBStats& stats, ReadTransaction& tx); void write_block_data(const block_number_t& blockNumber, const BlockPayload& blockData, WriteTransaction& tx); diff --git a/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp b/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp index dd81015fea69..9bc2bab4cf2c 100644 --- a/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp +++ b/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp @@ -29,4 +29,18 @@ LMDBStoreBase::WriteTransaction::Ptr LMDBStoreBase::create_write_transaction() c _environment->wait_for_writer(); return std::make_unique(_environment); } + +void LMDBStoreBase::copy_store(const std::string& dstPath, bool compact) +{ + // Create a write tx to acquire a write lock to prevent writes while copying. From LMDB docs: + // "[mdb_copy] can trigger significant file size growth if run in parallel with write transactions, + // because pages which they free during copying cannot be reused until the copy is done." + WriteTransaction::Ptr tx = create_write_transaction(); + call_lmdb_func("mdb_env_copy2", + mdb_env_copy2, + _environment->underlying(), + dstPath.c_str(), + static_cast(compact ? MDB_CP_COMPACT : 0)); +} + } // namespace bb::lmdblib \ No newline at end of file diff --git a/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.hpp b/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.hpp index 1ba5760385bb..f9a40cd34277 100644 --- a/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.hpp +++ b/barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.hpp @@ -21,6 +21,7 @@ class LMDBStoreBase { ReadTransaction::SharedPtr create_shared_read_transaction() const; WriteTransaction::Ptr create_write_transaction() const; LMDBDatabaseCreationTransaction::Ptr create_db_transaction() const; + void copy_store(const std::string& dstPath, bool compact); protected: std::string _dbDirectory; diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_message.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_message.hpp index 73ceb56a9452..18fa7f5a48b8 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_message.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_message.hpp @@ -26,6 +26,7 @@ enum LMDBStoreMessageType { STATS, CLOSE, + COPY_STORE, }; struct OpenDatabaseRequest { @@ -118,6 +119,12 @@ struct StatsResponse { MSGPACK_FIELDS(stats, dbMapSizeBytes); }; +struct CopyStoreRequest { + std::string dstPath; + std::optional compact; + MSGPACK_FIELDS(dstPath, compact); +}; + } // namespace bb::nodejs::lmdb_store MSGPACK_ADD_ENUM(bb::nodejs::lmdb_store::LMDBStoreMessageType) diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp index 2d6ae87bd80d..1f86428ccc2b 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.cpp @@ -71,6 +71,8 @@ LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info) // The close operation requires exclusive execution, no other operations can be run concurrently with it _msg_processor.register_handler(LMDBStoreMessageType::CLOSE, this, &LMDBStoreWrapper::close, true); + + _msg_processor.register_handler(LMDBStoreMessageType::COPY_STORE, this, &LMDBStoreWrapper::copy_store, true); } Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info) @@ -279,6 +281,14 @@ BoolResponse LMDBStoreWrapper::close() return { true }; } +BoolResponse LMDBStoreWrapper::copy_store(const CopyStoreRequest& req) +{ + verify_store(); + _store->copy_store(req.dstPath, req.compact.value_or(false)); + + return { true }; +} + std::pair LMDBStoreWrapper::_advance_cursor(const lmdblib::LMDBCursor& cursor, bool reverse, uint64_t page_size) diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.hpp index 39b494668d90..ec8281e51dff 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/lmdb_store/lmdb_store_wrapper.hpp @@ -57,6 +57,8 @@ class LMDBStoreWrapper : public Napi::ObjectWrap { BoolResponse close(); + BoolResponse copy_store(const CopyStoreRequest& req); + static std::pair _advance_cursor(const lmdblib::LMDBCursor& cursor, bool reverse, uint64_t page_size); diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp index 3e86a31637a8..d208acc676ce 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp @@ -257,6 +257,10 @@ WorldStateWrapper::WorldStateWrapper(const Napi::CallbackInfo& info) _dispatcher.register_target( WorldStateMessageType::REVERT_CHECKPOINT, [this](msgpack::object& obj, msgpack::sbuffer& buffer) { return revert_checkpoint(obj, buffer); }); + + _dispatcher.register_target( + WorldStateMessageType::COPY_STORES, + [this](msgpack::object& obj, msgpack::sbuffer& buffer) { return copy_stores(obj, buffer); }); } Napi::Value WorldStateWrapper::call(const Napi::CallbackInfo& info) @@ -824,6 +828,20 @@ bool WorldStateWrapper::get_status(msgpack::object& obj, msgpack::sbuffer& buf) return true; } +bool WorldStateWrapper::copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer) +{ + TypedMessage request; + obj.convert(request); + + _ws->copy_stores(request.value.dstPath, request.value.compact.value_or(false)); + + MsgHeader header(request.header.messageId); + messaging::TypedMessage resp_msg(WorldStateMessageType::COPY_STORES, header, {}); + msgpack::pack(buffer, resp_msg); + + return true; +} + Napi::Function WorldStateWrapper::get_class(Napi::Env env) { return DefineClass(env, diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp index 0f6c8d8dfe1f..903db2ec6805 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp @@ -68,6 +68,8 @@ class WorldStateWrapper : public Napi::ObjectWrap { bool checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer); bool commit_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer); bool revert_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer); + + bool copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer); }; } // namespace bb::nodejs diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp index 2547bc85a7b3..7d8ad709d2ba 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp @@ -52,6 +52,8 @@ enum WorldStateMessageType { COMMIT_CHECKPOINT, REVERT_CHECKPOINT, + COPY_STORES, + CLOSE = 999, }; @@ -231,6 +233,12 @@ struct SyncBlockRequest { publicDataWrites); }; +struct CopyStoresRequest { + std::string dstPath; + std::optional compact; + MSGPACK_FIELDS(dstPath, compact); +}; + } // namespace bb::nodejs MSGPACK_ADD_ENUM(bb::nodejs::WorldStateMessageType) diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp index 64e7b0442921..fbbfa99d99f6 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp @@ -166,6 +166,18 @@ void WorldState::create_canonical_fork(const std::string& dataDir, _forks[fork->_forkId] = fork; } +void WorldState::copy_stores(const std::string& dstPath, bool compact) const +{ + auto copyStore = [&](const LMDBTreeStore::SharedPtr& store) { + std::filesystem::path directory = dstPath; + directory /= store->get_name(); + std::filesystem::create_directories(directory); + store->copy_store(directory, compact); + }; + + std::for_each(_persistentStores->begin(), _persistentStores->end(), copyStore); +} + Fork::SharedPtr WorldState::retrieve_fork(const uint64_t& forkId) const { std::unique_lock lock(mtx); diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp index c2eae1ee4e57..d994e029cf8b 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp @@ -88,6 +88,14 @@ class WorldState { const std::vector& prefilled_public_data, uint32_t initial_header_generator_point); + /** + * @brief Copies all underlying LMDB stores to the target directory while acquiring a write lock + * + * @param dstPath Parent folder where trees will be copied + * @param compact Whether to compact stores when copying + */ + void copy_stores(const std::string& dstPath, bool compact) const; + /** * @brief Get tree metadata for a particular tree * diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state_stores.hpp b/barretenberg/cpp/src/barretenberg/world_state/world_state_stores.hpp index 380b90698abf..31c924d0a0cb 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state_stores.hpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state_stores.hpp @@ -36,10 +36,18 @@ struct WorldStateStores { , messageStore(std::move(other.messageStore)) {} + auto begin() const { return stores.begin(); } + auto end() const { return stores.end(); } + WorldStateStores(const WorldStateStores& other) = delete; ~WorldStateStores() = default; WorldStateStores& operator=(WorldStateStores&& other) = delete; WorldStateStores& operator=(WorldStateStores& other) = delete; + + private: + std::array stores{ + nullifierStore, publicDataStore, archiveStore, noteHashStore, messageStore + }; }; } // namespace bb::world_state \ No newline at end of file diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 5e5cf29f5c4f..c51853f6947f 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -86,6 +86,7 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { private l1BlockNumber: bigint | undefined; private l1Timestamp: bigint | undefined; + private initialSyncComplete: boolean = false; public readonly tracer: Tracer; @@ -280,6 +281,7 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { // but the corresponding blocks have not been processed (see #12631). this.l1Timestamp = currentL1Timestamp; this.l1BlockNumber = currentL1BlockNumber; + this.initialSyncComplete = true; if (initialRun) { this.log.info(`Initial archiver sync to L1 block ${currentL1BlockNumber} complete.`, { @@ -293,7 +295,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { /** Queries the rollup contract on whether a prune can be executed on the immediatenext L1 block. */ private async canPrune(currentL1BlockNumber: bigint, currentL1Timestamp: bigint) { const time = (currentL1Timestamp ?? 0n) + BigInt(this.l1constants.ethereumSlotDuration); - return await this.rollup.canPruneAtTime(time, { blockNumber: currentL1BlockNumber }); + const result = await this.rollup.canPruneAtTime(time, { blockNumber: currentL1BlockNumber }); + if (result) { + this.log.debug(`Rollup contract allows pruning at L1 block ${currentL1BlockNumber} time ${time}`, { + currentL1Timestamp, + pruneTime: time, + currentL1BlockNumber, + }); + } + return result; } /** Checks if there'd be a reorg for the next block submission and start pruning now. */ @@ -542,6 +552,18 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { return { provenBlockNumber }; } + /** Resumes the archiver after a stop. */ + public resume() { + if (!this.runningPromise) { + throw new Error(`Archiver was never started`); + } + if (this.runningPromise.isRunning()) { + this.log.warn(`Archiver already running`); + } + this.log.info(`Restarting archiver`); + this.runningPromise.start(); + } + /** * Stops the archiver. * @returns A promise signalling completion of the stop process. @@ -554,6 +576,10 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { return Promise.resolve(); } + public backupTo(destPath: string): Promise { + return this.dataStore.backupTo(destPath); + } + public getL1Constants(): Promise { return Promise.resolve(this.l1constants); } @@ -654,6 +680,11 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { return l1Timestamp + leeway >= endTimestamp; } + /** Returns whether the archiver has completed an initial sync run successfully. */ + public isInitialSyncComplete(): boolean { + return this.initialSyncComplete; + } + /** * Gets up to `limit` amount of L2 blocks starting from `from`. * @param from - Number of the first block to return (inclusive). @@ -874,6 +905,8 @@ class ArchiverStoreHelper | 'addContractInstanceUpdates' | 'deleteContractInstanceUpdates' | 'addFunctions' + | 'backupTo' + | 'close' > { #log = createLogger('archiver:block-helper'); diff --git a/yarn-project/archiver/src/archiver/archiver_store.ts b/yarn-project/archiver/src/archiver/archiver_store.ts index ed5c3b287908..404f3cd4d64e 100644 --- a/yarn-project/archiver/src/archiver/archiver_store.ts +++ b/yarn-project/archiver/src/archiver/archiver_store.ts @@ -244,4 +244,10 @@ export interface ArchiverDataStore { * Estimates the size of the store in bytes. */ estimateSize(): Promise<{ mappingSize: number; actualSize: number; numItems: number }>; + + /** Backups the archiver db to the target folder. Returns the path to the db file. */ + backupTo(path: string): Promise; + + /** Closes the underlying data store. */ + close(): Promise; } diff --git a/yarn-project/archiver/src/archiver/index.ts b/yarn-project/archiver/src/archiver/index.ts index 72515a0441b2..9cb0d7dc1a94 100644 --- a/yarn-project/archiver/src/archiver/index.ts +++ b/yarn-project/archiver/src/archiver/index.ts @@ -2,5 +2,5 @@ export * from './archiver.js'; export * from './config.js'; export { type PublishedL2Block, type L1PublishedData } from './structs/published.js'; export type { ArchiverDataStore } from './archiver_store.js'; -export { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js'; +export { KVArchiverDataStore, ARCHIVER_DB_VERSION } from './kv_archiver_store/kv_archiver_store.js'; export { ContractInstanceStore } from './kv_archiver_store/contract_instance_store.js'; diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts index e995880bfb1e..a993751b8482 100644 --- a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts @@ -17,6 +17,8 @@ import { type LogFilter, PrivateLog, type TxScopedL2Log } from '@aztec/stdlib/lo import type { InboxLeaf } from '@aztec/stdlib/messaging'; import type { BlockHeader, TxHash, TxReceipt } from '@aztec/stdlib/tx'; +import { join } from 'path'; + import type { ArchiverDataStore, ArchiverL1SynchPoint } from '../archiver_store.js'; import type { DataRetrieval } from '../structs/data_retrieval.js'; import type { PublishedL2Block } from '../structs/published.js'; @@ -26,11 +28,13 @@ import { ContractInstanceStore } from './contract_instance_store.js'; import { LogStore } from './log_store.js'; import { MessageStore } from './message_store.js'; +export const ARCHIVER_DB_VERSION = 1; + /** * LMDB implementation of the ArchiverDataStore interface. */ export class KVArchiverDataStore implements ArchiverDataStore { - public static readonly SCHEMA_VERSION = 1; + public static readonly SCHEMA_VERSION = ARCHIVER_DB_VERSION; #blockStore: BlockStore; #logStore: LogStore; @@ -49,6 +53,15 @@ export class KVArchiverDataStore implements ArchiverDataStore { this.#contractInstanceStore = new ContractInstanceStore(db); } + public async backupTo(path: string, compress = true): Promise { + await this.db.backupTo(path, compress); + return join(path, 'data.mdb'); + } + + public close() { + return this.db.close(); + } + // TODO: These function names are in memory only as they are for development/debugging. They require the full contract // artifact supplied to the node out of band. This should be reviewed and potentially removed as part of // the node api cleanup process. diff --git a/yarn-project/archiver/src/factory.ts b/yarn-project/archiver/src/factory.ts index 39abdbcbc28f..4234f40f5aac 100644 --- a/yarn-project/archiver/src/factory.ts +++ b/yarn-project/archiver/src/factory.ts @@ -14,9 +14,23 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien import { Archiver } from './archiver/archiver.js'; import type { ArchiverConfig } from './archiver/config.js'; -import { KVArchiverDataStore } from './archiver/index.js'; +import { ARCHIVER_DB_VERSION, KVArchiverDataStore } from './archiver/kv_archiver_store/kv_archiver_store.js'; import { createArchiverClient } from './rpc/index.js'; +export const ARCHIVER_STORE_NAME = 'archiver'; + +/** Creates an archiver store. */ +export async function createArchiverStore( + userConfig: Pick & DataStoreConfig, +) { + const config = { + ...userConfig, + dataStoreMapSizeKB: userConfig.archiverStoreMapSizeKb ?? userConfig.dataStoreMapSizeKB, + }; + const store = await createStore(ARCHIVER_STORE_NAME, ARCHIVER_DB_VERSION, config, createLogger('archiver:lmdb')); + return new KVArchiverDataStore(store, config.maxLogs); +} + /** * Creates a local archiver. * @param config - The archiver configuration. @@ -26,19 +40,12 @@ import { createArchiverClient } from './rpc/index.js'; * @returns The local archiver. */ export async function createArchiver( - _config: ArchiverConfig & DataStoreConfig, + config: ArchiverConfig & DataStoreConfig, blobSinkClient: BlobSinkClientInterface, opts: { blockUntilSync: boolean } = { blockUntilSync: true }, telemetry: TelemetryClient = getTelemetryClient(), ): Promise { - const config = { ..._config, dataStoreMapSizeKB: _config.archiverStoreMapSizeKb ?? _config.dataStoreMapSizeKB }; - const store = await createStore( - 'archiver', - KVArchiverDataStore.SCHEMA_VERSION, - config, - createLogger('archiver:lmdb'), - ); - const archiverStore = new KVArchiverDataStore(store, config.maxLogs); + const archiverStore = await createArchiverStore(config); await registerProtocolContracts(archiverStore); return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync); } diff --git a/yarn-project/aztec-node/src/actions/snapshot-sync.ts b/yarn-project/aztec-node/src/actions/snapshot-sync.ts new file mode 100644 index 000000000000..18178adf7749 --- /dev/null +++ b/yarn-project/aztec-node/src/actions/snapshot-sync.ts @@ -0,0 +1,200 @@ +import { + ARCHIVER_DB_VERSION, + ARCHIVER_STORE_NAME, + type ArchiverConfig, + type ArchiverDataStore, + createArchiverStore, +} from '@aztec/archiver'; +import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; +import { type EthereumClientConfig, type L1ContractAddresses, getPublicClient } from '@aztec/ethereum'; +import type { EthAddress } from '@aztec/foundation/eth-address'; +import { tryRmDir } from '@aztec/foundation/fs'; +import type { Logger } from '@aztec/foundation/log'; +import type { DataStoreConfig } from '@aztec/kv-store/config'; +import { DatabaseVersionManager } from '@aztec/stdlib/database-version'; +import { type ReadOnlyFileStore, createReadOnlyFileStore } from '@aztec/stdlib/file-store'; +import { + type SnapshotMetadata, + type SnapshotsIndexMetadata, + downloadSnapshot, + getLatestSnapshotMetadata, + makeSnapshotLocalPaths, +} from '@aztec/stdlib/snapshots'; +import { NATIVE_WORLD_STATE_DBS, WORLD_STATE_DB_VERSION, WORLD_STATE_DIR } from '@aztec/world-state'; + +import { mkdir, mkdtemp, rename } from 'fs/promises'; +import { join } from 'path'; + +import type { AztecNodeConfig } from '../aztec-node/config.js'; + +// Half day worth of L1 blocks +const MIN_L1_BLOCKS_TO_TRIGGER_REPLACE = 86400 / 2 / 12; + +type SnapshotSyncConfig = Pick< + AztecNodeConfig, + 'syncMode' | 'snapshotsUrl' | 'l1ChainId' | 'version' | 'dataDirectory' +> & + Pick & + DataStoreConfig & + EthereumClientConfig & { + l1Contracts: Pick; + minL1BlocksToTriggerReplace?: number; + }; + +export async function trySnapshotSync(config: SnapshotSyncConfig, log: Logger) { + let archiverStore: ArchiverDataStore | undefined; + let downloadDir: string | undefined; + + try { + const { syncMode, snapshotsUrl, dataDirectory, l1ChainId, version: l2Version, l1Contracts } = config; + if (syncMode === 'full') { + log.debug('Snapshot sync is disabled. Running full sync.', { syncMode: syncMode }); + return false; + } + + if (!snapshotsUrl) { + log.verbose('Snapshot sync is disabled. No snapshots URL provided.'); + return false; + } + + if (!dataDirectory) { + log.verbose('Snapshot sync is disabled. No local data directory defined.'); + return false; + } + + let fileStore: ReadOnlyFileStore; + try { + fileStore = await createReadOnlyFileStore(snapshotsUrl, log); + } catch (err) { + log.error(`Invalid config for downloading snapshots`, err); + return false; + } + + // Create an archiver store to check the current sync state + archiverStore = await createArchiverStore(config); + + const minL1BlocksToTriggerReplace = config.minL1BlocksToTriggerReplace ?? MIN_L1_BLOCKS_TO_TRIGGER_REPLACE; + const archiverL2BlockNumber = await archiverStore.getSynchedL2BlockNumber(); + if ( + syncMode === 'snapshot' && + archiverL2BlockNumber !== undefined && + archiverL2BlockNumber >= INITIAL_L2_BLOCK_NUM + ) { + log.verbose( + `Skipping non-forced snapshot sync as archiver is already synced to L2 block ${archiverL2BlockNumber}.`, + ); + return false; + } + + const currentL1BlockNumber = await getPublicClient(config).getBlockNumber(); + const archiverL1BlockNumber = await archiverStore.getSynchPoint().then(s => s.blocksSynchedTo); + if (archiverL1BlockNumber && currentL1BlockNumber - archiverL1BlockNumber < minL1BlocksToTriggerReplace) { + log.verbose( + `Skipping snapshot sync as archiver is less than ${ + currentL1BlockNumber - archiverL1BlockNumber + } L1 blocks behind.`, + { archiverL1BlockNumber, currentL1BlockNumber, minL1BlocksToTriggerReplace }, + ); + return false; + } + + const indexMetadata: SnapshotsIndexMetadata = { l1ChainId, l2Version, rollupAddress: l1Contracts.rollupAddress }; + let snapshot: SnapshotMetadata | undefined; + try { + snapshot = await getLatestSnapshotMetadata(indexMetadata, fileStore); + } catch (err) { + log.error(`Failed to get latest snapshot metadata. Skipping snapshot sync.`, err, { + ...indexMetadata, + snapshotsUrl, + }); + return false; + } + + if (!snapshot) { + log.verbose(`No snapshot found. Skipping snapshot sync.`, { ...indexMetadata, snapshotsUrl }); + return false; + } + + if (snapshot.schemaVersions.archiver !== ARCHIVER_DB_VERSION) { + log.warn( + `Skipping snapshot sync as last snapshot has schema version ${snapshot.schemaVersions.archiver} but expected ${ARCHIVER_DB_VERSION}.`, + snapshot, + ); + return false; + } + + if (snapshot.schemaVersions.worldState !== WORLD_STATE_DB_VERSION) { + log.warn( + `Skipping snapshot sync as last snapshot has world state schema version ${snapshot.schemaVersions.worldState} but we expected ${WORLD_STATE_DB_VERSION}.`, + snapshot, + ); + return false; + } + + if (archiverL1BlockNumber && snapshot.l1BlockNumber < archiverL1BlockNumber) { + log.verbose( + `Skipping snapshot sync since local archiver is at L1 block ${archiverL1BlockNumber} which is further than last snapshot at ${snapshot.l1BlockNumber}`, + { snapshot, archiverL1BlockNumber }, + ); + return false; + } + + if (archiverL1BlockNumber && snapshot.l1BlockNumber - Number(archiverL1BlockNumber) < minL1BlocksToTriggerReplace) { + log.verbose( + `Skipping snapshot sync as archiver is less than ${ + snapshot.l1BlockNumber - Number(archiverL1BlockNumber) + } L1 blocks behind latest snapshot.`, + { snapshot, archiverL1BlockNumber }, + ); + return false; + } + + // Green light. Download the snapshot to a temp location. + downloadDir = await mkdtemp(join(dataDirectory, 'download-')); + const downloadPaths = makeSnapshotLocalPaths(downloadDir); + log.info( + `Downloading snapshot at L1 block ${snapshot.l1BlockNumber} L2 block ${snapshot.l2BlockNumber} from ${snapshotsUrl} to ${downloadDir} for snapshot sync`, + { snapshot, downloadPaths }, + ); + await downloadSnapshot(snapshot, downloadPaths, fileStore); + log.info(`Snapshot downloaded at ${downloadDir}`, { snapshotsUrl, snapshot, downloadPaths }); + + // If download was successful, close the archiver store, clear lock and version, and move download there + await archiverStore.close(); + archiverStore = undefined; + const archiverPath = join(dataDirectory, ARCHIVER_STORE_NAME); + await prepareTarget(archiverPath, ARCHIVER_DB_VERSION, l1Contracts.rollupAddress); + await rename(downloadPaths.archiver, join(archiverPath, 'data.mdb')); + log.info(`Archiver database set up from snapshot`, { path: archiverPath }); + + // Same for the world state dbs, only that we do not close them, since we assume they are not yet in use + const worldStateBasePath = join(dataDirectory, WORLD_STATE_DIR); + await prepareTarget(worldStateBasePath, WORLD_STATE_DB_VERSION, l1Contracts.rollupAddress); + for (const [name, dir] of NATIVE_WORLD_STATE_DBS) { + const path = join(worldStateBasePath, dir); + await mkdir(path, { recursive: true }); + await rename(downloadPaths[name], join(path, 'data.mdb')); + log.info(`World state database ${name} set up from snapshot`, { path }); + } + + log.info(`Snapshot synced to L1 block ${snapshot.l1BlockNumber} L2 block ${snapshot.l2BlockNumber}`, { snapshot }); + } finally { + if (archiverStore) { + log.verbose(`Closing temporary archiver data store`); + await archiverStore.close(); + } + if (downloadDir) { + await tryRmDir(downloadDir, log); + } + } + + return true; +} + +/** Deletes target dir and writes the new version file. */ +async function prepareTarget(target: string, schemaVersion: number, rollupAddress: EthAddress) { + const noOpen = () => Promise.resolve(undefined); + const versionManager = new DatabaseVersionManager(schemaVersion, rollupAddress, target, noOpen); + await versionManager.resetDataDirectory(); + await versionManager.writeVersion(); +} diff --git a/yarn-project/aztec-node/src/actions/upload-snapshot.ts b/yarn-project/aztec-node/src/actions/upload-snapshot.ts new file mode 100644 index 000000000000..842d7faf3fd1 --- /dev/null +++ b/yarn-project/aztec-node/src/actions/upload-snapshot.ts @@ -0,0 +1,100 @@ +import { ARCHIVER_DB_VERSION, type Archiver } from '@aztec/archiver'; +import { tryRmDir } from '@aztec/foundation/fs'; +import type { Logger } from '@aztec/foundation/log'; +import { createFileStore } from '@aztec/stdlib/file-store'; +import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { SnapshotDataUrls, UploadSnapshotMetadata } from '@aztec/stdlib/snapshots'; +import { uploadSnapshot as uploadSnapshotToStore } from '@aztec/stdlib/snapshots'; +import { WORLD_STATE_DB_VERSION } from '@aztec/world-state'; + +import { existsSync } from 'fs'; +import { mkdtemp } from 'fs/promises'; +import { tmpdir } from 'os'; +import { join } from 'path'; + +import type { AztecNodeConfig } from '../aztec-node/config.js'; + +/** + * Pauses the archiver and world state sync, creates backups of the archiver and world state lmdb environments, + * and uploads them to the specified location. Location must be a URL supported by our file store (eg `gs://bucketname/path`). + */ +export async function uploadSnapshot( + location: string, + archiver: Archiver, + worldState: WorldStateSynchronizer, + config: Pick, + log: Logger, +) { + const store = await createFileStore(location); + if (!store) { + throw new Error(`Failed to create file store for snapshot upload for location ${location}.`); + } + + const backupDir = await mkdtemp(join(config.dataDirectory ?? tmpdir(), 'snapshot-')); + + try { + const paths = await createBackups(backupDir, archiver, worldState, log); + const versions = { archiver: ARCHIVER_DB_VERSION, worldState: WORLD_STATE_DB_VERSION }; + const metadata = await buildSnapshotMetadata(archiver, config); + log.info(`Uploading snapshot to ${location}`, { snapshot: metadata }); + const snapshot = await uploadSnapshotToStore(paths, versions, metadata, store); + log.info(`Snapshot uploaded successfully`, { snapshot }); + } finally { + log.info(`Cleaning up backup dir ${backupDir}`); + await tryRmDir(backupDir, log); + } +} + +async function buildSnapshotMetadata( + archiver: Archiver, + config: Pick, +): Promise { + const [rollupAddress, l1BlockNumber, { latest }] = await Promise.all([ + archiver.getRollupAddress(), + archiver.getL1BlockNumber(), + archiver.getL2Tips(), + ] as const); + + const { number: l2BlockNumber, hash: l2BlockHash } = latest; + if (!l2BlockHash) { + throw new Error(`Failed to get L2 block hash from archiver.`); + } + + return { + l1ChainId: config.l1ChainId, + l2Version: config.version, + rollupAddress, + l2BlockNumber, + l2BlockHash, + l1BlockNumber: Number(l1BlockNumber), + }; +} + +async function createBackups(backupDir: string, archiver: Archiver, worldState: WorldStateSynchronizer, log: Logger) { + try { + log.info(`Pausing archiver and world state sync to start snapshot upload`); + await archiver.stop(); + await worldState.stopSync(); + + log.info(`Creating backups of lmdb environments to ${backupDir}`); + const [archiverPath, worldStatePaths] = await Promise.all([ + archiver.backupTo(join(backupDir, 'archiver')), + worldState.backupTo(join(backupDir, 'world-state')), + ]); + const paths: SnapshotDataUrls = { ...worldStatePaths, archiver: archiverPath }; + + const missing = Object.entries(paths).filter(([_key, path]) => !path || !existsSync(path)); + if (missing.length > 0) { + throw new Error(`Missing backup files: ${missing.map(([key, path]) => `${path} (${key})`).join(', ')}`); + } + + log.info(`Data stores backed up to ${backupDir}`, { paths }); + return paths; + } catch (err) { + throw new Error(`Error creating backups for snapshot upload: ${err}`); + } finally { + log.info(`Resuming archiver and world state sync`); + worldState.resumeSync(); + archiver.resume(); + } +} diff --git a/yarn-project/aztec-node/src/aztec-node/config.ts b/yarn-project/aztec-node/src/aztec-node/config.ts index 8d880b01f56e..927f62cb6552 100644 --- a/yarn-project/aztec-node/src/aztec-node/config.ts +++ b/yarn-project/aztec-node/src/aztec-node/config.ts @@ -34,8 +34,12 @@ export type AztecNodeConfig = ArchiverConfig & testAccounts: boolean; /** Whether to populate the genesis state with initial fee juice for the sponsored FPC */ sponsoredFPC: boolean; - } & { + /** L1 contracts addresses */ l1Contracts: L1ContractAddresses; + /** Sync mode: full to always sync via L1, snapshot to download a snapshot if there is no local data, force-snapshot to download even if there is local data. */ + syncMode: 'full' | 'snapshot' | 'force-snapshot'; + /** Base URL for snapshots index. Index file will be searched at `SNAPSHOTS_BASE_URL/aztec-L1_CHAIN_ID-VERSION-ROLLUP_ADDRESS/index.json` */ + snapshotsUrl?: string; }; export const aztecNodeConfigMappings: ConfigMappingsType = { @@ -66,6 +70,16 @@ export const aztecNodeConfigMappings: ConfigMappingsType = { description: 'Whether to populate the genesis state with initial fee juice for the sponsored FPC.', ...booleanConfigHelper(false), }, + syncMode: { + env: 'SYNC_MODE', + description: + 'Set sync mode to `full` to always sync via L1, `snapshot` to download a snapshot if there is no local data, `force-snapshot` to download even if there is local data.', + defaultValue: 'snapshot', + }, + snapshotsUrl: { + env: 'SYNC_SNAPSHOTS_URL', + description: 'Base URL for snapshots index.', + }, }; /** diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index fc4a3a463e5e..472f1a2ad9cf 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -1,4 +1,4 @@ -import { createArchiver } from '@aztec/archiver'; +import { Archiver, createArchiver } from '@aztec/archiver'; import { BBCircuitVerifier, TestCircuitVerifier } from '@aztec/bb-prover'; import { type BlobSinkClientInterface, createBlobSinkClient } from '@aztec/blob-sink/client'; import { @@ -86,6 +86,8 @@ import { import { createValidatorClient } from '@aztec/validator-client'; import { createWorldStateSynchronizer } from '@aztec/world-state'; +import { trySnapshotSync } from '../actions/snapshot-sync.js'; +import { uploadSnapshot } from '../actions/upload-snapshot.js'; import { createSentinel } from '../sentinel/factory.js'; import { Sentinel } from '../sentinel/sentinel.js'; import { type AztecNodeConfig, getPackageVersion } from './config.js'; @@ -98,6 +100,9 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { private packageVersion: string; private metrics: NodeMetrics; + // Prevent two snapshot operations to happen simultaneously + private isUploadingSnapshot = false; + // Serial queue to ensure that we only send one tx at a time private txQueue: SerialQueue = new SerialQueue(); @@ -161,13 +166,17 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { const dateProvider = deps.dateProvider ?? new DateProvider(); const blobSinkClient = deps.blobSinkClient ?? createBlobSinkClient(config); const ethereumChain = createEthereumChain(config.l1RpcUrls, config.l1ChainId); - //validate that the actual chain id matches that specified in configuration + + // validate that the actual chain id matches that specified in configuration if (config.l1ChainId !== ethereumChain.chainInfo.id) { throw new Error( `RPC URL configured for chain id ${ethereumChain.chainInfo.id} but expected id ${config.l1ChainId}`, ); } + // attempt fast sync if needed + await trySnapshotSync(config, log); + const archiver = await createArchiver(config, blobSinkClient, { blockUntilSync: true }, telemetry); // now create the merkle trees and the world state synchronizer @@ -991,6 +1000,43 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { return this.validatorsSentinel?.computeStats() ?? Promise.resolve({ stats: {}, slotWindow: 0 }); } + public async startSnapshotUpload(location: string): Promise { + // Note that we are forcefully casting the blocksource as an archiver + // We break support for archiver running remotely to the node + const archiver = this.blockSource as Archiver; + if (!('backupTo' in archiver)) { + throw new Error('Archiver implementation does not support backups. Cannot generate snapshot.'); + } + + // Test that the archiver has done an initial sync. + if (!archiver.isInitialSyncComplete()) { + throw new Error(`Archiver initial sync not complete. Cannot start snapshot.`); + } + + // And it has an L2 block hash + const l2BlockHash = await archiver.getL2Tips().then(tips => tips.latest.hash); + if (!l2BlockHash) { + throw new Error(`Archiver has no latest L2 block hash downloaded. Cannot start snapshot.`); + } + + if (this.isUploadingSnapshot) { + throw new Error(`Snapshot upload already in progress. Cannot start another one until complete.`); + } + + // Do not wait for the upload to be complete to return to the caller, but flag that an operation is in progress + this.isUploadingSnapshot = true; + void uploadSnapshot(location, this.blockSource as Archiver, this.worldStateSynchronizer, this.config, this.log) + .then(() => { + this.isUploadingSnapshot = false; + }) + .catch(err => { + this.isUploadingSnapshot = false; + this.log.error(`Error uploading snapshot: ${err}`); + }); + + return Promise.resolve(); + } + /** * Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched * @param blockNumber - The block number at which to get the data. diff --git a/yarn-project/aztec/src/cli/aztec_start_options.ts b/yarn-project/aztec/src/cli/aztec_start_options.ts index ee7e435252f2..8cc9abf11e57 100644 --- a/yarn-project/aztec/src/cli/aztec_start_options.ts +++ b/yarn-project/aztec/src/cli/aztec_start_options.ts @@ -273,6 +273,19 @@ export const aztecStartOptions: { [key: string]: AztecStartOption[] } = { envVar: 'TEST_ACCOUNTS', ...booleanConfigHelper(), }, + { + flag: '--node.syncMode ', + description: + 'Set sync mode to `full` to always sync via L1, `snapshot` to download a snapshot if there is no local data, `force-snapshot` to download even if there is local data.', + defaultValue: 'snapshot', + envVar: 'SYNC_MODE', + }, + { + flag: '--node.snapshotsUrl ', + description: 'Base URL for downloading snapshots for fast sync.', + defaultValue: undefined, + envVar: 'SYNC_SNAPSHOTS_URL', + }, ], 'P2P SUBSYSTEM': [ { diff --git a/yarn-project/end-to-end/src/e2e_snapshot_sync.test.ts b/yarn-project/end-to-end/src/e2e_snapshot_sync.test.ts new file mode 100644 index 000000000000..331869f946c9 --- /dev/null +++ b/yarn-project/end-to-end/src/e2e_snapshot_sync.test.ts @@ -0,0 +1,97 @@ +import { type AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node'; +import { type AztecNode, type Logger, MerkleTreeId, retryUntil } from '@aztec/aztec.js'; +import { RollupContract } from '@aztec/ethereum'; +import { ChainMonitor } from '@aztec/ethereum/test'; +import { randomBytes } from '@aztec/foundation/crypto'; +import { tryRmDir } from '@aztec/foundation/fs'; +import { withLogNameSuffix } from '@aztec/foundation/log'; + +import { mkdtemp, readdir } from 'fs/promises'; +import { tmpdir } from 'os'; +import { join } from 'path'; + +import { type EndToEndContext, setup } from './fixtures/utils.js'; + +const L1_BLOCK_TIME_IN_S = process.env.L1_BLOCK_TIME ? parseInt(process.env.L1_BLOCK_TIME) : 8; +const L2_TARGET_BLOCK_NUM = 3; + +describe('e2e_snapshot_sync', () => { + let context: EndToEndContext; + let monitor: ChainMonitor; + let log: Logger; + let snapshotDir: string; + let snapshotLocation: string; + + beforeAll(async () => { + context = await setup(0, { + minTxsPerBlock: 0, + ethereumSlotDuration: L1_BLOCK_TIME_IN_S, + aztecSlotDuration: L1_BLOCK_TIME_IN_S * 2, + aztecEpochDuration: 64, + startProverNode: false, + realProofs: false, + skipProtocolContracts: true, + salt: 1, + }); + + log = context.logger; + snapshotDir = await mkdtemp(join(tmpdir(), 'snapshots-')); + snapshotLocation = `file://${snapshotDir}`; + monitor = new ChainMonitor(RollupContract.getFromConfig(context.config), log).start(); + }); + + afterAll(async () => { + monitor.stop(); + await context.teardown(); + await tryRmDir(snapshotDir, log); + }); + + // Adapted from epochs-test + const createNonValidatorNode = async (suffix: string, config: Partial = {}) => { + log.warn('Creating and syncing a node without a validator...'); + return await withLogNameSuffix(suffix, () => + AztecNodeService.createAndSync({ + ...context.config, + disableValidator: true, + dataDirectory: join(context.config.dataDirectory!, randomBytes(8).toString('hex')), + ...config, + }), + ); + }; + + it('waits until a few L2 blocks have been mined', async () => { + log.warn(`Waiting for L2 blocks to be mined`); + await retryUntil(() => monitor.l2BlockNumber > L2_TARGET_BLOCK_NUM, 'l2-blocks-mined', 90, 1); + log.warn(`L2 block height is now ${monitor.l2BlockNumber}`); + }); + + it('creates a snapshot', async () => { + log.warn(`Creating snapshot`); + await context.aztecNodeAdmin!.startSnapshotUpload(snapshotLocation); + await retryUntil(() => readdir(snapshotDir).then(files => files.length > 0), 'snapshot-created', 90, 1); + log.warn(`Snapshot created`); + }); + + it('downloads snapshot from new node', async () => { + log.warn(`Syncing brand new node with snapshot sync`); + const node = await createNonValidatorNode('1', { + blobSinkUrl: undefined, // set no blob sink so it cannot sync on its own + snapshotsUrl: snapshotLocation, + syncMode: 'snapshot', + }); + + log.warn(`New node synced`); + const block = await node.getBlock(L2_TARGET_BLOCK_NUM); + expect(block).toBeDefined(); + const blockHash = await block!.hash(); + + log.warn(`Checking for L2 block ${L2_TARGET_BLOCK_NUM} with hash ${blockHash} on both nodes`); + const getBlockHashLeafIndex = (node: AztecNode) => + node.findLeavesIndexes(L2_TARGET_BLOCK_NUM, MerkleTreeId.ARCHIVE, [blockHash]).then(([i]) => i); + expect(await getBlockHashLeafIndex(context.aztecNode)).toBeDefined(); + expect(await getBlockHashLeafIndex(node)).toBeDefined(); + + log.warn(`Stopping new node`); + await node.stop(); + }); +}); diff --git a/yarn-project/ethereum/src/client.ts b/yarn-project/ethereum/src/client.ts index 1b03e6d8a78b..95618493bccb 100644 --- a/yarn-project/ethereum/src/client.ts +++ b/yarn-project/ethereum/src/client.ts @@ -15,6 +15,8 @@ type Config = { viemPollingIntervalMS?: number; }; +export type { Config as EthereumClientConfig }; + // TODO: Use these methods to abstract the creation of viem clients. /** Returns a viem public client given the L1 config. */ diff --git a/yarn-project/foundation/src/collection/object.ts b/yarn-project/foundation/src/collection/object.ts index 0eb02b31fb2a..402d3a1eee06 100644 --- a/yarn-project/foundation/src/collection/object.ts +++ b/yarn-project/foundation/src/collection/object.ts @@ -50,3 +50,17 @@ export function omit(object: T, ...props: string[]): Partial>(obj: T): { [K in keyof T]: [K, T[K]] }[keyof T][] { + // See https://stackoverflow.com/a/76176570 + return Object.entries(obj) as { [K in keyof T]: [K, T[K]] }[keyof T][]; +} + +/** Equivalent to Object.fromEntries but preserves types. */ +export function fromEntries>( + entries: T, +): { [K in T[number] as K[0]]: K[1] } { + // See https://stackoverflow.com/a/76176570 + return Object.fromEntries(entries) as { [K in T[number] as K[0]]: K[1] }; +} diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 017a797d4530..6040e74639e9 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -164,6 +164,8 @@ export type EnvVar = | 'SEQ_MAX_L1_TX_INCLUSION_TIME_INTO_SLOT' | 'SLASH_FACTORY_CONTRACT_ADDRESS' | 'STAKING_ASSET_CONTRACT_ADDRESS' + | 'SYNC_MODE' + | 'SYNC_SNAPSHOTS_URL' | 'REWARD_DISTRIBUTOR_CONTRACT_ADDRESS' | 'TELEMETRY' | 'TEST_ACCOUNTS' diff --git a/yarn-project/foundation/src/fs/index.ts b/yarn-project/foundation/src/fs/index.ts index 979586f57f60..e3bcc9c26b8b 100644 --- a/yarn-project/foundation/src/fs/index.ts +++ b/yarn-project/foundation/src/fs/index.ts @@ -1 +1,2 @@ export * from './run_in_dir.js'; +export * from './try_rm_dir.js'; diff --git a/yarn-project/foundation/src/fs/try_rm_dir.ts b/yarn-project/foundation/src/fs/try_rm_dir.ts new file mode 100644 index 000000000000..531915ddaae5 --- /dev/null +++ b/yarn-project/foundation/src/fs/try_rm_dir.ts @@ -0,0 +1,15 @@ +import { rm } from 'fs/promises'; + +import type { Logger } from '../log/index.js'; + +export async function tryRmDir(dir: string | undefined, logger?: Logger): Promise { + if (dir === undefined) { + return; + } + try { + logger?.debug(`Cleaning up directory at ${dir}`); + await rm(dir, { recursive: true, force: true, maxRetries: 3 }); + } catch (err) { + logger?.warn(`Failed to delete directory at ${dir}: ${err}`); + } +} diff --git a/yarn-project/kv-store/src/indexeddb/store.test.ts b/yarn-project/kv-store/src/indexeddb/store.test.ts deleted file mode 100644 index d2f529b62215..000000000000 --- a/yarn-project/kv-store/src/indexeddb/store.test.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { describeAztecStore } from '../interfaces/store_test_suite.js'; -import { mockLogger } from '../interfaces/utils.js'; -import { AztecIndexedDBStore } from './store.js'; - -describe('AztecIndexedDBStore', () => { - describeAztecStore( - 'AztecStore', - async () => await AztecIndexedDBStore.open(mockLogger, 'test', false), - async () => await AztecIndexedDBStore.open(mockLogger, undefined, false), - async () => await AztecIndexedDBStore.open(mockLogger, undefined, true), - ); -}); diff --git a/yarn-project/kv-store/src/indexeddb/store.ts b/yarn-project/kv-store/src/indexeddb/store.ts index d0d895357cd9..d4a8f50dfc8d 100644 --- a/yarn-project/kv-store/src/indexeddb/store.ts +++ b/yarn-project/kv-store/src/indexeddb/store.ts @@ -74,29 +74,6 @@ export class AztecIndexedDBStore implements AztecAsyncKVStore { return kvStore; } - /** - * Forks the current DB into a new DB by backing it up to a temporary location and opening a new indexedb. - * @returns A new AztecIndexedDBStore. - */ - async fork(): Promise { - const forkedStore = await AztecIndexedDBStore.open(this.#log, undefined, true); - this.#log.verbose(`Forking store to ${forkedStore.#name}`); - - // Copy old data to new store - const oldData = this.#rootDB.transaction('data').store; - const dataToWrite = []; - for await (const cursor of oldData.iterate()) { - dataToWrite.push(cursor.value); - } - const tx = forkedStore.#rootDB.transaction('data', 'readwrite').store; - for (const data of dataToWrite) { - await tx.add(data); - } - - this.#log.debug(`Forked store at ${forkedStore.#name} opened successfully`); - return forkedStore; - } - /** * Creates a new AztecMap in the store. * @param name - Name of the map @@ -200,4 +177,8 @@ export class AztecIndexedDBStore implements AztecAsyncKVStore { close(): Promise { return Promise.resolve(); } + + backupTo(_dstPath: string, _compact?: boolean): Promise { + throw new Error('Method not implemented.'); + } } diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index 8f35d33e1947..0733b48f01e5 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -61,11 +61,6 @@ export interface AztecKVStore { */ clear(): Promise; - /** - * Forks the store. - */ - fork(): Promise; - /** * Deletes the store */ @@ -130,28 +125,18 @@ export interface AztecAsyncKVStore { */ transactionAsync>>(callback: () => Promise): Promise; - /** - * Clears all entries in the store - */ + /** Clears all entries in the store */ clear(): Promise; - /** - * Forks the store. - */ - fork(): Promise; - - /** - * Deletes the store - */ + /** Deletes the store */ delete(): Promise; - /** - * Estimates the size of the store in bytes. - */ + /** Estimates the size of the store in bytes. */ estimateSize(): Promise; - /** - * Closes the store - */ + /** Closes the store */ close(): Promise; + + /** Backups the store to the target folder.*/ + backupTo(dstPath: string, compact?: boolean): Promise; } diff --git a/yarn-project/kv-store/src/interfaces/store_test_suite.ts b/yarn-project/kv-store/src/interfaces/store_test_suite.ts deleted file mode 100644 index 3e92053d507b..000000000000 --- a/yarn-project/kv-store/src/interfaces/store_test_suite.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { expect } from 'chai'; - -import type { AztecAsyncSingleton, AztecSingleton } from './singleton.js'; -import type { AztecAsyncKVStore, AztecKVStore } from './store.js'; -import { isSyncStore } from './utils.js'; - -export function describeAztecStore( - testName: string, - getPersistentStore: () => Promise, - getPersistentNoPathStore: () => Promise, - getEphemeralStore: () => Promise, -) { - describe(testName, () => { - async function get( - store: AztecKVStore | AztecAsyncKVStore, - singleton: AztecSingleton | AztecAsyncSingleton, - ) { - return isSyncStore(store) - ? (singleton as AztecSingleton).get() - : await (singleton as AztecAsyncSingleton).getAsync(); - } - - const itForks = async (store: AztecKVStore | AztecAsyncKVStore) => { - const singleton = store.openSingleton('singleton'); - await singleton.set('foo'); - - const forkedStore = await store.fork(); - const forkedSingleton = forkedStore.openSingleton('singleton'); - expect(await get(store, singleton)).to.equal('foo'); - await forkedSingleton.set('bar'); - expect(await get(store, singleton)).to.equal('foo'); - expect(await get(forkedStore, forkedSingleton)).to.equal('bar'); - await forkedSingleton.delete(); - expect(await get(store, singleton)).to.equal('foo'); - await forkedStore.delete(); - }; - - it('forks a persistent store', async () => { - const store = await getPersistentStore(); - await itForks(store); - await store.delete(); - }); - - it('forks a persistent store with no path', async () => { - const store = await getPersistentNoPathStore(); - await itForks(store); - await store.delete(); - }); - - it('forks an ephemeral store', async () => { - const store = await getEphemeralStore(); - await itForks(store); - await store.delete(); - }); - }); -} diff --git a/yarn-project/kv-store/src/lmdb-v2/message.ts b/yarn-project/kv-store/src/lmdb-v2/message.ts index 34875e56f316..2b599eba35f5 100644 --- a/yarn-project/kv-store/src/lmdb-v2/message.ts +++ b/yarn-project/kv-store/src/lmdb-v2/message.ts @@ -19,6 +19,7 @@ export enum LMDBMessageType { STATS, CLOSE, + COPY_STORE, } type Key = Uint8Array; @@ -63,6 +64,11 @@ interface CloseCursorRequest { cursor: number; } +interface CopyStoreRequest { + dstPath: string; + compact: boolean; +} + export interface Batch { addEntries: Array; removeEntries: Array; @@ -87,6 +93,7 @@ export type LMDBRequestBody = { [LMDBMessageType.STATS]: void; [LMDBMessageType.CLOSE]: void; + [LMDBMessageType.COPY_STORE]: CopyStoreRequest; }; interface GetResponse { @@ -139,6 +146,8 @@ export type LMDBResponseBody = { [LMDBMessageType.STATS]: StatsResponse; [LMDBMessageType.CLOSE]: BoolResponse; + + [LMDBMessageType.COPY_STORE]: BoolResponse; }; export interface LMDBMessageChannel { diff --git a/yarn-project/kv-store/src/lmdb-v2/store.test.ts b/yarn-project/kv-store/src/lmdb-v2/store.test.ts index 88e65271d9a7..57e4e1ed6599 100644 --- a/yarn-project/kv-store/src/lmdb-v2/store.test.ts +++ b/yarn-project/kv-store/src/lmdb-v2/store.test.ts @@ -2,9 +2,12 @@ import { promiseWithResolvers } from '@aztec/foundation/promise'; import { sleep } from '@aztec/foundation/sleep'; import { expect } from 'chai'; +import { mkdtemp } from 'fs/promises'; +import { tmpdir } from 'os'; +import { join } from 'path'; import { stub } from 'sinon'; -import { openTmpStore } from './factory.js'; +import { openStoreAt, openTmpStore } from './factory.js'; import type { ReadTransaction } from './read_transaction.js'; import type { AztecLMDBStoreV2 } from './store.js'; @@ -178,4 +181,19 @@ describe('AztecLMDBStoreV2', () => { readTx.close(); }); + + it('copies and restores data', async () => { + const key = Buffer.from('foo'); + const value = Buffer.from('bar'); + await store.transactionAsync(tx => tx.set(key, value)); + expect(Buffer.from((await store.getReadTx().get(key))!).toString()).to.eq('bar'); + + const backupDir = await mkdtemp(join(tmpdir(), 'lmdb-store-test-backup')); + await store.backupTo(backupDir, true); + + const store2 = await openStoreAt(backupDir); + expect(Buffer.from((await store2.getReadTx().get(key))!).toString()).to.eq('bar'); + await store2.close(); + await store2.delete(); + }); }); diff --git a/yarn-project/kv-store/src/lmdb-v2/store.ts b/yarn-project/kv-store/src/lmdb-v2/store.ts index 6c17193c54f4..3b2c7030c418 100644 --- a/yarn-project/kv-store/src/lmdb-v2/store.ts +++ b/yarn-project/kv-store/src/lmdb-v2/store.ts @@ -3,7 +3,7 @@ import { Semaphore, SerialQueue } from '@aztec/foundation/queue'; import { MsgpackChannel, NativeLMDBStore } from '@aztec/native'; import { AsyncLocalStorage } from 'async_hooks'; -import { rm } from 'fs/promises'; +import { mkdir, rm } from 'fs/promises'; import type { AztecAsyncArray } from '../interfaces/array.js'; import type { Key, StoreSize } from '../interfaces/common.js'; @@ -82,6 +82,11 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { return db; } + public async backupTo(dstPath: string, compact = true) { + await mkdir(dstPath, { recursive: true }); + await this.channel.sendMessage(LMDBMessageType.COPY_STORE, { dstPath, compact }); + } + public getReadTx(): ReadTransaction { if (!this.open) { throw new Error('Store is closed'); @@ -155,10 +160,6 @@ export class AztecLMDBStoreV2 implements AztecAsyncKVStore, LMDBMessageChannel { return Promise.resolve(); } - fork(): Promise { - throw new Error('Not implemented'); - } - async delete(): Promise { await this.close(); await rm(this.dataDir, { recursive: true, force: true, maxRetries: 3 }); diff --git a/yarn-project/kv-store/src/lmdb/store.test.ts b/yarn-project/kv-store/src/lmdb/store.test.ts deleted file mode 100644 index 1c47ab90a07d..000000000000 --- a/yarn-project/kv-store/src/lmdb/store.test.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { promises as fs } from 'fs'; -import { tmpdir } from 'os'; -import { join } from 'path'; - -import { describeAztecStore } from '../interfaces/store_test_suite.js'; -import { AztecLmdbStore } from './store.js'; - -const defaultMapSize = 1024 * 1024 * 1024 * 10; - -describe('AztecLmdbStore', () => { - describeAztecStore( - 'AztecStore', - async () => { - const path = await fs.mkdtemp(join(tmpdir(), 'aztec-store-test-')); - return AztecLmdbStore.open(path, defaultMapSize, false); - }, - () => Promise.resolve(AztecLmdbStore.open(undefined, defaultMapSize, false)), - () => Promise.resolve(AztecLmdbStore.open(undefined, defaultMapSize, true)), - ); -}); diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index 5a5a85c704b8..fc83245f4640 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -75,21 +75,6 @@ export class AztecLmdbStore implements AztecKVStore, AztecAsyncKVStore { return new AztecLmdbStore(rootDb, ephemeral, dbPath); } - /** - * Forks the current DB into a new DB by backing it up to a temporary location and opening a new lmdb db. - * @returns A new AztecLmdbStore. - */ - async fork() { - const baseDir = this.path; - this.#log.debug(`Forking store with basedir ${baseDir}`); - const forkPath = await fs.mkdtemp(join(baseDir, 'aztec-store-fork-')); - this.#log.verbose(`Forking store to ${forkPath}`); - await this.#rootDb.backup(forkPath, false); - const forkDb = open(forkPath, { noSync: this.isEphemeral }); - this.#log.debug(`Forked store at ${forkPath} opened successfully`); - return new AztecLmdbStore(forkDb, this.isEphemeral, forkPath); - } - /** * Creates a new AztecMap in the store. * @param name - Name of the map @@ -238,4 +223,8 @@ export class AztecLmdbStore implements AztecKVStore, AztecAsyncKVStore { } return { actualSize, numItems }; } + + backupTo(_dstPath: string, _compact?: boolean): Promise { + throw new Error('Method not implemented.'); + } } diff --git a/yarn-project/prover-client/src/proving_broker/proof_store/gcs_proof_store.ts b/yarn-project/prover-client/src/proving_broker/proof_store/gcs_proof_store.ts index c845e97f9d75..f4d37dcb46f2 100644 --- a/yarn-project/prover-client/src/proving_broker/proof_store/gcs_proof_store.ts +++ b/yarn-project/prover-client/src/proving_broker/proof_store/gcs_proof_store.ts @@ -16,6 +16,7 @@ import type { ProofStore } from './proof_store.js'; const INPUTS_PATH = 'inputs'; +// REFACTOR(#13067): Use the stdlib/file-store instead of referencing google-cloud-storage directly. export class GoogleCloudStorageProofStore implements ProofStore { private readonly storage: Storage; diff --git a/yarn-project/simulator/src/public/hinting_db_sources.ts b/yarn-project/simulator/src/public/hinting_db_sources.ts index 797a58dc9503..d65521b25d34 100644 --- a/yarn-project/simulator/src/public/hinting_db_sources.ts +++ b/yarn-project/simulator/src/public/hinting_db_sources.ts @@ -97,7 +97,7 @@ export class HintingPublicTreesDB extends PublicTreesDB { // Getters. public override async getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise> { const path = await super.getSiblingPath(treeId, index); - const key = await this.#getHintKey(treeId); + const key = await this.getHintKey(treeId); this.hints.getSiblingPathHints.push(new AvmGetSiblingPathHint(key, treeId, index, path.toFields())); return Promise.resolve(path); } @@ -120,7 +120,7 @@ export class HintingPublicTreesDB extends PublicTreesDB { )}, ${value}}) returned undefined. Possible wrong tree setup or corrupted state.`, ); } - const key = await this.#getHintKey(treeId); + const key = await this.getHintKey(treeId); this.hints.getPreviousValueIndexHints.push( new AvmGetPreviousValueIndexHint(key, treeId, new Fr(value), result.index, result.alreadyPresent), ); @@ -133,7 +133,7 @@ export class HintingPublicTreesDB extends PublicTreesDB { ): Promise { const preimage = await super.getLeafPreimage(treeId, index); if (preimage) { - const key = await this.#getHintKey(treeId); + const key = await this.getHintKey(treeId); switch (treeId) { case MerkleTreeId.PUBLIC_DATA_TREE: @@ -162,11 +162,11 @@ export class HintingPublicTreesDB extends PublicTreesDB { leaves: Buffer[], ): Promise> { HintingPublicTreesDB.log.debug('sequentialInsert not hinted yet!'); - const beforeState = await this.#getHintKey(treeId); + const beforeState = await this.getHintKey(treeId); const result = await super.sequentialInsert(treeId, leaves); - const afterState = await this.#getHintKey(treeId); + const afterState = await this.getHintKey(treeId); HintingPublicTreesDB.log.debug( `Evolved tree state (${getTreeName(treeId)}): ${beforeState.root}, ${beforeState.nextAvailableLeafIndex} -> ${ afterState.root @@ -182,21 +182,21 @@ export class HintingPublicTreesDB extends PublicTreesDB { // WARNING: is this enough? we might actually need the number of the checkpoint or similar... // We will need to keep a stack of checkpoints on the C++ side. const beforeState = { - [MerkleTreeId.PUBLIC_DATA_TREE]: await this.#getHintKey(MerkleTreeId.PUBLIC_DATA_TREE), - [MerkleTreeId.NULLIFIER_TREE]: await this.#getHintKey(MerkleTreeId.NULLIFIER_TREE), - [MerkleTreeId.NOTE_HASH_TREE]: await this.#getHintKey(MerkleTreeId.NOTE_HASH_TREE), - [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: await this.#getHintKey(MerkleTreeId.L1_TO_L2_MESSAGE_TREE), - [MerkleTreeId.ARCHIVE]: await this.#getHintKey(MerkleTreeId.ARCHIVE), + [MerkleTreeId.PUBLIC_DATA_TREE]: await this.getHintKey(MerkleTreeId.PUBLIC_DATA_TREE), + [MerkleTreeId.NULLIFIER_TREE]: await this.getHintKey(MerkleTreeId.NULLIFIER_TREE), + [MerkleTreeId.NOTE_HASH_TREE]: await this.getHintKey(MerkleTreeId.NOTE_HASH_TREE), + [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: await this.getHintKey(MerkleTreeId.L1_TO_L2_MESSAGE_TREE), + [MerkleTreeId.ARCHIVE]: await this.getHintKey(MerkleTreeId.ARCHIVE), }; await super.revertCheckpoint(); const afterState = { - [MerkleTreeId.PUBLIC_DATA_TREE]: await this.#getHintKey(MerkleTreeId.PUBLIC_DATA_TREE), - [MerkleTreeId.NULLIFIER_TREE]: await this.#getHintKey(MerkleTreeId.NULLIFIER_TREE), - [MerkleTreeId.NOTE_HASH_TREE]: await this.#getHintKey(MerkleTreeId.NOTE_HASH_TREE), - [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: await this.#getHintKey(MerkleTreeId.L1_TO_L2_MESSAGE_TREE), - [MerkleTreeId.ARCHIVE]: await this.#getHintKey(MerkleTreeId.ARCHIVE), + [MerkleTreeId.PUBLIC_DATA_TREE]: await this.getHintKey(MerkleTreeId.PUBLIC_DATA_TREE), + [MerkleTreeId.NULLIFIER_TREE]: await this.getHintKey(MerkleTreeId.NULLIFIER_TREE), + [MerkleTreeId.NOTE_HASH_TREE]: await this.getHintKey(MerkleTreeId.NOTE_HASH_TREE), + [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: await this.getHintKey(MerkleTreeId.L1_TO_L2_MESSAGE_TREE), + [MerkleTreeId.ARCHIVE]: await this.getHintKey(MerkleTreeId.ARCHIVE), }; HintingPublicTreesDB.log.debug('Evolved tree state:'); @@ -210,7 +210,7 @@ export class HintingPublicTreesDB extends PublicTreesDB { } // Private methods. - async #getHintKey(treeId: MerkleTreeId): Promise { + private async getHintKey(treeId: MerkleTreeId): Promise { const treeInfo = await super.getTreeInfo(treeId); return new AppendOnlyTreeSnapshot(Fr.fromBuffer(treeInfo.root), Number(treeInfo.size)); } diff --git a/yarn-project/stdlib/package.json b/yarn-project/stdlib/package.json index e62ed3c77074..c7679329d16f 100644 --- a/yarn-project/stdlib/package.json +++ b/yarn-project/stdlib/package.json @@ -47,7 +47,9 @@ "./config": "./dest/config/index.js", "./testing/jest": "./dest/tests/jest.js", "./database-version": "./dest/database-version/index.js", - "./validators": "./dest/validators/index.js" + "./validators": "./dest/validators/index.js", + "./file-store": "./dest/file-store/index.js", + "./snapshots": "./dest/snapshots/index.js" }, "typedocOptions": { "entryPoints": [ @@ -71,6 +73,7 @@ "@aztec/constants": "workspace:^", "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", + "@google-cloud/storage": "^7.15.0", "lodash.chunk": "^4.2.0", "lodash.isequal": "^4.5.0", "lodash.omit": "^4.5.0", @@ -140,4 +143,4 @@ "../../foundation/src/jest/setup.mjs" ] } -} +} \ No newline at end of file diff --git a/yarn-project/stdlib/src/database-version/version_manager.ts b/yarn-project/stdlib/src/database-version/version_manager.ts index 83a294077077..aeb089ab54ef 100644 --- a/yarn-project/stdlib/src/database-version/version_manager.ts +++ b/yarn-project/stdlib/src/database-version/version_manager.ts @@ -79,13 +79,15 @@ export class DatabaseVersion { export type DatabaseVersionManagerFs = Pick; +export const DATABASE_VERSION_FILE_NAME = 'db_version'; + /** * A manager for handling database versioning and migrations. * This class will check the version of data in a directory and either * reset or upgrade based on version compatibility. */ export class DatabaseVersionManager { - public static readonly VERSION_FILE = 'db_version'; + public static readonly VERSION_FILE = DATABASE_VERSION_FILE_NAME; private readonly versionFile: string; private readonly currentVersion: DatabaseVersion; @@ -119,6 +121,11 @@ export class DatabaseVersionManager { this.currentVersion = new DatabaseVersion(schemaVersion, rollupAddress); } + static async writeVersion(version: DatabaseVersion, dataDir: string, fileSystem: DatabaseVersionManagerFs = fs) { + await fileSystem.mkdir(dataDir, { recursive: true }); + return fileSystem.writeFile(join(dataDir, DatabaseVersionManager.VERSION_FILE), version.toBuffer()); + } + /** * Checks the stored version against the current version and handles the outcome * by either resetting the data directory or calling an upgrade function @@ -163,7 +170,7 @@ export class DatabaseVersionManager { needsReset = true; } } else { - this.log.warn('Rollup address changed, resetting data directory'); + this.log.warn('Rollup address changed, resetting data directory', { versionFile: this.versionFile }); needsReset = true; } @@ -181,17 +188,14 @@ export class DatabaseVersionManager { /** * Writes the current version to the version file */ - private async writeVersion(): Promise { - // Ensure the directory exists - await this.fileSystem.mkdir(this.dataDirectory, { recursive: true }); - // Write the version file - await this.fileSystem.writeFile(this.versionFile, this.currentVersion.toBuffer()); + public writeVersion(dir?: string): Promise { + return DatabaseVersionManager.writeVersion(this.currentVersion, dir ?? this.dataDirectory, this.fileSystem); } /** * Resets the data directory by deleting it and recreating it */ - private async resetDataDirectory(): Promise { + public async resetDataDirectory(): Promise { try { await this.fileSystem.rm(this.dataDirectory, { recursive: true, force: true, maxRetries: 3 }); await this.fileSystem.mkdir(this.dataDirectory, { recursive: true }); diff --git a/yarn-project/stdlib/src/file-store/factory.ts b/yarn-project/stdlib/src/file-store/factory.ts new file mode 100644 index 000000000000..e7c797615dd4 --- /dev/null +++ b/yarn-project/stdlib/src/file-store/factory.ts @@ -0,0 +1,61 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; + +import { GoogleCloudFileStore } from './gcs.js'; +import { HttpFileStore } from './http.js'; +import type { FileStore, ReadOnlyFileStore } from './interface.js'; +import { LocalFileStore } from './local.js'; + +const supportedExamples = [ + `gs://bucket-name/path/to/store`, + `file:///absolute/local/path/to/store`, + `https://host/path`, +]; + +export async function createFileStore(config: string, logger?: Logger): Promise; +export async function createFileStore(config: undefined, logger?: Logger): Promise; +export async function createFileStore( + config: string | undefined, + logger = createLogger('stdlib:file-store'), +): Promise { + if (config === undefined) { + return undefined; + } else if (config.startsWith('file://')) { + const url = new URL(config); + if (url.host) { + throw new Error(`File store URL only supports local paths (got host ${url.host} from ${config})`); + } + const path = url.pathname; + logger.info(`Creating local file file store at ${path}`); + return new LocalFileStore(path); + } else if (config.startsWith('gs://')) { + try { + const url = new URL(config); + const bucket = url.host; + const path = url.pathname.replace(/^\/+/, ''); + logger.info(`Creating google cloud file store at ${bucket} ${path}`); + const store = new GoogleCloudFileStore(bucket, path); + await store.checkCredentials(); + return store; + } catch (err) { + throw new Error(`Invalid google cloud store definition: '${config}'.`); + } + } else { + throw new Error(`Unknown file store config: '${config}'. Supported values are ${supportedExamples.join(', ')}.`); + } +} + +export async function createReadOnlyFileStore(config: string, logger?: Logger): Promise; +export async function createReadOnlyFileStore(config: undefined, logger?: Logger): Promise; +export async function createReadOnlyFileStore( + config: string | undefined, + logger = createLogger('stdlib:file-store'), +): Promise { + if (config === undefined) { + return undefined; + } else if (config.startsWith('http://') || config.startsWith('https://')) { + logger.info(`Creating read-only HTTP file store at ${config}`); + return new HttpFileStore(config, logger); + } else { + return await createFileStore(config, logger); + } +} diff --git a/yarn-project/stdlib/src/file-store/gcs.ts b/yarn-project/stdlib/src/file-store/gcs.ts new file mode 100644 index 000000000000..10bdb6f8f7c2 --- /dev/null +++ b/yarn-project/stdlib/src/file-store/gcs.ts @@ -0,0 +1,121 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; + +import { File, Storage, type UploadOptions } from '@google-cloud/storage'; +import { join } from 'path'; + +import type { FileStore, FileStoreSaveOptions } from './interface.js'; + +export class GoogleCloudFileStore implements FileStore { + private readonly storage: Storage; + + constructor( + private readonly bucketName: string, + private readonly basePath: string, + private readonly log: Logger = createLogger('stdlib:gcs-file-store'), + ) { + this.storage = new Storage(); + } + + public async checkCredentials() { + await this.storage.getServiceAccount(); + } + + public async save( + path: string, + data: Buffer, + opts: FileStoreSaveOptions = { public: false, metadata: {}, compress: false }, + ): Promise { + const fullPath = this.getFullPath(path); + try { + const bucket = this.storage.bucket(this.bucketName); + const file = bucket.file(fullPath); + await file.save(data, { metadata: opts.metadata, gzip: opts.compress }); + return this.handleUploadedFile(file, opts); + } catch (err: any) { + throw new Error(`Error saving file to google cloud storage at ${fullPath}: ${err.message ?? err}`); + } + } + + public async upload( + destPath: string, + srcPath: string, + opts: FileStoreSaveOptions = { compress: true, public: false, metadata: {} }, + ): Promise { + const fullPath = this.getFullPath(destPath); + try { + const bucket = this.storage.bucket(this.bucketName); + const file = bucket.file(fullPath); + const uploadOpts: UploadOptions = { + destination: file, + gzip: opts.compress, + metadata: opts.metadata, + }; + await bucket.upload(srcPath, uploadOpts); + return this.handleUploadedFile(file, opts); + } catch (err: any) { + throw new Error(`Error saving file to google cloud storage at ${fullPath}: ${err.message ?? err}`); + } + } + + private async handleUploadedFile(file: File, opts: { public?: boolean }): Promise { + if (opts.public) { + try { + if (!(await file.isPublic())) { + await file.makePublic(); + } + } catch (err: any) { + this.log.warn( + `Error making file ${file.name} public: ${ + err.message ?? err + }. This is expected if we handle public access at the bucket level.`, + ); + } + return file.publicUrl().replaceAll('%2F', '/'); + } else { + return file.cloudStorageURI.toString(); + } + } + + public async read(pathOrUrlStr: string): Promise { + const file = await this.getFileObject(pathOrUrlStr); + const contents = await file.download(); + return contents[0]; + } + + public async download(pathOrUrlStr: string, destPath: string): Promise { + const file = await this.getFileObject(pathOrUrlStr); + await file.download({ destination: destPath }); + } + + public async exists(pathOrUrlStr: string): Promise { + const { bucketName, fullPath } = this.getBucketAndFullPath(pathOrUrlStr); + const bucket = this.storage.bucket(bucketName); + const file = bucket.file(fullPath); + const [exists] = await file.exists(); + return exists; + } + + private async getFileObject(pathOrUrlStr: string): Promise { + const { bucketName, fullPath } = this.getBucketAndFullPath(pathOrUrlStr); + const bucket = this.storage.bucket(bucketName); + const file = bucket.file(fullPath); + if (!(await file.exists())) { + throw new Error(`File at ${fullPath} in gcs bucket ${bucketName} does not exist`); + } + return file; + } + + private getBucketAndFullPath(pathOrUrlStr: string): { bucketName: string; fullPath: string } { + if (URL.canParse(pathOrUrlStr)) { + const url = new URL(pathOrUrlStr); + // Note that we accept reading from anywhere, not just our bucket + return { fullPath: url.pathname.replace(/^\/+/, ''), bucketName: url.host }; + } else { + return { fullPath: this.getFullPath(pathOrUrlStr), bucketName: this.bucketName }; + } + } + + private getFullPath(path: string): string { + return this.basePath && this.basePath.length > 0 ? join(this.basePath, path) : path; + } +} diff --git a/yarn-project/stdlib/src/file-store/http.ts b/yarn-project/stdlib/src/file-store/http.ts new file mode 100644 index 000000000000..9862697904c9 --- /dev/null +++ b/yarn-project/stdlib/src/file-store/http.ts @@ -0,0 +1,58 @@ +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { makeBackoff, retry } from '@aztec/foundation/retry'; + +import { createWriteStream } from 'fs'; +import { mkdir } from 'fs/promises'; +import { dirname } from 'path'; +import { Readable } from 'stream'; +import { finished } from 'stream/promises'; + +import type { ReadOnlyFileStore } from './interface.js'; + +export class HttpFileStore implements ReadOnlyFileStore { + private readonly fetch: typeof fetch; + + constructor(private readonly baseUrl: string, private readonly log: Logger = createLogger('stdlib:http-file-store')) { + this.fetch = async (...args: Parameters): Promise => { + return await retry( + () => fetch(...args), + `Fetching ${args[0]}`, + makeBackoff([1, 1, 3]), + this.log, + /*failSilently=*/ true, + ); + }; + } + + public async read(pathOrUrl: string): Promise { + const url = this.getUrl(pathOrUrl); + const response = await this.fetch(url); + if (response.ok) { + return Buffer.from(await response.arrayBuffer()); + } else { + throw new Error(`Error fetching file from ${url}: ${response.statusText}`); + } + } + + public async download(pathOrUrl: string, destPath: string): Promise { + const url = this.getUrl(pathOrUrl); + const response = await this.fetch(url); + if (response.ok) { + await mkdir(dirname(destPath), { recursive: true }); + // Typescript complains about Readable.fromWeb, hence the cast + await finished(Readable.fromWeb(response.body! as any).pipe(createWriteStream(destPath))); + } else { + throw new Error(`Error fetching file from ${url}: ${response.statusText}`); + } + } + + public async exists(pathOrUrl: string): Promise { + const url = this.getUrl(pathOrUrl); + const response = await this.fetch(url); + return response.ok; + } + + private getUrl(path: string): string { + return URL.canParse(path) ? path : `${this.baseUrl.replace(/\/$/, '')}/${path}`; + } +} diff --git a/yarn-project/stdlib/src/file-store/index.ts b/yarn-project/stdlib/src/file-store/index.ts new file mode 100644 index 000000000000..c9130f5e5469 --- /dev/null +++ b/yarn-project/stdlib/src/file-store/index.ts @@ -0,0 +1,2 @@ +export * from './interface.js'; +export * from './factory.js'; diff --git a/yarn-project/stdlib/src/file-store/interface.ts b/yarn-project/stdlib/src/file-store/interface.ts new file mode 100644 index 000000000000..3de2c6e9590c --- /dev/null +++ b/yarn-project/stdlib/src/file-store/interface.ts @@ -0,0 +1,19 @@ +/** Simple read-only file store. */ +export interface ReadOnlyFileStore { + /** Reads a file given a path, or an URI as returned by calling `save`. Returns file contents. */ + read(pathOrUrl: string): Promise; + /** Downloads a file given a path, or an URI as returned by calling `save`. Saves file to local path. */ + download(pathOrUrlStr: string, destPath: string): Promise; + /** Returns whether a file at the given path or URI exists. */ + exists(pathOrUrl: string): Promise; +} + +export type FileStoreSaveOptions = { public?: boolean; metadata?: Record; compress?: boolean }; + +/** Simple file store. */ +export interface FileStore extends ReadOnlyFileStore { + /** Saves contents to the given path. Returns an URI that can be used later to `read` the file. */ + save(path: string, data: Buffer, opts?: FileStoreSaveOptions): Promise; + /** Uploads contents from a local file. Returns an URI that can be used later to `read` the file. */ + upload(destPath: string, srcPath: string, opts?: FileStoreSaveOptions): Promise; +} diff --git a/yarn-project/stdlib/src/file-store/local.test.ts b/yarn-project/stdlib/src/file-store/local.test.ts new file mode 100644 index 000000000000..69b6fd4fd219 --- /dev/null +++ b/yarn-project/stdlib/src/file-store/local.test.ts @@ -0,0 +1,48 @@ +import { mkdtemp, rm } from 'fs/promises'; +import { tmpdir } from 'os'; + +import { LocalFileStore } from './local.js'; + +describe('LocalFileStore', () => { + let fileStore: LocalFileStore; + let testDir: string; + + afterAll(async () => { + try { + await rm(testDir, { recursive: true, force: true }); + } catch (err) { + // Silently ignore + } + }); + + describe('save', () => { + let data: string; + let filePath: string; + let fileUrl: string; + + beforeAll(async () => { + testDir = await mkdtemp((tmpdir(), 'local-file-store-1-')); + fileStore = new LocalFileStore(testDir); + + data = 'foobar'; + filePath = 'test.txt'; + fileUrl = await fileStore.save(filePath, Buffer.from(data)); + }); + + it('reads using full url', async () => { + expect(await fileStore.read(fileUrl).then(x => x.toString())).toEqual(data); + }); + + it('reads using path', async () => { + expect(await fileStore.read(filePath).then(x => x.toString())).toEqual(data); + }); + + it('checks file exists with full url', async () => { + expect(await fileStore.exists(fileUrl)).toBe(true); + }); + + it('checks file exists with path', async () => { + expect(await fileStore.exists(filePath)).toBe(true); + }); + }); +}); diff --git a/yarn-project/stdlib/src/file-store/local.ts b/yarn-project/stdlib/src/file-store/local.ts new file mode 100644 index 000000000000..e2c3409666ba --- /dev/null +++ b/yarn-project/stdlib/src/file-store/local.ts @@ -0,0 +1,46 @@ +import { access, mkdir, readFile, writeFile } from 'fs/promises'; +import { dirname, resolve } from 'path'; + +import type { FileStore } from './interface.js'; + +export class LocalFileStore implements FileStore { + constructor(private readonly basePath: string) {} + + public async save(path: string, data: Buffer): Promise { + const fullPath = this.getFullPath(path); + await mkdir(dirname(fullPath), { recursive: true }); + await writeFile(fullPath, data); + return `file://${fullPath}`; + } + + public async upload(destPath: string, srcPath: string, _opts: { compress: boolean }): Promise { + const data = await readFile(srcPath); + return this.save(destPath, data); + } + + public read(pathOrUrlStr: string): Promise { + const fullPath = this.getFullPath(pathOrUrlStr); + return readFile(fullPath); + } + + public async download(pathOrUrlStr: string, destPath: string): Promise { + const data = await this.read(pathOrUrlStr); + const fullPath = this.getFullPath(destPath); + await writeFile(fullPath, data); + } + + public exists(pathOrUrlStr: string): Promise { + const fullPath = this.getFullPath(pathOrUrlStr); + return access(fullPath) + .then(() => true) + .catch(() => false); + } + + private getFullPath(pathOrUrl: string): string { + if (URL.canParse(pathOrUrl)) { + return new URL(pathOrUrl).pathname; + } else { + return resolve(this.basePath, pathOrUrl); + } + } +} diff --git a/yarn-project/stdlib/src/interfaces/aztec-node-admin.test.ts b/yarn-project/stdlib/src/interfaces/aztec-node-admin.test.ts index 80a73181f299..0547180cf48c 100644 --- a/yarn-project/stdlib/src/interfaces/aztec-node-admin.test.ts +++ b/yarn-project/stdlib/src/interfaces/aztec-node-admin.test.ts @@ -33,6 +33,10 @@ describe('AztecNodeAdminApiSchema', () => { it('flushTxs', async () => { await context.client.flushTxs(); }); + + it('startSnapshotUpload', async () => { + await context.client.startSnapshotUpload('foo'); + }); }); class MockAztecNodeAdmin implements AztecNodeAdmin { @@ -44,4 +48,7 @@ class MockAztecNodeAdmin implements AztecNodeAdmin { flushTxs(): Promise { return Promise.resolve(); } + startSnapshotUpload(_location: string): Promise { + return Promise.resolve(); + } } diff --git a/yarn-project/stdlib/src/interfaces/aztec-node-admin.ts b/yarn-project/stdlib/src/interfaces/aztec-node-admin.ts index 0391fb441447..975ab2ed8403 100644 --- a/yarn-project/stdlib/src/interfaces/aztec-node-admin.ts +++ b/yarn-project/stdlib/src/interfaces/aztec-node-admin.ts @@ -17,13 +17,23 @@ export interface AztecNodeAdmin { */ setConfig(config: Partial): Promise; - /** Forces the next block to be built bypassing all time and pending checks. Useful for testing. */ + /** + * Forces the next block to be built bypassing all time and pending checks. + * Useful for testing. + */ flushTxs(): Promise; + + /** + * Pauses syncing, creates a backup of archiver and world-state databases, and uploads them. Returns immediately. + * @param location - The location to upload the snapshot to. + */ + startSnapshotUpload(location: string): Promise; } export const AztecNodeAdminApiSchema: ApiSchemaFor = { setConfig: z.function().args(SequencerConfigSchema.merge(ProverConfigSchema).partial()).returns(z.void()), flushTxs: z.function().returns(z.void()), + startSnapshotUpload: z.function().args(z.string()).returns(z.void()), }; export function createAztecNodeAdminClient( diff --git a/yarn-project/stdlib/src/interfaces/service.ts b/yarn-project/stdlib/src/interfaces/service.ts index aba338ef6371..331726708e84 100644 --- a/yarn-project/stdlib/src/interfaces/service.ts +++ b/yarn-project/stdlib/src/interfaces/service.ts @@ -11,6 +11,9 @@ export interface Service { /** Stops the service. */ stop(): Promise; + + /** Resumes the service after it was stopped */ + resume(): void; } /** Tries to call stop on a given object and awaits it. Logs any errors and does not rethrow. */ @@ -23,3 +26,13 @@ export async function tryStop(service: Maybe, logger?: Logger): Promise logger?.error(`Error stopping service ${(service as object).constructor?.name}: ${err}`); } } + +export function tryRestart(service: Maybe, logger?: Logger) { + try { + return typeof service === 'object' && service && 'restart' in service && typeof service.restart === 'function' + ? service.restart() + : Promise.resolve(); + } catch (err) { + logger?.error(`Error restarting service ${(service as object).constructor?.name}: ${err}`); + } +} diff --git a/yarn-project/stdlib/src/interfaces/world_state.ts b/yarn-project/stdlib/src/interfaces/world_state.ts index 5fd63bedc6c1..546644d07add 100644 --- a/yarn-project/stdlib/src/interfaces/world_state.ts +++ b/yarn-project/stdlib/src/interfaces/world_state.ts @@ -1,7 +1,10 @@ import { z } from 'zod'; +import type { SnapshotDataKeys } from '../snapshots/types.js'; import type { MerkleTreeReadOperations, MerkleTreeWriteOperations } from './merkle_tree_operations.js'; +export type { SnapshotDataKeys }; + /** * Defines the possible states of the world state synchronizer. */ @@ -41,37 +44,36 @@ export interface ForkMerkleTreeOperations { /** Gets a handle that allows reading the state as it was at the given block number. */ getSnapshot(blockNumber: number): MerkleTreeReadOperations; + + /** Backups the db to the target path. */ + backupTo(dstPath: string, compact?: boolean): Promise, string>>; } /** Defines the interface for a world state synchronizer. */ export interface WorldStateSynchronizer extends ForkMerkleTreeOperations { - /** - * Starts the synchronizer. - * @returns A promise that resolves once the initial sync is completed. - */ + /** Starts the synchronizer. */ start(): void; - /** - * Returns the current status of the synchronizer. - * @returns The current status of the synchronizer. - */ + /** Returns the current status of the synchronizer. */ status(): Promise; - /** - * Stops the synchronizer. - */ + /** Stops the synchronizer and its database. */ stop(): Promise; + /** Stops the synchronizer from syncing, but keeps the database online. */ + stopSync(): Promise; + + /** Resumes synching after a stopSync call. */ + resumeSync(): void; + /** * Forces an immediate sync to an optionally provided minimum block number - * @param minBlockNumber - The minimum block number that we must sync to + * @param minBlockNumber - The minimum block number that we must sync to (may sync further) * @returns A promise that resolves with the block number the world state was synced to */ syncImmediate(minBlockNumber?: number): Promise; - /** - * Returns an instance of MerkleTreeAdminOperations that will not include uncommitted data. - */ + /** Returns an instance of MerkleTreeAdminOperations that will not include uncommitted data. */ getCommitted(): MerkleTreeReadOperations; } diff --git a/yarn-project/stdlib/src/snapshots/download.ts b/yarn-project/stdlib/src/snapshots/download.ts new file mode 100644 index 000000000000..e9b08995ba28 --- /dev/null +++ b/yarn-project/stdlib/src/snapshots/download.ts @@ -0,0 +1,60 @@ +import { fromEntries, getEntries, maxBy } from '@aztec/foundation/collection'; +import { jsonParseWithSchemaSync } from '@aztec/foundation/json-rpc'; +import type { ReadOnlyFileStore } from '@aztec/stdlib/file-store'; + +import { join } from 'path'; + +import { + SnapshotDataKeys, + type SnapshotDataUrls, + type SnapshotMetadata, + type SnapshotsIndex, + type SnapshotsIndexMetadata, + SnapshotsIndexSchema, +} from './types.js'; + +export async function getSnapshotIndex( + metadata: SnapshotsIndexMetadata, + store: ReadOnlyFileStore, +): Promise { + const basePath = getBasePath(metadata); + const snapshotIndexPath = `${basePath}/index.json`; + try { + if (await store.exists(snapshotIndexPath)) { + const snapshotIndexData = await store.read(snapshotIndexPath); + return jsonParseWithSchemaSync(snapshotIndexData.toString(), SnapshotsIndexSchema); + } else { + return undefined; + } + } catch (err) { + throw new Error(`Error reading snapshot index from ${snapshotIndexPath}: ${err}`); + } +} + +export async function getLatestSnapshotMetadata( + metadata: SnapshotsIndexMetadata, + store: ReadOnlyFileStore, +): Promise { + const snapshotsIndex = await getSnapshotIndex(metadata, store); + return snapshotsIndex?.snapshots && maxBy(snapshotsIndex?.snapshots, s => s.l1BlockNumber); +} + +export function getBasePath(metadata: SnapshotsIndexMetadata): string { + return `aztec-${metadata.l1ChainId}-${metadata.l2Version}-${metadata.rollupAddress}`; +} + +export function getSnapshotIndexPath(metadata: SnapshotsIndexMetadata): string { + return `${getBasePath(metadata)}/index.json`; +} + +export function makeSnapshotLocalPaths(baseDir: string): SnapshotDataUrls { + return fromEntries(SnapshotDataKeys.map(key => [key, join(baseDir, `${key}.db`)])); +} + +export async function downloadSnapshot( + snapshot: Pick, + localPaths: Record, + store: ReadOnlyFileStore, +): Promise { + await Promise.all(getEntries(localPaths).map(([key, path]) => store.download(snapshot.dataUrls[key], path))); +} diff --git a/yarn-project/stdlib/src/snapshots/index.test.ts b/yarn-project/stdlib/src/snapshots/index.test.ts new file mode 100644 index 000000000000..eb8a33a51e97 --- /dev/null +++ b/yarn-project/stdlib/src/snapshots/index.test.ts @@ -0,0 +1,139 @@ +import { times } from '@aztec/foundation/collection'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { jsonStringify } from '@aztec/foundation/json-rpc'; +import type { FileStore } from '@aztec/stdlib/file-store'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { getLatestSnapshotMetadata } from './download.js'; +import type { SnapshotDataUrls, SnapshotMetadata, SnapshotsIndex, SnapshotsIndexMetadata } from './types.js'; +import { uploadSnapshot } from './upload.js'; + +describe('snapshots', () => { + let store: MockProxy; + let index: SnapshotsIndex; + let metadata: SnapshotsIndexMetadata; + let snapshots: SnapshotMetadata[]; + let rollup: EthAddress; + + const makeDataPaths = (index: number, basePath = ''): SnapshotDataUrls => ({ + 'archive-tree': `${basePath}/archive-tree/${index}`, + 'l1-to-l2-message-tree': `${basePath}/l1-to-l2-message-tree/${index}`, + 'note-hash-tree': `${basePath}/note-hash-tree/${index}`, + 'nullifier-tree': `${basePath}/nullifier-tree/${index}`, + 'public-data-tree': `${basePath}/public-data-tree/${index}`, + archiver: `${basePath}/archiver/${index}`, + }); + + const makeExpectedDataPaths = () => ({ + 'archive-tree': expect.stringContaining('archive'), + 'l1-to-l2-message-tree': expect.stringContaining('l1-to-l2-message'), + 'note-hash-tree': expect.stringContaining('note-hash'), + 'nullifier-tree': expect.stringContaining('nullifier'), + 'public-data-tree': expect.stringContaining('public-data'), + archiver: expect.stringContaining('archiver'), + }); + + const makeSnapshotMetadata = (index: number): SnapshotMetadata => ({ + dataUrls: makeDataPaths(index), + l1BlockNumber: index, + l2BlockNumber: index, + l2BlockHash: `0x${index}`, + timestamp: index, + schemaVersions: { archiver: 1, worldState: 1 }, + }); + + beforeEach(() => { + store = mock(); + store.upload.mockImplementation(dest => Promise.resolve(dest)); + rollup = EthAddress.random(); + metadata = { l1ChainId: 1, l2Version: 2, rollupAddress: rollup }; + snapshots = times(5, makeSnapshotMetadata); + index = { ...metadata, snapshots }; + }); + + describe('download', () => { + it('gets latest snapshot metadata', async () => { + store.exists.mockResolvedValue(true); + store.read.mockResolvedValue(Buffer.from(jsonStringify(index), 'utf-8')); + await expect(getLatestSnapshotMetadata(metadata, store)).resolves.toEqual(snapshots[4]); + expect(store.read).toHaveBeenCalledWith(`aztec-1-2-${rollup.toString()}/index.json`); + }); + + it('returns undefined if there are no snapshots', async () => { + store.exists.mockResolvedValue(false); + await expect(getLatestSnapshotMetadata(metadata, store)).resolves.toBeUndefined(); + }); + }); + + describe('upload', () => { + it('with no existing index', async () => { + store.exists.mockResolvedValue(false); + + let uploadedIndex: string; + store.save.mockImplementation((_path, data) => { + uploadedIndex = data.toString(); + return Promise.resolve('index.json'); + }); + + const uploaded = await uploadSnapshot( + makeDataPaths(1, '/local/'), + { archiver: 1, worldState: 1 }, + { ...metadata, l1BlockNumber: 1, l2BlockHash: '0x1', l2BlockNumber: 1 }, + store, + ); + + const expectedSnapshot: SnapshotMetadata = { + ...makeSnapshotMetadata(1), + dataUrls: makeExpectedDataPaths(), + timestamp: expect.any(Number), + }; + + expect(uploaded).toEqual(expectedSnapshot); + + expect(JSON.parse(uploadedIndex!)).toEqual({ + ...metadata, + rollupAddress: rollup.toString(), + snapshots: [expectedSnapshot], + }); + + expect(store.exists).toHaveBeenCalledWith(`aztec-1-2-${rollup.toString()}/index.json`); + expect(store.upload).toHaveBeenCalledTimes(6); + }); + + it('updates an existing index', async () => { + store.exists.mockResolvedValue(true); + store.read.mockResolvedValue(Buffer.from(jsonStringify(index), 'utf-8')); + + let uploadedIndex: string; + store.save.mockImplementation((_path, data) => { + uploadedIndex = data.toString(); + return Promise.resolve('index.json'); + }); + + const uploaded = await uploadSnapshot( + makeDataPaths(6, '/local/'), + { archiver: 1, worldState: 1 }, + { ...metadata, l1BlockNumber: 6, l2BlockHash: '0x6', l2BlockNumber: 6 }, + store, + ); + + const expectedSnapshot: SnapshotMetadata = { + ...makeSnapshotMetadata(6), + dataUrls: makeExpectedDataPaths(), + timestamp: expect.any(Number), + }; + + expect(uploaded).toEqual(expectedSnapshot); + + expect(JSON.parse(uploadedIndex!)).toEqual({ + ...metadata, + rollupAddress: rollup.toString(), + snapshots: [expectedSnapshot, ...snapshots], + }); + + expect(store.exists).toHaveBeenCalledWith(`aztec-1-2-${rollup.toString()}/index.json`); + expect(store.upload).toHaveBeenCalledTimes(6); + }); + }); +}); diff --git a/yarn-project/stdlib/src/snapshots/index.ts b/yarn-project/stdlib/src/snapshots/index.ts new file mode 100644 index 000000000000..8f17a6b4fae4 --- /dev/null +++ b/yarn-project/stdlib/src/snapshots/index.ts @@ -0,0 +1,3 @@ +export * from './types.js'; +export * from './upload.js'; +export { downloadSnapshot, getLatestSnapshotMetadata, makeSnapshotLocalPaths } from './download.js'; diff --git a/yarn-project/stdlib/src/snapshots/types.ts b/yarn-project/stdlib/src/snapshots/types.ts new file mode 100644 index 000000000000..4a0f4f68f503 --- /dev/null +++ b/yarn-project/stdlib/src/snapshots/types.ts @@ -0,0 +1,58 @@ +import type { EthAddress } from '@aztec/foundation/eth-address'; +import { type ZodFor, schemas } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +export const SnapshotDataKeys = [ + 'archiver', + 'nullifier-tree', + 'public-data-tree', + 'note-hash-tree', + 'archive-tree', + 'l1-to-l2-message-tree', +] as const; + +export type SnapshotDataKeys = (typeof SnapshotDataKeys)[number]; + +export type SnapshotDataUrls = Record; + +export type SnapshotMetadata = { + l2BlockNumber: number; + l2BlockHash: string; + l1BlockNumber: number; + timestamp: number; + dataUrls: SnapshotDataUrls; + schemaVersions: { archiver: number; worldState: number }; +}; + +export type SnapshotsIndexMetadata = { + l1ChainId: number; + l2Version: number; + rollupAddress: EthAddress; +}; + +export type SnapshotsIndex = SnapshotsIndexMetadata & { + snapshots: SnapshotMetadata[]; +}; + +export const SnapshotsIndexSchema = z.object({ + l1ChainId: z.number(), + l2Version: z.number(), + rollupAddress: schemas.EthAddress, + snapshots: z.array( + z.object({ + l2BlockNumber: z.number(), + l2BlockHash: z.string(), + l1BlockNumber: z.number(), + timestamp: z.number(), + schemaVersions: z.object({ + archiver: z.number(), + worldState: z.number(), + }), + dataUrls: z + .record(z.enum(SnapshotDataKeys), z.string()) + // See https://stackoverflow.com/questions/77958464/zod-record-with-required-keys + .refine((obj): obj is Required => SnapshotDataKeys.every(key => !!obj[key])), + }), + ), +}) satisfies ZodFor; diff --git a/yarn-project/stdlib/src/snapshots/upload.ts b/yarn-project/stdlib/src/snapshots/upload.ts new file mode 100644 index 000000000000..57345cb12232 --- /dev/null +++ b/yarn-project/stdlib/src/snapshots/upload.ts @@ -0,0 +1,53 @@ +import { fromEntries, getEntries, pick } from '@aztec/foundation/collection'; +import { jsonStringify } from '@aztec/foundation/json-rpc'; +import type { FileStore } from '@aztec/stdlib/file-store'; + +import { getBasePath, getSnapshotIndex, getSnapshotIndexPath } from './download.js'; +import type { SnapshotDataKeys, SnapshotMetadata, SnapshotsIndex } from './types.js'; + +export type UploadSnapshotMetadata = Pick & + Pick; + +export async function uploadSnapshot( + localPaths: Record, + schemaVersions: SnapshotMetadata['schemaVersions'], + metadata: UploadSnapshotMetadata, + store: FileStore, +): Promise { + const timestamp = Date.now(); + const date = new Date().toISOString().replace(/[-:T]/g, '').replace(/\..+$/, ''); + const basePath = getBasePath(metadata); + const targetPathFor = (key: SnapshotDataKeys) => `${basePath}/${key}-${date}-${metadata.l2BlockHash}.db`; + + const dataUrls = fromEntries( + await Promise.all( + getEntries(localPaths).map( + async ([key, path]) => + [key, await store.upload(targetPathFor(key), path, { compress: true, public: true })] as const, + ), + ), + ); + + const snapshotsIndex = (await getSnapshotIndex(metadata, store)) ?? createEmptyIndex(metadata); + + const newSnapshotMetadata: SnapshotMetadata = { + ...pick(metadata, 'l1BlockNumber', 'l2BlockHash', 'l2BlockNumber'), + schemaVersions, + timestamp, + dataUrls, + }; + snapshotsIndex.snapshots.unshift(newSnapshotMetadata); + + await store.save(getSnapshotIndexPath(metadata), Buffer.from(jsonStringify(snapshotsIndex, true)), { + public: true, // Make the index publicly accessible + metadata: { ['Cache-control']: 'no-store' }, // Do not cache object versions + }); + return newSnapshotMetadata; +} + +function createEmptyIndex(metadata: Pick): SnapshotsIndex { + return { + ...pick(metadata, 'l1ChainId', 'l2Version', 'rollupAddress'), + snapshots: [], + }; +} diff --git a/yarn-project/world-state/src/instrumentation/instrumentation.ts b/yarn-project/world-state/src/instrumentation/instrumentation.ts index 238dc38e357b..8126acc262ce 100644 --- a/yarn-project/world-state/src/instrumentation/instrumentation.ts +++ b/yarn-project/world-state/src/instrumentation/instrumentation.ts @@ -1,4 +1,4 @@ -import { createLogger } from '@aztec/foundation/log'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import { MerkleTreeId } from '@aztec/stdlib/trees'; import { Attributes, @@ -40,7 +40,10 @@ export class WorldStateInstrumentation { private requestHistogram: Histogram; private criticalErrors: UpDownCounter; - constructor(public readonly telemetry: TelemetryClient, private log = createLogger('world-state:instrumentation')) { + constructor( + public readonly telemetry: TelemetryClient, + private log: Logger = createLogger('world-state:instrumentation'), + ) { const meter = telemetry.getMeter('World State'); this.dbMapSize = meter.createGauge(Metrics.WORLD_STATE_DB_MAP_SIZE, { description: `The current configured map size for each merkle tree`, diff --git a/yarn-project/world-state/src/native/message.ts b/yarn-project/world-state/src/native/message.ts index 5638d928b0b5..f38c699d8522 100644 --- a/yarn-project/world-state/src/native/message.ts +++ b/yarn-project/world-state/src/native/message.ts @@ -41,6 +41,8 @@ export enum WorldStateMessageType { COMMIT_CHECKPOINT, REVERT_CHECKPOINT, + COPY_STORES, + CLOSE = 999, } @@ -409,6 +411,11 @@ interface CreateForkResponse { interface DeleteForkRequest extends WithForkId {} +interface CopyStoresRequest extends WithCanonicalForkId { + dstPath: string; + compact: boolean; +} + export type WorldStateRequestCategories = WithForkId | WithWorldStateRevision | WithCanonicalForkId; export function isWithForkId(body: WorldStateRequestCategories): body is WithForkId { @@ -460,6 +467,8 @@ export type WorldStateRequest = { [WorldStateMessageType.COMMIT_CHECKPOINT]: WithForkId; [WorldStateMessageType.REVERT_CHECKPOINT]: WithForkId; + [WorldStateMessageType.COPY_STORES]: CopyStoresRequest; + [WorldStateMessageType.CLOSE]: WithCanonicalForkId; }; @@ -500,6 +509,8 @@ export type WorldStateResponse = { [WorldStateMessageType.COMMIT_CHECKPOINT]: void; [WorldStateMessageType.REVERT_CHECKPOINT]: void; + [WorldStateMessageType.COPY_STORES]: void; + [WorldStateMessageType.CLOSE]: void; }; diff --git a/yarn-project/world-state/src/native/native_world_state.test.ts b/yarn-project/world-state/src/native/native_world_state.test.ts index 3236f854dcdb..208c74457728 100644 --- a/yarn-project/world-state/src/native/native_world_state.test.ts +++ b/yarn-project/world-state/src/native/native_world_state.test.ts @@ -9,6 +9,7 @@ import { NULLIFIER_TREE_HEIGHT, PUBLIC_DATA_TREE_HEIGHT, } from '@aztec/constants'; +import { timesAsync } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { Fr } from '@aztec/foundation/fields'; import type { SiblingPath } from '@aztec/foundation/trees'; @@ -28,12 +29,13 @@ import { join } from 'path'; import { assertSameState, compareChains, mockBlock, mockEmptyBlock } from '../test/utils.js'; import { INITIAL_NULLIFIER_TREE_SIZE, INITIAL_PUBLIC_DATA_TREE_SIZE } from '../world-state-db/merkle_tree_db.js'; import type { WorldStateStatusSummary } from './message.js'; -import { NativeWorldStateService } from './native_world_state.js'; +import { NativeWorldStateService, WORLD_STATE_DB_VERSION, WORLD_STATE_DIR } from './native_world_state.js'; jest.setTimeout(60_000); describe('NativeWorldState', () => { let dataDir: string; + let backupDir: string | undefined; let rollupAddress: EthAddress; const defaultDBMapSize = 25 * 1024 * 1024; @@ -44,11 +46,15 @@ describe('NativeWorldState', () => { afterAll(async () => { await rm(dataDir, { recursive: true, maxRetries: 3 }); + if (backupDir) { + await rm(backupDir, { recursive: true, maxRetries: 3 }); + } }); - describe('persistence', () => { + describe('Persistence', () => { let block: L2Block; let messages: Fr[]; + let noteHash: Fr; const findLeafIndex = async (leaf: Fr, ws: NativeWorldStateService) => { const indices = await ws.getCommitted().findLeafIndices(MerkleTreeId.NOTE_HASH_TREE, [leaf]); @@ -58,10 +64,17 @@ describe('NativeWorldState', () => { return indices[0]; }; + const writeVersion = (baseDir: string) => + DatabaseVersionManager.writeVersion( + new DatabaseVersion(WORLD_STATE_DB_VERSION, rollupAddress), + join(baseDir, WORLD_STATE_DIR), + ); + beforeAll(async () => { const ws = await NativeWorldStateService.new(rollupAddress, dataDir, defaultDBMapSize); const fork = await ws.fork(); ({ block, messages } = await mockBlock(1, 2, fork)); + noteHash = block.body.txEffects[0].noteHashes[0]; await fork.close(); await ws.handleL2BlockAndMessages(block, messages); @@ -76,6 +89,45 @@ describe('NativeWorldState', () => { await ws.close(); }); + it('copies and restores committed state', async () => { + backupDir = await mkdtemp(join(tmpdir(), 'world-state-backup-test')); + const ws = await NativeWorldStateService.new(rollupAddress, dataDir, defaultDBMapSize); + await expect(findLeafIndex(noteHash, ws)).resolves.toBeDefined(); + await ws.backupTo(join(backupDir, WORLD_STATE_DIR), true); + await ws.close(); + + await writeVersion(backupDir); + const ws2 = await NativeWorldStateService.new(rollupAddress, backupDir, defaultDBMapSize); + const status2 = await ws2.getStatusSummary(); + expect(status2.unfinalisedBlockNumber).toBe(1n); + await expect(findLeafIndex(noteHash, ws2)).resolves.toBeDefined(); + expect((await ws2.getStatusSummary()).unfinalisedBlockNumber).toBe(1n); + await ws2.close(); + }); + + it('blocks writes while copying', async () => { + backupDir = await mkdtemp(join(tmpdir(), 'world-state-backup-test')); + const ws = await NativeWorldStateService.new(rollupAddress, dataDir, defaultDBMapSize); + const copyPromise = ws.backupTo(join(backupDir, WORLD_STATE_DIR), true); + + await timesAsync(5, async i => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(i + 1, 2, fork); + await ws.handleL2BlockAndMessages(block, messages); + await fork.close(); + }); + + await copyPromise; + expect((await ws.getStatusSummary()).unfinalisedBlockNumber).toBe(6n); + await ws.close(); + + await writeVersion(backupDir); + const ws2 = await NativeWorldStateService.new(rollupAddress, backupDir, defaultDBMapSize); + await expect(findLeafIndex(block.body.txEffects[0].noteHashes[0], ws2)).resolves.toBeDefined(); + expect((await ws2.getStatusSummary()).unfinalisedBlockNumber).toBe(1n); + await ws2.close(); + }); + it('clears the database if the rollup is different', async () => { // open ws against the same data dir but a different rollup let ws = await NativeWorldStateService.new(EthAddress.random(), dataDir, defaultDBMapSize); @@ -126,7 +178,7 @@ describe('NativeWorldState', () => { await ws.close(); }); - it('Fails to sync further blocks if trees are out of sync', async () => { + it('fails to sync further blocks if trees are out of sync', async () => { // open ws against the same data dir but a different rollup and with a small max db size const rollupAddress = EthAddress.random(); const ws = await NativeWorldStateService.new(rollupAddress, dataDir, 1024); @@ -267,7 +319,7 @@ describe('NativeWorldState', () => { await ws.close(); }); - it('Tracks pending and proven chains', async () => { + it('tracks pending and proven chains', async () => { const fork = await ws.fork(); for (let i = 0; i < 16; i++) { @@ -290,7 +342,7 @@ describe('NativeWorldState', () => { } }); - it('Can finalise multiple blocks', async () => { + it('can finalise multiple blocks', async () => { const fork = await ws.fork(); for (let i = 0; i < 16; i++) { @@ -309,7 +361,7 @@ describe('NativeWorldState', () => { expect(status.finalisedBlockNumber).toBe(8n); }); - it('Can prune historic blocks', async () => { + it('can prune historic blocks', async () => { const fork = await ws.fork(); const forks = []; const provenBlockLag = 4; @@ -365,7 +417,7 @@ describe('NativeWorldState', () => { it.each([ ['1-tx blocks', (blockNumber: number, fork: MerkleTreeWriteOperations) => mockBlock(blockNumber, 1, fork)], ['empty blocks', (blockNumber: number, fork: MerkleTreeWriteOperations) => mockEmptyBlock(blockNumber, fork)], - ])('Can re-org %s', async (_, genBlock) => { + ])('can re-org %s', async (_, genBlock) => { const nonReorgState = await NativeWorldStateService.tmp(); const sequentialReorgState = await NativeWorldStateService.tmp(); let fork = await ws.fork(); @@ -465,7 +517,7 @@ describe('NativeWorldState', () => { await compareChains(ws.getCommitted(), nonReorgState.getCommitted()); }); - it('Forks are deleted during a re-org', async () => { + it('forks are deleted during a re-org', async () => { const fork = await ws.fork(); const blockForks = []; @@ -498,7 +550,7 @@ describe('NativeWorldState', () => { }); }); - describe('finding leaves', () => { + describe('Finding leaves', () => { let block: L2Block; let messages: Fr[]; @@ -535,7 +587,7 @@ describe('NativeWorldState', () => { }); }); - describe('block numbers for indices', () => { + describe('Block numbers for indices', () => { let block: L2Block; let messages: Fr[]; let noteHashes: number; @@ -596,7 +648,7 @@ describe('NativeWorldState', () => { }); }); - describe('status reporting', () => { + describe('Status reporting', () => { let block: L2Block; let messages: Fr[]; @@ -763,7 +815,7 @@ describe('NativeWorldState', () => { await ws.close(); }); - it('Mutating and non-mutating requests are correctly queued', async () => { + it('mutating and non-mutating requests are correctly queued', async () => { const numReads = 64; const setupFork = await ws.fork(); diff --git a/yarn-project/world-state/src/native/native_world_state.ts b/yarn-project/world-state/src/native/native_world_state.ts index 97751df3d607..04d2c5885d50 100644 --- a/yarn-project/world-state/src/native/native_world_state.ts +++ b/yarn-project/world-state/src/native/native_world_state.ts @@ -1,8 +1,8 @@ import { MAX_NOTE_HASHES_PER_TX, MAX_NULLIFIERS_PER_TX, NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP } from '@aztec/constants'; -import { padArrayEnd } from '@aztec/foundation/collection'; +import { fromEntries, padArrayEnd } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { Fr } from '@aztec/foundation/fields'; -import { createLogger } from '@aztec/foundation/log'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import type { L2Block } from '@aztec/stdlib/block'; import { DatabaseVersionManager } from '@aztec/stdlib/database-version'; import type { @@ -10,6 +10,7 @@ import type { MerkleTreeReadOperations, MerkleTreeWriteOperations, } from '@aztec/stdlib/interfaces/server'; +import type { SnapshotDataKeys } from '@aztec/stdlib/snapshots'; import { MerkleTreeId, NullifierLeaf, type NullifierLeafPreimage, PublicDataTreeLeaf } from '@aztec/stdlib/trees'; import { BlockHeader, PartialStateReference, StateReference } from '@aztec/stdlib/tx'; import { getTelemetryClient } from '@aztec/telemetry-client'; @@ -38,6 +39,8 @@ import { NativeWorldState } from './native_world_state_instance.js'; // Increment this when making incompatible changes to the database schema export const WORLD_STATE_DB_VERSION = 1; // The initial version +export const WORLD_STATE_DIR = 'world_state'; + export class NativeWorldStateService implements MerkleTreeDatabase { protected initialHeader: BlockHeader | undefined; // This is read heavily and only changes when data is persisted, so we cache it @@ -46,7 +49,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { protected constructor( protected readonly instance: NativeWorldState, protected readonly worldStateInstrumentation: WorldStateInstrumentation, - protected readonly log = createLogger('world-state:database'), + protected readonly log: Logger = createLogger('world-state:database'), private readonly cleanup = () => Promise.resolve(), ) {} @@ -59,7 +62,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { log = createLogger('world-state:database'), cleanup = () => Promise.resolve(), ): Promise { - const worldStateDirectory = join(dataDir, 'world_state'); + const worldStateDirectory = join(dataDir, WORLD_STATE_DIR); // Create a version manager to handle versioning const versionManager = new DatabaseVersionManager( WORLD_STATE_DB_VERSION, @@ -314,4 +317,25 @@ export class NativeWorldStateService implements MerkleTreeDatabase { ), ); } + + public async backupTo( + dstPath: string, + compact: boolean = true, + ): Promise, string>> { + await this.instance.call(WorldStateMessageType.COPY_STORES, { + dstPath, + compact, + canonical: true, + }); + return fromEntries(NATIVE_WORLD_STATE_DBS.map(([name, dir]) => [name, join(dstPath, dir, 'data.mdb')] as const)); + } } + +// The following paths are defined in cpp-land +export const NATIVE_WORLD_STATE_DBS = [ + ['l1-to-l2-message-tree', 'L1ToL2MessageTree'], + ['archive-tree', 'ArchiveTree'], + ['public-data-tree', 'PublicDataTree'], + ['note-hash-tree', 'NoteHashTree'], + ['nullifier-tree', 'NullifierTree'], +] as const; diff --git a/yarn-project/world-state/src/native/native_world_state_instance.ts b/yarn-project/world-state/src/native/native_world_state_instance.ts index c53eed09fe32..8ce3f9ad1873 100644 --- a/yarn-project/world-state/src/native/native_world_state_instance.ts +++ b/yarn-project/world-state/src/native/native_world_state_instance.ts @@ -8,7 +8,7 @@ import { NULLIFIER_TREE_HEIGHT, PUBLIC_DATA_TREE_HEIGHT, } from '@aztec/constants'; -import { createLogger } from '@aztec/foundation/log'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import { NativeWorldState as BaseNativeWorldState, MsgpackChannel } from '@aztec/native'; import { MerkleTreeId } from '@aztec/stdlib/trees'; import type { PublicDataTreeLeaf } from '@aztec/stdlib/trees'; @@ -54,7 +54,7 @@ export class NativeWorldState implements NativeWorldStateInstance { dbMapSizeKb: number, prefilledPublicData: PublicDataTreeLeaf[] = [], private instrumentation: WorldStateInstrumentation, - private log = createLogger('world-state:database'), + private log: Logger = createLogger('world-state:database'), ) { const threads = Math.min(cpus().length, MAX_WORLD_STATE_THREADS); log.info( diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index 19083e679c60..222c973460d8 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -1,6 +1,6 @@ import { L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/constants'; import type { Fr } from '@aztec/foundation/fields'; -import { createLogger } from '@aztec/foundation/log'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; import { elapsed } from '@aztec/foundation/timer'; import { MerkleTreeCalculator } from '@aztec/foundation/trees'; @@ -22,6 +22,7 @@ import { type WorldStateSynchronizerStatus, } from '@aztec/stdlib/interfaces/server'; import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; +import type { SnapshotDataKeys } from '@aztec/stdlib/snapshots'; import type { L2BlockHandledStats } from '@aztec/stdlib/stats'; import { MerkleTreeId, type MerkleTreeReadOperations, type MerkleTreeWriteOperations } from '@aztec/stdlib/trees'; import { TraceableL2BlockStream, getTelemetryClient } from '@aztec/telemetry-client'; @@ -31,6 +32,8 @@ import type { WorldStateStatusFull } from '../native/message.js'; import type { MerkleTreeAdminDatabase } from '../world-state-db/merkle_tree_db.js'; import type { WorldStateConfig } from './config.js'; +export type { SnapshotDataKeys }; + /** * Synchronizes the world state with the L2 blocks from a L2BlockSource via a block stream. * The synchronizer will download the L2 blocks from the L2BlockSource and update the merkle trees. @@ -54,7 +57,7 @@ export class ServerWorldStateSynchronizer private readonly l2BlockSource: L2BlockSource & L1ToL2MessageSource, private readonly config: WorldStateConfig, private instrumentation = new WorldStateInstrumentation(getTelemetryClient()), - private readonly log = createLogger('world_state'), + private readonly log: Logger = createLogger('world_state'), ) { this.merkleTreeCommitted = this.merkleTreeDb.getCommitted(); this.historyToKeep = config.worldStateBlockHistory < 1 ? undefined : config.worldStateBlockHistory; @@ -77,6 +80,10 @@ export class ServerWorldStateSynchronizer return this.merkleTreeDb.fork(blockNumber); } + public backupTo(dstPath: string, compact?: boolean): Promise, string>> { + return this.merkleTreeDb.backupTo(dstPath, compact); + } + public async start() { if (this.currentState === WorldStateRunningState.STOPPED) { throw new Error('Synchronizer already stopped'); @@ -147,6 +154,21 @@ export class ServerWorldStateSynchronizer return (await this.getL2Tips()).latest.number; } + public async stopSync() { + this.log.debug('Stopping sync...'); + await this.blockStream?.stop(); + this.log.info('Stopped sync'); + } + + public resumeSync() { + if (!this.blockStream) { + throw new Error('Cannot resume sync as block stream is not initialized'); + } + this.log.debug('Resuming sync...'); + this.blockStream.start(); + this.log.info('Resumed sync'); + } + /** * Forces an immediate sync. * @param targetBlockNumber - The target block number that we must sync to. Will download unproven blocks if needed to reach it. Throws if cannot be reached. diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index e79c05a136e6..5208c2d3c9d4 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -1348,6 +1348,7 @@ __metadata: "@aztec/constants": "workspace:^" "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" + "@google-cloud/storage": "npm:^7.15.0" "@jest/globals": "npm:^29.5.0" "@types/jest": "npm:^29.5.0" "@types/lodash.chunk": "npm:^4.2.9"