@@ -79,13 +79,8 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
Expression residual,
Map<Integer, ?> idToConstant,
SparkDeleteFilter deleteFilter) {
-    // get required schema for filtering out equality-delete rows in case equality-delete uses
-    // columns are
-    // not selected.
-    Schema requiredSchema =
-        deleteFilter != null && deleteFilter.hasEqDeletes()
-            ? deleteFilter.requiredSchema()
-            : expectedSchema();
+    // get required schema if there are deletes
+    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();

return Parquet.read(inputFile)
.project(requiredSchema)
@@ -22,13 +22,18 @@
import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
@@ -50,25 +55,35 @@
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.spark.source.metrics.NumDeletes;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -134,6 +149,13 @@ public static void stopMetastoreAndSpark() throws Exception {
spark = null;
}

@After
@Override
public void cleanup() throws IOException {
super.cleanup();
dropTable("test3");
}

@Override
protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
@@ -508,6 +530,75 @@ public void testIsDeletedColumnWithoutDeleteFile() {
checkDeleteCount(0L);
}

@Test
public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
Assume.assumeTrue(format.equals("parquet"));

String tblName = "test3";
Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());

List<Path> fileSplits = Lists.newArrayList();
StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
Configuration conf = new Configuration();
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
Path testFilePath = new Path(testFile.getAbsolutePath());

// Write a Parquet file with more than one row group
ParquetFileWriter parquetFileWriter =
new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
parquetFileWriter.start();
for (int i = 0; i < 2; i += 1) {
File split = temp.newFile();
Assert.assertTrue("Delete should succeed", split.delete());
Path splitPath = new Path(split.getAbsolutePath());
fileSplits.add(splitPath);
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(split))
.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
.schema(SCHEMA)
.overwrite()
.build()) {
Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
writer.addAll(records);
}
parquetFileWriter.appendFile(
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
}
parquetFileWriter.end(
ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
.getFileMetaData()
.getKeyValueMetaData());

// Add the file to the table
DataFile dataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
.withFormat("parquet")
.withRecordCount(200)
.build();
tbl.newAppend().appendFile(dataFile).commit();

// Add positional deletes to the table
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 97L),
Pair.of(dataFile.path(), 98L),
Pair.of(dataFile.path(), 99L),
Pair.of(dataFile.path(), 101L),
Pair.of(dataFile.path(), 103L),
Pair.of(dataFile.path(), 107L),
Pair.of(dataFile.path(), 109L));
Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes);
tbl.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
Review comment from @wypoon (Contributor, Author), Oct 24, 2022:
Without the fix, this assertion fails for the vectorized case.
There are 3 deletes applied to the first row group and 4 deletes applied to the second row group. Without the fix, the 3 deletes for the first row group are applied to the second as well (instead of the 4 that should be applied). Thus 6 rows are deleted (instead of 7) and the result is 194 rows, instead of the expected 193.

}
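
To make the arithmetic in the review comment above concrete, here is a small standalone sketch (not part of the PR; it assumes the test's layout of two 100-row splits, so row group 0 covers positions 0-99 and row group 1 covers positions 100-199) that partitions the test's deleted positions by row group:

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration only: group the test's deleted row positions by the
// row group they fall into, assuming 100 rows per row group as in the test above.
public class RowGroupDeleteSplit {
  public static void main(String[] args) {
    List<Long> deletedPositions = List.of(97L, 98L, 99L, 101L, 103L, 107L, 109L);
    long rowsPerGroup = 100L;

    Map<Long, List<Long>> byRowGroup =
        deletedPositions.stream()
            .collect(
                Collectors.groupingBy(pos -> pos / rowsPerGroup, TreeMap::new, Collectors.toList()));

    // Prints {0=[97, 98, 99], 1=[101, 103, 107, 109]}: 3 deletes fall in the first
    // row group and 4 in the second, so 200 - 7 = 193 rows should remain.
    System.out.println(byRowGroup);
  }
}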

private static final Schema PROJECTION_SCHEMA =
new Schema(
required(1, "id", Types.IntegerType.get()),