Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -49,7 +53,7 @@ public void removeTable() {
// Calls the rewrite_data_files procedure right after createTable(), with no inserts in
// between, and checks the single output row it returns.
public void testRewriteDataFilesInEmptyTable() {
createTable();
List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
// NOTE(review): this span is a rendered diff with the +/- markers stripped. The hunk header
// (-49,7 +53,7) indicates exactly one line was replaced, so the two-value assertion directly
// below is presumably the removed version and the three-value assertion after it is the
// replacement (third value 0L for the added bytes column) — confirm against the real file.
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output);
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output);
}

@Test
Expand All @@ -64,8 +68,13 @@ public void testRewriteDataFilesOnPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 2 data files (one per partition) ",
ImmutableList.of(row(10, 2)),
output);
row(10, 2),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand All @@ -83,8 +92,13 @@ public void testRewriteDataFilesOnNonPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand All @@ -105,7 +119,7 @@ public void testRewriteDataFilesWithOptions() {

assertEquals(
"Action should rewrite 0 data files and add 0 data files",
ImmutableList.of(row(0, 0)),
ImmutableList.of(row(0, 0, 0L)),
output);

List<Object[]> actualRecords = currentData();
Expand All @@ -128,8 +142,13 @@ public void testRewriteDataFilesWithSortStrategy() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand All @@ -151,8 +170,13 @@ public void testRewriteDataFilesWithFilter() {

assertEquals(
"Action should rewrite 5 data files (containing c1 = 1) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand All @@ -174,8 +198,13 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand All @@ -197,8 +226,13 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
Expand Down Expand Up @@ -409,6 +443,10 @@ private void insertData(int filesCount) {
}
}

/**
 * Loads the table under test through the validation catalog and returns the summary map of its
 * current snapshot.
 *
 * @return the summary properties of the table's current snapshot
 */
private Map<String, String> snapshotSummary() {
  Map<String, String> summary =
      validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
  return summary;
}

private List<Object[]> currentData() {
return rowsToJava(
spark.sql("SELECT * FROM " + tableName + " order by c1, c2, c3").collectAsList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
new StructField(
"rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
});

public static ProcedureBuilder builder() {
Expand Down Expand Up @@ -187,8 +188,10 @@ private SortOrder collectSortOrders(Table table, String sortOrderStr) {

// Packs the counters read from the rewrite result into a one-element array of output rows.
private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
// NOTE(review): this span is a rendered diff with the +/- markers stripped. The hunk header
// (-187,8 +188,10) indicates the two-argument newInternalRow line directly below is the removed
// version and the three-argument form after it is the replacement; only one declaration of
// `row` can exist in the real file — confirm before treating both as live code.
InternalRow row = newInternalRow(rewrittenDataFilesCount, addedDataFilesCount);
// Value order is rewritten files, added files, rewritten bytes; this presumably mirrors the
// rewritten_data_files_count / added_data_files_count / rewritten_bytes_count StructFields
// shown earlier in this diff — verify against the output type definition.
InternalRow row =
newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
return new InternalRow[] {row};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ public enum SparkCatalogConfig {
ImmutableMap.of(
"type", "hive",
"default-namespace", "default")),
HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop")),
HADOOP(
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ protected void assertEquals(
}
}

private void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
protected void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
Assert.assertEquals("Number of columns should match", expectedRow.length, actualRow.length);
for (int col = 0; col < actualRow.length; col += 1) {
Object expectedValue = expectedRow[col];
Expand Down
Loading