Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp): Defer each archive's global metadata updates until it has been completed (resolves #685). #705

Merged
merged 23 commits into from
Feb 14, 2025
Merged
54 changes: 32 additions & 22 deletions components/core/src/clp/streaming_archive/writer/Archive.cpp
Original file line number Diff line number Diff line change
@@ -157,10 +157,6 @@ void Archive::open(UserConfig const& user_config) {

m_global_metadata_db = user_config.global_metadata_db;

m_global_metadata_db->open();
m_global_metadata_db->add_archive(m_id_as_string, *m_local_metadata);
m_global_metadata_db->close();

m_file = nullptr;

// Open log-type dictionary
@@ -238,8 +234,18 @@ void Archive::close() {

m_metadata_file_writer.close();

update_global_metadata();
m_global_metadata_db = nullptr;

for (auto* file : m_file_metadata_for_global_update) {
delete file;
}
m_file_metadata_for_global_update.clear();

if (m_print_archive_stats_progress) {
print_archive_stats_progress();
}

m_metadata_db.close();

m_creator_id_as_string.clear();
@@ -557,8 +563,6 @@ void Archive::persist_file_metadata(vector<File*> const& files) {

m_metadata_db.update_files(files);

m_global_metadata_db->update_metadata_for_files(m_id_as_string, files);

// Mark files' metadata as clean
for (auto file : files) {
file->mark_metadata_as_clean();
@@ -593,16 +597,12 @@ void Archive::close_segment_and_persist_file_metadata(

for (auto file : files) {
file->mark_as_in_committed_segment();
m_file_metadata_for_global_update.emplace_back(file);
}

m_global_metadata_db->open();
persist_file_metadata(files);
update_metadata();
m_global_metadata_db->close();
update_local_metadata();

for (auto file : files) {
delete file;
}
files.clear();
}

@@ -628,23 +628,33 @@ uint64_t Archive::get_dynamic_compressed_size() {
return on_disk_size;
}

void Archive::update_metadata() {
auto Archive::print_archive_stats_progress() -> void {
nlohmann::json json_msg;
json_msg["id"] = m_id_as_string;
json_msg["uncompressed_size"] = m_local_metadata->get_uncompressed_size_bytes();
json_msg["size"] = m_local_metadata->get_compressed_size_bytes();
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
}

void Archive::update_local_metadata() {
m_local_metadata->set_dynamic_uncompressed_size(0);
m_local_metadata->set_dynamic_compressed_size(get_dynamic_compressed_size());
// Rewrite (overwrite) the metadata file
m_metadata_file_writer.seek_from_begin(0);
m_local_metadata->write_to_file(m_metadata_file_writer);
}

m_global_metadata_db->update_archive_metadata(m_id_as_string, *m_local_metadata);

if (m_print_archive_stats_progress) {
nlohmann::json json_msg;
json_msg["id"] = m_id_as_string;
json_msg["uncompressed_size"] = m_local_metadata->get_uncompressed_size_bytes();
json_msg["size"] = m_local_metadata->get_compressed_size_bytes();
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
<< std::endl;
auto Archive::update_global_metadata() -> void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming update_metadata to update_local_metadata so that there's congruency?

m_global_metadata_db->open();
if (false == m_local_metadata.has_value()) {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
m_global_metadata_db->add_archive(m_id_as_string, m_local_metadata.value());
m_global_metadata_db->update_metadata_for_files(
m_id_as_string,
m_file_metadata_for_global_update
);
m_global_metadata_db->close();
}

// Explicitly declare template specializations so that we can define the template methods in this
25 changes: 20 additions & 5 deletions components/core/src/clp/streaming_archive/writer/Archive.hpp
Original file line number Diff line number Diff line change
@@ -245,16 +245,16 @@ class Archive {
std::vector<File*>& files_in_segment
);
/**
* Writes the given files' metadata to the database using bulk writes
* Writes the given files' metadata to the local database using bulk writes.
* @param files
* @throw streaming_archive::writer::Archive::OperationFailed if failed to replace old
* metadata for any file
* @throw mongocxx::logic_error if invalid database operation is created
*/
void persist_file_metadata(std::vector<File*> const& files);
/**
* Closes a given segment, persists the metadata of the files in the segment, and cleans up
* any data remaining outside the segment
* Closes a given segment, persists metadata to local database for files in the segment, and
* cleans up any data remaining outside the segment.
* @param segment
* @param files
* @param segment_logtype_ids
@@ -274,10 +274,21 @@ class Archive {
* is closed
*/
uint64_t get_dynamic_compressed_size();

/**
* Prints archive progress statistics as a JSON object.
*/
auto print_archive_stats_progress() -> void;

/**
* Updates the archive's metadata
* Updates the archive's metadata in the local metadata database.
*/
void update_metadata();
void update_local_metadata();

/**
* Updates the archive's metadata in the global metadata database.
*/
auto update_global_metadata() -> void;

// Variables
boost::uuids::uuid m_id;
@@ -316,6 +327,10 @@ class Archive {
std::vector<File*> m_files_with_timestamps_in_segment;
std::vector<File*> m_files_without_timestamps_in_segment;

// Collection of `File`s which have been written to archive and deallocated, but whose
// metadata has not yet been persisted globally.
std::vector<File*> m_file_metadata_for_global_update;

size_t m_target_segment_uncompressed_size;
Segment m_segment_for_files_with_timestamps;
ArrayBackedPosIntSet<logtype_dictionary_id_t>