
Iceberg: support Parquet read with delete filter#8534

Closed
jackye1995 wants to merge 1 commit into trinodb:master from jackye1995:iceberg-delete-reader


Conversation

@jackye1995
Member

Support using Iceberg's DeleteFilter to filter delete files in the read path. This implementation supports only Parquet for now, because the Parquet reader already has the ability to generate a row ID channel; I will add ORC later if this implementation is accepted. The general idea is that:

  1. Express a Trino Page as an iterable of TrinoRows, where each row is defined by the underlying block array and a position in the page.
  2. TrinoRow implements Iceberg's StructLike, so it can be used directly with Iceberg's DeleteFilter.
  3. DeleteFilter filters the pages produced by the Parquet page source.
  4. The resulting filtered rows are used to derive the positions to keep in each page.
  5. Page.getPositions retains only the rows at those positions, completing the merge-on-read process.
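
The steps above can be sketched with minimal stand-ins (MiniPage, MiniRow, and the predicate are illustrative placeholders for Trino's Page/Block types and Iceberg's DeleteFilter, not the PR's actual classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Minimal stand-in for a columnar page: one long column, rows addressed by position.
class MiniPage {
    final long[] column;
    MiniPage(long[] column) { this.column = column; }
    int getPositionCount() { return column.length; }
    // Analogous to Page.getPositions: retain only the given positions.
    MiniPage getPositions(int[] positions) {
        long[] kept = new long[positions.length];
        for (int i = 0; i < positions.length; i++) {
            kept[i] = column[positions[i]];
        }
        return new MiniPage(kept);
    }
}

// A "row" is just (page, position) -- the TrinoRow idea from step 1.
record MiniRow(MiniPage page, int position) {
    long get() { return page.column[position]; }
}

public class MergeOnReadSketch {
    // Steps 3-5: run every row through the delete filter, collect surviving
    // positions, then rebuild the page from those positions.
    static MiniPage applyDeleteFilter(MiniPage page, Predicate<MiniRow> isLive) {
        List<Integer> keep = new ArrayList<>();
        for (int pos = 0; pos < page.getPositionCount(); pos++) {
            if (isLive.test(new MiniRow(page, pos))) {
                keep.add(pos);
            }
        }
        int[] positions = keep.stream().mapToInt(Integer::intValue).toArray();
        return page.getPositions(positions);
    }

    public static void main(String[] args) {
        MiniPage page = new MiniPage(new long[] {10, 20, 30, 40});
        // Stand-in for an equality delete: rows with value 20 or 40 are deleted.
        Set<Long> deleted = Set.of(20L, 40L);
        MiniPage filtered = applyDeleteFilter(page, row -> !deleted.contains(row.get()));
        System.out.println(filtered.getPositionCount()); // 2
        System.out.println(filtered.column[0] + "," + filtered.column[1]); // 10,30
    }
}
```

The real implementation differs in that the predicate comes from Iceberg's DeleteFilter operating over StructLike rows, but the position-collection-then-getPositions flow is the same.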

I have not added unit tests yet; I have only tested with an internal Trino installation that supports multiple catalogs, against tables in a Glue catalog. There might be some backport errors I missed. Once we agree on the general implementation approach, I will add tests back and fix any performance issues.

@phd3 @electrum @findepi @losipiuk @caneGuy @rdblue

@cla-bot added the cla-signed label on Jul 13, 2021
@losipiuk
Member

@hashhar You've been working on this one, not sure what is the current shape. PTAL.

Member

@hashhar left a comment

Looks good at first glance. Both position and equality deletes are supported with this change.

One question: the FileIO ends up using the Iceberg Parquet readers to read the delete files instead of the Trino native Parquet reader. This is different from the normal read path. How difficult would it be to use the Trino Parquet reader for reading and applying the deletes?

I've yet to look at the calls into the Iceberg library code to see if something more is needed.

 <properties>
     <air.main.basedir>${project.parent.basedir}</air.main.basedir>
-    <dep.iceberg.version>0.11.0</dep.iceberg.version>
+    <dep.iceberg.version>0.11.1</dep.iceberg.version>
Member

Is this to make the MetadataColumns available?

.collect(toImmutableList());
Schema deleteReadSchema = new Schema(deleteReadFields);
TrinoDeleteFilter deleteFilter = new TrinoDeleteFilter(fileIo, split.getTask(), deleteReadSchema, deleteReadSchema);
getColumns(deleteFilter.requiredSchema(), typeManager).stream()
Member

If I understand correctly, deleteFilter.requiredSchema will always be a superset of the columns we request (the columns argument to this method). So do we need to create the initial regularColumns at all? Can we just assign the result of this stream to regularColumns?

@findepi
Member

findepi commented Jul 14, 2021

> One question - the FileIO ends up using the Iceberg Parquet readers to read the delete instead of the Trino native parquet reader.

@hashhar did you have implementation for positional deletes with Trino Parquet reader?
did you already create a PR for this?

@caneGuy
Contributor

caneGuy commented Jul 15, 2021

Using TrinoRow to reconstruct Page and Blocks looks good!

@EmbeddedSoftwareChenXiangLing

Hello @jackye1995, I am very interested in using Trino to read Iceberg tables with delete filters, and have compiled your branch to test it. However, an exception was thrown: io.trino.spi.TrinoException: row index unavailable.

It seems the row index column is empty when parquetPageSource.getNextPage executes, but I am not sure how to fix it to match your design. Is this the complete code for reading Iceberg v2 tables? Could you guide me on solving this problem?

Thank you for your reply!

@jackye1995
Member Author

For anyone subscribed to this PR: I was mostly focused on multi-catalog support in the past few weeks, and will now start work on this one.

@dijiekstra

Looking forward to this feature. We are currently using Iceberg v2 tables, writing binlog data via Flink, and want to read them and complete our ETL through Trino.

private final String tableName;
private final TableType tableType;
private final Optional<Long> snapshotId;
private final byte[] serializedSchema;
Member

I think there already was an idea to add schema to IcebergTableHandle and it was rejected (?) for some reason.

@phd3 do you remember?

public Schema getSchema()
{
if (schema == null) {
schema = deserializeFromBytes(serializedSchema);
Member

Unsafe publication of the Schema object, since this.schema is not volatile.
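
A minimal sketch of the concern and one common fix: make the lazily-initialized field volatile so a reader thread never observes a partially constructed object. The class and deserializeFromBytes below are illustrative stand-ins, not the PR's actual code:

```java
import java.nio.charset.StandardCharsets;

public class LazySchemaHolder {
    private final byte[] serializedSchema;
    // volatile guarantees that once a thread sees a non-null reference,
    // it also sees the fully constructed object (safe publication).
    private volatile String schema;

    public LazySchemaHolder(byte[] serializedSchema) {
        this.serializedSchema = serializedSchema;
    }

    public String getSchema() {
        String result = schema;
        if (result == null) {
            // Benign race: two threads may both deserialize, but each
            // produces an equivalent immutable object, so either is fine.
            result = deserializeFromBytes(serializedSchema);
            schema = result;
        }
        return result;
    }

    // Stand-in for the real Schema deserialization.
    private static String deserializeFromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Without volatile, the Java Memory Model permits another thread to see a non-null schema reference pointing at an object whose fields are not yet visible.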

}

@Override
protected InputFile getInputFile(String s)
Member

s -> path

}

@Override
public <T> T get(int i, Class<T> aClass)
Member

Is it necessary to implement equality-based deletes?

value = aClass.cast(type.getDouble(block, position));
}
else if (type.equals(TIME_MICROS)) {
value = aClass.cast(type.getLong(block, position) / PICOSECONDS_PER_MICROSECOND);
Member

This logic could ideally be in a shared function, doing mapping reverse to io.trino.plugin.iceberg.IcebergTypes#convertIcebergValueToTrino
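
The unit conversion itself is simple arithmetic; the point of the comment is to keep such mappings in one place, next to their inverses. A sketch of the pair of conversions for the TIME case above (Trino represents TIME values in picoseconds of day, Iceberg in microseconds; method names are illustrative):

```java
public class TimeConversions {
    // Trino TIME values are picoseconds of day; Iceberg time values
    // are microseconds, so the two directions divide and multiply.
    static final long PICOSECONDS_PER_MICROSECOND = 1_000_000L;

    static long trinoTimeToIcebergMicros(long picos) {
        return picos / PICOSECONDS_PER_MICROSECOND;
    }

    static long icebergMicrosToTrinoTime(long micros) {
        return micros * PICOSECONDS_PER_MICROSECOND;
    }

    public static void main(String[] args) {
        long picos = 12_345L * PICOSECONDS_PER_MICROSECOND;
        System.out.println(trinoTimeToIcebergMicros(picos)); // 12345
    }
}
```

Pairing each Trino-to-Iceberg conversion with its inverse in a single class (as suggested, mirroring io.trino.plugin.iceberg.IcebergTypes#convertIcebergValueToTrino) keeps the two directions from drifting apart.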

@Override
public <T> void set(int i, T t)
{
throw new TrinoException(NOT_SUPPORTED, "writing to TrinoRow is not supported");
Member

TrinoException should be used only when we know what is the reason of the failure.

throw new UnsupportedOperationException();

@Override
protected InputFile getInputFile(String s)
{
return fileIO.newInputFile(s);
Member

is this method used?

@findepi
Member

findepi commented Nov 22, 2021

How can we test this?

@jackye1995
Member Author

close in favor of #10075

@jackye1995 jackye1995 closed this Nov 26, 2021

7 participants