Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ LMDBTreeStore::LMDBTreeStore(std::string directory, std::string name, uint64_t m
}
}

// Returns the store's name (the `name` passed at construction), by const reference.
const std::string& LMDBTreeStore::get_name() const
{
    return _name;
}

void LMDBTreeStore::get_stats(TreeDBStats& stats, ReadTransaction& tx)
{
stats.mapSize = _environment->get_map_size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class LMDBTreeStore : public LMDBStoreBase {
LMDBTreeStore& operator=(LMDBTreeStore&& other) = delete;
~LMDBTreeStore() override = default;

const std::string& get_name() const;

void get_stats(TreeDBStats& stats, ReadTransaction& tx);

void write_block_data(const block_number_t& blockNumber, const BlockPayload& blockData, WriteTransaction& tx);
Expand Down
14 changes: 14 additions & 0 deletions barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,18 @@ LMDBStoreBase::WriteTransaction::Ptr LMDBStoreBase::create_write_transaction() c
_environment->wait_for_writer();
return std::make_unique<WriteTransaction>(_environment);
}

// Copies the whole LMDB environment to dstPath, optionally compacting it (MDB_CP_COMPACT).
// NOTE(review): mdb_env_copy2 expects the destination directory to already exist and be
// writable — confirm callers create it first.
void LMDBStoreBase::copy_store(const std::string& dstPath, bool compact)
{
    // Create a write tx to acquire a write lock to prevent writes while copying. From LMDB docs:
    // "[mdb_copy] can trigger significant file size growth if run in parallel with write transactions,
    // because pages which they free during copying cannot be reused until the copy is done."
    // The transaction is intentionally never committed: its sole purpose is to hold the write
    // lock until it is destroyed when this function returns.
    WriteTransaction::Ptr tx = create_write_transaction();
    call_lmdb_func("mdb_env_copy2",
                   mdb_env_copy2,
                   _environment->underlying(),
                   dstPath.c_str(),
                   static_cast<unsigned int>(compact ? MDB_CP_COMPACT : 0));
}

} // namespace bb::lmdblib
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class LMDBStoreBase {
ReadTransaction::SharedPtr create_shared_read_transaction() const;
WriteTransaction::Ptr create_write_transaction() const;
LMDBDatabaseCreationTransaction::Ptr create_db_transaction() const;
void copy_store(const std::string& dstPath, bool compact);

protected:
std::string _dbDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum LMDBStoreMessageType {
STATS,

CLOSE,
COPY_STORE,
};

struct OpenDatabaseRequest {
Expand Down Expand Up @@ -118,6 +119,12 @@ struct StatsResponse {
MSGPACK_FIELDS(stats, dbMapSizeBytes);
};

// Payload for a COPY_STORE message: asks the store to copy itself to another on-disk location.
struct CopyStoreRequest {
    // Destination directory for the copied database files.
    std::string dstPath;
    // Whether to compact (drop free pages) while copying; handlers treat an unset value as false.
    std::optional<bool> compact;
    MSGPACK_FIELDS(dstPath, compact);
};

} // namespace bb::nodejs::lmdb_store

MSGPACK_ADD_ENUM(bb::nodejs::lmdb_store::LMDBStoreMessageType)
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)

// The close operation requires exclusive execution, no other operations can be run concurrently with it
_msg_processor.register_handler(LMDBStoreMessageType::CLOSE, this, &LMDBStoreWrapper::close, true);

_msg_processor.register_handler(LMDBStoreMessageType::COPY_STORE, this, &LMDBStoreWrapper::copy_store, true);
}

Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
Expand Down Expand Up @@ -279,6 +281,14 @@ BoolResponse LMDBStoreWrapper::close()
return { true };
}

// Handles a COPY_STORE request: copies the underlying store to the requested path,
// compacting only when the (optional) flag is explicitly set to true.
BoolResponse LMDBStoreWrapper::copy_store(const CopyStoreRequest& req)
{
    verify_store();

    // An absent 'compact' flag means a plain, non-compacting copy.
    const bool shouldCompact = req.compact.value_or(false);
    _store->copy_store(req.dstPath, shouldCompact);

    return { true };
}

std::pair<bool, bb::lmdblib::KeyDupValuesVector> LMDBStoreWrapper::_advance_cursor(const lmdblib::LMDBCursor& cursor,
bool reverse,
uint64_t page_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class LMDBStoreWrapper : public Napi::ObjectWrap<LMDBStoreWrapper> {

BoolResponse close();

BoolResponse copy_store(const CopyStoreRequest& req);

static std::pair<bool, lmdblib::KeyDupValuesVector> _advance_cursor(const lmdblib::LMDBCursor& cursor,
bool reverse,
uint64_t page_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ WorldStateWrapper::WorldStateWrapper(const Napi::CallbackInfo& info)
_dispatcher.register_target(
WorldStateMessageType::REVERT_CHECKPOINT,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return revert_checkpoint(obj, buffer); });

_dispatcher.register_target(
WorldStateMessageType::COPY_STORES,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return copy_stores(obj, buffer); });
}

