Merged
44 changes: 40 additions & 4 deletions flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.GenericRowData;
@@ -37,6 +38,9 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -55,6 +59,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
@@ -237,18 +242,18 @@ public static StructLikeSet actualRowSet(Table table, Long snapshotId, String...
   public static List<DataFile> partitionDataFiles(Table table, Map<String, Object> partitionValues)
       throws IOException {
     table.refresh();
-    Types.StructType spec = table.spec().partitionType();
+    Types.StructType partitionType = table.spec().partitionType();
 
-    Record partitionRecord = GenericRecord.create(spec).copy(partitionValues);
+    Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
     StructLikeWrapper expectedWrapper = StructLikeWrapper
-        .forType(spec)
+        .forType(partitionType)
         .set(partitionRecord);
 
     List<DataFile> dataFiles = Lists.newArrayList();
     try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
       for (FileScanTask scanTask : fileScanTasks) {
         StructLikeWrapper wrapper = StructLikeWrapper
-            .forType(spec)
+            .forType(partitionType)
             .set(scanTask.file().partition());
 
         if (expectedWrapper.equals(wrapper)) {
@@ -259,4 +264,35 @@ public static List<DataFile> partitionDataFiles(Table table, Map<String, Object>

     return dataFiles;
   }

+  public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
+    table.refresh();
+    Map<Long, List<DataFile>> result = Maps.newHashMap();
+    List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
+    for (ManifestFile manifestFile : manifestFiles) {
+      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
+        List<DataFile> dataFiles = Lists.newArrayList(reader);
+        if (result.containsKey(manifestFile.snapshotId())) {
+          result.get(manifestFile.snapshotId()).addAll(dataFiles);
+        } else {
+          result.put(manifestFile.snapshotId(), dataFiles);
+        }
+      }
+    }
+    return result;
+  }
+
+  public static List<DataFile> matchingPartitions(
+      List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
+    Types.StructType partitionType = partitionSpec.partitionType();
+    Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
+    StructLikeWrapper expected = StructLikeWrapper
Contributor:
PartitionData implements the equals method, and we can construct a PartitionData via this API on the DataFiles class. Not sure if it is better, but at least it is more specific:

  public static PartitionData copy(PartitionSpec spec, StructLike partition) {
    return copyPartitionData(spec, partition, null);
  }

Member (Author):
That's a good idea, but when I tried it I found that PartitionData is package-private, so it can't be accessed from here unfortunately.
+        .forType(partitionType)
+        .set(partitionRecord);
+    return dataFiles.stream().filter(df -> {
+      StructLikeWrapper wrapper = StructLikeWrapper.forType(partitionType).set(df.partition());
+      return wrapper.equals(expected);
+    }).collect(Collectors.toList());
+  }
+
}
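The review thread above settles on StructLikeWrapper because PartitionData (and the DataFiles.copy overload that returns it) is package-private outside org.apache.iceberg. A minimal, self-contained sketch of that comparison pattern, using only the public Iceberg APIs already imported in this file; the PartitionMatch class and samePartition helper are hypothetical names for illustration:

  import java.util.Map;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.PartitionSpec;
  import org.apache.iceberg.data.GenericRecord;
  import org.apache.iceberg.data.Record;
  import org.apache.iceberg.types.Types;
  import org.apache.iceberg.util.StructLikeWrapper;

  public class PartitionMatch {
    // Hypothetical helper: true when a data file's partition tuple equals the expected values.
    static boolean samePartition(PartitionSpec spec, DataFile file, Map<String, Object> expectedValues) {
      Types.StructType partitionType = spec.partitionType();
      // GenericRecord is a public StructLike implementation that can be filled from a value map.
      Record expected = GenericRecord.create(partitionType).copy(expectedValues);
      // StructLikeWrapper supplies equals()/hashCode() for StructLike values, which the
      // StructLike interface itself does not guarantee; wrap both sides before comparing.
      StructLikeWrapper expectedWrapper = StructLikeWrapper.forType(partitionType).set(expected);
      StructLikeWrapper actualWrapper = StructLikeWrapper.forType(partitionType).set(file.partition());
      return expectedWrapper.equals(actualWrapper);
    }
  }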
181 changes: 95 additions & 86 deletions flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -28,6 +28,7 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
@@ -170,114 +171,122 @@ public void testOverwriteTable() throws Exception {
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", isStreamingJob);
     String tableName = "test_partition";
 
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')",
         tableName, format.name());
 
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    sql("INSERT INTO %s SELECT 1, 'a'", tableName);
-    sql("INSERT INTO %s SELECT 2, 'b'", tableName);
-    sql("INSERT INTO %s SELECT 3, 'c'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
-    sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(5, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(6, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      sql("INSERT INTO %s SELECT 1, 'a'", tableName);
+      sql("INSERT INTO %s SELECT 2, 'b'", tableName);
+      sql("INSERT INTO %s SELECT 3, 'c'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
+      sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(5, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(6, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }

   @Test
   public void testInsertIntoPartition() throws Exception {
     String tableName = "test_insert_into_partition";
 
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')",
         tableName, format.name());
 
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    // Full partition.
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
-    sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b")
-    ));
-
-    // Partial partition.
-    sql("INSERT INTO %s SELECT 4, 'c'", tableName);
-    sql("INSERT INTO %s SELECT 5, 'd'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b"),
-        SimpleDataUtil.createRecord(4, "c"),
-        SimpleDataUtil.createRecord(5, "d")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      // Full partition.
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
+      sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b")
+      ));
+
+      // Partial partition.
+      sql("INSERT INTO %s SELECT 4, 'c'", tableName);
+      sql("INSERT INTO %s SELECT 5, 'd'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b"),
+          SimpleDataUtil.createRecord(4, "c"),
+          SimpleDataUtil.createRecord(5, "d")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }

   @Test
   public void testHashDistributeMode() throws Exception {
     String tableName = "test_hash_distribution_mode";
 
     Map<String, String> tableProps = ImmutableMap.of(
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
-    // Insert data set.
-    sql("INSERT INTO %s VALUES " +
-        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
-
-    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
-        SimpleDataUtil.createRecord(1, "aaa"),
-        SimpleDataUtil.createRecord(1, "bbb"),
-        SimpleDataUtil.createRecord(1, "ccc"),
-        SimpleDataUtil.createRecord(2, "aaa"),
-        SimpleDataUtil.createRecord(2, "bbb"),
-        SimpleDataUtil.createRecord(2, "ccc"),
-        SimpleDataUtil.createRecord(3, "aaa"),
-        SimpleDataUtil.createRecord(3, "bbb"),
-        SimpleDataUtil.createRecord(3, "ccc")
-    ));
-
-    Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "aaa")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "bbb")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "ccc")).size());
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      // Insert data set.
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+      Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+      SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+          SimpleDataUtil.createRecord(1, "aaa"),
+          SimpleDataUtil.createRecord(1, "bbb"),
+          SimpleDataUtil.createRecord(1, "ccc"),
+          SimpleDataUtil.createRecord(2, "aaa"),
+          SimpleDataUtil.createRecord(2, "bbb"),
+          SimpleDataUtil.createRecord(2, "ccc"),
+          SimpleDataUtil.createRecord(3, "aaa"),
+          SimpleDataUtil.createRecord(3, "bbb"),
+          SimpleDataUtil.createRecord(3, "ccc")
+      ));
+
+      // A job may run through more than one checkpoint if it outlives the auto checkpoint
+      // interval, and each checkpoint commit produces a snapshot. Assert that every snapshot
+      // contains only one data file per partition.
+      Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
+      for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+        Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1,
+            SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
+        Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1,
+            SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb")).size());
+        Assert.assertEquals("There should be 1 data file in partition 'ccc'", 1,
+            SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size());
+      }
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
}
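Because each successful Flink checkpoint triggers one Iceberg commit, a single INSERT job can leave several snapshots behind, and the per-partition file-count check must hold per snapshot rather than per table. A hedged usage sketch of the two new helpers together, assuming a partitioned table with a "data" column as in the test above and that every snapshot wrote each listed partition; the HashDistributionCheck class and its method name are illustrative, not part of the PR:

  import java.io.IOException;
  import java.util.List;
  import java.util.Map;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.flink.SimpleDataUtil;
  import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

  public class HashDistributionCheck {
    // Illustrative: verify hash distribution wrote exactly one file per partition per snapshot.
    static void assertOneFilePerPartitionPerSnapshot(Table table, String... partitionValues)
        throws IOException {
      // Group the current snapshot's data files by the snapshot that committed them.
      Map<Long, List<DataFile>> filesBySnapshot = SimpleDataUtil.snapshotToDataFiles(table);
      for (Map.Entry<Long, List<DataFile>> entry : filesBySnapshot.entrySet()) {
        for (String value : partitionValues) {
          int count = SimpleDataUtil.matchingPartitions(
              entry.getValue(), table.spec(), ImmutableMap.of("data", value)).size();
          if (count != 1) {
            throw new AssertionError("Snapshot " + entry.getKey() + " has " + count +
                " data files in partition data=" + value + ", expected exactly 1");
          }
        }
      }
    }
  }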