storage: infrastructure for independent source output streams #30858

Open
wants to merge 3 commits into base: main
Conversation

@petrosagg (Contributor) commented Dec 17, 2024

Motivation

This PR lays the groundwork for allowing a source implementation to produce an independent DD collection per output.

Before this PR, the SourceRender trait required a single, multiplexed DD collection to be produced, of type (usize, D), where the usize designated the output. Since all outputs were multiplexed, a single frontier had to describe their overall progress, and that frontier tracked the "slowest" output. This is generally fine when all outputs march forward more or less together, but it breaks down when a new subsource is added to a source that has otherwise been running for a while. In that situation the upper frontier of the multiplexed collection would necessarily stay stuck until the new subsource finished its snapshot and caught up with the other ones, making the previously healthy subsources unavailable for all that time.

This PR fixes this by requiring a BTreeMap<GlobalId, Collection> output type from source implementations. This way each subsource can be driven independently, and a new subsource can be added without imposing a frontier stall on the previously ingested subsources.
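Roughly, the shape of the interface change, sketched with placeholder types (Collection, GlobalId, and SourceMessage below are simplified stand-ins, not the real definitions in the storage and repr crates):

use std::collections::BTreeMap;

// Simplified stand-ins for illustration only.
struct Collection<D>(Vec<D>);
type GlobalId = u64;
struct SourceMessage;

// Before: a single multiplexed collection, with the usize picking the output.
trait SourceRenderOld {
    fn render(&self) -> Collection<(usize, SourceMessage)>;
}

// After: one independently progressing collection per export, keyed by its id.
trait SourceRenderNew {
    fn render(&self) -> BTreeMap<GlobalId, Collection<SourceMessage>>;
}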

This PR does not change any of the source implementations to take advantage of the new interface, since that would create a giant PR. Instead, it only changes the interface and handles the fallout in the various generic parts of the pipeline. Follow-up PRs will target individual source implementations and change them to directly produce multiple collections.

Tips for reviewer

I went through the diff and left comments explaining parts of the changes.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

pub index: usize,
/// The object that this status message is about. When None, it refers to the entire ingestion
/// as a whole. When Some, it refers to a specific subsource.
pub id: Option<GlobalId>,
petrosagg (Contributor, Author) commented:

The first set of changes relates to making the healthcheck operator identify statuses by the GlobalId of the subsource instead of by an index. The healthcheck operator has a special GlobalId, called the halting_id, which is the global id whose statuses are allowed to halt the source. Instead of having a magic id, I chose to represent those special statuses as the ones having id: None, which can be thought of as status updates for the entire ingestion.
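A minimal sketch of the None-vs-Some semantics described here, with placeholder types standing in for the real status struct and GlobalId:

// Placeholder stand-ins for illustration; the real status struct and
// GlobalId type live in the storage and repr crates.
type GlobalId = u64;

struct HealthStatusMessage {
    /// None: the status applies to the ingestion as a whole.
    /// Some(id): the status applies to one specific subsource.
    id: Option<GlobalId>,
}

/// Only whole-ingestion statuses may halt the source, replacing the previous
/// comparison against a magic halting_id.
fn may_halt(status: &HealthStatusMessage) -> bool {
    status.id.is_none()
}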

@@ -57,11 +56,6 @@ impl GeneralSourceMetricDefs {
Self {
// TODO(guswynn): some of these metrics are not clear when subsources are involved, and
// should be fixed
capability: registry.register(metric!(
petrosagg (Contributor, Author) commented:

I removed this metric as it doesn't seem to be all that useful. We don't even have a panel for it in Grafana, and we have a separate metric that tracks the frontier of each subsource. Since this metric was populated from an operator I deleted, I chose to remove it as well instead of finding a new home for it.

@@ -86,42 +80,40 @@ impl GeneralSourceMetricDefs {
progress: registry.register(metric!(
name: "mz_source_progress",
help: "A timestamp gauge representing forward progess in the data shard",
var_labels: ["source_id", "output", "shard", "worker_id"],
var_labels: ["source_id", "shard", "worker_id"],
petrosagg (Contributor, Author) commented:

I removed all notions of "output index" from metrics. The index is not exposed anywhere externally, and even in our logs we identify everything by its id.

@@ -272,7 +272,7 @@ pub fn build_ingestion_dataflow<A: Allocate>(
let base_source_config = RawSourceCreationConfig {
name: format!("{}-{}", connection.name(), primary_source_id),
id: primary_source_id,
source_exports: description.indexed_source_exports(&primary_source_id),
source_exports: description.source_exports.clone(),
petrosagg (Contributor, Author) commented:

The entire notion of, and struct for, "indexed source exports" is gone. Same theme here: we can use a GlobalId anywhere we need to identify a particular output instead of a usize.

use timely::Container;

/// Partition a stream of records into multiple streams.
pub trait PartitionCore<G: Scope, C: Container> {
petrosagg (Contributor, Author) commented:

This is a helper trait until TimelyDataflow/timely-dataflow#610 is merged.

teskje (Contributor) replied:

Mind adding that as a comment too?
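One possible shape for such a comment (the wording is only a suggestion and the trait body is elided in this sketch):

use timely::dataflow::Scope;
use timely::Container;

/// Partition a stream of records into multiple streams.
///
/// NOTE: temporary helper until TimelyDataflow/timely-dataflow#610 is merged
/// upstream, at which point this trait can be replaced by the upstream one.
pub trait PartitionCore<G: Scope, C: Container> {
    // Trait methods elided in this sketch.
}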

};
output_map
.entry(output_type)
.or_insert_with(Vec::new)
.push(export.ingestion_output);
.push(idx);
petrosagg (Contributor, Author) commented:

The idea in all of these is that indexing things by a usize is an implementation detail that sources can choose to use. So anything that was previously using ingestion_output has been switched to use the index of the export as found in the source_exports map.
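A simplified sketch of the resulting id-to-index mapping, with a plain BTreeMap standing in for the real source_exports map (names are illustrative):

use std::collections::BTreeMap;

type GlobalId = u64; // placeholder for mz_repr::GlobalId

/// The usize is now purely an implementation detail: the position of an
/// export's id within the (ordered) source_exports map.
fn output_index_for<V>(source_exports: &BTreeMap<GlobalId, V>, id: GlobalId) -> Option<usize> {
    source_exports.keys().position(|k| *k == id)
}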

let mut data_collections = BTreeMap::new();
for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
    data_collections.insert(*id, data_stream.as_collection());
}
petrosagg (Contributor, Author) commented:

This is another common piece of code that this PR adds to all implementations. It converts the legacy multiplexed output of a source into the non-multiplexed form that the new interface requires. These conversions will be removed as we start moving individual sources over to the new interface.
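A non-dataflow sketch of what that compatibility shim does, with Vecs standing in for Timely streams and differential collections (all names are placeholders):

use std::collections::BTreeMap;

type GlobalId = u64; // placeholder for mz_repr::GlobalId

/// Split a legacy multiplexed output of (output_index, datum) pairs into one
/// bucket per export id, pairing the i-th partition with the i-th key of the
/// ordered export map, much like the zip_eq in the snippet above.
fn demultiplex<D>(
    export_ids: &[GlobalId],
    multiplexed: Vec<(usize, D)>,
) -> BTreeMap<GlobalId, Vec<D>> {
    let mut partitions: Vec<Vec<D>> = (0..export_ids.len()).map(|_| Vec::new()).collect();
    for (index, datum) in multiplexed {
        // The usize index picks the partition; out-of-range indices would
        // indicate a bug in the source implementation.
        partitions[index].push(datum);
    }
    export_ids.iter().copied().zip(partitions).collect()
}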

})
.capture_into(PusherCapture(reclock_pusher));
for (id, export) in exports {
let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone());
petrosagg (Contributor, Author) commented:

Here is the first substantial change of this PR. Instead of rendering a single reclock operator, we iterate over all the exports and instantiate one reclock operator per output.

export_handles.push((id, export_input, export_output));
let new_export: StackedCollection<G, Result<SourceMessage, DataflowError>> =
new_export.as_collection();
export_collections.insert(id, new_export);
petrosagg (Contributor, Author) commented:

This loop is one that needs quite a bit of attention from the reviewer. This is an operator that simply passes through the data produced by the source and records some statistics and health-related information as it sees the data passing by. Now that each output of a source is a separate stream, this is an operator with N inputs and N outputs. The operator connections must be such that the i-th input is only connected to the i-th output, plus the progress collection.

It is important that we get this right, since the failure mode will be hard to spot.

teskje (Contributor) replied:

For my own understanding: The connection vec contains one entry for each output, specifying how the input is connected to that output. Here we construct connection for input i to be:

[ [0],      [],     [],       ...  [0] ]
  progress  health  output 1  ...  output i ... output n

I would have expected connection to also have empty frontier entries for the outputs between i and n. I assume we can skip those because missing entries default to the empty frontier?
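For concreteness, here is a sketch of how such a connection vector could be built fully explicitly, following the layout in the diagram above (progress output first, then health, then one entry per data output). The helper name and the u64 summary type are illustrative only, not the actual operator code:

use timely::progress::Antichain;

/// Build the connection vector for input i of an operator with num_outputs
/// data outputs. Input i is connected (with a zero path summary) to the
/// progress output and to its own data output, and disconnected (empty
/// antichain) from the health output and every other data output.
fn connection_for_input(i: usize, num_outputs: usize) -> Vec<Antichain<u64>> {
    let mut connection = Vec::with_capacity(2 + num_outputs);
    connection.push(Antichain::from_elem(0)); // progress: connected
    connection.push(Antichain::new()); // health: disconnected
    for output in 0..num_outputs {
        if output == i {
            connection.push(Antichain::from_elem(0)); // this input's own output
        } else {
            connection.push(Antichain::new()); // all other outputs disconnected
        }
    }
    connection
}

Whether the trailing empty entries can be omitted, as in the shorter vector shown in the diagram, comes down to the Timely default for missing entries that the question above asks about.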

petrosagg marked this pull request as ready for review December 18, 2024 09:12
petrosagg requested a review from a team as a code owner December 18, 2024 09:12
This test provides questionable value and requires modifying code
behavior deep in the dataflow to trigger the condition it wants to test.
Hence it is removed. I put this as a separate commit in case we change
our minds during review and want to keep it.

Signed-off-by: Petros Angelatos <[email protected]>
Signed-off-by: Petros Angelatos <[email protected]>
teskje (Contributor) left a comment:

I think this is a great simplification of how source exports are managed. Thanks for the helpful comments!

Comment on lines -1719 to -1749
def workflow_pg_snapshot_partial_failure(c: Composition) -> None:
    """Test PostgreSQL snapshot partial failure"""

    c.down(destroy_volumes=True)

    with c.override(
        # Start postgres for the pg source
        Testdrive(no_reset=True),
        Clusterd(
            name="clusterd1",
            environment_extra=["FAILPOINTS=pg_snapshot_pause=return(2)"],
        ),
    ):
        c.up("materialized", "postgres", "clusterd1")

        c.run_testdrive_files("pg-snapshot-partial-failure/01-configure-postgres.td")
        c.run_testdrive_files("pg-snapshot-partial-failure/02-create-sources.td")

        c.run_testdrive_files(
            "pg-snapshot-partial-failure/03-verify-good-sub-source.td"
        )

        c.kill("clusterd1")
        # Restart the storage instance with the failpoint off...
        with c.override(
            # turn off the failpoint
            Clusterd(name="clusterd1")
        ):
            c.run_testdrive_files("pg-snapshot-partial-failure/04-add-more-data.td")
            c.up("clusterd1")
            c.run_testdrive_files("pg-snapshot-partial-failure/05-verify-data.td")
teskje (Contributor) commented:

I couldn't infer what this test is about, but is it not relevant anymore?

Edit: Ah, saw the commit message. I don't have an opinion on the value of this test, but it's true that the failpoint in persist_sink is quite randomizing and the code is better without it.

)>,
Collection<
Child<'g, G, mz_repr::Timestamp>,
Result<SourceOutput<C::Time>, DataflowError>,
teskje (Contributor) commented:

Do we need to be worried about increasing the size of each item going through this collection? We often demultiplex oks/errs into separate collections because of that, but I'm not sure if a Result<SourceOutput, DataflowError> is actually larger than a SourceOutput.
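One quick way to sanity-check the sizes, using simplified stand-in types (the real SourceOutput and DataflowError are defined in the storage crates and will measure differently):

use std::mem::size_of;

// Simplified stand-ins for illustration only; not the real Materialize types.
#[allow(dead_code)]
struct SourceOutput {
    key: Vec<u8>,
    value: Vec<u8>,
    position: u64,
}

#[allow(dead_code)]
enum DataflowError {
    Decode(String),
    EnvelopeError(String),
}

fn main() {
    // Result<T, E> is at least as large as the larger of T and E; whether an
    // extra discriminant word is needed depends on niche optimization, so
    // printing the sizes for the real types is the easiest way to check.
    println!("SourceOutput:                        {}", size_of::<SourceOutput>());
    println!("DataflowError:                       {}", size_of::<DataflowError>());
    println!("Result<SourceOutput, DataflowError>: {}", size_of::<Result<SourceOutput, DataflowError>>());
}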
