@@ -88,7 +88,7 @@ private static DataFile appendToLocalFile(
     return DataFiles.builder(table.spec())
         .withRecordCount(records.size())
         .withFileSizeInBytes(file.length())
-        .withPath(file.toURI().toString())
+        .withPath(Files.localInput(file).location())
         .withMetrics(appender.metrics())
         .withFormat(format)
         .withPartition(partition)
@@ -61,7 +61,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
 
   @VisibleForTesting
   Schema projectedSchema() {
-    return context.projectedSchema();
+    return context.project();
   }
 
   @Override
@@ -92,7 +92,7 @@ public void configure(Configuration parameters) {
   @Override
   public void open(FlinkInputSplit split) {
     this.iterator = new RowDataIterator(
-        split.getTask(), io, encryption, tableSchema, context.projectedSchema(), context.nameMapping(),
+        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
         context.caseSensitive());
   }
 
@@ -22,6 +22,7 @@
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
 /**
  * TODO Implement {@link LocatableInputSplit}.
@@ -44,4 +45,12 @@ public int getSplitNumber() {
   CombinedScanTask getTask() {
     return task;
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("splitNumber", splitNumber)
+        .add("task", task)
+        .toString();
+  }
 }
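The new toString() follows the MoreObjects.toStringHelper pattern (via Iceberg's relocated Guava), which makes splits readable in logs and operator names. Below is a standalone sketch using plain Guava; the Split class and its fields are hypothetical stand-ins for FlinkInputSplit.

import com.google.common.base.MoreObjects;

public class Split {
  private final int splitNumber;
  private final String task;

  Split(int splitNumber, String task) {
    this.splitNumber = splitNumber;
    this.task = task;
  }

  @Override
  public String toString() {
    // Produces "ClassName{field=value, ...}" without hand-rolled string concatenation.
    return MoreObjects.toStringHelper(this)
        .add("splitNumber", splitNumber)
        .add("task", task)
        .toString();
  }

  public static void main(String[] args) {
    // Prints something like: Split{splitNumber=3, task=scan-task}
    System.out.println(new Split(3, "scan-task"));
  }
}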
@@ -23,12 +23,12 @@
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
@@ -70,10 +70,7 @@ public static class Builder {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
-    private long limit;
-    private ScanContext context = new ScanContext();
-
-    private RowDataTypeInfo rowTypeInfo;
+    private final ScanContext.Builder contextBuilder = ScanContext.builder();
 
     public Builder tableLoader(TableLoader newLoader) {
       this.tableLoader = newLoader;
@@ -91,7 +88,7 @@ public Builder env(StreamExecutionEnvironment newEnv) {
     }
 
     public Builder filters(List<Expression> filters) {
-      this.context = context.filterRows(filters);
+      contextBuilder.filters(filters);
       return this;
     }
 
@@ -101,57 +98,62 @@ public Builder project(TableSchema schema) {
     }
 
     public Builder limit(long newLimit) {
-      this.limit = newLimit;
+      contextBuilder.limit(newLimit);
       return this;
     }
 
     public Builder properties(Map<String, String> properties) {
-      this.context = context.fromProperties(properties);
+      contextBuilder.fromProperties(properties);
       return this;
     }
 
     public Builder caseSensitive(boolean caseSensitive) {
-      this.context = context.setCaseSensitive(caseSensitive);
+      contextBuilder.caseSensitive(caseSensitive);
       return this;
     }
 
     public Builder snapshotId(Long snapshotId) {
-      this.context = context.useSnapshotId(snapshotId);
+      contextBuilder.useSnapshotId(snapshotId);
       return this;
     }
 
     public Builder startSnapshotId(Long startSnapshotId) {
-      this.context = context.startSnapshotId(startSnapshotId);
+      contextBuilder.startSnapshotId(startSnapshotId);
       return this;
     }
 
     public Builder endSnapshotId(Long endSnapshotId) {
-      this.context = context.endSnapshotId(endSnapshotId);
+      contextBuilder.endSnapshotId(endSnapshotId);
       return this;
     }
 
     public Builder asOfTimestamp(Long asOfTimestamp) {
-      this.context = context.asOfTimestamp(asOfTimestamp);
+      contextBuilder.asOfTimestamp(asOfTimestamp);
       return this;
     }
 
     public Builder splitSize(Long splitSize) {
-      this.context = context.splitSize(splitSize);
+      contextBuilder.splitSize(splitSize);
       return this;
     }
 
     public Builder splitLookback(Integer splitLookback) {
-      this.context = context.splitLookback(splitLookback);
+      contextBuilder.splitLookback(splitLookback);
       return this;
     }
 
     public Builder splitOpenFileCost(Long splitOpenFileCost) {
-      this.context = context.splitOpenFileCost(splitOpenFileCost);
+      contextBuilder.splitOpenFileCost(splitOpenFileCost);
       return this;
     }
 
+    public Builder streaming(boolean streaming) {
+      contextBuilder.streaming(streaming);
+      return this;
+    }
+
     public Builder nameMapping(String nameMapping) {
-      this.context = context.nameMapping(nameMapping);
+      contextBuilder.nameMapping(nameMapping);
      return this;
    }
 
@@ -178,35 +180,37 @@ public FlinkInputFormat buildFormat() {
         encryption = table.encryption();
       }
 
-      rowTypeInfo = RowDataTypeInfo.of((RowType) (
-          projectedSchema == null ?
-              FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) :
-              projectedSchema).toRowDataType().getLogicalType());
-
-      context = context.project(projectedSchema == null ? icebergSchema :
-          FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
-
-      context = context.limit(limit);
+      if (projectedSchema == null) {
+        contextBuilder.project(icebergSchema);
+      } else {
+        contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
+      }
 
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context);
+      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
     }
 
     public DataStream<RowData> build() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
-      if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+
+      ScanContext context = contextBuilder.build();
+      TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
+
+      if (!context.isStreaming()) {
+        return env.createInput(format, typeInfo);
       } else {
-        throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
+        StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
+
+        String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
+        String readerOperatorName = String.format("Iceberg table (%s) reader", table);
+
+        return env.addSource(function, monitorFunctionName)
+            .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
       }
     }
   }
 
-  private static boolean isBounded(ScanContext context) {
-    return context.startSnapshotId() == null || context.endSnapshotId() != null;
-  }
-
   public static boolean isBounded(Map<String, String> properties) {
-    return isBounded(new ScanContext().fromProperties(properties));
+    return !ScanContext.builder().fromProperties(properties).build().isStreaming();
   }
 }
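With this refactor, callers toggle bounded versus streaming reads through a single streaming(boolean) switch on the builder instead of the removed isBounded(ScanContext) check. Below is a hedged usage sketch; FlinkSource.forRowData() and TableLoader.fromHadoopTable(...) are assumed from the surrounding Iceberg Flink module, and the warehouse location is hypothetical.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class StreamingReadExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

    // With streaming(true), build() wires the StreamingMonitorFunction source to the
    // reader operator; without it the source stays bounded via env.createInput
    // (assuming streaming defaults to false in ScanContext.Builder).
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .streaming(true)
        .build();

    stream.print();
    env.execute("iceberg-streaming-read");
  }
}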
@@ -47,7 +47,7 @@ private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
     TableScan scan = table
         .newScan()
         .caseSensitive(context.caseSensitive())
-        .project(context.projectedSchema());
+        .project(context.project());
 
     if (context.snapshotId() != null) {
       scan = scan.useSnapshot(context.snapshotId());
@@ -77,8 +77,8 @@ private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
     }
 
-    if (context.filterExpressions() != null) {
-      for (Expression filter : context.filterExpressions()) {
+    if (context.filters() != null) {
+      for (Expression filter : context.filters()) {
         scan = scan.filter(filter);
       }
     }
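For context, the tasks(...) helper above applies the ScanContext options to a TableScan and then materializes the planned tasks. Below is a hedged end-to-end sketch of the same pattern against Iceberg's core scan API; the table location, filter, and option values are illustrative assumptions, not values from this PR.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class PlanTasksDemo {
  public static void main(String[] args) {
    // Hypothetical Hadoop-tables location; any Table instance works the same way.
    Table table = new HadoopTables().load("/tmp/warehouse/db/tbl");

    // Configure the scan the way tasks(...) does from ScanContext: case
    // sensitivity, projection, snapshot selection, and row filters.
    TableScan scan = table.newScan()
        .caseSensitive(false)
        .filter(Expressions.greaterThan("id", 100)); // stand-in for context.filters()

    // planTasks() groups file scan tasks into CombinedScanTasks, one per split.
    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
      List<CombinedScanTask> taskList = Lists.newArrayList(tasks);
      System.out.println("planned " + taskList.size() + " combined tasks");
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close task iterable", e);
    }
  }
}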