@@ -39,9 +39,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.junit.Assert;
@@ -120,6 +122,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

@Test
public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
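// the last DELETE touches the same data file as the first, so its existing file-scoped
// delete file should be replaced rather than accumulated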
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

@Test
public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
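// all rows live in a single data file, so each DELETE replaces the previous file-scoped
// delete file and the latest snapshot removes exactly one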
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
@@ -75,19 +76,82 @@ public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
@Test
public void testUpdateFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition scoped deletes which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
checkUpdateFileGranularity(DeleteGranularity.FILE);
sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
Table table = validationCatalog.loadTable(tableIdent);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

createBranchIfNeeded();
@Test
public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition scoped deletes which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
initTable("", DeleteGranularity.FILE);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "4";
validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));

sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
table.refresh();
currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
expectedDeleteFilesCount = "2";

validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
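// the matching rows live in data files that already carry file-scoped deletes from the
// first UPDATE, so those delete files should be replaced rather than accumulated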
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
initTable("PARTITIONED BY (dep)", deleteGranularity);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

@@ -111,4 +175,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();
}
}
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
@@ -48,6 +50,8 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) {
}
}

protected Map<String, DeleteFileSet> rewritableDeletes() {
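// collect file-scoped delete files keyed by the data file they apply to; partition-scoped
// deletes are skipped and left untouched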
Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

for (ScanTask task : tasks()) {
FileScanTask fileScanTask = task.asFileScanTask();
for (DeleteFile deleteFile : fileScanTask.deletes()) {
if (ContentFileUtil.isFileScoped(deleteFile)) {
rewritableDeletes
.computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
.add(deleteFile);
}
}
}

return rewritableDeletes;
}

// at this moment, Spark can only pass IN filters for a single attribute
// if there are multiple filter attributes, Spark will pass two separate IN filters
private Expression convertRuntimeFilters(Predicate[] predicates) {