Skip to content

Conversation

@huaxingao
Copy link
Contributor

@huaxingao huaxingao commented Mar 3, 2022

Co-Authored-By: Huaxin Gao [email protected]
Co-Authored-By: Xinli Shang [email protected]

Jira

Changes in this PR:

Parquet relies on the column name. In a lot of usages e.g. schema resolution, this would be a problem. For example, Iceberg has schema evolution and column name could be changed.

This PR will add the support of column resolution by Id. The Id we are using is the field_id in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398). After this PR, if the user explicitly requests for column Id resolution, Parquet readers will use the field_id to determine which Parquet columns to read.

The changes are as follows:

In write path, ParquetOutputFormat.COLUMN_ID_RESOLUTION is introduced, the default is false. If sets to true, the field_id has to be unique in the entire schema, otherwise, an Exception will be thrown. In read path, ParquetInputFormat.COLUMN_ID_RESOLUTION is introduced and the default is false. If sets to true, Parquet readers will resolve the column using field_id. If there are duplicate id in the schema, an Exception will be thrown.

For Filters, this PR adds the choice of constructing a filter using column ID instead of column path
for example, originally we only allow

IntColumn intColumn = intColumn("a.b.c"); 
FilterPredicate predicate = eq(intColumn, 7);

Now we can have

IntColumn intColumn = intColumn(new Type.ID(1)); 
FilterPredicate predicate = eq(intColumn, 7);

For Filters, the column resolution is done at SchemaCompatibilityValidator.validateColumn. In this method, the file's schema is used as the source of truth to resolve and validate the filter's columns.

For read schema, the column resolution is done at ParquetFileReader.setRequestedSchema. Again, the file's schema is used as the source of truth to resolve and validate the requested schema. If ParquetInputFormat.COLUMN_ID_RESOLUTION sets to true, the id in the requested schema will be used to figured out the original ColumnPath.

For example, if the file schema has column name random, type int, id 1 , column name name, type String, id 2 as follows (type is Spark type)

      val writeSchema =
        new StructType()
          .add("random", IntegerType, true, withId(1))
          .add("name", StringType, true, withId(0))

if after schema evolution, the table schema is changed to column name a, type int, id 1 , column name b, type String, id 2 as follows (type is Spark type)

      val readSchema =
        new StructType()
          .add("a", StringType, true, withId(0))
          .add("b", IntegerType, true, withId(1))

the id in the requested schema (the above readSchema) will be used to compared to the file schema to figure out the correct ColumnPath to read.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explain what it does

@shangxinli
Copy link
Contributor

Can you rebase and combine all the commits?

@huaxingao huaxingao force-pushed the column-id branch 2 times, most recently from 41087af to 9eb6266 Compare March 5, 2022 06:45
@huaxingao
Copy link
Contributor Author

Rebased. @shangxinli

@huaxingao
Copy link
Contributor Author

cc @ggershinsky @sunchao
Could you please take a look? Thanks!

ids = new HashSet<>();
boolean projectionSchemaIdUnique = uniqueId(projection, ids);
MessageType schema = null;
// if ids are unique, use id resolution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of figuring out by parquet-mr internals, did you consider letting the caller enable it explicitly? By doing that, we can still check the uniqueness, if not unique, we can throw exceptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to an explicit control / visibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! Added ParquetInputFormat.COLUMN_ID_RESOLUTION to control this.

@shangxinli
Copy link
Contributor

Hi. @huaxingao Thanks for working on it. I just had a first-round review and left some comments. After we address them, I will have another look.

@ggershinsky
Copy link
Contributor

ggershinsky commented Mar 21, 2022

hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? Where these IDs are stored (if in footer - which struct/field)? How are they set and written? Is the writer app expected to verify the uniqueness, or it can use this PR code for that? How the column IDs are read and used (is the reader app expected to do anything beyond using this PR code)?
I think the answer to the last question is mostly provided, but it doesn't explicitly say what IDs are used (where they are stored / read from).


