Support Iceberg row-level delete and update#10075

Closed
jackye1995 wants to merge 1 commit into trinodb:master from jackye1995:mor

Conversation


@jackye1995 (Member) commented Nov 26, 2021

This PR continues the effort of #8534 and #8565 to provide full support for reading Iceberg position and equality deletes, and writing Iceberg position deletes in Parquet.

I have added some tests to ensure the correctness of the implementation, and I will continue to add more tests in the following days. I will leave some comments in the code as discussion points.

This is a big PR and we can split it into multiple PRs for the actual contribution, but anyone interested can also try this patch out; I have made sure all related tests pass.

@jackye1995 (Member Author)

@electrum @rdblue

Member Author
Added support for users to directly create a v2 table using property format_version

Contributor
Does this mean that I can define the Iceberg table spec version in the WITH clause by specifying format_version?

Member Author
Yes.

@jackye1995 (Member Author) Nov 26, 2021
My understanding is that all deletes received from the updatable page source will always come from the same split, which means the same task, which means the same partition. So there is no need to use the fanout writer for writing deletes. Please let me know if it is not the case.

Member Author
As we discussed, Iceberg is adding full Jackson support for its object models. This wrapper will be updated to use Jackson serialization once that is completed. In the meantime, I delegate everything to the Java serialization that Iceberg provides. Because of that, some related classes have been given Serializable or Externalizable implementations.

Member
What is the timeline for that? Would it be feasible to wait for Jackson serialization before merging this PR?

Member Author
Yeah, it's up to the community; I am just posting this for review while working on that. The 0.13 release train is going out soon, so we will target 0.14 for this support, likely around Jan 2022.

Member
Java serialization is a hard no for Trino. It is a huge, ugly mess with lots of security problems. Instead, for temporary storage you could annotate the Iceberg objects with Jackson annotations or use Jackson features to serialize third-party objects. For long-term storage, you should write your own serialization objects and copy in and out of them.
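To illustrate the "write your own serialization objects" pattern suggested here, a minimal sketch follows. All names are hypothetical: `ThirdPartyDeleteFile` stands in for a third-party (e.g. Iceberg) class that cannot be annotated, and `DeleteFileDto` is the connector-owned copy that a JSON codec such as Jackson could then serialize safely; this is not actual Trino or Iceberg API.

```java
// Sketch of the "own serialization objects" pattern: copy a third-party
// object's state into a plain, connector-owned record and back.
public class SerdeSketch
{
    // Stand-in for a third-party object with no safe serialization support
    static final class ThirdPartyDeleteFile
    {
        private final String path;
        private final long recordCount;

        ThirdPartyDeleteFile(String path, long recordCount)
        {
            this.path = path;
            this.recordCount = recordCount;
        }

        String path() { return path; }
        long recordCount() { return recordCount; }
    }

    // Connector-owned serialization object: plain fields, trivially JSON-mappable
    record DeleteFileDto(String path, long recordCount)
    {
        static DeleteFileDto fromThirdParty(ThirdPartyDeleteFile file)
        {
            return new DeleteFileDto(file.path(), file.recordCount());
        }

        ThirdPartyDeleteFile toThirdParty()
        {
            return new ThirdPartyDeleteFile(path, recordCount);
        }
    }

    public static void main(String[] args)
    {
        ThirdPartyDeleteFile original = new ThirdPartyDeleteFile("s3://bucket/delete-001.parquet", 42);
        // Round-trip through the owned DTO instead of Java serialization
        ThirdPartyDeleteFile restored = DeleteFileDto.fromThirdParty(original).toThirdParty();
        System.out.println(restored.path() + " " + restored.recordCount());
    }
}
```

The point of the copy-in/copy-out step is that the wire format is fully owned by the connector, so third-party class changes cannot break or compromise deserialization.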

Member Author
By adding row-level deletes, some behaviors of DELETE become unclear to users: some queries run as metadata deletes, others as row-level deletes. We might want to introduce the following configs:

  1. whether metadata delete should be used when possible
  2. for metadata delete, whether to use the DeleteFiles API or a global equality delete
  3. for row-level delete, whether to use position deletes or equality deletes

We know the Trino architecture fits position deletes better than equality deletes, but please read the document in apache/iceberg#3432 for the tradeoff between the two. There might be use cases where people prefer equality deletes.

We will probably add these configs when the feature request comes.

Member
I think it's better to avoid new settings if possible. So, questions:

  1. If metadata delete is possible you'd always want that, right?
  2. If so, why would anybody want the equality delete when DeleteFiles works?
  3. Not clear here. That seems to be the only one we want to configure, and probably on a query-by-query basis.

@jackye1995 (Member Author) Nov 26, 2021
  1. Yes, and we should extend metadata delete to also consider Iceberg hidden partitions. As of today it only works for direct partition columns, and deleting based on a partition transform predicate results in a row-level delete. That is the "unclear behavior to users" I meant.
  2. Because it does not touch data files, it will not cause commit conflicts. However, I also do not want to add use cases for global equality deletes, for performance reasons; I am just listing the theoretical use cases here.
  3. Agreed. I think we will need a session config and ideally a per-query config for this (if there is demand from the community). But for now, position deletes definitely work best with Trino's row-level delete design.

Member
We don't need a config:

  • When "metadata delete" is possible, we should just do it. This is at most the same work as split generation. We determine that whole files should 'disappear' from the table, and we should just do that, without any delete files.
  • We should never use "equality deletes". We discussed this somewhere in the Iceberg community already. Trino can do the work to produce position-based delete files, so subsequent read queries are better handled; Trino also wants to return the number of deleted rows to the user, so processing data files is desirable anyway.
  • So: "metadata delete" when possible, position-based otherwise.
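The decision rule outlined above can be sketched as a small helper. This is an illustrative simplification, not Trino code; `DeleteMode` and `chooseDeleteMode` are hypothetical names, and the single boolean stands in for the real predicate-vs-partitioning analysis.

```java
// Hedged sketch of the delete-strategy rule: metadata-only delete when the
// predicate aligns with whole files/partitions, position deletes otherwise.
// Equality deletes are never produced.
public class DeleteModeSketch
{
    enum DeleteMode
    {
        METADATA_DELETE,   // drop whole data files from table metadata
        POSITION_DELETE    // write position-based delete files
    }

    static DeleteMode chooseDeleteMode(boolean predicateAlignsWithPartitioning)
    {
        // If every matching row lives in files that match entirely, those files
        // can simply "disappear" from the table: at most split-generation work.
        if (predicateAlignsWithPartitioning) {
            return DeleteMode.METADATA_DELETE;
        }
        // Otherwise scan the data files and record deleted row positions, which
        // also lets Trino report the number of deleted rows to the user.
        return DeleteMode.POSITION_DELETE;
    }

    public static void main(String[] args)
    {
        System.out.println(chooseDeleteMode(true));   // METADATA_DELETE
        System.out.println(chooseDeleteMode(false));  // POSITION_DELETE
    }
}
```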

@jackye1995 jackye1995 force-pushed the mor branch 4 times, most recently from 02907c3 to cc52d49 Compare November 26, 2021 22:25
BIGINT,
Optional.empty());

// use Integer.MIN_VALUE as $row_id field ID, which is currently not reserved by Iceberg
Member
Would we know if Iceberg starts using this ID internally? Would something break?

Member Author
Because an Iceberg column's name and type can change, comparing column IDs is needed for equality checks. Using a duplicated column ID would cause issues in that area.

All the Iceberg metadata columns have IDs counting down from Integer.MAX_VALUE. That's why I am choosing Integer.MIN_VALUE here. If there is any risk of conflicting IDs, Iceberg will inform the Trino community, and switching to another ID is fully backwards compatible.
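The ID scheme described above can be pictured with a tiny sketch: reserved metadata IDs count down from `Integer.MAX_VALUE`, so `Integer.MIN_VALUE` sits at the opposite end of the int range. The `reservedMetadataId` helper is an illustration of the scheme, not actual Iceberg API.

```java
// Sketch of the metadata-column ID layout: Iceberg reserves IDs counting down
// from Integer.MAX_VALUE, while the synthetic $row_id uses Integer.MIN_VALUE.
public class MetadataColumnIdSketch
{
    static final int TRINO_ROW_ID = Integer.MIN_VALUE;

    // e.g. offset 0 -> Integer.MAX_VALUE, offset 1 -> Integer.MAX_VALUE - 1, ...
    static int reservedMetadataId(int offset)
    {
        return Integer.MAX_VALUE - offset;
    }

    public static void main(String[] args)
    {
        // Even thousands of reserved metadata columns stay far from MIN_VALUE
        for (int i = 0; i < 10_000; i++) {
            if (reservedMetadataId(i) == TRINO_ROW_ID) {
                throw new AssertionError("ID collision");
            }
        }
        System.out.println("no collision in first 10000 reserved IDs");
    }
}
```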

Comment on lines +33 to +34
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
@losipiuk (Member) Nov 30, 2021
nit: MIN/MAX_SUPPORTED_VERSION

}
}

Optional<ReaderColumns> readerColumns = Optional.of(new ReaderColumns(projectedColumns.build(), outputColumnMapping.build()));
Member
why optional if never empty?

boolean isDeleteOrUpdateQuery = false;
for (int idx = 0; idx < queriedColumns.size(); idx++) {
IcebergColumnHandle column = queriedColumns.get(idx);
if (column.isTrinoRowIdColumn()) {
Member
Should we be more explicit and encode fact that we are doing DELETE/UPDATE in IcebergTableHandle instead of inferring it from list of columns?

.collect(toImmutableList());

Map<Integer, Optional<String>> partitionKeys = split.getPartitionKeys();
Optional<StructLike> partition = task.spec().isUnpartitioned() ? Optional.empty() : Optional.of(task.file().partition());
Member
The whole createPageSource is hard to follow: a block of code with lots of raw index operations. It would be great to add some higher-level structure to it.

Member Author
Yes, I agree. I can explore that a bit more to see what the best way is to refactor this to be more readable.

IcebergPageSourceProvider::applyProjection));
return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource.get(), projectionsAdapter);

return new IcebergPageSource(
Member
nit: wrap after each arg

IcebergColumnHandle column,
List<Integer> fileReadColumnIds,
List<IcebergColumnHandle> fileReadColumns,
Object[] prefillValues,
Member
Block[] ?

@findepi (Member) left a comment
(skimming)

if (column.isTrinoRowIdColumn()) {
// TODO: it's a bit late to fail here, but failing earlier would cause metadata delete to also fail
if (ORC == getFileFormat(table.getTable())) {
throw new TrinoException(GENERIC_USER_ERROR, "Row level delete and update are not supported for ORC type");
Member
Suggested change
throw new TrinoException(GENERIC_USER_ERROR, "Row level delete and update are not supported for ORC type");
throw new TrinoException(GENERIC_USER_ERROR, "Row-level delete and update are not supported for ORC");

Block[] queriedColumnPrefillValues = new Block[queriedColumns.size()];
int[] queriedColumnFileReadChannels = new int[queriedColumns.size()];
boolean isDeleteOrUpdateQuery = false;
for (int idx = 0; idx < queriedColumns.size(); idx++) {
Member
idx -> i

Comment on lines +119 to +128
public void writeExternal(ObjectOutput out)
throws IOException
{
}

@Override
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException
{
}
Member
That's unlikely to be a correct serialization of HdfsEnvironment state, since it doesn't seem to store anything.

Anyway, we should not make this class Serializable (nor Externalizable) at all.

Comment on lines +166 to +175
public void writeExternal(ObjectOutput out)
throws IOException
{
}

@Override
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException
{
}
Member
As above. This is neither correct nor desired.

FORMAT_VERSION_PROPERTY,
"Iceberg table format version",
icebergConfig.getFormatVersion(),
false))
Member
The validation should happen here, not in getFormatVersion below.

See the available integerProperty overload.

Comment on lines +244 to +246
Block dictionary = ((DictionaryBlock) rowIds).getDictionary();
if (dictionary instanceof RowBlock) {
rows = (RowBlock) dictionary;
Member
The semantics of resolveRowIdBlock are ill-defined: you get "some block, with some values". The underlying DictionaryBlock.getDictionary may contain bogus data, or pretty much anything.

Remove this method.

You probably need it because you didn't use the proper class; see the other comment about Block.getChildren.

{
try {
Collection<Slice> slices = new ArrayList<>();
if (posDeleteSink != null) {
Member
if the field is nullable, annotate the field and constructor param with @Nullable

if (posDeleteSink != null) {
slices.addAll(posDeleteSink.finish().get());
}
if (updateRowSink != null) {
Member
if the field is nullable, annotate the field and constructor param with @Nullable

Comment on lines +279 to +280
posDeleteSink.abort();
updateRowSink.abort();
Member
finish() assumes the fields are nullable.

}

@Override
public void close()
Member
Close posDeleteSink and updateRowSink here too.


littleWT commented Dec 3, 2021

Hi, I recently tested this PR on Flink CDC data in Iceberg. When there is no update/delete data it works and queries are very fast, but it does not work when there is a lot of update/delete data.

@jackye1995 (Member Author)

Thanks for the information, @littleWT!

This PR focuses on getting the feature in; I would not expect the performance to be great yet, because vectorized reading of deletes is still a work in progress in Iceberg. Vectorized position delete reads will be out in 0.13, and vectorized equality delete reads in 0.14.

It would be great if you could share some details about your test scale for other people to reference.


littleWT commented Dec 3, 2021

@jackye1995 Haha, we talked about this problem on DingDing!
We tested with about 13 million rows, and the query stops working when there are only thousands of delete/update rows.
We have since compacted the CDC data with Spark (MOR), which is better than before, but still not great.

Comment on lines +562 to +563
List<Types.NestedField> icebergFields = new ArrayList<>();
List<RowType.Field> trinoFields = new ArrayList<>();
Member
Usually ImmutableList.Builder is preferred

.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
Member
Maybe

Suggested change
if (!icebergTable.spec().fields().isEmpty()) {
if (partitionColumnTypes.length > 0) {

delegateIndex++;
}
outputIndex++;
this.queriedColumnPrefillValues = requireNonNull(queriedColumnPrefillValues, "queriedColumnPrefillValues is null");
Member
Are there any verify checks we can add for these array sizes?

if (fileReadChannel == -1) {
blocks[i] = new RunLengthEncodedBlock(prefillValues[i], batchSize);
}
else if (fileReadChannel == -2) {
Member
Or maybe an enum rather than integer sentinel values.
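The reviewer's suggestion could look roughly like the sketch below: name each non-file-read channel source instead of encoding it as -1/-2. `ChannelSource` and its values are hypothetical names, not code from the PR.

```java
// Sketch: replace integer sentinels (-1 = prefilled, -2 = row-id) in the
// channel mapping with an explicit enum, so each source of a column is named.
public class ChannelSourceSketch
{
    enum ChannelSource
    {
        FILE_READ,   // column comes from the underlying file reader
        PREFILLED,   // constant value, emitted as a run-length encoded block
        ROW_ID       // synthesized $row_id column
    }

    // Adapter from the legacy sentinel encoding shown in the diff
    static ChannelSource fromLegacySentinel(int fileReadChannel)
    {
        if (fileReadChannel == -1) {
            return ChannelSource.PREFILLED;
        }
        if (fileReadChannel == -2) {
            return ChannelSource.ROW_ID;
        }
        return ChannelSource.FILE_READ;
    }

    public static void main(String[] args)
    {
        System.out.println(fromLegacySentinel(-1)); // PREFILLED
        System.out.println(fromLegacySentinel(0));  // FILE_READ
    }
}
```

Switching on the enum then lets the compiler flag any unhandled source, which raw integer comparisons cannot do.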

@Override
public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Member
This should throw an unsupported exception if the table is v1, right? Same for beginDelete.


findepi commented Jan 13, 2022

Please add a test with OPTIMIZE after some rows modified with DELETE and UPDATE.

@alexjo2144 (Member)

@jackye1995 @findepi I'm going to take a look at the conflicts and see if I can split some of this out into smaller chunks. It looks like some of the serialization work for the REST catalog should be useful here too.


findepi commented Feb 25, 2022

Thanks @alexjo2144 !
also cc @kbendick re serialization


electrum commented Apr 7, 2022

@jackye1995 thanks for your work here. @alexjo2144 can this be closed now, or is there additional code to be merged?

@alexjo2144 (Member)

So far I've only pulled out the read support pieces from here, I'm working on the merge conflicts for write support now.

I have a copy of Jack's code though, so I think we can close this PR.


findepi commented Apr 13, 2022

Superseded by #11886

@findepi closed this Apr 13, 2022
9 participants