@@ -18,6 +18,7 @@

package org.apache.hudi.hadoop.utils;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -189,7 +190,14 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
Writable[] recordValues = new Writable[schema.getFields().size()];
int recordValueIndex = 0;
for (Schema.Field field : schema.getFields()) {
recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
// TODO Revisit Avro exception handling in future
Object fieldValue = null;
try {
fieldValue = record.get(field.name());
} catch (AvroRuntimeException e) {
LOG.debug("Field:" + field.name() + "not found in Schema:" + schema.toString());
Comment on lines +195 to +198
Contributor

I remember we have performance concerns regarding this change. Catching exceptions is not efficient.

Collaborator Author

The GenericRecord API does not have the hasField method in Avro 1.8.2 (https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/GenericRecord.html#hasField-java.lang.String-), so I think in general, when it performs this get, we have to do some exception catching.

It's only present in 1.10.2: https://avro.apache.org/docs/1.10.2/api/java/org/apache/avro/generic/GenericRecord.html#hasField-java.lang.String-
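For illustration, here is a minimal sketch of the two approaches side by side (the class and method names are hypothetical, not from this PR; the hasField variant assumes an Avro version, 1.10.2 per the Javadoc above, that Hudi cannot require yet):

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericRecord;

final class FieldAccess {
  // Portable fallback (works on Avro 1.8.2): attempt the get and treat a
  // failure as "field absent", defaulting to null.
  static Object getOrNull(GenericRecord record, String fieldName) {
    try {
      return record.get(fieldName);
    } catch (AvroRuntimeException e) {
      return null; // field not part of the record's schema
    }
  }

  // Guard possible only on newer Avro (GenericRecord#hasField, per the
  // 1.10.2 Javadoc linked above); shown for contrast, not usable while
  // 1.8.2 must stay supported.
  static Object getOrNullWithHasField(GenericRecord record, String fieldName) {
    return record.hasField(fieldName) ? record.get(fieldName) : null;
  }
}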

Collaborator Author (@rahil-c, Jul 21, 2022)

I also think the performance concerns are not with the try/catch itself but with the actual exception handling: https://www.oreilly.com/library/view/programming-jakarta-struts/0596006519/ch10s02.html#:~:text=In%20general%2C%20wrapping%20your%20Java,proper%20handler%20for%20the%20exception.

In general, wrapping your Java code with try/catch blocks doesn’t have a significant performance impact on your applications. Only when exceptions actually occur is there a negative performance impact, which is due to the lookup the JVM must perform to locate the proper handler for the exception.

In this case, though, I think we will need to catch the exception.
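As a rough illustration of that point, a throwaway timing sketch (hypothetical, not part of this PR, and not a rigorous benchmark; JMH would be needed for trustworthy numbers) contrasting a try block that never throws with one that always throws:

public class TryCatchCostSketch {
  public static void main(String[] args) {
    final int n = 5_000_000;
    long sum = 0;

    // Happy path: the try/catch is entered n times but never throws.
    long t0 = System.nanoTime();
    for (int i = 0; i < n; i++) {
      try {
        sum += i;
      } catch (RuntimeException e) {
        sum--;
      }
    }
    System.out.println("no-throw loop: " + (System.nanoTime() - t0) + " ns");

    // Exceptional path: every iteration throws, paying for exception
    // construction and handler lookup each time.
    long t1 = System.nanoTime();
    for (int i = 0; i < n / 1000; i++) {
      try {
        throw new RuntimeException("always");
      } catch (RuntimeException e) {
        sum--;
      }
    }
    System.out.println("throwing loop (" + (n / 1000) + " iterations): "
        + (System.nanoTime() - t1) + " ns");

    System.out.println(sum); // keep the JIT from discarding the loops
  }
}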

Contributor

If the field is not found, should that fail the conversion instead of filling null?

Collaborator Author

I believe the field not being found is normal in this case, based on the use case mentioned in the detailed PR description.

cc @zhedoubushishi

Contributor

@yihua The way things are currently implemented, this function is supposed to return a record with the complete schema. We cannot fail if the field is not found, as that is required for both the bootstrap and schema evolution scenarios. In the bootstrap case, the metadata fields may not be present in the data file and need to be filled with nulls. Similarly, with schema evolution we can hit a scenario like this, and historically we return nulls for the new columns if the old record does not have them.
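A self-contained sketch of that null-filling behavior (hypothetical names, not Hudi code). One detail worth noting: as far as I can tell, on the Avro 1.8.x line GenericData.Record.get(String) simply returns null for an unknown field, while newer Avro lines throw AvroRuntimeException, so the catch-and-default pattern in the diff covers both:

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class EvolvedSchemaNullFillDemo {
  public static void main(String[] args) {
    // Writer schema of an old record: only "id".
    Schema oldSchema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .endRecord();

    // Evolved reader schema: adds a nullable "newCol".
    Schema evolvedSchema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .optionalString("newCol")
        .endRecord();

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", "a1");

    // Walk the evolved schema the way avroToArrayWritable walks its fields:
    // a field missing from the old record is filled with null, not an error.
    for (Schema.Field f : evolvedSchema.getFields()) {
      Object v = null;
      try {
        v = oldRecord.get(f.name());
      } catch (AvroRuntimeException e) {
        // thrown on newer Avro for unknown fields; older Avro returns null
      }
      System.out.println(f.name() + " -> " + v); // id -> a1, newCol -> null
    }
  }
}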

Contributor

It makes sense to me now. If the existing behavior is to return null, then this is OK. @rahil-c could you create a ticket for revisiting the performance?

}
recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema());
}
return new ArrayWritable(Writable.class, recordValues);
case ENUM:
@@ -44,8 +44,10 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -897,6 +900,20 @@ public void testIncrementalWithCompaction() throws Exception {
assertTrue(splits.length == 0);
}

@Test
public void testAvroToArrayWritable() throws IOException {
Schema schema = SchemaTestUtil.getEvolvedSchema();
GenericRecord record = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "100", "100", false);
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
assertEquals(schema.getFields().size(), aWritable.get().length);

// In some queries, the generic records Hudi receives contain only a subset of the full schema's fields.
// Test the case where some fields are missing from the record.
Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema);
ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields);
assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length);
}

private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
throws IOException {
File file = basePath.resolve(".hoodie")