@@ -103,6 +103,36 @@ public void readIcebergVectorized() {
});
}

@Benchmark
@Threads(1)
public void readIcebergWithIsDeletedColumn() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(
tableProperties,
() -> {
String tableLocation = table().location();
Dataset<Row> df =
spark().read().format("iceberg").load(tableLocation).filter("_deleted = false");
materialize(df);
});
}

@Benchmark
@Threads(1)
public void readDeletedRows() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
withTableProperties(
tableProperties,
() -> {
String tableLocation = table().location();
Dataset<Row> df =
spark().read().format("iceberg").load(tableLocation).filter("_deleted = true");
materialize(df);
});
}

protected abstract void appendData() throws IOException;

protected void writeData(int fileNum) {
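
For reference, the read pattern these benchmarks exercise can be used directly from the DataFrame API. A minimal standalone sketch, assuming a local SparkSession, the iceberg-spark-runtime jar on the classpath, and a hypothetical path-based table (the location and session settings are illustrative, not part of this change):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IsDeletedReadSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("is-deleted-read").getOrCreate();

    String tableLocation = "/tmp/iceberg/benchmark_table"; // hypothetical table path

    // Live rows: _deleted is false for rows not covered by delete files.
    Dataset<Row> liveRows =
        spark.read().format("iceberg").load(tableLocation).filter("_deleted = false");

    // Deleted rows: rows matched by position or equality deletes, kept but flagged.
    Dataset<Row> deletedRows =
        spark.read().format("iceberg").load(tableLocation).filter("_deleted = true");

    System.out.println("live = " + liveRows.count() + ", deleted = " + deletedRows.count());
    spark.stop();
  }
}
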
@@ -193,5 +193,10 @@ protected StructLike asStructLike(InternalRow row) {
protected InputFile getInputFile(String location) {
return RowDataReader.this.getInputFile(location);
}

@Override
protected void markRowDeleted(InternalRow row) {
row.setBoolean(columnIsDeletedPosition(), true);
}
}
}
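
The markRowDeleted override above is what keeps deleted rows in the scan instead of dropping them: when the _deleted metadata column is projected, the delete filter flags a matched row at columnIsDeletedPosition() rather than skipping it. A plain-Java sketch of that mark-instead-of-drop pattern, where SimpleRow and the deleted-id set are illustrative stand-ins rather than Iceberg internals:

import java.util.List;
import java.util.Set;

public class MarkDeletedSketch {
  // Stand-in for an internal row that carries a trailing _deleted flag column.
  static final class SimpleRow {
    final int id;
    boolean isDeleted;

    SimpleRow(int id) {
      this.id = id;
    }
  }

  // Instead of filtering matched rows out, mark them so a _deleted projection can expose them.
  static void markDeleted(List<SimpleRow> rows, Set<Integer> deletedIds) {
    for (SimpleRow row : rows) {
      if (deletedIds.contains(row.id)) {
        row.isDeleted = true; // analogous to row.setBoolean(columnIsDeletedPosition(), true)
      }
    }
  }

  public static void main(String[] args) {
    List<SimpleRow> rows = List.of(new SimpleRow(29), new SimpleRow(43), new SimpleRow(61));
    markDeleted(rows, Set.of(29, 43));
    rows.forEach(row -> System.out.println("id=" + row.id + ", _deleted=" + row.isDeleted));
  }
}
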
@@ -181,7 +181,8 @@ public MetadataColumn[] metadataColumns() {
new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
- new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false)
+ new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false),
+ new SparkMetadataColumn(MetadataColumns.IS_DELETED.name(), DataTypes.BooleanType, false)
};
}

@@ -19,15 +19,18 @@
package org.apache.iceberg.spark.source;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -40,16 +43,19 @@
import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
@@ -58,8 +64,10 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.jetbrains.annotations.NotNull;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -78,8 +86,8 @@ public TestSparkReaderDeletes(boolean vectorized) {
}

@Parameterized.Parameters(name = "vectorized = {0}")
- public static Object[][] parameters() {
-   return new Object[][] {new Object[] {false}, new Object[] {true}};
+ public static Object[] parameters() {
+   return new Object[] {false, true};
}

@BeforeClass
@@ -131,6 +139,8 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
TableProperties.PARQUET_BATCH_SIZE,
"4") // split 7 records to two batches to cover more code paths
.commit();
} else {
table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit();
}
return table;
}
@@ -142,14 +152,17 @@ protected void dropTable(String name) {

@Override
public StructLikeSet rowSet(String name, Table table, String... columns) {
+ return rowSet(name, table.schema().select(columns).asStruct(), columns);
+ }
+
+ public StructLikeSet rowSet(String name, Types.StructType projection, String... columns) {
Dataset<Row> df =
spark
.read()
.format("iceberg")
.load(TableIdentifier.of("default", name).toString())
.selectExpr(columns);

- Types.StructType projection = table.schema().select(columns).asStruct();
StructLikeSet set = StructLikeSet.create(projection);
df.collectAsList()
.forEach(
@@ -293,4 +306,243 @@ public void testPosDeletesAllRowsInBatch() throws IOException {

Assert.assertEquals("Table should contain expected rows", expected, actual);
}

@Test
public void testPosDeletesWithDeletedColumn() throws IOException {
Assume.assumeFalse(vectorized);

// read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all
// deleted.
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 0L), // id = 29
Pair.of(dataFile.path(), 1L), // id = 43
Pair.of(dataFile.path(), 2L), // id = 61
Pair.of(dataFile.path(), 3L) // id = 89
);

Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);

table
.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
StructLikeSet actual =
rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");

Assert.assertEquals("Table should contain expected row", expected, actual);
}

@Test
public void testEqualityDeleteWithDeletedColumn() throws IOException {
Assume.assumeFalse(vectorized);

String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
List<Record> dataDeletes =
Lists.newArrayList(
dataDelete.copy("data", "a"), // id = 29
dataDelete.copy("data", "d"), // id = 89
dataDelete.copy("data", "g") // id = 122
);

DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(temp.newFile()),
TestHelpers.Row.of(0),
dataDeletes,
deleteRowSchema);

table.newRowDelta().addDeletes(eqDeletes).commit();

StructLikeSet expected = expectedRowSet(29, 89, 122);
StructLikeSet actual =
rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");

Assert.assertEquals("Table should contain expected row", expected, actual);
}

@Test
public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
Assume.assumeFalse(vectorized);

Schema dataSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(dataSchema);
List<Record> dataDeletes =
Lists.newArrayList(
dataDelete.copy("data", "a"), // id = 29
dataDelete.copy("data", "d"), // id = 89
dataDelete.copy("data", "g") // id = 122
);

DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(temp.newFile()),
TestHelpers.Row.of(0),
dataDeletes,
dataSchema);

List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 3L), // id = 89
Pair.of(dataFile.path(), 5L) // id = 121
);

Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);

table
.newRowDelta()
.addDeletes(eqDeletes)
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
StructLikeSet actual =
rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");

Assert.assertEquals("Table should contain expected row", expected, actual);
}

@Test
public void testFilterOnDeletedMetadataColumn() throws IOException {
Assume.assumeFalse(vectorized);

List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 0L), // id = 29
Pair.of(dataFile.path(), 1L), // id = 43
Pair.of(dataFile.path(), 2L), // id = 61
Pair.of(dataFile.path(), 3L) // id = 89
);

Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(
table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);

table
.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

StructLikeSet expected = expectedRowSetWithNonDeletesOnly(29, 43, 61, 89);

// get non-deleted rows
Dataset<Row> df =
spark
.read()
.format("iceberg")
.load(TableIdentifier.of("default", tableName).toString())
.select("id", "data", "_deleted")
.filter("_deleted = false");

Types.StructType projection = PROJECTION_SCHEMA.asStruct();
StructLikeSet actual = StructLikeSet.create(projection);
df.collectAsList()
.forEach(
row -> {
SparkStructLike rowWrapper = new SparkStructLike(projection);
actual.add(rowWrapper.wrap(row));
});

Assert.assertEquals("Table should contain expected row", expected, actual);

StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89);

// get deleted rows
df =
spark
.read()
.format("iceberg")
.load(TableIdentifier.of("default", tableName).toString())
.select("id", "data", "_deleted")
.filter("_deleted = true");

StructLikeSet actualDeleted = StructLikeSet.create(projection);
df.collectAsList()
.forEach(
row -> {
SparkStructLike rowWrapper = new SparkStructLike(projection);
actualDeleted.add(rowWrapper.wrap(row));
});

Assert.assertEquals("Table should contain expected row", expectedDeleted, actualDeleted);
}

@Test
public void testIsDeletedColumnWithoutDeleteFile() {
Assume.assumeFalse(vectorized);

StructLikeSet expected = expectedRowSet();
StructLikeSet actual =
rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
Assert.assertEquals("Table should contain expected row", expected, actual);
}

private static final Schema PROJECTION_SCHEMA =
new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()),
MetadataColumns.IS_DELETED);

private static StructLikeSet expectedRowSet(int... idsToRemove) {
return expectedRowSet(false, false, idsToRemove);
}

private static StructLikeSet expectedRowSetWithDeletesOnly(int... idsToRemove) {
return expectedRowSet(false, true, idsToRemove);
}

private static StructLikeSet expectedRowSetWithNonDeletesOnly(int... idsToRemove) {
return expectedRowSet(true, false, idsToRemove);
}

private static StructLikeSet expectedRowSet(
boolean removeDeleted, boolean removeNonDeleted, int... idsToRemove) {
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
List<Record> records = recordsWithDeletedColumn();
// mark rows deleted
records.forEach(
record -> {
if (deletedIds.contains(record.getField("id"))) {
record.setField(MetadataColumns.IS_DELETED.name(), true);
}
});

records.removeIf(record -> deletedIds.contains(record.getField("id")) && removeDeleted);
records.removeIf(record -> !deletedIds.contains(record.getField("id")) && removeNonDeleted);

StructLikeSet set = StructLikeSet.create(PROJECTION_SCHEMA.asStruct());
records.forEach(
record -> set.add(new InternalRecordWrapper(PROJECTION_SCHEMA.asStruct()).wrap(record)));

return set;
}

@NotNull
private static List<Record> recordsWithDeletedColumn() {
List<Record> records = Lists.newArrayList();

// records all use IDs that are in bucket id_bucket=0
GenericRecord record = GenericRecord.create(PROJECTION_SCHEMA);
records.add(record.copy("id", 29, "data", "a", "_deleted", false));
records.add(record.copy("id", 43, "data", "b", "_deleted", false));
records.add(record.copy("id", 61, "data", "c", "_deleted", false));
records.add(record.copy("id", 89, "data", "d", "_deleted", false));
records.add(record.copy("id", 100, "data", "e", "_deleted", false));
records.add(record.copy("id", 121, "data", "f", "_deleted", false));
records.add(record.copy("id", 122, "data", "g", "_deleted", false));
return records;
}
}