Skip to content

Commit

Permalink
storage: move copy to preflight to adapter
Browse files Browse the repository at this point in the history
A copy to operation involves some preflight checks: validating that the
requested prefix of S3 is empty, and writing an INCOMPLETE sentinel
file.

Previously these checks were run as the first operation on every replica
participating in the copy to operation. This was racy though: a lagging
replica could write the INCOMPLETE sentinel file *after* the leading
replica finished the copy and had thus *deleted* the INCOMPLETE sentinel
file. Since the lagging replica's work is canceled once the leading
replica's work is finished, this could result in an INCOMPLETE sentinel
file that never got removed.

This commit moves these preflight checks to the adapter, where they are
performed exactly once before any replica is notified of the copy to
operation. This ensures that the INCOMPLETE file is written at most once
in a copy to operation, and in particular that lagging replicas can't
cause the file to reappear after the leading replica finishes the
operation.
  • Loading branch information
benesch committed Dec 24, 2024
1 parent 1dcd629 commit 6da67dd
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 119 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/adapter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ rust_library(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down Expand Up @@ -135,6 +136,7 @@ rust_test(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down Expand Up @@ -185,6 +187,7 @@ rust_doc_test(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down Expand Up @@ -255,6 +258,7 @@ rust_test(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down Expand Up @@ -325,6 +329,7 @@ rust_test(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down Expand Up @@ -395,6 +400,7 @@ rust_test(
"//src/sql-parser:mz_sql_parser",
"//src/ssh-util:mz_ssh_util",
"//src/storage-client:mz_storage_client",
"//src/storage-operators:mz_storage_operators",
"//src/storage-types:mz_storage_types",
"//src/timestamp-oracle:mz_timestamp_oracle",
"//src/tls-util:mz_tls_util",
Expand Down
1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mz-sql = { path = "../sql" }
mz-sql-parser = { path = "../sql-parser" }
mz-ssh-util = { path = "../ssh-util" }
mz-storage-client = { path = "../storage-client" }
mz-storage-operators = { path = "../storage-operators" }
mz-storage-types = { path = "../storage-types" }
mz-tls-util = { path = "../tls-util" }
mz-tracing = { path = "../tracing" }
Expand Down
6 changes: 4 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,10 @@ pub enum PeekStage {
/// Final stage for an explain.
ExplainPlan(PeekStageExplainPlan),
ExplainPushdown(PeekStageExplainPushdown),
/// Final stage for a copy to.
CopyTo(PeekStageCopyTo),
/// Preflight checks for a copy to operation.
CopyToPreflight(PeekStageCopyTo),
/// Final stage for a copy to which involves shipping the dataflow.
CopyToDataflow(PeekStageCopyTo),
}

#[derive(Debug)]
Expand Down
45 changes: 42 additions & 3 deletions src/adapter/src/coord/sequencer/inner/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use http::Uri;
use itertools::Either;
use maplit::btreemap;
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::CastFrom;
Expand Down Expand Up @@ -69,7 +70,8 @@ impl Staged for PeekStage {
PeekStage::Finish(stage) => &mut stage.validity,
PeekStage::ExplainPlan(stage) => &mut stage.validity,
PeekStage::ExplainPushdown(stage) => &mut stage.validity,
PeekStage::CopyTo(stage) => &mut stage.validity,
PeekStage::CopyToPreflight(stage) => &mut stage.validity,
PeekStage::CopyToDataflow(stage) => &mut stage.validity,
}
}

Expand All @@ -94,7 +96,8 @@ impl Staged for PeekStage {
PeekStage::ExplainPushdown(stage) => {
coord.peek_explain_pushdown(ctx.session(), stage).await
}
PeekStage::CopyTo(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
}
}

Expand Down Expand Up @@ -687,7 +690,7 @@ impl Coordinator {
}
Ok(Either::Right(global_lir_plan)) => {
let optimizer = optimizer.unwrap_right();
PeekStage::CopyTo(PeekStageCopyTo {
PeekStage::CopyToPreflight(PeekStageCopyTo {
validity,
optimizer,
global_lir_plan,
Expand Down Expand Up @@ -947,6 +950,42 @@ impl Coordinator {
Ok(StageResult::Response(resp))
}

/// Runs the S3 preflight checks for a copy to operation in a spawned task.
///
/// The checks run here, in the adapter, before the stage that ships the
/// dataflow. The plan must export exactly one sink, and that sink must be a
/// `CopyToS3Oneshot` sink; anything else is reported as an internal error.
///
/// On success the spawned task yields `PeekStage::CopyToDataflow`, carrying
/// the same `PeekStageCopyTo` state forward to the next stage.
#[instrument]
async fn peek_copy_to_preflight(
    &mut self,
    copy_to: PeekStageCopyTo,
) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
    let connection_context = self.connection_context().clone();

    let preflight = async move {
        let exports = &copy_to.global_lir_plan.df_desc().sink_exports;

        // Accept the first entry only when it is also the sole entry.
        let (sink_id, sink_desc) = match exports.first_key_value() {
            Some(only) if exports.len() == 1 => only,
            _ => {
                return Err(AdapterError::Internal(
                    "expected exactly one copy to s3 sink".into(),
                ));
            }
        };

        let ComputeSinkConnection::CopyToS3Oneshot(conn) = &sink_desc.connection else {
            return Err(AdapterError::Internal(
                "expected copy to s3 oneshot sink".into(),
            ));
        };

        mz_storage_operators::s3_oneshot_sink::preflight(
            connection_context,
            &conn.aws_connection,
            &conn.upload_info,
            conn.connection_id,
            *sink_id,
        )
        .await?;

        // The borrows of `copy_to` end with the call above, so it can now be
        // moved into the next stage.
        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
    };

    let handle = mz_ore::task::spawn(|| "peek copy to preflight", preflight);
    Ok(StageResult::Handle(handle))
}

#[instrument]
async fn peek_copy_to_dataflow(
&mut self,
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/optimize/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ pub struct GlobalLirPlan {
}

impl GlobalLirPlan {
/// Returns a shared reference to the dataflow description held by this plan.
pub fn df_desc(&self) -> &LirDataflowDescription {
&self.df_desc
}

pub fn sink_id(&self) -> GlobalId {
let sink_exports = &self.df_desc.sink_exports;
let sink_id = sink_exports.keys().next().expect("valid sink");
Expand Down
Loading

0 comments on commit 6da67dd

Please sign in to comment.