16 changes: 7 additions & 9 deletions core/src/main/java/org/apache/iceberg/FieldMetrics.java
@@ -20,25 +20,23 @@
package org.apache.iceberg;


-import java.nio.ByteBuffer;

/**
* Iceberg internally tracked field level metrics.
*/
-public class FieldMetrics {
+public class FieldMetrics<T> {
private final int id;
private final long valueCount;
private final long nullValueCount;
private final long nanValueCount;
-  private final ByteBuffer lowerBound;
-  private final ByteBuffer upperBound;
+  private final T lowerBound;
+  private final T upperBound;

public FieldMetrics(int id,
long valueCount,
long nullValueCount,
long nanValueCount,
-                      ByteBuffer lowerBound,
-                      ByteBuffer upperBound) {
+                      T lowerBound,
+                      T upperBound) {
this.id = id;
this.valueCount = valueCount;
this.nullValueCount = nullValueCount;
@@ -78,14 +76,14 @@ public long nanValueCount() {
/**
* Returns the lower bound value of this field.
*/
-  public ByteBuffer lowerBound() {
+  public T lowerBound() {
return lowerBound;
}

/**
* Returns the upper bound value of this field.
*/
-  public ByteBuffer upperBound() {
+  public T upperBound() {
return upperBound;
}
}
8 changes: 3 additions & 5 deletions core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
@@ -19,16 +19,14 @@

package org.apache.iceberg;

-import java.nio.ByteBuffer;

/**
* Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
* <p>
* Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
* This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
* exceptions when they are accessed.
*/
-public class FloatFieldMetrics extends FieldMetrics {
+public class FloatFieldMetrics extends FieldMetrics<Number> {

Review comment (Contributor): After reviewing #2464, I think I understand why Number is used here instead of Float or Double, but I think it would be better to make each FieldMetrics class specific to a value type. This will probably be done when #2464 is merged and this is rebased for float, but in the meantime you may want to update this for any other type metrics you're introducing in this PR.
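
A minimal sketch of the value-type-specific shape suggested here, assuming the generic FieldMetrics<T> from this diff; the Float specialization and constructor are illustrative, not taken from the PR:

    // Hypothetical: bind the value type in the subclass instead of widening to Number.
    public class FloatFieldMetrics extends FieldMetrics<Float> {
      public FloatFieldMetrics(int id, long valueCount, long nanValueCount) {
        // Parquet/ORC writers only track the NaN counter; bounds stay unset.
        super(id, valueCount, 0L, nanValueCount, null, null);
      }
    }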


/**
  /**
   * Constructor for creating a FieldMetrics with only NaN counter.
@@ -51,12 +49,12 @@ public long nullValueCount() {
}

@Override
-  public ByteBuffer lowerBound() {
+  public Number lowerBound() {
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}

@Override
-  public ByteBuffer upperBound() {
+  public Number upperBound() {
throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. ");
}
}
14 changes: 9 additions & 5 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -40,6 +40,7 @@
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
@@ -330,8 +331,10 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
* A {@link DatumWriter} implementation that wraps another to produce position deletes.
*/
private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
-    private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
-    private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
+    private static final ValueWriter<CharSequence> PATH_WRITER =
+        ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId());
+    private static final ValueWriter<Long> POS_WRITER =
+        ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId());

@Override
public void setSchema(Schema schema) {
@@ -355,9 +358,10 @@ public Stream<FieldMetrics> metrics() {
* @param <D> the type of datum written as a deleted row
*/
private static class PositionAndRowDatumWriter<D> implements MetricsAwareDatumWriter<PositionDelete<D>> {
-    private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
-    private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
-
+    private static final ValueWriter<CharSequence> PATH_WRITER =
+        ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId());
+    private static final ValueWriter<Long> POS_WRITER =
+        ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId());
private final DatumWriter<D> rowWriter;

private PositionAndRowDatumWriter(DatumWriter<D> rowWriter) {
@@ -95,10 +95,9 @@ public void close() throws IOException {

@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
-      Schema schema, PositionOutputStream stream, DatumWriter<?> metricsAwareDatumWriter,
+      Schema schema, PositionOutputStream stream, DatumWriter<?> datumWriter,
      CodecFactory codec, Map<String, String> metadata) throws IOException {
-    DataFileWriter<D> writer = new DataFileWriter<>(
-        (DatumWriter<D>) metricsAwareDatumWriter);
+    DataFileWriter<D> writer = new DataFileWriter<>((DatumWriter<D>) datumWriter);

Review comment (Contributor): Is this rename needed? While I support cleaning up names, I would generally opt to leave these as-is to have smaller commits that are less likely to cause conflicts.


writer.setCodec(codec);

120 changes: 118 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java
@@ -19,10 +19,21 @@

package org.apache.iceberg.avro;

+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import org.apache.avro.io.DatumWriter;
+import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.UnicodeUtil;

public class AvroMetrics {

@@ -31,7 +42,112 @@ private AvroMetrics() {

static Metrics fromWriter(DatumWriter<?> datumWriter, Schema schema, long numRecords,
MetricsConfig inputMetricsConfig) {
-    // TODO will populate in following PRs if datum writer is a MetricsAwareDatumWriter
-    return new Metrics(numRecords, null, null, null);
+    if (!(datumWriter instanceof MetricsAwareDatumWriter)) {
+      return new Metrics(numRecords, null, null, null, null);

Review comment (Contributor): Is numRecords correct? What if this is for a field nested in a map or list?

+    }
+
+    MetricsAwareDatumWriter<?> metricsAwareDatumWriter = (MetricsAwareDatumWriter<?>) datumWriter;
+    MetricsConfig metricsConfig;
+    if (inputMetricsConfig == null) {
+      metricsConfig = MetricsConfig.getDefault();
+    } else {
+      metricsConfig = inputMetricsConfig;
+    }
+
+    Map<Integer, Long> valueCounts = new HashMap<>();

Review comment (Contributor): Nit: we typically prefer Maps.newHashMap(), which will add null checking, and no put value should be null here.

+    Map<Integer, Long> nullValueCounts = new HashMap<>();
+    Map<Integer, Long> nanValueCounts = new HashMap<>();
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<>();

Review comment (Contributor): One thing to consider is that quite a bit of this method and the helper methods below is generic and could be written for Stream<FieldMetrics>. I don't think it needs to happen right now, but I think it would be good to have this separated into the Avro-specific part and a metrics part that lives in MetricsUtil.
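
A rough sketch of the separation being suggested, assuming a hypothetical MetricsUtil that owns the format-agnostic aggregation while AvroMetrics stays a thin adapter; the class name, signature, and simplified body are illustrative only:

    import java.util.Map;
    import java.util.stream.Stream;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;

    // Hypothetical MetricsUtil: folds a stream of FieldMetrics into a Metrics object.
    public class MetricsUtil {
      private MetricsUtil() {
      }

      public static Metrics fromFieldMetrics(long numRecords, Stream<FieldMetrics> fieldMetrics) {
        Map<Integer, Long> valueCounts = Maps.newHashMap();
        Map<Integer, Long> nullValueCounts = Maps.newHashMap();
        fieldMetrics.forEach(metrics -> {
          valueCounts.put(metrics.id(), metrics.valueCount());
          nullValueCounts.put(metrics.id(), metrics.nullValueCount());
        });
        // A full version would also aggregate NaN counts and bounds here.
        return new Metrics(numRecords, null, valueCounts, nullValueCounts, null, null, null);
      }
    }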


+    metricsAwareDatumWriter.metrics().forEach(metrics -> {
+      String columnName = schema.findColumnName(metrics.id());
+      MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);

Review comment (Contributor): Should we add a method to look up metrics mode by ID?
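
If that lookup were added, a small helper along these lines would do; this is a hypothetical sketch that assumes the existing MetricsConfig.columnMode(String) and Schema.findColumnName(int) stay as they are:

    // Hypothetical convenience: resolve the metrics mode directly from a field id.
    static MetricsModes.MetricsMode columnMode(MetricsConfig config, Schema schema, int fieldId) {
      return config.columnMode(schema.findColumnName(fieldId));
    }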

+      if (metricsMode == MetricsModes.None.get()) {
+        return;
+      }
+
+      valueCounts.put(metrics.id(), metrics.valueCount());
+      nullValueCounts.put(metrics.id(), metrics.nullValueCount());
+      Type type = schema.findType(metrics.id());
+
+      if (type.typeId() == Type.TypeID.FLOAT || type.typeId() == Type.TypeID.DOUBLE) {
+        nanValueCounts.put(metrics.id(), metrics.nanValueCount());
+      }
+
+      if (metricsMode == MetricsModes.Counts.get()) {
+        return;
+      }
+
+      updateLowerBound(metrics, type, metricsMode).ifPresent(lowerBound -> lowerBounds.put(metrics.id(), lowerBound));
+      updateUpperBound(metrics, type, metricsMode).ifPresent(upperBound -> upperBounds.put(metrics.id(), upperBound));

Review comment (Contributor): I don't quite understand the decision to return an Optional instead of just passing lowerBounds or upperBounds into the update method and having the put happen there. Wouldn't it be simpler if the put happened at the end of updateLowerBound or updateUpperBound?
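
For comparison, a sketch of the shape described in this comment, where the caller's map is passed in and the put happens inside the helper; truncateLowerBound is a hypothetical stand-in for the truncation switch in updateLowerBound:

    // Hypothetical alternative: the helper writes into the map itself instead of
    // returning an Optional for the caller to unwrap.
    private static void addLowerBound(FieldMetrics metrics, Type type, MetricsModes.MetricsMode mode,
                                      Map<Integer, ByteBuffer> lowerBounds) {
      if (metrics.lowerBound() == null) {
        return;
      }
      Object bound = truncateLowerBound(metrics.lowerBound(), type, mode); // hypothetical helper
      ByteBuffer buffer = Conversions.toByteBuffer(type, bound);
      if (buffer != null) {
        lowerBounds.put(metrics.id(), buffer);
      }
    }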

+    });
+
+    return new Metrics(numRecords, null,
+        valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds);
  }

+  private static Optional<ByteBuffer> updateLowerBound(FieldMetrics metrics, Type type,
+                                                       MetricsModes.MetricsMode metricsMode) {
+    if (metrics.lowerBound() == null) {
+      return Optional.empty();
+    }
+
+    Object lowerBound = metrics.lowerBound();
+    if (metricsMode instanceof MetricsModes.Truncate) {
+      MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+      int truncateLength = truncateMode.length();
+      switch (type.typeId()) {
+        case STRING:
+          lowerBound = UnicodeUtil.truncateStringMin(
+              Literal.of((CharSequence) metrics.lowerBound()), truncateLength).value();

Review comment (Contributor): Why not use UnicodeUtil.truncateStringMin(CharSequence, int) instead of the Literal method?

Review comment (Contributor): Ah, I see that the min and max methods take literals. I think it would be better to refactor those methods to expose non-Literal implementations instead of creating the literals and unwrapping the result.
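
The refactor being asked for might look roughly like this: a hypothetical non-Literal overload on UnicodeUtil that the existing Literal-based method could share its implementation with:

    // Hypothetical overload: accept a plain CharSequence and return the truncated
    // lower bound without the caller wrapping and unwrapping a Literal.
    public static CharSequence truncateStringMin(CharSequence value, int length) {
      return UnicodeUtil.truncateStringMin(Literal.of(value), length).value();
    }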

+          break;
+        case FIXED:
+        case BINARY:
+          lowerBound = BinaryUtil.truncateBinaryMin(
+              Literal.of((ByteBuffer) metrics.lowerBound()), truncateLength).value();
+          break;
+        default:
+          break;
+      }
+    }
+
+    return Optional.ofNullable(Conversions.toByteBuffer(type, lowerBound));
+  }

+  private static Optional<ByteBuffer> updateUpperBound(FieldMetrics metrics, Type type,
+                                                       MetricsModes.MetricsMode metricsMode) {
+    if (metrics.upperBound() == null) {
+      return Optional.empty();
+    }
+
+    Object upperBound = null;
+    if (metricsMode instanceof MetricsModes.Truncate) {
+      MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+      int truncateLength = truncateMode.length();
+      switch (type.typeId()) {
+        case STRING:
+          upperBound = Optional.ofNullable(
+              UnicodeUtil.truncateStringMax(Literal.of((CharSequence) metrics.upperBound()), truncateLength))
+              .map(Literal::value)
+              .orElse(null);
+          break;
+        case FIXED:
+        case BINARY:
+          upperBound = Optional.ofNullable(
+              BinaryUtil.truncateBinaryMax(Literal.of((ByteBuffer) metrics.upperBound()), truncateLength))
+              .map(Literal::value)
+              .orElse(null);
+          break;
+        default:
+          break;
+      }
+    }
+
+    if (upperBound == null) {
+      upperBound = metrics.upperBound();
+    }
+
+    return Optional.ofNullable(Conversions.toByteBuffer(type, upperBound));
+  }
}
Expand Up @@ -408,4 +408,11 @@ private static String sanitize(char character) {
}
return "_x" + Integer.toHexString(character).toUpperCase();
}

+  static boolean supportsMetrics(Schema.Type type) {
+    // ENUM will not be created by converting iceberg schema to avro schema, and thus not included
+    return type == Schema.Type.BOOLEAN || type == Schema.Type.INT || type == Schema.Type.LONG ||
+        type == Schema.Type.FLOAT || type == Schema.Type.DOUBLE || type == Schema.Type.STRING ||
+        type == Schema.Type.FIXED || type == Schema.Type.BYTES;
+  }
}
41 changes: 38 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
@@ -41,7 +41,11 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
for (Schema.Field field : schema.getFields()) {
names.add(field.name());
+      visitor.beforeField(field.name(), field.schema(), schema);

+      T result = visitWithName(field.name(), field.schema(), visitor);
+      visitor.afterField(field.name(), field.schema(), schema);
+
+      results.add(result);
}

@@ -59,13 +63,22 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {

case ARRAY:
if (schema.getLogicalType() instanceof LogicalMap) {
-          return visitor.array(schema, visit(schema.getElementType(), visitor));
+          T result = visit(schema.getElementType(), visitor);
+          return visitor.array(schema, result);

Review comment (Contributor): It doesn't look like this change is needed?

} else {
return visitor.array(schema, visitWithName("element", schema.getElementType(), visitor));
visitor.beforeListElement("element", schema.getElementType(), schema);
T result = visitWithName("element", schema.getElementType(), visitor);
visitor.afterListElement("element", schema.getElementType(), schema);

return visitor.array(schema, result);
}

case MAP:
return visitor.map(schema, visitWithName("value", schema.getValueType(), visitor));
visitor.beforeMapValue("value", schema.getValueType(), schema);
T result = visitWithName("value", schema.getValueType(), visitor);
visitor.afterMapValue("value", schema.getValueType(), schema);

return visitor.map(schema, result);

default:
return visitor.primitive(schema);
@@ -107,4 +120,26 @@ public T map(Schema map, T value) {
public T primitive(Schema primitive) {
return null;
}

+  public void beforeField(String name, Schema type, Schema parentSchema) {
+  }
+
+  public void afterField(String name, Schema type, Schema parentSchema) {
+  }
+
+  public void beforeListElement(String name, Schema type, Schema parentSchema) {
+    beforeField(name, type, parentSchema);
+  }
+
+  public void afterListElement(String name, Schema type, Schema parentSchema) {
+    afterField(name, type, parentSchema);
+  }
+
+  public void beforeMapValue(String name, Schema type, Schema parentSchema) {
+    beforeField(name, type, parentSchema);
+  }
+
+  public void afterMapValue(String name, Schema type, Schema parentSchema) {
+    afterField(name, type, parentSchema);
+  }
}
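
A short usage sketch for the callbacks added above: a visitor can maintain the current field path without threading it through every visit method. The class here is hypothetical and only illustrates the hooks:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.avro.Schema;

    // Hypothetical visitor: prints the dotted path of every primitive field visited.
    class FieldPathTracker extends AvroSchemaVisitor<Void> {
      private final Deque<String> path = new ArrayDeque<>();

      @Override
      public void beforeField(String name, Schema type, Schema parentSchema) {
        path.addLast(name);
      }

      @Override
      public void afterField(String name, Schema type, Schema parentSchema) {
        path.removeLast();
      }

      @Override
      public Void primitive(Schema primitive) {
        // List elements and map values appear as "element" / "value" segments,
        // since the list/map hooks default to the field hooks above.
        System.out.println(String.join(".", path));
        return null;
      }
    }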