@@ -167,14 +167,10 @@ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IO

private void validatePreviousDeletes(PositionDeleteIndex index) {
Preconditions.checkArgument(
index.deleteFiles().stream().allMatch(this::isFileScoped),
index.deleteFiles().stream().allMatch(ContentFileUtil::isFileScoped),
"Previous deletes must be file-scoped");
}

private boolean isFileScoped(DeleteFile deleteFile) {
return ContentFileUtil.referencedDataFile(deleteFile) != null;
}

private Collection<CharSequence> sort(Collection<CharSequence> paths) {
if (paths.size() <= 1) {
return paths;
@@ -86,6 +86,10 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
return location != null ? location.toString() : null;
}

public static boolean isFileScoped(DeleteFile deleteFile) {
return referencedDataFile(deleteFile) != null;
}

public static boolean isDV(DeleteFile deleteFile) {
return deleteFile.format() == FileFormat.PUFFIN;
}
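
A minimal sketch (not part of this diff) of how the relocated `ContentFileUtil.isFileScoped` helper might be used by a caller: a delete file is file-scoped when it references a single data file, while partition-scoped deletes reference none. The class and method names below are illustrative only.

```java
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ContentFileUtil;

// Illustrative only, not from the PR: keeps just the deletes that target a single data file.
class DeleteScopeExample {
  static List<DeleteFile> fileScopedOnly(List<DeleteFile> deletes) {
    List<DeleteFile> fileScoped = Lists.newArrayList();
    for (DeleteFile delete : deletes) {
      // file-scoped deletes carry a referenced data file; partition-scoped ones do not
      if (ContentFileUtil.isFileScoped(delete)) {
        fileScoped.add(delete);
      }
    }
    return fileScoped;
  }
}
```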
@@ -39,9 +39,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.junit.jupiter.api.BeforeEach;
@@ -99,6 +101,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

@TestTemplate
public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

@TestTemplate
public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();
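
The two new tests above share one key assertion: after a later DELETE hits a data file that already carries a file-scoped position delete, the commit should report the earlier delete file as removed rather than letting deletes accumulate. A small sketch of that check (not part of the diff; the class name is illustrative, the Iceberg calls are the ones the tests use):

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

// Illustrative only, not from the PR: counts the delete files removed by the latest commit on a branch.
class RemovedDeleteFilesCheck {
  static int removedDeleteFileCount(Table table, String branch) {
    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
    int count = 0;
    for (DeleteFile ignored : latest.removedDeleteFiles(table.io())) {
      count++;
    }
    return count;
  }
}
```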
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -55,19 +56,82 @@ public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
@TestTemplate
public void testUpdateFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition-scoped deletes, which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
checkUpdateFileGranularity(DeleteGranularity.FILE);
sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
Table table = validationCatalog.loadTable(tableIdent);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

createBranchIfNeeded();
@TestTemplate
public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition-scoped deletes, which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
initTable("", DeleteGranularity.FILE);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "4";
validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));

sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
table.refresh();
currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
expectedDeleteFilesCount = "2";

validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
initTable("PARTITIONED BY (dep)", deleteGranularity);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

@@ -91,4 +155,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
Review comment (Contributor): I understand we refactor common logic, but this makes the tests harder to read. For instance, the reader has no indication we add 4 batches in the init method.

createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();
}
}
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
@@ -48,6 +50,8 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) {
}
}

protected Map<String, DeleteFileSet> rewritableDeletes() {
Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

for (ScanTask task : tasks()) {
FileScanTask fileScanTask = task.asFileScanTask();
for (DeleteFile deleteFile : fileScanTask.deletes()) {
if (ContentFileUtil.isFileScoped(deleteFile)) {
rewritableDeletes
.computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
.add(deleteFile);
}
}
}

return rewritableDeletes;
}

// at this moment, Spark can only pass IN filters for a single attribute
// if there are multiple filter attributes, Spark will pass two separate IN filters
private Expression convertRuntimeFilters(Predicate[] predicates) {
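
A hypothetical consumer sketch (not from this diff) showing the shape `rewritableDeletes()` returns: a map from data file location to the file-scoped delete files already written against it, which a writer can report as removed once a merged replacement is committed. The class, method, and parameter names below are assumptions for illustration.

```java
import java.util.Map;
import org.apache.iceberg.util.DeleteFileSet;

// Illustrative only, not from the PR: looks up previously written
// file-scoped deletes for a single data file location.
class RewritableDeletesLookup {
  static DeleteFileSet previousDeletesFor(
      Map<String, DeleteFileSet> rewritableDeletes, String dataFileLocation) {
    // an absent key means the data file has no file-scoped deletes to replace
    return rewritableDeletes.getOrDefault(dataFileLocation, DeleteFileSet.create());
  }
}
```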