Fix delete snapshot so that it doesn't orphan data keys or delete the wrong key #1973

Merged · 6 commits · Nov 5, 2024
Changes from 5 commits
5 changes: 3 additions & 2 deletions cpp/arcticdb/processing/clause.cpp
@@ -275,7 +275,8 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
     GroupingMap grouping_map;
     // Iterating backwards as we are going to erase from this vector as we go along
     // This is to spread out deallocation of the input segments
-    for (auto it = row_slices.rbegin(); it != row_slices.rend(); ++it) {
+    auto it = row_slices.rbegin();
+    while(it != row_slices.rend()) {
         auto& row_slice = *it;
         auto partitioning_column = row_slice.get(ColumnName(grouping_column_));
         if (std::holds_alternative<ColumnWithStrings>(partitioning_column)) {
@@ -383,7 +384,7 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
         } else {
             util::raise_rte("Expected single column from expression");
         }
-        row_slices.erase(std::next(it).base());
+        it = static_cast<decltype(row_slices)::reverse_iterator>((row_slices.erase(std::next(it).base())));
     }
     SegmentInMemory seg;
     auto index_col = std::make_shared<Column>(make_scalar_type(grouping_data_type), grouping_map.size(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
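A note on the idiom in the hunks above: `std::next(it).base()` is the forward iterator addressing `*it`, and because `erase()` invalidates iterators at and after the erased element, the reverse iterator has to be rebuilt from `erase()`'s return value. A minimal standalone sketch of the pattern, using a plain `std::vector<int>` rather than ArcticDB's row slices:

```cpp
#include <cassert>
#include <iterator>
#include <vector>

int main() {
    std::vector<int> v{1, 2, 3, 4, 5};
    auto it = v.rbegin();
    while (it != v.rend()) {
        // std::next(it).base() is the forward iterator addressing *it.
        // erase() returns the forward iterator following the erased element;
        // converting that back gives the next reverse position to visit.
        it = std::vector<int>::reverse_iterator(v.erase(std::next(it).base()));
    }
    assert(v.empty());  // every element visited exactly once, back to front
}
```

The replaced `for` loop erased the current element and then incremented a reverse iterator whose base had just been invalidated by that erase; capturing `erase()`'s return value, as the new `while` loop does, avoids that.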
42 changes: 26 additions & 16 deletions cpp/arcticdb/util/key_utils.hpp
@@ -10,7 +10,6 @@
 #include <memory>
 #include <arcticdb/column_store/key_segment.hpp>
 #include <arcticdb/storage/store.hpp>
-#include <arcticdb/util/variant.hpp>
 #include <arcticdb/stream/stream_reader.hpp>
 #include <arcticdb/stream/stream_utils.hpp>

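The next hunk swaps a `std::enable_if` default template argument for a C++20 `requires` clause. The old spelling had a real defect: `std::enable_if<B>` without `::type` (or `std::enable_if_t`) is a complete type whether or not `B` holds, so the constraint never actually fired. A minimal sketch of the difference, with illustrative names rather than the real ArcticDB declarations:

```cpp
#include <type_traits>
#include <vector>

struct Base {};
struct Derived : Base {};

// The pre-C++20 spelling used in the old code: note the missing ::type.
// std::enable_if<B> is a complete type even when B is false, so this
// default template argument is always well-formed and rejects nothing.
template<typename C, typename = std::enable_if<std::is_base_of_v<Base, typename C::value_type>>>
void old_style(const C&) {}

// The C++20 requires clause is checked for real during overload resolution.
template<typename C>
requires std::is_base_of_v<Base, typename C::value_type>
void new_style(const C&) {}

int main() {
    std::vector<Derived> good;
    old_style(good);
    new_style(good);

    std::vector<int> bad;
    old_style(bad);    // compiles: the broken constraint never fires
    // new_style(bad); // would not compile: constraint not satisfied
}
```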
@@ -155,14 +154,25 @@ inline ankerl::unordered_dense::set<AtomKey> recurse_segment(
 /* Given a container of [multi-]index keys, returns a set containing all the multi-index, index, and data keys
  * referenced by these [multi-]index keys.
  * Note that this differs from recurse_index_key, which includes the passed in key in the returned set. */
-template<typename KeyContainer, typename = std::enable_if<std::is_base_of_v<AtomKey, typename KeyContainer::value_type>>>
+template<typename KeyContainer>
+requires std::is_base_of_v<AtomKey, typename KeyContainer::value_type>
 inline ankerl::unordered_dense::set<AtomKey> recurse_index_keys(
         const std::shared_ptr<stream::StreamSource>& store,
         const KeyContainer& keys,
         storage::ReadKeyOpts opts) {
+    if (keys.empty()) {
+        return {};
+    }
+    // Having one set for AtomKeys and one for AtomKeyPacked is intentional. It handles the case of pruning data
+    // for a single symbol: there, all keys share one symbol, so we can use the cheaper-to-hash AtomKeyPacked
+    // struct, as rehashing when the set grows is expensive for AtomKeys. When the keys span different symbols
+    // (e.g. when deleting a snapshot), AtomKey must be used, as we need the symbol_id per key.
     ankerl::unordered_dense::set<AtomKey> res;
     ankerl::unordered_dense::set<AtomKeyPacked> res_packed;
+    const StreamId& first_stream_id = keys.begin()->id();
+    bool same_stream_id = true;
     for (const auto& index_key: keys) {
+        same_stream_id = first_stream_id == index_key.id();
         try {
             if (index_key.type() == KeyType::MULTI_KEY) {
                 // recurse_index_key includes the input key in the returned set, remove this here
@@ -176,16 +186,17 @@ inline ankerl::unordered_dense::set<AtomKey> recurse_index_keys(
                 auto data_keys = key_segment.materialise();
                 util::variant_match(
                     data_keys,
-                    [&res, &keys](std::vector<AtomKey> &atom_keys) {
-                        res.reserve(keys.size());
-                        for (auto &&key : atom_keys) {
-                            res.emplace(std::move(key));
-                        }
-                    },
-                    [&res_packed, &keys](std::vector<AtomKeyPacked> &atom_keys_packed) {
-                        res_packed.reserve(keys.size());
-                        for (auto &&key_packed : atom_keys_packed) {
-                            res_packed.emplace(std::move(key_packed));
-                        }
+                    [&]<typename KeyType>(std::vector<KeyType>& atom_keys) {
+                        for (KeyType& key : atom_keys) {
+                            if constexpr (std::is_same_v<KeyType, AtomKey>) {
+                                res.emplace(std::move(key));
+                            } else if constexpr (std::is_same_v<KeyType, AtomKeyPacked>) {
+                                if (same_stream_id) {
+                                    res_packed.emplace(std::move(key));
+                                } else {
+                                    res.emplace(key.to_atom_key(index_key.id()));
+                                }
+                            }
+                        }
                     }
                 );
@@ -199,15 +210,14 @@ inline ankerl::unordered_dense::set<AtomKey> recurse_index_keys(
             if (opts.ignores_missing_key_) {
                 log::version().info("Missing key while recursing index key {}", e.keys());
             } else {
-                throw storage::KeyNotFoundException(std::move(e.keys()));
+                throw;
             }
         }
     }
     if (!res_packed.empty()) {
         res.reserve(res_packed.size() + res.size());
-        auto id = keys.begin()->id();
-        for (const auto& key: res_packed) {
-            res.emplace(key.to_atom_key(id));
+        for (const auto& key : res_packed) {
+            res.emplace(key.to_atom_key(first_stream_id));
         }
     }
     return res;
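Two details of the new `recurse_index_keys` are worth spelling out. The first is the two-set accumulation strategy described in the comment above: keys belonging to the first-seen symbol go into the cheap-to-hash packed set and are widened once at the end, while keys for any other symbol are widened immediately. The sketch below shows the same pattern with hypothetical stand-in types (`FullKey`/`PackedKey`, not the real `AtomKey`/`AtomKeyPacked`) and `std::unordered_set` in place of `ankerl::unordered_dense::set`:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins: PackedKey omits the symbol, so it is smaller
// and cheaper to hash; FullKey carries the symbol explicitly.
struct PackedKey { uint64_t version; bool operator==(const PackedKey&) const = default; };
struct FullKey   { std::string symbol; uint64_t version; bool operator==(const FullKey&) const = default; };

namespace std {
template<> struct hash<PackedKey> {
    size_t operator()(const PackedKey& k) const { return hash<uint64_t>{}(k.version); }
};
template<> struct hash<FullKey> {
    size_t operator()(const FullKey& k) const {
        return hash<string>{}(k.symbol) ^ (hash<uint64_t>{}(k.version) << 1);
    }
};
}

std::unordered_set<FullKey> collect(const std::vector<FullKey>& keys) {
    std::unordered_set<FullKey> res;
    std::unordered_set<PackedKey> res_packed;  // rehashing stays cheap as this grows
    const std::string& first_symbol = keys.front().symbol;
    for (const auto& key : keys) {
        if (key.symbol == first_symbol)
            res_packed.insert(PackedKey{key.version});  // common single-symbol path
        else
            res.insert(key);                            // widen immediately: symbol differs
    }
    for (const auto& pk : res_packed)
        res.insert(FullKey{first_symbol, pk.version});  // widen once at the end
    return res;
}

int main() {
    auto res = collect({{"a", 1}, {"a", 2}, {"b", 1}});
    assert(res.size() == 3);  // {"a",1}, {"a",2}, {"b",1} all survive
}
```

The bug this PR fixes is visible in the removed lines above: the old code put every packed key into `res_packed` regardless of which index key produced it, then widened them all with `keys.begin()->id()`, so when a snapshot spanned several symbols, data keys could be attributed to the wrong symbol. The per-key `same_stream_id` check plus the single `first_stream_id` widening closes that hole.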
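The second detail is the switch from `throw storage::KeyNotFoundException(std::move(e.keys()));` to a bare `throw;`. Rethrowing with `throw;` propagates the original in-flight exception object unchanged, whereas constructing a fresh exception discards everything the caught one carried beyond its key list. A generic illustration with a hypothetical exception type:

```cpp
#include <iostream>
#include <stdexcept>

struct KeyNotFound : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void lookup(bool ignore_missing) {
    try {
        throw KeyNotFound("index key sym@v3 not found");
    } catch (const KeyNotFound& e) {
        if (ignore_missing) {
            std::cerr << "ignoring: " << e.what() << '\n';
        } else {
            // A bare throw re-raises the original object: dynamic type,
            // message and any derived-class state all survive. Building a
            // new exception here would lose them (and `throw e;` can slice).
            throw;
        }
    }
}

int main() {
    try {
        lookup(false);
    } catch (const KeyNotFound& e) {
        std::cout << "caught intact: " << e.what() << '\n';
    }
}
```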
31 changes: 19 additions & 12 deletions cpp/arcticdb/util/test/test_key_utils.cpp
@@ -11,7 +11,7 @@
 
 using namespace arcticdb;
 
-auto write_version_frame_with_three_segments(
+static auto write_version_frame_with_three_segments(
     const arcticdb::StreamId& stream_id,
     arcticdb::VersionId v_id,
     arcticdb::version_store::PythonVersionStore& pvs
@@ -99,10 +99,12 @@ TEST(KeyUtils, RecurseIndexKeyIgnoreMissing) {
     // Given
     auto [version_store, mock_store] = python_version_store_in_memory();
 
-    StreamId first_id{"first"};
+    const StreamId first_id{"first"};
     write_version_frame_with_three_segments(first_id, 0, version_store);
-    StreamId second_id{"second"};
+    const StreamId second_id{"second"};
     write_version_frame_with_three_segments(second_id, 0, version_store);
+    const StreamId third_id{"third"};
+    write_version_frame_with_three_segments(third_id, 0, version_store);
 
     std::vector<AtomKeyImpl> index_keys;
     AtomKeyImpl index_for_second;
@@ -113,22 +115,27 @@
             index_for_second = ak;
         }
     });
-    ASSERT_EQ(index_keys.size(), 2);
+    ASSERT_EQ(index_keys.size(), 3);
     ASSERT_EQ(index_for_second.id(), second_id);
 
-    storage::RemoveOpts remove_opts;
-    mock_store->remove_key(index_for_second, remove_opts);
-
-    std::vector<AtomKeyImpl> data_keys;
-    mock_store->iterate_type(KeyType::TABLE_DATA, [&](auto&& vk) {
-        data_keys.emplace_back(std::get<AtomKeyImpl>(vk));
-    });
+    mock_store->remove_key(index_for_second, storage::RemoveOpts{});
 
     // When
     storage::ReadKeyOpts opts;
     opts.ignores_missing_key_ = true;
     auto res = recurse_index_keys(mock_store, index_keys, opts);
 
     // Then
-    ASSERT_EQ(res.size(), 3);
+    ASSERT_EQ(res.size(), 6);
+    int count_first = 0;
+    int count_third = 0;
+    for (const AtomKey& atom_key : res) {
+        if (atom_key.id() == first_id) {
+            count_first++;
+        } else if (atom_key.id() == third_id) {
+            count_third++;
+        }
+    }
+    ASSERT_EQ(3, count_first);
+    ASSERT_EQ(3, count_third);
 }
@@ -7,13 +7,15 @@
 """
 import pytest
 import numpy as np
+import pandas as pd
 import re
 
 from arcticdb_ext.exceptions import InternalException
 from arcticdb_ext.version_store import NoSuchVersionException
 from arcticdb_ext.storage import NoDataFoundException
 from arcticdb.util.test import distinct_timestamps
 from tests.util.storage_test import get_s3_storage_config
+from arcticdb_ext.storage import KeyType
 
 
 def test_basic_snapshot_flow(basic_store):
@@ -491,4 +493,27 @@ def assert_0_delete_marker(lib, storage):
     assert_0_delete_marker(lib, storage)
 
     lib.remove_from_snapshot("snap", ["s2"], [s2_ver])
-    assert_0_delete_marker(lib, storage)
+    assert_0_delete_marker(lib, storage)
+
+
+def test_snapshot_deletion_multiple_symbols(lmdb_version_store_v1):
+    lib = lmdb_version_store_v1
+    for symbol_idx in range(2):
+        lib.write(f"sym_{symbol_idx}", pd.DataFrame({"col": [1, 2]}))
+        lib.append(f"sym_{symbol_idx}", pd.DataFrame({"col": [3, 4]}))
+
+    lib.snapshot("snap")
+    lib.delete_version("sym_0", 1)
+    lib.delete_version("sym_1", 1)
+
+    lib_tool = lib.library_tool()
+
+    for symbol_idx in range(2):
+        assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, f"sym_{symbol_idx}")) == 2
+        assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_INDEX, f"sym_{symbol_idx}")) == 2
+
+
+    lib.delete_snapshot("snap")
+    for symbol_idx in range(2):
+        assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, f"sym_{symbol_idx}")) == 1
+        assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_INDEX, f"sym_{symbol_idx}")) == 1