Implement the flink stream writer to accept the row data and emit the complete data files event to downstream #1145
Conversation
I should have time to review this today or tomorrow. Thanks for getting this ready, @openinx!
JingsongLi left a comment:
Thanks @openinx , left some comments.
Resolved review threads (outdated) on:
flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
flink/src/main/java/org/apache/iceberg/flink/writer/TaskWriter.java
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
flink/src/main/java/org/apache/iceberg/flink/writer/UnpartitionedWriter.java
JingsongLi left a comment:
FYI, Flink 1.11 will be released soon.
And then, connectors should implement source/sink using RowData instead of Row. RowData is something like Spark's InternalRow; we must have many accessors too.
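For illustration, a minimal self-contained sketch of that difference, assuming Flink 1.11 APIs (this is not code from this PR): Row exposes boxed fields by position, while RowData exposes typed accessors over Flink's internal data structures, similar to Spark's InternalRow.
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;

public class RowVsRowData {
  public static void main(String[] args) {
    Row row = Row.of(1, "a");
    Integer id = (Integer) row.getField(0);              // boxed, untyped access by position

    RowData rowData = GenericRowData.of(1, StringData.fromString("a"));
    int internalId = rowData.getInt(0);                  // typed accessor, no boxing
    StringData internalData = rowData.getString(1);      // internal string representation
    System.out.println(id + " " + internalId + " " + internalData);
  }
}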
Resolved review thread (outdated) on flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
Thanks for your review, @JingsongLi, let me take a look.
Resolved review thread (outdated) on flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
private TableUtil() {
}

static Table findTable(String path, Configuration conf) {
I don't think it is a good idea to mimic the behavior of Spark 2 here. That's really limited. What about using job configuration to instantiate a catalog and load a table by name, like the newer Spark 3 integration does? Here's the javadoc: https://github.com/apache/iceberg/blob/master/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java#L64-L77
That uses this logic to determine the catalog and load a table:
/**
 * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter.
 *
 * @param name Spark's catalog name
 * @param options Spark's catalog options
 * @return an Iceberg catalog
 */
protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
  Configuration conf = SparkSession.active().sparkContext().hadoopConfiguration();
  String catalogType = options.getOrDefault("type", "hive");
  switch (catalogType) {
    case "hive":
      int clientPoolSize = options.getInt("clients", 2);
      String uri = options.get("uri");
      return new HiveCatalog(name, uri, clientPoolSize, conf);
    case "hadoop":
      String warehouseLocation = options.get("warehouse");
      return new HadoopCatalog(name, conf, warehouseLocation);
    default:
      throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
  }
}
This is a very good suggestion. As an extension, Flink also has a Catalog interface, so it would be better to integrate the Iceberg catalog with the Flink catalog, just like Spark 3 does.
Can/Should I take a try at contributing the integration of the Iceberg catalog with the Flink catalog?
@JingsongLi it seems we could split this work: I will focus on the streaming writer and keep the simple findTable here, and you could provide a pull request that integrates the Iceberg catalog with the Flink catalog in the buildIcebergCatalog way (if you want).
I created #1170 for further discussion.
@Override
public void open() {
  this.table = TableUtil.findTable(tablePath, conf.get());
Minor: in other modules, we don't use this. when reading a field, only when assigning to it, so it is clear whether the assignment is to a local variable or a field.
Fine, let's keep the rule.
this.table = TableUtil.findTable(tablePath, conf.get());
if (this.readSchema != null) {
  // reassign ids to match the existing table schema
  readSchema = TypeUtil.reassignIds(readSchema, table.schema());
Why is this reassigning the read schema's IDs?
Because the readSchema was converted from the user-provided Flink table schema, its field ids do not match the target Iceberg table's. The following validateWriteSchema then regards the two schemas as incompatible because of the differing ids. For example:
Cannot write incompatible dataset to table with schema:
table {
0: id: optional int
1: data: optional string
}
write schema:table {
1: id: optional int
2: data: optional string
}
Problems:
* data: int cannot be promoted to string
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
0: id: optional int
1: data: optional string
}
write schema:table {
1: id: optional int
2: data: optional string
}
Problems:
* data: int cannot be promoted to string
at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:216)
at org.apache.iceberg.flink.IcebergStreamWriter.open(IcebergStreamWriter.java:97)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:558)
at org.apache.iceberg.flink.TestIcebergStreamWriter.testWritingTable(TestIcebergStreamWriter.java:94)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)
Okay, that sounds correct to me.
One thing that doesn't make sense is why the read schema is coming from the user with IDs. We would normally want the projection to be provided as names to project from the table schema, then use table.schema().select("col1", "col2", ...) to get the read schema.
The way we do this for Spark SQL is we convert the Iceberg schema to the SQL representation, then are passed back a subset of that schema. Next, we convert the SQL schema back to Iceberg using a method like the reassignIds you use here. I think what I didn't expect was that you'd get an Iceberg schema passed in as the user's schema. Does Flink have a schema representation we could use instead?
Thanks for the question. Flink provides both a DataStream API and a Table API. For the DataStream API, we don't actually have to provide the table schema, so it could be null. For the Table API, it's required to provide a table schema that specifies where the data will be written, so we need to validate the user-provided schema. I wrote a Flink connector before here (the unit test is here); you may want to take a look.
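For reference, a small sketch of the two approaches discussed above (the helper class is hypothetical and uses the id/data columns from the error example; the converted Flink schema is assumed to be produced elsewhere):
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.TypeUtil;

class SchemaProjectionSketch {
  // Name-based projection: field ids come straight from the table schema.
  static Schema projectByName(Table table) {
    return table.schema().select("id", "data");
  }

  // Approach used in this PR: take a schema converted from the user's Flink schema
  // and realign its field ids with the table schema so validateWriteSchema passes.
  static Schema reassignFromUserSchema(Schema convertedFlinkSchema, Table table) {
    return TypeUtil.reassignIds(convertedFlinkSchema, table.schema());
  }
}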
this.pos = new int[size];
this.transforms = new Transform[size];

Map<Integer, Integer> fieldId2Pos = buildFieldId2PosMap(spec.schema());
This won't handle nested data. I think it would be better to use a wrapper for Row and then use the standard accessors that are provided by schema.accessorForField(fieldId). That Row wrapper would implement StructLike, which is what the accessors use. It would also be responsible for converting to the internal representation of data values, like the wrapper for Iceberg generic rows.
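As an illustration of that suggestion, a minimal sketch of a Row wrapper implementing StructLike (the class name is hypothetical; a complete version would also wrap nested Rows and convert Flink values to Iceberg's internal representation):
import org.apache.flink.types.Row;
import org.apache.iceberg.StructLike;

class RowWrapper implements StructLike {
  private Row row = null;

  // Reuse one wrapper instance per task and re-point it at each incoming row.
  RowWrapper wrap(Row newRow) {
    this.row = newRow;
    return this;
  }

  @Override
  public int size() {
    return row.getArity();
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    // A complete implementation would convert types here and wrap nested Rows
    // so that schema.accessorForField(fieldId) can traverse struct fields.
    return javaClass.cast(row.getField(pos));
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("Read-only wrapper");
  }
}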
My thought was that a PartitionSpec would only use root-level fields, so I simplified the accessor to buildFieldId2PosMap... I'm not quite sure whether we need the full tree traversal; let me take a deeper look.
Iceberg allows partitioning by fields that are nested in structs. For a simple example of when you'd want to do this, think about an HTTP log stream with request and response structs. You might want to partition by truncate(response.status, 100) to get partitions of 200, 300, 400, and 500 response codes. Since a struct is just a logical grouping, we want to be able to partition by its fields.
}

@SuppressWarnings("unchecked")
public PartitionKey build(Row row) {
In other cases, we avoid allocating a new key for the partition tuple for every row, and we defensively copy partition keys. It might be a good idea to generalize the PartitionKey from Spark and use it here.
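A minimal sketch of that reuse-and-copy pattern, assuming a generalized PartitionKey with partition(StructLike) and copy() methods (all other names here are hypothetical):
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.StructLike;

class PartitionFanoutSketch {
  private final Map<PartitionKey, PartitionBucket> buckets = new HashMap<>();
  private final PartitionKey reusedKey;   // allocated once per task, mutated per row

  PartitionFanoutSketch(PartitionKey reusedKey) {
    this.reusedKey = reusedKey;
  }

  void write(StructLike row) {
    reusedKey.partition(row);                    // fill the reused key in place, no per-row allocation
    PartitionBucket bucket = buckets.get(reusedKey);
    if (bucket == null) {
      PartitionKey copied = reusedKey.copy();    // defensive copy to use as a stable map key
      bucket = new PartitionBucket(copied);
      buckets.put(copied, bucket);
    }
    bucket.add(row);
  }

  // Hypothetical stand-in for the rolling file writer kept per partition.
  static class PartitionBucket {
    PartitionBucket(PartitionKey key) { }
    void add(StructLike row) { }
  }
}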
import java.io.Serializable;
import org.apache.iceberg.DataFile;

public class SerializableDataFile implements Serializable {
Why is this needed? The DataFile implementations used internally are Serializable. Although we don't implement Serializable in the DataFile API, as long as you're using the internal implementation of DataFile, you should be able to serialize it.
private final long taskId;
private int fileCount;

public OutputFileFactory(Table table, FileFormat format, long taskId) {
Table is not serializable. That's why the Spark output file factory has fields for LocationProvider, FileIO, EncryptionManager, and PartitionSpec that are serializable. Those are held by the task and this is created on each task after serialization to workers.
OK, I think I get the difference you mentioned between the Spark writer implementation and the Flink writer implementation:
For Spark, the Iceberg table is loaded first at the driver side, then the DataWriterFactory is created, serialized, and dispatched to each executor; each executor then creates its DataWriter, so the executors don't need to load the Iceberg table.
For Flink, I currently implemented the IcebergStreamWriter by loading the Iceberg table in each subtask: every task gets the table path and opens the Iceberg table, so the Table instance doesn't need to be serializable. That means the Iceberg table would be loaded 100 times if the IcebergStreamWriter has a parallelism of 100. @JingsongLi Any thoughts on this issue? I mean: do we have a way similar to Spark's to optimize the Iceberg table loading?
I don't quite understand what you mean.
First, StreamOperator.open is invoked in tasks instead of the client.
Maybe we should avoid loading the table in tasks: imagine the catalog is a Hive catalog; too many tasks visiting the catalog could overwhelm the Hive metastore.
So in this direction, we should store a serializable LocationProvider, FileIO, EncryptionManager, and PartitionSpec in the StreamOperator instead of loading the table at runtime.
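A minimal sketch of that idea, capturing only the serializable pieces of the table on the client before the operator is shipped to tasks (the class name is hypothetical; the Iceberg types listed are all Serializable):
import java.io.Serializable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;

class SerializableTableParts implements Serializable {
  private final Schema schema;
  private final PartitionSpec spec;
  private final LocationProvider locations;
  private final FileIO io;
  private final EncryptionManager encryption;

  // Called once on the client, where the table is loaded; only these fields
  // travel to the tasks, so no task has to hit the catalog (e.g. a Hive metastore).
  SerializableTableParts(Table table) {
    this.schema = table.schema();
    this.spec = table.spec();
    this.locations = table.locationProvider();
    this.io = table.io();
    this.encryption = table.encryption();
  }
}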
I'd prefer to avoid loading the table in each task. There is no need for the table reference on workers, and not having it will prevent patterns that are bad, like attempting to commit to the table from every task.
Agree. I changed the design in commit 0026d16: it loads the table first, then initializes the IcebergStreamWriter instances, then deploys the Flink stream job to the Flink JobManager, and finally the JobManager allocates slots and dispatches the tasks to them. In general, we now load the table once. Thanks for the discussion.
Resolved review threads (outdated) on:
flink/src/main/java/org/apache/iceberg/flink/writer/OutputFileFactory.java
flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
@rdblue @JingsongLi I've addressed the comments in the latest patch. Mind taking another look when you have time?
// Emit the data file entries to downstream committer operator if there exist any complete files.
List<DataFile> completeFiles = writer.pollCompleteFiles();
if (!completeFiles.isEmpty()) {
isEmpty redundant?
LOG.info("Iceberg writer({}) begin preparing for checkpoint {}", toString(), checkpointId);
// close all open files and emit files to downstream committer operator
writer.close();
for (DataFile dataFile : writer.pollCompleteFiles()) {
NIT: writer.pollCompleteFiles().forEach(this::emit);
I should be able to take another look tomorrow.
}
}

static IcebergStreamWriter createStreamWriter(String path, TableSchema tableSchema, Configuration conf) {
Can we just pass an Iceberg Table instead of a path here? Then we can get rid of TableUtil.findTable (or just move it to test).
// For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining
// data files to downstream before closing the writer so that we won't miss any of them.
writer.close();
for (DataFile dataFile : writer.pollCompleteFiles()) {
  emit(dataFile);
}
Is it allowed to call pollCompleteFiles() after close()? IIUC, the writer is still needed when emitting the remaining data files as well as when calling pollCompleteFiles(). But possibly the contract around close is different than I expect it to be.
Additionally, the comment here is somewhat confusing. Based on the discussion above, and based on the task lifecycle of Flink operators outlined here, it seems like it might be more appropriate to say something along the lines of "Once the close method of this IcebergStreamWriter is invoked, we'll no longer be able to emit any remaining data files downstream. To get around this, we implement the BoundedOneInput interface in order to finish processing and emit any remaining data before graceful shutdown".
My third and final concern is that if we're closing the TaskWriter here during a graceful shutdown, if we only poll for complete files, what will happen to any remaining data that's being buffered in files that are not marked as complete? IIUC, the TaskWriter does not keep its opened / in-process files in flink's state such that it can be recovered after restore or replayed from the last checkpoint after a fatal. If I have that correct, it seems to me that we would want to poll for simply any remaining open files and emit them, otherwise we risk data loss. Can somebody help me better understand how we avoid losing data from in-process but incomplete files during a graceful shutdown (say a user takes a savepoint in order to restart their application and deploy a new version of their code)?
But possibly the contract around close is different than I expect it to be.
@kbendick It would be better to rename close to flush here, because the current writer.close just closes all the open data files, and the writer can still accept records and write them to newly opened files. In the task writer abstraction pull request, we've discussed this and changed to a way similar to Spark's: once we need the whole list of completed data files, we close the writer, and if there are new records to write, we initialize a new TaskWriter to write them.
what will happen to any remaining data that's being buffered in files that are not marked as complete?
Once writer#close is invoked, all the open file handlers flush their data to the file system and are closed; finally the completed DataFile(s) are emitted to the downstream IcebergFileCommitter operator. When a checkpoint comes, the committer flushes its DataFile state to the Flink StateBackend, so we can actually recover the completed files from it.
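To make that lifecycle concrete, a hedged sketch of the flush-and-emit pattern (this is not the PR's final code; the task-writer interface and its method names are assumptions):
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;

class StreamWriterSketch extends AbstractStreamOperator<DataFile>
    implements OneInputStreamOperator<Row, DataFile>, BoundedOneInput {

  // Assumed per-task writer abstraction: write() buffers rows into data files,
  // complete() closes every open file and returns the finished DataFiles.
  interface SketchTaskWriter {
    void write(Row row) throws Exception;
    Iterable<DataFile> complete() throws Exception;
  }

  interface SketchTaskWriterFactory extends java.io.Serializable {
    SketchTaskWriter create();
  }

  private final SketchTaskWriterFactory factory;
  private transient SketchTaskWriter writer;   // created in open(), omitted here

  StreamWriterSketch(SketchTaskWriterFactory factory) {
    this.factory = factory;
  }

  @Override
  public void processElement(StreamRecord<Row> element) throws Exception {
    writer.write(element.getValue());
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    emitCompletedFiles();   // the downstream committer snapshots these files into Flink state
  }

  @Override
  public void endInput() throws Exception {
    emitCompletedFiles();   // bounded jobs may never checkpoint, so also flush at end of input
  }

  private void emitCompletedFiles() throws Exception {
    for (DataFile dataFile : writer.complete()) {
      output.collect(new StreamRecord<>(dataFile));
    }
    writer = factory.create();   // start a fresh writer for records arriving afterwards
  }
}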
Ping @rdblue @JingsongLi @kbendick, mind taking another look if you have time? Thanks. Some background: as we have discussed in the issue, flink-iceberg will use
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param toCopy a generic data file to copy.
Could you revert this change? We try to avoid changes that are non-functional and can cause commit conflicts between pull requests and while cherry-picking commits. In addition, keeping these aligned just leads to more changes. I think it's fine to format these aligned in new code, but I don't think it is a good idea to go back and make extra changes just to align.
OK
return;
case 12:
this.splitOffsets = (List<Long>) value;
this.splitOffsets = value != null ? ((List<Long>) value).toArray(new Long[0]) : null;
The corresponding get implementation also needs to be updated. That's what is causing test failures.
private transient int subTaskId;
private transient int attemptId;

IcebergStreamWriter(String tablePath, TaskWriterFactory<T> taskWriterFactory) {
Minor: I recommend using table.name() instead of table.location(). The name is set by the catalog, so it should be how the table was identified. For Hive, it is catalog.db.table and for Hadoop it is the location. So it is a more natural identifier to use.
You mean Table.toString()?
The name is hidden inside BaseTable; yeah, we could use Table.toString() to get that name.
Yeah. We can also add name to the table interface. I didn't realize we were still using toString.
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
Is this guaranteed to be called from the same thread as processElement?
Not the same thread, but before invoking them, the caller takes the same lock.
In recent versions, Flink has refactored the locking model to use a queue and a single thread.
Either way, Flink ensures the thread safety of each method of a StreamOperator.
@openinx, I think this is ready to go when tests are passing. The only concern I have is about how methods are called in the stream writer -- whether multiple threads are used.
Here is an example to show how to use the stream writer, say we will chain the user-provided
Looks like this needs to be rebased, then I can merge it.
OK, let me resolve this conflict. Thanks.
@Override
public List<Long> splitOffsets() {
return splitOffsets;
return splitOffsets != null ? Lists.newArrayList(splitOffsets) : null;
Should this return an ImmutableList so that it cannot be modified? That's what was happening before, although I think it matters less if this is creating a new list each time it is returned.
I thought we used an unmodifiable collection before because we don't want to change the contents inside splitOffsets. Now we've accomplished the same purpose. Returning it as a modifiable or an unmodifiable collection both sound good to me.
Thanks for fixing conflicts! I've merged this. Is #1185 next?
Thanks for merging. I'd prefer to resolve this issue (#1305), especially the RowDataWrapper and Row->RowData refactor, first; then we could start reviewing #1185.