3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -184,4 +184,7 @@ private TableProperties() {

public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;

public static final String READ_ORC_IGNORE_FILE_FIELD_IDS = "read.orc.ignore.field-ids.enabled";
public static final boolean READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT = false;
}
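Note: a minimal sketch of enabling the new property on an existing table through the standard Table API; the "table" variable and catalog wiring are assumptions, not part of this change.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// "table" is assumed to have been loaded from whichever Catalog the deployment uses.
table.updateProperties()
    .set(TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS, "true")   // default is "false"
    .commit();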
@@ -19,7 +19,6 @@

package org.apache.iceberg.hivelink.core;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,7 +42,6 @@
import org.apache.iceberg.hivelink.core.utils.HiveTypeUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -90,8 +88,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Types.StructType dataStructType = schema.asStruct();
List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());

String partitionColumnIdMappingString = props.get("partition.column.ids");
Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema, partitionColumnIdMappingString);
Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema);
Types.StructType partitionStructType = partitionSchema.asStruct();
fields.addAll(partitionStructType.fields());
return new Schema(fields);
@@ -110,8 +107,7 @@ static StructTypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
}

private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema, String idMapping) {
Map<String, Integer> nameToId = parsePartitionColId(idMapping);
private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
AtomicInteger fieldId = new AtomicInteger(10000);
List<Types.NestedField> partitionFields = Lists.newArrayList();
partitionKeys.forEach(f -> {
@@ -121,39 +117,11 @@ private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema da
}
partitionFields.add(
Types.NestedField.optional(
nameToId.containsKey(f.getName()) ? nameToId.get(f.getName()) : fieldId.incrementAndGet(),
f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
fieldId.incrementAndGet(), f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
});
return new Schema(partitionFields);
}

/**
*
* @param idMapping A comma separated string representation of column name
* and its id, e.g. partitionCol1:10,partitionCol2:11, no
* whitespace is allowed in the middle
* @return The parsed in-mem Map representation of the name to
* id mapping
*/
private static Map<String, Integer> parsePartitionColId(String idMapping) {
Map<String, Integer> nameToId = Maps.newHashMap();
if (idMapping != null) {
// parse idMapping string
Arrays.stream(idMapping.split(",")).forEach(kv -> {
String[] split = kv.split(":");
if (split.length != 2) {
throw new IllegalStateException(String.format(
"partition.column.ids property is invalid format: %s",
idMapping));
}
String name = split[0];
Integer id = Integer.parseInt(split[1]);
nameToId.put(name, id);
});
}
return nameToId;
}

private static Type primitiveIcebergType(String hiveTypeString) {
PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
return HiveTypeUtil.convert(primitiveTypeInfo);
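Note: with parsePartitionColId removed, partition columns no longer take field ids from a partition.column.ids table property; they always receive synthetic ids allocated above 10000. A minimal sketch of the resulting assignment (the column names are made up for illustration):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Mirrors the assignment above: the counter starts at 10000, so the first partition
// column gets id 10001, the second 10002, and so on.
AtomicInteger fieldId = new AtomicInteger(10000);
Schema partitionSchema = new Schema(Arrays.asList(
    Types.NestedField.optional(fieldId.incrementAndGet(), "datepartition", Types.StringType.get()),
    Types.NestedField.optional(fieldId.incrementAndGet(), "hour", Types.IntegerType.get())));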
9 changes: 8 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -137,6 +137,8 @@ public static class ReadBuilder {
private NameMapping nameMapping = null;
private OrcRowFilter rowFilter = null;

private boolean ignoreFileFieldIds = false;

private Function<TypeDescription, OrcRowReader<?>> readerFunc;
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
@@ -217,10 +219,15 @@ public ReadBuilder rowFilter(OrcRowFilter newRowFilter) {
return this;
}

public ReadBuilder setIgnoreFileFieldIds(boolean ignoreFileFieldIds) {
this.ignoreFileFieldIds = ignoreFileFieldIds;
return this;
}

public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
batchedReaderFunc, recordsPerBatch, rowFilter);
batchedReaderFunc, recordsPerBatch, rowFilter, ignoreFileFieldIds);
}
}

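Note: a hedged sketch of how a caller could flip the new flag on the read builder; the generic reader, input file, and schema here are assumptions drawn from Iceberg's existing API rather than from this diff.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;

// "inputFile" and "readSchema" are assumed to be supplied by the caller.
CloseableIterable<Record> rows = ORC.read(inputFile)
    .project(readSchema)
    .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema))
    .setIgnoreFileFieldIds(true)   // new option added here; false by default
    .build();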
7 changes: 5 additions & 2 deletions orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -60,11 +60,13 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private NameMapping nameMapping;
private final OrcRowFilter rowFilter;

private final boolean ignoreFileFieldIds;

OrcIterable(InputFile file, Configuration config, Schema schema,
NameMapping nameMapping, Long start, Long length,
Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter,
Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch,
OrcRowFilter rowFilter) {
OrcRowFilter rowFilter, boolean ignoreFileFieldIds) {
this.schema = schema;
this.readerFunction = readerFunction;
this.file = file;
@@ -77,6 +79,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
this.batchReaderFunction = batchReaderFunction;
this.recordsPerBatch = recordsPerBatch;
this.rowFilter = rowFilter;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@SuppressWarnings("unchecked")
@@ -88,7 +91,7 @@ public CloseableIterator<T> iterator() {
TypeDescription fileSchema = orcFileReader.getSchema();
final TypeDescription readOrcSchema;
final TypeDescription fileSchemaWithIds;
if (ORCSchemaUtil.hasIds(fileSchema)) {
if (!ignoreFileFieldIds && ORCSchemaUtil.hasIds(fileSchema)) {
fileSchemaWithIds = fileSchema;
} else {
if (nameMapping == null) {
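Note: when ignoreFileFieldIds is true, the reader deliberately takes the same path as a file without embedded ids: the configured name mapping (or one derived from the expected schema) is applied instead of trusting the file's field ids. A standalone sketch of that decision, distilled from this class rather than copied from it; helper visibility outside the org.apache.iceberg.orc package is an assumption.

import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.orc.TypeDescription;

// Ignoring file ids forces the name-mapping path even when the ORC file carries ids.
static TypeDescription resolveIds(TypeDescription fileSchema, Schema expectedSchema,
                                  NameMapping nameMapping, boolean ignoreFileFieldIds) {
  if (!ignoreFileFieldIds && ORCSchemaUtil.hasIds(fileSchema)) {
    return fileSchema;                     // trust the ids already present in the file
  }
  NameMapping mapping = nameMapping != null ? nameMapping : MappingUtil.create(expectedSchema);
  return ORCSchemaUtil.applyNameMapping(fileSchema, mapping);
}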
@@ -47,15 +47,17 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
private final boolean ignoreFileFieldIds;

BatchDataReader(
CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
EncryptionManager encryptionManager, boolean caseSensitive, int size) {
EncryptionManager encryptionManager, boolean caseSensitive, int size, boolean ignoreFileFieldIds) {
super(task, fileIo, encryptionManager);
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.batchSize = size;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
@@ -98,7 +100,8 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
idToConstant))
.recordsPerBatch(batchSize)
.filter(task.residual())
.caseSensitive(caseSensitive);
.caseSensitive(caseSensitive)
.setIgnoreFileFieldIds(ignoreFileFieldIds);

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -71,16 +71,18 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final boolean ignoreFileFieldIds;

RowDataReader(
CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
super(task, io, encryptionManager);
this.io = io;
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
Expand Down Expand Up @@ -185,7 +187,8 @@ private CloseableIterable<InternalRow> newOrcIterable(
.createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
.filter(task.residual())
.caseSensitive(caseSensitive)
.rowFilter(orcRowFilter);
.rowFilter(orcRowFilter)
.setIgnoreFileFieldIds(ignoreFileFieldIds);

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -50,6 +50,8 @@
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;

public class RowDataRewriter implements Serializable {

@@ -64,6 +66,7 @@ public class RowDataRewriter implements Serializable {
private final LocationProvider locations;
private final String nameMapping;
private final boolean caseSensitive;
private final boolean ignoreFileFieldIds;

public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
@@ -80,6 +83,11 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));

this.ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
}

public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
@@ -96,7 +104,7 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
long taskId = context.taskAttemptId();

RowDataReader dataReader = new RowDataReader(
task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive, ignoreFileFieldIds);

StructType structType = SparkSchemaUtil.convert(schema);
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
45 changes: 32 additions & 13 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -77,6 +77,8 @@
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;

class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
SupportsPushDownRequiredColumns, SupportsReportStatistics {
@@ -223,6 +225,10 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);

ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
"Cannot scan table %s: cannot apply required delete files", table);
@@ -231,7 +237,7 @@
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
localityPreferred, new BatchReaderFactory(batchSize)));
localityPreferred, new BatchReaderFactory(batchSize), ignoreFileFieldIds));
}
LOG.info("Batching input partitions with {} tasks.", readTasks.size());

@@ -246,12 +252,16 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);

List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
localityPreferred, InternalRowReaderFactory.INSTANCE));
localityPreferred, InternalRowReaderFactory.INSTANCE, ignoreFileFieldIds));
}

return readTasks;
@@ -455,13 +465,15 @@ private static class ReadTask<T> implements Serializable, InputPartition<T> {
private final boolean localityPreferred;
private final ReaderFactory<T> readerFactory;

private final boolean ignoreFileFieldIds;
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
private transient String[] preferredLocations = null;

private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory,
boolean ignoreFileFieldIds) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
@@ -472,12 +484,13 @@ private ReadTask(CombinedScanTask task, String tableSchemaString, String expecte
this.preferredLocations = getPreferredLocations();
this.readerFactory = readerFactory;
this.nameMappingString = nameMappingString;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
public InputPartitionReader<T> createPartitionReader() {
return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
encryptionManager.value(), caseSensitive);
encryptionManager.value(), caseSensitive, ignoreFileFieldIds);
}

@Override
@@ -513,7 +526,8 @@ private String[] getPreferredLocations() {
private interface ReaderFactory<T> extends Serializable {
InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds);
}

private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
@@ -525,8 +539,10 @@ private InternalRowReaderFactory() {
@Override
public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds) {
return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive,
ignoreFileFieldIds);
}
}

@@ -540,22 +556,25 @@ private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch>
@Override
public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds) {
return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize,
ignoreFileFieldIds);
}
}

private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, ignoreFileFieldIds);
}
}

private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive, int size) {
super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
EncryptionManager encryptionManager, boolean caseSensitive, int size,
boolean ignoreFileFieldIds) {
super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size, ignoreFileFieldIds);
}
}
}
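Note: end to end, the behavior is driven purely by the table property, so an ordinary Spark read picks it up with no reader-side option. A minimal sketch; the SparkSession and the table identifier are assumptions.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// "spark" is an existing SparkSession; "db.tbl" stands in for an Iceberg table whose
// properties include read.orc.ignore.field-ids.enabled=true. Both planInputPartitions()
// and planBatchInputPartitions() read the property and pass it down to the readers.
Dataset<Row> df = spark.read()
    .format("iceberg")
    .load("db.tbl");
df.show();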