Napi::Value WorldStateWrapper::call(const Napi::CallbackInfo& info)
Expand Down Expand Up @@ -824,6 +828,20 @@ bool WorldStateWrapper::get_status(msgpack::object& obj, msgpack::sbuffer& buf)
return true;
}

// Handles a COPY_STORES message: unpacks the request, copies all persistent world-state stores
// to the destination folder, and packs a response echoing the request's message id.
bool WorldStateWrapper::copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer)
{
    TypedMessage<CopyStoresRequest> request;
    obj.convert(request);

    // An unset 'compact' flag defaults to a plain (non-compacting) copy.
    _ws->copy_stores(request.value.dstPath, request.value.compact.value_or(false));

    // Respond with a default-constructed status payload; the caller only needs the ack.
    MsgHeader header(request.header.messageId);
    messaging::TypedMessage<WorldStateStatusFull> resp_msg(WorldStateMessageType::COPY_STORES, header, {});
    msgpack::pack(buffer, resp_msg);

    return true;
}

Napi::Function WorldStateWrapper::get_class(Napi::Env env)
{
return DefineClass(env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class WorldStateWrapper : public Napi::ObjectWrap<WorldStateWrapper> {
bool checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);
bool commit_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);
bool revert_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);

bool copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer);
};

} // namespace bb::nodejs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ enum WorldStateMessageType {
COMMIT_CHECKPOINT,
REVERT_CHECKPOINT,

COPY_STORES,

CLOSE = 999,
};

Expand Down Expand Up @@ -231,6 +233,12 @@ struct SyncBlockRequest {
publicDataWrites);
};

// Payload for a COPY_STORES message: asks the world state to copy all its LMDB stores.
struct CopyStoresRequest {
    // Parent directory under which each store is copied into its own subfolder.
    std::string dstPath;
    // Whether to compact stores while copying; handlers treat an unset value as false.
    std::optional<bool> compact;
    MSGPACK_FIELDS(dstPath, compact);
};

} // namespace bb::nodejs

MSGPACK_ADD_ENUM(bb::nodejs::WorldStateMessageType)
12 changes: 12 additions & 0 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ void WorldState::create_canonical_fork(const std::string& dataDir,
_forks[fork->_forkId] = fork;
}

void WorldState::copy_stores(const std::string& dstPath, bool compact) const
{
auto copyStore = [&](const LMDBTreeStore::SharedPtr& store) {
std::filesystem::path directory = dstPath;
directory /= store->get_name();
std::filesystem::create_directories(directory);
store->copy_store(directory, compact);
};

std::for_each(_persistentStores->begin(), _persistentStores->end(), copyStore);
}

Fork::SharedPtr WorldState::retrieve_fork(const uint64_t& forkId) const
{
std::unique_lock lock(mtx);
Expand Down
8 changes: 8 additions & 0 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ class WorldState {
const std::vector<PublicDataLeafValue>& prefilled_public_data,
uint32_t initial_header_generator_point);

/**
* @brief Copies all underlying LMDB stores to the target directory while acquiring a write lock
*
* @param dstPath Parent folder where trees will be copied
* @param compact Whether to compact stores when copying
*/
void copy_stores(const std::string& dstPath, bool compact) const;

/**
* @brief Get tree metadata for a particular tree
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ struct WorldStateStores {
, messageStore(std::move(other.messageStore))
{}

auto begin() const { return stores.begin(); }
auto end() const { return stores.end(); }

WorldStateStores(const WorldStateStores& other) = delete;
~WorldStateStores() = default;

WorldStateStores& operator=(WorldStateStores&& other) = delete;
WorldStateStores& operator=(WorldStateStores& other) = delete;

private:
std::array<LMDBTreeStore::SharedPtr, 5> stores{
nullifierStore, publicDataStore, archiveStore, noteHashStore, messageStore
};
};
} // namespace bb::world_state
35 changes: 34 additions & 1 deletion yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {

private l1BlockNumber: bigint | undefined;
private l1Timestamp: bigint | undefined;
private initialSyncComplete: boolean = false;

public readonly tracer: Tracer;

Expand Down Expand Up @@ -280,6 +281,7 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
// but the corresponding blocks have not been processed (see #12631).
this.l1Timestamp = currentL1Timestamp;
this.l1BlockNumber = currentL1BlockNumber;
this.initialSyncComplete = true;

if (initialRun) {
this.log.info(`Initial archiver sync to L1 block ${currentL1BlockNumber} complete.`, {
Expand All @@ -293,7 +295,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
/** Queries the rollup contract on whether a prune can be executed on the immediate next L1 block. */
private async canPrune(currentL1BlockNumber: bigint, currentL1Timestamp: bigint) {
  // Look one Ethereum slot ahead of the current L1 timestamp.
  const pruneTime = (currentL1Timestamp ?? 0n) + BigInt(this.l1constants.ethereumSlotDuration);
  const canPrune = await this.rollup.canPruneAtTime(pruneTime, { blockNumber: currentL1BlockNumber });
  if (canPrune) {
    this.log.debug(`Rollup contract allows pruning at L1 block ${currentL1BlockNumber} time ${pruneTime}`, {
      currentL1Timestamp,
      pruneTime,
      currentL1BlockNumber,
    });
  }
  return canPrune;
}

/** Checks if there'd be a reorg for the next block submission and start pruning now. */
Expand Down Expand Up @@ -542,6 +552,18 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
return { provenBlockNumber };
}

