72 changes: 11 additions & 61 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -24,8 +24,8 @@
 import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -36,11 +36,8 @@
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -87,7 +84,7 @@ static Metrics fromWriter(Writer writer) {
   private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
                                          final ColumnStatistics[] colStats) {
     final Schema schema = ORCSchemaUtil.convert(orcSchema);
-    final Set<TypeDescription> columnsInContainers = findColumnsInContainers(schema, orcSchema);
+    final Set<Integer> statsColumns = statsColumns(orcSchema);
     Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length);
     Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length);
     Map<Integer, Long> nullCounts = Maps.newHashMapWithExpectedSize(colStats.length);
@@ -106,7 +103,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
 
       columnSizes.put(fieldId, colStat.getBytesOnDisk());
 
-      if (!columnsInContainers.contains(orcCol)) {
+      if (statsColumns.contains(fieldId)) {
        // Since ORC does not track null values nor repeated ones, the value count for columns in
        // containers (maps, list) may be larger than what it actually is, however these are not
        // used in expressions right now. For such cases, we use the value number of values
@@ -209,64 +206,17 @@ private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
     return Optional.ofNullable(Conversions.toByteBuffer(column.type(), max));
   }
 
-  private static Set<TypeDescription> findColumnsInContainers(Schema schema,
-                                                              TypeDescription orcSchema) {
-    ColumnsInContainersVisitor visitor = new ColumnsInContainersVisitor();
-    OrcSchemaWithTypeVisitor.visit(schema, orcSchema, visitor);
-    return visitor.getColumnsInContainers();
+  private static Set<Integer> statsColumns(TypeDescription schema) {
+    return OrcSchemaVisitor.visit(schema, new StatsColumnsVisitor());
   }
 
-  private static class ColumnsInContainersVisitor extends OrcSchemaWithTypeVisitor<TypeDescription> {
-
-    private final Set<TypeDescription> columnsInContainers;
-
-    private ColumnsInContainersVisitor() {
-      columnsInContainers = Sets.newHashSet();
-    }
-
-    public Set<TypeDescription> getColumnsInContainers() {
-      return columnsInContainers;
-    }
-
-    private Set<TypeDescription> flatten(TypeDescription rootType) {
-      if (rootType == null) {
-        return ImmutableSet.of();
-      }
-
-      final Set<TypeDescription> flatTypes = Sets.newHashSetWithExpectedSize(rootType.getMaximumId());
-      final Queue<TypeDescription> queue = Queues.newLinkedBlockingQueue();
-      queue.add(rootType);
-      while (!queue.isEmpty()) {
-        TypeDescription type = queue.remove();
-        flatTypes.add(type);
-        queue.addAll(Optional.ofNullable(type.getChildren()).orElse(ImmutableList.of()));
-      }
-      return flatTypes;
-    }
-
-    @Override
-    public TypeDescription record(Types.StructType iStruct, TypeDescription record,
-                                  List<String> names, List<TypeDescription> fields) {
-      return record;
-    }
-
-    @Override
-    public TypeDescription list(Types.ListType iList, TypeDescription array, TypeDescription element) {
-      columnsInContainers.addAll(flatten(element));
-      return array;
-    }
-
-    @Override
-    public TypeDescription map(Types.MapType iMap, TypeDescription map,
-                               TypeDescription key, TypeDescription value) {
-      columnsInContainers.addAll(flatten(key));
-      columnsInContainers.addAll(flatten(value));
-      return map;
-    }
-
+  private static class StatsColumnsVisitor extends OrcSchemaVisitor<Set<Integer>> {
     @Override
-    public TypeDescription primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
-      return primitive;
+    public Set<Integer> record(TypeDescription record, List<String> names, List<Set<Integer>> fields) {
+      ImmutableSet.Builder<Integer> result = ImmutableSet.builder();
+      fields.stream().filter(Objects::nonNull).forEach(result::addAll);
+      record.getChildren().stream().map(ORCSchemaUtil::fieldId).forEach(result::add);
+      return result.build();
     }
   }
 
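Since the new visitor only descends through struct fields, a small worked example may help. The sketch below is illustrative only: the schema and field IDs are invented, and because statsColumns and StatsColumnsVisitor are private to OrcMetrics, the expected result is stated in a comment rather than asserted in code.

import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;

public class StatsColumnsExample {
  public static void main(String[] args) {
    // An invented schema: a top-level long, a nested struct, and a list.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StructType.of(
            Types.NestedField.required(3, "x", Types.IntegerType.get()))),
        Types.NestedField.optional(4, "tags", Types.ListType.ofRequired(
            5, Types.IntegerType.get())));

    // convert() records the Iceberg field IDs on the ORC types, which is
    // what ORCSchemaUtil::fieldId reads back in the visitor above.
    TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
    System.out.println(orcSchema);

    // statsColumns(orcSchema) should return {1, 2, 3, 4}: every struct field,
    // including the container column itself, but not the list element (ID 5),
    // because ORC's value counts inside containers do not reflect nulls or
    // repetition and so cannot be trusted for metrics.
  }
}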
Review thread on the added line "record.getChildren().stream().map(ORCSchemaUtil::fieldId).forEach(result::add);":

Contributor: Just to verify, this would fail if there's any column that does not have an Iceberg ID. Is that preferred to skipping the metrics instead?

Contributor (Author): I think so. This is called when Iceberg wrote the file, so we should be able to assume the IDs are present, right?

Contributor: If the ORC files are written by Iceberg, it should be fine. I was thinking of the case of importing existing ORC files, although we'd need to implement a name mapping fallback strategy.

Contributor (@edgarRd, Aug 24, 2020): I think this breaks pretty much any import of non-Iceberg ORC tables via SparkTableUtil.
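To make the failure mode discussed above concrete: a minimal sketch, assuming ORCSchemaUtil.fieldId resolves IDs from an iceberg.id attribute on the ORC type. The attribute key, message, and precondition style are assumptions for illustration, not code quoted from the PR.

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.orc.TypeDescription;

final class FieldIdSketch {
  // Assumed attribute key; Iceberg stores field IDs on ORC types when writing.
  private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";

  private FieldIdSketch() {
  }

  static int fieldId(TypeDescription orcType) {
    String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE);
    // For an ORC file not written by Iceberg this attribute is absent, so the
    // metrics pass fails here instead of quietly skipping the column.
    Preconditions.checkNotNull(idStr, "Missing expected '%s' attribute", ICEBERG_ID_ATTRIBUTE);
    return Integer.parseInt(idStr);
  }
}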
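And a sketch of the alternative the thread raises, skipping metrics for columns without IDs rather than failing. The Optional-returning lookup is hypothetical, and a real import path would still want the name mapping fallback mentioned above.

import java.util.Optional;
import org.apache.orc.TypeDescription;

final class OptionalFieldIdSketch {
  private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";  // assumed key, as above

  private OptionalFieldIdSketch() {
  }

  // Returns empty instead of throwing when a column has no Iceberg ID.
  static Optional<Integer> icebergId(TypeDescription orcType) {
    return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE))
        .map(Integer::parseInt);
  }
}

In StatsColumnsVisitor.record(...), children would then be mapped through icebergId and only present values added, so importing a non-Iceberg ORC file would lose per-column metrics instead of throwing.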