diff --git a/flink-runtime/build.gradle b/flink-runtime/build.gradle new file mode 100644 index 000000000000..c8a0a8e87487 --- /dev/null +++ b/flink-runtime/build.gradle @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +project(':iceberg-flink-runtime') { + apply plugin: 'com.github.johnrengelman.shadow' + + tasks.jar.dependsOn tasks.shadowJar + + configurations { + implementation { + exclude group: 'org.apache.flink' + // included in Flink + exclude group: 'org.slf4j' + exclude group: 'org.apache.commons' + exclude group: 'commons-pool' + exclude group: 'commons-codec' + exclude group: 'org.xerial.snappy' + exclude group: 'javax.xml.bind' + exclude group: 'javax.annotation' + } + } + + dependencies { + implementation project(':iceberg-flink') + implementation project(':iceberg-aws') + implementation(project(':iceberg-nessie')) { + exclude group: 'com.google.code.findbugs', module: 'jsr305' + } + + // flink-connector-base is not part of Flink runtime. Hence, + // iceberg-flink-runtime should include it as a transitive dependency. 
+ implementation "org.apache.flink:flink-connector-base" + } + + shadowJar { + configurations = [project.configurations.runtimeClasspath] + + zip64 true + + // include the LICENSE and NOTICE files for the shaded Jar + from(projectDir) { + include 'LICENSE' + include 'NOTICE' + } + + // Relocate dependencies to avoid conflicts + relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro' + relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' + relocate 'com.google', 'org.apache.iceberg.shaded.com.google' + relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml' + relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes' + relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework' + relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' + relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' + relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' + relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + + classifier null + } + + jar { + enabled = false + } +} + diff --git a/flink/build.gradle b/flink/build.gradle index bbf65177bc3f..936d5bec0e46 100644 --- a/flink/build.gradle +++ b/flink/build.gradle @@ -28,6 +28,7 @@ project(':iceberg-flink') { implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') + compileOnly "org.apache.flink:flink-connector-base" compileOnly "org.apache.flink:flink-streaming-java_2.12" compileOnly "org.apache.flink:flink-streaming-java_2.12::tests" compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12" @@ -56,6 +57,7 @@ project(':iceberg-flink') { exclude group: 'org.apache.hive', module: 'hive-storage-api' } + testImplementation "org.apache.flink:flink-connector-test-utils" testImplementation "org.apache.flink:flink-core" testImplementation "org.apache.flink:flink-runtime_2.12" testImplementation "org.apache.flink:flink-table-planner-blink_2.12" diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java index 067abe8a6e41..ebad99951b77 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java @@ -40,4 +40,10 @@ private FlinkConfigOptions() { .intType() .defaultValue(100) .withDescription("Sets max infer parallelism for source operator."); + + public static final ConfigOption SOURCE_READER_FETCH_RECORD_BATCH_SIZE = ConfigOptions + .key("source.iceberg.reader.fetch-record-batch-size") + .intType() + .defaultValue(2048) + .withDescription("The target number of records for Iceberg reader fetch batch."); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java index d470b0752304..29472ccb4341 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -29,6 +29,7 @@ import org.apache.iceberg.encryption.InputFilesDecryptor; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** * Flink data iterator that reads {@link CombinedScanTask} into a {@link CloseableIterator} @@ -41,7 +42,10 @@ public class DataIterator implements CloseableIterator { private final 
FileScanTaskReader fileScanTaskReader; private final InputFilesDecryptor inputFilesDecryptor; - private Iterator tasks; + private final CombinedScanTask combinedTask; + private final Position position; + + private Iterator fileTasksIterator; private CloseableIterator currentIterator; public DataIterator(FileScanTaskReader fileScanTaskReader, CombinedScanTask task, @@ -49,10 +53,36 @@ public DataIterator(FileScanTaskReader fileScanTaskReader, CombinedScanTask t this.fileScanTaskReader = fileScanTaskReader; this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption); - this.tasks = task.files().iterator(); + this.combinedTask = task; + // fileOffset starts at -1 because we started + // from an empty iterator that is not from the split files. + this.position = new Position(-1, 0L); + + this.fileTasksIterator = task.files().iterator(); this.currentIterator = CloseableIterator.empty(); } + public void seek(Position startingPosition) { + // skip files + Preconditions.checkArgument(startingPosition.fileOffset() < combinedTask.files().size(), + "Checkpointed file offset is %d, while CombinedScanTask has %d files", + startingPosition.fileOffset(), combinedTask.files().size()); + for (long i = 0L; i < startingPosition.fileOffset(); ++i) { + fileTasksIterator.next(); + } + updateCurrentIterator(); + // skip records within the file + for (long i = 0; i < startingPosition.recordOffset(); ++i) { + if (hasNext()) { + next(); + } else { + throw new IllegalStateException("Not enough records to skip: " + + startingPosition.recordOffset()); + } + } + this.position.update(startingPosition.fileOffset(), startingPosition.recordOffset()); + } + @Override public boolean hasNext() { updateCurrentIterator(); @@ -62,18 +92,24 @@ public boolean hasNext() { @Override public T next() { updateCurrentIterator(); + position.advanceRecord(); return currentIterator.next(); } + public boolean isCurrentIteratorDone() { + return !currentIterator.hasNext(); + } + /** * Updates the current iterator field to ensure that the current Iterator * is not exhausted. 
*/ private void updateCurrentIterator() { try { - while (!currentIterator.hasNext() && tasks.hasNext()) { + while (!currentIterator.hasNext() && fileTasksIterator.hasNext()) { currentIterator.close(); - currentIterator = openTaskIterator(tasks.next()); + currentIterator = openTaskIterator(fileTasksIterator.next()); + position.advanceFile(); } } catch (IOException e) { throw new UncheckedIOException(e); @@ -88,6 +124,10 @@ private CloseableIterator openTaskIterator(FileScanTask scanTask) { public void close() throws IOException { // close the current iterator currentIterator.close(); - tasks = null; + fileTasksIterator = null; + } + + public Position position() { + return position; } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 8b757ac31606..a4cbab5c37e4 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException tableLoader.open(); try (TableLoader loader = tableLoader) { Table table = loader.loadTable(); - return FlinkSplitGenerator.createInputSplits(table, context); + return FlinkSplitPlanner.planInputSplits(table, context); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java similarity index 63% rename from flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java rename to flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index f495e0909b7e..ef0f71c05a67 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -22,33 +22,58 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import org.apache.flink.annotation.Internal; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class FlinkSplitGenerator { - private FlinkSplitGenerator() { +@Internal +public class FlinkSplitPlanner { + private FlinkSplitPlanner() { } - static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) { - List tasks = tasks(table, context); - FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; - for (int i = 0; i < tasks.size(); i++) { - splits[i] = new FlinkInputSplit(i, tasks.get(i)); + static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) { + try (CloseableIterable tasksIterable = planTasks(table, context)) { + List tasks = Lists.newArrayList(tasksIterable); + FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) { + splits[i] = new FlinkInputSplit(i, tasks.get(i)); + } + return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to process tasks iterable", e); + } + } + + /** + * This returns splits for the FLIP-27 source + */ + public static List planIcebergSourceSplits( + Table table, ScanContext context) { + try 
(CloseableIterable tasksIterable = planTasks(table, context)) { + List splits = Lists.newArrayList(); + tasksIterable.forEach(task -> splits.add(IcebergSourceSplit.fromCombinedScanTask(task))); + return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to process task iterable: ", e); } - return splits; } - private static List tasks(Table table, ScanContext context) { + static CloseableIterable planTasks(Table table, ScanContext context) { TableScan scan = table .newScan() .caseSensitive(context.caseSensitive()) .project(context.project()); + if (context.includeColumnStats()) { + scan = scan.includeColumnStats(); + } + if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); } @@ -83,10 +108,6 @@ private static List tasks(Table table, ScanContext context) { } } - try (CloseableIterable tasksIterable = scan.planTasks()) { - return Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close table scan: " + scan, e); - } + return scan.planTasks(); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/Position.java b/flink/src/main/java/org/apache/iceberg/flink/source/Position.java new file mode 100644 index 000000000000..0e7acecfac7e --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/Position.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.Serializable; +import java.util.Objects; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** + * A mutable class that defines the read position + *
+ * <ul>
+ *   <li>file offset in the list of files in a {@link CombinedScanTask}
+ *   <li>record offset within a file
+ * </ul>
+ */ +@Internal +public class Position implements Serializable { + + private static final long serialVersionUID = 1L; + + private int fileOffset; + private long recordOffset; + + public Position(int fileOffset, long recordOffset) { + this.fileOffset = fileOffset; + this.recordOffset = recordOffset; + } + + void advanceFile() { + this.fileOffset += 1; + this.recordOffset = 0L; + } + + void advanceRecord() { + this.recordOffset += 1L; + } + + public void update(int newFileOffset, long newRecordOffset) { + this.fileOffset = newFileOffset; + this.recordOffset = newRecordOffset; + } + + public int fileOffset() { + return fileOffset; + } + + public long recordOffset() { + return recordOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Position that = (Position) o; + return Objects.equals(fileOffset, that.fileOffset) && + Objects.equals(recordOffset, that.recordOffset); + } + + @Override + public int hashCode() { + return Objects.hash(fileOffset, recordOffset); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("fileOffset", fileOffset) + .add("recordOffset", recordOffset) + .toString(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 2896efb39655..73a31930cd35 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -34,7 +34,7 @@ /** * Context object with optional arguments for a Flink Scan. */ -class ScanContext implements Serializable { +public class ScanContext implements Serializable { private static final long serialVersionUID = 1L; @@ -68,6 +68,9 @@ class ScanContext implements Serializable { private static final ConfigOption MONITOR_INTERVAL = ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10)); + private static final ConfigOption INCLUDE_COLUMN_STATS = + ConfigOptions.key("include-column-stats").booleanType().defaultValue(false); + private final boolean caseSensitive; private final Long snapshotId; private final Long startSnapshotId; @@ -83,11 +86,12 @@ class ScanContext implements Serializable { private final Schema schema; private final List filters; private final long limit; + private final boolean includeColumnStats; private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming, Duration monitorInterval, String nameMapping, - Schema schema, List filters, long limit) { + Schema schema, List filters, long limit, boolean includeColumnStats) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; this.startSnapshotId = startSnapshotId; @@ -103,65 +107,70 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.schema = schema; this.filters = filters; this.limit = limit; + this.includeColumnStats = includeColumnStats; } - boolean caseSensitive() { + public boolean caseSensitive() { return caseSensitive; } - Long snapshotId() { + public Long snapshotId() { return snapshotId; } - Long startSnapshotId() { + public Long startSnapshotId() { return startSnapshotId; } - Long endSnapshotId() { + public Long endSnapshotId() { return endSnapshotId; } - Long asOfTimestamp() { + public Long 
asOfTimestamp() { return asOfTimestamp; } - Long splitSize() { + public Long splitSize() { return splitSize; } - Integer splitLookback() { + public Integer splitLookback() { return splitLookback; } - Long splitOpenFileCost() { + public Long splitOpenFileCost() { return splitOpenFileCost; } - boolean isStreaming() { + public boolean isStreaming() { return isStreaming; } - Duration monitorInterval() { + public Duration monitorInterval() { return monitorInterval; } - String nameMapping() { + public String nameMapping() { return nameMapping; } - Schema project() { + public Schema project() { return schema; } - List filters() { + public List filters() { return filters; } - long limit() { + public long limit() { return limit; } - ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { + public boolean includeColumnStats() { + return includeColumnStats; + } + + public ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(null) @@ -177,10 +186,11 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI .project(schema) .filters(filters) .limit(limit) + .includeColumnStats(includeColumnStats) .build(); } - ScanContext copyWithSnapshotId(long newSnapshotId) { + public ScanContext copyWithSnapshotId(long newSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(newSnapshotId) @@ -196,14 +206,15 @@ ScanContext copyWithSnapshotId(long newSnapshotId) { .project(schema) .filters(filters) .limit(limit) + .includeColumnStats(includeColumnStats) .build(); } - static Builder builder() { + public static Builder builder() { return new Builder(); } - static class Builder { + public static class Builder { private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); private Long snapshotId = SNAPSHOT_ID.defaultValue(); private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); @@ -218,81 +229,87 @@ static class Builder { private Schema projectedSchema; private List filters; private long limit = -1L; + private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue(); private Builder() { } - Builder caseSensitive(boolean newCaseSensitive) { + public Builder caseSensitive(boolean newCaseSensitive) { this.caseSensitive = newCaseSensitive; return this; } - Builder useSnapshotId(Long newSnapshotId) { + public Builder useSnapshotId(Long newSnapshotId) { this.snapshotId = newSnapshotId; return this; } - Builder startSnapshotId(Long newStartSnapshotId) { + public Builder startSnapshotId(Long newStartSnapshotId) { this.startSnapshotId = newStartSnapshotId; return this; } - Builder endSnapshotId(Long newEndSnapshotId) { + public Builder endSnapshotId(Long newEndSnapshotId) { this.endSnapshotId = newEndSnapshotId; return this; } - Builder asOfTimestamp(Long newAsOfTimestamp) { + public Builder asOfTimestamp(Long newAsOfTimestamp) { this.asOfTimestamp = newAsOfTimestamp; return this; } - Builder splitSize(Long newSplitSize) { + public Builder splitSize(Long newSplitSize) { this.splitSize = newSplitSize; return this; } - Builder splitLookback(Integer newSplitLookback) { + public Builder splitLookback(Integer newSplitLookback) { this.splitLookback = newSplitLookback; return this; } - Builder splitOpenFileCost(Long newSplitOpenFileCost) { + public Builder splitOpenFileCost(Long newSplitOpenFileCost) { this.splitOpenFileCost = newSplitOpenFileCost; return this; } - Builder streaming(boolean streaming) { + public Builder 
streaming(boolean streaming) { this.isStreaming = streaming; return this; } - Builder monitorInterval(Duration newMonitorInterval) { + public Builder monitorInterval(Duration newMonitorInterval) { this.monitorInterval = newMonitorInterval; return this; } - Builder nameMapping(String newNameMapping) { + public Builder nameMapping(String newNameMapping) { this.nameMapping = newNameMapping; return this; } - Builder project(Schema newProjectedSchema) { + public Builder project(Schema newProjectedSchema) { this.projectedSchema = newProjectedSchema; return this; } - Builder filters(List newFilters) { + public Builder filters(List newFilters) { this.filters = newFilters; return this; } - Builder limit(long newLimit) { + public Builder limit(long newLimit) { this.limit = newLimit; return this; } - Builder fromProperties(Map properties) { + public Builder includeColumnStats(boolean newIncludeColumnStats) { + this.includeColumnStats = newIncludeColumnStats; + return this; + } + + public Builder fromProperties(Map properties) { Configuration config = new Configuration(); properties.forEach(config::setString); @@ -306,14 +323,15 @@ Builder fromProperties(Map properties) { .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST)) .streaming(config.get(STREAMING)) .monitorInterval(config.get(MONITOR_INTERVAL)) - .nameMapping(properties.get(DEFAULT_NAME_MAPPING)); + .nameMapping(properties.get(DEFAULT_NAME_MAPPING)) + .includeColumnStats(config.get(INCLUDE_COLUMN_STATS)); } public ScanContext build() { return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback, splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, - filters, limit); + filters, limit, includeColumnStats); } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index b31426a099f0..7913a18bde9c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -140,7 +140,7 @@ private void monitorAndForwardSplits() { newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId); } - FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext); + FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext); for (FlinkInputSplit split : splits) { sourceContext.collect(split); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java new file mode 100644 index 000000000000..19a6f5f437a6 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.file.src.util.Pool; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.io.CloseableIterator; + +/** + * FLIP-27's {@link SplitReader#fetch()} returns batched {@link RecordsWithSplitIds} + * {@link DataIterator} can return reused object, like {@code RowData}. In order to + * work with batched fetch API, we need to store cloned objects into object pools. + */ +class ArrayPoolDataIteratorBatcher implements DataIteratorBatcher { + private final Configuration config; + private final RecordFactory recordFactory; + + ArrayPoolDataIteratorBatcher(Configuration config, RecordFactory recordFactory) { + this.config = config; + this.recordFactory = recordFactory; + } + + @Override + public CloseableIterator>> batch( + String splitId, DataIterator inputIterator) { + return new ArrayPoolBatchIterator(splitId, inputIterator); + } + + private class ArrayPoolBatchIterator implements CloseableIterator>> { + + private final String splitId; + private final DataIterator inputIterator; + private final int batchSize; + private final Pool pool; + + ArrayPoolBatchIterator(String splitId, DataIterator inputIterator) { + this.splitId = splitId; + this.inputIterator = inputIterator; + this.batchSize = config.getInteger(FlinkConfigOptions.SOURCE_READER_FETCH_RECORD_BATCH_SIZE); + this.pool = createPoolOfBatches(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); + } + + @Override + public boolean hasNext() { + return inputIterator.hasNext(); + } + + @Override + public RecordsWithSplitIds> next() { + final T[] batch = getCachedEntry(); + int num = 0; + while (inputIterator.hasNext() && num < batchSize) { + // The record produced by inputIterator can be reused like for the RowData case. + // inputIterator.next() can't be called again until the copy is made + // since the record is not consumed immediately. + T nextRecord = inputIterator.next(); + recordFactory.clone(nextRecord, batch[num]); + num++; + if (inputIterator.isCurrentIteratorDone()) { + // break early so that records in the ArrayResultIterator + // have the same fileOffset. 
+ break; + } + } + + if (num == 0) { + return null; + } else { + Position position = inputIterator.position(); + return SplitRecords.forRecords(splitId, new RecyclableArrayIterator<>( + pool.recycler(), batch, num, position.fileOffset(), position.recordOffset() - num)); + } + } + + @Override + public void close() throws IOException { + if (inputIterator != null) { + inputIterator.close(); + } + } + + private Pool createPoolOfBatches(int numBatches) { + final Pool poolOfBatches = new Pool<>(numBatches); + for (int batchId = 0; batchId < numBatches; batchId++) { + T[] batch = recordFactory.createBatch(batchSize); + poolOfBatches.add(batch); + } + return poolOfBatches; + } + + private T[] getCachedEntry() { + try { + return pool.pollEntry(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(new IOException("Interrupted")); + } + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java new file mode 100644 index 000000000000..f95a7f95e669 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorBatcher.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.io.CloseableIterator; + +/** + * Batcher converts iterator of T into iterator of batched {@code RecordsWithSplitIds>}, + * as FLIP-27's {@link SplitReader#fetch()} returns batched records. + */ +@FunctionalInterface +public interface DataIteratorBatcher extends Serializable { + CloseableIterator>> batch(String splitId, DataIterator inputIterator); +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFunction.java new file mode 100644 index 000000000000..95c65bde1974 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/DataIteratorReaderFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; + +/** + * A {@link ReaderFunction} implementation that uses {@link DataIterator}. + */ +public abstract class DataIteratorReaderFunction implements ReaderFunction { + private final DataIteratorBatcher batcher; + + public DataIteratorReaderFunction(DataIteratorBatcher batcher) { + this.batcher = batcher; + } + + public abstract DataIterator createDataIterator(IcebergSourceSplit split); + + @Override + public CloseableIterator>> apply(IcebergSourceSplit split) { + DataIterator inputIterator = createDataIterator(split); + if (split.position() != null) { + inputIterator.seek(split.position()); + } + return batcher.batch(split.splitId(), inputIterator); + } + +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java new file mode 100644 index 000000000000..9e9d419bd5f0 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +@Internal +public class IcebergSourceReaderMetrics { + private final AtomicLong numRecordsOut; + private final AtomicLong assignedSplits; + private final AtomicLong finishedSplits; + private final Counter splitReaderFetches; + + public IcebergSourceReaderMetrics(MetricGroup metricGroup) { + final MetricGroup readerMetricGroup = metricGroup.addGroup("IcebergSourceReader"); + + this.numRecordsOut = new AtomicLong(); + this.assignedSplits = new AtomicLong(); + this.finishedSplits = new AtomicLong(); + readerMetricGroup.gauge("numRecordsOut", numRecordsOut::get); + readerMetricGroup.gauge("assignedSplits", assignedSplits::get); + readerMetricGroup.gauge("finishedSplits", finishedSplits::get); + this.splitReaderFetches = readerMetricGroup.counter("splitReaderFetches"); + } + + public void incrementNumRecordsOut(long delta) { + numRecordsOut.addAndGet(delta); + } + + public void incrementAssignedSplits(long delta) { + assignedSplits.addAndGet(delta); + } + + public void incrementFinishedSplits(long delta) { + finishedSplits.addAndGet(delta); + } + + public void recordSplitReaderFetches() { + splitReaderFetches.inc(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java new file mode 100644 index 000000000000..10fe9483f171 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import javax.annotation.Nullable; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IcebergSourceSplitReader implements SplitReader, IcebergSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class); + + private final ReaderFunction readerFunction; + private final int indexOfSubtask; + private final IcebergSourceReaderMetrics metrics; + private final Queue splits; + + @Nullable + private CloseableIterator>> currentReader; + @Nullable + private String currentSplitId; + + IcebergSourceSplitReader(ReaderFunction readerFunction, + SourceReaderContext context, + IcebergSourceReaderMetrics metrics) { + this.readerFunction = readerFunction; + this.indexOfSubtask = context.getIndexOfSubtask(); + this.metrics = metrics; + this.splits = new ArrayDeque<>(); + } + + @Override + public RecordsWithSplitIds> fetch() throws IOException { + metrics.recordSplitReaderFetches(); + checkSplitOrStartNext(); + if (currentReader.hasNext()) { + // Because Iterator#next() doesn't support checked exception, + // we need to wrap and unwrap the checked IOException with UncheckedIOException + try { + return currentReader.next(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } else { + return finishSplit(); + } + } + + @Override + public void handleSplitsChanges(SplitsChange splitsChanges) { + LOG.debug("Add splits to reader: {}", splitsChanges.splits()); + splits.addAll(splitsChanges.splits()); + metrics.incrementAssignedSplits(splitsChanges.splits().size()); + } + + @Override + public void wakeUp() { + } + + @Override + public void close() throws Exception { + currentSplitId = null; + if (currentReader != null) { + currentReader.close(); + } + } + + private void checkSplitOrStartNext() throws IOException { + if (currentReader != null) { + return; + } + final IcebergSourceSplit nextSplit = splits.poll(); + if (nextSplit == null) { + throw new IOException("No split remaining"); + } + currentSplitId = nextSplit.splitId(); + currentReader = readerFunction.apply(nextSplit); + } + + private SplitRecords finishSplit() throws IOException { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + final SplitRecords finishRecords = SplitRecords.finishedSplit(currentSplitId); + LOG.debug("Split reader {} finished split: {}", indexOfSubtask, currentSplitId); + currentSplitId = null; + metrics.incrementFinishedSplits(1L); + return finishRecords; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/MutableRecordAndPosition.junk b/flink/src/main/java/org/apache/iceberg/flink/source/reader/MutableRecordAndPosition.junk new file mode 100644 index 000000000000..ab3fde206017 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/MutableRecordAndPosition.junk @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.annotation.Internal; + +/** + * A mutable version of the {@link RecordAndPosition}. + * + *
<p>
This mutable object is useful in cases where only once instance of a {@code RecordAndPosition} + * is needed at a time, like for the result values of the {@link RecyclableArrayIterator}. + */ +@Internal +public class MutableRecordAndPosition extends RecordAndPosition { + + /** Updates the record and position in this object. */ + public void set(T record, int fileOffset, long recordOffset) { + this.record = record; + this.fileOffset = fileOffset; + this.recordOffset = recordOffset; + } + + /** Sets the position without setting a record. */ + public void position(int fileOffset, long recordOffset) { + this.fileOffset = fileOffset; + this.recordOffset = recordOffset; + } + + /** Sets the next record of a sequence. This increments the {@code recordOffset} by one. */ + public void record(T record) { + this.record = record; + this.recordOffset++; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFunction.java new file mode 100644 index 000000000000..b008f6f5c7fa --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/ReaderFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; +import java.util.function.Function; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; + +@FunctionalInterface +public interface ReaderFunction extends Serializable, Function>>> { +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java new file mode 100644 index 000000000000..e0980ca54c93 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.annotation.Internal; + +/** + * A record along with the reader position to be stored in the checkpoint. + * + *
<p>The position defines the point in the reader AFTER the record. Record processing and updating + * checkpointed state happens atomically. The position points to where the reader should resume + * after this record is processed. + * + *
<p>
This mutable object is useful in cases where only once instance of a {@code RecordAndPosition} + * is needed at a time, like for the result values of the {@link RecyclableArrayIterator}. + */ +@Internal +public class RecordAndPosition { + private T record; + private int fileOffset; + private long recordOffset; + + public RecordAndPosition(T record, int fileOffset, long recordOffset) { + this.record = record; + this.fileOffset = fileOffset; + this.recordOffset = recordOffset; + } + + public RecordAndPosition() { + } + + // ------------------------------------------------------------------------ + + public T record() { + return record; + } + + public int fileOffset() { + return fileOffset; + } + + public long recordOffset() { + return recordOffset; + } + + /** Updates the record and position in this object. */ + public void set(T newRecord, int newFileOffset, long newRecordOffset) { + this.record = newRecord; + this.fileOffset = newFileOffset; + this.recordOffset = newRecordOffset; + } + + /** Sets the position without setting a record. */ + public void position(int newFileOffset, long newRecordOffset) { + this.fileOffset = newFileOffset; + this.recordOffset = newRecordOffset; + } + + /** Sets the next record of a sequence. This increments the {@code recordOffset} by one. */ + public void record(T nextRecord) { + this.record = nextRecord; + this.recordOffset++; + } + + @Override + public String toString() { + return String.format("%s @ %d + %d", record, fileOffset, recordOffset); + } + +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordFactory.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordFactory.java new file mode 100644 index 000000000000..c006558e8de4 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.Serializable; + +interface RecordFactory extends Serializable { + /** + * Create a batch of records + */ + T[] createBatch(int batchSize); + + /** + * Clone record + */ + void clone(T from, T to); +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecyclableArrayIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecyclableArrayIterator.java new file mode 100644 index 000000000000..0a197c731610 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecyclableArrayIterator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import javax.annotation.Nullable; +import org.apache.flink.connector.file.src.util.ArrayResultIterator; +import org.apache.flink.connector.file.src.util.Pool; +import org.apache.iceberg.io.CloseableIterator; + +/** + * Similar to the {@link ArrayResultIterator}. + * Main difference is the records array can be recycled back to a pool. + * + * Each record's {@link RecordAndPosition} will have the same fileOffset (for {@link RecordAndPosition#fileOffset()}. + * The first returned record will have a records-to-skip count of {@code recordOffset + 1}, following + * the contract that each record needs to point to the position AFTER itself + * (because a checkpoint taken after the record was emitted needs to resume from after that record). + */ +final class RecyclableArrayIterator implements CloseableIterator> { + private final Pool.Recycler recycler; + private final E[] records; + private final int num; + private final RecordAndPosition recordAndPosition; + + private int pos; + + RecyclableArrayIterator(Pool.Recycler recycler) { + this(recycler, null, 0, -1, 0L); + } + + RecyclableArrayIterator( + Pool.Recycler recycler, E[] newRecords, + int newNum, int fileOffset, long recordOffset) { + this.recycler = recycler; + this.records = newRecords; + this.num = newNum; + this.recordAndPosition = new RecordAndPosition<>(); + this.recordAndPosition.set(null, fileOffset, recordOffset); + + this.pos = 0; + } + + @Override + public boolean hasNext() { + return pos < num; + } + + @Override + @Nullable + public RecordAndPosition next() { + if (pos < num) { + recordAndPosition.record(records[pos++]); + return recordAndPosition; + } else { + return null; + } + } + + @Override + public void close() { + recycler.recycle(records); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java new file mode 100644 index 000000000000..aa70097b871b --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.source.DataIterator; +import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +public class RowDataReaderFunction extends DataIteratorReaderFunction { + private final Table table; + private final ScanContext scanContext; + private final Schema readSchema; + + public RowDataReaderFunction( + Configuration config, + Table table, + ScanContext scanContext) { + super(new ArrayPoolDataIteratorBatcher<>(config, new RowDataRecordFactory( + FlinkSchemaUtil.convert(readSchema(table, scanContext))))); + this.table = table; + this.scanContext = scanContext; + this.readSchema = readSchema(table, scanContext); + } + + @Override + public DataIterator createDataIterator(IcebergSourceSplit split) { + return new DataIterator<>( + new RowDataFileScanTaskReader( + table.schema(), + readSchema, + scanContext.nameMapping(), + scanContext.caseSensitive()), + split.task(), + table.io(), + table.encryption()); + } + + private static Schema readSchema(Table table, ScanContext scanContext) { + return scanContext.project() == null ? table.schema() : scanContext.project(); + } + +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java new file mode 100644 index 000000000000..3e46574cc9fa --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.flink.data.RowDataUtil; + +class RowDataRecordFactory implements RecordFactory { + private final RowType rowType; + private final TypeSerializer[] fieldSerializers; + + RowDataRecordFactory(final RowType rowType) { + this.rowType = rowType; + this.fieldSerializers = createFieldSerializers(rowType); + } + + static TypeSerializer[] createFieldSerializers(RowType rowType) { + return rowType.getChildren().stream() + .map(InternalSerializers::create) + .toArray(TypeSerializer[]::new); + } + + @Override + public RowData[] createBatch(int batchSize) { + RowData[] arr = new RowData[batchSize]; + for (int i = 0; i < batchSize; ++i) { + arr[i] = new GenericRowData(rowType.getFieldCount()); + } + return arr; + } + + @Override + public void clone(RowData from, RowData to) { + RowDataUtil.clone(from, to, rowType, fieldSerializers); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitRecords.java b/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitRecords.java new file mode 100644 index 000000000000..25eeccef3774 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitRecords.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.iceberg.io.CloseableIterator; + +/** + * A batch of records for one split + */ +@Internal +public class SplitRecords implements RecordsWithSplitIds> { + + @Nullable + private final CloseableIterator> recordsForSplit; + private final Set finishedSplits; + + @Nullable + private String splitId; + @Nullable + private CloseableIterator> recordsForSplitCurrent; + + private SplitRecords( + @Nullable String splitId, + @Nullable CloseableIterator> recordsForSplit, + Set finishedSplits) { + + this.splitId = splitId; + this.recordsForSplit = recordsForSplit; + this.finishedSplits = finishedSplits; + } + + @Nullable + @Override + public String nextSplit() { + // move the split one (from current value to null) + final String nextSplit = this.splitId; + this.splitId = null; + + // move the iterator, from null to value (if first move) or to null (if second move) + this.recordsForSplitCurrent = nextSplit != null ? this.recordsForSplit : null; + + return nextSplit; + } + + @Nullable + @Override + public RecordAndPosition nextRecordFromSplit() { + if (recordsForSplitCurrent != null) { + return recordsForSplitCurrent.next(); + } else { + throw new IllegalStateException(); + } + } + + @Override + public void recycle() { + if (recordsForSplit != null) { + try { + recordsForSplit.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the record batch"); + } + } + } + + @Override + public Set finishedSplits() { + return finishedSplits; + } + + public static SplitRecords forRecords( + final String splitId, final CloseableIterator> recordsForSplit) { + return new SplitRecords<>(splitId, recordsForSplit, Collections.emptySet()); + } + + public static SplitRecords finishedSplit(String splitId) { + return new SplitRecords<>(null, null, Collections.singleton(splitId)); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java new file mode 100644 index 000000000000..3344dd4bef11 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private final CombinedScanTask task; + /** + * Position field is mutable + */ + @Nullable + private final Position position; + + /** + * The splits are frequently serialized into checkpoints. + * Caching the byte representation makes repeated serialization cheap. + */ + @Nullable + private transient byte[] serializedFormCache; + + public IcebergSourceSplit(CombinedScanTask task, Position position) { + this.task = task; + this.position = position; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, new Position(fileOffset, recordOffset)); + } + + public CombinedScanTask task() { + return task; + } + + public Position position() { + return position; + } + + byte[] serializedFormCache() { + return serializedFormCache; + } + + void serializedFormCache(byte[] cachedBytes) { + this.serializedFormCache = cachedBytes; + } + + @Override + public String splitId() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .toString(); + } + + public void updatePosition(int newFileOffset, long newRecordOffset) { + position.update(newFileOffset, newRecordOffset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergSourceSplit split = (IcebergSourceSplit) o; + return Objects.equal(splitId(), split.splitId()) && + Objects.equal(position, split.position()); + } + + @Override + public int hashCode() { + return Objects.hashCode(splitId()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .add("position", position) + .toString(); + } + + private String toString(Collection files) { + return Iterables.toString(files.stream().map(fileScanTask -> + MoreObjects.toStringHelper(fileScanTask) + .add("file", fileScanTask.file() != null ? 
+ fileScanTask.file().path().toString() : + "NoFile") + .add("start", fileScanTask.start()) + .add("length", fileScanTask.length()) + .toString()).collect(Collectors.toList())); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java new file mode 100644 index 000000000000..9bb65497ff37 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.io.IOException; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.InstantiationUtil; + +/** + * TODO: use Java serialization for now. + * Will switch to more stable serializer from + * issue-1698. + */ +@Internal +public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { + public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer(); + private static final int VERSION = 1; + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(IcebergSourceSplit split) throws IOException { + if (split.serializedFormCache() == null) { + final byte[] result = serializeV1(split); + split.serializedFormCache(result); + } + return split.serializedFormCache(); + } + + @Override + public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException { + switch (version) { + case 1: + return deserializeV1(serialized); + default: + throw new IOException("Unknown version: " + version); + } + } + + @VisibleForTesting + byte[] serializeV1(IcebergSourceSplit split) throws IOException { + return InstantiationUtil.serializeObject(split); + } + + @VisibleForTesting + IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to deserialize the split.", e); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java b/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java new file mode 100644 index 000000000000..a205b22f3ed5 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.junit.Assert; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; + +public class HadoopTableResource extends ExternalResource { + + private final TemporaryFolder temporaryFolder; + private final String database; + private final String tableName; + private final Schema schema; + + private HadoopCatalog catalog; + private TableLoader tableLoader; + private Table table; + + public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName, Schema schema) { + this.temporaryFolder = temporaryFolder; + this.database = database; + this.tableName = tableName; + this.schema = schema; + } + + @Override + protected void before() throws Throwable { + File warehouseFile = temporaryFolder.newFolder(); + // delete the created folder so that the warehouse path does not exist yet + Assert.assertTrue(warehouseFile.delete()); + String warehouse = "file:" + warehouseFile; + Configuration hadoopConf = new Configuration(); + this.catalog = new HadoopCatalog(hadoopConf, warehouse); + String location = String.format("%s/%s/%s", warehouse, database, tableName); + this.tableLoader = TableLoader.fromHadoopTable(location); + this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema); + tableLoader.open(); + } + + @Override + protected void after() { + try { + catalog.dropTable(TableIdentifier.of(database, tableName)); + catalog.close(); + tableLoader.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close catalog and table loader resources", e); + } + } + + public TableLoader tableLoader() { + return tableLoader; + } + + public Table table() { + return table; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index 0f5d6e1e4975..353fd8dfda0d 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -254,7 +254,7 @@ private List generateSplits() { .build(); } - Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext)); + Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits(table, scanContext)); } return inputSplits; diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java new file mode 100644 index 000000000000..ae4862f7f3cc --- /dev/null +++
b/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.FlinkSplitPlanner; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class ReaderFunctionTestBase { + + @Parameterized.Parameters(name = "fileFormat={0}") + public static Object[][] parameters() { + return new Object[][]{ + new Object[]{FileFormat.AVRO}, + new Object[]{FileFormat.ORC}, + new Object[]{FileFormat.PARQUET} + }; + } + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + protected static final ScanContext scanContext = ScanContext.builder() + .project(TestFixtures.SCHEMA) + .build(); + + @Rule + public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER, + TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + + protected abstract ReaderFunction readerFunction(); + + protected abstract void assertRecords(List expected, List actual, Schema schema); + + private final FileFormat fileFormat; + + public ReaderFunctionTestBase(FileFormat fileFormat) { + this.fileFormat = fileFormat; + } + + private List> recordBatchList; + private List dataFileList; + private IcebergSourceSplit icebergSplit; + + @Before + public void before() throws IOException { + final GenericAppenderHelper dataAppender = new GenericAppenderHelper( + tableResource.table(), fileFormat, TEMPORARY_FOLDER); + 
recordBatchList = new ArrayList<>(3); + dataFileList = new ArrayList<>(2); + for (int i = 0; i < 3; ++i) { + List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i); + recordBatchList.add(records); + DataFile dataFile = dataAppender.writeFile(null, records); + dataFileList.add(dataFile); + dataAppender.appendToTable(dataFile); + } + + final List<IcebergSourceSplit> splits = FlinkSplitPlanner + .planIcebergSourceSplits(tableResource.table(), scanContext); + Assert.assertEquals(1, splits.size()); + Assert.assertEquals(3, splits.get(0).task().files().size()); + icebergSplit = sortFilesAsAppendOrder(splits.get(0), dataFileList); + } + + /** + * Split planning doesn't guarantee that the file order matches the append order, + * so we re-arrange the list to keep the assertions simple. + */ + public static IcebergSourceSplit sortFilesAsAppendOrder(IcebergSourceSplit split, List<DataFile> dataFiles) { + Collection<FileScanTask> files = split.task().files(); + Assert.assertEquals(files.size(), dataFiles.size()); + FileScanTask[] sortedFileArray = new FileScanTask[files.size()]; + for (FileScanTask fileScanTask : files) { + for (int i = 0; i < dataFiles.size(); ++i) { + if (fileScanTask.file().path().toString().equals(dataFiles.get(i).path().toString())) { + sortedFileArray[i] = fileScanTask; + } + } + } + List<FileScanTask> sortedFileList = Lists.newArrayList(sortedFileArray); + Assert.assertThat(sortedFileList, CoreMatchers.everyItem(CoreMatchers.notNullValue(FileScanTask.class))); + CombinedScanTask rearrangedCombinedTask = new BaseCombinedScanTask(sortedFileList); + return IcebergSourceSplit.fromCombinedScanTask(rearrangedCombinedTask); + } + + /** + * We have to combine the record extraction and position assertion in a single function, + * because the iterator is only valid for one pass. + */ + private List<T> extractRecordsAndAssertPosition( + RecordsWithSplitIds<RecordAndPosition<T>> batch, + long expectedCount, int expectedFileOffset, long startRecordOffset) { + // nextSplit() must be called first in order to read the batch + batch.nextSplit(); + final List<T> records = new ArrayList<>(); + long recordOffset = startRecordOffset; + RecordAndPosition<T> recordAndPosition; + while ((recordAndPosition = batch.nextRecordFromSplit()) != null) { + records.add(recordAndPosition.record()); + Assert.assertEquals("expected file offset", expectedFileOffset, recordAndPosition.fileOffset()); + Assert.assertEquals("expected record offset", recordOffset, recordAndPosition.recordOffset() - 1); + recordOffset++; + } + Assert.assertEquals("expected record count", expectedCount, records.size()); + return records; + } + + @Test + public void testNoCheckpointedPosition() throws IOException { + final IcebergSourceSplit split = icebergSplit; + final CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds<RecordAndPosition<T>> batch0 = reader.next(); + final List<T> actual0 = extractRecordsAndAssertPosition(batch0, recordBatchList.get(0).size(), 0, 0L); + assertRecords(recordBatchList.get(0), actual0, TestFixtures.SCHEMA); + batch0.recycle(); + + final RecordsWithSplitIds<RecordAndPosition<T>> batch1 = reader.next(); + final List<T> actual1 = extractRecordsAndAssertPosition(batch1, recordBatchList.get(1).size(), 1, 0L); + assertRecords(recordBatchList.get(1), actual1, TestFixtures.SCHEMA); + batch1.recycle(); + + final RecordsWithSplitIds<RecordAndPosition<T>> batch2 = reader.next(); + final List<T> actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + + @Test + public void testCheckpointedPositionBeforeFirstFile()
throws IOException { + final IcebergSourceSplit split = new IcebergSourceSplit( + icebergSplit.task(), + new Position(0, 0L)); + final CloseableIterator>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds> batch0 = reader.next(); + final List actual0 = extractRecordsAndAssertPosition(batch0, recordBatchList.get(0).size(), 0, 0L); + assertRecords(recordBatchList.get(0), actual0, TestFixtures.SCHEMA); + batch0.recycle(); + + final RecordsWithSplitIds> batch1 = reader.next(); + final List actual1 = extractRecordsAndAssertPosition(batch1, recordBatchList.get(1).size(), 1, 0L); + assertRecords(recordBatchList.get(1), actual1, TestFixtures.SCHEMA); + batch1.recycle(); + + final RecordsWithSplitIds> batch2 = reader.next(); + final List actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + + @Test + public void testCheckpointedPositionMiddleFirstFile() throws IOException { + final IcebergSourceSplit split = new IcebergSourceSplit( + icebergSplit.task(), + new Position(0, 1L)); + final CloseableIterator>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds> batch0 = reader.next(); + final List actual0 = extractRecordsAndAssertPosition(batch0, 1L, 0, 1L); + assertRecords(recordBatchList.get(0).subList(1, 2), actual0, TestFixtures.SCHEMA); + batch0.recycle(); + + final RecordsWithSplitIds> batch1 = reader.next(); + final List actual1 = extractRecordsAndAssertPosition(batch1, recordBatchList.get(1).size(), 1, 0L); + assertRecords(recordBatchList.get(1), actual1, TestFixtures.SCHEMA); + batch1.recycle(); + + final RecordsWithSplitIds> batch2 = reader.next(); + final List actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + + @Test + public void testCheckpointedPositionAfterFirstFile() throws IOException { + final IcebergSourceSplit split = new IcebergSourceSplit(icebergSplit.task(), + new Position(0, 2L)); + final CloseableIterator>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds> batch0 = reader.next(); + final List actual1 = extractRecordsAndAssertPosition(batch0, recordBatchList.get(1).size(), 1, 0L); + assertRecords(recordBatchList.get(1), actual1, TestFixtures.SCHEMA); + batch0.recycle(); + + final RecordsWithSplitIds> batch2 = reader.next(); + final List actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + + @Test + public void testCheckpointedPositionBeforeSecondFile() throws IOException { + final IcebergSourceSplit split = new IcebergSourceSplit(icebergSplit.task(), + new Position(1, 0L)); + final CloseableIterator>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds> batch1 = reader.next(); + final List actual1 = extractRecordsAndAssertPosition(batch1, recordBatchList.get(1).size(), 1, 0L); + assertRecords(recordBatchList.get(1), actual1, TestFixtures.SCHEMA); + batch1.recycle(); + + final RecordsWithSplitIds> batch2 = reader.next(); + final List actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + + @Test + public void testCheckpointedPositionMidSecondFile() throws IOException { + final 
IcebergSourceSplit split = new IcebergSourceSplit( + icebergSplit.task(), + new Position(1, 1L)); + final CloseableIterator>> reader = readerFunction().apply(split); + + final RecordsWithSplitIds> batch1 = reader.next(); + final List actual1 = extractRecordsAndAssertPosition(batch1, 1L, 1, 1L); + assertRecords(recordBatchList.get(1).subList(1, 2), actual1, TestFixtures.SCHEMA); + batch1.recycle(); + + final RecordsWithSplitIds> batch2 = reader.next(); + final List actual2 = extractRecordsAndAssertPosition(batch2, recordBatchList.get(2).size(), 2, 0L); + assertRecords(recordBatchList.get(2), actual2, TestFixtures.SCHEMA); + batch2.recycle(); + } + +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java new file mode 100644 index 000000000000..020d56438c05 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.HadoopTableResource; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.source.FlinkSplitPlanner; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; + +public class TestIcebergSourceSplitReader { + + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + public static final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER, + TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA); + + @ClassRule + public static final TestRule chain = RuleChain + .outerRule(TEMPORARY_FOLDER) + .around(tableResource); + + private static final ScanContext scanContext = ScanContext.builder() + .project(TestFixtures.SCHEMA) + .build(); + private static final FileFormat fileFormat = FileFormat.PARQUET; + + private static List> recordBatchList; + private static List dataFileList; + private static IcebergSourceSplit icebergSplit; + + private IcebergSourceSplitReader reader; + + @BeforeClass + public static void beforeClass() throws IOException { + final GenericAppenderHelper dataAppender = new GenericAppenderHelper( + tableResource.table(), fileFormat, TEMPORARY_FOLDER); + recordBatchList = new ArrayList<>(3); + dataFileList = new ArrayList<>(2); + for (int i = 0; i < 3; ++i) { + List records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i); + recordBatchList.add(records); + DataFile dataFile = dataAppender.writeFile(null, records); + dataFileList.add(dataFile); + dataAppender.appendToTable(dataFile); + } + + final List splits = FlinkSplitPlanner + .planIcebergSourceSplits(tableResource.table(), scanContext); + Assert.assertEquals(1, splits.size()); + Assert.assertEquals(3, splits.get(0).task().files().size()); + icebergSplit = ReaderFunctionTestBase.sortFilesAsAppendOrder(splits.get(0), dataFileList); + } + + @Before + public void before() { + reader = createSplitReader(); + } + + private IcebergSourceSplitReader createSplitReader() { + final Configuration config = new Configuration(); + return new IcebergSourceSplitReader( + new RowDataReaderFunction(config, tableResource.table(), scanContext), + new TestingReaderContext(), + new IcebergSourceReaderMetrics(new UnregisteredMetricsGroup())); + } + + @Test + public void testFullScan() throws Exception { + 
final IcebergSourceSplit split = icebergSplit; + reader.handleSplitsChanges(new SplitsAddition(Arrays.asList(split))); + + final RecordsWithSplitIds> readBatch0 = reader.fetch(); + final List rowBatch0 = readRows(readBatch0, split.splitId(), 0, 0L); + TestHelpers.assertRecords(rowBatch0, recordBatchList.get(0), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> readBatch1 + = reader.fetch(); + final List rowBatch1 = readRows(readBatch1, split.splitId(), 1, 0L); + TestHelpers.assertRecords(rowBatch1, recordBatchList.get(1), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> readBatch2 = reader.fetch(); + final List rowBatch2 = readRows(readBatch2, split.splitId(), 2, 0L); + TestHelpers.assertRecords(rowBatch2, recordBatchList.get(2), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> finishedBatch = reader.fetch(); + Assert.assertEquals(Sets.newHashSet(split.splitId()), finishedBatch.finishedSplits()); + Assert.assertEquals(null, finishedBatch.nextSplit()); + } + + @Test + public void testResumeFromEndOfFirstBatch() throws Exception { + final IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(icebergSplit.task(), 0, 2L); + reader.handleSplitsChanges(new SplitsAddition(Arrays.asList(split))); + + final RecordsWithSplitIds> readBatch1 = reader.fetch(); + final List rowBatch1 = readRows(readBatch1, split.splitId(), 1, 0L); + TestHelpers.assertRecords(rowBatch1, recordBatchList.get(1), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> readBatch2 = reader.fetch(); + final List rowBatch2 = readRows(readBatch2, split.splitId(), 2, 0L); + TestHelpers.assertRecords(rowBatch2, recordBatchList.get(2), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> finishedBatch = reader.fetch(); + Assert.assertEquals(Sets.newHashSet(split.splitId()), finishedBatch.finishedSplits()); + Assert.assertEquals(null, finishedBatch.nextSplit()); + } + + @Test + public void testResumeFromStartOfSecondBatch() throws Exception { + final IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(icebergSplit.task(), 1, 0L); + reader.handleSplitsChanges(new SplitsAddition(Arrays.asList(split))); + + final RecordsWithSplitIds> readBatch1 = reader.fetch(); + final List rowBatch1 = readRows(readBatch1, split.splitId(), 1, 0L); + TestHelpers.assertRecords(rowBatch1, recordBatchList.get(1), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> readBatch2 = reader.fetch(); + final List rowBatch2 = readRows(readBatch2, split.splitId(), 2, 0L); + TestHelpers.assertRecords(rowBatch2, recordBatchList.get(2), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> finishedBatch + = reader.fetch(); + Assert.assertEquals(Sets.newHashSet(split.splitId()), finishedBatch.finishedSplits()); + Assert.assertEquals(null, finishedBatch.nextSplit()); + } + + @Test + public void testResumeFromMiddleOfSecondBatch() throws Exception { + final IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(icebergSplit.task(), 1, 1L); + reader.handleSplitsChanges(new SplitsAddition(Arrays.asList(split))); + + final RecordsWithSplitIds> readBatch1 = reader.fetch(); + final List rowBatch1 = readRows(readBatch1, split.splitId(), 1, 1L); + TestHelpers.assertRecords(rowBatch1, recordBatchList.get(1).subList(1, 2), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> readBatch2 = reader.fetch(); + final List rowBatch2 = readRows(readBatch2, split.splitId(), 2, 0L); + TestHelpers.assertRecords(rowBatch2, recordBatchList.get(2), TestFixtures.SCHEMA); + + final RecordsWithSplitIds> finishedBatch + = reader.fetch(); + 
Assert.assertEquals(Sets.newHashSet(split.splitId()), finishedBatch.finishedSplits()); + Assert.assertEquals(null, finishedBatch.nextSplit()); + } + + private List readRows( + RecordsWithSplitIds> readBatch, + String expectedSplitId, int expectedFileOffset, long expectedStartingRecordOffset) { + Assert.assertEquals(expectedSplitId, readBatch.nextSplit()); + final List rowDataList = new ArrayList<>(); + RecordAndPosition row; + int num = 0; + while ((row = readBatch.nextRecordFromSplit()) != null) { + Assert.assertEquals(expectedFileOffset, row.fileOffset()); + num++; + Assert.assertEquals(expectedStartingRecordOffset + num, row.recordOffset()); + rowDataList.add(row.record()); + } + readBatch.recycle(); + return TestHelpers.convertRowDataToRow(rowDataList, TestFixtures.ROW_TYPE); + } + +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRecyclableArrayIterator.java b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRecyclableArrayIterator.java new file mode 100644 index 000000000000..bf36efcfdc34 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRecyclableArrayIterator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; +import org.junit.Test; + +public class TestRecyclableArrayIterator { + + @Test + public void testEmptyConstruction() { + // dummy recycler + final RecyclableArrayIterator iter = new RecyclableArrayIterator<>( + ignored -> System.currentTimeMillis()); + Assert.assertNull(iter.next()); + } + + @Test + public void testGetElements() { + final String[] elements = new String[]{"1", "2", "3", "4"}; + final int initialOffset = 3; + final long initialSkipCount = 17; + + // dummy recycler + final RecyclableArrayIterator iter = new RecyclableArrayIterator<>( + ignored -> System.currentTimeMillis(), elements, elements.length, initialOffset, initialSkipCount); + + for (int i = 0; i < elements.length; i++) { + final RecordAndPosition recAndPos = iter.next(); + Assert.assertEquals(elements[i], recAndPos.record()); + Assert.assertEquals(initialOffset, recAndPos.fileOffset()); + Assert.assertEquals(initialSkipCount + i + 1, recAndPos.recordOffset()); + } + } + + @Test + public void testExhausted() { + // dummy recycler + final RecyclableArrayIterator iter = new RecyclableArrayIterator<>( + ignored -> System.currentTimeMillis(), new String[]{"1", "2"}, 2, 0, 0L); + + iter.next(); + iter.next(); + + Assert.assertNull(iter.next()); + } + + @Test + public void testArraySubRange() { + // dummy recycler + final RecyclableArrayIterator iter = new RecyclableArrayIterator<>(ignored -> System.currentTimeMillis(), + new String[]{"1", "2", "3"}, 2, 0, 0L); + + Assert.assertNotNull(iter.next()); + Assert.assertNotNull(iter.next()); + Assert.assertNull(iter.next()); + } + + @Test + public void testRecycler() { + final AtomicBoolean recycled = new AtomicBoolean(); + final RecyclableArrayIterator iter = new RecyclableArrayIterator<>(ignored -> recycled.set(true)); + iter.close(); + Assert.assertTrue(recycled.get()); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java new file mode 100644 index 000000000000..c0f6c62cd033 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.reader; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; + +public class TestRowDataReaderFunction extends ReaderFunctionTestBase { + + protected static final RowType rowType = FlinkSchemaUtil + .convert(scanContext.project()); + private static final DataStructureConverter rowDataConverter = DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(rowType)); + private static final org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + + public TestRowDataReaderFunction(FileFormat fileFormat) { + super(fileFormat); + } + + @Override + protected ReaderFunction readerFunction() { + return new RowDataReaderFunction(new Configuration(), tableResource.table(), scanContext); + } + + @Override + protected void assertRecords(List expected, List actual, Schema schema) { + final List rows = toRows(actual); + TestHelpers.assertRecords(rows, expected, TestFixtures.SCHEMA); + } + + private List toRows(List actual) { + return actual.stream() + .map(rowData -> (Row) rowDataConverter.toExternal(rowData)) + .collect(Collectors.toList()); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/split/SplitHelpers.java b/flink/src/test/java/org/apache/iceberg/flink/source/split/SplitHelpers.java new file mode 100644 index 000000000000..c97111b6fd97 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/split/SplitHelpers.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.FlinkSplitPlanner; +import org.apache.iceberg.flink.source.ScanContext; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; + +public class SplitHelpers { + + private static final AtomicLong splitLengthIncrement = new AtomicLong(); + + private SplitHelpers() { + + } + + public static List createMockedSplits(int splitCount) { + final List splits = new ArrayList<>(); + for (int i = 0; i < splitCount; ++i) { + // make sure each task has a different length, + // as it is part of the splitId calculation. + // This way, we can make sure all generated splits have different splitIds + FileScanTask fileScanTask = new MockFileScanTask(1024 + splitLengthIncrement.incrementAndGet()); + CombinedScanTask combinedScanTask = new BaseCombinedScanTask(fileScanTask); + splits.add(IcebergSourceSplit.fromCombinedScanTask(combinedScanTask)); + } + return splits; + } + + public static List createFileSplits( + TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception { + final File warehouseFile = temporaryFolder.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + final String warehouse = "file:" + warehouseFile; + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse); + try { + final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + final GenericAppenderHelper dataAppender = new GenericAppenderHelper( + table, FileFormat.PARQUET, temporaryFolder); + for (int i = 0; i < fileCount; ++i) { + List records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i); + dataAppender.appendToTable(records); + } + + final ScanContext scanContext = ScanContext.builder().build(); + final List splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext); + return splits.stream() + .flatMap(split -> { + List> filesList = Lists.partition(new ArrayList<>(split.task().files()), filesPerSplit); + return filesList.stream() + .map(files -> new BaseCombinedScanTask(files)) + .map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask)); + }) + .collect(Collectors.toList()); + } finally { + catalog.dropTable(TestFixtures.TABLE_IDENTIFIER); + catalog.close(); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java new file mode 100644 index 000000000000..38b358ec55a7 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.iceberg.flink.source.Position; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceSplitSerializer { + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE; + + @Test + public void testLatestVersion() throws Exception { + serializeAndDeserialize(1, 1); + serializeAndDeserialize(10, 2); + } + + private void serializeAndDeserialize(int splitCount, int filesPerSplit) throws Exception { + final List splits = SplitHelpers.createFileSplits(TEMPORARY_FOLDER, splitCount, filesPerSplit); + for (IcebergSourceSplit split : splits) { + final byte[] result = serializer.serialize(split); + final IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); + Assert.assertEquals(split, deserialized); + + final byte[] cachedResult = serializer.serialize(split); + Assert.assertSame(result, cachedResult); + final IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); + Assert.assertEquals(split, deserialized2); + } + } + + @Test + public void testV1() throws Exception { + serializeAndDeserializeV1(1, 1); + serializeAndDeserializeV1(10, 2); + } + + private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws Exception { + final List splits = SplitHelpers.createFileSplits(TEMPORARY_FOLDER, splitCount, filesPerSplit); + for (IcebergSourceSplit split : splits) { + final byte[] result = serializer.serializeV1(split); + final IcebergSourceSplit deserialized = serializer.deserializeV1(result); + Assert.assertEquals(split, deserialized); + } + } + + @Test + public void testCheckpointedPosition() throws Exception { + final AtomicInteger index = new AtomicInteger(); + final List splits = SplitHelpers.createFileSplits(TEMPORARY_FOLDER, 10, 2).stream() + .map(split -> { + final IcebergSourceSplit result; + if (index.get() % 2 == 0) { + result = new IcebergSourceSplit(split.task(), new Position(index.get(), index.get())); + } else { + result = split; + } + index.incrementAndGet(); + return result; + }) + .collect(Collectors.toList()); + + for (IcebergSourceSplit split : splits) { + final byte[] result = serializer.serialize(split); + final IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); + Assert.assertEquals(split, deserialized); + + final byte[] cachedResult = serializer.serialize(split); + 
Assert.assertSame(result, cachedResult); + final IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); + Assert.assertEquals(split, deserialized2); + } + } +}
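
A minimal usage sketch (not part of the patch) of how a fetched batch from the reader classes above is drained; it mirrors the call sequence used by readRows() in TestIcebergSourceSplitReader. The helper class and method names are illustrative only, and it assumes the RecordAndPosition accessors (record(), fileOffset(), recordOffset()) shown in the tests.

package org.apache.iceberg.flink.source.reader;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.table.data.RowData;

// Illustrative helper, not part of the patch: drains one fetched batch the same way
// the tests above do and recycles it when done.
final class BatchDrainExample {
  private BatchDrainExample() {
  }

  static List<RowData> drain(RecordsWithSplitIds<RecordAndPosition<RowData>> batch) {
    List<RowData> records = new ArrayList<>();
    // nextSplit() must be called before nextRecordFromSplit(); it returns null when the
    // batch carries no split data (e.g. it only reports finished splits).
    if (batch.nextSplit() != null) {
      RecordAndPosition<RowData> rec;
      while ((rec = batch.nextRecordFromSplit()) != null) {
        // rec.fileOffset() and rec.recordOffset() describe where to resume after this record
        records.add(rec.record());
      }
    }
    // recycle() closes the underlying record iterator and releases the batch
    batch.recycle();
    return records;
  }
}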