
Conversation

@chenjunjiedada
Collaborator

This adds a Spark reader to read position delete rows. It also changes the predicate logic of the delete filter.

FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
FileIO io, EncryptionManager encryptionManager, boolean caseSensitive,
FileContent deleteContent) {
Contributor

There is no guarantee that the CombinedScanTask doesn't have both position and equality deletes to apply, so it doesn't make sense to add this argument and handle just one. I think that this reader should return all deleted rows from a file, no matter which kind of delete was encoded. Right?

Collaborator Author

Yes, there is no guarantee about what kind of deletes a CombinedScanTask contains. Here I'd like to expose an option for the user to select one kind of delete to rewrite. How about returning all deleted rows when deleteContent is not passed? That should be a valid option, I think.

InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {
Contributor

I think that this should use a combined keepDeletedRows method instead of methods specific to equality or position deletes.

return keepDeleteRows != filter(row);
}

private boolean filter(T row) {
Contributor

I think it would be better to have a subclass of this one that overrides shouldKeep instead:

class PositionStreamDeletedRowSelector extends PositionStreamDeleteFilter {
  ...
  @Override
  protected boolean shouldKeep(T row) {
    return !super.shouldKeep(row);
  }
}

CloseableIterable<Long> posDeletes) {
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
CloseableIterable<Long> posDeletes,
boolean keepDeleteRows) {
Contributor

I would rather not change this method. Instead, let's introduce a new method to select deleted rows.

row -> row.get(0, Long.class),
Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)),
false);
Contributor

I don't think this file needs to change.


public static <T> CloseableIterable<T> streamingSelector(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
Contributor

Nit: indentation is off.

}

@Override
protected FilterIterator<T> getPositionIterator(CloseableIterator<T> items,
Contributor

Iceberg's style is to omit get from method names.

}

private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
protected static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
Contributor

I think this can still be private because the subclass is also defined in this file.

return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
}

public static <T> CloseableIterable<T> streamingSelector(CloseableIterable<T> rows,
Contributor

@rdblue Mar 30, 2021

This name doesn't look quite clear enough. How about streamingDeletedRowSelector? I think that's clear.

return isInDeleteSets;
return null;
}
Predicate<T> isDeleted = t -> false;
Contributor

I think this should be initialized to null instead of a predicate. There is no need to run an extra predicate (with an extra method dispatch) for each row in a data file. That's a tight loop, so we should do more work here to avoid it. Instead of using isDeleted.or, this should test whether isDeleted is null and either initialize isDeleted or call isDeleted.or.
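
A minimal sketch of the null-initialization idea, assuming a hypothetical accumulator class; the field and method names here are illustrative, not the actual DeleteFilter code:

import java.util.function.Predicate;

class EqDeletePredicateBuilder<T> {
  // start with null so no extra predicate (and no extra dispatch) runs per row
  private Predicate<T> isDeleted = null;

  void addDeleteSet(Predicate<T> deletedByThisSet) {
    if (isDeleted == null) {
      isDeleted = deletedByThisSet;                 // first delete set: use it directly
    } else {
      isDeleted = isDeleted.or(deletedByThisSet);   // later sets: OR them together
    }
  }

  boolean test(T row) {
    return isDeleted != null && isDeleted.test(row);
  }
}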


public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
return CloseableIterable.concat(Lists.newArrayList(keepRowsFromPosDeletes(records),
keepRowsFromEqualityDeletes(records)));
Contributor

This isn't correct because a row may be deleted by both position and equality deletes. If that happened, then this would return the same row twice. I think this needs to be implemented so that the deleted rows are returned just once.
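
A hedged sketch of one way to return each deleted row exactly once: combine the position-delete and equality-delete checks into a single predicate before filtering. The predicate names and class are assumptions; CloseableIterable.filter is the existing Iceberg utility.

import java.util.function.Predicate;
import org.apache.iceberg.io.CloseableIterable;

class DeletedRowSelector<T> {
  private final Predicate<T> deletedByPos; // hypothetical: row matches a position delete
  private final Predicate<T> deletedByEq;  // hypothetical: row matches an equality delete

  DeletedRowSelector(Predicate<T> deletedByPos, Predicate<T> deletedByEq) {
    this.deletedByPos = deletedByPos;
    this.deletedByEq = deletedByEq;
  }

  // a row deleted by both kinds of deletes matches the combined predicate once,
  // so it is emitted once instead of twice
  CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
    return CloseableIterable.filter(records, deletedByPos.or(deletedByEq));
  }
}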

return deletedRowsFilter.filter(records);
}

public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
Contributor

Is there a need to make these methods public? Or will rows only be read using keepRowsFromDeletes? What is the use case for these changes?

Collaborator Author

They are used in DeleteRowReader, which is in the Spark module.

.map(Predicate::negate)
.reduce(Predicate::and)
.orElse(t -> true);
Predicate<T> predicate = buildEqDeletePredicate();
Contributor

I think it would be more clear if this were named isDeleted.

@rdblue
Contributor

rdblue commented Mar 30, 2021

@chenjunjiedada, I started reviewing this again, but I think we should reconsider the direction that this is taking.

My initial review comments were based on this change in isolation, which left out position deletes. Adding position deletes is harder because you can't union the rows that are deleted by position with the rows deleted by equality because a row may have been deleted by both if a position delete is encoded, followed by an equality delete that applies to the same data file. You could update this to avoid the duplicates, but I think that would result in substantial changes and doesn't actually get us closer to what you're trying to do.

If I understand correctly, what you're trying to do is to create a Spark DataFrame of deleted rows. That way, you could use Spark to project _file and _pos, sort it by those fields, and then write the position delete files from the resulting DataFrame. That's probably why you didn't consider position-based deletes in the initial PR. Is this correct?

If so, I think that the approach should be slightly different. Updating the filter supports the original goal of rewriting equality deletes, but is strangely specific and doesn't easily support other uses. Instead, I think that the way to do this is to select all rows and set a metadata column to indicate whether or not the row is deleted. That's an easy way to guarantee that the deleted rows are returned just once because every row is returned once. The filtering may set the same "_is_deleted" field on the record but that's okay. Then we can use the resulting DataFrame for more operations, like inspecting row-level deletes or producing records for streaming (both inserted and deleted).
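
A hedged sketch of the DataFrame usage described above, in Java. It assumes a hypothetical read path that returns every row with _file, _pos, and an _is_deleted metadata column populated; that read path and the column names are assumptions in this conversation, not an existing API.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RewriteEqDeletesSketch {
  public static Dataset<Row> positionDeletes(SparkSession spark, String table) {
    Dataset<Row> rowsWithDeleteFlag = spark.read()
        .format("iceberg")            // assumption: reader exposes deleted rows + metadata columns
        .load(table);

    return rowsWithDeleteFlag
        .filter(col("_is_deleted"))   // keep only rows marked deleted
        .select(col("_file"), col("_pos"))
        .orderBy("_file", "_pos");    // sorted (file, pos) pairs to write as position deletes
  }
}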

What do you think?

@chenjunjiedada
Collaborator Author

Thanks for the review and comments!

The original thought was to handle equality deletes and position deletes separately, which I called different levels of minor compaction. The separate compactions allow users to control the file scan in a more fine-grained way, so as to mitigate overhead on the name node. For example, users could monitor the number of equality deletes and position deletes from the snapshot summary and perform a Spark or Flink action to do the specific compaction.

I didn't consider reading all deleted rows because I thought that is a major compaction, similar to the action that removes all deletes. If we want to support one more level of compaction that reads all deletes and rewrites them to position deletes, I think your suggestion definitely works.

So I think it would be better to remove the logic of reading all deleted rows from this PR, implement it in the suggested way, and also add an action for it, while keeping the current separate compaction actions for fine-grained usage. Does that make sense to you?

@openinx
Member

openinx commented Mar 30, 2021

In my original thinking, there are two kinds of compaction:

a. Convert all equality deletes into position deletes. As for whether we should eliminate duplicate position deletes at the same time, the difference for me is: if the duplicate pos-deletes are removed during the rewrite, the user's reading efficiency will be higher; if not, it will be worse. Generally speaking, I think it is a trade-off in performance optimization. Both options seem acceptable to me.

b. Eliminate all deletes (including pos-deletes and equality deletes). This is well suited to situations where deletes make up a high proportion of the whole table. On the one hand, we can save a lot of unnecessary storage, and on the other hand, we can avoid a lot of inefficient joins when reading data. This is simpler to implement than case a.

After reading @rdblue's comment, what I find most valuable is that we can use the metadata-column abstraction to unify the code for case a, case b, and the normal read path. Say we have an Iterable<Row> with an _is_deleted flag inside each row:

For case a, we could just use Iterables.transform(Iterables.filter(iterable, row -> row.isDeleted()), row -> (row.file(), row.pos())) to generate all the pos-deletes.

For case b, we could just use Iterables.filter(iterable, row -> !row.isDeleted()) to get all remaining rows.

For the normal read path, it's the same as case b.

This implementation greatly reduces the complexity of the various paths; I think we can try this kind of implementation.
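
A small self-contained sketch of the two cases above in Java, assuming a hypothetical RowWithMeta interface that exposes isDeleted(), file(), and pos(); the Guava Iterables calls and Iceberg's Pair utility are used as in the expressions above.

import com.google.common.collect.Iterables;
import org.apache.iceberg.util.Pair;

interface RowWithMeta {
  boolean isDeleted();
  CharSequence file();
  long pos();
}

class DeletedRowViews {
  // case a: project (file, pos) from every deleted row to produce position deletes
  static Iterable<Pair<CharSequence, Long>> positionDeletes(Iterable<RowWithMeta> rows) {
    return Iterables.transform(
        Iterables.filter(rows, RowWithMeta::isDeleted),
        row -> Pair.of(row.file(), row.pos()));
  }

  // case b (and the normal read path): keep only rows that are not deleted
  static Iterable<RowWithMeta> liveRows(Iterable<RowWithMeta> rows) {
    return Iterables.filter(rows, row -> !row.isDeleted());
  }
}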

@openinx
Member

openinx commented Mar 30, 2021

As for producing records for streaming (both inserted and deleted), I'm not quite sure whether it will work, because people usually consume delta files between two snapshots incrementally. The equality deletes from delta files need to be applied by the downstream consumer first because they delete records that were committed in the previous txn, while pos-deletes delete records committed in the current txn. Applying rows marked _is_deleted directly to the downstream table may cause the upstream's pos-deletes to delete data that should not be deleted downstream.

@openinx
Member

openinx commented Mar 30, 2021

Instead, I think that the way to do this is to select all rows and set a metadata column to indicate whether or not the row is deleted.

I've tried to think about how to add the _is_deleted metadata column for each record read from the Parquet/ORC readers. The workflow would be:

  1. Add a boolean reader at the tail when constructing the Parquet/ORC readers for the given Iceberg schema. The boolean reader just fills in a default value of false for each record; the real value is filled in after checking the equality delete files and pos-delete files iteratively;
  2. The struct Parquet/ORC reader reads the whole row; at this point the _is_deleted value is false by default;
  3. Check the equality delete files and position delete files, and set _is_deleted to true if the row has been deleted. This requires the Flink RowData and Spark InternalRow to provide a setValue(pos, value) interface to update the real value of _is_deleted;
  4. Return the Iterable<Row>.

The most complicated work occurs in the third step, because we will need to refactor the whole Deletes#filter path to return a boolean flag for each row rather than just returning the filtered Iterable<T>. This means we would have refactored almost all of the logic related to the delete filter. Now I am a little hesitant about whether it is necessary to do this.
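
A hedged sketch of step 3 as a marking pass rather than a filtering pass; the setter hook and class name are assumptions, not existing Iceberg APIs, and CloseableIterable.transform is the existing Iceberg utility.

import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.iceberg.io.CloseableIterable;

class DeleteMarkingFilter<T> {
  private final Predicate<T> isDeleted;               // combined eq-delete + pos-delete check
  private final BiConsumer<T, Boolean> setIsDeleted;  // hypothetical: writes the trailing _is_deleted slot

  DeleteMarkingFilter(Predicate<T> isDeleted, BiConsumer<T, Boolean> setIsDeleted) {
    this.isDeleted = isDeleted;
    this.setIsDeleted = setIsDeleted;
  }

  // every row is returned exactly once; only the _is_deleted metadata value changes
  CloseableIterable<T> markDeleted(CloseableIterable<T> rows) {
    return CloseableIterable.transform(rows, row -> {
      setIsDeleted.accept(row, isDeleted.test(row));
      return row;
    });
  }
}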

@rdblue
Contributor

rdblue commented Mar 30, 2021

The most complicated work occurs in the third step, because we will need to refactor all the Deletes#filter path to return a boolean flag for a row, rather than just returning the filtered Iterable<T>. This will mean that we have almost refactored the logic related to delete filter. Now I am a little hesitant whether it is necessary to do this.

I think we can leave most of the existing read path as-is because we don't need to do this unless we are projecting deleted rows, but I agree with you that this would be a bit more work. That's why we need to consider the purpose of this change. It sounds like the goal is to rewrite equality deletes as position deletes, but it isn't quite clear. @chenjunjiedada or @openinx, can you confirm why you wanted to be able to read equality-deleted rows like this?

@openinx
Member

openinx commented Mar 31, 2021

because we don't need to do this unless we are projecting deleted rows

I think we need to figure out the specific implementation approach; I will try to publish a PR for this if possible.

can you confirm why you wanted to be able to read equality-deleted rows like this?

I think the core reason is that it's simple to implement. The current approach only needs to translate all the eq-deletes into pos-deletes, without considering the duplicated pos-deletes. We could also share most of the code path when planning tasks.

@chenjunjiedada
Collaborator Author

@rdblue @openinx, I think the goal here is to provide more fine-grained compaction actions. Let me give more background.

We have many internal Flink jobs that consume tens of billions of messages from the MQ system and sink them into Iceberg every day. Since users want to see data ASAP, they usually set the checkpoint interval to a minute or less. As a result, a huge number of small files is produced on HDFS. To optimize read performance, we have to compact or cluster the small files, while compaction or clustering itself needs resources and brings overhead to the cluster. To mitigate overhead on the name node and save resources for users, we optimized the compaction action into fine-grained actions with predicates and grouping by partition.

As we are going to support consuming CDC streaming data, I suppose there will be a lot of equality delete and position delete files. So we need more fine-grained actions to optimize the read path, like what we did for data file compaction. Actually, we have four kinds of compaction for deletes:

  1. Convert all equality deletes to position deletes.
  2. Cluster all position deletes into one.
  3. Convert all equality deletes and position deletes into one set of position deletes.
  4. Remove all deletes.

From my understanding, the first three compactions are minor compactions, and the last is a major one. The first and second compactions only need a little compute and IO, and running the first and then the second achieves almost the same optimization effect. Of course we could eventually implement the third as well. The point is that we want to provide fine-grained options to users so they can apply strategies according to their cluster situation.

@chenjunjiedada
Collaborator Author

@rdblue, what do you think about these use cases? Should we continue with these minor compactions? I want to refactor them and implement a Flink action as well.

@rdblue
Contributor

rdblue commented Apr 2, 2021

@chenjunjiedada, the different types of actions make sense to me. What I'm asking is which one you are currently trying to build. I think it is 1, which makes sense and is what I assumed from looking at what you're doing. But it would be great to get an idea of how you plan to build that compaction.

@chenjunjiedada
Collaborator Author

I just built a Parquet implementation on top of the metadata column method. @rdblue @openinx, you might want to take a look. I will try to build the metadata column in ORC and Avro as well.

@chenjunjiedada force-pushed the position-delete-row-reader branch from 58147f7 to 938e356 on November 12, 2021 15:35
@chenjunjiedada
Collaborator Author

@rdblue @jackye1995 @aokolnychyi @RussellSpitzer, the API changes are ready, and this is now needed for delete conversion. Could you please take another look if you have time?

@chenjunjiedada changed the title from "Spark: add position delete row reader" to "Spark: update delete row reader to read position deletes" on Nov 13, 2021
@flyrain
Contributor

flyrain commented Feb 19, 2022

Hi @chenjunjiedada, are we still pursuing this? Is there an ETA to share?

@chenjunjiedada
Collaborator Author

@flyrain @jackye1995, in our scenario, the speed of major compaction for equality deletes cannot catch up with the production speed, so I think this is necessary for some CDC cases. I can rebase and update this if you are interested.

@chenjunjiedada
Collaborator Author

chenjunjiedada commented Apr 7, 2022

Hmm, looks like the change data feed may need to read deleted rows as well.

@flyrain
Contributor

flyrain commented Apr 8, 2022

Yes, CDC needs to read the deleted rows. As I mentioned on the mailing list, one option is to leverage an _isDeleted column. I have a local implementation for Parquet vectorized reads and will post it soon. Would appreciate your feedback, @chenjunjiedada.

@chenjunjiedada
Collaborator Author

@flyrain Sounds great! Let me close this one as it has been left behind for too long.