/**
 * Resumes the archiver's sync loop after a stop.
 * @throws If the archiver was never started (there is no running promise to restart).
 */
public resume() {
  if (!this.runningPromise) {
    throw new Error(`Archiver was never started`);
  }
  if (this.runningPromise.isRunning()) {
    // Bug fix: previously fell through and called start() again on an already-running loop.
    this.log.warn(`Archiver already running`);
    return;
  }
  this.log.info(`Restarting archiver`);
  this.runningPromise.start();
}

/**
* Stops the archiver.
* @returns A promise signalling completion of the stop process.
Expand All @@ -554,6 +576,10 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
return Promise.resolve();
}

/**
 * Backs up the archiver's data store to the given destination folder.
 * @param destPath - Folder that will receive the backup.
 * @returns The path to the backed-up db file, as reported by the data store.
 */
public backupTo(destPath: string): Promise<string> {
  return this.dataStore.backupTo(destPath);
}

public getL1Constants(): Promise<L1RollupConstants> {
return Promise.resolve(this.l1constants);
}
Expand Down Expand Up @@ -654,6 +680,11 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
return l1Timestamp + leeway >= endTimestamp;
}

/** Returns whether the archiver has completed an initial sync run successfully (flag is set at the end of a sync pass). */
public isInitialSyncComplete(): boolean {
  return this.initialSyncComplete;
}

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
Expand Down Expand Up @@ -874,6 +905,8 @@ class ArchiverStoreHelper
| 'addContractInstanceUpdates'
| 'deleteContractInstanceUpdates'
| 'addFunctions'
| 'backupTo'
| 'close'
>
{
#log = createLogger('archiver:block-helper');
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,10 @@ export interface ArchiverDataStore {
* Estimates the size of the store in bytes.
*/
estimateSize(): Promise<{ mappingSize: number; actualSize: number; numItems: number }>;

/** Backs up the archiver db to the target folder. Returns the path to the db file. */
backupTo(path: string): Promise<string>;

/** Closes the underlying data store. */
close(): Promise<void>;
}
2 changes: 1 addition & 1 deletion yarn-project/archiver/src/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ export * from './archiver.js';
export * from './config.js';
export { type PublishedL2Block, type L1PublishedData } from './structs/published.js';
export type { ArchiverDataStore } from './archiver_store.js';
export { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js';
export { KVArchiverDataStore, ARCHIVER_DB_VERSION } from './kv_archiver_store/kv_archiver_store.js';
export { ContractInstanceStore } from './kv_archiver_store/contract_instance_store.js';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { type LogFilter, PrivateLog, type TxScopedL2Log } from '@aztec/stdlib/lo
import type { InboxLeaf } from '@aztec/stdlib/messaging';
import type { BlockHeader, TxHash, TxReceipt } from '@aztec/stdlib/tx';

import { join } from 'path';

import type { ArchiverDataStore, ArchiverL1SynchPoint } from '../archiver_store.js';
import type { DataRetrieval } from '../structs/data_retrieval.js';
import type { PublishedL2Block } from '../structs/published.js';
Expand All @@ -26,11 +28,13 @@ import { ContractInstanceStore } from './contract_instance_store.js';
import { LogStore } from './log_store.js';
import { MessageStore } from './message_store.js';

export const ARCHIVER_DB_VERSION = 1;

/**
* LMDB implementation of the ArchiverDataStore interface.
*/
export class KVArchiverDataStore implements ArchiverDataStore {
public static readonly SCHEMA_VERSION = 1;
public static readonly SCHEMA_VERSION = ARCHIVER_DB_VERSION;

#blockStore: BlockStore;
#logStore: LogStore;
Expand All @@ -49,6 +53,15 @@ export class KVArchiverDataStore implements ArchiverDataStore {
this.#contractInstanceStore = new ContractInstanceStore(db);
}

/**
 * Backs up the underlying LMDB database to the given folder.
 * @param path - Destination folder for the backup.
 * @param compress - Whether to compact the db while copying (defaults to true).
 * @returns The full path to the backup's db file ('data.mdb' inside `path`).
 */
public async backupTo(path: string, compress = true): Promise<string> {
  await this.db.backupTo(path, compress);
  return join(path, 'data.mdb');
}

Comment on lines +56 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something for this PR but we should consider bundling the rollup address/schema version into the backup to avoid restoring incompatible versions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to go back and update this comment: this is handled already. This info is part of the snapshot index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding it to the snapshot metadata. We could write those as entries of the db itself though, and completely do away with the db_version file we use today. WDYT?

/** Closes the underlying data store; no further reads or writes are valid afterwards. */
public close() {
  return this.db.close();
}

// TODO: These function names are in memory only as they are for development/debugging. They require the full contract
// artifact supplied to the node out of band. This should be reviewed and potentially removed as part of
// the node api cleanup process.
Expand Down
Loading