records, Sc
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
}
+ /**
+ * Removes the given columns from an Avro record: builds a new schema from the record's own schema with
+ * removeFields(record.getSchema(), fieldsToRemove) and returns the record rewritten to that schema.
+ *
+ * For how values are carried over to the new schema, see {@link #rewriteRecord(GenericRecord, Schema)}.
+ */
+ public static GenericRecord removeFields(GenericRecord record, List fieldsToRemove) {
+ Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
+ return rewriteRecord(record, newSchema);
+ }
+
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
Schema oldSchema = oldRecord.getSchema();
Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
@@ -485,6 +481,17 @@ public static Schema generateProjectionSchema(Schema originalSchema, List innerTypes = schema.getTypes();
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
- .findFirst()
- .orElse(null);
-
- if (nonNullType == null) {
- throw new AvroRuntimeException(
- String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
- }
-
- return nonNullType;
- }
-
- public static Schema resolveNullableSchema(Schema schema) {
- if (schema.getType() != Schema.Type.UNION) {
- return schema;
- }
-
- List innerTypes = schema.getTypes();
- Schema nonNullType =
- innerTypes.stream()
- .filter(it -> it.getType() != Schema.Type.NULL)
- .findFirst()
- .orElse(null);
-
- if (innerTypes.size() != 2 || nonNullType == null) {
- throw new AvroRuntimeException(
- String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
- }
-
- return nonNullType;
- }
-
/**
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
* support deep rewrite for nested record.
@@ -774,14 +741,28 @@ public static Schema resolveNullableSchema(Schema schema) {
*
* @param oldRecord oldRecord to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return newRecord for new Schema
*/
- public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
- Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map renameCols) {
+ Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
return (GenericData.Record) newRecord;
}
- private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
+ /**
+ * Given an Avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+ * Supports deep rewrite of nested records and handles renamed columns.
+ * This particular method does the following things :
+ * a) Create a new empty GenericRecord with the new schema.
+ * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
+ *
+ * @param oldRecord oldRecord to be rewritten
+ * @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+ * @param fieldNames tracks the names of the fields visited so far while traversing the new schema; used to build each field's full name.
+ * @return newRecord for new Schema
+ */
+ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) {
if (oldRecord == null) {
return null;
}
@@ -796,10 +777,23 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
+ String fieldName = field.name();
+ fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
- helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+ } else {
+ String fieldFullName = createFullName(fieldNames);
+ String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+ String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+ // deal with rename
+ if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+ // find rename
+ Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+ }
}
+ fieldNames.pop();
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
@@ -820,9 +814,11 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
Collection array = (Collection)oldRecord;
List