diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 31432aeee9e7f..440eff38b2b2e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -27,7 +27,7 @@ import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
-import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
@@ -138,7 +138,7 @@ object AvroConversionHelper {
case (struct: StructType, RECORD) =>
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
- val avroFieldIndexes = new Array[Int](length)
+ val avroFieldNames = new Array[String](length)
var i = 0
while (i < length) {
val sqlField = struct.fields(i)
@@ -147,7 +147,7 @@ object AvroConversionHelper {
val converter = createConverter(avroField.schema(), sqlField.dataType,
path :+ sqlField.name)
converters(i) = converter
- avroFieldIndexes(i) = avroField.pos()
+ avroFieldNames(i) = avroField.name()
} else if (!sqlField.nullable) {
throw new IncompatibleSchemaException(
s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
@@ -166,10 +166,11 @@ object AvroConversionHelper {
val result = new Array[Any](length)
var i = 0
+
while (i < converters.length) {
if (converters(i) != null) {
val converter = converters(i)
- result(i) = converter(record.get(avroFieldIndexes(i)))
+ result(i) = converter(record.get(avroFieldNames(i)))
}
i += 1
}
@@ -341,7 +342,7 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
- val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
+ val schema: Schema = removeNamespaceFromFixedFields(convertStructTypeToAvroSchema(structType, structName, recordNamespace))
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
@@ -366,4 +367,40 @@ object AvroConversionHelper {
}
}
}
+
+  /**
+   * Remove namespace from the fixed decimal fields of a record schema.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that a reader schema without namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   * Only the direct fields of `schema` are inspected; fixed types nested inside other types are left untouched.
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return a rebuilt record schema when at least one field was changed, otherwise the given schema instance itself
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {
+    import scala.collection.JavaConverters._
+    var isSchemaChanged = false
+    val fields = new util.ArrayList[Schema.Field]
+    for (field <- schema.getFields.asScala) {
+      val original = field.schema
+      val fieldSchema = original.getLogicalType match {
+        // getNamespace is only legal on named types, so it must stay behind the FIXED check. Avro may report
+        // "no namespace" as null rather than "", and neither of those cases needs a rebuild.
+        case decimal: LogicalTypes.Decimal if original.getType == Schema.Type.FIXED &&
+          original.getNamespace != null && original.getNamespace.nonEmpty =>
+          isSchemaChanged = true
+          decimal.addToSchema(SchemaBuilder.fixed(original.getName).size(original.getFixedSize))
+        case _ => original
+      }
+      fields.add(new Schema.Field(field.name, fieldSchema, field.doc, field.defaultVal))
+    }
+
+    if (isSchemaChanged) {
+      Schema.createRecord(schema.getName, "", schema.getNamespace, false, fields)
+    } else {
+      schema
+    }
+  }
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 5b87fee14a1e2..05478ee44bcfe 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -60,7 +60,7 @@ object AvroConversionUtils {
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
- getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
+ AvroConversionHelper.removeNamespaceFromFixedFields(getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)))
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayloadWithSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayloadWithSchema.java
new file mode 100644
index 0000000000000..42ed8786751d3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayloadWithSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.Serializable;
+
+/**
+ * Base class for AVRO record based payloads that additionally keeps the schema of the record it was created from.
+ */
+public abstract class BaseAvroPayloadWithSchema extends BaseAvroPayload implements Serializable {
+ /**
+ * Schema taken from the constructor's record (null for a null record). NOTE(review): Avro's Schema is only java.io.Serializable from 1.9 on - confirm.
+ */
+ protected final Schema writerSchema;
+
+ /**
+ * Instantiate {@link BaseAvroPayloadWithSchema}.
+ *
+ * @param record Generic record for the payload; its schema is stored as {@code writerSchema}, or null if the record is null.
+ * @param orderingVal {@link Comparable} to be used in pre combine.
+ */
+ public BaseAvroPayloadWithSchema(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ this.writerSchema = record != null ? record.getSchema() : null;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index dd1853d835277..98a95b07ce146 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -35,7 +35,7 @@
*
combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record
*
*/
-public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
+public class OverwriteWithLatestAvroPayload extends BaseAvroPayloadWithSchema
implements HoodieRecordPayload {
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
@@ -66,7 +66,7 @@ public Option getInsertValue(Schema schema) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}
- IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
+ IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, writerSchema, schema);
if (isDeleteRecord((GenericRecord) indexedRecord)) {
return Option.empty();
} else {
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 8887cfe3d300d..e18251747b29e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -46,8 +46,11 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
- + "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"},"
- + "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}";
+ + "{\"name\": \"new_col_not_nullable_default_dummy_val\", \"type\": \"string\", \"default\": \"dummy_val\"},"
+ + "{\"name\": \"new_col_nullable_wo_default\", \"type\": [\"int\", \"null\"]},"
+ + "{\"name\": \"new_col_nullable_default_null\", \"type\": [\"null\" ,\"string\"],\"default\": null},"
+ + "{\"name\": \"new_col_nullable_default_dummy_val\", \"type\": [\"string\" ,\"null\"],\"default\": \"dummy_val\"}]}";
+
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
@@ -111,8 +114,10 @@ public void testDefaultValue() {
rec.put("timestamp", 3.5);
Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EVOLVED_SCHEMA));
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, schemaWithMetadata);
- assertEquals(rec1.get("new_col1"), "dummy_val");
- assertNull(rec1.get("new_col2"));
+ assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val");
+ assertNull(rec1.get("new_col_nullable_wo_default"));
+ assertNull(rec1.get("new_col_nullable_default_null"));
+ assertEquals(rec1.get("new_col_nullable_default_dummy_val"), "dummy_val");
assertNull(rec1.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
@@ -124,8 +129,8 @@ public void testDefaultValueWithSchemaEvolution() {
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
- assertEquals(rec1.get("new_col1"), "dummy_val");
- assertNull(rec1.get("new_col2"));
+ assertEquals(rec1.get("new_col_not_nullable_default_dummy_val"), "dummy_val");
+ assertNull(rec1.get("new_col_nullable_wo_default"));
}
@Test
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 7499894c26ce1..8940df15a1b7a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -108,7 +108,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"nation\", \"type\": \"bytes\"},"
+ "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
- + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
+ + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"fixed\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
public static final String TRIP_EXAMPLE_SCHEMA =
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index f9d016214844c..8a6709c5bd08f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -168,6 +168,7 @@ public static void initClass() throws Exception {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_evolved.avsc", dfs, dfsBasePath + "/source_evolved.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
@@ -696,6 +697,44 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
assertTrue(fieldNames.containsAll(expectedFieldNames));
}
+ @Test
+ public void testSchemaEvolution() throws Exception {
+ String tableBasePath = dfsBasePath + "/test_table_schema_evolution";
+
+ // Step 1: INSERT through the identity transformer; source.avsc (schema A) is passed as both source and target schema.
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()));
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+ // Step 2: UPSERT rows that carry the extra evoluted_optional_union_field column; source_evolved.avsc (schema B) is passed as both schemas.
+ cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()));
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+ List counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+ assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+ sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").createOrReplaceTempView("tmp_trips");
+ long recordCount =
+ sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
+ assertEquals(950, recordCount);
+
+ // Step 3: UPSERT through the identity transformer again (rows without the new column) while still passing schema B as both schemas.
+ cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()));
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+ cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
+ counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+ assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ }
+
@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
@@ -1659,4 +1698,16 @@ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
return rowDataset;
}
}
+
+ /**
+ * Test transformer that adds the column evoluted_optional_union_field, filled with the value of the existing rider column.
+ */
+ public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {
+
+ @Override
+ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset rowDataset,
+ TypedProperties properties) {
+ return rowDataset.withColumn("evoluted_optional_union_field", functions.col("rider"));
+ }
+ }
}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 4b4beb35dd8f3..d953dd49d4b85 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -70,7 +70,7 @@
"name" : "height",
"type" : {
"type" : "fixed",
- "name" : "abc",
+ "name" : "fixed",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc
new file mode 100644
index 0000000000000..009b32faa761a
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type" : "record",
+ "name" : "triprec",
+ "fields" : [
+ {
+ "name" : "timestamp",
+ "type" : "long"
+ }, {
+ "name" : "_row_key",
+ "type" : "string"
+ }, {
+ "name" : "rider",
+ "type" : "string"
+ }, {
+ "name" : "driver",
+ "type" : "string"
+ }, {
+ "name" : "begin_lat",
+ "type" : "double"
+ }, {
+ "name" : "begin_lon",
+ "type" : "double"
+ }, {
+ "name" : "end_lat",
+ "type" : "double"
+ }, {
+ "name" : "end_lon",
+ "type" : "double"
+ }, {
+ "name" : "distance_in_meters",
+ "type" : "int"
+ }, {
+ "name" : "seconds_since_epoch",
+ "type" : "long"
+ }, {
+ "name" : "weight",
+ "type" : "float"
+ },{
+ "name" : "nation",
+ "type" : "bytes"
+ },{
+ "name" : "current_date",
+ "type" : {
+ "type" : "int",
+ "logicalType" : "date"
+ }
+ },{
+ "name" : "current_ts",
+ "type" : {
+ "type" : "long"
+ }
+ },{
+ "name" : "height",
+ "type" : {
+ "type" : "fixed",
+ "name" : "fixed",
+ "size" : 5,
+ "logicalType" : "decimal",
+ "precision" : 10,
+ "scale": 6
+ }
+ }, {
+ "name" :"city_to_state",
+ "type" : {
+ "type" : "map",
+ "values": "string"
+ }
+ },
+ {
+ "name" : "fare",
+ "type" : {
+ "type" : "record",
+ "name" : "fare",
+ "fields" : [
+ {
+ "name" : "amount",
+ "type" : "double"
+ },
+ {
+ "name" : "currency",
+ "type" : "string"
+ }
+ ]
+ }
+ },
+ {
+ "name": "evoluted_optional_union_field",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name" : "tip_history",
+ "type" : {
+ "type" : "array",
+ "items" : {
+ "type" : "record",
+ "name" : "tip_history",
+ "fields" : [
+ {
+ "name" : "amount",
+ "type" : "double"
+ },
+ {
+ "name" : "currency",
+ "type" : "string"
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name" : "_hoodie_is_deleted",
+ "type" : "boolean",
+ "default" : false
+ } ]
+}
+
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 4252b7e66ce19..5dd193d1479f0 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -70,7 +70,7 @@
"name" : "height",
"type" : {
"type" : "fixed",
- "name" : "abc",
+ "name" : "fixed",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,