Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: infrastructure for independent source output streams #30858

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 0 additions & 45 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,43 +141,6 @@ impl<S> IngestionDescription<S> {
}
}

impl<S: Clone> IngestionDescription<S> {
pub fn indexed_source_exports(
&self,
primary_source_id: &GlobalId,
) -> BTreeMap<GlobalId, IndexedSourceExport<S>> {
let mut source_exports = BTreeMap::new();
// `self.source_exports` contains all source-exports (e.g. subsources & tables) as well as
// the primary source relation. It's not guaranteed that the primary source relation is
// the first element in the map, however it much be set to output 0 to align with
// assumptions in source implementations. This is the case even if the primary
// export will not have any data output for it, since output 0 is the convention
// used for errors that should halt the entire source dataflow.
// TODO: See if we can simplify this to avoid needing to include the primary output
// if no data will be exported to it. This requires refactoring all error output handling.
let mut next_output = 1;
for (id, export) in self.source_exports.iter() {
let ingestion_output = if id == primary_source_id {
0
} else {
let idx = next_output;
next_output += 1;
idx
};

source_exports.insert(
*id,
IndexedSourceExport {
ingestion_output,
export: export.clone(),
},
);
}

source_exports
}
}

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
fn alter_compatible(
&self,
Expand Down Expand Up @@ -278,14 +241,6 @@ impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct IndexedSourceExport<S = ()> {
/// Which output index from the ingestion this export refers to.
pub ingestion_output: usize,
/// The SourceExport
pub export: SourceExport<S>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
/// The collection metadata needed to write the exported data
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ pub fn render_decode_delimited<G: Scope, FromTime: Timestamp>(
let health = transient_errors.map(|err: Rc<CsrConnectError>| {
let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
HealthStatusMessage {
index: 0,
id: None,
namespace: if matches!(&*err, CsrConnectError::Ssh(_)) {
StatusNamespace::Ssh
} else {
Expand Down
Loading
Loading