@@ -187,6 +187,10 @@ default int rewrittenDataFilesCount() {
.mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount)
.sum();
}

default long rewrittenBytesCount() {
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
}
}

/**
@@ -199,6 +203,10 @@ interface FileGroupRewriteResult {
int addedDataFilesCount();

int rewrittenDataFilesCount();

default long rewrittenBytesCount() {
return 0L;
}
}

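With these two defaults in place, the total byte count rolls up from the per-group results, and existing Result implementations keep compiling. A minimal caller-side sketch of consuming the new metric (the table loading and SparkActions wiring are assumed context, not part of this diff):

// Hypothetical usage; assumes an already-loaded Table named "table".
RewriteDataFiles.Result result =
    SparkActions.get().rewriteDataFiles(table).binPack().execute();

// New in this change: total bytes of the data files that were rewritten.
long rewrittenBytes = result.rewrittenBytesCount();
System.out.printf(
    "Rewrote %d files (%d bytes) into %d files%n",
    result.rewrittenDataFilesCount(), rewrittenBytes, result.addedDataFilesCount());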
@@ -24,13 +24,26 @@
public class BaseFileGroupRewriteResult implements FileGroupRewriteResult {
private final int addedDataFilesCount;
private final int rewrittenDataFilesCount;
private final long rewrittenBytesCount;
private final FileGroupInfo info;

Review thread (on the deprecation below):
Contributor: Should we create an issue for this, and add it to the 1.3.0 milestone?
Author: I'll usually clean things like this up right after a release, so I'll open an issue for that shortly
Contributor: Awesome, just to make sure that's in the collective memory of the community :)

/**
* @deprecated Will be removed in 1.3.0; use {@link
* BaseFileGroupRewriteResult#BaseFileGroupRewriteResult(FileGroupInfo, int, int, long)}
* instead.
*/
@Deprecated
public BaseFileGroupRewriteResult(
FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) {
this(info, addedFilesCount, rewrittenFilesCount, 0L);
}

public BaseFileGroupRewriteResult(
FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount, long rewrittenBytesCount) {
this.info = info;
this.addedDataFilesCount = addedFilesCount;
this.rewrittenDataFilesCount = rewrittenFilesCount;
this.rewrittenBytesCount = rewrittenBytesCount;
}

@Override
@@ -47,4 +60,9 @@ public int addedDataFilesCount() {
public int rewrittenDataFilesCount() {
return rewrittenDataFilesCount;
}

@Override
public long rewrittenBytesCount() {
return rewrittenBytesCount;
}
}
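For illustration, a per-group result built with the new four-argument constructor (the values here are placeholders, not taken from the diff):

// Hypothetical values; "info" would come from the rewrite planner.
FileGroupRewriteResult groupResult =
    new BaseFileGroupRewriteResult(info, 1, 10, 512L * 1024 * 1024);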
@@ -65,7 +65,8 @@ public Set<DataFile> addedFiles() {

public RewriteDataFiles.FileGroupRewriteResult asResult() {
Preconditions.checkState(addedFiles != null, "Cannot get result, Group was never rewritten");
return new BaseFileGroupRewriteResult(info, addedFiles.size(), fileScanTasks.size());
return new BaseFileGroupRewriteResult(
info, addedFiles.size(), fileScanTasks.size(), sizeInBytes());
}

@Override
@@ -76,6 +77,7 @@ public String toString() {
.add(
"numAddedFiles",
addedFiles == null ? "Rewrite Incomplete" : Integer.toString(addedFiles.size()))
.add("numRewrittenBytes", sizeInBytes())
.toString();
}

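sizeInBytes() itself is outside this hunk; presumably it sums the lengths of the group's input scan tasks, roughly:

// Assumed shape of the helper used in asResult() above -- not shown in this diff.
public long sizeInBytes() {
  return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
}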
@@ -18,10 +18,15 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
@@ -69,7 +74,7 @@ public void testZOrderSortExpression() {
public void testRewriteDataFilesInEmptyTable() {
createTable();
List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output);
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output);
}

@Test
@@ -84,8 +89,13 @@ public void testRewriteDataFilesOnPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 2 data files (one per partition) ",
ImmutableList.of(row(10, 2)),
output);
row(10, 2),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -103,8 +113,13 @@ public void testRewriteDataFilesOnNonPartitionTable() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -125,7 +140,7 @@ public void testRewriteDataFilesWithOptions() {

assertEquals(
"Action should rewrite 0 data files and add 0 data files",
ImmutableList.of(row(0, 0)),
ImmutableList.of(row(0, 0, 0L)),
output);

List<Object[]> actualRecords = currentData();
@@ -148,8 +163,13 @@ public void testRewriteDataFilesWithSortStrategy() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -170,8 +190,13 @@ public void testRewriteDataFilesWithZOrder() {

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

// Due to Z_order, the data written will be in the below order.
// As there is only one small output file, we can validate the query ordering (as it will not
@@ -207,8 +232,13 @@ public void testRewriteDataFilesWithFilter() {

assertEquals(
"Action should rewrite 5 data files (containing c1 = 1) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -230,8 +260,13 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -253,8 +288,13 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
ImmutableList.of(row(5, 1)),
output);
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -471,6 +511,8 @@ public void testInvalidCasesForRewriteDataFiles() {
public void testBinPackTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -486,8 +528,13 @@ public void testBinPackTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -499,6 +546,8 @@ public void testBinPackTableWithSpecialChars() {
public void testSortTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -518,8 +567,14 @@ public void testSortTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -531,6 +586,8 @@ public void testSortTableWithSpecialChars() {
public void testZOrderTableWithSpecialChars() {
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));

TableIdentifier identifier =
TableIdentifier.of("default", QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -550,8 +607,14 @@ public void testZOrderTableWithSpecialChars() {

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
ImmutableList.of(row(10, 1)),
output);
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -580,8 +643,13 @@ public void testDefaultSortOrder() {

assertEquals(
"Action should rewrite 2 data files and add 1 data files",
ImmutableList.of(row(2, 1)),
output);
row(2, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
@@ -622,6 +690,14 @@ private void insertData(String table, int filesCount) {
}
}

private Map<String, String> snapshotSummary() {
return snapshotSummary(tableIdent);
}

private Map<String, String> snapshotSummary(TableIdentifier tableIdentifier) {
return validationCatalog.loadTable(tableIdentifier).currentSnapshot().summary();
}

private List<Object[]> currentData() {
return currentData(tableName);
}
@@ -67,7 +67,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
new StructField(
"rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
});

public static ProcedureBuilder builder() {
@@ -213,8 +214,10 @@ private SortOrder buildSortOrder(

private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
InternalRow row = newInternalRow(rewrittenDataFilesCount, addedDataFilesCount);
InternalRow row =
newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
return new InternalRow[] {row};
}

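With the added schema field and the extra value passed to newInternalRow, the procedure now returns three columns. In the style of the test helpers above (sql, catalogName, and tableIdent come from the test base class):

// Sketch, mirroring the tests in this PR.
List<Object[]> output =
    sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
// output.get(0) holds {rewritten_data_files_count, added_data_files_count, rewritten_bytes_count}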
@@ -28,7 +28,10 @@ public enum SparkCatalogConfig {
ImmutableMap.of(
"type", "hive",
"default-namespace", "default")),
HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop")),
HADOOP(
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
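Turning off caching for the Hadoop test catalog is presumably what lets the special-character tests above observe the freshly committed rewrite snapshot and its summary rather than a stale cached table; the diff itself does not state the motivation.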
@@ -68,7 +68,7 @@ protected void assertEquals(
}
}

private void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
protected void assertEquals(String context, Object[] expectedRow, Object[] actualRow) {
Assert.assertEquals("Number of columns should match", expectedRow.length, actualRow.length);
for (int col = 0; col < actualRow.length; col += 1) {
Object expectedValue = expectedRow[col];
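Widening this single-row overload from private to protected is what allows the procedure tests above to compare one output row at a time, e.g. assertEquals("...", row(10, 1), Arrays.copyOf(output.get(0), 2)).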