From 97fbbab9c59bf0b63affcf66a9a21ba517b4c530 Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Mon, 10 Jan 2022 10:05:30 -0800
Subject: [PATCH] Revert "Flink: Add FLIP-27 Iceberg source split (#3501)"

This reverts commit d2c26a02190a16539c8c0621c4d8aac2e9e3ec6c.
---
 .../flink/source/FlinkInputFormat.java        |   2 +-
 ...tPlanner.java => FlinkSplitGenerator.java} |  47 ++-----
 .../iceberg/flink/source/ScanContext.java     |  24 +---
 .../source/StreamingMonitorFunction.java      |   2 +-
 .../source/split/IcebergSourceSplit.java      | 122 ------------------
 .../split/IcebergSourceSplitSerializer.java   |  56 --------
 .../iceberg/flink/source/SplitHelpers.java    |  90 -------------
 .../source/TestStreamingReaderOperator.java   |   2 +-
 .../TestIcebergSourceSplitSerializer.java     | 116 -----------------
 9 files changed, 20 insertions(+), 441 deletions(-)
 rename flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/{FlinkSplitPlanner.java => FlinkSplitGenerator.java} (63%)
 delete mode 100644 flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
 delete mode 100644 flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
 delete mode 100644 flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
 delete mode 100644 flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java

diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index a4cbab5c37e4..8b757ac31606 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException
     tableLoader.open();
     try (TableLoader loader = tableLoader) {
       Table table = loader.loadTable();
-      return FlinkSplitPlanner.planInputSplits(table, context);
+      return FlinkSplitGenerator.createInputSplits(table, context);
     }
   }

diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
similarity index 63%
rename from flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
rename to flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
index e0001146299e..f495e0909b7e 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
@@ -22,56 +22,33 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
-import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;

