Avro metrics support: track metrics in Avro value writers #1963
Conversation
// ---------------------------------- Helpers ---------------------------------------------

private Deque<String> fieldNames = Lists.newLinkedList();
private Deque<Schema> parentSchemas = Lists.newLinkedList();
Instead of updating all visitors, why not add extra callbacks like the visitors for Iceberg schemas? I think that supporting beforeField and afterField would be a better way to handle this than passing the parent and name around. The implementation to get a field's ID from its parent and name seems a bit awkward compared to adding an ID stack in one visitor.
I did consider that pattern; from my notes, the main reason I didn't use it is that it wouldn't be as clean as the existing before/afterField pattern in other visitors, because different data structures store field ID information in different places. For example, for struct fields the field ID is stored in the field's own Schema.Field, so we can pass the field directly to before/afterField while looping through the fields; but for a map value, the ID is stored in the map's schema rather than the value's own schema, so beforeMapValue would have to be passed the parent schema. Requiring different visitors to implement before/after with all of these different parameters, and duplicating the per-type ID retrieval logic across visitor implementations, could be messy to reason about, so keeping the logic here might actually be cleaner.
What about creating a fake Schema.Field to pass to the before/after method instead? Another alternative is to pass the field information to the method, like this:

public void beforeField(int fieldId, String name, Schema type);
public void afterField(int fieldId, String name, Schema type);
public void beforeListElement(int elementId, Schema elementType);
public void beforeMapKey(int keyId, Schema keyType);
public void beforeMapValue(int valueId, Schema valueType);

I think some variation on this would be better. We want to avoid keeping additional state in all visitors.
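A minimal sketch of that stack-based variant, assuming the proposed callbacks and keeping the ID stack only in the one visitor that needs it (class and method names here are illustrative, not the actual API):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.avro.Schema;

// Illustrative sketch: this visitor tracks field IDs itself via the proposed
// callbacks, so no parent schema or field name needs to be passed around.
public abstract class AvroWithFieldIdVisitor<T> {
  private final Deque<Integer> fieldIds = new ArrayDeque<>();

  public void beforeField(int fieldId, String name, Schema type) {
    fieldIds.push(fieldId);
  }

  public void afterField(int fieldId, String name, Schema type) {
    fieldIds.pop();
  }

  // writer-construction methods would call this to get the ID in scope
  protected int currentFieldId() {
    return fieldIds.peek();
  }
}
```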
@Override
public void write(Void ignored, Encoder encoder) throws IOException {
-   encoder.writeNull();
+   throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
Is this change necessary? Do you think that it would actually cause a problem if it were used?
I don't think it would; this was just me trying to make it fail loudly instead of silently and to avoid confusion about its usage in the code. I can revert it if you think it's unnecessary.
I think we should revert this so that writeNull is correctly called. Nulls shouldn't be written as anything, but the fact that the encoder has a writeNull method makes me think that it should be called. We don't know what the encoder might be using it for.
private long nullValueCount;

- private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
+ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter, Schema.Type type) {
It looks like the only place that type is used is in an error message. I'd prefer not to change the signature here just to print a type.
Oh, this type is also used in the isMetricSupportedType/supportsMetrics check, so we still need it.
I think that we should not pass the type here. Instead, let's have a MetricsOptionWriter and a non-metrics version and choose which one to use ahead of time. That way, we don't keep a count that won't be used. There's no need to check whether the null count should be used when metrics is called. That can be determined ahead of time.
If we do that, then there is no need to pass the type here or into the option method. We can either add a boolean (collectMetrics) or have a separate factory method. I think that would be a bit cleaner.
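A rough sketch of that split, choosing the variant up front so the plain writer never carries an unused count (MetricsOptionWriter is a hypothetical name; ValueWriter and Encoder are the existing types):

```java
import java.io.IOException;
import org.apache.avro.io.Encoder;

// Sketch: a metrics-tracking option writer; the plain OptionWriter would be
// identical minus the nullValueCount field and increment.
class MetricsOptionWriter<T> implements ValueWriter<T> {
  private final int nullIndex;
  private final int valueIndex;
  private final ValueWriter<T> valueWriter;
  private long nullValueCount = 0;

  MetricsOptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
    this.nullIndex = nullIndex;
    this.valueIndex = nullIndex == 0 ? 1 : 0;
    this.valueWriter = valueWriter;
  }

  @Override
  public void write(T option, Encoder encoder) throws IOException {
    if (option == null) {
      nullValueCount += 1;           // the only difference from the plain writer
      encoder.writeIndex(nullIndex);
      encoder.writeNull();
    } else {
      encoder.writeIndex(valueIndex);
      valueWriter.write(option, encoder);
    }
  }
}
```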
Force-pushed 8914589 to 39efa9f (compare)
yyanyy left a comment
Thank you @rdblue for reviewing this PR! I have rebased the changes and responded to or addressed the feedback in this PR and #1946, except for the comment about including rowWriter in metrics() for the positional delete writer; I'll do that in a separate PR, since I want the code and its test coverage to land together.
    CodecFactory codec, Map<String, String> metadata) throws IOException {
  DataFileWriter<D> writer = new DataFileWriter<>(
-     (DatumWriter<D>) metricsAwareDatumWriter);
+     (DatumWriter<D>) datumWriter);
Nit: no need for the newline any more.
  return schema.getObjectProp(propertyName) != null;
}

+ public static int fieldId(Schema currentSchema, Schema parentSchema, Supplier<String> fieldNameGetter) {
I'd like to avoid needing this method because of the strange input types. I think those show that the way we're traversing the schema isn't quite right, which is why this is complicated.
private static final BooleanWriter INSTANCE = new BooleanWriter();

@Override
public Stream<FieldMetrics> metrics() {
  throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
Let's default this to Stream.empty() so that it won't fail.
 * @param <T2> Type after transformation
 */
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {
We typically use S and T for input and output types in other transform classes.
@Override
public void write(T1 datum, Encoder encoder) throws IOException {
  valueCount++;
  if (datum == null) {
Is this necessary? Before, all of the type-specific writers assumed that the input value was non-null because null isn't allowed unless the type is optional. If it is optional, the writer will be wrapped in an option writer, so there is no need to handle null.
I think this should similarly assume that the option writer tracks null values and that all values will be non-null. That simplifies this class quite a bit.
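Under that assumption, the write path could collapse to something like this (a sketch; updateMinMax is a hypothetical helper standing in for the PR's min/max bookkeeping):

```java
// Sketch: nulls never reach this writer because the option writer handles
// and counts them, so no null branch is needed here.
@Override
public void write(T datum, Encoder encoder) throws IOException {
  valueCount++;
  updateMinMax(datum);          // hypothetical: update min/max for the non-null datum
  writeVal(datum, encoder);
}
```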
    min = transformedDatum;
  }
  writeVal(transformedDatum, encoder);

Nit: no newline after if blocks and unnecessary newline before closing curly.
I see this in a couple other places, too.
 * exceptions when they are accessed.
 */
- public class FloatFieldMetrics extends FieldMetrics {
+ public class FloatFieldMetrics extends FieldMetrics<Number> {
After reviewing #2464, I think I understand why Number is used here instead of Float or Double, but I think it would be better to make each FieldMetrics class specific to a value type. For float this will probably happen when #2464 is merged and this is rebased, but in the meantime you may want to update this for any other type metrics you're introducing in this PR.
    CodecFactory codec, Map<String, String> metadata) throws IOException {
- DataFileWriter<D> writer = new DataFileWriter<>(
-     (DatumWriter<D>) metricsAwareDatumWriter);
+ DataFileWriter<D> writer = new DataFileWriter<>((DatumWriter<D>) datumWriter);
Is this rename needed? While I support cleaning up names, I would generally opt to leave these as-is to have smaller commits that are less likely to cause conflicts.
- // TODO will populate in following PRs if datum writer is a MetricsAwareDatumWriter
- return new Metrics(numRecords, null, null, null);
+ if (!(datumWriter instanceof MetricsAwareDatumWriter)) {
+   return new Metrics(numRecords, null, null, null, null);
Is numRecords correct? What if this is for a field nested in a map or list?
metricsAwareDatumWriter.metrics().forEach(metrics -> {
  String columnName = schema.findColumnName(metrics.id());
  MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName);
Should we add a method to look up metrics mode by ID?
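Something like this hypothetical overload could keep the lookup in one place (a sketch, not the actual MetricsConfig API):

```java
// Hypothetical convenience method on MetricsConfig; delegates to the
// existing name-based lookup via the Iceberg Schema's findColumnName.
public MetricsModes.MetricsMode columnMode(org.apache.iceberg.Schema schema, int fieldId) {
  return columnMode(schema.findColumnName(fieldId));
}
```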
switch (type.typeId()) {
  case STRING:
    lowerBound = UnicodeUtil.truncateStringMin(
        Literal.of((CharSequence) metrics.lowerBound()), truncateLength).value();
Why not use UnicodeUtil.truncateStringMin(CharSequence, int) instead of the Literal method?
Ah, I see that the min and max methods take literals. I think it would be better to refactor those methods to expose non-Literal implementations instead of creating the literals and unwrapping the result.
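A sketch of what the non-Literal entry point could look like, assuming the min truncation is essentially a code-point prefix (the existing Literal overload would then delegate to it):

```java
// Sketch only: truncate a lower bound to at most `length` code points.
// A prefix of the original string compares <= the original, so it remains
// a valid lower bound.
public static CharSequence truncateStringMin(CharSequence input, int length) {
  String value = input.toString();
  if (value.codePointCount(0, value.length()) <= length) {
    return input;
  }
  return value.substring(0, value.offsetByCodePoints(0, length));
}
```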
}

updateLowerBound(metrics, type, metricsMode).ifPresent(lowerBound -> lowerBounds.put(metrics.id(), lowerBound));
updateUpperBound(metrics, type, metricsMode).ifPresent(upperBound -> upperBounds.put(metrics.id(), upperBound));
I don't quite understand the decision to return an option instead of just passing lowerBounds or upperBounds into the update method and having the put happen there. Wouldn't it be simpler if the put happened at the end of updateLowerBound or updateUpperBound?
  metricsConfig = inputMetricsConfig;
}

Map<Integer, Long> valueCounts = new HashMap<>();
Nit: we typically prefer Maps.newHashMap(), which will add null checking, and no put value should be null here.
Map<Integer, Long> nullValueCounts = new HashMap<>();
Map<Integer, Long> nanValueCounts = new HashMap<>();
Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
One thing to consider is that quite a bit of this method and the helper methods below is generic and could be written for Stream<FieldMetrics>. I don't think it needs to happen right now, but I think it would be good to have this separated into the Avro-specific part and a metrics part that lives in MetricsUtil.
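For instance, the count aggregation could become a format-agnostic helper along these lines (a sketch; the method name and its placement in MetricsUtil are assumptions):

```java
import java.util.Map;
import java.util.stream.Stream;

// Sketch: aggregate per-field counts from any writer that exposes
// Stream<FieldMetrics>, independent of the file format.
static void collectCounts(Stream<FieldMetrics<?>> fieldMetrics,
                          Map<Integer, Long> valueCounts,
                          Map<Integer, Long> nullValueCounts,
                          Map<Integer, Long> nanValueCounts) {
  fieldMetrics.forEach(metrics -> {
    valueCounts.put(metrics.id(), metrics.valueCount());
    nullValueCounts.put(metrics.id(), metrics.nullValueCount());
    nanValueCounts.put(metrics.id(), metrics.nanValueCount());
  });
}
```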
if (schema.getLogicalType() instanceof LogicalMap) {
- return visitor.array(schema, visit(schema.getElementType(), visitor));
+ T result = visit(schema.getElementType(), visitor);
+ return visitor.array(schema, result);
It doesn't look like this change is needed?
| "Invalid map: %s is not a string", keyType); | ||
| return visitor.map(partner, schema, visit(visitor.mapValueType(partner), schema.getValueType(), visitor)); | ||
|
|
||
| visitor.beforeMapValue("value", schema.getValueType(), schema); |
Should this make an artificial call to the map key callbacks using a generic String schema as well?
 * This util class helps Avro DatumWriter builders to retrieve the correct field Id when building Avro DatumWriters
 * with visitor pattern.
 */
public class AvroWriterBuilderFieldIdUtil {
Minor: I think it would be a bit shorter to make this an abstract class that can be extended to get the current field ID via a protected method, but this is a good solution already.
public abstract static class StoredAsIntWriter<T> implements ValueWriter<T> {
  protected final int id;
  protected long valueCount;
  protected Integer max;
Because this is only used for non-null values, I think you can simplify the min and max updates by removing the null check. Just use protected int min = Integer.MAX_VALUE here and remove the null check. Then when creating FieldMetrics, you can check whether valueCount is non-zero to know whether min and max are valid.
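That simplification could look like this (sketch; assumes the option writer guarantees only non-null values reach this writer):

```java
// Sketch: sentinel initial values instead of nullable boxed min/max.
protected long valueCount = 0;
protected int min = Integer.MAX_VALUE;
protected int max = Integer.MIN_VALUE;

protected void update(int value) {
  valueCount += 1;
  if (value < min) {
    min = value;
  }
  if (value > max) {
    max = value;
  }
}

// When building FieldMetrics, treat min/max as valid only when valueCount > 0.
```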
  }
}

private abstract static class FloatingPointWriter<T extends Comparable<T>>
After #2464, would this be needed? Or would you just use the floating point classes from that PR?
private static final LongWriter INSTANCE = new LongWriter();

- private LongWriter() {
+ private static class LongWriter extends ComparableWriter<Long> {
I would expect LongWriter to extend StoredAsLongWriter instead of ComparableWriter. Same for IntegerWriter. Could you update those?
- default Stream<FieldMetrics> metrics() {
-   return Stream.empty(); // TODO will populate in following PRs
- }
+ Stream<FieldMetrics> metrics();
FieldMetrics is parameterized, but this is a bare reference. Could you update it? I think it should be FieldMetrics<?> since the metrics are not necessarily for the written value type, D.
  writeVal(datum, encoder);
}

protected abstract void writeVal(T datum, Encoder encoder) throws IOException;
I would probably name this encode rather than writeVal so that there is less confusion with the write method.
protected abstract void writeVal(T datum, Encoder encoder) throws IOException;

@Override
public Stream<FieldMetrics> metrics() {
I'd like to fix all of the references that don't parameterize FieldMetrics. I think they should be FieldMetrics<?>.
    ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
assertCounts(7, 1L, 0L, 1L, metrics);
assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
+ if (fileFormat() == FileFormat.AVRO) {
Is this needed if #2464 goes in first?
private void assertNonNullColumnSizes(Metrics metrics) {
  if (fileFormat() != FileFormat.AVRO) {
    Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull));
  }
Can you add an assertion for Avro? I think that the column sizes map should be null, is that correct?
public class TestGenericAvroMetrics extends TestAvroMetrics {

  protected Metrics getMetrics(Schema schema, OutputFile file, Map<String, String> properties,
                                MetricsConfig metricsConfig, Record... records) throws IOException {
Nit: indentation is off.
- public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer) {
-   return new OptionWriter<>(nullIndex, writer);
+ public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer, Schema.Type type) {
+   if (AvroSchemaUtil.supportsMetrics(type)) {
Rather than introducing supportsMetrics, why not just check the value writer to see whether it is a MetricsAwareWriter? I know that not all of the writers extend that class, but you could either introduce a MetricsWriter interface to signal that the inner writer supports metrics that all of the implementations extend, or maybe you could alter the hierarchy a little so that StoredAsIntWriter and StoredAsLongWriter actually do extend MetricsAwareWriter. Then you wouldn't need changes to AvroSchemaUtil or so many changes to option handling.
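A sketch of that selection, assuming MetricsAwareWriter (or a new MetricsWriter interface) marks writers that collect metrics and MetricsOptionWriter is the hypothetical null-counting variant:

```java
// Sketch: choose the option writer by inspecting the wrapped writer,
// so no Schema.Type parameter is needed.
public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer) {
  if (writer instanceof MetricsAwareWriter) {
    return new MetricsOptionWriter<>(nullIndex, writer);
  }
  return new OptionWriter<>(nullIndex, writer);
}
```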
@Override
public Stream<FieldMetrics> metrics() {
  return metrics(DecimalData::toBigDecimal);
Does this return a Java BigDecimal? I know we've had problems with this method in Spark because it actually produces a Scala BigDecimal.
@Override
public Stream<FieldMetrics> metrics() {
  return metrics(Decimal::toJavaBigDecimal);
Good to see you got the right one.
Any update on this one, @yyanyy?
Apologies, I didn't find a chance to update this. I'll make sure to allocate time to address the comments in the coming two weeks!
Thanks, @yyanyy! No rush, I just wanted to check in.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This change is a smaller PR broken down from #1935.

This change adds field IDs to the constructors of Avro primitive value writers, makes those writers track stats such as value count, min, and max, and exposes a metrics method that can be called to collect FieldMetrics. However, nothing calls these methods yet. This change doesn't include any tests; tests will be added in the next PR once end-to-end integration is set up.

Please note: regarding the change to the signature of FieldMetrics, the alternative would be to keep ByteBuffer as the return value for the lower/upper bound of FieldMetrics and pass each field's metrics mode into each leaf value writer during construction, so that truncation and conversion to a byte buffer could happen while collecting metrics from the writers. I think that's doable, but it would touch many method signatures, including adding the metrics mode to the constructor of every leaf writer and the metrics config to every datum writer (e.g. DataWriter, GenericAppenderFactory); it would, however, avoid computing min/max for fields that don't need them. Please let me know if you are interested, and I'll post a new commit to this PR so that the differences between the two implementations can be compared.