From 7bda3f549657a4a099695039aa026e007a5936b0 Mon Sep 17 00:00:00 2001 From: Yann Date: Sat, 24 Sep 2022 09:14:18 +0800 Subject: [PATCH] [MINOR] retain avro's namespace --- .../convert/AvroInternalSchemaConverter.java | 31 ++++++++++++------- .../utils/AvroSchemaEvolutionUtils.java | 4 ++- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java index 360134f92803b..d941b27328aba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java @@ -50,8 +50,12 @@ public class AvroInternalSchemaConverter { * @param tableName the record name. * @return an avro Schema. */ + public static Schema convert(InternalSchema internalSchema, String tableName, String namespace) { + return buildAvroSchemaFromInternalSchema(internalSchema, tableName, namespace); + } + public static Schema convert(InternalSchema internalSchema, String tableName) { - return buildAvroSchemaFromInternalSchema(internalSchema, tableName); + return buildAvroSchemaFromInternalSchema(internalSchema, tableName, ""); } /** @@ -241,7 +245,7 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) { */ public static Schema buildAvroSchemaFromType(Type type, String recordName) { Map cache = new HashMap<>(); - return visitInternalSchemaToBuildAvroSchema(type, cache, recordName); + return visitInternalSchemaToBuildAvroSchema(type, cache, recordName, ""); } /** @@ -251,9 +255,9 @@ public static Schema buildAvroSchemaFromType(Type type, String recordName) { * @param recordName the record name * @return a Avro schema match hudi internal schema. */ - public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, String recordName) { + public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, String recordName, String namespace) { Map cache = new HashMap<>(); - return visitInternalSchemaToBuildAvroSchema(schema.getRecord(), cache, recordName); + return visitInternalSchemaToBuildAvroSchema(schema.getRecord(), cache, recordName, namespace); } /** @@ -264,13 +268,15 @@ public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, St * @param recordName the record name * @return a Avro schema match this type */ - private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map cache, String recordName) { + private static Schema visitInternalSchemaToBuildAvroSchema( + Type type, Map cache, String recordName, String namespace) { switch (type.typeId()) { case RECORD: Types.RecordType record = (Types.RecordType) type; List schemas = new ArrayList<>(); record.fields().forEach(f -> { - Schema tempSchema = visitInternalSchemaToBuildAvroSchema(f.type(), cache, recordName + "_" + f.name()); + Schema tempSchema = visitInternalSchemaToBuildAvroSchema( + f.type(), cache, recordName + "_" + f.name(), namespace); // convert tempSchema Schema result = f.isOptional() ? AvroInternalSchemaConverter.nullableSchema(tempSchema) : tempSchema; schemas.add(result); @@ -281,13 +287,13 @@ private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map fieldSchemas, String recordName) { + private static Schema visitInternalRecordToBuildAvroRecord( + Types.RecordType record, List fieldSchemas, String recordName, String namespace) { List fields = record.fields(); List avroFields = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { @@ -329,7 +336,7 @@ private static Schema visitInternalRecordToBuildAvroRecord(Types.RecordType reco Schema.Field field = new Schema.Field(f.name(), fieldSchemas.get(i), f.doc(), f.isOptional() ? JsonProperties.NULL_VALUE : null); avroFields.add(field); } - return Schema.createRecord(recordName, null, null, false, avroFields); + return Schema.createRecord(recordName, null, namespace, false, avroFields); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java index 520a6b9ec75b2..413a3c4df1bc3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java @@ -129,7 +129,9 @@ public static Schema canonicalizeColumnNullability(Schema writeSchema, Schema re // try to correct all changes TableChanges.ColumnUpdateChange updateChange = TableChanges.ColumnUpdateChange.get(writeInternalSchema); candidateUpdateCols.stream().forEach(f -> updateChange.updateColumnNullability(f, true)); - Schema result = AvroInternalSchemaConverter.convert(SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange), writeSchema.getName()); + Schema result = AvroInternalSchemaConverter.convert( + SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange), + writeSchema.getName(), writeSchema.getNamespace()); return result; } }