@@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
import org.apache.iceberg.flink.source.RowDataRewriter;
@@ -51,7 +52,7 @@ protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScan
int size = combinedScanTasks.size();
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
RowDataRewriter rowDataRewriter = new RowDataRewriter(SerializableTable.copyOf(table()), caseSensitive());
try {
return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
} catch (Exception e) {
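Note: SerializableTable.copyOf(...) returns a read-only, serializable snapshot of the table that also exposes its io() and encryption(), which is why the extra FileIO and EncryptionManager constructor arguments can be dropped here. A minimal sketch of the pattern, assuming a Hadoop-backed table at a hypothetical location (not part of this PR):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

class SerializableCopySketch {
  static Table loadSerializableCopy() {
    // Hypothetical warehouse path, for illustration only.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/t");
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      Table table = loader.loadTable();
      // The copy is immutable and serializable, so Flink operators can capture
      // it without re-loading the table on the task managers.
      return SerializableTable.copyOf(table);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}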
@@ -29,9 +29,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

/**
@@ -41,21 +38,14 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>

private static final long serialVersionUID = 1L;

private final TableLoader tableLoader;
private final Schema tableSchema;
private final FileIO io;
private final EncryptionManager encryption;
private final Table table;
private final ScanContext context;

private transient RowDataIterator iterator;
private transient long currentReadCount = 0L;

FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
ScanContext context) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
this.io = io;
this.encryption = encryption;
FlinkInputFormat(Table table, ScanContext context) {
this.table = table;
this.context = context;
}

@@ -72,12 +62,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {

@Override
public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
// Called in Job manager, so it is OK to load table from catalog.
tableLoader.open();
try (TableLoader loader = tableLoader) {
Table table = loader.loadTable();
return FlinkSplitGenerator.createInputSplits(table, context);
}
return FlinkSplitGenerator.createInputSplits(table, context);
}

@Override
@@ -91,9 +76,7 @@ public void configure(Configuration parameters) {

@Override
public void open(FlinkInputSplit split) {
this.iterator = new RowDataIterator(
split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
context.caseSensitive());
this.iterator = new RowDataIterator(table, split.getTask(), context.project(), context.caseSensitive());
}

@Override
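With the loader, schema, FileIO and EncryptionManager collapsed into one Table field, split planning no longer re-loads the table from the catalog on the job manager. A hedged sketch of obtaining the format through the public builder; the table location is a stand-in:

import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkSource;

class BuildFormatSketch {
  static FlinkInputFormat buildFormat() {
    // Hypothetical table location; buildFormat() loads the table once and
    // passes a serializable copy of it into the new constructor above.
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/t");
    return FlinkSource.forRowData()
        .tableLoader(loader)
        .buildFormat();
  }
}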
@@ -32,15 +32,14 @@
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class FlinkSource {
@@ -157,11 +156,6 @@ public Builder streaming(boolean streaming) {
return this;
}

public Builder nameMapping(String nameMapping) {
contextBuilder.nameMapping(nameMapping);
return this;
}

public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -170,33 +164,25 @@ public Builder flinkConf(ReadableConfig config) {
public FlinkInputFormat buildFormat() {
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

Schema icebergSchema;
FileIO io;
EncryptionManager encryption;
if (table == null) {
// load required fields by table loader.
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
}
Schema icebergSchema = table.schema();

if (projectedSchema == null) {
contextBuilder.project(icebergSchema);
} else {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
}

return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
// we pass a read-only serializable copy of the current table state to FlinkInputFormat
return new FlinkInputFormat(SerializableTable.copyOf(table), contextBuilder.build());
}

public DataStream<RowData> build() {
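The same builder still terminates in build() for DataStream reads; a minimal usage sketch, with the execution environment and table location as placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

class BatchReadSketch {
  static DataStream<RowData> read(StreamExecutionEnvironment env) {
    // Hypothetical location; note there is no nameMapping(...) call anymore,
    // since the mapping is now read from the table's own properties.
    return FlinkSource.forRowData()
        .env(env)
        .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/db/t"))
        .streaming(false)
        .build();
  }
}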
@@ -26,9 +26,9 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroReader;
@@ -37,7 +36,6 @@
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
@@ -47,19 +46,20 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

class RowDataIterator extends DataIterator<RowData> {

private final Schema tableSchema;
private final Schema projectedSchema;
private final String nameMapping;
private final boolean caseSensitive;

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
Schema projectedSchema, String nameMapping, boolean caseSensitive) {
super(task, io, encryption);
this.tableSchema = tableSchema;
RowDataIterator(Table table, CombinedScanTask task, Schema projectedSchema, boolean caseSensitive) {
super(task, table.io(), table.encryption());
this.tableSchema = table.schema();
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
}

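Note: the iterator now resolves the name mapping from the table's own schema.name-mapping.default property (TableProperties.DEFAULT_NAME_MAPPING) rather than from the scan context. A sketch of how a caller could set that property using standard Iceberg APIs; this helper is illustrative, not something this PR adds:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

class NameMappingSketch {
  static void setDefaultNameMapping(Table table) {
    // Derive a mapping from the current schema and store it in the property
    // that RowDataIterator reads via table.properties().
    NameMapping mapping = MappingUtil.create(table.schema());
    table.updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();
  }
}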
@@ -31,57 +31,45 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

public class RowDataRewriter {

private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);

private final Schema schema;
private final String nameMapping;
private final FileIO io;
private final Table table;
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final TaskWriterFactory<RowData> taskWriterFactory;
private final String tableName;

public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
this.schema = table.schema();
public RowDataRewriter(Table table, boolean caseSensitive) {
this.table = table;
this.caseSensitive = caseSensitive;
this.io = io;
this.encryptionManager = encryptionManager;
this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
this.tableName = table.name();

String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory = new RowDataTaskWriterFactory(
SerializableTable.copyOf(table),
table,
flinkSchema,
Long.MAX_VALUE,
format,
null);
}

public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
RewriteMap map = new RewriteMap(table, caseSensitive, taskWriterFactory);
DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -93,20 +81,13 @@ public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<Da
private int subTaskId;
private int attemptId;

private final Schema schema;
private final String nameMapping;
private final FileIO io;
private final Table table;
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final TaskWriterFactory<RowData> taskWriterFactory;

public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
this.schema = schema;
this.nameMapping = nameMapping;
this.io = io;
public RewriteMap(Table table, boolean caseSensitive, TaskWriterFactory<RowData> taskWriterFactory) {
this.table = table;
this.caseSensitive = caseSensitive;
this.encryptionManager = encryptionManager;
this.taskWriterFactory = taskWriterFactory;
}

@@ -122,8 +103,7 @@ public void open(Configuration parameters) {
public List<DataFile> map(CombinedScanTask task) throws Exception {
// Initialize the task writer.
this.writer = taskWriterFactory.create();
try (RowDataIterator iterator =
new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
try (RowDataIterator iterator = new RowDataIterator(table, task, table.schema(), caseSensitive)) {
while (iterator.hasNext()) {
RowData rowData = iterator.next();
writer.write(rowData);
@@ -29,8 +29,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

/**
* Context object with optional arguments for a Flink Scan.
*/
@@ -79,14 +77,13 @@ class ScanContext implements Serializable {
private final boolean isStreaming;
private final Duration monitorInterval;

private final String nameMapping;
private final Schema schema;
private final List<Expression> filters;
private final long limit;

private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
boolean isStreaming, Duration monitorInterval, String nameMapping,
boolean isStreaming, Duration monitorInterval,
Schema schema, List<Expression> filters, long limit) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
@@ -99,7 +96,6 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId
this.isStreaming = isStreaming;
this.monitorInterval = monitorInterval;

this.nameMapping = nameMapping;
this.schema = schema;
this.filters = filters;
this.limit = limit;
@@ -145,10 +141,6 @@ Duration monitorInterval() {
return monitorInterval;
}

String nameMapping() {
return nameMapping;
}

Schema project() {
return schema;
}
@@ -173,7 +165,6 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI
.splitOpenFileCost(splitOpenFileCost)
.streaming(isStreaming)
.monitorInterval(monitorInterval)
.nameMapping(nameMapping)
.project(schema)
.filters(filters)
.limit(limit)
Expand All @@ -192,7 +183,6 @@ ScanContext copyWithSnapshotId(long newSnapshotId) {
.splitOpenFileCost(splitOpenFileCost)
.streaming(isStreaming)
.monitorInterval(monitorInterval)
.nameMapping(nameMapping)
.project(schema)
.filters(filters)
.limit(limit)
@@ -214,7 +204,6 @@ static class Builder {
private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
private boolean isStreaming = STREAMING.defaultValue();
private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
private String nameMapping;
private Schema projectedSchema;
private List<Expression> filters;
private long limit = -1L;
@@ -272,11 +261,6 @@ Builder monitorInterval(Duration newMonitorInterval) {
return this;
}

Builder nameMapping(String newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}

Builder project(Schema newProjectedSchema) {
this.projectedSchema = newProjectedSchema;
return this;
@@ -305,14 +289,13 @@ Builder fromProperties(Map<String, String> properties) {
.splitLookback(config.get(SPLIT_LOOKBACK))
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
.monitorInterval(config.get(MONITOR_INTERVAL))
.nameMapping(properties.get(DEFAULT_NAME_MAPPING));
.monitorInterval(config.get(MONITOR_INTERVAL));
}

public ScanContext build() {
return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
splitOpenFileCost, isStreaming, monitorInterval, projectedSchema,
filters, limit);
}
}
@@ -147,6 +147,9 @@ public void testInferedParallelism() throws IOException {
long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), dataFile2.fileSizeInBytes());
sql("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen);

table.refresh();
flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();

// 2 splits (max infer is the default value 100 , max > splits num), the parallelism is splits num : 2
parallelism = FlinkSource.forRowData().inferParallelism(flinkInputFormat, scanContext);
Assert.assertEquals("Should produce the expected parallelism.", 2, parallelism);
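Note: the refresh and rebuild added here appear to be needed because buildFormat() now captures an eager, read-only copy of the table, so the read.split.* properties set by the ALTER TABLE above would not be visible to the previously built format. The same two added lines, with explanatory comments:

// Pick up the properties set by the ALTER TABLE statement above.
table.refresh();
// Rebuild the format so the SerializableTable copy reflects the new
// read.split.* settings before parallelism is inferred again.
flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();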