Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ public class AvroInternalSchemaConverter {
* @param tableName the record name.
* @return an avro Schema.
*/
public static Schema convert(InternalSchema internalSchema, String tableName, String namespace) {
return buildAvroSchemaFromInternalSchema(internalSchema, tableName, namespace);
}

public static Schema convert(InternalSchema internalSchema, String tableName) {
return buildAvroSchemaFromInternalSchema(internalSchema, tableName);
return buildAvroSchemaFromInternalSchema(internalSchema, tableName, "");
}

/**
Expand Down Expand Up @@ -241,7 +245,7 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) {
*/
public static Schema buildAvroSchemaFromType(Type type, String recordName) {
Map<Type, Schema> cache = new HashMap<>();
return visitInternalSchemaToBuildAvroSchema(type, cache, recordName);
return visitInternalSchemaToBuildAvroSchema(type, cache, recordName, "");
}

/**
Expand All @@ -251,9 +255,9 @@ public static Schema buildAvroSchemaFromType(Type type, String recordName) {
* @param recordName the record name
* @return a Avro schema match hudi internal schema.
*/
public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, String recordName) {
public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, String recordName, String namespace) {
Map<Type, Schema> cache = new HashMap<>();
return visitInternalSchemaToBuildAvroSchema(schema.getRecord(), cache, recordName);
return visitInternalSchemaToBuildAvroSchema(schema.getRecord(), cache, recordName, namespace);
}

/**
Expand All @@ -264,13 +268,15 @@ public static Schema buildAvroSchemaFromInternalSchema(InternalSchema schema, St
* @param recordName the record name
* @return a Avro schema match this type
*/
private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map<Type, Schema> cache, String recordName) {
private static Schema visitInternalSchemaToBuildAvroSchema(
Type type, Map<Type, Schema> cache, String recordName, String namespace) {
switch (type.typeId()) {
case RECORD:
Types.RecordType record = (Types.RecordType) type;
List<Schema> schemas = new ArrayList<>();
record.fields().forEach(f -> {
Schema tempSchema = visitInternalSchemaToBuildAvroSchema(f.type(), cache, recordName + "_" + f.name());
Schema tempSchema = visitInternalSchemaToBuildAvroSchema(
f.type(), cache, recordName + "_" + f.name(), namespace);
// convert tempSchema
Schema result = f.isOptional() ? AvroInternalSchemaConverter.nullableSchema(tempSchema) : tempSchema;
schemas.add(result);
Expand All @@ -281,13 +287,13 @@ private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map<Type,
if (recordSchema != null) {
return recordSchema;
}
recordSchema = visitInternalRecordToBuildAvroRecord(record, schemas, recordName);
recordSchema = visitInternalRecordToBuildAvroRecord(record, schemas, recordName, namespace);
cache.put(record, recordSchema);
return recordSchema;
case ARRAY:
Types.ArrayType array = (Types.ArrayType) type;
Schema elementSchema;
elementSchema = visitInternalSchemaToBuildAvroSchema(array.elementType(), cache, recordName);
elementSchema = visitInternalSchemaToBuildAvroSchema(array.elementType(), cache, recordName, namespace);
Schema arraySchema;
arraySchema = cache.get(array);
if (arraySchema != null) {
Expand All @@ -300,8 +306,8 @@ private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map<Type,
Types.MapType map = (Types.MapType) type;
Schema keySchema;
Schema valueSchema;
keySchema = visitInternalSchemaToBuildAvroSchema(map.keyType(), cache, recordName);
valueSchema = visitInternalSchemaToBuildAvroSchema(map.valueType(), cache, recordName);
keySchema = visitInternalSchemaToBuildAvroSchema(map.keyType(), cache, recordName, namespace);
valueSchema = visitInternalSchemaToBuildAvroSchema(map.valueType(), cache, recordName, namespace);
Schema mapSchema;
mapSchema = cache.get(map);
if (mapSchema != null) {
Expand All @@ -321,15 +327,16 @@ private static Schema visitInternalSchemaToBuildAvroSchema(Type type, Map<Type,
* Converts hudi RecordType to Avro RecordType.
* this is auxiliary function used by visitInternalSchemaToBuildAvroSchema
*/
private static Schema visitInternalRecordToBuildAvroRecord(Types.RecordType record, List<Schema> fieldSchemas, String recordName) {
private static Schema visitInternalRecordToBuildAvroRecord(
Types.RecordType record, List<Schema> fieldSchemas, String recordName, String namespace) {
List<Types.Field> fields = record.fields();
List<Schema.Field> avroFields = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
Types.Field f = fields.get(i);
Schema.Field field = new Schema.Field(f.name(), fieldSchemas.get(i), f.doc(), f.isOptional() ? JsonProperties.NULL_VALUE : null);
avroFields.add(field);
}
return Schema.createRecord(recordName, null, null, false, avroFields);
return Schema.createRecord(recordName, null, namespace, false, avroFields);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public static Schema canonicalizeColumnNullability(Schema writeSchema, Schema re
// try to correct all changes
TableChanges.ColumnUpdateChange updateChange = TableChanges.ColumnUpdateChange.get(writeInternalSchema);
candidateUpdateCols.stream().forEach(f -> updateChange.updateColumnNullability(f, true));
Schema result = AvroInternalSchemaConverter.convert(SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange), writeSchema.getName());
Schema result = AvroInternalSchemaConverter.convert(
SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange),
writeSchema.getName(), writeSchema.getNamespace());
return result;
}
}
Expand Down