From a00a9bd5c093965f123033bb7344643c25624085 Mon Sep 17 00:00:00 2001 From: Alex Reid Date: Thu, 9 Feb 2023 10:16:18 -0800 Subject: [PATCH] API,Core,Spark: Add rewritten bytes to rewrite data files procedure results --- .../iceberg/actions/RewriteDataFiles.java | 8 ++ .../actions/BaseFileGroupRewriteResult.java | 18 +++ .../iceberg/actions/RewriteFileGroup.java | 4 +- .../TestRewriteDataFilesProcedure.java | 124 ++++++++++++++---- .../procedures/RewriteDataFilesProcedure.java | 7 +- .../iceberg/spark/SparkCatalogConfig.java | 5 +- .../iceberg/spark/SparkTestHelperBase.java | 2 +- .../actions/TestRewriteDataFilesAction.java | 55 ++++++++ .../iceberg/spark/sql/TestRefreshTable.java | 4 +- 9 files changed, 197 insertions(+), 30 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index 39e2b9bc66f3..5f76528d6019 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -187,6 +187,10 @@ default int rewrittenDataFilesCount() { .mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount) .sum(); } + + default long rewrittenBytesCount() { + return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum(); + } } /** @@ -199,6 +203,10 @@ interface FileGroupRewriteResult { int addedDataFilesCount(); int rewrittenDataFilesCount(); + + default long rewrittenBytesCount() { + return 0L; + } } /** diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java index fd44f7f6a333..f730c4303f97 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java @@ -24,13 +24,26 @@ public class BaseFileGroupRewriteResult implements FileGroupRewriteResult { private final int addedDataFilesCount; private final int rewrittenDataFilesCount; + private final long rewrittenBytesCount; private final FileGroupInfo info; + /** + * @deprecated Will be removed in 1.3.0; use {@link + * BaseFileGroupRewriteResult#BaseFileGroupRewriteResult(FileGroupInfo, int, int, long)} + * instead. 
+ */ + @Deprecated public BaseFileGroupRewriteResult( FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) { + this(info, addedFilesCount, rewrittenFilesCount, 0L); + } + + public BaseFileGroupRewriteResult( + FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount, long rewrittenBytesCount) { this.info = info; this.addedDataFilesCount = addedFilesCount; this.rewrittenDataFilesCount = rewrittenFilesCount; + this.rewrittenBytesCount = rewrittenBytesCount; } @Override @@ -47,4 +60,9 @@ public int addedDataFilesCount() { public int rewrittenDataFilesCount() { return rewrittenDataFilesCount; } + + @Override + public long rewrittenBytesCount() { + return rewrittenBytesCount; + } } diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index dd4516be76c0..c26753c01d7d 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -65,7 +65,8 @@ public Set addedFiles() { public RewriteDataFiles.FileGroupRewriteResult asResult() { Preconditions.checkState(addedFiles != null, "Cannot get result, Group was never rewritten"); - return new BaseFileGroupRewriteResult(info, addedFiles.size(), fileScanTasks.size()); + return new BaseFileGroupRewriteResult( + info, addedFiles.size(), fileScanTasks.size(), sizeInBytes()); } @Override @@ -76,6 +77,7 @@ public String toString() { .add( "numAddedFiles", addedFiles == null ? "Rewrite Incomplete" : Integer.toString(addedFiles.size())) + .add("numRewrittenBytes", sizeInBytes()) .toString(); } diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 5e80c85864ba..4126f359193e 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -18,10 +18,15 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.IntStream; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.NamedReference; import org.apache.iceberg.expressions.Zorder; @@ -69,7 +74,7 @@ public void testZOrderSortExpression() { public void testRewriteDataFilesInEmptyTable() { createTable(); List output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent); - assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output); + assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output); } @Test @@ -84,8 +89,13 @@ public void testRewriteDataFilesOnPartitionTable() { assertEquals( "Action should rewrite 10 data files and add 2 data files (one per partition) ", - ImmutableList.of(row(10, 2)), - output); + row(10, 2), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + 
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -103,8 +113,13 @@ public void testRewriteDataFilesOnNonPartitionTable() { assertEquals( "Action should rewrite 10 data files and add 1 data files", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -125,7 +140,7 @@ public void testRewriteDataFilesWithOptions() { assertEquals( "Action should rewrite 0 data files and add 0 data files", - ImmutableList.of(row(0, 0)), + ImmutableList.of(row(0, 0, 0L)), output); List actualRecords = currentData(); @@ -148,8 +163,13 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals( "Action should rewrite 10 data files and add 1 data files", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -170,8 +190,13 @@ public void testRewriteDataFilesWithZOrder() { assertEquals( "Action should rewrite 10 data files and add 1 data files", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); // Due to Z_order, the data written will be in the below order. 
// As there is only one small output file, we can validate the query ordering (as it will not @@ -207,8 +232,13 @@ public void testRewriteDataFilesWithFilter() { assertEquals( "Action should rewrite 5 data files (containing c1 = 1) and add 1 data files", - ImmutableList.of(row(5, 1)), - output); + row(5, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -230,8 +260,13 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() { assertEquals( "Action should rewrite 5 data files from single matching partition" + "(containing c2 = bar) and add 1 data files", - ImmutableList.of(row(5, 1)), - output); + row(5, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -253,8 +288,13 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() { assertEquals( "Action should rewrite 5 data files from single matching partition" + "(containing c2 = bar) and add 1 data files", - ImmutableList.of(row(5, 1)), - output); + row(5, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -471,6 +511,8 @@ public void testInvalidCasesForRewriteDataFiles() { public void testBinPackTableWithSpecialChars() { Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())); + TableIdentifier identifier = + TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", "")); sql( "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); @@ -486,8 +528,13 @@ public void testBinPackTableWithSpecialChars() { assertEquals( "Action should rewrite 10 data files and add 1 data file", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isEqualTo( + Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -499,6 +546,8 @@ public void testBinPackTableWithSpecialChars() { public void testSortTableWithSpecialChars() { Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())); + TableIdentifier identifier = + TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", "")); sql( "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); @@ -518,8 +567,14 @@ public void 
testSortTableWithSpecialChars() { assertEquals( "Action should rewrite 10 data files and add 1 data file", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo( + Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -531,6 +586,8 @@ public void testSortTableWithSpecialChars() { public void testZOrderTableWithSpecialChars() { Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())); + TableIdentifier identifier = + TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", "")); sql( "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); @@ -550,8 +607,14 @@ public void testZOrderTableWithSpecialChars() { assertEquals( "Action should rewrite 10 data files and add 1 data file", - ImmutableList.of(row(10, 1)), - output); + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo( + Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -580,8 +643,13 @@ public void testDefaultSortOrder() { assertEquals( "Action should rewrite 2 data files and add 1 data files", - ImmutableList.of(row(2, 1)), - output); + row(2, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); List actualRecords = currentData(); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); @@ -622,6 +690,14 @@ private void insertData(String table, int filesCount) { } } + private Map snapshotSummary() { + return snapshotSummary(tableIdent); + } + + private Map snapshotSummary(TableIdentifier tableIdentifier) { + return validationCatalog.loadTable(tableIdentifier).currentSnapshot().summary(); + } + private List currentData() { return currentData(tableName); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java index 39b8f7f30a94..1aea61e74785 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java @@ -67,7 +67,8 @@ class RewriteDataFilesProcedure extends BaseProcedure { new StructField( "rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()), new StructField( - "added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()) + "added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()) }); public static ProcedureBuilder 
builder() { @@ -213,8 +214,10 @@ private SortOrder buildSortOrder( private InternalRow[] toOutputRows(RewriteDataFiles.Result result) { int rewrittenDataFilesCount = result.rewrittenDataFilesCount(); + long rewrittenBytesCount = result.rewrittenBytesCount(); int addedDataFilesCount = result.addedDataFilesCount(); - InternalRow row = newInternalRow(rewrittenDataFilesCount, addedDataFilesCount); + InternalRow row = + newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount); return new InternalRow[] {row}; } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java index 1006ed380ff9..fc18ed3bb174 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java @@ -28,7 +28,10 @@ public enum SparkCatalogConfig { ImmutableMap.of( "type", "hive", "default-namespace", "default")), - HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop")), + HADOOP( + "testhadoop", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hadoop", "cache-enabled", "false")), SPARK( "spark_catalog", SparkSessionCatalog.class.getName(), diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java index 9d21f392a811..97484702cad6 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java @@ -68,7 +68,7 @@ protected void assertEquals( } } - private void assertEquals(String context, Object[] expectedRow, Object[] actualRow) { + protected void assertEquals(String context, Object[] expectedRow, Object[] actualRow) { Assert.assertEquals("Number of columns should match", expectedRow.length, actualRow.length); for (int col = 0; col < actualRow.length; col += 1) { Object expectedValue = expectedRow[col]; diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index ebe02bb98d2d..61a855b1dd52 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -23,6 +23,7 @@ import static org.apache.spark.sql.functions.current_date; import static org.apache.spark.sql.functions.date_add; import static org.apache.spark.sql.functions.expr; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; @@ -155,10 +156,12 @@ public void testBinPackUnpartitionedTable() { Table table = createTable(4); shouldHaveFiles(table, 4); List expectedRecords = currentData(); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table).execute(); Assert.assertEquals("Action should rewrite 4 data files", 4, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 1); List actual = currentData(); @@ -171,10 +174,12 @@ public 
void testBinPackPartitionedTable() { Table table = createTablePartitioned(4, 2); shouldHaveFiles(table, 8); List expectedRecords = currentData(); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table).execute(); Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 4); List actualRecords = currentData(); @@ -187,6 +192,7 @@ public void testBinPackWithFilter() { Table table = createTablePartitioned(4, 2); shouldHaveFiles(table, 8); List expectedRecords = currentData(); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table) @@ -196,6 +202,7 @@ public void testBinPackWithFilter() { Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); shouldHaveFiles(table, 7); @@ -212,6 +219,7 @@ public void testBinPackAfterPartitionChange() { table.updateSpec().addField(Expressions.ref("c1")).commit(); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -227,6 +235,7 @@ public void testBinPackAfterPartitionChange() { "Should have 1 fileGroup because all files were not correctly partitioned", 1, result.rewriteResults().size()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -262,6 +271,8 @@ public void testBinPackWithDeletes() throws Exception { rowDelta.commit(); table.refresh(); List expectedRecords = currentData(); + long dataSizeBefore = testDataSize(table); + Result result = actions() .rewriteDataFiles(table) @@ -272,6 +283,7 @@ public void testBinPackWithDeletes() throws Exception { .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2") .execute(); Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); List actualRecords = currentData(); assertEquals("Rows must match", expectedRecords, actualRecords); @@ -298,12 +310,15 @@ public void testBinPackWithDeleteAllData() { rowDelta.commit(); table.refresh(); List expectedRecords = currentData(); + long dataSizeBefore = testDataSize(table); + Result result = actions() .rewriteDataFiles(table) .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1") .execute(); Assert.assertEquals("Action should rewrite 1 data files", 1, result.rewrittenDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); List actualRecords = currentData(); assertEquals("Rows must match", expectedRecords, actualRecords); @@ -329,11 +344,13 @@ public void testBinPackWithStartingSequenceNumber() { table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit(); table.refresh(); long oldSequenceNumber = table.currentSnapshot().sequenceNumber(); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true").execute(); Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount()); Assert.assertEquals("Action 
should add 4 data file", 4, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 4); List actualRecords = currentData(); @@ -361,11 +378,13 @@ public void testBinPackWithStartingSequenceNumberV1Compatibility() { table.refresh(); long oldSequenceNumber = table.currentSnapshot().sequenceNumber(); Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true").execute(); Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 4); List actualRecords = currentData(); @@ -411,9 +430,11 @@ public void testRewriteLargeTableHasResiduals() { shouldHaveFiles(table, 2); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table).filter(Expressions.equal("c3", "0")).execute(); Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); List actualRecords = currentData(); @@ -428,6 +449,7 @@ public void testBinPackSplitLargeFile() { List expectedRecords = currentData(); long targetSize = testDataSize(table) / 2; + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(targetSize)) @@ -436,6 +458,7 @@ public void testBinPackSplitLargeFile() { Assert.assertEquals("Action should delete 1 data files", 1, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 2); @@ -457,6 +480,7 @@ public void testBinPackCombineMixedFiles() { int targetSize = averageFileSize(table); + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(targetSize + 1000)) @@ -468,6 +492,7 @@ public void testBinPackCombineMixedFiles() { // Should Split the big files into 3 pieces, one of which should be combined with the two // smaller files Assert.assertEquals("Action should add 3 data files", 3, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 3); @@ -484,6 +509,7 @@ public void testBinPackCombineMediumFiles() { int targetSize = ((int) testDataSize(table) / 3); // The test is to see if we can combine parts of files to make files of the correct size + long dataSizeBefore = testDataSize(table); Result result = basicRewrite(table) .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(targetSize)) @@ -495,6 +521,7 @@ public void testBinPackCombineMediumFiles() { Assert.assertEquals("Action should delete 4 data files", 4, result.rewrittenDataFilesCount()); Assert.assertEquals("Action should add 3 data files", 3, result.addedDataFilesCount()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); shouldHaveFiles(table, 3); @@ -508,6 +535,7 @@ public void testPartialProgressEnabled() { int fileSize = averageFileSize(table); List originalData = currentData(); + 
long dataSizeBefore = testDataSize(table); // Perform a rewrite but only allow 2 files to be compacted at a time RewriteDataFiles.Result result = @@ -519,6 +547,7 @@ .execute(); Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -535,6 +564,7 @@ public void testMultipleGroups() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); // Perform a rewrite but only allow 2 files to be compacted at a time RewriteDataFiles.Result result = @@ -545,6 +575,7 @@ .execute(); Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -561,6 +592,7 @@ public void testPartialProgressMaxCommits() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); // Perform a rewrite but only allow 2 files to be compacted at a time RewriteDataFiles.Result result = @@ -572,6 +604,7 @@ .execute(); Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -694,6 +727,7 @@ public void testPartialProgressWithRewriteFailure() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFilesSparkAction realRewrite = basicRewrite(table) @@ -713,6 +747,7 @@ RewriteDataFiles.Result result = spyRewrite.execute(); Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); + assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); table.refresh(); @@ -732,6 +767,7 @@ public void testParallelPartialProgressWithRewriteFailure() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFilesSparkAction realRewrite = basicRewrite(table) @@ -752,6 +788,7 @@ RewriteDataFiles.Result result = spyRewrite.execute(); Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); + assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); table.refresh(); @@ -771,6 +808,7 @@ public void testParallelPartialProgressWithCommitFailure() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFilesSparkAction realRewrite = basicRewrite(table) @@ -796,6 +834,7 @@ // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10 total groups committed Assert.assertEquals("Should have 6 fileGroups", 6, result.rewriteResults().size()); + assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); table.refresh(); @@ -849,6 +888,7 @@ public void testSortMultipleGroups() { int fileSize = averageFileSize(table); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); // Perform a rewrite but only allow 2 files to be compacted at a time RewriteDataFiles.Result result = @@ -860,6 +900,7 @@ public void
testSortMultipleGroups() { .execute(); Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -878,6 +919,7 @@ public void testSimpleSort() { shouldHaveLastCommitUnsorted(table, "c2"); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -889,6 +931,7 @@ public void testSimpleSort() { .execute(); Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -910,6 +953,7 @@ public void testSortAfterPartitionChange() { shouldHaveLastCommitUnsorted(table, "c2"); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -924,6 +968,7 @@ public void testSortAfterPartitionChange() { "Should have 1 fileGroup because all files were not correctly partitioned", result.rewriteResults().size(), 1); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -943,6 +988,7 @@ public void testSortCustomSortOrder() { shouldHaveFiles(table, 20); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -953,6 +999,7 @@ public void testSortCustomSortOrder() { .execute(); Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -979,6 +1026,7 @@ public void testSortCustomSortOrderRequiresRepartition() { shouldHaveFiles(table, 20); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -990,6 +1038,7 @@ public void testSortCustomSortOrderRequiresRepartition() { .execute(); Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); table.refresh(); @@ -1010,6 +1059,7 @@ public void testAutoSortShuffleOutput() { shouldHaveFiles(table, 20); List originalData = currentData(); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) @@ -1025,6 +1075,7 @@ public void testAutoSortShuffleOutput() { .execute(); Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); Assert.assertTrue( "Should have written 40+ files", Iterables.size(table.currentSnapshot().addedDataFiles(table.io())) >= 40); @@ -1088,6 +1139,7 @@ public void testZOrderSort() { Assert.assertTrue("Should require all files to scan c2", originalFilesC2 > 0.99); Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 0.99); + long dataSizeBefore = testDataSize(table); RewriteDataFiles.Result result = basicRewrite(table) .zOrder("c2", "c3") @@ -1102,6 +1154,7 @@ public void testZOrderSort() { .execute(); Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedDataFiles(table.io())); Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40); @@ -1137,6 +1190,7 @@ public void testZOrderAllTypesSort() { List originalRaw = 
spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); List originalData = rowsToJava(originalRaw); + long dataSizeBefore = testDataSize(table); // TODO add in UUID when it is supported in Spark RewriteDataFiles.Result result = @@ -1156,6 +1210,7 @@ public void testZOrderAllTypesSort() { .execute(); Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedDataFiles(table.io())); Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java index 3eaca6329477..7da2dc0882db 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java @@ -23,6 +23,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkCatalogTestBase; import org.junit.After; import org.junit.Before; @@ -49,7 +50,8 @@ public void removeTables() { public void testRefreshCommand() { // We are not allowed to change the session catalog after it has been initialized, so build a // new one - if (catalogName.equals("spark_catalog")) { + if (catalogName.equals(SparkCatalogConfig.SPARK.catalogName()) + || catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())) { spark.conf().set("spark.sql.catalog." + catalogName + ".cache-enabled", true); spark = spark.cloneSession(); }