diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 49c43952135c..c05b694a60dc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -32,12 +31,10 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,21 +81,9 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
-    Schema projectedSchema = requiredSchema;
-    if (hasPositionDelete) {
-      // We need to add MetadataColumns.ROW_POSITION in the schema for
-      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
-      // more after #10107 is merged.
-      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
-      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
-        columns.add(MetadataColumns.ROW_POSITION);
-        projectedSchema = new Schema(columns);
-      }
-    }
 
     return Parquet.read(inputFile)
-        .project(projectedSchema)
+        .project(requiredSchema)
        .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
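
Context note (not part of the patch): the removed block widened the Parquet projection with the reserved row-position metadata column so that position deletes could be applied; per the removed comment, this workaround becomes unnecessary once the change tracked in #10107 is in place, so the batch readers now project the delete filter's required schema directly. The sketch below is a hedged illustration of what the removed workaround computed, written against Iceberg's public API and the relocated Guava shipped with Iceberg; the class name and the sample "id" column are illustrative assumptions, not taken from the patch.

// Hedged sketch only; reproduces the projection widening that the removed
// block performed. Assumes iceberg-api and iceberg-bundled-guava on the
// classpath.
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

class RowPositionProjectionSketch {

  // Old behavior: append the reserved _pos metadata column if it is missing.
  static Schema withRowPosition(Schema requiredSchema) {
    List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
    if (!columns.contains(MetadataColumns.ROW_POSITION)) {
      columns.add(MetadataColumns.ROW_POSITION);
      return new Schema(columns);
    }
    return requiredSchema;
  }

  public static void main(String[] args) {
    // Hypothetical required schema standing in for deleteFilter.requiredSchema().
    Schema requiredSchema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    // Before this patch: the batch reader projected withRowPosition(requiredSchema).
    // After this patch: requiredSchema is projected as-is.
    System.out.println(withRowPosition(requiredSchema).asStruct());
  }
}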