-@Internal
-public class FlinkSplitPlanner {
-  private FlinkSplitPlanner() {
+class FlinkSplitGenerator {
+  private FlinkSplitGenerator() {
   }

-  static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
- try (CloseableIterable tasksIterable = planTasks(table, context)) { - List tasks = Lists.newArrayList(tasksIterable); - FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; - for (int i = 0; i < tasks.size(); i++) { - splits[i] = new FlinkInputSplit(i, tasks.get(i)); - } - return splits; - } catch (IOException e) { - throw new UncheckedIOException("Failed to process tasks iterable", e); - } - } - - /** - * This returns splits for the FLIP-27 source - */ - public static List planIcebergSourceSplits(Table table, ScanContext context) { - try (CloseableIterable tasksIterable = planTasks(table, context)) { - return Lists.newArrayList(CloseableIterable.transform(tasksIterable, - task -> IcebergSourceSplit.fromCombinedScanTask(task))); - } catch (IOException e) { - throw new UncheckedIOException("Failed to process task iterable: ", e); + static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) { + List tasks = tasks(table, context); + FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) { + splits[i] = new FlinkInputSplit(i, tasks.get(i)); } + return splits; } - static CloseableIterable planTasks(Table table, ScanContext context) { + private static List tasks(Table table, ScanContext context) { TableScan scan = table .newScan() .caseSensitive(context.caseSensitive()) .project(context.project()); - if (context.includeColumnStats()) { - scan = scan.includeColumnStats(); - } - if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); } @@ -106,6 +83,10 @@ static CloseableIterable planTasks(Table table, ScanContext co } } - return scan.planTasks(); + try (CloseableIterable tasksIterable = scan.planTasks()) { + return Lists.newArrayList(tasksIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); + } } } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index d290a6478f90..2896efb39655 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -68,9 +68,6 @@ class ScanContext implements Serializable { private static final ConfigOption MONITOR_INTERVAL = ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10)); - private static final ConfigOption INCLUDE_COLUMN_STATS = - ConfigOptions.key("include-column-stats").booleanType().defaultValue(false); - private final boolean caseSensitive; private final Long snapshotId; private final Long startSnapshotId; @@ -86,12 +83,11 @@ class ScanContext implements Serializable { private final Schema schema; private final List filters; private final long limit; - private final boolean includeColumnStats; private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming, Duration monitorInterval, String nameMapping, - Schema schema, List filters, long limit, boolean includeColumnStats) { + Schema schema, List filters, long limit) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; this.startSnapshotId = startSnapshotId; @@ -107,7 +103,6 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.schema = schema; this.filters = filters; this.limit = 
limit; - this.includeColumnStats = includeColumnStats; } boolean caseSensitive() { @@ -166,10 +161,6 @@ long limit() { return limit; } - boolean includeColumnStats() { - return includeColumnStats; - } - ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) @@ -186,7 +177,6 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI .project(schema) .filters(filters) .limit(limit) - .includeColumnStats(includeColumnStats) .build(); } @@ -206,7 +196,6 @@ ScanContext copyWithSnapshotId(long newSnapshotId) { .project(schema) .filters(filters) .limit(limit) - .includeColumnStats(includeColumnStats) .build(); } @@ -229,7 +218,6 @@ static class Builder { private Schema projectedSchema; private List filters; private long limit = -1L; - private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue(); private Builder() { } @@ -304,11 +292,6 @@ Builder limit(long newLimit) { return this; } - Builder includeColumnStats(boolean newIncludeColumnStats) { - this.includeColumnStats = newIncludeColumnStats; - return this; - } - Builder fromProperties(Map properties) { Configuration config = new Configuration(); properties.forEach(config::setString); @@ -323,15 +306,14 @@ Builder fromProperties(Map properties) { .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST)) .streaming(config.get(STREAMING)) .monitorInterval(config.get(MONITOR_INTERVAL)) - .nameMapping(properties.get(DEFAULT_NAME_MAPPING)) - .includeColumnStats(config.get(INCLUDE_COLUMN_STATS)); + .nameMapping(properties.get(DEFAULT_NAME_MAPPING)); } public ScanContext build() { return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback, splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, - filters, limit, includeColumnStats); + filters, limit); } } } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index 8bfad6d05fd7..9d8e204a2228 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -140,7 +140,7 @@ private void monitorAndForwardSplits() { newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId); } - FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext); + FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext); for (FlinkInputSplit split : splits) { sourceContext.collect(split); } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java deleted file mode 100644 index b46096af0e67..000000000000 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.source.split; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.stream.Collectors; -import javax.annotation.Nullable; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.util.InstantiationUtil; -import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; - -@Internal -public class IcebergSourceSplit implements SourceSplit, Serializable { - private static final long serialVersionUID = 1L; - - private final CombinedScanTask task; - - private int fileOffset; - private long recordOffset; - - // The splits are frequently serialized into checkpoints. - // Caching the byte representation makes repeated serialization cheap. - @Nullable - private transient byte[] serializedBytesCache; - - private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) { - this.task = task; - this.fileOffset = fileOffset; - this.recordOffset = recordOffset; - } - - public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { - return fromCombinedScanTask(combinedScanTask, 0, 0L); - } - - public static IcebergSourceSplit fromCombinedScanTask( - CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { - return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset); - } - - public CombinedScanTask task() { - return task; - } - - public int fileOffset() { - return fileOffset; - } - - public long recordOffset() { - return recordOffset; - } - - @Override - public String splitId() { - return MoreObjects.toStringHelper(this) - .add("files", toString(task.files())) - .toString(); - } - - public void updatePosition(int newFileOffset, long newRecordOffset) { - // invalidate the cache after position change - serializedBytesCache = null; - fileOffset = newFileOffset; - recordOffset = newRecordOffset; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("files", toString(task.files())) - .add("fileOffset", fileOffset) - .add("recordOffset", recordOffset) - .toString(); - } - - private String toString(Collection files) { - return Iterables.toString(files.stream().map(fileScanTask -> - MoreObjects.toStringHelper(fileScanTask) - .add("file", fileScanTask.file().path().toString()) - .add("start", fileScanTask.start()) - .add("length", fileScanTask.length()) - .toString()).collect(Collectors.toList())); - } - - byte[] serializeV1() throws IOException { - if (serializedBytesCache == null) { - serializedBytesCache = InstantiationUtil.serializeObject(this); - } - return serializedBytesCache; - } - - static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { - try { - return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader()); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Failed 
to deserialize the split.", e); - } - } -} diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java deleted file mode 100644 index 9e32af5429b9..000000000000 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.source.split; - -import java.io.IOException; -import org.apache.flink.annotation.Internal; -import org.apache.flink.core.io.SimpleVersionedSerializer; - -/** - * TODO: use Java serialization for now. - * Will switch to more stable serializer from - * issue-1698. - */ -@Internal -public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { - public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer(); - private static final int VERSION = 1; - - @Override - public int getVersion() { - return VERSION; - } - - @Override - public byte[] serialize(IcebergSourceSplit split) throws IOException { - return split.serializeV1(); - } - - @Override - public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException { - switch (version) { - case 1: - return IcebergSourceSplit.deserializeV1(serialized); - default: - throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " + - "Encountered unsupported version: %d. Supported version are [1]", version)); - } - } -} diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java deleted file mode 100644 index df988089ab21..000000000000 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-
-package org.apache.iceberg.flink.source;
-
-import java.io.File;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.BaseCombinedScanTask;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericAppenderHelper;
-import org.apache.iceberg.data.RandomGenericData;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.Assert;
-import org.junit.rules.TemporaryFolder;
-
-public class SplitHelpers {
-
-  private static final AtomicLong splitLengthIncrement = new AtomicLong();
-
-  private SplitHelpers() {
-  }
-
-  /**
-   * This create a list of IcebergSourceSplit from real files
-   * <li> Create a new Hadoop table under the {@code temporaryFolder}
-   * <li> write {@code fileCount} number of files to the new Iceberg table
-   * <li> Discover the splits from the table and partition the splits by the {@code filePerSplit} limit
-   * <li> Delete the Hadoop table
-   *
-   * Since the table and data files are deleted before this method return,
-   * caller shouldn't attempt to read the data files.
-   */
-  public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
-      TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
-    final File warehouseFile = temporaryFolder.newFolder();
-    Assert.assertTrue(warehouseFile.delete());
-    final String warehouse = "file:" + warehouseFile;
-    Configuration hadoopConf = new Configuration();
-    final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
-    try {
-      final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
-      final GenericAppenderHelper dataAppender = new GenericAppenderHelper(
-          table, FileFormat.PARQUET, temporaryFolder);
-      for (int i = 0; i < fileCount; ++i) {
-        List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i);
-        dataAppender.appendToTable(records);
-      }
-
-      final ScanContext scanContext = ScanContext.builder().build();
-      final List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext);
-      return splits.stream()
-          .flatMap(split -> {
-            List<List<FileScanTask>> filesList = Lists.partition(
-                Lists.newArrayList(split.task().files()), filesPerSplit);
-            return filesList.stream()
-                .map(files -> new BaseCombinedScanTask(files))
-                .map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
-          })
-          .collect(Collectors.toList());
-    } finally {
-      catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
-      catalog.close();
-    }
-  }
-}
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
index 7978af6c4eec..19c2b6ad7d76 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
@@ -254,7 +254,7 @@ private List<FlinkInputSplit> generateSplits() {
             .build();
       }

