diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java index c32be08263a9..299135ad9c98 100644 --- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java +++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java @@ -88,7 +88,7 @@ private static DataFile appendToLocalFile( return DataFiles.builder(table.spec()) .withRecordCount(records.size()) .withFileSizeInBytes(file.length()) - .withPath(file.toURI().toString()) + .withPath(Files.localInput(file).location()) .withMetrics(appender.metrics()) .withFormat(format) .withPartition(partition) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index f25af650b63b..1bad1c25952e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -61,7 +61,7 @@ public class FlinkInputFormat extends RichInputFormat @VisibleForTesting Schema projectedSchema() { - return context.projectedSchema(); + return context.project(); } @Override @@ -92,7 +92,7 @@ public void configure(Configuration parameters) { @Override public void open(FlinkInputSplit split) { this.iterator = new RowDataIterator( - split.getTask(), io, encryption, tableSchema, context.projectedSchema(), context.nameMapping(), + split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(), context.caseSensitive()); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java index 21a2f71ac86d..b59574f585ec 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java @@ -22,6 +22,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.LocatableInputSplit; import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** * TODO Implement {@link LocatableInputSplit}. 
@@ -44,4 +45,12 @@ public int getSplitNumber() { CombinedScanTask getTask() { return task; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("splitNumber", splitNumber) + .add("task", task) + .toString(); + } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index e4b5907af585..95a7ba936be2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -23,12 +23,12 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; -import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -70,10 +70,7 @@ public static class Builder { private Table table; private TableLoader tableLoader; private TableSchema projectedSchema; - private long limit; - private ScanContext context = new ScanContext(); - - private RowDataTypeInfo rowTypeInfo; + private final ScanContext.Builder contextBuilder = ScanContext.builder(); public Builder tableLoader(TableLoader newLoader) { this.tableLoader = newLoader; @@ -91,7 +88,7 @@ public Builder env(StreamExecutionEnvironment newEnv) { } public Builder filters(List filters) { - this.context = context.filterRows(filters); + contextBuilder.filters(filters); return this; } @@ -101,57 +98,62 @@ public Builder project(TableSchema schema) { } public Builder limit(long newLimit) { - this.limit = newLimit; + contextBuilder.limit(newLimit); return this; } public Builder properties(Map properties) { - this.context = context.fromProperties(properties); + contextBuilder.fromProperties(properties); return this; } public Builder caseSensitive(boolean caseSensitive) { - this.context = context.setCaseSensitive(caseSensitive); + contextBuilder.caseSensitive(caseSensitive); return this; } public Builder snapshotId(Long snapshotId) { - this.context = context.useSnapshotId(snapshotId); + contextBuilder.useSnapshotId(snapshotId); return this; } public Builder startSnapshotId(Long startSnapshotId) { - this.context = context.startSnapshotId(startSnapshotId); + contextBuilder.startSnapshotId(startSnapshotId); return this; } public Builder endSnapshotId(Long endSnapshotId) { - this.context = context.endSnapshotId(endSnapshotId); + contextBuilder.endSnapshotId(endSnapshotId); return this; } public Builder asOfTimestamp(Long asOfTimestamp) { - this.context = context.asOfTimestamp(asOfTimestamp); + contextBuilder.asOfTimestamp(asOfTimestamp); return this; } public Builder splitSize(Long splitSize) { - this.context = context.splitSize(splitSize); + contextBuilder.splitSize(splitSize); return this; } public Builder splitLookback(Integer splitLookback) { - this.context = context.splitLookback(splitLookback); + contextBuilder.splitLookback(splitLookback); return this; } public Builder splitOpenFileCost(Long splitOpenFileCost) { - this.context = context.splitOpenFileCost(splitOpenFileCost); + contextBuilder.splitOpenFileCost(splitOpenFileCost); + return this; + } + + public Builder 
streaming(boolean streaming) { + contextBuilder.streaming(streaming); return this; } public Builder nameMapping(String nameMapping) { - this.context = context.nameMapping(nameMapping); + contextBuilder.nameMapping(nameMapping); return this; } @@ -178,35 +180,37 @@ public FlinkInputFormat buildFormat() { encryption = table.encryption(); } - rowTypeInfo = RowDataTypeInfo.of((RowType) ( - projectedSchema == null ? - FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) : - projectedSchema).toRowDataType().getLogicalType()); - - context = context.project(projectedSchema == null ? icebergSchema : - FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); - - context = context.limit(limit); + if (projectedSchema == null) { + contextBuilder.project(icebergSchema); + } else { + contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); + } - return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context); + return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build()); } public DataStream build() { Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat(); - if (isBounded(context)) { - return env.createInput(format, rowTypeInfo); + + ScanContext context = contextBuilder.build(); + TypeInformation typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project())); + + if (!context.isStreaming()) { + return env.createInput(format, typeInfo); } else { - throw new UnsupportedOperationException("The Unbounded mode is not supported yet"); + StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context); + + String monitorFunctionName = String.format("Iceberg table (%s) monitor", table); + String readerOperatorName = String.format("Iceberg table (%s) reader", table); + + return env.addSource(function, monitorFunctionName) + .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format)); } } } - private static boolean isBounded(ScanContext context) { - return context.startSnapshotId() == null || context.endSnapshotId() != null; - } - public static boolean isBounded(Map properties) { - return isBounded(new ScanContext().fromProperties(properties)); + return !ScanContext.builder().fromProperties(properties).build().isStreaming(); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java index ade4cfb7c957..f495e0909b7e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java @@ -47,7 +47,7 @@ private static List tasks(Table table, ScanContext context) { TableScan scan = table .newScan() .caseSensitive(context.caseSensitive()) - .project(context.projectedSchema()); + .project(context.project()); if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); @@ -77,8 +77,8 @@ private static List tasks(Table table, ScanContext context) { scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString()); } - if (context.filterExpressions() != null) { - for (Expression filter : context.filterExpressions()) { + if (context.filters() != null) { + for (Expression filter : context.filters()) { scan = scan.filter(filter); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java 
b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 42804a04355e..2896efb39655 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source; import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map; import org.apache.flink.configuration.ConfigOption; @@ -61,6 +62,12 @@ class ScanContext implements Serializable { private static final ConfigOption SPLIT_FILE_OPEN_COST = ConfigOptions.key("split-file-open-cost").longType().defaultValue(null); + private static final ConfigOption STREAMING = + ConfigOptions.key("streaming").booleanType().defaultValue(false); + + private static final ConfigOption MONITOR_INTERVAL = + ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10)); + private final boolean caseSensitive; private final Long snapshotId; private final Long startSnapshotId; @@ -69,29 +76,18 @@ class ScanContext implements Serializable { private final Long splitSize; private final Integer splitLookback; private final Long splitOpenFileCost; + private final boolean isStreaming; + private final Duration monitorInterval; + private final String nameMapping; - private final Schema projectedSchema; - private final List filterExpressions; - private final Long limit; - - ScanContext() { - this.caseSensitive = CASE_SENSITIVE.defaultValue(); - this.snapshotId = SNAPSHOT_ID.defaultValue(); - this.startSnapshotId = START_SNAPSHOT_ID.defaultValue(); - this.endSnapshotId = END_SNAPSHOT_ID.defaultValue(); - this.asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); - this.splitSize = SPLIT_SIZE.defaultValue(); - this.splitLookback = SPLIT_LOOKBACK.defaultValue(); - this.splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); - this.nameMapping = null; - this.projectedSchema = null; - this.filterExpressions = null; - this.limit = null; - } + private final Schema schema; + private final List filters; + private final long limit; private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, - Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, - String nameMapping, Schema projectedSchema, List filterExpressions, Long limit) { + Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, + boolean isStreaming, Duration monitorInterval, String nameMapping, + Schema schema, List filters, long limit) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; this.startSnapshotId = startSnapshotId; @@ -100,126 +96,224 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.splitSize = splitSize; this.splitLookback = splitLookback; this.splitOpenFileCost = splitOpenFileCost; + this.isStreaming = isStreaming; + this.monitorInterval = monitorInterval; + this.nameMapping = nameMapping; - this.projectedSchema = projectedSchema; - this.filterExpressions = filterExpressions; + this.schema = schema; + this.filters = filters; this.limit = limit; } - ScanContext fromProperties(Map properties) { - Configuration config = new Configuration(); - properties.forEach(config::setString); - return new ScanContext(config.get(CASE_SENSITIVE), config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID), - config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP), config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK), - config.get(SPLIT_FILE_OPEN_COST), 
properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions, - limit); - } - boolean caseSensitive() { return caseSensitive; } - ScanContext setCaseSensitive(boolean isCaseSensitive) { - return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long snapshotId() { return snapshotId; } - ScanContext useSnapshotId(Long scanSnapshotId) { - return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long startSnapshotId() { return startSnapshotId; } - ScanContext startSnapshotId(Long id) { - return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId, asOfTimestamp, splitSize, splitLookback, - splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long endSnapshotId() { return endSnapshotId; } - ScanContext endSnapshotId(Long id) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id, asOfTimestamp, splitSize, splitLookback, - splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long asOfTimestamp() { return asOfTimestamp; } - ScanContext asOfTimestamp(Long timestamp) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, timestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long splitSize() { return splitSize; } - ScanContext splitSize(Long size) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, size, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Integer splitLookback() { return splitLookback; } - ScanContext splitLookback(Integer lookback) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - lookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit); - } - Long splitOpenFileCost() { return splitOpenFileCost; } - ScanContext splitOpenFileCost(Long fileCost) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, fileCost, nameMapping, projectedSchema, filterExpressions, limit); + boolean isStreaming() { + return isStreaming; + } + + Duration monitorInterval() { + return monitorInterval; } String nameMapping() { return nameMapping; } - ScanContext nameMapping(String mapping) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, mapping, projectedSchema, filterExpressions, limit); + Schema project() { + return schema; } - Schema projectedSchema() { - return projectedSchema; + List filters() { + return filters; } - ScanContext project(Schema schema) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, schema, filterExpressions, limit); + long limit() { + return limit; } - List filterExpressions() { - return filterExpressions; + ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { + return ScanContext.builder() + .caseSensitive(caseSensitive) + .useSnapshotId(null) + .startSnapshotId(newStartSnapshotId) + 
.endSnapshotId(newEndSnapshotId) + .asOfTimestamp(null) + .splitSize(splitSize) + .splitLookback(splitLookback) + .splitOpenFileCost(splitOpenFileCost) + .streaming(isStreaming) + .monitorInterval(monitorInterval) + .nameMapping(nameMapping) + .project(schema) + .filters(filters) + .limit(limit) + .build(); } - ScanContext filterRows(List filters) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filters, limit); + ScanContext copyWithSnapshotId(long newSnapshotId) { + return ScanContext.builder() + .caseSensitive(caseSensitive) + .useSnapshotId(newSnapshotId) + .startSnapshotId(null) + .endSnapshotId(null) + .asOfTimestamp(null) + .splitSize(splitSize) + .splitLookback(splitLookback) + .splitOpenFileCost(splitOpenFileCost) + .streaming(isStreaming) + .monitorInterval(monitorInterval) + .nameMapping(nameMapping) + .project(schema) + .filters(filters) + .limit(limit) + .build(); } - long limit() { - return limit; + static Builder builder() { + return new Builder(); } - ScanContext limit(Long newLimit) { - return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, newLimit); + static class Builder { + private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); + private Long snapshotId = SNAPSHOT_ID.defaultValue(); + private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); + private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue(); + private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); + private Long splitSize = SPLIT_SIZE.defaultValue(); + private Integer splitLookback = SPLIT_LOOKBACK.defaultValue(); + private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); + private boolean isStreaming = STREAMING.defaultValue(); + private Duration monitorInterval = MONITOR_INTERVAL.defaultValue(); + private String nameMapping; + private Schema projectedSchema; + private List filters; + private long limit = -1L; + + private Builder() { + } + + Builder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + + Builder useSnapshotId(Long newSnapshotId) { + this.snapshotId = newSnapshotId; + return this; + } + + Builder startSnapshotId(Long newStartSnapshotId) { + this.startSnapshotId = newStartSnapshotId; + return this; + } + + Builder endSnapshotId(Long newEndSnapshotId) { + this.endSnapshotId = newEndSnapshotId; + return this; + } + + Builder asOfTimestamp(Long newAsOfTimestamp) { + this.asOfTimestamp = newAsOfTimestamp; + return this; + } + + Builder splitSize(Long newSplitSize) { + this.splitSize = newSplitSize; + return this; + } + + Builder splitLookback(Integer newSplitLookback) { + this.splitLookback = newSplitLookback; + return this; + } + + Builder splitOpenFileCost(Long newSplitOpenFileCost) { + this.splitOpenFileCost = newSplitOpenFileCost; + return this; + } + + Builder streaming(boolean streaming) { + this.isStreaming = streaming; + return this; + } + + Builder monitorInterval(Duration newMonitorInterval) { + this.monitorInterval = newMonitorInterval; + return this; + } + + Builder nameMapping(String newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + + Builder project(Schema newProjectedSchema) { + this.projectedSchema = newProjectedSchema; + return this; + } + + Builder filters(List newFilters) { + this.filters = newFilters; + 
return this; + } + + Builder limit(long newLimit) { + this.limit = newLimit; + return this; + } + + Builder fromProperties(Map properties) { + Configuration config = new Configuration(); + properties.forEach(config::setString); + + return this.useSnapshotId(config.get(SNAPSHOT_ID)) + .caseSensitive(config.get(CASE_SENSITIVE)) + .asOfTimestamp(config.get(AS_OF_TIMESTAMP)) + .startSnapshotId(config.get(START_SNAPSHOT_ID)) + .endSnapshotId(config.get(END_SNAPSHOT_ID)) + .splitSize(config.get(SPLIT_SIZE)) + .splitLookback(config.get(SPLIT_LOOKBACK)) + .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST)) + .streaming(config.get(STREAMING)) + .monitorInterval(config.get(MONITOR_INTERVAL)) + .nameMapping(properties.get(DEFAULT_NAME_MAPPING)); + } + + public ScanContext build() { + return new ScanContext(caseSensitive, snapshotId, startSnapshotId, + endSnapshotId, asOfTimestamp, splitSize, splitLookback, + splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, + filters, limit); + } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java new file mode 100644 index 000000000000..b31426a099f0 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.SnapshotUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the single (non-parallel) monitoring task which takes a {@link FlinkInputFormat}, + * it is responsible for: + * + *
+ * <ol>
+ *   <li>Monitoring snapshots of the Iceberg table.</li>
+ *   <li>Creating the {@link FlinkInputSplit splits} corresponding to the incremental files</li>
+ *   <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>
The splits to be read are forwarded to the downstream {@link StreamingReaderOperator} + * which can have parallelism greater than one. + */ +public class StreamingMonitorFunction extends RichSourceFunction implements CheckpointedFunction { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class); + + private static final long INIT_LAST_SNAPSHOT_ID = -1L; + + private final TableLoader tableLoader; + private final ScanContext scanContext; + + private volatile boolean isRunning = true; + + // The checkpoint thread is not the same thread that running the function for SourceStreamTask now. It's necessary to + // mark this as volatile. + private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID; + + private transient SourceContext sourceContext; + private transient Table table; + private transient ListState lastSnapshotIdState; + + public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) { + Preconditions.checkArgument(scanContext.snapshotId() == null, + "Cannot set snapshot-id option for streaming reader"); + Preconditions.checkArgument(scanContext.asOfTimestamp() == null, + "Cannot set as-of-timestamp option for streaming reader"); + Preconditions.checkArgument(scanContext.endSnapshotId() == null, + "Cannot set end-snapshot-id option for streaming reader"); + this.tableLoader = tableLoader; + this.scanContext = scanContext; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // Load iceberg table from table loader. + tableLoader.open(); + table = tableLoader.loadTable(); + + // Initialize the flink state for last snapshot id. + lastSnapshotIdState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>( + "snapshot-id-state", + LongSerializer.INSTANCE)); + + // Restore the last-snapshot-id from flink's state if possible. + if (context.isRestored()) { + LOG.info("Restoring state for the {}.", getClass().getSimpleName()); + lastSnapshotId = lastSnapshotIdState.get().iterator().next(); + } else if (scanContext.startSnapshotId() != null) { + Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table."); + + long currentSnapshotId = table.currentSnapshot().snapshotId(); + Preconditions.checkState(SnapshotUtil.ancestorOf(table, currentSnapshotId, scanContext.startSnapshotId()), + "The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId()); + + lastSnapshotId = scanContext.startSnapshotId(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + lastSnapshotIdState.clear(); + lastSnapshotIdState.add(lastSnapshotId); + } + + @Override + public void run(SourceContext ctx) throws Exception { + this.sourceContext = ctx; + while (isRunning) { + synchronized (sourceContext.getCheckpointLock()) { + if (isRunning) { + monitorAndForwardSplits(); + } + } + Thread.sleep(scanContext.monitorInterval().toMillis()); + } + } + + private void monitorAndForwardSplits() { + // Refresh the table to get the latest committed snapshot. 
+ table.refresh(); + + Snapshot snapshot = table.currentSnapshot(); + if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) { + long snapshotId = snapshot.snapshotId(); + + ScanContext newScanContext; + if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) { + newScanContext = scanContext.copyWithSnapshotId(snapshotId); + } else { + newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId); + } + + FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext); + for (FlinkInputSplit split : splits) { + sourceContext.collect(split); + } + + lastSnapshotId = snapshotId; + } + } + + @Override + public void cancel() { + // this is to cover the case where cancel() is called before the run() + if (sourceContext != null) { + synchronized (sourceContext.getCheckpointLock()) { + isRunning = false; + } + } else { + isRunning = false; + } + + // Release all the resources here. + if (tableLoader != null) { + try { + tableLoader.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void close() { + cancel(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java new file mode 100644 index 000000000000..235b17332f5d --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.Queue; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.state.JavaSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.MailboxExecutor; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.StreamSourceContexts; +import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The operator that reads the {@link FlinkInputSplit splits} received from the preceding {@link + * StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction} which has a parallelism of 1, + * this operator can have multiple parallelism. + * + *
+ * <p>
As soon as a split descriptor is received, it is put in a queue, and use {@link MailboxExecutor} + * read the actual data of the split. This architecture allows the separation of the reading thread from the one split + * processing the checkpoint barriers, thus removing any potential back-pressure. + */ +public class StreamingReaderOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class); + + // It's the same thread that is running this operator and checkpoint actions. we use this executor to schedule only + // one split for future reading, so that a new checkpoint could be triggered without blocking long time for exhausting + // all scheduled splits. + private final MailboxExecutor executor; + private FlinkInputFormat format; + + private transient SourceFunction.SourceContext sourceContext; + + private transient ListState inputSplitsState; + private transient Queue splits; + + // Splits are read by the same thread that calls processElement. Each read task is submitted to that thread by adding + // them to the executor. This state is used to ensure that only one read task is in that queue at a time, so that read + // tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING. + // When there are no more files to read, this will be set to IDLE. + private transient SplitState currentSplitState; + + private StreamingReaderOperator(FlinkInputFormat format, ProcessingTimeService timeService, + MailboxExecutor mailboxExecutor) { + this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null."); + this.processingTimeService = timeService; + this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null."); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + // TODO Replace Java serialization with Avro approach to keep state compatibility. + // See issue: https://github.com/apache/iceberg/issues/1698 + inputSplitsState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("splits", new JavaSerializer<>())); + + // Initialize the current split state to IDLE. + currentSplitState = SplitState.IDLE; + + // Recover splits state from flink state backend if possible. + splits = Lists.newLinkedList(); + if (context.isRestored()) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), subtaskIdx); + + for (FlinkInputSplit split : inputSplitsState.get()) { + splits.add(split); + } + } + + this.sourceContext = StreamSourceContexts.getSourceContext( + getOperatorConfig().getTimeCharacteristic(), + getProcessingTimeService(), + new Object(), // no actual locking needed + getContainingTask().getStreamStatusMaintainer(), + output, + getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), + -1); + + // Enqueue to process the recovered input splits. 
+ enqueueProcessSplits(); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + inputSplitsState.clear(); + inputSplitsState.addAll(Lists.newArrayList(splits)); + } + + @Override + public void processElement(StreamRecord element) { + splits.add(element.getValue()); + enqueueProcessSplits(); + } + + private void enqueueProcessSplits() { + if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) { + currentSplitState = SplitState.RUNNING; + executor.execute(this::processSplits, this.getClass().getSimpleName()); + } + } + + private void processSplits() throws IOException { + FlinkInputSplit split = splits.poll(); + if (split == null) { + currentSplitState = SplitState.IDLE; + return; + } + + format.open(split); + try { + RowData nextElement = null; + while (!format.reachedEnd()) { + nextElement = format.nextRecord(nextElement); + sourceContext.collect(nextElement); + } + } finally { + currentSplitState = SplitState.IDLE; + format.close(); + } + + // Re-schedule to process the next split. + enqueueProcessSplits(); + } + + @Override + public void processWatermark(Watermark mark) { + // we do nothing because we emit our own watermarks if needed. + } + + @Override + public void dispose() throws Exception { + super.dispose(); + + if (format != null) { + format.close(); + format.closeInputFormat(); + format = null; + } + + sourceContext = null; + } + + @Override + public void close() throws Exception { + super.close(); + output.close(); + if (sourceContext != null) { + sourceContext.emitWatermark(Watermark.MAX_WATERMARK); + sourceContext.close(); + sourceContext = null; + } + } + + static OneInputStreamOperatorFactory factory(FlinkInputFormat format) { + return new OperatorFactory(format); + } + + private enum SplitState { + IDLE, RUNNING + } + + private static class OperatorFactory extends AbstractStreamOperatorFactory + implements YieldingOperatorFactory, OneInputStreamOperatorFactory { + + private final FlinkInputFormat format; + + private transient MailboxExecutor mailboxExecutor; + + private OperatorFactory(FlinkInputFormat format) { + this.format = format; + } + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + this.mailboxExecutor = mailboxExecutor; + } + + @SuppressWarnings("unchecked") + @Override + public > O createStreamOperator(StreamOperatorParameters parameters) { + StreamingReaderOperator operator = new StreamingReaderOperator(format, processingTimeService, mailboxExecutor); + operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + return (O) operator; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return StreamingReaderOperator.class; + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java index 6782267fdd86..8302be306f9a 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java +++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java @@ -72,8 +72,17 @@ protected TableEnvironment getTableEnv() { return tEnv; } + protected static TableResult exec(TableEnvironment env, String query, Object... args) { + return env.executeSql(String.format(query, args)); + } + + protected TableResult exec(String query, Object... args) { + return exec(getTableEnv(), query, args); + } + protected List sql(String query, Object... 
args) { - TableResult tableResult = getTableEnv().executeSql(String.format(query, args)); + TableResult tableResult = exec(String.format(query, args)); + tableResult.getJobClient().ifPresent(c -> { try { c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); @@ -81,12 +90,17 @@ protected List sql(String query, Object... args) { throw new RuntimeException(e); } }); - CloseableIterator iter = tableResult.collect(); + List results = Lists.newArrayList(); - while (iter.hasNext()) { - Row row = iter.next(); - results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new)); + try (CloseableIterator iter = tableResult.collect()) { + while (iter.hasNext()) { + Row row = iter.next(); + results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new)); + } + } catch (Exception e) { + throw new RuntimeException(e); } + return results; } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java index f3df4283f781..5f7ae29ec737 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java @@ -26,6 +26,10 @@ public class TestTableLoader implements TableLoader { private File dir; + public static TableLoader of(String dir) { + return new TestTableLoader(dir); + } + public TestTableLoader(String dir) { this.dir = new File(dir); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java new file mode 100644 index 000000000000..91ce98382c9d --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestStreamScanSql extends FlinkCatalogTestBase { + private static final String TABLE = "test_table"; + private static final FileFormat FORMAT = FileFormat.PARQUET; + + private TableEnvironment tEnv; + + public TestStreamScanSql(String catalogName, Namespace baseNamespace) { + super(catalogName, baseNamespace); + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + if (tEnv == null) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings + .newInstance() + .useBlinkPlanner() + .inStreamingMode(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(400); + + StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + streamTableEnv.getConfig() + .getConfiguration() + .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + tEnv = streamTableEnv; + } + } + } + return tEnv; + } + + @Before + public void before() { + super.before(); + sql("CREATE DATABASE %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + } + + @After + public void clean() { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE); + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + super.clean(); + } + + private void insertRows(String partition, Table table, Row... rows) throws IOException { + GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER); + + GenericRecord gRecord = GenericRecord.create(table.schema()); + List records = Lists.newArrayList(); + for (Row row : rows) { + records.add(gRecord.copy( + "id", row.getField(0), + "data", row.getField(1), + "dt", row.getField(2) + )); + } + + if (partition != null) { + appender.appendToTable(TestHelpers.Row.of(partition, 0), records); + } else { + appender.appendToTable(records); + } + } + + private void insertRows(Table table, Row... 
rows) throws IOException { + insertRows(null, table, rows); + } + + private void assertRows(List expectedRows, Iterator iterator) { + for (Row expectedRow : expectedRows) { + Assert.assertTrue("Should have more records", iterator.hasNext()); + + Row actualRow = iterator.next(); + Assert.assertEquals("Should have expected fields", 3, actualRow.getArity()); + Assert.assertEquals("Should have expected id", expectedRow.getField(0), actualRow.getField(0)); + Assert.assertEquals("Should have expected data", expectedRow.getField(1), actualRow.getField(1)); + Assert.assertEquals("Should have expected dt", expectedRow.getField(2), actualRow.getField(2)); + } + } + + @Test + public void testUnPartitionedTable() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE); + try (CloseableIterator iterator = result.collect()) { + + Row row1 = Row.of(1, "aaa", "2021-01-01"); + insertRows(table, row1); + assertRows(ImmutableList.of(row1), iterator); + + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row2); + assertRows(ImmutableList.of(row2), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + + @Test + public void testPartitionedTable() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE); + try (CloseableIterator iterator = result.collect()) { + Row row1 = Row.of(1, "aaa", "2021-01-01"); + insertRows("2021-01-01", table, row1); + assertRows(ImmutableList.of(row1), iterator); + + Row row2 = Row.of(2, "bbb", "2021-01-02"); + insertRows("2021-01-02", table, row2); + assertRows(ImmutableList.of(row2), iterator); + + Row row3 = Row.of(1, "aaa", "2021-01-02"); + insertRows("2021-01-02", table, row3); + assertRows(ImmutableList.of(row3), iterator); + + Row row4 = Row.of(2, "bbb", "2021-01-01"); + insertRows("2021-01-01", table, row4); + assertRows(ImmutableList.of(row4), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @Test + public void testConsumeFromBeginning() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1, row2); + + TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE); + try (CloseableIterator iterator = result.collect()) { + assertRows(ImmutableList.of(row1, row2), iterator); + + Row row3 = Row.of(3, "ccc", "2021-01-01"); + insertRows(table, row3); + assertRows(ImmutableList.of(row3), iterator); + + Row row4 = Row.of(4, "ddd", "2021-01-01"); + insertRows(table, row4); + assertRows(ImmutableList.of(row4), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @Test + public void testConsumeFromStartSnapshotId() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + // Produce two snapshots. 
+ Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1); + insertRows(table, row2); + + long startSnapshotId = table.currentSnapshot().snapshotId(); + + Row row3 = Row.of(3, "ccc", "2021-01-01"); + Row row4 = Row.of(4, "ddd", "2021-01-01"); + insertRows(table, row3, row4); + + TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', " + + "'start-snapshot-id'='%d')*/", TABLE, startSnapshotId); + try (CloseableIterator iterator = result.collect()) { + // The row2 in start snapshot will be excluded. + assertRows(ImmutableList.of(row3, row4), iterator); + + Row row5 = Row.of(5, "eee", "2021-01-01"); + Row row6 = Row.of(6, "fff", "2021-01-01"); + insertRows(table, row5, row6); + assertRows(ImmutableList.of(row5, row6), iterator); + + Row row7 = Row.of(7, "ggg", "2021-01-01"); + insertRows(table, row7); + assertRows(ImmutableList.of(row7), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java new file mode 100644 index 000000000000..dcd41dcb97c5 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestTableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestStreamingMonitorFunction extends TableTestBase { + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()) + ); + private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET; + private static final long WAIT_TIME_MILLIS = 10 * 1000L; + + @Parameterized.Parameters(name = "FormatVersion={0}") + public static Iterable parameters() { + return ImmutableList.of( + new Object[] {1}, + new Object[] {2} + ); + } + + public TestStreamingMonitorFunction(int formatVersion) { + super(formatVersion); + } + + @Before + @Override + public void setupTable() throws IOException { + this.tableDir = temp.newFolder(); + this.metadataDir = new File(tableDir, "metadata"); + Assert.assertTrue(tableDir.delete()); + + // Construct the iceberg table. + table = create(SCHEMA, PartitionSpec.unpartitioned()); + } + + private void runSourceFunctionInTask(TestSourceContext sourceContext, StreamingMonitorFunction function) { + Thread task = new Thread(() -> { + try { + function.run(sourceContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + task.start(); + } + + @Test + public void testConsumeWithoutStartSnapshotId() throws Exception { + List> recordsList = generateRecordsAndCommitTxn(10); + ScanContext scanContext = ScanContext.builder() + .monitorInterval(Duration.ofMillis(100)) + .build(); + + StreamingMonitorFunction function = createFunction(scanContext); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, function); + + Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + // Stop the stream task. 
+ function.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + } + } + + @Test + public void testConsumeFromStartSnapshotId() throws Exception { + // Commit the first five transactions. + generateRecordsAndCommitTxn(5); + long startSnapshotId = table.currentSnapshot().snapshotId(); + + // Commit the next five transactions. + List> recordsList = generateRecordsAndCommitTxn(5); + + ScanContext scanContext = ScanContext.builder() + .monitorInterval(Duration.ofMillis(100)) + .startSnapshotId(startSnapshotId) + .build(); + + StreamingMonitorFunction function = createFunction(scanContext); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, function); + + Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + // Stop the stream task. + function.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + } + } + + @Test + public void testCheckpointRestore() throws Exception { + List> recordsList = generateRecordsAndCommitTxn(10); + ScanContext scanContext = ScanContext.builder() + .monitorInterval(Duration.ofMillis(100)) + .build(); + + StreamingMonitorFunction func = createFunction(scanContext); + OperatorSubtaskState state; + try (AbstractStreamOperatorTestHarness harness = createHarness(func)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, func); + + Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + state = harness.snapshot(1, 1); + + // Stop the stream task. + func.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + } + + List> newRecordsList = generateRecordsAndCommitTxn(10); + StreamingMonitorFunction newFunc = createFunction(scanContext); + try (AbstractStreamOperatorTestHarness harness = createHarness(newFunc)) { + harness.setup(); + // Recover to process the remaining snapshots. + harness.initializeState(state); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, newFunc); + + Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + // Stop the stream task. 
+ newFunc.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA); + } + } + + private List> generateRecordsAndCommitTxn(int commitTimes) throws IOException { + List> expectedRecords = Lists.newArrayList(); + for (int i = 0; i < commitTimes; i++) { + List records = RandomGenericData.generate(SCHEMA, 100, 0L); + expectedRecords.add(records); + + // Commit those records to iceberg table. + writeRecords(records); + } + return expectedRecords; + } + + private void writeRecords(List records) throws IOException { + GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp); + appender.appendToTable(records); + } + + private StreamingMonitorFunction createFunction(ScanContext scanContext) { + return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext); + } + + private AbstractStreamOperatorTestHarness createHarness(StreamingMonitorFunction function) + throws Exception { + StreamSource streamSource = new StreamSource<>(function); + return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0); + } + + private class TestSourceContext implements SourceFunction.SourceContext { + private final List splits = Lists.newArrayList(); + private final Object checkpointLock = new Object(); + private final CountDownLatch latch; + + TestSourceContext(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void collect(FlinkInputSplit element) { + splits.add(element); + latch.countDown(); + } + + @Override + public void collectWithTimestamp(FlinkInputSplit element, long timestamp) { + collect(element); + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public void markAsTemporarilyIdle() { + + } + + @Override + public Object getCheckpointLock() { + return checkpointLock; + } + + @Override + public void close() { + + } + + private List toRows() throws IOException { + FlinkInputFormat format = FlinkSource.forRowData() + .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath())) + .buildFormat(); + + List rows = Lists.newArrayList(); + for (FlinkInputSplit split : splits) { + format.open(split); + + RowData element = null; + try { + while (!format.reachedEnd()) { + element = format.nextRecord(element); + rows.add(Row.of(element.getInt(0), element.getString(1).toString())); + } + } finally { + format.close(); + } + } + + return rows; + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java new file mode 100644 index 000000000000..112f02167674 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingReaderOperator extends TableTestBase {
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get())
+  );
+  private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
+
+  @Parameterized.Parameters(name = "FormatVersion={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {1},
+        new Object[] {2}
+    );
+  }
+
+  public TestStreamingReaderOperator(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Before
+  @Override
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    // Construct the iceberg table.
+    table = create(SCHEMA, PartitionSpec.unpartitioned());
+  }
+
+  @Test
+  public void testProcessAllRecords() throws Exception {
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(10);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 10 splits", 10, splits.size());
+
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+      List<Record> expected = Lists.newArrayList();
+      for (int i = 0; i < splits.size(); i++) {
+        // Process this element to enqueue it to the mailbox.
+        harness.processElement(splits.get(i), -1);
+
+        // Run the mailbox once to read all records from the given split.
+        Assert.assertTrue("Should have processed 1 split", processor.runMailboxStep());
+
+        // Assert that the output has the expected elements.
+        expected.addAll(expectedRecords.get(i));
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+    }
+  }
+
+  @Test
+  public void testTriggerCheckpoint() throws Exception {
+    // Emit splits.get(0), splits.get(1) and splits.get(2); the checkpoint request is enqueued while the first
+    // split is being read, and it takes effect once all records of that split have been emitted.
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(3);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 3 splits", 3, splits.size());
+
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+
+      // Trigger the snapshot state; it will only run once all records from split0 have been read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      Assert.assertTrue("Should have processed the split0", processor.runMailboxStep());
+      Assert.assertTrue("Should have processed the snapshot state action", processor.runMailboxStep());
+
+      TestFlinkScan.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA);
+
+      // Read records from split1.
+      Assert.assertTrue("Should have processed the split1", processor.runMailboxStep());
+
+      // Read records from split2.
+      Assert.assertTrue("Should have processed the split2", processor.runMailboxStep());
+
+      TestFlinkScan.assertRecords(readOutputValues(harness),
+          Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA);
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(15);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 15 splits", 15, splits.size());
+
+    OperatorSubtaskState state;
+    List<Record> expected = Lists.newArrayList();
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      // Enqueue all the splits.
+      for (FlinkInputSplit split : splits) {
+        harness.processElement(split, -1);
+      }
+
+      // Read all records from the first five splits.
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+      for (int i = 0; i < 5; i++) {
+        expected.addAll(expectedRecords.get(i));
+        Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+
+      // Snapshot the state now; there are 10 splits left in the state.
+      state = harness.snapshot(1, 1);
+    }
+
+    expected.clear();
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      // Recover to process the remaining splits.
+      harness.initializeState(state);
+      harness.open();
+
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+
+      for (int i = 5; i < 10; i++) {
+        expected.addAll(expectedRecords.get(i));
+        Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep());
+
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+
+      // Let's process the final 5 splits now.
+      for (int i = 10; i < 15; i++) {
+        expected.addAll(expectedRecords.get(i));
+        harness.processElement(splits.get(i), 1);
+
+        Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+    }
+  }
+
+  private List<Row> readOutputValues(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
+    List<Row> results = Lists.newArrayList();
+    for (RowData rowData : harness.extractOutputValues()) {
+      results.add(Row.of(rowData.getInt(0), rowData.getString(1).toString()));
+    }
+    return results;
+  }
+
+  private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
+    List<List<Record>> expectedRecords = Lists.newArrayList();
+    for (int i = 0; i < commitTimes; i++) {
+      List<Record> records = RandomGenericData.generate(SCHEMA, 100, 0L);
+      expectedRecords.add(records);
+
+      // Commit those records to the iceberg table.
+      writeRecords(records);
+    }
+    return expectedRecords;
+  }
+
+  private void writeRecords(List<Record> records) throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp);
+    appender.appendToTable(records);
+  }
+
+  private List<FlinkInputSplit> generateSplits() {
+    List<FlinkInputSplit> inputSplits = Lists.newArrayList();
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    for (int i = snapshotIds.size() - 1; i >= 0; i--) {
+      ScanContext scanContext;
+      if (i == snapshotIds.size() - 1) {
+        // Generate the splits from the first snapshot.
+        scanContext = ScanContext.builder()
+            .useSnapshotId(snapshotIds.get(i))
+            .build();
+      } else {
+        // Generate the splits between the previous snapshot and the current snapshot.
+        scanContext = ScanContext.builder()
+            .startSnapshotId(snapshotIds.get(i + 1))
+            .endSnapshotId(snapshotIds.get(i))
+            .build();
+      }
+
+      Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
+    }
+
+    return inputSplits;
+  }
+
+  private OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader() throws Exception {
+    // This input format is used to open the emitted splits.
+    FlinkInputFormat inputFormat = FlinkSource.forRowData()
+        .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath()))
+        .buildFormat();
+
+    OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory = StreamingReaderOperator.factory(inputFormat);
+    OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = new OneInputStreamOperatorTestHarness<>(
+        factory, 1, 1, 0);
+    harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+    return harness;
+  }
+
+  private SteppingMailboxProcessor createLocalMailbox(
+      OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
+    return new SteppingMailboxProcessor(
+        MailboxDefaultAction.Controller::suspendDefaultAction,
+        harness.getTaskMailbox(),
+        StreamTaskActionExecutor.IMMEDIATE);
+  }
+}
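For context, the incremental read path exercised by these tests can also be driven without the Flink test harnesses, by building a FlinkInputFormat and walking the standard InputFormat life cycle (createInputSplits / open / nextRecord / close). The sketch below is illustrative only and is not part of the patch: the package name, class name, table location, and snapshot ids are placeholders, and TableLoader.fromHadoopTable assumes the table lives at a Hadoop-style location (the tests above use TestTableLoader instead).

// Illustrative sketch, not part of the patch: count the rows appended between two known snapshot ids
// using the same FlinkSource.forRowData() / FlinkInputFormat path the tests exercise.
package org.apache.iceberg.flink.source.examples;

import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.FlinkSource;

public class IncrementalReadSketch {

  public static long countAppendedRows(String tableLocation, long fromSnapshotId, long toSnapshotId)
      throws Exception {
    FlinkInputFormat format = FlinkSource.forRowData()
        .tableLoader(TableLoader.fromHadoopTable(tableLocation))  // assumption: Hadoop-style table location
        .startSnapshotId(fromSnapshotId)  // exclusive; testConsumeFromStartSnapshotId only sees later commits
        .endSnapshotId(toSnapshotId)
        .buildFormat();

    long count = 0;
    // Standard Flink InputFormat life cycle: create the splits, then open/read/close each one.
    for (FlinkInputSplit split : format.createInputSplits(0)) {
      format.open(split);
      try {
        RowData reuse = null;
        while (!format.reachedEnd()) {
          // The returned RowData may be a reused object; copy fields out (as the tests do with Row.of(...))
          // before buffering it anywhere. Here we only count rows, so no copy is needed.
          reuse = format.nextRecord(reuse);
          count++;
        }
      } finally {
        format.close();
      }
    }
    return count;
  }
}

A caller would pass the snapshot id recorded before a batch of commits as fromSnapshotId and the current snapshot id as toSnapshotId, mirroring how testConsumeFromStartSnapshotId scopes its expected records.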