@@ -77,14 +77,17 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
if (task.file().format() == FileFormat.PARQUET) {
SparkDeleteFilter deleteFilter = deleteFilter(task);

// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema;

Parquet.ReadBuilder builder =
Parquet.read(location)
.project(expectedSchema)
.project(requiredSchema)
.split(task.start(), task.length())
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
expectedSchema,
requiredSchema,
fileSchema, /* setArrowValidityVector */
NullCheckingForGet.NULL_CHECKING_ENABLED,
idToConstant,
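The hunk above projects the delete filter's requiredSchema instead of the plain expectedSchema, so that when row-level deletes are attached to the task the reader also materializes the columns the filter needs (notably the row-position metadata used by positional deletes). A small self-contained sketch of that underlying idea, using a hypothetical helper rather than Iceberg's actual implementation:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class PositionalDeleteSketch {
  // Keep only the rows whose absolute position in the data file is not deleted.
  // firstRowPosition is the file position of the first row in this batch, which is
  // why the reader needs row positions (and hence the extended, "required" schema).
  static <T> List<T> filterBatch(List<T> batch, long firstRowPosition, Set<Long> deletedPositions) {
    List<T> kept = new ArrayList<>();
    for (int i = 0; i < batch.size(); i++) {
      long position = firstRowPosition + i;
      if (!deletedPositions.contains(position)) {
        kept.add(batch.get(i));
      }
    }
    return kept;
  }
}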
@@ -20,12 +20,17 @@

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
@@ -45,19 +50,28 @@
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -117,16 +131,26 @@ public static void stopMetastoreAndSpark() throws Exception {
spark = null;
}

@After
@Override
public void cleanup() throws IOException {
super.cleanup();
dropTable("test3");
}

@Override
protected Table createTable(String name, Schema schema, PartitionSpec spec) {
Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
table
.updateProperties()
.set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
.commit();
Comment on lines +147 to +150 (Contributor, Author):
TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT changed from false to true after this code was written. The old code only set TableProperties.PARQUET_VECTORIZATION_ENABLED explicitly when vectorized was true, so once the default flipped to true, every test in this class ran with vectorized reads only and the non-vectorized path was never exercised.
This change fixes that.
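A minimal sketch of the point being made, assuming the test class is parameterized on a boolean vectorized flag (the parameterization below is illustrative, not copied from the PR): writing the property explicitly from the parameter keeps both read paths covered regardless of what the table-level default is.

// Hypothetical JUnit 4 parameterization, shown only to illustrate the point
@Parameterized.Parameters(name = "vectorized = {0}")
public static Object[] parameters() {
  return new Object[] {false, true};
}

// The table property is then derived from the parameter instead of being set only
// when vectorized is true, so the value of PARQUET_VECTORIZATION_ENABLED_DEFAULT
// no longer decides which read path the tests exercise.
table
    .updateProperties()
    .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
    .commit();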

if (vectorized) {
table
.updateProperties()
.set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
.set(
TableProperties.PARQUET_BATCH_SIZE,
"4") // split 7 records to two batches to cover more code paths
@@ -293,4 +317,71 @@ public void testPosDeletesAllRowsInBatch() throws IOException {

Assert.assertEquals("Table should contain expected rows", expected, actual);
}

@Test
public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
String tblName = "test3";
Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());

List<Path> fileSplits = Lists.newArrayList();
StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
Configuration conf = new Configuration();
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
Path testFilePath = new Path(testFile.getAbsolutePath());

// Write a Parquet file with more than one row group
ParquetFileWriter parquetFileWriter =
new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
parquetFileWriter.start();
for (int i = 0; i < 2; i += 1) {
File split = temp.newFile();
Assert.assertTrue("Delete should succeed", split.delete());
Path splitPath = new Path(split.getAbsolutePath());
fileSplits.add(splitPath);
try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(split))
.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
.schema(SCHEMA)
.overwrite()
.build()) {
Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
writer.addAll(records);
}
parquetFileWriter.appendFile(
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
}
parquetFileWriter.end(
ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
.getFileMetaData()
.getKeyValueMetaData());

// Add the file to the table
DataFile dataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
.withFormat("parquet")
.withRecordCount(200)
.build();
tbl.newAppend().appendFile(dataFile).commit();

// Add positional deletes to the table
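// Positions 97-99 fall in the first appended split (rows 0-99) and 101-109 in the
// second (rows 100-199), so the deletes straddle the boundary between row groups.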
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(dataFile.path(), 97L),
Pair.of(dataFile.path(), 98L),
Pair.of(dataFile.path(), 99L),
Pair.of(dataFile.path(), 101L),
Pair.of(dataFile.path(), 103L),
Pair.of(dataFile.path(), 107L),
Pair.of(dataFile.path(), 109L));
Pair<DeleteFile, CharSequenceSet> posDeletes =
FileHelpers.writeDeleteFile(tbl, Files.localOutput(temp.newFile()), deletes);
tbl.newRowDelta()
.addDeletes(posDeletes.first())
.validateDataFilesExist(posDeletes.second())
.commit();

Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
}
}
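One possible follow-up assertion, not part of this PR, assuming parquet-hadoop's ParquetFileReader is available to the test (it ships alongside the ParquetFileWriter already used above): verify that the concatenated file really contains more than one row group, since that is the condition the test depends on.

// Hypothetical extra check inside the test, after parquetFileWriter.end(...):
try (org.apache.parquet.hadoop.ParquetFileReader reader =
    org.apache.parquet.hadoop.ParquetFileReader.open(
        org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(testFilePath, conf))) {
  Assert.assertTrue(
      "Expected more than one row group", reader.getFooter().getBlocks().size() > 1);
}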