-
Notifications
You must be signed in to change notification settings - Fork 3k
Data: Fix equality delete set save DATE/TIMESTAMP type of data throw IllegalStateException #3135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
6469812
4dcfe1a
2dcdc84
69d3d76
fc9759c
e615d51
9436bf0
d66edfb
190fbfa
fc613f7
3c4af3c
c146f19
3463021
507f49b
75530dc
e8ab7b0
f6114eb
16bb7bd
f6f0804
84ac227
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| package org.apache.iceberg.data; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.LocalDate; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.DataFile; | ||
|
|
@@ -34,6 +35,7 @@ | |
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.util.ArrayUtil; | ||
| import org.apache.iceberg.util.CharSequenceSet; | ||
| import org.apache.iceberg.util.DateTimeUtil; | ||
| import org.apache.iceberg.util.Pair; | ||
| import org.apache.iceberg.util.StructLikeSet; | ||
| import org.apache.iceberg.util.StructProjection; | ||
|
|
@@ -51,17 +53,30 @@ public abstract class DeleteReadTests { | |
| Types.NestedField.required(2, "data", Types.StringType.get()) | ||
| ); | ||
|
|
||
| public static final Schema DATE_SCHEMA = new Schema( | ||
| Types.NestedField.required(1, "dt", Types.DateType.get()), | ||
| Types.NestedField.required(2, "data", Types.StringType.get()), | ||
| Types.NestedField.required(3, "id", Types.IntegerType.get()) | ||
| ); | ||
|
|
||
| // Partition spec used to create tables | ||
| public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) | ||
| .bucket("data", 16) | ||
| .build(); | ||
|
|
||
| public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA) | ||
| .day("dt") | ||
| .build(); | ||
|
|
||
| @Rule | ||
| public TemporaryFolder temp = new TemporaryFolder(); | ||
|
|
||
| private String tableName = null; | ||
| protected String tableName = null; | ||
| protected String dateTableName = null; | ||
| protected Table table = null; | ||
| protected Table dateTable = null; | ||
| private List<Record> records = null; | ||
| private List<Record> dateRecords = null; | ||
| private DataFile dataFile = null; | ||
|
|
||
| @Before | ||
|
|
@@ -90,6 +105,46 @@ public void writeTestDataFile() throws IOException { | |
| @After | ||
| public void cleanup() throws IOException { | ||
| dropTable("test"); | ||
| dropTable("test2"); | ||
| } | ||
|
|
||
| private void initDateTable() throws IOException { | ||
| dropTable("test2"); | ||
| this.dateTableName = "test2"; | ||
| this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); | ||
|
|
||
| GenericRecord record = GenericRecord.create(dateTable.schema()); | ||
|
|
||
| this.dateRecords = Lists.newArrayList( | ||
| record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), | ||
| record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), | ||
| record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3), | ||
| record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4), | ||
| record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5)); | ||
|
|
||
| DataFile dataFile1 = FileHelpers.writeDataFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1)); | ||
| DataFile dataFile2 = FileHelpers.writeDataFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2)); | ||
| DataFile dataFile3 = FileHelpers.writeDataFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3)); | ||
| DataFile dataFile4 = FileHelpers.writeDataFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4)); | ||
| DataFile dataFile5 = FileHelpers.writeDataFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5)); | ||
|
|
||
| dateTable.newAppend() | ||
| .appendFile(dataFile1) | ||
| .appendFile(dataFile2) | ||
| .appendFile(dataFile3) | ||
| .appendFile(dataFile4) | ||
| .appendFile(dataFile5) | ||
| .commit(); | ||
| } | ||
|
|
||
| protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException; | ||
|
|
@@ -119,12 +174,47 @@ public void testEqualityDeletes() throws IOException { | |
| .addDeletes(eqDeletes) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(29, 89, 122); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
| } | ||
|
|
||
| @Test | ||
| public void testEqualityDateDeletes() throws IOException { | ||
| initDateTable(); | ||
|
|
||
| Schema deleteRowSchema = dateTable.schema().select("*"); | ||
| Record dataDelete = GenericRecord.create(deleteRowSchema); | ||
| List<Record> dataDeletes = Lists.newArrayList( | ||
| dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), | ||
| dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), | ||
| dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3) | ||
| ); | ||
|
|
||
| DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema); | ||
| DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema); | ||
| DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile( | ||
| dateTable, Files.localOutput(temp.newFile()), | ||
| Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema); | ||
|
|
||
| dateTable.newRowDelta() | ||
| .addDeletes(eqDeletes1) | ||
| .addDeletes(eqDeletes2) | ||
| .addDeletes(eqDeletes3) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3); | ||
|
|
||
| StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
| } | ||
|
|
||
| @Test | ||
| public void testEqualityDeletesWithRequiredEqColumn() throws IOException { | ||
| Schema deleteRowSchema = table.schema().select("data"); | ||
|
|
@@ -142,7 +232,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { | |
| .addDeletes(eqDeletes) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id"); | ||
| StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id"); | ||
| StructLikeSet actual = rowSet(tableName, table, "id"); | ||
|
|
||
| if (expectPruned()) { | ||
|
|
@@ -180,7 +270,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { | |
| .addDeletes(eqDeletes) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
|
|
@@ -202,7 +292,7 @@ public void testPositionDeletes() throws IOException { | |
| .validateDataFilesExist(posDeletes.second()) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(29, 89, 122); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
|
|
@@ -235,7 +325,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { | |
| .validateDataFilesExist(posDeletes.second()) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
|
|
@@ -269,7 +359,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { | |
| .addDeletes(idEqDeletes) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
|
|
@@ -306,7 +396,7 @@ public void testEqualityDeleteByNull() throws IOException { | |
| .addDeletes(eqDeletes) | ||
| .commit(); | ||
|
|
||
| StructLikeSet expected = rowSetWithoutIds(131); | ||
| StructLikeSet expected = rowSetWithoutIds(table, records, 131); | ||
| StructLikeSet actual = rowSet(tableName, table, "*"); | ||
|
|
||
| Assert.assertEquals("Table should contain expected rows", expected, actual); | ||
|
|
@@ -321,11 +411,12 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { | |
| return set; | ||
| } | ||
|
|
||
| private StructLikeSet rowSetWithoutIds(int... idsToRemove) { | ||
| private StructLikeSet rowSetWithoutIds(Table iTable, List<Record> recordList, int... idsToRemove) { | ||
|
||
| Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); | ||
| StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); | ||
| records.stream() | ||
| StructLikeSet set = StructLikeSet.create(iTable.schema().asStruct()); | ||
| recordList.stream() | ||
| .filter(row -> !deletedIds.contains(row.getField("id"))) | ||
| .map(record -> new InternalRecordWrapper(iTable.schema().asStruct()).wrap(record)) | ||
| .forEach(set::add); | ||
| return set; | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the other
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok i'll rewrite the origin rowSetWithoutIds() method |
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.