-      Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits(table, scanContext));
+      Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
     }

     return inputSplits;
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
deleted file mode 100644
index 36eea1e8a409..000000000000
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package org.apache.iceberg.flink.source.split; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import org.apache.iceberg.flink.source.SplitHelpers; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestIcebergSourceSplitSerializer { - - @ClassRule - public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - - private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE; - - @Test - public void testLatestVersion() throws Exception { - serializeAndDeserialize(1, 1); - serializeAndDeserialize(10, 2); - } - - private void serializeAndDeserialize(int splitCount, int filesPerSplit) throws Exception { - final List splits = SplitHelpers - .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit); - for (IcebergSourceSplit split : splits) { - byte[] result = serializer.serialize(split); - IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); - assertSplitEquals(split, deserialized); - - byte[] cachedResult = serializer.serialize(split); - Assert.assertSame(result, cachedResult); - IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); - assertSplitEquals(split, deserialized2); - - split.updatePosition(0, 100); - byte[] resultAfterUpdatePosition = serializer.serialize(split); - // after position change, serialized bytes should have changed - Assert.assertNotSame(cachedResult, resultAfterUpdatePosition); - IcebergSourceSplit deserialized3 = serializer.deserialize(serializer.getVersion(), resultAfterUpdatePosition); - assertSplitEquals(split, deserialized3); - } - } - - @Test - public void testV1() throws Exception { - serializeAndDeserializeV1(1, 1); - serializeAndDeserializeV1(10, 2); - } - - private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws Exception { - final List splits = SplitHelpers - .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit); - for (IcebergSourceSplit split : splits) { - byte[] result = split.serializeV1(); - IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV1(result); - assertSplitEquals(split, deserialized); - } - } - - @Test - public void testCheckpointedPosition() throws Exception { - final AtomicInteger index = new AtomicInteger(); - final List splits = SplitHelpers - .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 2).stream() - .map(split -> { - IcebergSourceSplit result; - if (index.get() % 2 == 0) { - result = IcebergSourceSplit.fromCombinedScanTask(split.task(), index.get(), index.get()); - } else { - result = split; - } - index.incrementAndGet(); - return result; - }) - .collect(Collectors.toList()); - - for (IcebergSourceSplit split : splits) { - byte[] result = serializer.serialize(split); - IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); - assertSplitEquals(split, deserialized); - - byte[] cachedResult = serializer.serialize(split); - Assert.assertSame(result, cachedResult); - IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); - assertSplitEquals(split, deserialized2); - } - } - - private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) { - Assert.assertEquals(expected.splitId(), actual.splitId()); - Assert.assertEquals(expected.fileOffset(), 
actual.fileOffset());
-    Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
-  }
-}