Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push parquet select to leaves, add correct reordering #271

Merged
merged 54 commits into from
Jul 16, 2024

Conversation

nicklan
Copy link
Collaborator

@nicklan nicklan commented Jul 1, 2024

This PR makes us finally have correct semantics when we are asked to read a schema that's not the same as the schema in the parquet file. In particular this adds:

  • Correct sorting of columns to match the specified schema (previous code was actually not correctly doing this)
  • Identification of leaf fields that are being selected, so we only read exactly what's asked for
  • Sorting a struct inside a list.
  • Detection of requested columns that don't exist in the parquet file.
    • If nullable, note and then fill them in with a column of null
    • Otherwise, error since we're requesting a missing column that can't be null
  • Detection of timestamp columns that need to be cast to match the delta specification, and the code to do the actual casting

This turns out to be way harder than anticipated, so its a lot of code.

Currently does not support reordering things inside maps. This is a complex PR as it is, and we don't need support for that just yet. Map content will just be passed through as read from the file.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the problem we face is:

  1. The arrow parquet reader requires that the read schema exactly matches the parquet schema. In particular, the read schema cannot have a different field order, nor can it include any extra fields, and any physical fields we want to ignore must be present-but-masked-out.
  2. A given parquet file can provide fields in any order, because parquet is not ordinal -- it uses either field names or field ids to identify columns.
  3. Thus, we have to convert the read schema from the user into a physical read schema that is compatible with each specific parquet file's physical schema, and then we have to project the resulting data onto the user's read schema (which includes reordering fields and injecting null columns as needed; hopefully we get the masking right and don't have to excise any unwanted columns after the read completes).

If we squint, the above problem is very similar to the one we face with EngineData projections -- used for metadata reads -- where the user can ask for a subset of columns, in arbitrary order, and our data visitor has to comply. Our general expression machinery also supports projection -- used for data reads -- so that we can propagate some fields while manipulating or injecting others.

Thus, I wonder if we should look for ways to beef up and reuse our general projection machinery, rather than adding a third way of doing things? For example, nested column projection support in our expression machinery has been a TODO for a while -- maybe it's time to implement it?

The one annoyance is that we'd have to repeat this process for every parquet file -- or at least every unique physical schema we encounter -- but this is an arrow-specific situation and arrow doesn't actually have any problem creating expressions at runtime.

arrow projection masks

That said -- The projection support would really only help for the post-read step, for converting the physical data to match the user's read schema. I suspect arrow's ProjectionMask will still require some specific and complex code to correctly wire up the pre-read step.

If I understand correctly, the projection mask is based on the ordinals of an ordered traversal (**) of all fields in the physical schema -- both leaf and internal -- and the mask specifies which of those fields should actually be read. The documentation is underwhelming, but given that it's possible to select roots (even if they are nested columns), I have to assume that any mask that includes the index of an inode will automatically select all leaf fields it contains -- so nested column pruning should not include the parent's index.

(**) The docs don't say whether the indexing is in-order, pre-order, or post-order! That detail is kind of important! For now I'll assume pre-order, because in-order only makes sense for binary trees (!= schema), and post-order seems a bit unnatural for this case.

Given the above, I would lean toward a two pass process:

  1. Traverse the physical schema to build a namepath -> index map.
  2. Traverse the read schema, to convert each read schema namepath to a physical schema index.

For example, given the physical schema:

a
  d
  x
b
  y
    z
  e
c

Step 1/ above would produce:

[a] -> 0,
[a, d] -> 1,
[a, x] -> 2,
[b] -> 3,
[b, y] -> 4,
[b, y, z] -> 5,
[b, e] -> 6,
[c] -> 7

And step 2/ would produce the projection mask [6, 2, 7] for the read schema below:

b
  e
a
  x
c

(I'm guessing we actually only need to store leaf indexes, because it's not worth the extra trouble to "optimize" for internal nodes that we fully select?)

engine data projection

For projections, I could imagine a co-recursion on the read schema and physical data:

  • At each internal level, iterate in order over the fields of the read schema
  • Look up each field by name in the physical data.
  • If the field is present, recurse on it; otherwise, inject a null column in its place

(note: I only skimmed the code very lightly so far, wanting to sort out this question and these ideas before diving into actual code)

(this is really a PR level comment, but dropping it here because github loses top-level comments too easily)

Copy link
Collaborator Author

@nicklan nicklan Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting, but I think unfortunately the use cases are different between extract/evaluate_expression and this reordering. I do think we should unify extract and evaluate_expression though.

The differences come down to:

  1. Reading parquet needing to handle potentially missing columns, but exract/eval can assume all columns are there (modulo null columns which are a bit tricky to handle)
  2. Parquet being actually "flat", so a struct column is purely a schema level thing and the actual columns are just flat in the file

So in

a
  d
  x
b
  y
    z
  e
c

your above example of selecting:

b
  e
a
  x
c

the mask of [2, 6, 7] is actually incorrect (assuming a b and y are struct fields). The correct mask
would be [1, 3, 4] because a, b, and y don't contribute to the column indices.

The "trie" idea is actually interesting though. I wrote up a whole big comment where I tried to apply it to all the use cases, in which I realized that there are some fundamental incompatibilities between read/reorder and extract/eval.

I think I could re-write this in terms of building a trie with just the leaf indicies, and then do something like:

fn get_mask(kernel_schema, trie) -> ProjectionMask {
  let leaves = kernel_schema.leaves(); // get a Vec<Vec<&str>> of all the paths to leaves
  let indexes = leaves.flat_map(|leaf_path| trie.exact_match(leaf_path)); // exact_match returns a Some<usize> if the path was there
  ProjectionMask::leaves(indexes)
}

You'd then like to do something like this for column ordering:

// has_cols being a stand-in for a RecordBatch/StructArray
order_cols(requested_schema, has_cols, trie) -> cols {
  let mut final_cols = vec!()
  for field in requested_schema {
    if let Some(trie_node) = trie.get_child(field.name()) {
      // this was in the parquet, maybe reorder children too
      let parquet_col = has_cols.column(trie_node.value) // THIS IS WRONG
      let col = if trie_node.has_children() {
        StructCol::new(order_cols(field as Schema, parquet_col, trie_node))
      } else {
        parquet_col
      };
      final_cols.push(col);
    } else {
      // not in parquet, push null
      final_cols.push(new_null_col(field.data_type()))
    }
  }
  final_cols
}

But as noted in the comment, now that this is dealing with actual hierarchical data, the indicies are different. We could potentially compute a logical and physical index for each trie node, but it gets a bit messy because you have to keep all the actual and missing columns in the trie, and then you need to filter down to include or exclude them depending on if you're generating a mask or navigating a RecordBatch/StructArray. Doable, but much less clean.

Open to thoughts about how to improve this, and if we want to go with a Trie approach for just the physical mapping part.

One other note: I'd love to upstream this to the parquet reader once we have it working here. I wrote this PR somewhat with that in mind, and think that it could be similar but more efficient if we just had two parquet schemas to reconcile with each other, and with access to some of the schema internals that we would have inside the parquet crate. But having things be generalized over kernel usage would hinder that.

Copy link
Collaborator Author

@nicklan nicklan Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll note that what the current code is doing is actually very similar to computing a "physical+logical trie", and just skipping the step of flatting and directly returning the mask indicies as well as a "trie like" struct for re-ordering. So it's likely more efficient than building a "physical+logical" trie would be.

Copy link
Collaborator

@scovich scovich Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will digest further when I have more time, but one question/reaction:

the mask of [2, 6, 7]`` is actually incorrect (assuming a `b` and `y` are struct fields). The correct mask
would be `[1, 3, 4]` because `a`, `b`, and `y` don't contribute to the column indices.

This falls squarely in the "underwhelming documentation" category -- the docs don't say how the indexing actually works. I would also have intuited that only leaf fields should have indexes, because that's how parquet works... but the ProjectionMask::roots method only makes sense if inodes also have indexes. Otherwise, it would be impossible to express roots for nested schemas. The example in the ProjectionMask docs even specifically calls this out (sort of):

For example, the schema

message schema {
  REQUIRED boolean         leaf_1;
  REQUIRED GROUP group {
    OPTIONAL int32 leaf_2;
    OPTIONAL int64 leaf_3;
  }
}

Has roots ["leaf_1", "group"] and leaves ["leaf_1", "leaf_2", "leaf_3"]

For non-nested schemas, i.e. those containing only primitive columns, the root and leaves are the same

From that example, I was forced to conclude that if the root set exists (by name in the example, presumably for readability), then there must be corresponding root node indexes as well.

So confusing, do you happen to know a better explanation of how roots and leaves work?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "trie" idea is actually interesting though. I wrote up a whole big comment where I tried to apply it to all the use cases, in which I realized that there are some fundamental incompatibilities between read/reorder and extract/eval.

But as noted in the comment, now that this is dealing with actual hierarchical data, the indicies are different. We could potentially compute a logical and physical index for each trie node, but it gets a bit messy because you have to keep all the actual and missing columns in the trie, and then you need to filter down to include or exclude them depending on if you're generating a mask or navigating a RecordBatch/StructArray. Doable, but much less clean.

Yes, I also realized that the indexes don't stay consistent between pre-read (schema) and post-read (data) steps. That's why I had suggested only using the indexes for the mask generation (pre-read step). On the data reorg (post-read) step, I was suggesting to simply index the data by name, guided by the read schema. This relies on the fact that (a) the name path to any one leaf column is consistent between the read schema and the physical data, regardless of how things might need to be reordered; and (b) we already identified the set of subset of physically present columns from the read schema, so any column that is missing from the resulting data is physically missing from the file, and can be safely nulled out.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically correct yes. The main thing I'd change is that there isn't such a thing as a "read schema" in the parquet crate. There is only a ProjectionMask. So logically yes, we want the schema to read from parquet, but in practice we have to convert to a Vec of indices to read.

Copy link
Collaborator

@scovich scovich Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not ever materialize the read schema, but the arrow data that comes back is structured according to the read schema that can be inferred from the combination of physical schema and projection mask, no? Projection mask is just an annoyingindirect way to represent the read schema. And, sadly, it's not stable from file to file because projection masks don't allow column reordering and parquet (being name-based) doesn't guarantee consistent physical column ordering from file to file. So that's different from say spark's concept of "read schema."

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot that arrow schema doesn't have O(1) field name lookup like our schema and spark schema do. That definitely makes life harder

Possibly crazy question: Today's schema transform is "push-based" -- it has to iterate through the file schema, probing for each field in the logical schema, because the logical (kernel) schema has O(1) name lookup. But that means we are working in terms of the physical schema instead of the logical schema. To apply the logical schema to the physical data, we have to pre-fill a vec with None, and then assert afterward that all the entries were successfully inserted. If we converted the arrow schema to kernel schema -- in order to get O(1) field name lookups on the file schema -- it would let us switch to a "pull-based" schema transform instead. Would that simplify the code enough to be worth the hassle? Alternatively, is there a way to invert the indexing during the initial reorder index creation, so that it's expressed in terms of source schema?

Motivation for the question is two-fold:

  1. Eventually, if/when arrow schema gets better name lookups, we could eliminate the conversion step without changing the algorithm. With today's algorithm, it would be a substantial rewrite later, if we wanted to take advantage of hypothetical future cheap name lookups in arrow schema.
  2. The current code is huge and subtle -- a bad combination -- and anything that simplifies it even a little would help both current bug surface and future maintainability.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could potentially simplify things. Would you be okay with pushing that off as an issue to look at, so we can get this merged, since we're pretty blocked on a lot of other development by getting this merged?

Once that's done I (or someone) can prototype what a "pull-based" one would look like and see how much simplification we could get.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great, thanks

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
ReorderIndex::Child { index, children } => {
let mut placeholder: Arc<dyn ArrowArray> =
Arc::new(StructArray::new_empty_fields(0, None));
std::mem::swap(&mut input_cols[parquet_position], &mut placeholder);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could potentially reorder every field in a given row yes? Would performance be a concern here? it's not unusual to see thousands of columns in a table. Maybe something like the arc swap crate (which says it's more performant, not sure how true that is) to consider here, but I suppose there is no easy in-place style op to do here that would eliminate the placeholder style?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it could, but I don't think the performance of swapping two pointers is the issue. What this does is:

  • swap two pointers (should be fast)
  • call into_data().into() on the swapped out pointer (this has some cost, we convert to ArrayData and then right back to StructArray. I'd like to find a cheaper way to do this, see my comment about wanting a cheaper way to do this as it's actually just a cast)
  • re-order the children (this is required, just a recursive call)
  • build a new metadata field to match the new order (fairly fast, and has to happen if we've reordered)

Reordering children is just moving around Arcs, so it's pretty cheap. So really the only bit that I think could be faster is into_data().into() bit.

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
@nicklan nicklan requested review from scovich and hntd187 July 2, 2024 23:20
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More thinking, still not much actual code review.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "trie" idea is actually interesting though. I wrote up a whole big comment where I tried to apply it to all the use cases, in which I realized that there are some fundamental incompatibilities between read/reorder and extract/eval.

But as noted in the comment, now that this is dealing with actual hierarchical data, the indicies are different. We could potentially compute a logical and physical index for each trie node, but it gets a bit messy because you have to keep all the actual and missing columns in the trie, and then you need to filter down to include or exclude them depending on if you're generating a mask or navigating a RecordBatch/StructArray. Doable, but much less clean.

Yes, I also realized that the indexes don't stay consistent between pre-read (schema) and post-read (data) steps. That's why I had suggested only using the indexes for the mask generation (pre-read step). On the data reorg (post-read) step, I was suggesting to simply index the data by name, guided by the read schema. This relies on the fact that (a) the name path to any one leaf column is consistent between the read schema and the physical data, regardless of how things might need to be reordered; and (b) we already identified the set of subset of physically present columns from the read schema, so any column that is missing from the resulting data is physically missing from the file, and can be safely nulled out.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current code is... actually very similar to computing a "physical+logical trie", and just skipping the step of flatting and directly returning the mask indicies as well as a "trie like" struct for re-ordering. So it's likely more efficient than building a "physical+logical" trie would be.

I don't have a clear intuition of which would be more efficient, but I'm pretty sure the difference is small enough not to be a concern in practice. My suggestions was motivated by the intuition that generating and probing the flat structure is likely a lot smaller/simpler code than what I saw in my first skim of this PR. The field counting routine wouldn't be needed at all, for example.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to upstream this to the parquet reader once we have it working here

+100 -- lack of automatic schema reconciliation is IMO a pretty serious gap in arrow-parquet. It adds a layer of complexity that parquet doesn't require, and which no reader wants. And it's super fiddly and error prone.

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing comments that have accumulated for a couple days now, in hopes that they're useful.

High level -- I think I understand the algo now, but there's no doc comment nor example anywhere that would make it easier to understand and verify the code. Strongly suggest to add something that shows what is being accomplished.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so we actually have three schemas to worry about:

  1. The file's physical schema (= the columns that are actually available)
  2. The read schema that goes to the parquet reader (= a subset of the file schema)
  3. The logical schema from the engine (= the columns we actually want, a superset of the read schema).

In other words, read schema is the intersection of file schema and logical schema. Columns unique to the file schema need to be masked out (= ignored), while columns unique to the logical schema need to be backfilled with nulls. And we have to worry about field ordering differences between read schema and logical schema.

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/default/parquet.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@sppalkia sppalkia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing out some minor comments and a question. I read through the larger thread between yourself and Ryan, my main confusion was around the mask indices and what they're used for -- I think it's confusing me because it looks like the Parquet schema isn't using the Thrift-style representation but rather some intermediate thing instead? This part especially confused me:

the mask of [2, 6, 7]`` is actually incorrect (assuming a b and `y` are struct fields). The correct mask
would be `[1, 3, 4]` because `a`, `b`, and `y` don't contribute to the column indices.

Because in Parquet the nested schema elements do occupy an "index" in the schema list.

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
Error::generic("Didn't find all requested columns in parquet schema")
);
Ok((mask_indicies, reorder_indicies))
) -> DeltaResult<(Vec<usize>, Vec<ReorderIndex>)> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we return a struct instead of a tuple here? Especially since "Vec" is pretty ambiguous/not self-describing.

