Flink: write the CDC records into apache iceberg tables #1663
Conversation
Awesome, thanks for working on this, @openinx! I'll take a look as soon as I get some time.
Hi @rdblue, I've finished the basic abstraction for DeltaWriter (it's a draft; I filed many TODO issues), which would write insert records, equality-delete records, and position-delete records (with row or without row). How do you feel about this abstraction work? (I'm still working on the integration so that I can import the MySQL binlog events from the flink-cdc-connector source into an Apache Iceberg table and read them back correctly.)
Thanks @openinx, our company has been looking forward to this CDC capability for a long time. I can't wait to experiment with it!
@rdblue Could you please take a look at this prototype when you have time? I want to make sure that our understanding of this design is roughly the same (after that I would love to divide this big patch into several small patches for review). Thanks.
Yes, will do. I meant to get to it yesterday, but I ran out of time. Thanks for your patience.
return new Schema(
    MetadataColumns.DELETE_FILE_PATH,
    MetadataColumns.DELETE_FILE_POS,
    Types.NestedField.optional(
This should be required, not optional. If the row is included, it must always be included to avoid stats that do not match.
Looks like this was a problem before as well.
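For illustration, a minimal sketch of the corrected helper, assuming the MetadataColumns constants from the quoted code (the class and method names here are hypothetical):

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PosDeleteSchemas {
  // The "row" field is required: if rows are written at all, every delete must carry one,
  // so the file-level stats match the written content.
  static Schema posDeleteSchema(Schema rowSchema) {
    return new Schema(
        MetadataColumns.DELETE_FILE_PATH,
        MetadataColumns.DELETE_FILE_POS,
        Types.NestedField.required(
            MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
            MetadataColumns.DELETE_FILE_ROW_DOC));
  }
}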
    NestedField.optional(
        MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
        MetadataColumns.DELETE_FILE_ROW_DOC)));
appenderBuilder.schema(DeletesUtil.posDeleteSchema(rowSchema));
This changes the logic so that it is incorrect. If a row will not be written, then the schema should not include that row. The row may not be written if either the rowSchema is not set, or if the createWriterFunc is not set. But there are cases where the rowSchema is set and the other is not because of the way callers use this API.
Agreed, the current version is incorrect. I will revert it. Thanks.
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
  return new Object[] { 1, 2 };
  return new Object[] {1, 2};
Looks like this file doesn't need to change.
  return currentFile.encryptingOutputFile().location();
}

public long currentPos() {
pos is typically used to indicate position in bytes rather than rows. How about changing this to something like rowCount instead?
if (posDeleteWriter == null) {
  // Only accept INSERT records.
  Preconditions.checkArgument(equalityDeleteWriter == null);
Preconditions should have error strings.
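For example, a sketch of what the check could look like with a message (the message text is illustrative):

if (posDeleteWriter == null) {
  // Only accept INSERT records.
  Preconditions.checkArgument(equalityDeleteWriter == null,
      "Equality-delete writer must be null when the position-delete writer is null");
}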
} else {
  // Delete the rows which was written in current delta writer.
  for (FilePos filePos : existing) {
    posDeleteWriter.write(positionDelete.set(filePos.path(), filePos.pos(), null));
I think this strategy will violate the ordering requirements of position delete files.
Position delete files need to be ordered by file and position. Imagine 3 replacements for the same row key. The first creates insert 0, the second insert 1, and the third insert 2. The first adds 0 existing, the second adds 1, and the third adds 2. And the first time through this loop deletes position 0, the second deletes positions 0 and 1, and the third deletes positions 0, 1, and 2. The result is deleting positions 0, 0, 1, 0, 1, 2, which violates the order requirement.
For a given inserted position, we need to track that it is deleted and write all of the deletes at one time so that they are all in order.
I think this should change from using StructLikeMap<List<FilePos>> to StructLikeMap<FilePos>. Each time the row is deleted, get the current FilePos and add it to a deletePositions list. Then set the new row's FilePos in the map so that it is the one deleted the next time.
Great point!
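A minimal sketch of the suggested bookkeeping (field and method names are illustrative, not the PR's exact API; keyType stands in for the equality-key struct type, and FilePos is the PR's pair of file path and row position):

// One FilePos per key; positions to delete are collected and written together,
// sorted by (path, pos), when the delta writer flushes.
private final StructLikeMap<FilePos> insertedRowMap = StructLikeMap.create(keyType);
private final List<FilePos> deletePositions = Lists.newArrayList();

private void trackDelete(StructLike key) {
  FilePos previous = insertedRowMap.remove(key);
  if (previous != null) {
    deletePositions.add(previous);
  }
}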
  return Lists.newArrayList(filePos);
} else {
  v.add(filePos);
  return v;
Do we expect multiple inserts for the same key? I would expect that we do not.
If that's the case, then the suggestion below to use StructLikeMap<FilePos> could simplify this logic. When a row is deleted by position (added to deletePositions), it could be removed from the map. Then this logic could be:
StructLike key = structLikeFun.apply(row);
FilePos previous = insertedRowMap.put(key, filePos);
ValidationException.check(previous == null, "Detected duplicate insert for %s", key);
Let's consider the three cases:
Case 1: Import CDC data from a table that has a unique key into an Iceberg table. In this case, it's impossible to have duplicated INSERT records with the same key.
Case 2: Import CDC data from a table that does not have a unique key into an Iceberg table. The equality fields should then be all columns of the table, and it is possible that we will encounter duplicated rows, because a database such as MySQL allows writing the same record twice. But that's not a common case; most people will create a table with unique keys, so I think it's OK not to address it.
Case 3: Upsert records into the Iceberg table. The table must have primary keys, and the UPSERT will be executed as a DELETE plus an INSERT, so we can also ensure that duplicated keys won't occur in the writeRow method (which is called for INSERTed rows).
So in general, I think it's OK to expect that we don't have multiple inserts for the same key.
Oh, there's another issue. Consider the two cases:
Case 1: Execute two transactions:
txn1: INSERT(1); DELETE(1); INSERT(1);
txn2: INSERT(1);
It won't throw any duplicate key exception because we don't check key existence across transactions.
Case 2: Execute only one transaction:
txn1: INSERT(1); INSERT(1);
We will encounter a ValidationException when executing the second INSERT(1).
Will the different behavior confuse users?
For me, Iceberg won't check key existence when inserting, so in theory we don't have to guarantee that Iceberg throws a duplicate key exception if users really insert two duplicated records.
Also, now that I'm thinking about it, we should never throw an exception for this data because that would break processing. Instead, we should probably send the record to some callback (default no-op), log a warning, and either delete the previous copy or ignore the duplication.
The next question is: if we do expect duplicate inserts, then what is the right behavior? Should we make the second insert replace the first? Or just ignore the duplication?
I'm leaning toward adding a delete to replace the record, but that would only work if the two inserts were in the same checkpoint. If they arrive a few minutes apart, then the data would be duplicated in the table. But, since we consider the records identical by the insert key, I think it is correct to add a position delete for the first record.
If we choose to ignore the duplication, we then need to keep track of both insert positions in case we get INSERT(1), INSERT(1), DELETE(1). The delete would need to drop both rows, not just the second. That's why I would say that a duplicate insert replaces the previously inserted row. That way we can keep track of just one FilePos per key and lower the complexity of tracking this.
Thinking about it again, my conclusion is that we should choose the following solutions:
- When an INSERT comes in:
  a. Eliminate duplicated INSERTs (adding a delete to replace the record, as you said) within the same checkpoint for a given TaskWriter;
  b. Write it into the data file, and just ignore duplicated INSERTs between checkpoints.
- When a DELETE comes in:
  a. Write the pos-delete file if the key exists in the insertedRowMap;
  b. Always write the equality delete.
First, eliminating duplicated INSERTs within a checkpoint is easy because we already have the insertedRowMap: if a duplicated insert key comes in, we just replace the old INSERT (by writing the pos-delete file) with the new INSERT (by writing to the insert data file). The insertedRowMap doesn't have to track a List<FilePos> per key; a single FilePos per key is enough.
Second, eliminating duplicated INSERTs between checkpoints is high-cost and unnecessary, because the equality deletions will mask all duplicated INSERTs for the same key in previous checkpoints.
The code would look something like this:
@Override
public void writeRow(T row) {
  if (allowEqDelete()) {
    RowOffset rowOffset = RowOffset.create(dataWriter.currentPath(), dataWriter.currentRows());
    // Copy the key to avoid messing up the insertedRowMap.
    StructLike copiedKey = asCopiedKey(row);
    // Adding a pos-delete to replace the old row.
    RowOffset previous = insertedRowMap.put(copiedKey, rowOffset);
    if (previous != null) {
      posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, row));
    }
  }
  dataWriter.write(row);
}

@Override
public void writeEqualityDelete(T equalityDelete) {
  Preconditions.checkState(allowEqDelete(), "Could not accept equality deletion.");
  StructLike key = asKey(equalityDelete);
  RowOffset previous = insertedRowMap.get(key);
  if (previous != null) {
    posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, equalityDelete));
    insertedRowMap.remove(key);
  }
  eqDeleteWriter.write(equalityDelete);
}

currentFileWriter.close();

ContentFileT contentFile = currentFileWriter.toContentFile();
Metrics metrics = currentFileWriter.metrics();
This looks like the only use of ContentFileWriter#metrics(). I think it would be better to simplify this and just expose rowCount() instead of all metrics.
Also, this class knows when a file has been rolled and keeps its own count of rows, so there is no need to get the number of rows passed to the current file writer. This should use currentRows instead. That way we can remove metrics.
Yeah, currentRows is enough; we don't have to expose the metrics now.
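Something along these lines, as a sketch (currentRows, completedFiles, and closeCurrentWriter follow the quoted code's spirit but may differ from the PR's exact fields):

private void closeCurrentWriter() throws IOException {
  if (currentFileWriter != null) {
    currentFileWriter.close();
    if (currentRows > 0) {
      // the rolling writer's own counter replaces currentFileWriter.metrics().recordCount()
      completedFiles.add(currentFileWriter.toContentFile());
    }
    currentFileWriter = null;
    currentRows = 0;
  }
}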
/**
 * Write the insert record.
 */
void writeRow(T row) throws IOException;
I don't think Iceberg classes should throw IOException like this. The FileAppender interface only throws IOException through Closeable, which is inherited.
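For reference, a sketch of the usual alternative: handle the checked exception internally and rethrow it unchecked (doWrite is a hypothetical internal method that performs the actual I/O):

@Override
public void writeRow(T row) {
  try {
    doWrite(row);  // hypothetical internal call that may throw IOException
  } catch (IOException e) {
    throw new java.io.UncheckedIOException("Failed to write row", e);
  }
}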
try {
  dataWriter.abort();
} catch (IOException e) {
  LOG.warn("Failed to abort the data writer {} because: ", dataWriter, e);
I think abort should accept an optional exception, so that the suppressed exceptions can be added to it rather than just logged.
I'm still thinking how to handle this gracefully.
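A sketch of the idea (the method shape is illustrative, not the PR's exact signature): pass the original failure into abort so cleanup errors are attached as suppressed exceptions instead of only being logged.

public void abort(Exception cause) {
  try {
    dataWriter.abort();
  } catch (IOException e) {
    if (cause != null) {
      // attach the cleanup failure to the original error instead of hiding it
      cause.addSuppressed(e);
    } else {
      LOG.warn("Failed to abort the data writer {}", dataWriter, e);
    }
  }
}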
import java.io.Closeable;
import java.util.Iterator;

public interface ContentFileWriter<T, R> extends Closeable {
I'm a little hesitant to add this. I think it is needed so that the same RollingContentFileWriter can be used for delete files and data files, but this introduces a lot of changes and new interfaces just to share about 20 lines of code. I'm not sure that it is worth the extra complexity, when compared to having one for data files and one for delete files.
In my thinking, the whole write workflow should look like the following:
                         TaskWriter
                             |
              +--------------+--------------+
              |                             |
              v                             v
         DeltaWriter                   DeltaWriter
        (Partition-1)                 (Partition-2)
              |
    +---------+------------------+----------------------+
    |                            |                      |
    v                            v                      v
RollingFileWriter        RollingFileWriter      RollingFileWriter
  (Pos-Delete)               (Insert)           (Equality-Delete)
                                 |
                    +------------+------------+
                    |            |            |
                    v            v            v
              FileAppender  FileAppender     ...
                (closed)      (Opening)
Each executor/task in the compute engine has a TaskWriter to write generic records. If it uses the fanout policy to write records, then it will have multiple DeltaWriters, each writing the records of a single partition, while if it uses the grouped policy in Spark then we might have just one DeltaWriter per TaskWriter. The DeltaWriter accepts INSERT, EQ-DELETE, and POS-DELETE records; for each kind of record we have a RollingFileWriter, which rolls over to a newly opened file appender once the current one reaches the size threshold.
All of the RollingFileWriters share the same rolling logic, so in theory it is good to define an abstract ContentFileWriter so that we don't have to define three kinds of RollingFileWriter; a minimal sketch of such an interface follows.
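(A rough sketch of what the shared abstraction could look like; method and type-parameter names are illustrative and may differ from the PR's actual interface.)

import java.io.Closeable;

public interface ContentFileWriter<F, T> extends Closeable {
  void write(T record);

  // used by the rolling writer to decide when to roll to a new file
  long length();

  // the completed DataFile, equality DeleteFile, or position DeleteFile
  F toContentFile();
}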
Another way is to define a BaseRollingFileWriter and put the common logic there; the DeltaWriter would then use the BaseRollingFileWriter, and when constructing the DeltaWriter we would need to pass the subclasses PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter to it.
With the BaseRollingFileWriter approach, we would need to create the PosDeleteRollingFileWriter, EqDeleteRollingFileWriter, and DataRollingFileWriter for each engine, for example FlinkPosDeleteRollingWriter and SparkPosDeleteRollingWriter, because different engines need to construct different FileAppenders to convert their data types into the unified Parquet/ORC/Avro files. I don't think that would be less complex than the current solution.
I don't think we would require a different class for each engine. The file writers are currently created using an AppenderFactory and we could continue using that.
Also, we would not need 3 rolling writers that are nearly identical. We would only need 2 because the position delete writer will be substantially different because of its sort order requirement.
Because deletes may come in any order relative to inserts and we need to write out a sorted delete file, we will need to buffer the deletes in memory. That's not as expensive as it seems at first because the file location will be reused (multiple deletes in the same data file) and so the main cost is the number of positions that get deleted, which is the number of rows written and deleted in the same checkpoint per partition. The position delete writer and logic to roll over to a new file is probably not going to be shared. I don't think I would even build a rolling position delete writer unless we see real cases where it is needed.
That leaves just the equality delete writer and the data file writer. I think it would be cleaner to just have two rolling file writers because the rolling logic is so small. I went ahead and started a PR to show what it would look like: #1802
I read the PR; here are my thoughts:
First of all, I like the idea of using a single FileAppenderFactory to customize the different writers for each computing engine; it's unified and clean, and developers will find it easy to understand and extend.
Second, I agree that it's necessary to consider the sort order for the position delete writer. We have to sort the (path, position) pairs in memory (similar to flushing a sorted memstore to HFiles in HBase), and once our memory usage reaches the threshold we flush to a pos-delete file, so the rolling policy is decided by memory size rather than the current file size. It sounds reasonable to make this a separate position delete writer.
Third, I'd like to finish the whole CDC write path work (PoC) based on #1802 to see whether there are other issues.
Thanks.
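A small sketch of the buffered, sorted flush described above (FilePos, deletePositions, positionDelete, and posDeleteWriter stand in for the PR's classes; the comparator choice is illustrative):

// Sort the buffered positions by file path, then by row position, before writing
// them so the resulting position delete file satisfies the ordering requirement.
deletePositions.sort(
    Comparator.comparing(FilePos::path, Comparators.charSequences())
        .thenComparingLong(FilePos::pos));

for (FilePos pos : deletePositions) {
  posDeleteWriter.write(positionDelete.set(pos.path(), pos.pos(), null));
}
deletePositions.clear();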
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataFileWriter<T> implements ContentFileWriter<DataFile, T> {
Much of this duplicates FileAppender. Should we use that interface instead and add toDataFile to it?
I don't think so, because FileAppender is the basic writer shared by the data file writer, the equality-delete writer, and the position-delete writer; even the manifest writer is based on it. So putting toDataFile on FileAppender is not a good choice.
}

public Set<CharSequence> referencedDataFiles() {
  return referencedDataFiles;
This referencedDataFiles set will be used for validation via RowDelta#validateDataFilesExist.
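For context, a sketch of how a committer could use it (table, dataFiles, and deleteFiles are assumed to come from the completed writers):

RowDelta rowDelta = table.newRowDelta();
dataFiles.forEach(rowDelta::addRows);
deleteFiles.forEach(rowDelta::addDeletes);
// fail the commit if any data file referenced by position deletes has been
// rewritten or removed by a concurrent operation
rowDelta.validateDataFilesExist(referencedDataFiles);
rowDelta.commit();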
/**
 * Create a factory to initialize the {@link FileAppender}.
 */
FileAppenderFactory<T> createFileAppenderFactory();
Well, we should only have one FileAppenderFactory per TaskWriter; each partition's DeltaWriter can share that FileAppenderFactory, so it's better to move this out of this factory.
The following createDataFileWriterFactory and createEqualityDeleteWriterFactory methods have similar issues.
This patch addresses issue #1639; it's mainly for PoC purposes.
Currently, I have finished some abstraction work:
ContentFileWriter, so that the DataFile, EqualityDeleteFile, and PosDeleteFile writers can implement this interface and provide a unified write entry point.
It's still a work in progress; I will remove the Draft label once it's ready for review.