Wrap equality-delete records (DeleteFilter) and the row sets built by the delete-read
tests in InternalRecordWrapper, and add a day-partitioned table test
(testEqualityDateDeletes) whose rows carry a LocalDate column.

NOTE(review): this patch was recovered from a whitespace-collapsed copy. Line breaks,
indentation, blank context lines and generic type parameters (for example List<Record>)
were reconstructed from the hunk line counts; confirm against the target tree before
applying.

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a8eb13cdfa68..148f9d90c035 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -134,9 +134,14 @@ private List<Predicate<T>> applyEqDeletes() {
 
       Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
           delete -> openDeletes(delete, deleteSchema));
+
+      // copy the delete records because they will be held in a set
+      CloseableIterable<Record> records = CloseableIterable.transform(
+          CloseableIterable.concat(deleteRecords), Record::copy);
+
       StructLikeSet deleteSet = Deletes.toEqualitySet(
-          // copy the delete records because they will be held in a set
-          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          CloseableIterable.transform(
+              records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
           deleteSchema.asStruct());
 
       Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 70ac77473c5d..69b0a572ad73 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.data;
 
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
@@ -34,6 +35,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
@@ -51,17 +53,30 @@ public abstract class DeleteReadTests {
       Types.NestedField.required(2, "data", Types.StringType.get())
   );
 
+  public static final Schema DATE_SCHEMA = new Schema(
+      Types.NestedField.required(1, "dt", Types.DateType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get()),
+      Types.NestedField.required(3, "id", Types.IntegerType.get())
+  );
+
   // Partition spec used to create tables
   public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
       .bucket("data", 16)
       .build();
 
+  public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA)
+      .day("dt")
+      .build();
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private String tableName = null;
+  protected String tableName = null;
+  protected String dateTableName = null;
   protected Table table = null;
+  protected Table dateTable = null;
   private List<Record> records = null;
+  private List<Record> dateRecords = null;
   private DataFile dataFile = null;
 
   @Before
@@ -90,6 +105,46 @@ public void writeTestDataFile() throws IOException {
   @After
   public void cleanup() throws IOException {
     dropTable("test");
+    dropTable("test2");
+  }
+
+  private void initDateTable() throws IOException {
+    dropTable("test2");
+    this.dateTableName = "test2";
+    this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
+
+    GenericRecord record = GenericRecord.create(dateTable.schema());
+
+    this.dateRecords = Lists.newArrayList(
+        record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+        record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+        record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3),
+        record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4),
+        record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5));
+
+    DataFile dataFile1 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1));
+    DataFile dataFile2 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2));
+    DataFile dataFile3 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3));
+    DataFile dataFile4 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4));
+    DataFile dataFile5 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5));
+
+    dateTable.newAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .appendFile(dataFile3)
+        .appendFile(dataFile4)
+        .appendFile(dataFile5)
+        .commit();
   }
 
   protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException;
@@ -119,12 +174,47 @@ public void testEqualityDeletes() throws IOException {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
 
+  @Test
+  public void testEqualityDateDeletes() throws IOException {
+    initDateTable();
+
+    Schema deleteRowSchema = dateTable.schema().select("*");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+        dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+        dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3)
+    );
+
+    DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema);
+    DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema);
+    DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema);
+
+    dateTable.newRowDelta()
+        .addDeletes(eqDeletes1)
+        .addDeletes(eqDeletes2)
+        .addDeletes(eqDeletes3)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3);
+
+    StructLikeSet actual = rowSet(dateTableName, dateTable, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
   @Test
   public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
     Schema deleteRowSchema = table.schema().select("data");
@@ -142,7 +232,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+    StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id");
     StructLikeSet actual = rowSet(tableName, table, "id");
 
     if (expectPruned()) {
@@ -180,7 +270,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -202,7 +292,7 @@ public void testPositionDeletes() throws IOException {
         .validateDataFilesExist(posDeletes.second())
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -235,7 +325,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException {
         .validateDataFilesExist(posDeletes.second())
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -269,7 +359,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException {
         .addDeletes(idEqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -306,7 +396,7 @@ public void testEqualityDeleteByNull() throws IOException {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(131);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 131);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -321,11 +411,12 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
     return set;
   }
 
-  private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
+  private static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
     Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    records.stream()
+    recordList.stream()
         .filter(row -> !deletedIds.contains(row.getField("id")))
+        .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))
         .forEach(set::add);
     return set;
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index 72d48831de06..e92c0daec385 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 
@@ -48,7 +49,8 @@ protected void dropTable(String name) {
   public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
+      Iterables.addAll(set, CloseableIterable.transform(
+          reader, record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)));
     }
     return set;
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index ef964f8fb92e..2ba4e50e8aa1 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
@@ -104,6 +105,7 @@ public StructLikeSet rowSet(String name, Table table, String... columns) {
         .filter(recordFactory -> recordFactory.name().equals(inputFormat))
         .map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords())
         .flatMap(List::stream)
+        .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record))
         .collect(Collectors.toList())
     );
 