Copy link
Collaborator

@hntd187 hntd187 Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I don't know if rolling this into a struct for internal use would make sense. Especially since you are likely going to just assign it in something like let (mask, reordered_indexes) = get_requested_indices(...);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, though you can always de-structure structs too:

let SchemaIndex { mask, reordered_indices } = get_requested_indices(...);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, given this purely internal for now, I think leaving it as a tuple with a clear doc comment is okay, but lmk if you feel strongly about it :)

kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Outdated Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Show resolved Hide resolved
kernel/src/engine/arrow_utils.rs Show resolved Hide resolved
@nicklan
Copy link
Collaborator Author

nicklan commented Jul 10, 2024

my main confusion was around the mask indices and what they're used for -- I think it's confusing me because it looks like the Parquet schema isn't using the Thrift-style representation but rather some intermediate thing instead? This part especially confused me:

the mask of [2, 6, 7]`` is actually incorrect (assuming a b and `y` are struct fields). The correct mask
would be `[1, 3, 4]` because `a`, `b`, and `y` don't contribute to the column indices.

Because in Parquet the nested schema elements do occupy an "index" in the schema list.

Yeah, so the parquet crate is just using a "flat" representation. The method we call is ProjectionMask::leaves. The docs are woefully incomplete for that method, but basically anything that's logical (i.e. nested) "doesn't count". So something like:

