feat!: Add row tracking writer feature #1239
Codecov Report
@@ Coverage Diff @@
##              main    #1239     +/-
=====================================
+ Coverage   83.60%   83.67%   +0.06%
=====================================
  Files         107      108      +1
  Lines       25651    25926    +275
  Branches    25651    25926    +275
=====================================
+ Hits        21446    21694    +248
- Misses       3135     3144      +9
- Partials     1070     1088     +18
There are enough potential performance pitfalls in this new code (multiple `collect` passes over add actions on the write path, double expression eval, etc.) that I worry we'll need to fully separate the non-row tracking code path from the row tracking path. Otherwise we risk regressing existing use cases by some unknown and potentially large amount.
We should also seriously consider adding a basic benchmark of some kind, to measure the impact of this new code. If the impact is negligible for non-rowtracking tables, we don't need to worry about getting fancy with performance isolation; if it's massive, at least we know what we're up against.
Thank you for your thorough review, @scovich! Based on your feedback, I was able to get rid of the
Force-pushed from e48bff7 to eaac1df.
Force-pushed from 741ad22 to 439bed6.
// We can add shredding features as well as we are allowed to write unshredded variants
// into shredded tables and shredded reads are explicitly blocked in the default
// engine's parquet reader.
// TODO: (#1124) we don't actually support column mapping writes yet, but have some
// tests that do column mapping on writes. For now omit the writer feature to let tests
// run, but after actual support this should be enabled.
I moved these comments since they used to be in `create_table()`.
}

/// The static instance referenced by [`add_files_schema`].
pub(crate) static ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Note that I kept the name `ADD_FILES_SCHEMA` here since we discussed this in another PR.
I still think the naming is not super clear, but I would rather address this in a separate PR.
The change looks good
LGTM. The review was a bit frazzled/interrupted tho, so hopefully somebody else can double check that I didn't miss anything important.
Force-pushed from d31e51b to 905f405.
flushing comments, reviewed everything except new integration test + want to take a look again at the `generate_adds` changes
// Generate add actions including row tracking metadata
let add_actions = self.generate_adds(
    engine,
    extended_add_files_metadata,
    with_row_tracking_cols(add_files_schema()),
    as_log_add_schema(with_row_tracking_cols(&with_stats_col(
        mandatory_add_file_schema(),
    ))),
);
adds_evaluator.evaluate(add_files_batch.as_ref())
})

// Return a chained iterator with add and domain metadata actions
Ok(Box::new(
    add_actions.chain(iter::once(domain_metadata_action)),
))
instead of embedding `generate_adds` into this method would it be reasonable to have the flow above be something like:
let (domain_metadata, input_schema, output_schema) = if row_tracking {
// ...
} else {
// ...
};
// then just one
let adds = generate_adds(...);
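A minimal, self-contained sketch of the flow suggested above, for illustration only. The types `Schema` and `DomainMetadata`, the helper `plan_adds`, and the simplified `generate_adds` and `commit` signatures are hypothetical stand-ins and not the kernel's actual API; only the control flow (decide everything that depends on row tracking once, then call `generate_adds` a single time) mirrors the comment.

```rust
// Hypothetical stand-ins for the kernel types; only the control flow matters here.
#[derive(Debug, Clone, PartialEq)]
struct Schema(Vec<&'static str>);

#[derive(Debug, Clone, PartialEq)]
struct DomainMetadata(String);

/// Decide everything that depends on row tracking in one place:
/// optional domain metadata, the input schema, and the output schema.
fn plan_adds(row_tracking: bool) -> (Option<DomainMetadata>, Schema, Schema) {
    let input = Schema(vec!["path", "size", "stats"]);
    if row_tracking {
        // Assumption: the row tracking path only appends extra columns.
        let mut fields = input.0.clone();
        fields.push("baseRowId");
        fields.push("defaultRowCommitVersion");
        let domain = DomainMetadata("delta.rowTracking".to_string());
        (Some(domain), input, Schema(fields))
    } else {
        (None, input.clone(), input)
    }
}

/// Simplified stand-in: one add action (rendered as a string) per file.
fn generate_adds(files: &[&str], output: &Schema) -> Vec<String> {
    files
        .iter()
        .map(|f| format!("add({}) with {} columns", f, output.0.len()))
        .collect()
}

/// The commit flow: a single `generate_adds` call, with the domain
/// metadata action (if any) chained on afterwards.
fn commit(files: &[&str], row_tracking: bool) -> Vec<String> {
    let (domain_metadata, _input_schema, output_schema) = plan_adds(row_tracking);
    let adds = generate_adds(files, &output_schema);
    adds.into_iter()
        .chain(domain_metadata.map(|d| format!("domainMetadata({})", d.0)))
        .collect()
}
```

In this shape, `commit(&["a.parquet"], false)` yields only the add action, while the row tracking variant appends one domain metadata entry after the adds.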
I thought about that. I personally prefer the current approach because it reduces complexity in `commit()`. Looking forward, `commit()` will only become more complex as we add features, so I'd favor separating more logic in helper methods rather than inlining.
though in the future we will have more domain metadatas, etc. and seems more scalable to not embed them into the 'generate adds'
this may not be worth blocking this PR on but would prefer to track as a follow-up if we decide not to pursue here.
can take on in #1274 ?
I agree it's better to address as part of #1274. Since row tracking is an internal domain metadata, we might have to handle it differently than user-provided ones anyway.
sounds good, i'll take care of it in #1274.
LGTM to merge - I'll get this in and let you follow up on comments (and possibly in a little follow-up PR to fix stuff like the `as` but don't want to block merge)
thanks @lbhm, awesome work! 🚢
let actions = iter::once(commit_info_action)
    .chain(set_transaction_actions)
    .chain(add_actions);
no i meant of set_transactions and adds - we always write commit info first. but previously we wrote adds then set_transaction.
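To make the ordering point concrete, here is a small sketch of how `iter::once` plus `chain` fixes the order in which actions are emitted. The `Action` enum and `ordered_actions` function are hypothetical simplifications, not the kernel's types; the order shown (commit info, then set transactions, then adds) follows the snippet under review.

```rust
use std::iter;

// Hypothetical, simplified action type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    CommitInfo,
    SetTransaction(String),
    Add(String),
}

/// Emit commit info first, then set-transaction actions, then add actions.
/// Swapping the two `chain` calls would restore the earlier order
/// (adds before set transactions) that the comment refers to.
fn ordered_actions(
    set_transactions: Vec<Action>,
    adds: Vec<Action>,
) -> impl Iterator<Item = Action> {
    iter::once(Action::CommitInfo)
        .chain(set_transactions)
        .chain(adds)
}
```

Because `chain` is lazy, nothing is materialized until the caller consumes the iterator; the ordering is purely a property of how the chain is built.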
fn with_stats_col(schema: &SchemaRef) -> SchemaRef {
    let fields = schema
        .fields()
        .cloned()
given this PR has run up into lots of the existing `Schema` shortcomings it would be great to add as much detail/examples/pointers/issues to #1284 as possible :)
{
    let evaluation_handler = engine.evaluation_handler();

    Box::new(add_files_metadata.map(move |add_files_batch| {
I think previously we returned an `impl Iterator` and avoided the `Box`?
can possibly just take on in #1274
cc @MannDP
The problem here was that `generate_adds()` and `generate_adds_with_row_tracking()` return different kinds of iterators, so I had to use a trait object and box it. Maybe my Rust typing foo wasn't strong enough though. Do you see a better solution @zachschuermann?
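For readers following the `Box` question, the sketch below shows the two usual ways to return one of two different iterator types from a single function. It is only an illustration under simplified assumptions: the functions `adds_boxed` and `adds_static`, the helpers `plain` and `tracked`, and the `EitherIter` enum are hypothetical and are not part of the kernel. The enum approach keeps static dispatch at the cost of a little boilerplate.

```rust
// Hypothetical helpers that render an "add action" as a string.
fn plain(size: u64) -> String {
    format!("add size={}", size)
}

fn tracked((index, size): (usize, u64)) -> String {
    format!("add #{} size={} (row tracking)", index, size)
}

/// Option 1: erase both concrete iterator types behind a trait object.
fn adds_boxed(row_tracking: bool, sizes: Vec<u64>) -> Box<dyn Iterator<Item = String>> {
    if row_tracking {
        Box::new(sizes.into_iter().enumerate().map(tracked))
    } else {
        Box::new(sizes.into_iter().map(plain))
    }
}

/// Option 2: a two-variant enum that delegates `next` to whichever
/// branch was chosen, so the function can still return `impl Iterator`.
enum EitherIter<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Iterator for EitherIter<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            EitherIter::Left(a) => a.next(),
            EitherIter::Right(b) => b.next(),
        }
    }
}

fn adds_static(row_tracking: bool, sizes: Vec<u64>) -> impl Iterator<Item = String> {
    if row_tracking {
        EitherIter::Left(sizes.into_iter().enumerate().map(tracked))
    } else {
        EitherIter::Right(sizes.into_iter().map(plain))
    }
}
```

Both functions produce the same items; whether avoiding the allocation and dynamic dispatch is worth the extra type is a judgment call for the follow-up.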
// Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
let row_id_high_water_mark =
    RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
aside: does spark also do a separate log replay for this? wonder if we can eagerly track domain metadatas we care about during snapshot construction to avoid this? open a follow up to consider?
Tagging @johanl-db and @scovich here since they are more familiar with Spark.
Same discussion came up during the row tracking write path implementation in Java (delta-io/delta#3835 (comment), IIRC the discussion took place in the kernel slack channel)
I believe kernel java now collects domain metadata during the main log replay if possible, and also attempts to get them from the CRC file (delta-io/delta@a59495b)
I can double check what spark is doing but I believe it is fairly similar: get from CRC file if available, or from snapshot assuming log replay already happened
That would anyway be better suited as a follow up
// Create a row tracking visitor and visit all files to collect row tracking information
let mut row_tracking_visitor = RowTrackingVisitor::new(row_id_high_water_mark);
let mut base_row_id_batches = Vec::with_capacity(self.add_files_metadata.len());
couldn't the visitor track the `base_row_id_batches` internally? (could even pass in length hint if we want)
I refactored the RowTrackingVisitor accordingly. I agree that it clarifies the separation of concerns, but if we are playing code golf, it does not really help.
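As a rough illustration of a visitor that tracks the base row ID batches internally, consider the sketch below. It is not the actual `RowTrackingVisitor`: the struct `RowTrackingSketch`, its fields, and the `visit_batch` method are hypothetical, and it assumes that a missing high water mark is treated as -1 and that each file's base row ID is the current high water mark plus one, after which the mark advances by the file's `numRecords`.

```rust
/// Hypothetical sketch of a visitor that owns both the running high
/// water mark and the per-batch base row IDs it computes.
struct RowTrackingSketch {
    /// Highest row ID assigned so far (assumed -1 when nothing is assigned yet).
    high_water_mark: i64,
    /// One vector of base row IDs per visited batch of add files.
    base_row_id_batches: Vec<Vec<i64>>,
}

impl RowTrackingSketch {
    /// `batch_hint` is the optional length hint mentioned in the review.
    fn new(high_water_mark: Option<i64>, batch_hint: usize) -> Self {
        Self {
            high_water_mark: high_water_mark.unwrap_or(-1),
            base_row_id_batches: Vec::with_capacity(batch_hint),
        }
    }

    /// Visit one batch, given the `numRecords` value of each file in it.
    fn visit_batch(&mut self, num_records: &[i64]) {
        let mut ids = Vec::with_capacity(num_records.len());
        for &n in num_records {
            // The file's rows occupy [mark + 1, mark + n].
            ids.push(self.high_water_mark + 1);
            self.high_water_mark += n;
        }
        self.base_row_id_batches.push(ids);
    }
}
```

With this shape the caller only feeds batches to the visitor and reads `base_row_id_batches` and `high_water_mark` afterwards, instead of maintaining a separate vector alongside it.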
Thank you @zachschuermann! I addressed the open comments in #1291 and the schema API-related changes in #1266.
What changes are proposed in this pull request?
This PR adds row tracking as a writer feature. As part of this overall goal, it introduces the following changes:
- Update the `ADD_FILES_SCHEMA` and nest `numRecords` in a `stats` struct. This simplifies the two-stage expression evaluation in `Transaction::generate_adds` for the moment.
- Add `WriterFeature::DomainMetadata` and `WriterFeature::RowTracking` to the supported writer features and extend the table properties, config, and deserialization logic accordingly.
- Add a `RowVisitor` that extracts the `numRecords` field out of add actions to compute a base row ID for each add action.
- Update `Transaction::commit` to enrich Add actions with row tracking metadata on demand.
- Update the `create_table` function in test utils to accept a Vec of table features so that we don't have to touch it every time we introduce new table features.

This PR affects the following public APIs
This PR updates the `ADD_FILES_SCHEMA` as described above.

How was this change tested?
Added a new `row_tracking.rs` test suite with write integration tests. Golden table tests will be added in a follow-up PR.