diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index d8c91b5a64ab..7e7f2c6fe7c4 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.GenericRowData;
@@ -37,6 +38,9 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -55,6 +59,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
@@ -237,18 +242,18 @@ public static StructLikeSet actualRowSet(Table table, Long snapshotId, String...
   public static List<DataFile> partitionDataFiles(Table table, Map<String, Object> partitionValues)
       throws IOException {
     table.refresh();
-    Types.StructType spec = table.spec().partitionType();
-    Record partitionRecord = GenericRecord.create(spec).copy(partitionValues);
+    Types.StructType partitionType = table.spec().partitionType();
+    Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
     StructLikeWrapper expectedWrapper = StructLikeWrapper
-        .forType(spec)
+        .forType(partitionType)
         .set(partitionRecord);
 
     List<DataFile> dataFiles = Lists.newArrayList();
     try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
       for (FileScanTask scanTask : fileScanTasks) {
         StructLikeWrapper wrapper = StructLikeWrapper
-            .forType(spec)
+            .forType(partitionType)
             .set(scanTask.file().partition());
 
         if (expectedWrapper.equals(wrapper)) {
@@ -259,4 +264,35 @@ public static List<DataFile> partitionDataFiles(Table table, Map<String, Object>
 
     return dataFiles;
   }
+
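+  /**
+   * Returns the table's data files grouped by the id of the snapshot that added them, using the
+   * snapshot id recorded in each data manifest reachable from the current snapshot.
+   */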
+  public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
+    table.refresh();
+    Map<Long, List<DataFile>> result = Maps.newHashMap();
+    List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
+    for (ManifestFile manifestFile : manifestFiles) {
+      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
+        List<DataFile> dataFiles = Lists.newArrayList(reader);
+        if (result.containsKey(manifestFile.snapshotId())) {
+          result.get(manifestFile.snapshotId()).addAll(dataFiles);
+        } else {
+          result.put(manifestFile.snapshotId(), dataFiles);
+        }
+      }
+    }
+    return result;
+  }
+
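+  /**
+   * Filters the given data files down to those whose partition matches the expected partition
+   * values, comparing through StructLikeWrapper so that equality follows the partition type.
+   */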
+  public static List<DataFile> matchingPartitions(
+      List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
+    Types.StructType partitionType = partitionSpec.partitionType();
+    Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
+    StructLikeWrapper expected = StructLikeWrapper
+        .forType(partitionType)
+        .set(partitionRecord);
+    return dataFiles.stream().filter(df -> {
+      StructLikeWrapper wrapper = StructLikeWrapper.forType(partitionType).set(df.partition());
+      return wrapper.equals(expected);
+    }).collect(Collectors.toList());
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 03bf26d08558..eb37e307c814 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -28,6 +28,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
@@ -170,81 +171,82 @@ public void testOverwriteTable() throws Exception {
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", isStreamingJob);
     String tableName = "test_partition";
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')",
         tableName, format.name());
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    sql("INSERT INTO %s SELECT 1, 'a'", tableName);
-    sql("INSERT INTO %s SELECT 2, 'b'", tableName);
-    sql("INSERT INTO %s SELECT 3, 'c'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
-    sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(5, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(6, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      sql("INSERT INTO %s SELECT 1, 'a'", tableName);
+      sql("INSERT INTO %s SELECT 2, 'b'", tableName);
+      sql("INSERT INTO %s SELECT 3, 'c'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
+      sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(5, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(6, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testInsertIntoPartition() throws Exception {
     String tableName = "test_insert_into_partition";
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')",
         tableName, format.name());
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    // Full partition.
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
-    sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b")
-    ));
-
-    // Partial partition.
-    sql("INSERT INTO %s SELECT 4, 'c'", tableName);
-    sql("INSERT INTO %s SELECT 5, 'd'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b"),
-        SimpleDataUtil.createRecord(4, "c"),
-        SimpleDataUtil.createRecord(5, "d")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      // Full partition.
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
+      sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b")
+      ));
+
+      // Partial partition.
+      sql("INSERT INTO %s SELECT 4, 'c'", tableName);
+      sql("INSERT INTO %s SELECT 5, 'd'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b"),
+          SimpleDataUtil.createRecord(4, "c"),
+          SimpleDataUtil.createRecord(5, "d")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testHashDistributeMode() throws Exception {
     String tableName = "test_hash_distribution_mode";
-
     Map<String, String> tableProps = ImmutableMap.of(
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
@@ -252,32 +254,39 @@ public void testHashDistributeMode() throws Exception {
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
-    // Insert data set.
- sql("INSERT INTO %s VALUES " + - "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " + - "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " + - "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName); - - Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); - SimpleDataUtil.assertTableRecords(table, ImmutableList.of( - SimpleDataUtil.createRecord(1, "aaa"), - SimpleDataUtil.createRecord(1, "bbb"), - SimpleDataUtil.createRecord(1, "ccc"), - SimpleDataUtil.createRecord(2, "aaa"), - SimpleDataUtil.createRecord(2, "bbb"), - SimpleDataUtil.createRecord(2, "ccc"), - SimpleDataUtil.createRecord(3, "aaa"), - SimpleDataUtil.createRecord(3, "bbb"), - SimpleDataUtil.createRecord(3, "ccc") - )); - - Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "aaa")).size()); - Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "bbb")).size()); - Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "ccc")).size()); - - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + try { + // Insert data set. + sql("INSERT INTO %s VALUES " + + "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " + + "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " + + "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName); + + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); + SimpleDataUtil.assertTableRecords(table, ImmutableList.of( + SimpleDataUtil.createRecord(1, "aaa"), + SimpleDataUtil.createRecord(1, "bbb"), + SimpleDataUtil.createRecord(1, "ccc"), + SimpleDataUtil.createRecord(2, "aaa"), + SimpleDataUtil.createRecord(2, "bbb"), + SimpleDataUtil.createRecord(2, "ccc"), + SimpleDataUtil.createRecord(3, "aaa"), + SimpleDataUtil.createRecord(3, "bbb"), + SimpleDataUtil.createRecord(3, "ccc") + )); + + // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval, + // thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per partition. + Map> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table); + for (List dataFiles : snapshotToDataFiles.values()) { + Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1, + SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size()); + Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1, + SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb")).size()); + Assert.assertEquals("There should be 1 data file in partition 'ccc'", 1, + SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size()); + } + } finally { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } } }