diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 0e4f9c304cb2b..bf4cbff6665cf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -189,7 +190,14 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
         Writable[] recordValues = new Writable[schema.getFields().size()];
         int recordValueIndex = 0;
         for (Schema.Field field : schema.getFields()) {
-          recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
+          // TODO Revisit Avro exception handling in the future
+          Object fieldValue = null;
+          try {
+            fieldValue = record.get(field.name());
+          } catch (AvroRuntimeException e) {
+            LOG.debug("Field: " + field.name() + " not found in Schema: " + schema.toString());
+          }
+          recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema());
         }
         return new ArrayWritable(Writable.class, recordValues);
       case ENUM:
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 74b7120fd0a5f..f334bbf3bc977 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -44,8 +44,10 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.RealtimeFileStatus;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -897,6 +900,20 @@ public void testIncrementalWithCompaction() throws Exception {
     assertTrue(splits.length == 0);
   }
 
+  @Test
+  public void testAvroToArrayWritable() throws IOException {
+    Schema schema = SchemaTestUtil.getEvolvedSchema();
+    GenericRecord record = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "100", "100", false);
+    ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
+    assertEquals(schema.getFields().size(), aWritable.get().length);
+
+    // In some queries, the generic records that Hudi gets contain only part of the full record.
+    // Here, test the case where some fields are missing from the record.
+    Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema);
+    ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields);
+    assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length);
+  }
+
   private File createCompactionFile(java.nio.file.Path basePath, String commitTime)
       throws IOException {
     File file = basePath.resolve(".hoodie")
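
Note: a minimal standalone sketch of the failure mode the new try/catch guards against, assuming an Avro version whose GenericData.Record.get(String) throws AvroRuntimeException for a field name that is not part of the record's own schema (other Avro versions return null here instead). The class name MissingFieldSketch and the toy "rec"/"name" schema are illustrative only; "_hoodie_commit_time" stands in for one of the metadata fields that HoodieAvroUtils.addMetadataFields puts into the reader-side schema.

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MissingFieldSketch {
  public static void main(String[] args) {
    // A record whose schema carries a single "name" field.
    Schema writtenSchema = SchemaBuilder.record("rec").fields()
        .requiredString("name")
        .endRecord();
    GenericRecord record = new GenericData.Record(writtenSchema);
    record.put("name", "foo");

    // The reader-side schema asks for a metadata field the record does not have.
    try {
      Object value = record.get("_hoodie_commit_time");
      System.out.println("get() returned: " + value); // some Avro versions: null
    } catch (AvroRuntimeException e) {
      // other Avro versions: "Not a valid schema field: _hoodie_commit_time"
      System.out.println("get() threw: " + e.getMessage());
    }
  }
}

Under that assumption, the catch block in avroToArrayWritable simply leaves fieldValue as null, so a null Writable is emitted for fields the record does not carry; that is the situation the new test exercises by passing the original record against the addMetadataFields schema.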