feat!: Add support for sparse transform expressions #1199
Conversation
Codecov Report
Patch coverage details and impacted files:

@@            Coverage Diff             @@
##             main     #1199     +/-   ##
==========================================
+ Coverage   83.47%    83.61%    +0.13%
==========================================
  Files         105       105
  Lines       24024     24444      +420
  Branches    24024     24444      +420
==========================================
+ Hits        20054     20438      +384
- Misses       2939      2966       +27
- Partials     1031      1040        +9

View full report in Codecov by Sentry.
Self-review
if transformed_cols.len() != input_col_count {
    return Err(Error::InternalError(format!(
        "Passed struct had {input_col_count} columns, but transformed column has {}",
        transformed_cols.len()
    )));
}
I think this was removed by mistake... but it's worrisome that no unit tests failed as a result?
(reverted the mistake, but unit test question remains open)
was going to open an issue but figured I might as well just fix it.. #1210
kernel/src/scan/mod.rs
Outdated
#[allow(unused)]
StaticReplace(String, ExpressionRef), // Replace physical_field_name with expression
#[allow(unused)]
StaticInsert(Option<String>, ExpressionRef), // Insert expression after physical_field_name (None = prepend)
I don't actually know of an immediate use for static insertion...
Unless e.g. the row tracking COALESCE above needs to land in a different position than the physical row id column it replaces; then we'd need to drop the physical row id column and insert its replacement in the correct location.
But it's really easy to support this case, and it seems nice to have for completeness.
    } // else no replacement => dropped
} else {
    // Field passes through unchanged
    expressions.push(Arc::new(Expression::column([field_name])));
Yikes! This is incorrect: Column references are paths from top-level, and this transform could be nested arbitrarily deeply in an expression tree. We need to just take the field name, not a column path.
and following up from before, the solution here is that this expression needs to know its name? and then we can do a full path? or do you mean that `Expression::column` should somehow change to just take the field name in a 'relative' mode or something?
I'll post the fix as soon as I've addressed your comments, but tl;dr is:
- `Expr::Transform` now tracks an `Option<ColumnName>`, which can be used to request Some pathing.
- When pathing is requested, the input struct is found by normal path resolution using that path, and then the transform iterates over its fields instead of top-level fields.
- A slightly enhanced `ProvidesColumnsByName` trait helps a lot here.
thanks @scovich - I think the changes look great; the transform expression seems to make sense and addresses that sparseness gap which structs fundamentally couldn't handle.
i left a few comments/nits/questions and will follow up after that bug is fixed!
@@ -482,6 +482,11 @@ fn visit_expression_impl(
    Expression::Opaque(OpaqueExpression { op, exprs }) => {
        visit_expression_opaque(visitor, op, exprs, sibling_list_id)
    }
    Expression::Transform(_) => {
        // Minimal FFI support: Transform expressions are treated as unknown
        // TODO: Implement full Transform FFI support in future version
started #1205
use super::*;
use crate::arrow::array::{Int32Array, StructArray};
use crate::arrow::datatypes::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use crate::schema::{DataType, StructField, StructType};
use std::sync::Arc;
nit: import ordering/blocks (aside: we should probably provide guidance since this is one of the places clippy doesn't help)
Suggested change:

use super::*;
use std::sync::Arc;
use crate::arrow::array::{Int32Array, StructArray};
use crate::arrow::datatypes::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use crate::schema::{DataType, StructField, StructType};
That's odd... I thought fmt did enforce import ordering? Or at least that it could enforce it?
I also thought stdlib was supposed to be the last thing imported, but 🤷
yea, I should be more specific: it enforces alphabetical ordering within contiguous blocks, but it doesn't do the nice std/3p/crate split we usually practice. Also, I've forgotten which way we typically prefer, haha, but we should probably just decide on one and stick to it.. how about std -> 3p -> internal crates -> this crate? (We can document this in CONTRIBUTING.md.)
/// Evaluates a transform expression by building expressions in input schema order
fn evaluate_transform_expression(
While it does seem like this general architecture is the Right Thing - I do feel we are marginally increasing the evaluation complexity. This comment is mostly an aside: maybe it underscores the utility of having another data-path microbenchmark to confirm the lack of data-size impact.
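To make that suggestion a bit more concrete, here is a rough sketch of what such a data-path microbenchmark could look like with criterion; the wide "batch" and the measured closure are placeholder workloads, not existing kernel code (a real benchmark would evaluate the new transform expression over a pre-built wide record batch, and would live under `benches/` with `harness = false`).

```rust
// Sketch of an additional data-path microbenchmark using criterion.
use criterion::{criterion_group, criterion_main, Criterion};

fn bench_sparse_transform(c: &mut Criterion) {
    // Pre-build the input once so only the evaluation path is measured.
    let wide_batch: Vec<u64> = (0..1_000_000).collect();

    c.bench_function("sparse_transform_wide_schema", |b| {
        b.iter(|| {
            // Placeholder for "evaluate the sparse transform over the batch".
            wide_batch.iter().copied().sum::<u64>()
        })
    });
}

criterion_group!(benches, bench_sparse_transform);
criterion_main!(benches);
```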
As a side effect of the pathing bug fix, the output struct is now built up directly as part of the evaluation -- no intermediate vec of expressions (it's actually incorrect to use `Expr::Column` for nested relative pathing)
ah that's nice :)
Also as part of fixing the pathing bug, I extended the early-out optimization for identity transforms to work for paths as well. So basically, we have several levels of complexity now (simplest to most complex):
- `Expr::Transform` (identity, no path) -- just applies the output schema to the input data
- `Expr::Column` -- just paths to the requested column and applies the output schema
- `Expr::Transform` (identity, with path) -- equivalent to `Expr::Column`, but has an extra `Arc::new`
- `Expr::Transform` -- equivalent to `Expr::Struct`, but the input struct providing pass-thru fields is pathed to only once and its fields extracted directly.
- `Expr::Struct` -- has to create a new `StructArray` from the result of evaluating all field expressions, including many `Expr::Column` which all have to path independently even if the paths are similar.

tl;dr: I would expect `Expr::Transform` to be more efficient than the equivalent `Expr::Struct` under virtually all circumstances where the transformation to be expressed is actually sparse.
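To illustrate the sparsity argument concretely (toy types only; the kernel's real expression types are richer than this): for a 1000-column schema with a single injected partition value, a dense struct-style expression has to enumerate every output field, while a sparse transform-style plan only records the one insertion.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Placeholder expression type for illustration only.
#[derive(Debug)]
enum Expr {
    Column(String),
    Literal(String),
}

fn main() {
    let field_names: Vec<String> = (0..1000).map(|i| format!("col_{i}")).collect();

    // Dense plan (Expr::Struct-like): one column reference per pass-through
    // field, plus the injected partition value => O(schema_width) expressions.
    let dense: Vec<Arc<Expr>> = field_names
        .iter()
        .map(|name| Arc::new(Expr::Column(name.clone())))
        .chain(std::iter::once(Arc::new(Expr::Literal("2024-01-01".to_string()))))
        .collect();

    // Sparse plan (Expr::Transform-like): only the single insertion is recorded;
    // the evaluator copies every other input column across untouched.
    let mut insertions: HashMap<Option<String>, Vec<Arc<Expr>>> = HashMap::new();
    insertions.insert(
        Some("col_999".to_string()),
        vec![Arc::new(Expr::Literal("2024-01-01".to_string()))],
    );

    println!("first dense entry: {:?}", dense[0]);
    println!("dense entries: {}, sparse entries: {}", dense.len(), insertions.len());
}
```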
/// fields inserted).
pub fn is_identity(&self) -> bool {
    self.field_replacements.is_empty() && self.field_insertions.is_empty()
}
after reading through these methods I'm now wondering if we could simplify the interface by hiding hashmaps internally and instead just exposing the ability to probe through methods here? haven't thought about this too much but maybe only worth it if we use a very constrained subset of the hashmaps?
Normally I would have said "not worth it" -- but this will become part of the public API so encapsulation by default makes a lot of sense. Done.
Hmm. I just noticed that (a) all the other expression types have pub fields; and (b) users cannot create expression transforms unless the expression types have pub fields. I might need to revert this change :(
ah okay, yup seems reasonable
Maybe a follow-up for us to not have everything be so public
kernel/src/expressions/transforms.rs
Outdated
/// Recursively transforms the children of a [`Transform`]. Returns `None` if all
/// children were removed, `Some(Cow::Owned)` if at least one child was changed or removed, and
/// `Some(Cow::Borrowed)` otherwise.
fn recurse_into_expr_transform(&mut self, t: &'a Transform) -> Option<Cow<'a, Transform>> {
untested? and seems there are some TODOs - should we perhaps make this unimplemented/no-op/something(?) for now and try to get it right in a follow up? considering it's a new expression anyways we don't have anyone trying to transform it already
Problem is, existing expression transforms that encounter a transform expression have to know what to do with it...
    self.fields.values()
}

#[allow(unused)] // Most uses can leverage ExactSizeIterator::len instead
perhaps we should just remove it then (separately)
nice! this looks great, thanks. Had a few small things but overall lgtm.
kernel/src/scan/mod.rs
Outdated
/// Insert the given expression after the given physical field name (None = prepend instead)
#[allow(unused)]
StaticInsert(Option<String>, ExpressionRef),
/// Insert the ith partition value after the given physical field name
nit: "ith" in what?
hmm, good question.. this is inherited from pre-existing code. It's ith in whatever numbering the creator of the transform spec is using, which hopefully matches the numbering the log replay scan logic is using?
Looks like it comes from `metaData.partitionColumns`, which is an array of partition column names. No idea how its order is decided, but it should be stable for any one snapshot.
No, sorry... it's the partition column's position in the input schema:
https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/scan/mod.rs#L835-L844
It's basically a reference to the schema field, but by index to avoid lifetime issues.
Worth inlining this finding in comments? I also notice we have a very hard dependency on `schema_fields` ordering - I don't see anywhere that it would ever change, but wondering if we enforce/describe that sufficiently?
It's actually a dependency on the table's logical schema, which should be quite stable during the life of any given Snapshot. I updated the doc comment.
let partition_value = Arc::new(partition_value.into());
transform.with_inserted_field(insert_after.clone(), partition_value)
nit:
Suggested change:

let partition_expr = Arc::new(partition_value.into());
transform.with_inserted_field(insert_after.clone(), partition_expr)
I don't love having two names for basically the same thing, especially when one is moved-from and useless... and `partition_value_expr` would be more accurate than just `partition_expr`? Also: a partition value [literal] is a kind of expression?
let field_name = Some(Cow::Borrowed(field_name));
if let Some(insertion_exprs) = transform.field_insertions.get(&field_name) {
Could we just define a `Transform::get_field_insertions_for(field: &str)` and call that here? Likewise above - although it would save less code, it would make it easier if we want to make the fields not `pub` in the future.
heh. I had that code before I realized the fields needed to stay `pub` to not break expression transforms.
It can't take a `&str` tho... it's `Option<&str>` (None = no predecessor = prepend)
So far, this is the only call site and it's in private code. I'd prefer to not design this until we have more usage patterns to learn from. Taking the fields private will anyway be a breaking change, regardless of whether we define accessors now.
}

/// Specifies an expression to replace a field with.
pub fn with_replaced_field(mut self, name: impl Into<String>, expr: ExpressionRef) -> Self {
nit:
Suggested change:

pub fn with_replaced_field(mut self, name: impl Into<String>, replacement_expr: ExpressionRef) -> Self {
That will spill the method signature from one line to five lines, and I'm not sure it really adds any information?
The method clearly states that it's replacing a field?
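For context, here is a self-contained sketch of the builder-style surface being discussed, with stand-in types: the kernel's real `Transform` and `Expression` are richer than this (for example, field drops are omitted here), so treat the shapes below as illustrative only.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Toy stand-ins so the sketch compiles on its own.
#[derive(Debug)]
enum Expression {
    Literal(String),
}
type ExpressionRef = Arc<Expression>;

#[derive(Default)]
struct Transform {
    field_replacements: HashMap<String, ExpressionRef>,
    field_insertions: HashMap<Option<String>, Vec<ExpressionRef>>,
}

impl Transform {
    /// True if the transform makes no changes (no fields replaced, no fields inserted).
    fn is_identity(&self) -> bool {
        self.field_replacements.is_empty() && self.field_insertions.is_empty()
    }
    /// Specifies an expression to replace a field with.
    fn with_replaced_field(mut self, name: impl Into<String>, expr: ExpressionRef) -> Self {
        self.field_replacements.insert(name.into(), expr);
        self
    }
    /// Inserts an expression after `insert_after` (None = prepend).
    fn with_inserted_field(mut self, insert_after: Option<String>, expr: ExpressionRef) -> Self {
        self.field_insertions.entry(insert_after).or_default().push(expr);
        self
    }
}

fn main() {
    let partition_value: ExpressionRef = Arc::new(Expression::Literal("2024-01-01".to_string()));
    let transform = Transform::default()
        .with_inserted_field(Some("last_data_col".to_string()), partition_value)
        .with_replaced_field("row_id", Arc::new(Expression::Literal("coalesced".to_string())));
    assert!(!transform.is_identity());
    println!(
        "replacements: {}, insertions: {}",
        transform.field_replacements.len(),
        transform.field_insertions.len()
    );
}
```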
LGTM - thanks @scovich excited to have this land!!
kernel/src/scan/mod.rs
Outdated
#[allow(unused)]
StaticInsert(Option<String>, ExpressionRef),
/// Insert the ith partition value after the given physical field name
Partition(Option<String>, usize),
also - wonder if it might be useful to just have a quick alias like `type PartitionIndex = usize;` with docs saying it's a ref to the schema field as an index into input schema etc.?
I went ahead and turned all these tuple enum variants into struct variants instead.
That should improve readability a lot.
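For illustration, this is roughly the shape the struct-variant form could take. The enum and field names below are guesses for the sketch (not the kernel's actual definitions), and `PartitionIndex` is the alias suggested above.

```rust
use std::sync::Arc;

// Stand-in expression type; the kernel has its own Expression/ExpressionRef.
#[derive(Debug)]
struct Expression(String);
type ExpressionRef = Arc<Expression>;

/// Index of a partition column, interpreted as its position in the table's
/// logical schema (the alias suggested in the review).
type PartitionIndex = usize;

/// Sketch of the field-transform spec as struct variants (names are guesses).
#[derive(Debug)]
enum FieldTransformSpec {
    /// Replace the named physical field with an expression.
    StaticReplace { field_name: String, expr: ExpressionRef },
    /// Insert an expression after the named physical field (None = prepend).
    StaticInsert { insert_after: Option<String>, expr: ExpressionRef },
    /// Insert the partition value at `partition_index` after the named physical field.
    Partition { insert_after: Option<String>, partition_index: PartitionIndex },
}

fn main() {
    let specs = vec![
        FieldTransformSpec::StaticReplace {
            field_name: "row_id".to_string(),
            expr: Arc::new(Expression("COALESCE(...)".to_string())),
        },
        FieldTransformSpec::StaticInsert {
            insert_after: None, // prepend
            expr: Arc::new(Expression("some literal".to_string())),
        },
        FieldTransformSpec::Partition {
            insert_after: Some("last_physical_col".to_string()),
            partition_index: 0,
        },
    ];
    for spec in &specs {
        println!("{spec:?}");
    }
}
```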
LGTM!
fn transform_expr_transform(&mut self, transform: &'a Transform) -> Option<Cow<'a, Transform>> {
    self.recurse_into_expr_transform(transform)
    Some(Cow::Borrowed(transform))
should we track a follow-up here?
Anyone who implements the trait is welcome to recurse however they want. The `recurse_into_xxx` helpers are really just that -- helpers -- and implementations are not required to use them. If/when we have an implementation or two that actually do similar things, maybe then we should track a TODO to factor out a helper?
Referenced PR description:
What changes are proposed in this pull request? Discovered test gap in #1199 (see #1199 (comment)) - this fixes it.
How was this change tested? test-only
What changes are proposed in this pull request?
Log replay needs to define per-file transforms for each row of metadata that survives data skipping. Unfortunately, `Expression::Struct` is "dense" (it mentions every output field), and this produces excessive overhead when injecting the (usually very few) partition columns for tables with wide schemas (hundreds or thousands of columns). Column mapping tables are even worse, because they don't change the columns at all -- they just need to apply the output schema to the input data.

Solution: Define a new `Expression::Transform` that is a "sparse" representation of the changes to be made to a given top-level schema or nested struct. Input columns can be dropped or replaced, and new output columns can be injected after an input column of choice (or prepended to the output schema). The engine's expression evaluator does the actual work to transfer unchanged input columns across while building the output `EngineData`.

Update log replay to use the new transform capability, so that the cost is O(partition_columns) instead of O(schema_width). For non-partitioned tables with column mapping mode enabled, this translates to an empty (identity) transform, which the default engine expression evaluator has been updated to optimize as a special case (just `apply_schema` directly to the input and return).

Result: Scan times are cut by nearly half in the `metadata_bench` benchmark.

This PR affects the following public APIs
Added a new `Expression::Transform` enum variant.

How was this change tested?
New and existing unit tests, existing benchmarks.
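To close the loop on the identity fast path described above, here is a rough sketch of the control flow. All names and types below are stand-ins (the kernel's evaluator and its `apply_schema` step are far more involved); the point is only that an empty transform skips per-field work entirely.

```rust
// Stand-in types: a "batch" of rows and an output schema marker.
struct OutputSchema;
struct Batch {
    rows: usize,
}

/// Sketch of a sparse transform: only the change counts matter for this example.
struct Transform {
    replacements: usize,
    insertions: usize,
}

impl Transform {
    fn is_identity(&self) -> bool {
        self.replacements == 0 && self.insertions == 0
    }
}

/// Placeholder for applying the output schema (e.g. renaming columns for
/// column mapping) without touching the data.
fn apply_schema(batch: Batch, _schema: &OutputSchema) -> Batch {
    batch
}

fn evaluate_transform(transform: &Transform, batch: Batch, schema: &OutputSchema) -> Batch {
    if transform.is_identity() {
        // Identity fast path: no per-field work, just apply the output schema.
        return apply_schema(batch, schema);
    }
    // Non-identity case: walk the input fields once, replacing/dropping/inserting
    // as the transform dictates, then assemble the output struct (elided here).
    apply_schema(batch, schema)
}

fn main() {
    let identity = Transform { replacements: 0, insertions: 0 };
    let out = evaluate_transform(&identity, Batch { rows: 3 }, &OutputSchema);
    println!("rows out: {}", out.rows);
}
```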