5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -257,6 +257,11 @@ public TableScan newScan() {
return lazyTable().newScan();
}

@Override
public IncrementalAppendScan newIncrementalAppendScan() {
return lazyTable().newIncrementalAppendScan();
}

@Override
public Snapshot currentSnapshot() {
return lazyTable().currentSnapshot();
@@ -23,6 +23,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
import org.apache.iceberg.flink.source.RowDataRewriter;
@@ -51,7 +52,7 @@ protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScan
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter =
new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
new RowDataRewriter((SerializableTable) SerializableTable.copyOf(table()), caseSensitive());
try {
return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
} catch (Exception e) {
@@ -24,10 +24,9 @@
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
@@ -49,13 +48,10 @@ public class DataIterator<T> implements CloseableIterator<T> {
private long recordOffset;

public DataIterator(
FileScanTaskReader<T> fileScanTaskReader,
CombinedScanTask task,
FileIO io,
EncryptionManager encryption) {
Table table, FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task) {
this.fileScanTaskReader = fileScanTaskReader;

this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
this.inputFilesDecryptor = new InputFilesDecryptor(task, table.io(), table.encryption());
this.combinedTask = task;

this.tasks = task.files().iterator();
@@ -29,10 +29,8 @@
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.util.ThreadPools;

@@ -41,28 +39,19 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>

private static final long serialVersionUID = 1L;

private final TableLoader tableLoader;
private final FileIO io;
private final EncryptionManager encryption;
private final Table table;
private final ScanContext context;
private final RowDataFileScanTaskReader rowDataReader;

private transient DataIterator<RowData> iterator;
private transient long currentReadCount = 0L;

FlinkInputFormat(
TableLoader tableLoader,
Schema tableSchema,
FileIO io,
EncryptionManager encryption,
ScanContext context) {
this.tableLoader = tableLoader;
this.io = io;
this.encryption = encryption;
FlinkInputFormat(SerializableTable table, ScanContext context) {
this.table = table;
this.context = context;
this.rowDataReader =
new RowDataFileScanTaskReader(
tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
table.schema(), context.project(), context.nameMapping(), context.caseSensitive());
}

@VisibleForTesting
@@ -79,15 +68,9 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
@Override
public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
// Called in Job manager, so it is OK to load table from catalog.
tableLoader.open();
final ExecutorService workerPool =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", context.planParallelism());
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
} finally {
workerPool.shutdown();
}
return FlinkSplitPlanner.planInputSplits(table, context, workerPool);
Contributor:

Just to double-check: the input format is only for batch queries, right? SerializableTable is read-only and can't be used for a long-running streaming source.

Collaborator Author:

The Flink source uses the monitor function to watch snapshots and forward splits to StreamingReaderOperator, and StreamingReaderOperator uses FlinkInputFormat to open the splits. Opening splits should work even if the table is read-only.

Contributor:

Yeah, using a read-only table for the reader function is fine.

}

@Override
@@ -102,7 +85,7 @@ public void configure(Configuration parameters) {}

@Override
public void open(FlinkInputSplit split) {
this.iterator = new DataIterator<>(rowDataReader, split.getTask(), io, encryption);
this.iterator = new DataIterator<>(table, rowDataReader, split.getTask());
}

@Override
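A minimal sketch of the pattern the read-only discussion above relies on (the helper class name is hypothetical, not part of this PR): the driver makes one serializable copy of the table, and the reader path only calls read-side methods such as io(), encryption(), and schema() on it.

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

class ReadOnlyTableCopy {
  // copyOf is declared to return Table, hence the explicit cast used in this PR
  static SerializableTable copyFor(Table table) {
    return (SerializableTable) SerializableTable.copyOf(table);
  }
}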
@@ -31,15 +31,14 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,25 +188,17 @@ public FlinkInputFormat buildFormat() {
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

Schema icebergSchema;
FileIO io;
EncryptionManager encryption;
if (table == null) {
// load required fields by table loader.
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
}

icebergSchema = table.schema();
if (projectedSchema == null) {
contextBuilder.project(icebergSchema);
} else {
Expand All @@ -220,7 +211,7 @@ public FlinkInputFormat buildFormat() {
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));

return new FlinkInputFormat(
tableLoader, icebergSchema, io, encryption, contextBuilder.build());
(SerializableTable) SerializableTable.copyOf(table), contextBuilder.build());
Contributor:

Personally I like that in this PR FlinkInputFormat uses SerializableTable as the argument type, so that it is clear whether it is a SerializableTable or a regular table.

I see other places just use Table as the argument to avoid the type cast, e.g. RowDataTaskWriterFactory, hence just pointing it out:

  static IcebergStreamWriter<RowData> createStreamWriter(
      Table table,
      FlinkWriteConf flinkWriteConf,
      RowType flinkRowType,
      List<Integer> equalityFieldIds) {
    Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

    Table serializableTable = SerializableTable.copyOf(table);
    TaskWriterFactory<RowData> taskWriterFactory =
        new RowDataTaskWriterFactory(
            serializableTable,
            flinkRowType,
            flinkWriteConf.targetDataFileSize(),
            flinkWriteConf.dataFileFormat(),
            equalityFieldIds,
            flinkWriteConf.upsertMode());
    return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
  }

Collaborator Author:

I see your point. Let me change those input parameters explicitly.

Contributor:

Is there a place where we specifically cannot work with Tables, only SerializableTables? I usually try to stick to the lowest requirements in method arguments, so we can have higher reusability.

}

public DataStream<RowData> build() {
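A hedged sketch of the two constructor shapes debated above (both classes are hypothetical and only illustrate where the copy and the cast end up): an explicit SerializableTable parameter makes the serializability requirement part of the signature, while a plain Table parameter keeps the lowest requirement on arguments and copies internally, as createStreamWriter does with RowDataTaskWriterFactory.

import java.io.Serializable;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

class ExplicitSerializableArg implements Serializable {
  private final SerializableTable table;

  // caller must pass (SerializableTable) SerializableTable.copyOf(table)
  ExplicitSerializableArg(SerializableTable table) {
    this.table = table;
  }
}

class LenientTableArg implements Serializable {
  private final Table table;

  // any Table is accepted; the serializable copy is made internally, no cast needed
  LenientTableArg(Table table) {
    this.table = SerializableTable.copyOf(table);
  }
}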
@@ -40,6 +40,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -357,13 +358,11 @@ public IcebergSource<T> build() {
if (readerFunction == null) {
RowDataReaderFunction rowDataReaderFunction =
new RowDataReaderFunction(
(SerializableTable) SerializableTable.copyOf(table),
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
Contributor:

This essentially deprecates ScanContext#nameMapping and only retrieves the mapping from the table property. Technically, this breaks backward compatibility. Is there any use case where a Flink job needs to set a different name mapping than the table property?

Contributor:

I remember that when we were implementing historical queries in Hive (AS OF VERSION, AS OF TIMESTAMP), we found that the name mapping is bound to the table, not to the version. So if some specific schema evolution happens (migrate a table, rename a column, add back an old column, or something like that; sadly I do not remember the specifics), we were not able to restore the original mapping from the current one, and we could not query the data. Being able to provide a name mapping could help here.

Arguably, this is a rare case, and the correct fix would be a spec change, but I still thought it worth mentioning.

Contributor:

Also, it was long ago, so we might want to double-check the current situation before acting on this 😄

Collaborator Author:

IIRC, this one (#2275) does the job. Now we can use the schema ID in the snapshot to track which schema it was written with. Anyway, let me revert this, since it should go in another PR even if we do want to remove it.

context.caseSensitive(),
table.io(),
table.encryption());
context.caseSensitive());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}

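For the name-mapping thread above, a small sketch of the table-property lookup that the table-backed code paths in this diff use (the wrapper class is hypothetical; the property key and PropertyUtil call are the same ones RowDataRewriter uses):

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

import org.apache.iceberg.Table;
import org.apache.iceberg.util.PropertyUtil;

class NameMappingFromTable {
  // returns null when the table has no default name-mapping property set
  static String lookup(Table table) {
    return PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
  }
}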
@@ -31,15 +31,12 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
@@ -50,22 +47,14 @@ public class RowDataRewriter {

private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);

private final Schema schema;
private final String nameMapping;
private final FileIO io;
private final Table table;
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final TaskWriterFactory<RowData> taskWriterFactory;
private final String tableName;

public RowDataRewriter(
Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
this.schema = table.schema();
public RowDataRewriter(SerializableTable table, boolean caseSensitive) {
this.table = table;
this.caseSensitive = caseSensitive;
this.io = io;
this.encryptionManager = encryptionManager;
this.nameMapping =
PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
this.tableName = table.name();

String formatString =
@@ -77,20 +66,12 @@ public RowDataRewriter(
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
new RowDataTaskWriterFactory(
SerializableTable.copyOf(table),
Contributor:

table here seems to be a regular table if we trace back the code, so we may not be able to change this.

Collaborator Author:

Changed the ctor input parameter to SerializableTable.

flinkSchema,
Long.MAX_VALUE,
format,
table.properties(),
null,
false);
table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, false);
}

public List<DataFile> rewriteDataForTasks(
DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
RewriteMap map =
new RewriteMap(
schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
RewriteMap map = new RewriteMap(table, caseSensitive, taskWriterFactory);
DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream()
.flatMap(Collection::stream)
@@ -103,29 +84,20 @@ public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<Da
private int subTaskId;
private int attemptId;

private final Schema schema;
private final String nameMapping;
private final FileIO io;
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final Table table;
private final TaskWriterFactory<RowData> taskWriterFactory;
private final RowDataFileScanTaskReader rowDataReader;

public RewriteMap(
Schema schema,
String nameMapping,
FileIO io,
boolean caseSensitive,
EncryptionManager encryptionManager,
TaskWriterFactory<RowData> taskWriterFactory) {
this.schema = schema;
this.nameMapping = nameMapping;
this.io = io;
this.caseSensitive = caseSensitive;
this.encryptionManager = encryptionManager;
Table table, boolean caseSensitive, TaskWriterFactory<RowData> taskWriterFactory) {
this.table = table;
this.taskWriterFactory = taskWriterFactory;
this.rowDataReader =
new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
new RowDataFileScanTaskReader(
table.schema(),
table.schema(),
table.properties().get(DEFAULT_NAME_MAPPING),
caseSensitive);
}

@Override
@@ -140,8 +112,7 @@ public void open(Configuration parameters) {
public List<DataFile> map(CombinedScanTask task) throws Exception {
// Initialize the task writer.
this.writer = taskWriterFactory.create();
try (DataIterator<RowData> iterator =
new DataIterator<>(rowDataReader, task, io, encryptionManager)) {
try (DataIterator<RowData> iterator = new DataIterator<>(table, rowDataReader, task)) {
while (iterator.hasNext()) {
RowData rowData = iterator.next();
writer.write(rowData);
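As a usage note for the constructor change discussed above, an assumed call site (not PR code) showing that the serializable copy and the cast now live with the caller, as in the rewrite action earlier in this diff:

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.source.RowDataRewriter;

class RewriterCallSite {
  // mirrors the call in the rewrite action; the new ctor takes (SerializableTable, boolean)
  static RowDataRewriter create(Table table, boolean caseSensitive) {
    return new RowDataRewriter((SerializableTable) SerializableTable.copyOf(table), caseSensitive);
  }
}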
@@ -21,50 +21,43 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
private final Schema tableSchema;
private final Table table;
private final Schema readSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;

public RowDataReaderFunction(
SerializableTable table,
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
boolean caseSensitive) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
new RowDataRecordFactory(
FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
this.tableSchema = tableSchema;
this.readSchema = readSchema(tableSchema, projectedSchema);
FlinkSchemaUtil.convert(readSchema(table.schema(), projectedSchema)))));
this.table = table;
this.readSchema = readSchema(table.schema(), projectedSchema);
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.io = io;
this.encryption = encryption;
}

@Override
public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
return new DataIterator<>(
new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive),
split.task(),
io,
encryption);
table,
new RowDataFileScanTaskReader(table.schema(), readSchema, nameMapping, caseSensitive),
split.task());
}

private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {