
Allow supplying a table schema to ParquetExec #12010

Open
nrc opened this issue Aug 15, 2024 · 7 comments
Labels
enhancement New feature or request

Comments

@nrc
Contributor

nrc commented Aug 15, 2024

Is your feature request related to a problem or challenge?

We have a couple of situations where our schema evolves and we wish to read both old and new data and operate on it seamlessly. We are reading from Parquet, and in theory this should just work because of the internal SchemaAdapter; in practice, however, we can't make this work without doing work that feels like it breaks the abstraction. This has happened when we've changed regular columns into partition columns, and more generally when we've reordered or otherwise changed schemas in minor ways.

In more detail, we're implementing TableProvider and we have a logical schema which we return from TableProvider::schema; pushed-down projections are passed to us based on this schema. In our scan method, we create a different ParquetExec for each version of the file schema using FileScanConfig. We then union these execs together to make our data source. To do this, their schemas must match, and we ensure they match our logical schema. However, doing so is painful: we have to create a new projection to reorder the fields and compose it with the pushed-down projection. We have to do some manipulation of the file schema and partition columns (presumably some of this is unavoidable, but it seems unnecessary that we need the logical schema, a file schema that we pass in, and a file schema found from the file). This is made more difficult by the fact that you can't control where the partition columns appear in the logical schema; they always end up at the end.

Although ParquetExec has an internal schema adapter, this is not very useful (in this case) because its 'target' schema is always the file schema from the FileScanConfig (in <ParquetExec as ExecutionPlan>::execute).

Describe the solution you'd like

I'm not sure exactly what this should look like. I think I would like to supply a table schema which describes the output of the ParquetExec, is used as the 'target' schema for the SchemaAdapter, specifies the location of partition columns, and is automagically applied to pushed-down projections. In other words, the 'reader' is able to fully encapsulate schema changes.
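
As a purely hypothetical sketch (with_table_schema is not a real DataFusion API; it only illustrates the shape of the request), the builder call might look like:

let exec = ParquetExecBuilder::new(file_scan_config)
    // Hypothetical method: the supplied schema would be the exec's output
    // schema, the SchemaAdapter 'target', and the frame of reference for
    // pushed-down projections and partition column placement.
    .with_table_schema(logical_schema.clone())
    .build_arc();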

Describe alternatives you've considered

A few alternatives which would make this situation easier to handle without this 'complete' change:

  • Be able to specify the location of partition columns within a schema, not just at the end.
  • Provide functionality to compose, invert, and otherwise manipulate projections (this would probably require a projection newtype around Vec<usize>, which I think would be a good thing anyway; see the sketch after this list).
  • Move the schema adapter factory from ParquetExec to FileScanConfig (I'm not sure this would actually help at all, and I appreciate that other readers might not be able to apply the adaptation, but it feels like this could help somehow in making projection, partition handling, and schema adaptation better integrated).
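
A minimal sketch of the projection newtype from the second bullet (nothing like this exists in DataFusion today; the names and semantics are illustrative):

/// A projection: output position -> index into the input columns.
#[derive(Debug, Clone, PartialEq)]
pub struct Projection(Vec<usize>);

impl Projection {
    /// Apply `self` to the output of `inner`, e.g. compose a pushed-down
    /// projection (against the logical schema) with the reordering that
    /// maps the logical schema onto a particular file schema.
    pub fn compose(&self, inner: &Projection) -> Projection {
        Projection(self.0.iter().map(|&i| inner.0[i]).collect())
    }

    /// Invert a permutation (every input column appears exactly once).
    pub fn invert(&self) -> Projection {
        let mut inverted = vec![0; self.0.len()];
        for (out_pos, &in_idx) in self.0.iter().enumerate() {
            inverted[in_idx] = out_pos;
        }
        Projection(inverted)
    }
}

fn main() {
    // File schema [salary, name, department], logical schema [name, salary,
    // department]: `reorder` maps logical positions to file indices.
    let reorder = Projection(vec![1, 0, 2]);
    // Pushed-down projection against the logical schema: [department, name].
    let pushed_down = Projection(vec![2, 0]);
    // The composed projection, against the file schema, is [2, 1].
    assert_eq!(pushed_down.compose(&reorder), Projection(vec![2, 1]));
    // Swapping the first two columns is its own inverse.
    assert_eq!(reorder.invert(), Projection(vec![1, 0, 2]));
}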

Additional context

No response

@nrc nrc added the enhancement New feature or request label Aug 15, 2024
@nrc
Contributor Author

nrc commented Aug 15, 2024

Related: #5950, #10515

@adriangb
Contributor

To give a concrete example, we might have a schema evolution that looks like:

day=2020-01-01/1.parquet  - (name Utf8, department Utf8)
-- department column made a partition
day=2020-01-02/department=abc/1.parquet  - (name Utf8,)
-- added salary column
day=2020-01-03/department=abc/1.parquet  - (name Utf8, salary Float32)
-- department moved back to a regular column
day=2020-01-04/1.parquet  - (name Utf8, salary Float32, department Utf8)
-- name made LargeUtf8 (or Utf8View, etc.)
day=2020-01-05/1.parquet  - (name LargeUtf8, salary Float32, department Utf8)

Where our "final"/"current" table schema is (day Date32, name LargeUtf8, salary Float32, department Utf8).

(fwiw in reality some of these columns, e.g. department, would be dict-encoded)
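
To make the required adaptation concrete, here is a minimal sketch in plain arrow-rs of what mapping a batch from the day=2020-01-02 file onto that final table schema entails (the schemas come from the example above; the function and partition values are illustrative):

use std::sync::Arc;
use arrow::array::{new_null_array, ArrayRef, Date32Array, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// The 2020-01-02 file contains only (name Utf8); day and department are
/// partition values, and salary does not exist yet.
fn adapt(file_batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let rows = file_batch.num_rows();
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("day", DataType::Date32, false),
        Field::new("name", DataType::LargeUtf8, true),
        Field::new("salary", DataType::Float32, true),
        Field::new("department", DataType::Utf8, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        // Materialise the day partition value (18263 days since epoch = 2020-01-02).
        Arc::new(Date32Array::from(vec![18263; rows])),
        // Cast name from the file's Utf8 to the table's LargeUtf8.
        cast(file_batch.column(0), &DataType::LargeUtf8)?,
        // salary is absent from this file: fill with nulls.
        new_null_array(&DataType::Float32, rows),
        // Materialise the department partition value.
        Arc::new(StringArray::from(vec!["abc"; rows])),
    ];
    RecordBatch::try_new(table_schema, columns)
}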

@samuelcolvin
Contributor

It's worth noting that allowing the position of partition columns to be controlled would be useful beyond this problem:

If I have industry, company, department, employee, ... as columns, it makes query results easier to read if the columns are in that order rather than department, employee, ..., industry, company, as you might see if industry and company were partition columns.

@jayzhan211
Contributor

jayzhan211 commented Aug 16, 2024

We then union these execs together to make our data source. To do this, their schemas must match and we ensure they match our logical schema

create a new projection to reorder the fields and compose it with the pushed-down projection. We have to do some manipulation of the file schema and partition columns (presumably some of this is unavoidable, but it seems unnecessary that we need the logical schema, a file schema that we pass in, and a file schema found from the file)

I'm not familiar with how people interact with Parquet files, so I don't fully understand the difficulty around schema matching and why SchemaAdapter doesn't help. It would be great if there were a minimal example that shows the difficulty of reordering schemas to make this work.

@nrc
Contributor Author

nrc commented Aug 18, 2024

I'm not sure this will be too useful, but our code for creating a ParquetExec is something like

let file_scan_config = FileScanConfig::new(self.object_store_url.clone(), self.file_schema.clone())
    .with_projection(projection.cloned())
    .with_limit(limit)
    .with_table_partition_cols(partition_fields)
    .with_file_groups(file_groups);

let reader_factory = DefaultParquetFileReaderFactory::new(self.object_store.clone());

let exec = ParquetExecBuilder::new(file_scan_config)
    .with_predicate(predicate.clone())
    .with_parquet_file_reader_factory(Arc::new(reader_factory))
    .build_arc();

You can call ParquetExecBuilder::with_schema_adapter_factory to supply a schema adapter indirectly to the ParquetExec. However, you can't pass the 'target' schema in to the schema adapter; that happens in <ParquetExec as ExecutionPlan>::execute and is always the file schema from the FileScanConfig. We could pass an adjusted schema to FileScanConfig::new, but I believe the schema is used for other things and so this would cause errors elsewhere. We could write a schema adapter factory which makes a custom schema adapter that ignores the passed-in schema and targets our own, but that seems like a Bad Idea.

@jayzhan211
Contributor

jayzhan211 commented Aug 18, 2024

I took a look at the code around FileScanConfig and ParquetExec. My understanding is that the target schema you mentioned, also known as the table_schema, can be obtained from FileScanConfig::project. If you want to specify the location of partition columns, FileScanConfig::with_projection lets you choose the order of fields plus partition columns. (This is what you had done.)

let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
    Some(proj) => Box::new(proj.iter().copied()),
    None => Box::new(
        0..(self.file_schema.fields().len() + self.table_partition_cols.len()),
    ),
};
let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_iter {
    if idx < self.file_schema.fields().len() {
        let field = self.file_schema.field(idx);
        table_fields.push(field.clone());
        table_cols_stats.push(self.statistics.column_statistics[idx].clone())
    } else {
        let partition_idx = idx - self.file_schema.fields().len();
        table_fields.push(self.table_partition_cols[partition_idx].to_owned());
        // TODO provide accurate stat for partition column (#1186)
        table_cols_stats.push(ColumnStatistics::new_unknown())
    }
}

However, the table schema is stored as part of the properties in ParquetExec's cache and is not passed to ParquetOpener:

let (projected_schema, projected_statistics, projected_output_ordering) =
    base_config.project();
let cache = ParquetExec::compute_properties(
    projected_schema,
    &projected_output_ordering,
    &base_config,
);

I think we could store the table_schema explicitly in ParquetExec, so we can get it in ParquetExec::execute. Then we could replace the table_schema in ParquetOpener with the one from ParquetExec.

We can see that ParquetOpener's table_schema is assigned the file_schema rather than the table_schema (from FileScanConfig::project):

table_schema: self.base_config.file_schema.clone(),

I'm not sure how you union the created ParquetExecs. If you have the table_schema from FileScanConfig::project in ParquetOpener, does this help?

@nrc
Contributor Author

nrc commented Aug 19, 2024

I'm not familiar enough with DataFusion internals to comment on your suggestion, sorry.

I'm not sure how you union the created ParquetExecs. If you have the table_schema from FileScanConfig::project in ParquetOpener, does this help?

We make the union by collecting the ParquetExecs into a Vec and then calling UnionExec::new. The trick is that the union requires each component exec to have a matching schema, so we have to ensure that is true. We can't let each exec choose its own schema because, as far as I know, there is no easy way to insert an adjusting step before putting it into the union.
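
For reference, the union step looks something like this (assuming execs holds the per-schema-version ParquetExecs built as above; the function name is ours, UnionExec is the real DataFusion operator):

use std::sync::Arc;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;

/// UnionExec requires every input to report the same schema, which is why
/// each ParquetExec has to be coerced to the logical table schema beforehand.
fn union_scans(execs: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    Arc::new(UnionExec::new(execs))
}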
