diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 8d8182963d..9e29c7404a 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -184,4 +184,7 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String READ_ORC_IGNORE_FILE_FIELD_IDS = "read.orc.ignore.field-ids.enabled";
+  public static final boolean READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT = false;
 }
diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/LegacyHiveTableUtils.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/LegacyHiveTableUtils.java
index c5e650c910..41f1e42014 100644
--- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/LegacyHiveTableUtils.java
+++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/LegacyHiveTableUtils.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.hivelink.core;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +42,6 @@
 import org.apache.iceberg.hivelink.core.utils.HiveTypeUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -90,8 +88,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
     Types.StructType dataStructType = schema.asStruct();
     List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());
-    String partitionColumnIdMappingString = props.get("partition.column.ids");
-    Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema, partitionColumnIdMappingString);
+    Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema);
     Types.StructType partitionStructType = partitionSchema.asStruct();
     fields.addAll(partitionStructType.fields());
     return new Schema(fields);
   }
@@ -110,8 +107,7 @@ static StructTypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
     return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
   }
 
-  private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema, String idMapping) {
-    Map<String, Integer> nameToId = parsePartitionColId(idMapping);
+  private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
     AtomicInteger fieldId = new AtomicInteger(10000);
     List<Types.NestedField> partitionFields = Lists.newArrayList();
     partitionKeys.forEach(f -> {
@@ -121,39 +117,11 @@ private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema da
       }
       partitionFields.add(
           Types.NestedField.optional(
-              nameToId.containsKey(f.getName()) ? nameToId.get(f.getName()) : fieldId.incrementAndGet(),
-              f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
+              fieldId.incrementAndGet(), f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
     });
     return new Schema(partitionFields);
   }
 
-  /**
-   *
-   * @param idMapping A comma separated string representation of column name
-   *                  and its id, e.g. partitionCol1:10,partitionCol2:11, no
-   *                  whitespace is allowed in the middle
-   * @return The parsed in-mem Map representation of the name to
-   *         id mapping
-   */
-  private static Map<String, Integer> parsePartitionColId(String idMapping) {
-    Map<String, Integer> nameToId = Maps.newHashMap();
-    if (idMapping != null) {
-      // parse idMapping string
-      Arrays.stream(idMapping.split(",")).forEach(kv -> {
-        String[] split = kv.split(":");
-        if (split.length != 2) {
-          throw new IllegalStateException(String.format(
-              "partition.column.ids property is invalid format: %s",
-              idMapping));
-        }
-        String name = split[0];
-        Integer id = Integer.parseInt(split[1]);
-        nameToId.put(name, id);
-      });
-    }
-    return nameToId;
-  }
-
   private static Type primitiveIcebergType(String hiveTypeString) {
     PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
     return HiveTypeUtil.convert(primitiveTypeInfo);
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 5ce484e6a7..4c1d90584a 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -137,6 +137,8 @@ public static class ReadBuilder {
     private NameMapping nameMapping = null;
     private OrcRowFilter rowFilter = null;
 
+    private boolean ignoreFileFieldIds = false;
+
     private Function<TypeDescription, OrcRowReader<?>> readerFunc;
     private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
     private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
@@ -217,10 +219,15 @@ public ReadBuilder rowFilter(OrcRowFilter newRowFilter) {
       return this;
     }
 
+    public ReadBuilder setIgnoreFileFieldIds(boolean ignoreFileFieldIds) {
+      this.ignoreFileFieldIds = ignoreFileFieldIds;
+      return this;
+    }
+
     public <D> CloseableIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
       return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
-          batchedReaderFunc, recordsPerBatch, rowFilter);
+          batchedReaderFunc, recordsPerBatch, rowFilter, ignoreFileFieldIds);
     }
   }
 
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 07d1cca270..cb4c5232d4 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -60,11 +60,13 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private NameMapping nameMapping;
   private final OrcRowFilter rowFilter;
 
+  private final boolean ignoreFileFieldIds;
+
   OrcIterable(InputFile file, Configuration config, Schema schema, NameMapping nameMapping, Long start,
               Long length, Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive,
               Expression filter, Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch,
-              OrcRowFilter rowFilter) {
+              OrcRowFilter rowFilter, boolean ignoreFileFieldIds) {
     this.schema = schema;
     this.readerFunction = readerFunction;
     this.file = file;
@@ -77,6 +79,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
     this.batchReaderFunction = batchReaderFunction;
     this.recordsPerBatch = recordsPerBatch;
     this.rowFilter = rowFilter;
+    this.ignoreFileFieldIds = ignoreFileFieldIds;
   }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -88,7 +91,7 @@ public CloseableIterator<T> iterator() {
     TypeDescription fileSchema = orcFileReader.getSchema();
     final TypeDescription readOrcSchema;
     final TypeDescription fileSchemaWithIds;
-    if (ORCSchemaUtil.hasIds(fileSchema)) {
+    if (!ignoreFileFieldIds && ORCSchemaUtil.hasIds(fileSchema)) {
       fileSchemaWithIds = fileSchema;
     } else {
       if (nameMapping == null) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 6f495098ff..1a61183d58 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -47,15 +47,17 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
   private final String nameMapping;
   private final boolean caseSensitive;
   private final int batchSize;
+  private final boolean ignoreFileFieldIds;
 
   BatchDataReader(
       CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
-      EncryptionManager encryptionManager, boolean caseSensitive, int size) {
+      EncryptionManager encryptionManager, boolean caseSensitive, int size, boolean ignoreFileFieldIds) {
     super(task, fileIo, encryptionManager);
     this.expectedSchema = expectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
     this.batchSize = size;
+    this.ignoreFileFieldIds = ignoreFileFieldIds;
   }
 
   @Override
@@ -98,7 +100,8 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
               idToConstant))
           .recordsPerBatch(batchSize)
           .filter(task.residual())
-          .caseSensitive(caseSensitive);
+          .caseSensitive(caseSensitive)
+          .setIgnoreFileFieldIds(ignoreFileFieldIds);
 
       if (nameMapping != null) {
         builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 2abfc171e6..926279bed7 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -71,16 +71,18 @@ class RowDataReader extends BaseDataReader<InternalRow> {
   private final Schema expectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private final boolean ignoreFileFieldIds;
 
   RowDataReader(
       CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
-      EncryptionManager encryptionManager, boolean caseSensitive) {
+      EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
     super(task, io, encryptionManager);
     this.io = io;
     this.tableSchema = tableSchema;
     this.expectedSchema = expectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
+    this.ignoreFileFieldIds = ignoreFileFieldIds;
   }
 
   @Override
@@ -185,7 +187,8 @@ private CloseableIterable<InternalRow> newOrcIterable(
         .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
         .filter(task.residual())
         .caseSensitive(caseSensitive)
-        .rowFilter(orcRowFilter);
+        .rowFilter(orcRowFilter)
+        .setIgnoreFileFieldIds(ignoreFileFieldIds);
 
     if (nameMapping != null) {
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 07c7e008ba..003ec24e25 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -50,6 +50,8 @@
 import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;
 
 public class RowDataRewriter implements Serializable {
 
@@ -64,6 +66,7 @@ public class RowDataRewriter implements Serializable {
   private final LocationProvider locations;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private final boolean ignoreFileFieldIds;
 
   public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
                          Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
@@ -80,6 +83,11 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
     String formatString = table.properties().getOrDefault(
         TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
     this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+
+    this.ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        READ_ORC_IGNORE_FILE_FIELD_IDS,
+        READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
   }
 
   public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
@@ -96,7 +104,7 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
     long taskId = context.taskAttemptId();
 
     RowDataReader dataReader = new RowDataReader(
-        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive, ignoreFileFieldIds);
     StructType structType = SparkSchemaUtil.convert(schema);
     SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
 
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 676f269f48..3db5beee26 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -77,6 +77,8 @@
 import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;
 
 class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
     SupportsPushDownRequiredColumns, SupportsReportStatistics {
@@ -223,6 +225,10 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
     String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
+    boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        READ_ORC_IGNORE_FILE_FIELD_IDS,
+        READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
 
     ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
         "Cannot scan table %s: cannot apply required delete files", table);
@@ -231,7 +237,7 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
           task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
+          localityPreferred, new BatchReaderFactory(batchSize), ignoreFileFieldIds));
     }
 
     LOG.info("Batching input partitions with {} tasks.", readTasks.size());
@@ -246,12 +252,16 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
     String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
+    boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        READ_ORC_IGNORE_FILE_FIELD_IDS,
+        READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
           task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
+          localityPreferred, InternalRowReaderFactory.INSTANCE, ignoreFileFieldIds));
     }
 
     return readTasks;
@@ -455,13 +465,15 @@ private static class ReadTask<T> implements Serializable, InputPartition<T> {
     private final boolean localityPreferred;
     private final ReaderFactory<T> readerFactory;
+    private final boolean ignoreFileFieldIds;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
     private transient String[] preferredLocations = null;
 
     private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
                      String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-                     boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
+                     boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory,
+                     boolean ignoreFileFieldIds) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
@@ -472,12 +484,13 @@ private ReadTask(CombinedScanTask task, String tableSchemaString, String expecte
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
       this.nameMappingString = nameMappingString;
+      this.ignoreFileFieldIds = ignoreFileFieldIds;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
       return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
-          encryptionManager.value(), caseSensitive);
+          encryptionManager.value(), caseSensitive, ignoreFileFieldIds);
     }
 
     @Override
@@ -513,7 +526,8 @@ private String[] getPreferredLocations() {
   private interface ReaderFactory<T> extends Serializable {
     InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema,
                                    Schema expectedSchema, String nameMapping, FileIO io,
-                                   EncryptionManager encryptionManager, boolean caseSensitive);
+                                   EncryptionManager encryptionManager, boolean caseSensitive,
+                                   boolean ignoreFileFieldIds);
   }
 
   private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
@@ -525,8 +539,10 @@ private InternalRowReaderFactory() {
     @Override
     public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema,
                                                     Schema expectedSchema, String nameMapping, FileIO io,
-                                                    EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+                                                    EncryptionManager encryptionManager, boolean caseSensitive,
+                                                    boolean ignoreFileFieldIds) {
+      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive,
+          ignoreFileFieldIds);
     }
   }
 
@@ -540,22 +556,25 @@ private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch>
     @Override
     public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema,
                                                       Schema expectedSchema, String nameMapping, FileIO io,
-                                                      EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
+                                                      EncryptionManager encryptionManager, boolean caseSensitive,
+                                                      boolean ignoreFileFieldIds) {
+      return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize,
+          ignoreFileFieldIds);
     }
   }
 
   private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
     RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
-              EncryptionManager encryptionManager, boolean caseSensitive) {
-      super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+              EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
+      super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, ignoreFileFieldIds);
     }
   }
 
   private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
     BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
-                EncryptionManager encryptionManager, boolean caseSensitive, int size) {
-      super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
+                EncryptionManager encryptionManager, boolean caseSensitive, int size,
+                boolean ignoreFileFieldIds) {
+      super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size, ignoreFileFieldIds);
     }
   }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 343d8bdd0e..817651d030 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -63,6 +63,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
+import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;
+
 abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);
@@ -135,13 +138,17 @@ public InputPartition[] planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
     String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        READ_ORC_IGNORE_FILE_FIELD_IDS,
+        READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
 
     List<CombinedScanTask> scanTasks = tasks();
     InputPartition[] readTasks = new InputPartition[scanTasks.size()];
     for (int i = 0; i < scanTasks.size(); i++) {
       readTasks[i] = new ReadTask(
           scanTasks.get(i), tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager,
-          caseSensitive, localityPreferred);
+          caseSensitive, localityPreferred, ignoreFileFieldIds);
     }
 
     return readTasks;
@@ -271,14 +278,14 @@ public boolean supportColumnarReads(InputPartition partition) {
   private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
     RowReader(ReadTask task) {
       super(task.task, task.tableSchema(), task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
-          task.isCaseSensitive());
+          task.isCaseSensitive(), task.getIgnoreFileFieldIds());
     }
   }
 
   private static class BatchReader extends BatchDataReader implements PartitionReader<ColumnarBatch> {
     BatchReader(ReadTask task, int batchSize) {
       super(task.task, task.expectedSchema(), task.nameMappingString, task.io(), task.encryption(),
-          task.isCaseSensitive(), batchSize);
+          task.isCaseSensitive(), batchSize, task.getIgnoreFileFieldIds());
     }
   }
 
@@ -290,6 +297,7 @@ private static class ReadTask implements InputPartition, Serializable {
     private final Broadcast<FileIO> io;
     private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
+    private final boolean ignoreFileFieldIds;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
@@ -297,7 +305,7 @@ private static class ReadTask implements InputPartition, Serializable {
 
     ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, String nameMappingString,
              Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager, boolean caseSensitive,
-             boolean localityPreferred) {
+             boolean localityPreferred, boolean ignoreFileFieldIds) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
@@ -310,6 +318,7 @@ private static class ReadTask implements InputPartition, Serializable {
       } else {
         this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
       }
+      this.ignoreFileFieldIds = ignoreFileFieldIds;
     }
 
     @Override
@@ -346,5 +355,9 @@ private Schema expectedSchema() {
       }
       return expectedSchema;
     }
+
+    public boolean getIgnoreFileFieldIds() {
+      return ignoreFileFieldIds;
+    }
   }
 }
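
For context, a minimal usage sketch (not part of the patch) of how the new property and builder hook fit together. The class and method names below are illustrative only; TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS, ORC.ReadBuilder.setIgnoreFileFieldIds, and PropertyUtil.propertyAsBoolean are the pieces the patch introduces or relies on.

// Hypothetical example, not part of the patch above.
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.util.PropertyUtil;

public class IgnoreFileFieldIdsExample {

  // Enable the new table property; the patched Spark readers resolve it with PropertyUtil.
  static void enable(Table table) {
    table.updateProperties()
        .set(TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS, "true")
        .commit();
  }

  // Resolve the flag the same way the patched readers do and pass it to the ORC read builder.
  // The builder forwards it to OrcIterable, which then skips the file's embedded field IDs
  // and falls back to name mapping / generated IDs when resolving the projection.
  static ORC.ReadBuilder readBuilder(Table table, InputFile dataFile, Schema projection) {
    boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
        table.properties(),
        TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS,
        TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
    return ORC.read(dataFile)
        .project(projection)
        .setIgnoreFileFieldIds(ignoreFileFieldIds);
  }
}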