Skip to content

Commit

Permalink
storage: add compaction_scheduling fixture test
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf committed Feb 11, 2025
1 parent 985fe43 commit 70c37b7
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/storage/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ redpanda_cc_btest(
"//src/v/test_utils:seastar_boost",
"//src/v/utils:directory_walker",
"//src/v/utils:to_string",
"//src/v/utils:tristate",
"@boost//:test",
"@fmt",
"@seastar",
Expand Down
115 changes: 115 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "reflection/adl.h"
#include "storage/batch_cache.h"
#include "storage/disk_log_impl.h"
#include "storage/log_housekeeping_meta.h"
#include "storage/log_manager.h"
#include "storage/log_reader.h"
#include "storage/ntp_config.h"
Expand All @@ -40,6 +41,7 @@
#include "test_utils/randoms.h"
#include "test_utils/tmp_dir.h"
#include "utils/directory_walker.h"
#include "utils/tristate.h"

#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -5378,3 +5380,116 @@ FIXTURE_TEST(dirty_ratio, storage_test_fixture) {
BOOST_REQUIRE_EQUAL(disk_log->closed_segment_bytes(), 0);
BOOST_REQUIRE_CLOSE(disk_log->dirty_ratio(), 0.0, tolerance);
}

FIXTURE_TEST(compaction_scheduling, storage_test_fixture) {
using log_manager_accessor = storage::testing_details::log_manager_accessor;
storage::log_manager mgr = make_log_manager();
info("Configuration: {}", mgr.config());
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
std::vector<ss::shared_ptr<storage::log>> logs;

using overrides_t = storage::ntp_config::default_overrides;
overrides_t ov;
ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction;
ov.min_cleanable_dirty_ratio = tristate<double>{0.2};

for (const auto& topic : {"tapioca", "cassava", "kudzu"}) {
auto ntp = model::ntp("kafka", topic, 0);
auto log
= mgr
.manage(storage::ntp_config(
ntp, mgr.config().base_dir, std::make_unique<overrides_t>(ov)))
.get();
logs.push_back(log);
}

auto& meta_list = log_manager_accessor::logs_list(mgr);

using bflags = storage::log_housekeeping_meta::bitflags;

static constexpr auto is_set = [](bflags var, auto flag) {
return (var & flag) == flag;
};

// Floating point comparison tolerance
static constexpr auto tol = 1.0e-6;

auto append_and_force_roll = [this](auto& log, int num_batches = 10) {
auto headers = append_random_batches<linear_int_kv_batch_generator>(
log, num_batches);
log->force_roll(ss::default_priority_class()).get();
};

// Attempt a housekeeping scan with no partitions to compact
log_manager_accessor::housekeeping_scan(mgr).get();

for (const auto& meta : meta_list) {
BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked));
BOOST_REQUIRE(!is_set(meta.flags, bflags::compacted));
}

// Append batches and force roll with first log- it should be the only one
// compacted
append_and_force_roll(logs[0], 30);
BOOST_REQUIRE_CLOSE(logs[0]->dirty_ratio(), 1.0, tol);

log_manager_accessor::housekeeping_scan(mgr).get();

for (const auto& meta : meta_list) {
bool expect_compacted = meta.handle->config().ntp()
== logs[0]->config().ntp();
BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked));
BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked));
BOOST_REQUIRE_EQUAL(
expect_compacted, is_set(meta.flags, bflags::compacted));
auto batches = read_and_validate_all_batches(logs[0]);
linear_int_kv_batch_generator::validate_post_compaction(
std::move(batches));
}

// Append fewer batches and force roll with second log- it should be the
// only one compacted
append_and_force_roll(logs[1], 20);
BOOST_REQUIRE_CLOSE(logs[1]->dirty_ratio(), 1.0, tol);

log_manager_accessor::housekeeping_scan(mgr).get();

for (const auto& meta : meta_list) {
bool expect_compacted = meta.handle->config().ntp()
== logs[1]->config().ntp();
BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked));
BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked));
BOOST_REQUIRE_EQUAL(
expect_compacted, is_set(meta.flags, bflags::compacted));
auto batches = read_and_validate_all_batches(logs[1]);
linear_int_kv_batch_generator::validate_post_compaction(
std::move(batches));
}

// Append batches and force roll all logs- all of them will be compacted
for (auto& log : logs) {
append_and_force_roll(log, 10);
}

BOOST_REQUIRE_GE(logs[0]->dirty_ratio(), 1.0 / 3.0);
BOOST_REQUIRE_GE(logs[1]->dirty_ratio(), 1.0 / 2.0);
BOOST_REQUIRE_CLOSE(logs[2]->dirty_ratio(), 1.0, tol);

log_manager_accessor::housekeeping_scan(mgr).get();

// Logs in the meta list will be ordered w/r/t their dirty ratio
// (descending) post compaction
auto log_it = logs.rbegin();
for (const auto& meta : meta_list) {
BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked));
BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked));
BOOST_REQUIRE(is_set(meta.flags, bflags::compacted));
BOOST_REQUIRE_EQUAL(
meta.handle->config().ntp(), (*log_it)->config().ntp());
++log_it;
}

for (auto& log : logs) {
auto batches = read_and_validate_all_batches(log);
}
};

0 comments on commit 70c37b7

Please sign in to comment.