Flink: use SerializableTable for source #6407
Conversation
@Override
public IncrementalAppendScan newIncrementalAppendScan() {
  TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
  return new BaseIncrementalAppendScan(ops, lazyTable());
should this be lazyTable().newIncrementalAppendScan()?
nit: please move this method next to newScan() above, following the same order as the Table interface.
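A minimal sketch of what that delegation could look like (an illustration of the suggestion above, not the actual PR code):

// Hypothetical sketch: delegate to the lazily loaded table instead of
// building the scan from StaticTableOperations directly.
@Override
public IncrementalAppendScan newIncrementalAppendScan() {
  return lazyTable().newIncrementalAppendScan();
}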
Done.
  this.recordOffset = 0L;
}

public DataIterator(
should we remove the other constructor (also to avoid code duplication)? This is an @Internal class, so it should be OK.
Done.
private final EncryptionManager encryption;
private final ScanContext context;
private final RowDataFileScanTaskReader rowDataReader;
private final Table table;
nit: put this before context, following the same order of usage below.
return new FlinkInputFormat(
    tableLoader, icebergSchema, io, encryption, contextBuilder.build());
    (SerializableTable) SerializableTable.copyOf(table), contextBuilder.build());
Personally I like that in this PR FlinkInputFormat uses SerializableTable as the arg type, so that it is clear whether it is a SerializableTable or a regular table.
I see other places just use Table as the arg to avoid the type cast, e.g. RowDataTaskWriterFactory; hence just pointing it out.
static IcebergStreamWriter<RowData> createStreamWriter(
    Table table,
    FlinkWriteConf flinkWriteConf,
    RowType flinkRowType,
    List<Integer> equalityFieldIds) {
  Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
  Table serializableTable = SerializableTable.copyOf(table);
  TaskWriterFactory<RowData> taskWriterFactory =
      new RowDataTaskWriterFactory(
          serializableTable,
          flinkRowType,
          flinkWriteConf.targetDataFileSize(),
          flinkWriteConf.dataFileFormat(),
          equalityFieldIds,
          flinkWriteConf.upsertMode());
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
I see your point. Let me change those input parameters explicitly.
Is there a place where we specifically cannot work with Tables, just SerializableTables?
I usually try to stick to the lowest requirements in method arguments, so we can have higher reusability.
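A rough illustration of that principle (the helper class and method name are made up for this sketch): accepting the broad Table interface and copying inside keeps call sites free of casts.

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

// Hypothetical helper: take the lowest requirement (Table) and produce the
// serializable copy internally, so callers never need to cast or pre-copy.
class SerializableTables {
  static Table toSerializable(Table table) {
    // SerializableTable.copyOf captures the table state for serialization
    return SerializableTable.copyOf(table);
  }
}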
}

icebergSchema = table.schema();

nit: empty line not needed?
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
    new RowDataTaskWriterFactory(
        SerializableTable.copyOf(table),
table seems like a regular table if we trace back the code, so we may not be able to change this.
Changed ctor input parameter to SerializableTable.
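For reference, a sketch of the constructor shape being discussed (field types are taken from the createStreamWriter call quoted above; the class name is hypothetical, not the actual RowDataTaskWriterFactory source):

import java.util.List;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.SerializableTable;

// Hypothetical skeleton: the constructor states the serializable requirement
// explicitly, so a regular catalog-loaded Table cannot be passed in by accident.
class SerializableTableWriterFactory {
  private final SerializableTable table;
  private final RowType flinkSchema;
  private final long targetFileSizeBytes;
  private final FileFormat format;
  private final List<Integer> equalityFieldIds;
  private final boolean upsert;

  SerializableTableWriterFactory(
      SerializableTable table,
      RowType flinkSchema,
      long targetFileSizeBytes,
      FileFormat format,
      List<Integer> equalityFieldIds,
      boolean upsert) {
    this.table = table;
    this.flinkSchema = flinkSchema;
    this.targetFileSizeBytes = targetFileSizeBytes;
    this.format = format;
    this.equalityFieldIds = equalityFieldIds;
    this.upsert = upsert;
  }
}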
private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;
private final Table table;
nit: keep the same order as arg usage
    boolean caseSensitive,
    FileIO io,
    EncryptionManager encryption) {
    Table table, ReadableConfig config, Schema projectedSchema, boolean caseSensitive) {
if we want to be consistent, this should be SerializableTable
Done.
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

public static final HadoopTables tables = new HadoopTables(new Configuration());
can we use HadoopTableResource instead?
Sure, I didn't know that. It looks cleaner.
    flinkConfig,
    table.schema(),
    context.project(),
    context.nameMapping(),
this essentially deprecates ScanContext#nameMapping and only retrieves it from the table property. Technically, this breaks backward compatibility. Is there any use case where a Flink job needs to set a different name mapping than the table property?
I remember that when we were implementing historical queries in Hive (AS OF VERSION, AS OF TIMESTAMP), we found that the name mapping is bound to the table, not to the version. So if some specific schema evolution happens (migrate a table, rename a column, add back an old column, or something like this - sadly I do not remember the specifics), we were not able to restore the original mapping from the current one, and we were not able to query the data.
Being able to provide a name mapping could help here.
Arguably, this is a rare case, and the correct fix would be a spec change, but I still thought it was worth mentioning.
Also, it was long ago, so we might want to double check the current situation before acting on this 😄
IIRC, this one (#2275) does the job. Now we can use the schema id in the snapshot to track which schema it was written with. Anyway, let me revert this, since it should go in another PR even if we do want to remove it.
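For context, a sketch of the backward-compatible lookup the revert keeps (the helper class and method are illustrative only; it assumes the usual fallback to the table's default name-mapping property):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Hypothetical sketch: prefer a name mapping supplied through the scan
// context, otherwise fall back to the table's default name-mapping property.
class NameMappings {
  static String resolve(String contextNameMapping, Table table) {
    if (contextNameMapping != null) {
      return contextNameMapping;
    }
    return table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
  }
}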
} finally {
  workerPool.shutdown();
}
return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
just to double check: the input format is only for batch queries, right? SerializableTable is read-only and can't be used for a long-running streaming source.
The Flink source uses the monitor function to monitor snapshots and forward splits to StreamingReaderOperator, and StreamingReaderOperator uses FlinkInputFormat to open the splits. Opening splits should work even if the table is read-only.
yeah. using a read-only table for the reader function is fine.
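A small sketch of why the read-only copy is enough for the reader path (the class and method names are hypothetical; only scan planning is exercised, no table mutation):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical sketch: planning and opening splits only needs read APIs,
// which a SerializableTable copy supports; no write or refresh calls involved.
class ReadOnlyScanExample {
  static long countScanTasks(Table table) {
    Table readOnlyCopy = SerializableTable.copyOf(table);
    long count = 0;
    try (CloseableIterable<FileScanTask> tasks = readOnlyCopy.newScan().planFiles()) {
      for (FileScanTask ignored : tasks) {
        count++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return count;
  }
}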
This revives the effort from #2987, copying most of it from @aokolnychyi's PR.