private <T extends Comparable<T>> void validateColumn(Column<T> column) {
ColumnPath path = column.getColumnPath();
HashSet<Type.ID> ids = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be introduced after the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks!

// Setting the projection schema before running any filtering (e.g. getting filtered record count)
// because projection impacts filtering
reader.setRequestedSchema(requestedSchema);
this.requestedSchema = reader.setRequestedSchema(requestedSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.requestedSchema is set in the line 180; so this line overrides the class field setting. Maybe you can use a temp local variable (with appropriate name) in the line 180

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks!



MessageType schema = fileWriteContext.getSchema();
HashSet<Type.ID> ids = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable is not used outside checkDuplicateId; is there a way to hide it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks!

public MessageType setRequestedSchema(MessageType projection) {
paths.clear();
for (ColumnDescriptor col : projection.getColumns()) {
HashSet<Type.ID> ids = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable is not used outside uniqueId; is there a way to hide it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

@huaxingao
Copy link
Contributor Author

@ggershinsky I updated the description. Please check again to see if it is clear to you. Thanks!

@ggershinsky
Copy link
Contributor

Thanks @huaxingao , one more question / clarification. In the writer,

field_id has to be unique in the entire schema, otherwise, an Exception will be thrown.

what happens if the column ids are not set by the caller?

In the reader,

the id in the requested schema will be used to figured out the original ColumnPath.

what happens if the file footer doesn't have the column ids (field_id were not set by the writer)?

@shangxinli
Copy link
Contributor

shangxinli commented Mar 22, 2022

@huaxingao @ggershinsky Will you be able to join tomorrow's meeting to have a discussion on the open issues? We can try to close tham in the meeting and move this PR forward.

@huaxingao
Copy link
Contributor Author

@ggershinsky
I think in write/read, if COLUMN_ID_RESOLUTION sets to true but field_id were not set by the caller/writer, we need to throw Exception.

@huaxingao
Copy link
Contributor Author

@shangxinli Yes, I will join the meeting tomorrow.

@ggershinsky
Copy link
Contributor

I'll join too.

@shangxinli
Copy link
Contributor

hi @huaxingao , can you describe the lifecycle of the column IDs at a high level, either in the PR description, or in a comment? Where these IDs are stored (if in footer - which struct/field)? How are they set and written? Is the writer app expected to verify the uniqueness, or it can use this PR code for that? How the column IDs are read and used (is the reader app expected to do anything beyond using this PR code)? I think the answer to the last question is mostly provided, but it doesn't explicitly say what IDs are used (where they are stored / read from).

+1, I think we can add it to the design doc

* @param type the type of the field
* @param maxRep the maximum repetition level for that path
* @param maxDef the maximum definition level for that path
* @deprecated will be removed in 2.0.0; Use {@link #ColumnDescriptor(String[], PrimitiveType, int, int)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? I would expect to deprecate the old one.

return maxBloomFilterBytes;
}

public boolean getColumnIdResolution() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to use the verb from the setting, rather than adding get as a second verb. Code will read more easily if you use boolean resolveColumnsByID(). You can also add "should" or "is" to denote that the return is a boolean if you like that convention.

return this;
}

public Builder withColumnIdResolution(boolean columnIdResolution) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveColumnsByID(boolean shouldResolveById)?

return new IntColumn(ColumnPath.fromDotString(columnPath));
}

public static IntColumn intColumn(Type.ID id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huaxingao and @shangxinli, has the Parquet community considered using Iceberg expressions and filters? I know that's a separate change, but it would be a great way to pick up a cleaner filter API that handles expression binding.

return columnPath;
}

public void setColumnPath(ColumnPath columnPath) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this add a setter for column path?

Type.ID columnId = columnDescriptor.getId();
if (columnId != null) {
if (ids.contains(columnId)) {
throw new RuntimeException("duplicate id");
Copy link
Contributor

@rdblue rdblue Apr 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that this is the right place to catch duplicate column IDs. Also, I think it should probably throw an exception more specific than RuntimeException.

}
ids.add(columnId);
if (columnId.intValue() == id.intValue()) {
column.setColumnPath(ColumnPath.get(columnDescriptor.getPath()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate not wanting to change the validation logic, but making the filter Column mutable just to be able to add the path seems like a bad idea in general. What happens if the filter is reused and the path changes? It looks like this would use the first path for a column that was encountered.

I think this should validate directly using IDs instead.

HashSet<Type.ID> ids = new HashSet<>();
boolean fileSchemaIdUnique = uniqueId(fileMetaData.getSchema(), ids);
if (!fileSchemaIdUnique) {
throw new RuntimeException("can't use column id resolution because there are duplicate column ids.");
Copy link
Contributor

@rdblue rdblue Apr 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: error messages should use sentence case and begin with a capital letter. I usually like to also show context, like the IDs that are not unique and the column names within the file.

}

public void setRequestedSchema(MessageType projection) {
private boolean uniqueId(GroupType schema, HashSet<Type.ID> ids) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a good practice to modify a set that's passed in. I would expect this to produce a set. If you want to throw an exception because this finds a duplicate ID, then I think it should just throw an exception in this method.

return schema;
}

private MessageType resetColumnNameBasedOnId(MessageType schema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem like the right class to contain these utility methods. I'd recommend a utility class to handle this.

List<Type> childFields = ((GroupType) field).getFields();
List<Type> resetFields = resetColumnNameInFields(childFields);
if (resetFields.size() > 0) {
resetField = ((GroupType) field).withNewFields(resetFields);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like this renames the field itself. So if you have a nested field with a top-level name change this wouldn't work.

Copy link
Contributor

@rdblue rdblue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach here seems to be to allow IDs, but to translate back to field names for each file. I think that's going to be a problem for maintainability. It would be better to map column names to IDs and use IDs so you have consistency across all files.

@shangxinli shangxinli requested a review from rdblue April 20, 2022 15:38
@shangxinli
Copy link
Contributor

@rdblue regarding to 'using Iceberg expressions and filters', we agreed to use. We are finding resources to work on it. Huaxin may be able to work on it after this column resolution by ID is done (didn't confirm with her yet).

@iflytek-hmwang5
Copy link

iflytek-hmwang5 commented Mar 24, 2024

@huaxingao @shangxinli Hi,really appreciate the hard work has been done so far. how is this PR going now? Any updates?

@huaxingao
Copy link
Contributor Author

@iflytek-hmwang5
I think we don't need this PR any more. The purpose of this PR is to introduce page indexing in Iceberg. However, there's an alternative method for supporting page indexes in Iceberg, as outlined in this PR

@emkornfield
Copy link
Contributor

Per #950 (comment) going to close this as stale.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants