Ensure properties are copied when modifying schema (#11441)
Co-authored-by: Jonathan Vexler <=>
jonvex committed Jun 13, 2024
1 parent 6f35392 commit eb20273
Showing 4 changed files with 27 additions and 29 deletions.
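For context on the bug: Avro record schemas can carry arbitrary key/value properties in addition to their fields, but Schema.createRecord returns a schema with an empty property map, so any helper that rebuilds a schema field by field silently drops those properties unless it explicitly copies getObjectProps() across. A minimal standalone sketch of that failure mode (the record and property names are illustrative, not taken from this commit):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import java.util.List;
import java.util.stream.Collectors;

public class PropsDropDemo {
  public static void main(String[] args) {
    // A record schema with one custom property (names are made up).
    Schema original = SchemaBuilder.record("trip").namespace("example")
        .fields().requiredString("uuid").requiredString("rider").endRecord();
    original.addProp("example.source", "sensor-v2");

    // Rebuild the schema from copied fields, the way the pre-fix helpers did.
    // Schema.Field instances cannot be reused across schemas, hence the copies.
    List<Schema.Field> fields = original.getFields().stream()
        .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
        .collect(Collectors.toList());
    Schema rebuilt = Schema.createRecord(original.getName(), original.getDoc(),
        original.getNamespace(), original.isError());
    rebuilt.setFields(fields);

    System.out.println(original.getProp("example.source")); // sensor-v2
    System.out.println(rebuilt.getProp("example.source"));  // null: property lost
  }
}

This is exactly the rebuild pattern used by the helpers changed below, which is why schema-level properties could vanish after a projection or merge.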
@@ -23,7 +23,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, getAppliedRequiredSchema}
-import org.apache.hudi.avro.AvroSchemaUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -159,14 +159,14 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
     rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
     //always remove the row index column from the skeleton because the data file will also have the same column
     val skeletonProjection = projectRecord(skeletonRequiredSchema,
-      AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn))
+      HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))

     //If we need to do position based merging with log files we will leave the row index column at the end
     val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
       getIdentityProjection
     } else {
       projectRecord(dataRequiredSchema,
-        AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn))
+        HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
     }

     //row index will always be the last column
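The Scala change above is a call-site swap: HoodieAvroUtils.removeFields becomes the single field-removal path, and after this commit it rebuilds the projected schema through the property-preserving helper added below. A hedged sketch of the call shape (the column value is illustrative; the real code passes ROW_INDEX_TEMPORARY_COLUMN_NAME):

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

import java.util.Collections;
import java.util.Set;

public class RemoveFieldsSketch {
  // Drop the temporary row-index column before merging skeleton and data rows.
  static Schema projectAwayRowIndex(Schema requiredSchema) {
    Set<String> rowIndexColumn = Collections.singleton("_tmp_row_index"); // illustrative value
    // After this commit, removeFields keeps requiredSchema's name, doc,
    // namespace, and object props on the returned schema.
    return HoodieAvroUtils.removeFields(requiredSchema, rowIndexColumn);
  }
}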
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java (29 changes: 16 additions & 13 deletions)
@@ -33,6 +33,7 @@
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -236,7 +237,7 @@ private static Option<Schema.Field> findNestedField(Schema schema, String[] fiel
       isUnion = true;
       foundSchema = resolveNullableSchema(foundSchema);
     }
-    Schema newSchema = Schema.createRecord(foundSchema.getName(), foundSchema.getDoc(), foundSchema.getNamespace(), false, Collections.singletonList(nestedPart.get()));
+    Schema newSchema = createNewSchemaFromFieldsWithReference(foundSchema, Collections.singletonList(nestedPart.get()));
     return Option.of(new Schema.Field(foundField.name(), isUnion ? createNullableSchema(newSchema) : newSchema, foundField.doc(), foundField.defaultVal()));
   }

@@ -259,10 +260,7 @@ public static Schema mergeSchemas(Schema a, Schema b) {
         fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
       }
     }
-
-    Schema newSchema = Schema.createRecord(a.getName(), a.getDoc(), a.getNamespace(), a.isError());
-    newSchema.setFields(fields);
-    return newSchema;
+    return createNewSchemaFromFieldsWithReference(a, fields);
   }

   /**
@@ -292,17 +290,22 @@ private static Schema appendFieldsToSchemaBase(Schema schema, List<Schema.Field>
       fields.addAll(newFields);
     }

-    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    newSchema.setFields(fields);
-    return newSchema;
+    return createNewSchemaFromFieldsWithReference(schema, fields);
   }

-  public static Schema removeFieldsFromSchema(Schema schema, Set<String> fieldNames) {
-    List<Schema.Field> fields = schema.getFields().stream()
-        .filter(field -> !fieldNames.contains(field.name()))
-        .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
-        .collect(Collectors.toList());
+  /**
+   * Create a new schema but maintain all meta info from the old schema
+   *
+   * @param schema schema to get the meta info from
+   * @param fields list of fields in order that will be in the new schema
+   *
+   * @return schema with fields from fields, and metadata from schema
+   */
+  public static Schema createNewSchemaFromFieldsWithReference(Schema schema, List<Schema.Field> fields) {
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
+    for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
+      newSchema.addProp(prop.getKey(), prop.getValue());
+    }
     newSchema.setFields(fields);
     return newSchema;
   }
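Any transformation that rebuilds a record schema can now route through this helper. A short usage sketch, under the same illustrative names as the example above:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.AvroSchemaUtils;

import java.util.List;
import java.util.stream.Collectors;

public class HelperSketch {
  public static void main(String[] args) {
    Schema original = SchemaBuilder.record("trip").namespace("example")
        .fields().requiredString("uuid").requiredString("rider").endRecord();
    original.addProp("example.source", "sensor-v2");

    // Keep every field except "rider", copying fields since a Schema.Field
    // can only be attached to one schema.
    List<Schema.Field> kept = original.getFields().stream()
        .filter(f -> !f.name().equals("rider"))
        .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
        .collect(Collectors.toList());

    Schema projected =
        AvroSchemaUtils.createNewSchemaFromFieldsWithReference(original, kept);
    System.out.println(projected.getProp("example.source")); // sensor-v2: preserved
  }
}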
@@ -99,6 +99,7 @@
 import static org.apache.avro.Schema.Type.ARRAY;
 import static org.apache.avro.Schema.Type.MAP;
 import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
@@ -338,13 +339,7 @@ public static Schema addMetadataFields(Schema schema, boolean withOperationField
         parentFields.add(newField);
       }
     }
-
-    Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
-    for (Map.Entry<String, Object> prop : schema.getObjectProps().entrySet()) {
-      mergedSchema.addProp(prop.getKey(), prop.getValue());
-    }
-    mergedSchema.setFields(parentFields);
-    return mergedSchema;
+    return createNewSchemaFromFieldsWithReference(schema, parentFields);
   }

   public static boolean isSchemaNull(Schema schema) {
@@ -364,9 +359,8 @@ public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
         .filter(field -> !fieldsToRemove.contains(field.name()))
         .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
         .collect(Collectors.toList());
-    Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
-    filteredSchema.setFields(filteredFields);
-    return filteredSchema;
+
+    return createNewSchemaFromFieldsWithReference(schema, filteredFields);
   }

   public static String addMetadataColumnTypes(String hiveColumnTypes) {
@@ -385,9 +379,7 @@ public static Schema makeFieldNonNull(Schema schema, String fieldName, Object fi
         }
       })
       .collect(Collectors.toList());
-    Schema withNonNullField = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
-    withNonNullField.setFields(filteredFields);
-    return withNonNullField;
+    return createNewSchemaFromFieldsWithReference(schema, filteredFields);
   }

   private static Schema initRecordKeySchema() {
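Of the three rewritten methods in this file, addMetadataFields was the only one that already copied properties; the commit extracts that inline loop into the shared helper, so removeFields and makeFieldNonNull gain the same behavior. A sketch of the invariant (schema and property names are again illustrative):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.HoodieAvroUtils;

public class MetadataFieldsSketch {
  public static void main(String[] args) {
    Schema table = SchemaBuilder.record("trip").namespace("example")
        .fields().requiredString("uuid").endRecord();
    table.addProp("example.source", "sensor-v2");

    // Prepends the Hudi meta columns (_hoodie_commit_time, _hoodie_record_key, ...)
    // and, both before and after this commit, keeps the table's object props.
    Schema withMeta = HoodieAvroUtils.addMetadataFields(table, false);
    System.out.println(withMeta.getProp("example.source")); // sensor-v2
  }
}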
@@ -179,6 +179,9 @@ public Schema createSchemaFromFields(List<Schema.Field> fields)
       fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), curr.defaultVal()));
     }
     Schema newSchema = Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+    for (Map.Entry<String, Object> prop : dataSchema.getObjectProps().entrySet()) {
+      newSchema.addProp(prop.getKey(), prop.getValue());
+    }
     newSchema.setFields(fields);
     return newSchema;
   }