diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 8b757ac31606..a4cbab5c37e4 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -77,7 +77,7 @@ public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException tableLoader.open(); try (TableLoader loader = tableLoader) { Table table = loader.loadTable(); - return FlinkSplitGenerator.createInputSplits(table, context); + return FlinkSplitPlanner.planInputSplits(table, context); } } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java similarity index 63% rename from flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java rename to flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index f495e0909b7e..e0001146299e 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -22,33 +22,56 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import org.apache.flink.annotation.Internal; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -class FlinkSplitGenerator { - private FlinkSplitGenerator() { +@Internal +public class FlinkSplitPlanner { + private FlinkSplitPlanner() { } - static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) { - List tasks = tasks(table, context); - FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; - for (int i = 0; i < tasks.size(); i++) { - splits[i] = new FlinkInputSplit(i, tasks.get(i)); + static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) { + try (CloseableIterable tasksIterable = planTasks(table, context)) { + List tasks = Lists.newArrayList(tasksIterable); + FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) { + splits[i] = new FlinkInputSplit(i, tasks.get(i)); + } + return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to process tasks iterable", e); + } + } + + /** + * This returns splits for the FLIP-27 source + */ + public static List planIcebergSourceSplits(Table table, ScanContext context) { + try (CloseableIterable tasksIterable = planTasks(table, context)) { + return Lists.newArrayList(CloseableIterable.transform(tasksIterable, + task -> IcebergSourceSplit.fromCombinedScanTask(task))); + } catch (IOException e) { + throw new UncheckedIOException("Failed to process task iterable: ", e); } - return splits; } - private static List tasks(Table table, ScanContext context) { + static CloseableIterable planTasks(Table table, ScanContext context) { TableScan scan = table .newScan() .caseSensitive(context.caseSensitive()) .project(context.project()); + if 
(context.includeColumnStats()) { + scan = scan.includeColumnStats(); + } + if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); } @@ -83,10 +106,6 @@ private static List tasks(Table table, ScanContext context) { } } - try (CloseableIterable tasksIterable = scan.planTasks()) { - return Lists.newArrayList(tasksIterable); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close table scan: " + scan, e); - } + return scan.planTasks(); } } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 2896efb39655..d290a6478f90 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -68,6 +68,9 @@ class ScanContext implements Serializable { private static final ConfigOption MONITOR_INTERVAL = ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10)); + private static final ConfigOption INCLUDE_COLUMN_STATS = + ConfigOptions.key("include-column-stats").booleanType().defaultValue(false); + private final boolean caseSensitive; private final Long snapshotId; private final Long startSnapshotId; @@ -83,11 +86,12 @@ class ScanContext implements Serializable { private final Schema schema; private final List filters; private final long limit; + private final boolean includeColumnStats; private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming, Duration monitorInterval, String nameMapping, - Schema schema, List filters, long limit) { + Schema schema, List filters, long limit, boolean includeColumnStats) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; this.startSnapshotId = startSnapshotId; @@ -103,6 +107,7 @@ private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId this.schema = schema; this.filters = filters; this.limit = limit; + this.includeColumnStats = includeColumnStats; } boolean caseSensitive() { @@ -161,6 +166,10 @@ long limit() { return limit; } + boolean includeColumnStats() { + return includeColumnStats; + } + ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) @@ -177,6 +186,7 @@ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotI .project(schema) .filters(filters) .limit(limit) + .includeColumnStats(includeColumnStats) .build(); } @@ -196,6 +206,7 @@ ScanContext copyWithSnapshotId(long newSnapshotId) { .project(schema) .filters(filters) .limit(limit) + .includeColumnStats(includeColumnStats) .build(); } @@ -218,6 +229,7 @@ static class Builder { private Schema projectedSchema; private List filters; private long limit = -1L; + private boolean includeColumnStats = INCLUDE_COLUMN_STATS.defaultValue(); private Builder() { } @@ -292,6 +304,11 @@ Builder limit(long newLimit) { return this; } + Builder includeColumnStats(boolean newIncludeColumnStats) { + this.includeColumnStats = newIncludeColumnStats; + return this; + } + Builder fromProperties(Map properties) { Configuration config = new Configuration(); properties.forEach(config::setString); @@ -306,14 +323,15 @@ Builder fromProperties(Map properties) { .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST)) 
.streaming(config.get(STREAMING)) .monitorInterval(config.get(MONITOR_INTERVAL)) - .nameMapping(properties.get(DEFAULT_NAME_MAPPING)); + .nameMapping(properties.get(DEFAULT_NAME_MAPPING)) + .includeColumnStats(config.get(INCLUDE_COLUMN_STATS)); } public ScanContext build() { return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback, splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema, - filters, limit); + filters, limit, includeColumnStats); } } } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index 9d8e204a2228..8bfad6d05fd7 100644 --- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -140,7 +140,7 @@ private void monitorAndForwardSplits() { newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId); } - FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext); + FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(table, newScanContext); for (FlinkInputSplit split : splits) { sourceContext.collect(split); } diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java new file mode 100644 index 000000000000..b46096af0e67 --- /dev/null +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.util.InstantiationUtil; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private static final long serialVersionUID = 1L; + + private final CombinedScanTask task; + + private int fileOffset; + private long recordOffset; + + // The splits are frequently serialized into checkpoints. 
+ // Caching the byte representation makes repeated serialization cheap. + @Nullable + private transient byte[] serializedBytesCache; + + private IcebergSourceSplit(CombinedScanTask task, int fileOffset, long recordOffset) { + this.task = task; + this.fileOffset = fileOffset; + this.recordOffset = recordOffset; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, fileOffset, recordOffset); + } + + public CombinedScanTask task() { + return task; + } + + public int fileOffset() { + return fileOffset; + } + + public long recordOffset() { + return recordOffset; + } + + @Override + public String splitId() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .toString(); + } + + public void updatePosition(int newFileOffset, long newRecordOffset) { + // invalidate the cache after position change + serializedBytesCache = null; + fileOffset = newFileOffset; + recordOffset = newRecordOffset; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .add("fileOffset", fileOffset) + .add("recordOffset", recordOffset) + .toString(); + } + + private String toString(Collection files) { + return Iterables.toString(files.stream().map(fileScanTask -> + MoreObjects.toStringHelper(fileScanTask) + .add("file", fileScanTask.file().path().toString()) + .add("start", fileScanTask.start()) + .add("length", fileScanTask.length()) + .toString()).collect(Collectors.toList())); + } + + byte[] serializeV1() throws IOException { + if (serializedBytesCache == null) { + serializedBytesCache = InstantiationUtil.serializeObject(this); + } + return serializedBytesCache; + } + + static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject(serialized, IcebergSourceSplit.class.getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to deserialize the split.", e); + } + } +} diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java new file mode 100644 index 000000000000..9e32af5429b9 --- /dev/null +++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.io.IOException; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** + * TODO: use Java serialization for now. + * Will switch to more stable serializer from + * issue-1698. + */ +@Internal +public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { + public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer(); + private static final int VERSION = 1; + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(IcebergSourceSplit split) throws IOException { + return split.serializeV1(); + } + + @Override + public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException { + switch (version) { + case 1: + return IcebergSourceSplit.deserializeV1(serialized); + default: + throw new IOException(String.format("Failed to deserialize IcebergSourceSplit. " + + "Encountered unsupported version: %d. Supported version are [1]", version)); + } + } +} diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java new file mode 100644 index 000000000000..df988089ab21 --- /dev/null +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; + +public class SplitHelpers { + + private static final AtomicLong splitLengthIncrement = new AtomicLong(); + + private SplitHelpers() { + } + + /** + * This create a list of IcebergSourceSplit from real files + *
  • Create a new Hadoop table under the {@code temporaryFolder} + *
  • Write {@code fileCount} files to the new Iceberg table + *
  • Discover the splits from the table and partition the splits by the {@code filesPerSplit} limit + *
  • Delete the Hadoop table + * + * Since the table and data files are deleted before this method return, + * caller shouldn't attempt to read the data files. + */ + public static List createSplitsFromTransientHadoopTable( + TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception { + final File warehouseFile = temporaryFolder.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + final String warehouse = "file:" + warehouseFile; + Configuration hadoopConf = new Configuration(); + final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse); + try { + final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + final GenericAppenderHelper dataAppender = new GenericAppenderHelper( + table, FileFormat.PARQUET, temporaryFolder); + for (int i = 0; i < fileCount; ++i) { + List records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i); + dataAppender.appendToTable(records); + } + + final ScanContext scanContext = ScanContext.builder().build(); + final List splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext); + return splits.stream() + .flatMap(split -> { + List> filesList = Lists.partition( + Lists.newArrayList(split.task().files()), filesPerSplit); + return filesList.stream() + .map(files -> new BaseCombinedScanTask(files)) + .map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask)); + }) + .collect(Collectors.toList()); + } finally { + catalog.dropTable(TestFixtures.TABLE_IDENTIFIER); + catalog.close(); + } + } +} diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java index 19c2b6ad7d76..7978af6c4eec 100644 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java @@ -254,7 +254,7 @@ private List generateSplits() { .build(); } - Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext)); + Collections.addAll(inputSplits, FlinkSplitPlanner.planInputSplits(table, scanContext)); } return inputSplits; diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java new file mode 100644 index 000000000000..36eea1e8a409 --- /dev/null +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.iceberg.flink.source.SplitHelpers; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergSourceSplitSerializer { + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE; + + @Test + public void testLatestVersion() throws Exception { + serializeAndDeserialize(1, 1); + serializeAndDeserialize(10, 2); + } + + private void serializeAndDeserialize(int splitCount, int filesPerSplit) throws Exception { + final List splits = SplitHelpers + .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit); + for (IcebergSourceSplit split : splits) { + byte[] result = serializer.serialize(split); + IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); + assertSplitEquals(split, deserialized); + + byte[] cachedResult = serializer.serialize(split); + Assert.assertSame(result, cachedResult); + IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); + assertSplitEquals(split, deserialized2); + + split.updatePosition(0, 100); + byte[] resultAfterUpdatePosition = serializer.serialize(split); + // after position change, serialized bytes should have changed + Assert.assertNotSame(cachedResult, resultAfterUpdatePosition); + IcebergSourceSplit deserialized3 = serializer.deserialize(serializer.getVersion(), resultAfterUpdatePosition); + assertSplitEquals(split, deserialized3); + } + } + + @Test + public void testV1() throws Exception { + serializeAndDeserializeV1(1, 1); + serializeAndDeserializeV1(10, 2); + } + + private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws Exception { + final List splits = SplitHelpers + .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, splitCount, filesPerSplit); + for (IcebergSourceSplit split : splits) { + byte[] result = split.serializeV1(); + IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV1(result); + assertSplitEquals(split, deserialized); + } + } + + @Test + public void testCheckpointedPosition() throws Exception { + final AtomicInteger index = new AtomicInteger(); + final List splits = SplitHelpers + .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 2).stream() + .map(split -> { + IcebergSourceSplit result; + if (index.get() % 2 == 0) { + result = IcebergSourceSplit.fromCombinedScanTask(split.task(), index.get(), index.get()); + } else { + result = split; + } + index.incrementAndGet(); + return result; + }) + .collect(Collectors.toList()); + + for (IcebergSourceSplit split : splits) { + byte[] result = serializer.serialize(split); + IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result); + assertSplitEquals(split, deserialized); + + byte[] cachedResult = serializer.serialize(split); + Assert.assertSame(result, cachedResult); + IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult); + assertSplitEquals(split, deserialized2); + } + } + + private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) { + Assert.assertEquals(expected.splitId(), actual.splitId()); + Assert.assertEquals(expected.fileOffset(), 
actual.fileOffset()); + Assert.assertEquals(expected.recordOffset(), actual.recordOffset()); + } +}
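
For reviewers, a minimal usage sketch tying the pieces above together: it builds a ScanContext with the new include-column-stats option, plans FLIP-27 splits with FlinkSplitPlanner.planIcebergSourceSplits, and round-trips one split through IcebergSourceSplitSerializer the way a checkpoint would. The warehouse path and the use of TableLoader.fromHadoopTable are illustrative assumptions, and because ScanContext and its builder are package-private, the snippet is assumed to live in org.apache.iceberg.flink.source; it is not part of this change.

package org.apache.iceberg.flink.source;

import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;

public class SplitPlanningSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical warehouse location; adjust to the environment under test.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      Table table = loader.loadTable();

      // Keep column stats on the planned tasks (the new option added in this change).
      ScanContext context = ScanContext.builder()
          .includeColumnStats(true)
          .build();

      // Plan FLIP-27 splits from the current table state.
      List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, context);

      // Round-trip a split through the versioned serializer, as checkpointing would.
      IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
      if (!splits.isEmpty()) {
        byte[] bytes = serializer.serialize(splits.get(0));
        IcebergSourceSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(restored);
      }
    }
  }
}

Note that updatePosition invalidates the split's cached serialized bytes, so the next serialize call reflects the new fileOffset and recordOffset; this is the behavior asserted in TestIcebergSourceSplitSerializer.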