Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
Expand Down Expand Up @@ -73,43 +74,60 @@ public static Metrics fromInputFile(InputFile file) {
}

public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig) {
return fromInputFile(file, metricsConfig, null);
}

public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig, NameMapping mapping) {
final Configuration config = (file instanceof HadoopInputFile) ?
((HadoopInputFile) file).getConf() : new Configuration();
return fromInputFile(file, config, metricsConfig);
return fromInputFile(file, config, metricsConfig, mapping);
}

static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig) {
static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig, NameMapping mapping) {
try (Reader orcReader = ORC.newFileReader(file, config)) {
return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics(),
metricsConfig);
metricsConfig, mapping);
} catch (IOException ioe) {
throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
}
}

static Metrics fromWriter(Writer writer, MetricsConfig metricsConfig) {
try {
return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig);
return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig, null);
} catch (IOException ioe) {
throw new RuntimeIOException(ioe, "Failed to get statistics from writer");
}
}

private static Metrics buildOrcMetrics(long numOfRows, TypeDescription orcSchema,
ColumnStatistics[] colStats, MetricsConfig metricsConfig) {
final Schema schema = ORCSchemaUtil.convert(orcSchema);
final Set<Integer> statsColumns = statsColumns(orcSchema);
private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
final ColumnStatistics[] colStats, final MetricsConfig metricsConfig,
final NameMapping mapping) {
final TypeDescription orcSchemaWithIds = (!ORCSchemaUtil.hasIds(orcSchema) && mapping != null) ?
ORCSchemaUtil.applyNameMapping(orcSchema, mapping) : orcSchema;
final Set<Integer> statsColumns = statsColumns(orcSchemaWithIds);
final MetricsConfig effectiveMetricsConfig = Optional.ofNullable(metricsConfig)
.orElseGet(MetricsConfig::getDefault);
Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length);
Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length);
Map<Integer, Long> nullCounts = Maps.newHashMapWithExpectedSize(colStats.length);

if (!ORCSchemaUtil.hasIds(orcSchemaWithIds)) {
return new Metrics(numOfRows,
columnSizes,
valueCounts,
nullCounts,
null,
null);
}

final Schema schema = ORCSchemaUtil.convert(orcSchemaWithIds);
Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

for (int i = 0; i < colStats.length; i++) {
final ColumnStatistics colStat = colStats[i];
final TypeDescription orcCol = orcSchema.findSubtype(i);
final TypeDescription orcCol = orcSchemaWithIds.findSubtype(i);
final Optional<Types.NestedField> icebergColOpt = ORCSchemaUtil.icebergID(orcCol)
.map(schema::findField);

Expand Down Expand Up @@ -261,7 +279,8 @@ private static class StatsColumnsVisitor extends OrcSchemaVisitor<Set<Integer>>
public Set<Integer> record(TypeDescription record, List<String> names, List<Set<Integer>> fields) {
ImmutableSet.Builder<Integer> result = ImmutableSet.builder();
fields.stream().filter(Objects::nonNull).forEach(result::addAll);
record.getChildren().stream().map(ORCSchemaUtil::fieldId).forEach(result::add);
record.getChildren().stream().map(ORCSchemaUtil::icebergID).filter(Optional::isPresent)
.map(Optional::get).forEach(result::add);
return result.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,7 @@ public static List<DataFile> listPartition(Map<String, String> partition, String
} else if (format.contains("parquet")) {
return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
} else if (format.contains("orc")) {
// TODO: use NameMapping in listOrcPartition
return listOrcPartition(partition, uri, spec, conf, metricsConfig);
return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
} else {
throw new UnsupportedOperationException("Unknown partition format: " + format);
}
Expand Down Expand Up @@ -396,15 +395,16 @@ private static List<DataFile> listParquetPartition(Map<String, String> partition

private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
PartitionSpec spec, Configuration conf,
MetricsConfig metricsSpec) {
MetricsConfig metricsSpec, NameMapping mapping) {
try {
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);

return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf), metricsSpec);
Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
metricsSpec, mapping);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
Expand Down
Loading