kafka-source: detect topic deletion in metadata fetcher #30880

Draft · wants to merge 1 commit into base: main
src/storage/src/source/kafka.rs (229 changes: 109 additions & 120 deletions)
@@ -15,12 +15,14 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::bail;
use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use maplit::btreemap;
use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
use mz_kafka_util::client::{
get_partitions, GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -215,7 +217,7 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
connection: KafkaSourceConnection,
config: RawSourceCreationConfig,
resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
metadata_stream: Stream<G, MetadataUpdate>,
metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
@@ -517,7 +519,6 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(

let mut prev_offset_known = None;
let mut prev_offset_committed = None;
let mut prev_pid_info: Option<BTreeMap<PartitionId, HighWatermark>> = None;
let mut metadata_update: Option<MetadataUpdate> = None;
let mut snapshot_total = None;

@@ -539,7 +540,10 @@
updates.append(&mut data);
}
}
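// Only the most recent metadata update is of interest; older updates are
// superseded, so keep the one with the greatest timestamp.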
metadata_update = updates.into_iter().max_by_key(|u| u.timestamp);
metadata_update = updates
.into_iter()
.max_by_key(|(ts, _)| *ts)
.map(|(_, update)| update);
}

// This future is not cancel safe but we are only passing a reference to it in
@@ -548,8 +552,8 @@
_ = resume_uppers_process_loop.as_mut() => {},
}

match metadata_update.take().map(|m| m.info) {
Some(Ok(partitions)) => {
match metadata_update.take() {
Some(MetadataUpdate::Partitions(partitions)) => {
let max_pid = partitions.keys().last().cloned();
let lower = max_pid
.map(RangeBound::after)
@@ -560,66 +564,6 @@
MzOffset::from(0),
);

// Topics are identified by name but it's possible that a user recreates a
// topic with the same name but different configuration. Ideally we'd want to
// catch all of these cases and immediately error out the source, since the
// data is effectively gone. Unfortunately this is not possible without
// something like KIP-516 so we're left with heuristics.
//
// The first heuristic is whether the reported number of partitions went down
if !PartialOrder::less_equal(data_cap.time(), &future_ts) {
let prev_pid_count = prev_pid_info.map(|info| info.len()).unwrap_or(0);
let pid_count = partitions.len();
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(
format!(
"topic was recreated: partition count regressed from \
{prev_pid_count} to {pid_count}"
)
.into(),
),
}));
let time = data_cap.time().clone();
let err = Err(err);
for (output, err) in
outputs.iter().map(|o| o.output_index).repeat_clone(err)
{
data_output
.give_fueled(&data_cap, ((output, err), time, 1))
.await;
}
return;
}

// The second heuristic is whether the high watermark regressed
if let Some(prev_pid_info) = prev_pid_info {
for (pid, prev_high_watermark) in prev_pid_info {
let high_watermark = partitions[&pid];
if !(prev_high_watermark <= high_watermark) {
let err = DataflowError::SourceError(Box::new(SourceError {
error: SourceErrorDetails::Other(
format!(
"topic was recreated: high watermark of \
partition {pid} regressed from {} to {}",
prev_high_watermark, high_watermark
)
.into(),
),
}));
let time = data_cap.time().clone();
let err = Err(err);
for (output, err) in
outputs.iter().map(|o| o.output_index).repeat_clone(err)
{
data_output
.give_fueled(&data_cap, ((output, err), time, 1))
.await;
}
return;
}
}
}

let mut upstream_stat = 0;
for (&pid, &high_watermark) in &partitions {
if responsible_for_pid(&config, pid) {
@@ -688,9 +632,8 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
progress_statistics.offset_known = Some(upstream_stat);
data_cap.downgrade(&future_ts);
progress_cap.downgrade(&future_ts);
prev_pid_info = Some(partitions);
}
Some(Err(status)) => {
Some(MetadataUpdate::TransientError(status)) => {
if let Some(update) = status.kafka {
for (output, update) in outputs.iter().repeat_clone(update) {
health_output.give(
@@ -716,6 +659,19 @@
}
}
}
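// A definite error is persisted into the output collections at the
// current capability time and the operator returns, so the source fails
// permanently rather than stalling.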
Some(MetadataUpdate::DefiniteError(error)) => {
let error = Err(error.into());
let time = data_cap.time().clone();
for (output, error) in
outputs.iter().map(|o| o.output_index).repeat_clone(error)
{
data_output
.give_fueled(&data_cap, ((output, error), time, 1))
.await;
}

return;
}
None => {}
}

@@ -1460,7 +1416,7 @@ fn fetch_partition_info<C: ConsumerContext>(
consumer: &BaseConsumer<C>,
topic: &str,
fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, anyhow::Error> {
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
@@ -1474,7 +1430,7 @@
for entry in offset_responses.elements() {
let offset = match entry.offset() {
Offset::Offset(offset) => offset,
offset => bail!("unexpected high watermark offset: {offset:?}"),
offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
};

let pid = entry.partition();
@@ -1486,39 +1442,44 @@
}

/// An update produced by the metadata fetcher.
///
/// Either the IDs and high watermarks of the topic partitions as of `timestamp`, or a health
/// status describing a fetch error.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct MetadataUpdate {
timestamp: mz_repr::Timestamp,
info: Result<BTreeMap<PartitionId, HighWatermark>, HealthStatus>,
enum MetadataUpdate {
/// The current IDs and high watermarks of all topic partitions.
Partitions(BTreeMap<PartitionId, HighWatermark>),
/// A transient error.
///
/// Transient errors stall the source until their cause has been resolved.
TransientError(HealthStatus),
/// A definite error.
///
/// Definite errors cannot be recovered from. They poison the source until the end of time.
DefiniteError(SourceError),
}
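// For example: a metadata fetch that fails because the broker is
// unreachable surfaces as `TransientError` and merely stalls the source,
// while `GetPartitionsError::TopicDoesNotExist` is turned into a
// `DefiniteError` that fails it permanently (see `spawn_metadata_thread`).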

impl MetadataUpdate {
fn to_probe(&self) -> Option<Probe<KafkaTimestamp>> {
let Ok(partitions) = &self.info else {
return None;
};
fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
match self {
Self::Partitions(partitions) => {
let max_pid = partitions.keys().last().copied();
let lower = max_pid
.map(RangeBound::after)
.unwrap_or(RangeBound::NegInfinity);
let future_ts =
Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

let mut frontier = Antichain::from_elem(future_ts);
for (pid, high_watermark) in partitions {
frontier.insert(Partitioned::new_singleton(
RangeBound::exact(*pid),
MzOffset::from(*high_watermark),
));
}

let max_pid = partitions.keys().last().copied();
let lower = max_pid
.map(RangeBound::after)
.unwrap_or(RangeBound::NegInfinity);
let future_ts = Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

let mut upstream_frontier = Antichain::from_elem(future_ts);
for (pid, high_watermark) in partitions {
upstream_frontier.insert(Partitioned::new_singleton(
RangeBound::exact(*pid),
MzOffset::from(*high_watermark),
));
Some(frontier)
}
Self::DefiniteError(_) => Some(Antichain::new()),
Self::TransientError(_) => None,
}

Some(Probe {
probe_ts: self.timestamp,
upstream_frontier,
})
}
}
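// Illustrative example of the mapping: for high watermarks {0: 5, 1: 10},
// `upstream_frontier` yields the antichain
// { exact(0) @ 5, exact(1) @ 10, (after(1), +inf) @ 0 }: each known
// partition is tracked at its high watermark, and the trailing open range
// accounts for partitions that may still be created. A definite error maps
// to the empty antichain (the closed frontier), while a transient error
// maps to `None` because it says nothing about the state of the topic.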

@@ -1540,13 +1501,21 @@ fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
connection: KafkaSourceConnection,
config: RawSourceCreationConfig,
) -> (
Stream<G, MetadataUpdate>,
Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
Stream<G, Probe<KafkaTimestamp>>,
PressOnDropButton,
) {
let active_worker_id = usize::cast_from(config.id.hashed());
let is_active_worker = active_worker_id % scope.peers() == scope.index();

let resume_upper = Antichain::from_iter(
config
.source_resume_uppers
.values()
.map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
.flatten(),
);
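// The decoded resume upper doubles as the initial "previous" upstream
// frontier below: if the first metadata fetch after a restart reports a
// frontier that does not dominate it, the topic was recreated while the
// source was offline.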

let name = format!("KafkaMetadataFetcher({})", config.id);
let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

@@ -1603,11 +1572,9 @@ fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
_ => HealthStatus::kafka(status_update),
};
let update = MetadataUpdate {
timestamp: 0.into(),
info: Err(status),
};
metadata_output.give(&metadata_cap, update);
let error = MetadataUpdate::TransientError(status);
let timestamp = (config.now_fn)().into();
metadata_output.give(&metadata_cap, (timestamp, error));
return;
}
};
@@ -1625,11 +1592,33 @@
let (tx, mut rx) = mpsc::unbounded_channel();
spawn_metadata_thread(config, consumer, topic, poll_interval, tx);

while let Some(update) = rx.recv().await {
if let Some(probe) = update.to_probe() {
let mut prev_upstream_frontier = resume_upper;

while let Some((timestamp, mut update)) = rx.recv().await {
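// An empty frontier means the source has already failed or finished;
// stop fetching metadata.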
if prev_upstream_frontier.is_empty() {
return;
}

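// Topic recreation manifests as a regression of the upstream frontier: a
// shrinking partition count or a backwards-moving high watermark both
// produce a frontier that fails to dominate the previous one.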
if let Some(upstream_frontier) = update.upstream_frontier() {
if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
let error = SourceError {
error: SourceErrorDetails::Other("topic was recreated".into()),
};
update = MetadataUpdate::DefiniteError(error);
}
}

if let Some(upstream_frontier) = update.upstream_frontier() {
prev_upstream_frontier = upstream_frontier.clone();

let probe = Probe {
probe_ts: timestamp,
upstream_frontier,
};
probe_output.give(&probe_cap, probe);
}
metadata_output.give(&metadata_cap, update);

metadata_output.give(&metadata_cap, (timestamp, update));
}
});

@@ -1641,7 +1630,7 @@ fn spawn_metadata_thread<C: ConsumerContext>(
consumer: BaseConsumer<TunnelingClientContext<C>>,
topic: String,
poll_interval: Duration,
tx: mpsc::UnboundedSender<MetadataUpdate>,
tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
thread::Builder::new()
.name(format!("kafka-metadata-{}", config.id))
@@ -1680,10 +1669,13 @@
"kafka metadata thread: fetched partition metadata info",
);

MetadataUpdate {
timestamp: probe_ts,
info: Ok(partitions),
}
MetadataUpdate::Partitions(partitions)
}
Err(GetPartitionsError::TopicDoesNotExist) => {
let error = SourceError {
error: SourceErrorDetails::Other("topic was deleted".into()),
};
MetadataUpdate::DefiniteError(error)
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
@@ -1699,17 +1691,14 @@
}
};

MetadataUpdate {
timestamp: probe_ts,
info: Err(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
}),
}
MetadataUpdate::TransientError(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
})
}
};

if tx.send(update).is_err() {
if tx.send((probe_ts, update)).is_err() {
break;
}
