87 changes: 26 additions & 61 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -23,10 +23,9 @@
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -35,10 +34,7 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -86,7 +82,7 @@ static Metrics fromWriter(Writer writer) {
private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
final ColumnStatistics[] colStats) {
final Schema schema = ORCSchemaUtil.convert(orcSchema);
final Set<TypeDescription> columnsInContainers = findColumnsInContainers(schema, orcSchema);
final Set<TypeDescription> columnsInContainers = findColumnsInContainers(orcSchema);
Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length);
Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length);
Map<Integer, Long> nullCounts = Maps.newHashMapWithExpectedSize(colStats.length);
@@ -214,64 +210,33 @@ private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column,
return Optional.ofNullable(Conversions.toByteBuffer(column.type(), max));
}

private static Set<TypeDescription> findColumnsInContainers(Schema schema,
TypeDescription orcSchema) {
ColumnsInContainersVisitor visitor = new ColumnsInContainersVisitor();
OrcSchemaWithTypeVisitor.visit(schema, orcSchema, visitor);
return visitor.getColumnsInContainers();
private static Set<TypeDescription> findColumnsInContainers(TypeDescription orcSchema) {
Set<TypeDescription> columnsInContainers = Sets.newHashSet();
findColumnsInContainers(orcSchema, columnsInContainers, false);
return Collections.unmodifiableSet(columnsInContainers);
}

private static class ColumnsInContainersVisitor extends OrcSchemaWithTypeVisitor<TypeDescription> {

private final Set<TypeDescription> columnsInContainers;

private ColumnsInContainersVisitor() {
columnsInContainers = Sets.newHashSet();
}

public Set<TypeDescription> getColumnsInContainers() {
return columnsInContainers;
}

private Set<TypeDescription> flatten(TypeDescription rootType) {
if (rootType == null) {
return ImmutableSet.of();
}

final Set<TypeDescription> flatTypes = Sets.newHashSetWithExpectedSize(rootType.getMaximumId());
final Queue<TypeDescription> queue = Queues.newLinkedBlockingQueue();
queue.add(rootType);
while (!queue.isEmpty()) {
TypeDescription type = queue.remove();
flatTypes.add(type);
queue.addAll(Optional.ofNullable(type.getChildren()).orElse(ImmutableList.of()));
}
return flatTypes;
private static void findColumnsInContainers(TypeDescription column,
Set<TypeDescription> columnsInContainers,
boolean isInContainers) {
Review comment from a Contributor:

My main reservation about this is that it doesn't use the ORC type visitor.

The visitors keep code cleaner. Recursive code is kept to a single method that is reused, so all of the tree traversals are similar. Visitors also allow us to go find all of the places that traverse a given structure to make sure they are up to date, so updates and maintenance are easier.

Review comment from @edgarRd (Contributor), Jun 17, 2020:

I'm not sure about this change. If the concern is compute complexity for (really) large values of N, we should not be using recursion as it is used here either, due to potential stack overflows; an iterative version should be used instead.

I found using the visitor simpler for maintenance and readability, since it's a common pattern used across the codebase. Usually, schemas are not that large (there may be some exceptions), but even with hundreds of columns I'm not sure the complexity impact is too high. I guess this was a trade-off between complexity and using a common pattern in the codebase.
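
For illustration, here is what an iterative version of that traversal could look like: an explicit stack replaces the recursion, so arbitrarily deep schemas cannot overflow the call stack. This is only a sketch and not code from this PR; the class and method names are invented, and it assumes nothing beyond the public ORC TypeDescription API (getCategory, getChildren, fromString).

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.orc.TypeDescription;

public class IterativeContainerColumns {

  // One stack frame per pending column; the flag records whether any ancestor
  // was a LIST or MAP.
  private static final class Frame {
    final TypeDescription column;
    final boolean inContainer;

    Frame(TypeDescription column, boolean inContainer) {
      this.column = column;
      this.inContainer = inContainer;
    }
  }

  static Set<TypeDescription> columnsInContainers(TypeDescription root) {
    Set<TypeDescription> result = new HashSet<>();
    Deque<Frame> stack = new ArrayDeque<>();
    stack.push(new Frame(root, false));

    while (!stack.isEmpty()) {
      Frame frame = stack.pop();
      if (frame.inContainer) {
        result.add(frame.column);
      }

      if (frame.column.getCategory() == TypeDescription.Category.UNION) {
        // mirror the PR code, which does not support union columns
        throw new UnsupportedOperationException("Cannot handle " + frame.column);
      }

      List<TypeDescription> children = frame.column.getChildren();
      if (children == null) {
        continue; // primitive column, nothing to descend into
      }

      // Children of a LIST or MAP are inside a container; children of a STRUCT
      // simply inherit the flag from their parent.
      boolean childInContainer = frame.inContainer
          || frame.column.getCategory() == TypeDescription.Category.LIST
          || frame.column.getCategory() == TypeDescription.Category.MAP;
      for (TypeDescription child : children) {
        stack.push(new Frame(child, childInContainer));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    TypeDescription schema = TypeDescription.fromString(
        "struct<id:int,tags:array<struct<key:string,value:string>>>");
    // Expect the array's element struct plus its two fields, i.e. 3 columns.
    System.out.println(columnsInContainers(schema).size());
  }
}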

Review comment from @rdblue (Contributor), Jun 17, 2020:

I agree about the complexity of traversing a schema not being a huge concern. I think the main value of this is to simplify the code to make it easier to maintain.

Here's the version I came up with, which is a bit simpler:

  private static Set<Integer> statsColumns(TypeDescription schema) {
    return OrcSchemaWithTypeVisitor.visit((Type) null, schema, new StatsColumnsVisitor());
  }

  private static class StatsColumnsVisitor extends OrcSchemaWithTypeVisitor<Set<Integer>> {
    @Override
    public Set<Integer> record(Types.StructType s, TypeDescription record, List<String> names, List<Set<Integer>> fields) {
      ImmutableSet.Builder<Integer> result = ImmutableSet.builder();
      fields.stream().filter(Objects::nonNull).forEach(result::addAll);
      record.getChildren().stream().map(ORCSchemaUtil::fieldId).forEach(result::add);
      return result.build();
    }

    @Override
    public Set<Integer> list(Types.ListType l, TypeDescription array, Set<Integer> element) {
      return null;
    }

    @Override
    public Set<Integer> map(Types.MapType m, TypeDescription map, Set<Integer> key, Set<Integer> value) {
      return null;
    }

    @Override
    public Set<Integer> primitive(Type.PrimitiveType p, TypeDescription primitive) {
      return null;
    }
  }

Using this would avoid the need to negate the check, so it would be if (statsColumns.contains(icebergId)) {...}.
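
As a rough illustration of how that positive check might sit in the stats-building loop, here is a sketch only, not code from this PR: the method shape, the icebergIdOf lookup, and the restriction to top-level columns are all assumptions made to keep the snippet self-contained.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.TypeDescription;

class StatsColumnCheckSketch {

  // Collects value counts only for columns whose Iceberg field id is in statsColumns.
  // icebergIdOf stands in for the id lookup the real code would perform; it is passed
  // in here only so the sketch compiles on its own. Only top-level columns of the
  // (assumed struct) root are shown, for brevity.
  static Map<Integer, Long> valueCounts(TypeDescription orcSchema,
                                        ColumnStatistics[] colStats,
                                        Set<Integer> statsColumns,
                                        Function<TypeDescription, Integer> icebergIdOf) {
    Map<Integer, Long> valueCounts = new HashMap<>();
    for (TypeDescription orcCol : orcSchema.getChildren()) {
      int icebergId = icebergIdOf.apply(orcCol);
      if (statsColumns.contains(icebergId)) {  // positive check, no negation needed
        // ORC column statistics are indexed by the ORC column id
        valueCounts.put(icebergId, colStats[orcCol.getId()].getNumberOfValues());
      }
    }
    return valueCounts;
  }
}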

Review comment from the PR author (Contributor):

@rdblue @edgarRd I think these are good points. @rdblue Your code snippet is indeed a neat and logical solution. Please feel free to close this PR and supersede it with yours.

Review comment from a Contributor:

@lxynov, I opened #1167 to fix this. Please have a look and review it if you have time. I'll close this one.

if (isInContainers) {
columnsInContainers.add(column);
}

@Override
public TypeDescription record(Types.StructType iStruct, TypeDescription record,
List<String> names, List<TypeDescription> fields) {
return record;
}

@Override
public TypeDescription list(Types.ListType iList, TypeDescription array, TypeDescription element) {
columnsInContainers.addAll(flatten(element));
return array;
}

@Override
public TypeDescription map(Types.MapType iMap, TypeDescription map,
TypeDescription key, TypeDescription value) {
columnsInContainers.addAll(flatten(key));
columnsInContainers.addAll(flatten(value));
return map;
}

@Override
public TypeDescription primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
return primitive;
switch (column.getCategory()) {
case LIST:
case MAP:
for (TypeDescription child : column.getChildren()) {
findColumnsInContainers(child, columnsInContainers, true);
}
return;
case STRUCT:
for (TypeDescription child : column.getChildren()) {
findColumnsInContainers(child, columnsInContainers, isInContainers);
}
return;
case UNION:
throw new UnsupportedOperationException("Cannot handle " + column);
default:
}
}
}