Skip to content

Iceberg with data migrations #24780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
db34c07
tests/datalake/e2e: table can be queried when topic is deleted
bashtanov Jan 10, 2025
6e1e0f5
tests/migrations: switch to flaky admin before disruption
bashtanov Jan 11, 2025
ef4326a
tests/migrations: correct test_listing_inexistent_migration
bashtanov Jan 11, 2025
7a04790
tests/dl/verifier: improve logging
bashtanov Jan 6, 2025
8613ac6
tests/dl/verifier: add facility to wait until first message via iceberg
bashtanov Jan 11, 2025
4a0d752
tests/dl/verifier: introduce mode for no communication with the topic
bashtanov Jan 11, 2025
b5582de
tests/migrations: separate migration utility functions into a mixin
bashtanov Jan 11, 2025
917de67
tests/services/connect/stop: option to make sure stream NOT finished
bashtanov Jan 9, 2025
e16b1db
tests/dl/simple_connect_test: refactor in prep to add un/mount tests
bashtanov Jan 11, 2025
835e90c
tests/dl/simple_connect_test/rpconnect: configure no of messages
bashtanov Jan 11, 2025
f5a1635
tests/dl/simple_connect_test: test with data migrations
bashtanov Jan 11, 2025
fa82488
c/archival/ntp_archiver_service: hold gate while waiting for flush
bashtanov Jan 2, 2025
a7e4e1c
c/topic_recovery_service: copy all topic properties, override specific
bashtanov Jan 2, 2025
700619c
c/migrations/backend: copy all topic properties, override specific ones
bashtanov Jan 2, 2025
727bfb2
utils/notification_list: collect return values
bashtanov Jan 2, 2025
a6ed359
r/offset_monitor: make templated so it can be used for kafka offsets
bashtanov Jan 2, 2025
a84e7c2
dl/translation/stm: implement waiting for translation of specific offset
bashtanov Jan 6, 2025
491d67e
c/partition: to dispatch async flush actions to components
bashtanov Jan 6, 2025
f166e6d
dl/translation/partition_translator: subscribe as a partition flusher
bashtanov Jan 6, 2025
314619c
c/migrations/worker: get offset when blocking writes and use it on flush
bashtanov Jan 6, 2025
58d8435
model/fundamental: fix typo in comment
bashtanov Jan 6, 2025
00a9c30
c/topic_frontend: fix formatting
bashtanov Jan 6, 2025
ae89710
c/partition: add comment
bashtanov Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ redpanda_cc_library(
"//src/v/ssx:sformat",
"//src/v/ssx:single_sharded",
"//src/v/ssx:sleep_abortable",
"//src/v/ssx:when_all",
"//src/v/storage",
"//src/v/storage:parser_utils",
"//src/v/storage:record_batch_builder",
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2907,6 +2907,8 @@ flush_result ntp_archiver::flush() {
}

ss::future<wait_result> ntp_archiver::wait(model::offset o) {
ss::gate::holder holder(_gate);

if (_parent.is_read_replica_mode_enabled()) {
vlog(
_rtclog.debug,
Expand Down
29 changes: 11 additions & 18 deletions src/v/cluster/data_migration_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -628,36 +628,29 @@ ss::future<errc> backend::create_topic(
if (!maybe_cfg) {
co_return errc::topic_invalid_config;
}

cluster::topic_configuration topic_to_create_cfg(
local_nt.ns,
local_nt.tp,
maybe_cfg->partition_count,
maybe_cfg->replication_factor);
auto& topic_properties = topic_to_create_cfg.properties;
auto manifest_props = maybe_cfg->properties;

topic_properties.compression = manifest_props.compression;
topic_properties.cleanup_policy_bitflags
= manifest_props.cleanup_policy_bitflags;
topic_properties.compaction_strategy = manifest_props.compaction_strategy;

topic_properties.retention_bytes = manifest_props.retention_bytes;
topic_properties.retention_duration = manifest_props.retention_duration;
topic_properties.retention_local_target_bytes
= manifest_props.retention_local_target_bytes;

topic_properties.remote_topic_namespace_override
= manifest_props.remote_topic_namespace_override
? manifest_props.remote_topic_namespace_override
: original_nt;
// copy all properties
topic_properties = maybe_cfg->properties;

// override specific ones
topic_to_create_cfg.is_migrated = true;
if (!topic_properties.remote_topic_namespace_override) {
topic_properties.remote_topic_namespace_override = original_nt;
}
topic_properties.remote_topic_properties.emplace(
tm.get_revision(), maybe_cfg->partition_count);
topic_properties.shadow_indexing = {model::shadow_indexing_mode::full};
topic_properties.shadow_indexing = model::shadow_indexing_mode::full;
topic_properties.recovery = true;
topic_properties.remote_label = manifest_props.remote_label;
topic_properties.read_replica = {};
topic_properties.read_replica_bucket = {};

topic_to_create_cfg.is_migrated = true;
custom_assignable_topic_configuration_vector cfg_vector;
cfg_vector.push_back(
custom_assignable_topic_configuration(std::move(topic_to_create_cfg)));
Expand Down
47 changes: 16 additions & 31 deletions src/v/cluster/data_migration_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/all.hh>

#include <fmt/ostream.h>

Expand Down Expand Up @@ -248,16 +249,21 @@ ss::future<errc> worker::do_work(

switch (sought_state) {
case state::prepared:
co_return co_await flush(partition);
co_return co_await partition->flush_archiver();
case state::executed: {
auto block_res = co_await block(partition, true);
if (block_res != errc::success) {
co_return block_res;
if (!block_res.has_value()) {
co_return block_res.error();
}
co_return co_await flush(partition);
auto block_offset = block_res.value();

auto deadline = model::timeout_clock::now() + 5s;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that this timeout value is very low considering the possible amount of work there is to be done. Should we consider making it larger and configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be retried by migration reconciliation, including when controller leadership changes. Since the partition is blocked already, the translation backlog will eventually get processed. Translator flush doesn't do any active work; it just waits for a certain offset to be translated. The only problem is that cloud storage flush will be invoked every time. @WillemKauf @andrwng if a partition does not receive any further writes, is it much overhead to flush its cloud data every few seconds?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this call we are waiting for the translator to actually execute the translation work, and this may take a while depending on the translation gap. If all the calls are idempotent then this is great, it will just be retried and eventually succeed

co_return co_await partition->flush(block_offset, deadline, _as);
}
case state::cancelled: {
auto res = co_await block(partition, false);
co_return res.has_value() ? errc::success : res.error();
}
case state::cancelled:
co_return co_await block(partition, false);
default:
vassert(
false,
Expand All @@ -267,36 +273,15 @@ ss::future<errc> worker::do_work(
}
}

ss::future<errc>
ss::future<result<model::offset, errc>>
worker::block(ss::lw_shared_ptr<partition> partition, bool block) {
auto res = co_await partition->set_writes_disabled(
partition_properties_stm::writes_disabled{block},
model::timeout_clock::now() + 5s);
co_return map_update_interruption_error_code(res);
}

ss::future<errc> worker::flush(ss::lw_shared_ptr<partition> partition) {
// todo: check ntp_config cloud storage writes enabled?
auto maybe_archiver = partition->archiver();
if (!maybe_archiver) {
co_return errc::invalid_partition_operation;
}
auto& archiver = maybe_archiver->get();
auto flush_res = archiver.flush();
if (flush_res.response != archival::flush_response::accepted) {
co_return errc::partition_operation_failed;
}
switch (co_await archiver.wait(*flush_res.offset)) {
case archival::wait_result::not_in_progress:
// is partition concurrently flushed/waited by smth else?
vassert(false, "Freshly accepted flush cannot be waited for");
case archival::wait_result::lost_leadership:
co_return errc::leadership_changed;
case archival::wait_result::failed:
co_return errc::partition_operation_failed;
case archival::wait_result::complete:
co_return errc::success;
if (res.has_value()) {
co_return res.value();
}
co_return map_update_interruption_error_code(res.error());
}

void worker::spawn_work_if_leader(managed_ntp_it it) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/data_migration_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ class worker : public ss::peering_sharded_service<worker> {
state sought_state,
const outbound_partition_work_info&);

ss::future<errc> block(ss::lw_shared_ptr<partition> partition, bool block);
ss::future<errc> flush(ss::lw_shared_ptr<partition> partition);
ss::future<result<model::offset, errc>>
block(ss::lw_shared_ptr<partition> partition, bool block);

model::node_id _self;
partition_leaders_table& _leaders_table;
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ namespace cluster {
using notification_id_type = named_type<int32_t, struct notification_id>;
inline constexpr notification_id_type notification_id_type_invalid{-1};

using partition_flush_hook_id
= named_type<int32_t, struct partition_finalize_hook_id_tag>;
inline constexpr partition_flush_hook_id partition_flush_hook_id_invalid{-1};

} // namespace cluster
65 changes: 64 additions & 1 deletion src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "raft/fundamental.h"
#include "raft/fwd.h"
#include "raft/state_machine_manager.h"
#include "ssx/when_all.h"
#include "storage/ntp_config.h"
#include "utils/rwlock.h"

Expand Down Expand Up @@ -544,13 +545,23 @@ ss::future<> partition::start(
_dl_stm_api = ss::make_shared<experimental::cloud_topics::dl_stm_api>(
clusterlog, std::move(dl_stm));
}

_archiver_flush_subscription = register_flush_hook(
[this](
model::offset,
model::timeout_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>>) {
return flush_archiver();
});
}

ss::future<> partition::stop() {
auto partition_ntp = ntp();
vlog(clusterlog.debug, "Stopping partition: {}", partition_ntp);
_as.request_abort();

unregister_flush_hook(_archiver_flush_subscription);

{
// `partition_manager::do_shutdown` (caller of stop) will assert
// out on any thrown exceptions. Hence, acquire the units without
Expand Down Expand Up @@ -1699,7 +1710,7 @@ partition::force_abort_replica_set_update(model::revision_id rev) {
}
consensus_ptr partition::raft() const { return _raft; }

ss::future<std::error_code> partition::set_writes_disabled(
ss::future<result<model::offset>> partition::set_writes_disabled(
partition_properties_stm::writes_disabled disable,
model::timeout_clock::time_point deadline) {
ssx::rwlock::holder holder;
Expand All @@ -1718,17 +1729,69 @@ ss::future<std::error_code> partition::set_writes_disabled(
if (_partition_properties_stm == nullptr) {
co_return errc::invalid_partition_operation;
}

// abort active transactions
if (disable && _rm_stm) {
auto res = co_await _rm_stm->abort_all_txes();
if (res != tx::errc::none) {
co_return res;
}
}

auto method = disable ? &partition_properties_stm::disable_writes
: &partition_properties_stm::enable_writes;
co_return co_await (*_partition_properties_stm.*method)();
}

// Registers a callback to be invoked by partition::flush(); returns an id
// that the caller must pass to unregister_flush_hook() before destruction.
partition_flush_hook_id partition::register_flush_hook(flush_hook&& cb) {
    return _flush_hooks.register_cb(std::move(cb));
}

// Removes a flush hook previously registered via register_flush_hook().
// Safe to call with partition_flush_hook_id_invalid only if the underlying
// notification_list tolerates unknown ids — TODO confirm.
void partition::unregister_flush_hook(partition_flush_hook_id id) {
    _flush_hooks.unregister_cb(id);
}

/// Dispatches a flush request to all registered flush hooks and waits for
/// them to complete.
///
/// \param offset   offset up to which each hook should flush
/// \param deadline time point after which hooks should give up
/// \param as       abort source propagated to the hooks
/// \return the most severe errc reported by any hook, or errc::not_leader
///         when invoked on a non-leader replica
ss::future<errc> partition::flush(
  model::offset offset,
  model::timeout_clock::time_point deadline,
  ss::abort_source& as) {
    // Non-leader replicas may lack some flush hooks, so only the leader is
    // allowed to service a flush request.
    if (!is_leader()) {
        co_return errc::not_leader;
    }

    vlog(clusterlog.info, "[{}] flushing offset {}", ntp(), offset);
    auto futures = _flush_hooks.notify(offset, deadline, as);
    using errs = std::vector<errc>;
    errs results = co_await ssx::when_all_succeed<errs>(std::move(futures));
    vlog(clusterlog.info, "[{}] flushed offset {}", ntp(), offset);
    // Guard against an empty hook list: std::ranges::max_element returns the
    // end iterator for an empty range and dereferencing it is UB. No hooks
    // means there was nothing to flush, which we treat as success.
    if (results.empty()) {
        co_return errc::success;
    }
    // Report the "worst" result across hooks; assumes errc::success compares
    // lowest among errc values — TODO confirm against errc definition.
    co_return *std::ranges::max_element(results);
}

/// Kicks off an archiver (cloud storage) flush and waits for it to finish.
/// Callers must not invoke this concurrently with themselves.
///
/// \return errc::success on completion, errc::leadership_changed if
///         leadership was lost while waiting, errc::partition_operation_failed
///         if the flush was rejected or failed, and
///         errc::invalid_partition_operation when no archiver is attached.
ss::future<errc> partition::flush_archiver() {
    vlog(clusterlog.debug, "[{}] flushing archiver", ntp());
    if (!_archiver) {
        // Nothing to flush on partitions without an attached archiver.
        co_return errc::invalid_partition_operation;
    }
    const auto accepted = _archiver->flush();
    if (accepted.response != archival::flush_response::accepted) {
        co_return errc::partition_operation_failed;
    }
    const auto wait_res = co_await _archiver->wait(*accepted.offset);
    vlog(clusterlog.debug, "[{}] flushed archiver: {}", ntp(), wait_res);
    if (wait_res == archival::wait_result::complete) {
        co_return errc::success;
    }
    if (wait_res == archival::wait_result::lost_leadership) {
        co_return errc::leadership_changed;
    }
    if (wait_res == archival::wait_result::failed) {
        co_return errc::partition_operation_failed;
    }
    // wait_result::not_in_progress: a freshly accepted flush should always be
    // waitable — is the partition concurrently flushed/waited by someone else?
    vassert(false, "Freshly accepted flush cannot be waited for");
}

ss::future<result<ssx::rwlock_unit>> partition::hold_writes_enabled() {
auto maybe_units = _produce_lock.attempt_read_lock();
if (!maybe_units) {
Expand Down
24 changes: 22 additions & 2 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/ntp_config.h"
#include "storage/translating_reader.h"
#include "storage/types.h"
#include "utils/notification_list.h"
#include "utils/rwlock.h"

#include <seastar/core/shared_ptr.hh>
Expand Down Expand Up @@ -361,10 +362,25 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
ss::shared_ptr<cloud_storage::async_manifest_view>
get_cloud_storage_manifest_view();

ss::future<std::error_code> set_writes_disabled(
ss::future<result<model::offset>> set_writes_disabled(
partition_properties_stm::writes_disabled disable,
model::timeout_clock::time_point deadline);

using flush_hook = ss::noncopyable_function<ss::future<errc>(
model::offset,
model::timeout_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>>)>;

// Register and execute actions to make sure we leave partition belongings
// in up-to-date state. Used for unmount.
partition_flush_hook_id register_flush_hook(flush_hook&& cb);
void unregister_flush_hook(partition_flush_hook_id id);
ss::future<errc>
flush(model::offset, model::timeout_clock::time_point, ss::abort_source&);

// callers must not invoke it multiple times concurrently
ss::future<errc> flush_archiver();

bool started() const noexcept { return _started; }
void mark_started() noexcept { _started = true; }

Expand All @@ -390,7 +406,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
// dirty so that it gets reuploaded
ss::future<> restart_archiver(bool should_notify_topic_config);

consensus_ptr _raft;
consensus_ptr _raft; // never null
ss::shared_ptr<cluster::log_eviction_stm> _log_eviction_stm;
ss::shared_ptr<cluster::rm_stm> _rm_stm;
ss::shared_ptr<archival_metadata_stm> _archival_meta_stm;
Expand Down Expand Up @@ -431,6 +447,10 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
// exclusive ("write") for enabling/disabling writes
ssx::rwlock _produce_lock;

notification_list<flush_hook, partition_flush_hook_id> _flush_hooks;
partition_flush_hook_id _archiver_flush_subscription
= partition_flush_hook_id_invalid;

bool _started{false};

friend std::ostream& operator<<(std::ostream& o, const partition& x);
Expand Down
12 changes: 6 additions & 6 deletions src/v/cluster/partition_properties_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ partition_properties_stm::apply_raft_snapshot(const iobuf& buffer) {
co_return;
}

ss::future<std::error_code>
ss::future<result<model::offset>>
partition_properties_stm::replicate_properties_update(
model::timeout_clock::duration timeout, update_writes_disabled_cmd cmd) {
if (!co_await sync(timeout)) {
Expand Down Expand Up @@ -224,11 +224,11 @@ partition_properties_stm::replicate_properties_update(
co_return r.error();
}

auto applied = co_await wait_no_throw(r.value().last_offset, deadline);
if (!applied) {
auto message_offset = r.value().last_offset;
if (!co_await wait_no_throw(message_offset, deadline)) {
co_return errc::timeout;
}
co_return errc::success;
co_return message_offset;
}

partition_properties_stm::writes_disabled
Expand All @@ -250,14 +250,14 @@ model::record_batch partition_properties_stm::make_update_partitions_batch(
return std::move(builder).build();
}

ss::future<std::error_code> partition_properties_stm::disable_writes() {
ss::future<result<model::offset>> partition_properties_stm::disable_writes() {
vlog(_log.info, "disabling partition writes");
return replicate_properties_update(
_sync_timeout(),
update_writes_disabled_cmd{.writes_disabled = writes_disabled::yes});
}

ss::future<std::error_code> partition_properties_stm::enable_writes() {
ss::future<result<model::offset>> partition_properties_stm::enable_writes() {
vlog(_log.info, "enabling partition writes");
return replicate_properties_update(
_sync_timeout(),
Expand Down
12 changes: 7 additions & 5 deletions src/v/cluster/partition_properties_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ class partition_properties_stm

ss::future<iobuf> take_snapshot(model::offset) final;

// Updates partition properties to disable writes
ss::future<std::error_code> disable_writes();
// Updates partition properties to enable writes
ss::future<std::error_code> enable_writes();
// Updates partition properties to disable writes;
// returns the offset of the blocking message
ss::future<result<model::offset>> disable_writes();
// Updates partition properties to enable writes;
// returns the offset of the unblocking message
ss::future<result<model::offset>> enable_writes();
// Waits for the state to be up to date and returns an up to date state of
// write disabled property, this method may return an error and is only
// intended to be called on the current leader.
Expand Down Expand Up @@ -126,7 +128,7 @@ class partition_properties_stm
static model::record_batch
make_update_partitions_batch(update_writes_disabled_cmd);

ss::future<std::error_code> replicate_properties_update(
ss::future<result<model::offset>> replicate_properties_update(
model::timeout_clock::duration timeout,
update_writes_disabled_cmd command);

Expand Down
Loading