9
9
10
10
#include " base/vlog.h"
11
11
#include " container/fragmented_vector.h"
12
+ #include " model/namespace.h"
12
13
#include " model/record_batch_types.h"
13
14
#include " model/timeout_clock.h"
14
15
#include " random/generators.h"
@@ -104,7 +105,14 @@ struct ot_state {
104
105
/// batches!
105
106
struct ot_state_consumer {
106
107
ss::future<ss::stop_iteration> operator ()(model::record_batch rb) {
107
- if (rb.header ().type != model::record_batch_type::raft_data) {
108
+ static const auto translation_batches
109
+ = model::offset_translator_batch_types ();
110
+ if (
111
+ std::find (
112
+ translation_batches.begin (),
113
+ translation_batches.end (),
114
+ rb.header ().type )
115
+ != translation_batches.end ()) {
108
116
// save information about the non-data batch
109
117
st->gap_offset .push_back (rb.base_offset ());
110
118
st->gap_length .push_back (rb.record_count ());
@@ -122,11 +130,28 @@ struct ot_state_consumer {
122
130
/// by the set of segment base offset values.
123
131
ss::future<ot_state> arrange_and_compact (
124
132
const fragmented_vector<model::record_batch>& batches,
125
- std::deque<model::offset> arrangement) {
133
+ std::deque<model::offset> arrangement,
134
+ bool simulate_internal_topic_compaction = false ) {
126
135
std::sort (arrangement.begin (), arrangement.end ());
127
- storage::disk_log_builder b1;
136
+ storage::log_config cfg = storage::log_builder_config ();
137
+ auto offset_translator_types = model::offset_translator_batch_types ();
138
+ auto raft_group_id = raft::group_id{0 };
139
+ storage::disk_log_builder b1 (cfg, offset_translator_types, raft_group_id);
140
+
141
+ auto ns = simulate_internal_topic_compaction
142
+ ? model::kafka_internal_namespace
143
+ : model::kafka_namespace;
144
+ model::ntp log_ntp (
145
+ ns,
146
+ model::topic_partition (
147
+ model::topic (random_generators::gen_alphanum_string (8 )),
148
+ model::partition_id{0 }));
128
149
std::exception_ptr error = nullptr ;
129
- co_await b1.start ();
150
+ co_await b1.start (log_ntp);
151
+
152
+ // Must initialize translator state.
153
+ co_await b1.get_disk_log_impl ().start (std::nullopt);
154
+
130
155
try {
131
156
for (const auto & b : batches) {
132
157
co_await b1.add_batch (b.copy ());
@@ -138,11 +163,13 @@ ss::future<ot_state> arrange_and_compact(
138
163
}
139
164
}
140
165
ss::abort_source as;
141
- co_await b1. apply_compaction ( storage::compaction_config (
166
+ auto compact_cfg = storage::compaction_config (
142
167
batches.back ().last_offset (),
143
168
std::nullopt,
144
169
ss::default_priority_class (),
145
- as));
170
+ as);
171
+ std::ignore = co_await b1.apply_sliding_window_compaction (compact_cfg);
172
+ co_await b1.apply_adjacent_merge_compaction (compact_cfg);
146
173
} catch (...) {
147
174
error = std::current_exception ();
148
175
}
@@ -183,13 +210,39 @@ std::deque<model::offset> generate_random_arrangement(
183
210
}
184
211
185
212
/// Checks that the gap information collected after compaction is independent
/// of how the same batches are split across segments.
///
/// A reference state is produced by running arrange_and_compact with an empty
/// arrangement. The same batches are then re-run with randomly generated
/// arrangements of increasing size, and both gap_offset and gap_length must
/// match the reference each time. The trailing `false` argument leaves
/// simulate_internal_topic_compaction disabled for every run.
SEASTAR_THREAD_TEST_CASE(test_compaction_with_different_segment_arrangements) {
#ifdef NDEBUG
    static constexpr auto num_batches = 1000;
    std::vector<size_t> num_segments = {10, 100, 1000};
#else
    // Builds without NDEBUG use a much smaller workload.
    static constexpr auto num_batches = 10;
    std::vector<size_t> num_segments = {10};
#endif
    auto batches = generate_random_record_batches(num_batches, 10);

    // Reference result: no explicit arrangement offsets supplied.
    auto expected_ot
      = arrange_and_compact(batches, std::deque<model::offset>{}, false).get();

    for (auto num : num_segments) {
        auto arrangement = generate_random_arrangement(batches, num);
        auto actual_ot = arrange_and_compact(batches, arrangement, false).get();
        BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset);
        BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length);
    }
}
230
+
231
+ SEASTAR_THREAD_TEST_CASE (
232
+ test_compaction_with_different_segment_arrangements_simulate_internal_topic) {
233
+ #ifdef NDEBUG
234
+ static constexpr auto num_batches = 1000 ;
189
235
std::vector<size_t > num_segments = {10 , 100 , 1000 };
236
+ #else
237
+ static constexpr auto num_batches = 10 ;
238
+ std::vector<size_t > num_segments = {10 };
239
+ #endif
240
+ auto batches = generate_random_record_batches (num_batches, 10 );
241
+ auto expected_ot
242
+ = arrange_and_compact (batches, std::deque<model::offset>{}, true ).get ();
190
243
for (auto num : num_segments) {
191
244
auto arrangement = generate_random_arrangement (batches, num);
192
- auto actual_ot = arrange_and_compact (batches, arrangement).get ();
245
+ auto actual_ot = arrange_and_compact (batches, arrangement, true ).get ();
193
246
BOOST_REQUIRE (expected_ot.gap_offset == actual_ot.gap_offset );
194
247
BOOST_REQUIRE (expected_ot.gap_length == actual_ot.gap_length );
195
248
}
0 commit comments