@@ -18,10 +18,15 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
@@ -69,7 +74,7 @@ public void testZOrderSortExpression() {
public void testRewriteDataFilesInEmptyTable() {
createTable();
List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output);
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output);
}

@Test
@@ -84,8 +89,13 @@ public void testRewriteDataFilesOnPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 2 data files (one per partition) ",
ImmutableList.of(row(10, 2)),
output);
row(10, 2),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -103,8 +113,13 @@ public void testRewriteDataFilesOnNonPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -125,7 +140,7 @@ public void testRewriteDataFilesWithOptions() {

assertEquals(
"Action should rewrite 0 data files and add 0 data files",
ImmutableList.of(row(0, 0)),
ImmutableList.of(row(0, 0, 0L)),
output);

List<Object[]> actualRecords = currentData();
@@ -148,8 +163,13 @@ public void testRewriteDataFilesWithSortStrategy() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -170,8 +190,13 @@ public void testRewriteDataFilesWithZOrder() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

// Due to Z_order, the data written will be in the below order.
// As there is only one small output file, we can validate the query ordering (as it will not
@@ -207,8 +232,13 @@ public void testRewriteDataFilesWithFilter() {

assertEquals(
"Action should rewrite 5 data files (containing c1 = 1) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -230,8 +260,13 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -253,8 +288,13 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -471,6 +511,8 @@ public void testInvalidCasesForRewriteDataFiles() {
public void testBinPackTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -486,8 +528,13 @@ public void testBinPackTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -499,6 +546,8 @@ public void testBinPackTableWithSpecialChars() {
public void testSortTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -518,8 +567,14 @@ public void testSortTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -531,6 +586,8 @@ public void testSortTableWithSpecialChars() {
public void testZOrderTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -550,8 +607,14 @@ public void testZOrderTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -580,8 +643,13 @@ public void testDefaultSortOrder() {

assertEquals(
"Action should rewrite 2 data files and add 1 data files",
ImmutableList.of(row(2, 1)),
output);
row(2, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -622,6 +690,14 @@ private void insertData(String table, int filesCount) {
}
}

private Map<String, String> snapshotSummary() {
return snapshotSummary(tableIdent);
}

private Map<String, String> snapshotSummary(TableIdentifier tableIdentifier) {
return validationCatalog.loadTable(tableIdentifier).currentSnapshot().summary();
}

private List<Object[]> currentData() {
return currentData(tableName);
}
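Because the byte count in the new third column depends on the size of the files that were actually rewritten, the tests above cannot hard-code it. Each assertion is therefore split in two: Arrays.copyOf(output.get(0), 2) truncates the output row to the two deterministic file-count columns for an exact comparison, and the byte column is then checked against the removed-files-size entry (SnapshotSummary.REMOVED_FILE_SIZE_PROP) of the rewrite commit's snapshot summary via the snapshotSummary() helpers. A minimal self-contained sketch of the truncation step, with made-up values:

import java.util.Arrays;

public class CopyOfDemo {
  public static void main(String[] args) {
    // A procedure output row: rewritten files, added files, rewritten bytes.
    Object[] outputRow = {10, 1, 73421L};
    // Keep only the deterministic count columns for an exact comparison;
    // the byte column is verified separately against the snapshot summary.
    Object[] counts = Arrays.copyOf(outputRow, 2);
    System.out.println(Arrays.toString(counts)); // prints [10, 1]
  }
}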
@@ -67,7 +67,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
new StructField(
"rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
});

public static ProcedureBuilder builder() {
@@ -213,8 +214,10 @@ private SortOrder buildSortOrder(

private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
InternalRow row = newInternalRow(rewrittenDataFilesCount, addedDataFilesCount);
InternalRow row =
newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
return new InternalRow[] {row};
}

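With the extra StructField above, rewrite_data_files now reports three columns: rewritten_data_files_count, added_data_files_count, and the new rewritten_bytes_count. A minimal sketch of how a caller might read them from Spark, assuming a session with an Iceberg catalog named my_catalog and a table db.sample (both hypothetical names):

import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RewriteStats {
  // Prints all three output columns, including the rewritten_bytes_count
  // column added by this change.
  public static void printStats(SparkSession spark) {
    List<Row> rows =
        spark.sql("CALL my_catalog.system.rewrite_data_files('db.sample')").collectAsList();
    Row r = rows.get(0);
    System.out.printf(
        "rewrote %d files (%d bytes) into %d new files%n",
        r.getInt(0), r.getLong(2), r.getInt(1));
  }
}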
@@ -28,7 +28,10 @@ public enum SparkCatalogConfig {
ImmutableMap.of(
"type", "hive",
"default-namespace", "default")),
HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop")),
HADOOP(
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
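Disabling the table cache on the Hadoop test catalog is presumably what lets the special-chars tests load the table through validationCatalog and see the snapshot created by the procedure call, rather than a stale cached instance. For reference, a sketch of the same catalog wired into a standalone Spark session; the warehouse path is illustrative:

import org.apache.spark.sql.SparkSession;

public class HadoopCatalogSession {
  public static SparkSession create() {
    return SparkSession.builder()
        .master("local[2]")
        .config("spark.sql.catalog.testhadoop", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.testhadoop.type", "hadoop")
        // Disable caching so each load reflects the table's newest snapshot.
        .config("spark.sql.catalog.testhadoop.cache-enabled", "false")
        .config("spark.sql.catalog.testhadoop.warehouse", "file:/tmp/iceberg-warehouse")
        .getOrCreate();
  }
}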
@@ -68,7 +68,7 @@ protected void assertEquals(
}
}

private void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
protected void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
Assert.assertEquals("Number of columns should match", expectedRow.length, actualRow.length);
for (int col = 0; col < actualRow.length; col += 1) {
Object expectedValue = expectedRow[col];
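Relaxing the single-row assertEquals from private to protected is what allows the rewritten tests to compare a truncated output row against an expected row directly, without wrapping both sides in lists, e.g. (hypothetical values):

// In a test subclass: compare only the two count columns of the first row.
assertEquals("Counts should match", row(10, 1), Arrays.copyOf(output.get(0), 2));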