Conversation

@rdblue (Contributor) commented Nov 21, 2020

This is a draft of how we can extend the current writer classes to handle deltas and file rolling.

  • Adds a DataWriter class, analogous to EqualityDeleteWriter and PositionDeleteWriter
  • Adds methods to FileAppenderFactory to create writers; it may be cleaner to move these to a separate WriterFactory
  • Adds a rolling writer implementation for equality deletes to BaseTaskWriter
  • Abstracts rolling writer logic into a base class
  • Adds an example UnpartitionedDeltaWriter

These changes are incomplete, so this probably doesn't work yet. This would still need a real task writer implementation and additional arguments passed to create the FileAppenderFactory.
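
For reference, here is a sketch of what the extended factory could look like. Everything below is an assumption based on the description above, not the final API:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

// Sketch only: method names and signatures are assumptions, not the final API.
public interface FileAppenderFactory<T> {
  // existing: plain appenders for data files
  FileAppender<T> newAppender(OutputFile outputFile, FileFormat format);

  // proposed: writers that also produce content file metadata
  DataWriter<T> newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

  EqualityDeleteWriter<T> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

  PositionDeleteWriter<T> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
}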

switch (format) {
  case PARQUET:
    return Parquet.writeDeletes(outputFile.encryptingOutputFile())
        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
Member

Here we will need to provide a schema that combines the writeSchema with the file and pos columns to construct the ParquetValueWriter, because the PositionDeleteStructWriter treats the user-provided row plus the file and pos columns as a single record to write into the target file.

Contributor Author

The builder here will automatically wrap the row schema and the function passed in to add the extra schema layer, so we just need to configure this for rows, not for the combined schema. That's part of why the position delete writer's delete method accepts the file and position independently of the row: it keeps the encapsulation and doesn't leak that concern to places like this.
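
To illustrate that encapsulation, a hedged usage sketch (the factory call and the variable names are assumptions from this draft):

// Sketch: the caller passes file and position separately from the row; the
// writer assembles the combined (file, pos, row) record internally, so the
// combined schema never leaks to the caller.
PositionDeleteWriter<InternalRow> deleteWriter =
    appenderFactory.newPosDeleteWriter(outputFile, FileFormat.PARQUET, partition);
try {
  deleteWriter.delete(dataFilePath, rowPosition, deletedRow);  // the row is optional
} finally {
  deleteWriter.close();
}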

Member

In this expression, createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType)), the msgType already includes the file and pos columns, but dsSchema does not. I mean we need to provide a dsSchema that matches the msgType exactly. It's similar to this: https://github.com/apache/iceberg/pull/1663/files#diff-7f498f01885f6e813bc3892c8dfb02b8893365540438b78b3a0221f9c8667c8fR211

Contributor Author

I think that's a bug. My intent was to have the caller configure the writer just as they would for a data file or an equality delete file. There is no need to expose that complexity to the writer. So we should update the builder to extract the row type and pass it into the createWriterFunc.
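
A rough sketch of that fix inside the delete builder; the helpers below are hypothetical and only show the shape of the change:

// Hypothetical sketch: the builder wraps the caller's function so that it
// only ever sees the row portion of the combined (file, pos, row) type.
appenderBuilder.createWriterFunc(combinedType -> {
  MessageType rowType = extractRowType(combinedType);       // hypothetical helper
  ParquetValueWriter<?> rowWriter = rowWriterFunc.apply(rowType);
  return wrapWithFilePathAndPos(rowWriter);                 // hypothetical wrapper adds file + pos
});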

  currentWriter.add(record);
}

public void delete(T record) throws IOException {
Member

Would it be better to add this delete(T record) method to the TaskWriter interface?

Member

Maybe not TaskWriter; it belongs on a DeltaWriter.

Contributor Author

Probably. I just wanted to demonstrate that we can add a delete here that works with the rolling writer. What we actually expose will probably be different.
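
For illustration, a minimal sketch of the rolling delete path; the field and method names are assumptions about the rolling base class, not the actual code:

// Sketch: deletes flow through the same roll-over check as inserts.
public void delete(T record) throws IOException {
  currentWriter.delete(record);   // write an equality delete to the open file
  this.currentRecordCount += 1;
  if (shouldRollToNewFile()) {    // e.g. the current file passed the target size
    closeCurrentWriter();
    openNewWriter();
  }
}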

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class UnpartitionedDeltaWriter<T> extends BaseTaskWriter<T> {
Member

I think we would still need an extra DeltaWriter between the TaskWriter and the RollingFileWriters. A fanout TaskWriter will receive rows from different partitions or buckets, and the per-partition (or per-bucket) writer that accepts both data records and equality deletes is what we'd call the DeltaWriter.

That way, we could move all of the equality delete logic into a single common DeltaWriter class, and the TaskWriter would focus on dispatching records to the DeltaWriter methods according to a customized policy. For example, Flink's RowData has INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER row kinds; if a row is a DELETE, the fanout policy could direct it to the DeltaWriter's delete method.
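
A sketch of that layering; the interface, the method names, and the lookup helper are assumptions to illustrate the proposal, with Flink's RowKind as the example dispatch policy:

import java.io.Closeable;
import java.io.IOException;
import org.apache.flink.table.data.RowData;

// Sketch: one DeltaWriter per partition or bucket; the TaskWriter only routes.
interface DeltaWriter<T> extends Closeable {
  void write(T row) throws IOException;    // data record
  void delete(T row) throws IOException;   // equality delete
}

// In a fanout TaskWriter for Flink, dispatch by row kind:
public void write(RowData row) throws IOException {
  DeltaWriter<RowData> writer = writerFor(partitionKey(row));  // hypothetical lookup
  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      writer.write(row);
      break;
    case DELETE:
    case UPDATE_BEFORE:
      writer.delete(row);
      break;
  }
}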

@rdblue (Contributor Author) commented Nov 25, 2020

Closing this because it is incorporated in #1818, which is a full working implementation.

@rdblue closed this Nov 25, 2020
