orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java (96 additions, 0 deletions)
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.util.List;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.orc.TypeDescription;

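/**
 * Visitor that returns an ORC schema with Iceberg ID attributes set from a {@link NameMapping},
 * matching fields by their full name path.
 */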
class ApplyNameMapping extends OrcSchemaVisitor<TypeDescription> {
private final NameMapping nameMapping;

ApplyNameMapping(NameMapping nameMapping) {
this.nameMapping = nameMapping;
}

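// Use Iceberg's standard element/key/value names so list and map child paths line up with the name mapping.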
@Override
public String elementName() {
return "element";
}

@Override
public String keyName() {
return "key";
}

@Override
public String valueName() {
return "value";
}

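// Attaches the Iceberg field ID attribute when the mapping contains an entry for the current path.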
TypeDescription setId(TypeDescription type, MappedField mappedField) {
if (mappedField != null) {
type.setAttribute(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, mappedField.id().toString());
}
return type;
}

@Override
public TypeDescription record(TypeDescription record, List<String> names, List<TypeDescription> fields) {
Preconditions.checkArgument(names.size() == fields.size(), "All fields must have names");
MappedField field = nameMapping.find(currentPath());
TypeDescription structType = TypeDescription.createStruct();

for (int i = 0; i < fields.size(); i++) {
String fieldName = names.get(i);
TypeDescription fieldType = fields.get(i);
if (fieldType != null) {
structType.addField(fieldName, fieldType);
}
}
return setId(structType, field);
}

@Override
public TypeDescription list(TypeDescription array, TypeDescription element) {
Preconditions.checkArgument(element != null, "List type must have element type");

MappedField field = nameMapping.find(currentPath());
TypeDescription listType = TypeDescription.createList(element);
return setId(listType, field);
}

@Override
public TypeDescription map(TypeDescription map, TypeDescription key, TypeDescription value) {
Preconditions.checkArgument(key != null && value != null, "Map type must have both key and value types");

MappedField field = nameMapping.find(currentPath());
TypeDescription mapType = TypeDescription.createMap(key, value);
return setId(mapType, field);
}

@Override
public TypeDescription primitive(TypeDescription primitive) {
MappedField field = nameMapping.find(currentPath());
return setId(primitive.clone(), field);
}
}
orc/src/main/java/org/apache/iceberg/orc/HasIds.java (47 additions, 0 deletions)
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.util.List;
import java.util.function.Predicate;
import org.apache.orc.TypeDescription;

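/**
 * Visitor that returns true if any type in the ORC schema already carries an Iceberg ID attribute.
 */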
class HasIds extends OrcSchemaVisitor<Boolean> {

@Override
public Boolean record(TypeDescription record, List<String> names, List<Boolean> fields) {
return ORCSchemaUtil.icebergID(record).isPresent() || fields.stream().anyMatch(Predicate.isEqual(true));
}

@Override
public Boolean list(TypeDescription array, Boolean element) {
return ORCSchemaUtil.icebergID(array).isPresent() || element;
}

@Override
public Boolean map(TypeDescription map, Boolean key, Boolean value) {
return ORCSchemaUtil.icebergID(map).isPresent() || key || value;
}

@Override
public Boolean primitive(TypeDescription primitive) {
return ORCSchemaUtil.icebergID(primitive).isPresent();
}
}
orc/src/main/java/org/apache/iceberg/orc/ORC.java (10 additions, 3 deletions)
@@ -35,6 +35,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
@@ -120,11 +121,12 @@ public static ReadBuilder read(InputFile file) {
public static class ReadBuilder {
private final InputFile file;
private final Configuration conf;
private org.apache.iceberg.Schema schema = null;
private Schema schema = null;
private Long start = null;
private Long length = null;
private Expression filter = null;
private boolean caseSensitive = true;
private NameMapping nameMapping = null;

private Function<TypeDescription, OrcRowReader<?>> readerFunc;
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
@@ -193,10 +195,15 @@ public ReadBuilder recordsPerBatch(int numRecordsPerBatch) {
return this;
}

public ReadBuilder withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}

public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcIterable<>(file, conf, schema, start, length, readerFunc, caseSensitive, filter, batchedReaderFunc,
recordsPerBatch);
return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
batchedReaderFunc, recordsPerBatch);
}
}

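A minimal usage sketch for the new builder option (not part of this change); it assumes an InputFile named inputFile, a table Schema named icebergSchema, and the generic row reader from the iceberg-data module:

// Derive a name-based mapping from the table schema, then read an ORC file
// that may have been written without Iceberg field IDs.
NameMapping mapping = MappingUtil.create(icebergSchema);
CloseableIterable<Record> records = ORC.read(inputFile)
    .project(icebergSchema)
    .withNameMapping(mapping)
    .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(icebergSchema, fileSchema))
    .build();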
orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java (13 additions, 0 deletions)
@@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -395,6 +396,18 @@ static boolean isOptional(TypeDescription orcType) {
return true;
}

static TypeDescription removeIds(TypeDescription type) {
return OrcSchemaVisitor.visit(type, new RemoveIds());
}

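/** Returns true if any type in the ORC schema carries an Iceberg ID attribute. */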
static boolean hasIds(TypeDescription orcSchema) {
return OrcSchemaVisitor.visit(orcSchema, new HasIds());
}

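/** Returns an ORC schema with Iceberg ID attributes applied from the given name mapping. */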
static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping nameMapping) {
return OrcSchemaVisitor.visit(orcSchema, new ApplyNameMapping(nameMapping));
}

/**
* Generates mapping from field IDs to ORC qualified names. See {@link IdToOrcName} for details.
*/
orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java (17 additions, 2 deletions)
@@ -31,6 +31,8 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.util.Pair;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
@@ -51,14 +53,16 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final boolean caseSensitive;
private final Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction;
private final int recordsPerBatch;
private NameMapping nameMapping;

OrcIterable(InputFile file, Configuration config, Schema schema,
Long start, Long length,
NameMapping nameMapping, Long start, Long length,
Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter,
Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch) {
this.schema = schema;
this.readerFunction = readerFunction;
this.file = file;
this.nameMapping = nameMapping;
this.start = start;
this.length = length;
this.config = config;
@@ -73,7 +77,18 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
public CloseableIterator<T> iterator() {
Reader orcFileReader = ORC.newFileReader(file, config);
addCloseable(orcFileReader);
TypeDescription readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, orcFileReader.getSchema());

TypeDescription fileSchema = orcFileReader.getSchema();
final TypeDescription readOrcSchema;
if (ORCSchemaUtil.hasIds(fileSchema)) {
readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, fileSchema);
} else {
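// The file was written without Iceberg field IDs; fall back to a name-based mapping
// (defaulting to one derived from the read schema) to assign IDs before projecting.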
if (nameMapping == null) {
nameMapping = MappingUtil.create(schema);
}
TypeDescription typeWithIds = ORCSchemaUtil.applyNameMapping(fileSchema, nameMapping);
readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, typeWithIds);
}

SearchArgument sarg = null;
if (filter != null) {
orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java (88 additions, 5 deletions)
@@ -19,6 +19,7 @@

package org.apache.iceberg.orc;

import java.util.Deque;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -29,6 +30,8 @@
*/
public abstract class OrcSchemaVisitor<T> {

private final Deque<String> fieldNames = Lists.newLinkedList();

public static <T> List<T> visitSchema(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
Preconditions.checkArgument(schema.getId() == 0, "TypeDescription must be root schema.");

@@ -47,11 +50,37 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
throw new UnsupportedOperationException("Cannot handle " + schema);

case LIST:
return visitor.list(schema, visit(schema.getChildren().get(0), visitor));
final T elementResult;

TypeDescription element = schema.getChildren().get(0);
visitor.beforeElementField(element);
try {
elementResult = visit(element, visitor);
} finally {
visitor.afterElementField(element);
}
return visitor.list(schema, elementResult);

case MAP:
return visitor.map(schema, visit(schema.getChildren().get(0), visitor),
visit(schema.getChildren().get(1), visitor));
final T keyResult;
final T valueResult;

TypeDescription key = schema.getChildren().get(0);
visitor.beforeKeyField(key);
try {
keyResult = visit(key, visitor);
} finally {
visitor.afterKeyField(key);
}

TypeDescription value = schema.getChildren().get(1);
visitor.beforeValueField(value);
try {
valueResult = visit(value, visitor);
} finally {
visitor.afterValueField(value);
}
return visitor.map(schema, keyResult, valueResult);

default:
return visitor.primitive(schema);
@@ -83,9 +112,53 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> vis
return visitor.record(record, names, visitFields(fields, names, visitor));
}

public void beforeField(String name, TypeDescription type) {}
public String elementName() {
return "_elem";
}

public String keyName() {
return "_key";
}

public String valueName() {
return "_value";
}

public String currentFieldName() {
return fieldNames.peek();
}

public void beforeField(String name, TypeDescription type) {
fieldNames.push(name);
}

public void afterField(String name, TypeDescription type) {
fieldNames.pop();
}

public void beforeElementField(TypeDescription element) {
beforeField(elementName(), element);
}

public void afterElementField(TypeDescription element) {
afterField(elementName(), element);
}

public void beforeKeyField(TypeDescription key) {
beforeField(keyName(), key);
}

public void afterKeyField(TypeDescription key) {
afterField(keyName(), key);
}

public void afterField(String name, TypeDescription type) {}
public void beforeValueField(TypeDescription value) {
beforeField(valueName(), value);
}

public void afterValueField(TypeDescription value) {
afterField(valueName(), value);
}

public T record(TypeDescription record, List<String> names, List<T> fields) {
return null;
@@ -102,4 +175,14 @@ public T map(TypeDescription map, T key, T value) {
public T primitive(TypeDescription primitive) {
return null;
}

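/** Returns the current field name path, from the root down to the field being visited. */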
protected String[] currentPath() {
return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
}

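/** Returns the current path with the given child field name appended. */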
protected String[] path(String name) {
List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
list.add(name);
return list.toArray(new String[0]);
}
}