struct { colA, colB }, struct { colC, colD }

Is actually only 4 columns in the parquet (assuming those cols aren't nested themselves). But the schema for this would be something with 2 columns at the root and then each child having 2 columns. So the structs themselves "don't count" when trying to build a list of indices that we want to select out of the file.

If we wanted only colA and colC we'd need to pass [0, 2] to the ProjectionMask::leaves call, meaning we "skip" over the entries in the schema describing the Struct, since it's purely logical. Hopefully that makes some sense?

match (kernel_type, arrow_type) {
(DataType::Primitive(PrimitiveType::Timestamp), ArrowDataType::Timestamp(_, _))
| (DataType::Primitive(PrimitiveType::TimestampNtz), ArrowDataType::Timestamp(_, _)) => {
// We assume that any timestamp data read from a delta table is correctly written in
Copy link
Collaborator Author

@nicklan nicklan Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this is... questionable. we should not do this long term, but i need to figure out what's going on on mac such that things read incorrectly, and then if it really is reading in nanoseconds for some reason, we'd need to do some kind of conversion on the fly (maybe).

Regardless, not something I want to tackle in this PR.

Note that the file in question is acceptance/tests/dat/out/reader_tests/generated/all_primitive_types/delta/part-00000-a31beaf2-ba0b-4ad6-a402-e5867ba52c91-c000.snappy.parquet and it really does seem to have things stored in microseconds with UTC

Running `parquet-tools inspect` on the file
$ parquet-tools inspect part-00000-a31beaf2-ba0b-4ad6-a402-e5867ba52c91-c000.snappy.parquet
############ file meta data ############
created_by: parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)
num_columns: 12
num_rows: 5
num_row_groups: 1
format_version: 1.0
serialized_size: 2189


############ Columns ############
utf8
int64
int32
int16
int8
float32
float64
bool
binary
decimal
date32
timestamp

############ Column(utf8) ############
name: utf8
path: utf8
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY (space_saved: -2%)

...[snip]...

############ Column(timestamp) ############
name: timestamp
path: timestamp
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
compression: SNAPPY (space_saved: 0%)

But the mac tests fail if I don't use this hack: https://github.com/delta-incubator/delta-kernel-rs/actions/runs/9882874132/job/27296554771 complaining "Incorrect datatype. Expected Timestamp(Microsecond, Some(\"UTC\")), got Timestamp(Nanosecond, None)"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, nm, this is an issue with DAT. My local version works because the file there has the correct format, but the one the test runners pull is incorrect. I will figure out how to fix that and revert this hack.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this turned out to be something that's fixed in the newest version of dat, so I've reverted the hack, and bumped the version for our acceptance tests.

Created #279 to follow up on this

@nicklan
Copy link
Collaborator Author

nicklan commented Jul 11, 2024

High level -- I think I understand the algo now, but there's no doc comment nor example anywhere that would make it easier to understand and verify the code. Strongly suggest to add something that shows what is being accomplished.

I've added a big doc comment that tries to cover how things work. LMK if that helps.

@nicklan
Copy link
Collaborator Author

nicklan commented Jul 11, 2024

@scovich @sppalkia thanks for the reviews! I think I've addressed all the comments, so re-requesting. Apologies if I missed anything, github makes it hard to be sure.

Please do have a look at this new thing. Moving the type verification in uncovered something strange in parquet reading on MacOS. I've papered over it for now, but definitely need to look into it further. Will add an issue here if we're all okay with moving forward with this somewhat hacky solution for now.

@nicklan nicklan requested review from scovich and sppalkia July 11, 2024 00:58
@nicklan
Copy link
Collaborator Author

nicklan commented Jul 12, 2024

Moving the type verification in uncovered something strange in parquet reading

It's that spark sometimes writes INT96 timestamps (which are deprecated). The parquet crate defaults these to Nanosecond resolution, so we get a schema mismatch.

I've partially fixed that via updating the DAT version, but what's funny is that now a test I added just today fails, for the same reason. This is with data pulled directly from a delta-sharing table, so I'm worried we are going to encounter this in the wild often enough to need to do something about it.

I'm considering trying to figure out if the data type is INT96 on a column, and then truncate the timestamp to micros somehow, but I'm not totally clear how easy that will be. That is what the Java folks do, so we may need to do the same.

@nicklan
Copy link
Collaborator Author

nicklan commented Jul 12, 2024

Quick follow-up for anyone about to review:

After discussing with Shoumik, the plan is to detect if two data-types are compatible (i.e. the INT96 <-> Microsecond) case, and if so, to allow the read to proceed, and then to cast to the expected type before finally returning from the read.

Initially planning to support only the timestamp one, and decimals, but can add more as needed.

@nicklan nicklan force-pushed the push-parquet-select-to-leaves branch from 040db84 to 765b65d Compare July 12, 2024 20:20
// INT96 (which is nanosecond precision), while the spec says we should read partition columns
// as microseconds. This means the read and golden data don't line up. When this is released in
// `dat` upstream, we can stop skipping these tests
"all_primitive_types",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we handle timestamps correctly, the golden data is "wrong" for this table. This will be fixed by updating to data 0.0.3, and I'll do that shortly after this merges. I've verified manually that we correctly read these two tables.

@@ -511,8 +513,7 @@ fn read_with_scan_data(
if expected.is_empty() {
assert_eq!(batches.len(), 0);
} else {
let schema = batches[0].schema();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small bug fix here. arrow will assume a column isn't nullable if it doesn't find any nulls in it. so using the schema of the first batch as the schema for all is incorrect if the first batch happens to have a column with no nulls, but subsequent batches DO have nulls. Instead we do the correct thing now and transform the schema the scan promises to read into an arrow schema, and use that as the schema for the batches.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

];
read_table_data_str("./tests/data/with-short-dv/", None, None, expected)?;
Ok(())
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are taken from the delta-java tests. i validated that the expected arrays here are correct.

@nicklan
Copy link
Collaborator Author

nicklan commented Jul 12, 2024

@scovich @sppalkia , this is ready for another look

We now handle converting timestamp types from whatever is in the parquet to what delta wants. currently this is just used for timestamp types.

I looked into needing conversion for decimal types, but it turns out parquet actually does store the correct logical type there in the footer, so the parquet reader does the right thing in reading it into an arrow decimal array. eg:

############ Column(decimal) ############
name: decimal
path: decimal
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: Decimal(precision=5, scale=3)
converted_type (legacy): DECIMAL
compression: SNAPPY (space_saved: -4%)

I added the basic_decimal test to ensure this works.

In the future when we find cases where the logical_type isn't correct, or other types that need conversions, we can add code to handle that, but this PR is crazy enough as it is.

@nicklan nicklan force-pushed the push-parquet-select-to-leaves branch from 5c4ef5a to 2f42578 Compare July 16, 2024 00:45
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great, thanks

@nicklan nicklan merged commit 334af14 into delta-io:main Jul 16, 2024
9 checks passed
nicklan added a commit that referenced this pull request Jul 16, 2024
…265)

Previously we had two independent code paths for interacting with tables
via a scan, `execute()` or `scan_data()`.

`scan_data` is what most engines will use, and is a bit more complex in
that it evaluates expressions over the returned add files. This meant
that bugs like #261 could happen because our tests used `execute` which
didn't catch the issue.

This PR makes `execute` use `scan_data` under the hood. It's a bit more
complex, but now we won't need to maintain two code paths.

Until #271 merges, this PR will fail tests, because nullable columns are
not filled in as expected.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants