diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 7d5cf0408b1ee..6d964864335b8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -47,8 +47,10 @@ public class TestHoodieAvroUtils {
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
-      + "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"},"
-      + "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}";
+      + "{\"name\": \"new_col_not_nullable_default_dummy_val\", \"type\": \"string\", \"default\": \"dummy_val\"},"
+      + "{\"name\": \"new_col_nullable_wo_default\", \"type\": [\"int\", \"null\"]},"
+      + "{\"name\": \"new_col_nullable_default_null\", \"type\": [\"null\" ,\"string\"],\"default\": null},"
+      + "{\"name\": \"new_col_nullable_default_dummy_val\", \"type\": [\"string\" ,\"null\"],\"default\": \"dummy_val\"}]}";
 
   private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
@@ -110,8 +112,10 @@ public void testDefaultValue() {
     rec.put("timestamp", 3.5);
     Schema schemaWithMetadata = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EVOLVED_SCHEMA));
     GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, schemaWithMetadata);
-    assertEquals(rec1.get("new_col1"), "dummy_val");
-    assertNull(rec1.get("new_col2"));
+    assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val"));
+    assertNull(rec1.get("new_col_nullable_wo_default"));
+    assertNull(rec1.get("new_col_nullable_default_null"));
+    assertEquals("dummy_val", rec1.get("new_col_nullable_default_dummy_val"));
     assertNull(rec1.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
   }
 
@@ -123,8 +127,8 @@ public void testDefaultValueWithSchemaEvolution() {
     rec.put("pii_col", "val2");
     rec.put("timestamp", 3.5);
     GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
-    assertEquals(rec1.get("new_col1"), "dummy_val");
-    assertNull(rec1.get("new_col2"));
+    assertEquals("dummy_val", rec1.get("new_col_not_nullable_default_dummy_val"));
+    assertNull(rec1.get("new_col_nullable_wo_default"));
   }
 
   @Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 079f88d685954..eda89e95c239f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -129,6 +129,7 @@ public static void initClass() throws Exception {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
         dfsBasePath + "/sql-transformer.properties");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_evoluted.avsc", dfs, dfsBasePath + "/source_evoluted.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc");
@@ -471,6 +472,48 @@ public void testBulkInsertsAndUpserts() throws Exception {
     assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
   }
 
+  /**
+   * Inserts with the base schema, then upserts twice under the evolved schema that adds a nullable
+   * field: first with records that carry the new field, then with records that lack it.
+   */
+  @Test
+  public void testSchemaEvolution() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_schema_evolution";
+
+    // Insert data produced with Schema A, pass Schema A
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.INSERT, Collections.singletonList(IdentityTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+    // Upsert data produced with Schema B, pass Schema B
+    cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT, Collections.singletonList(TripsWithEvolutedOptionalFieldTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evoluted.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evoluted.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+    // Upsert data produced with Schema A, pass Schema B
+    cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT, Collections.singletonList(IdentityTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evoluted.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evoluted.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
+    counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+    sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").createOrReplaceTempView("tmp_trips");
+    long recordCount =
+        sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
+    assertEquals(450, recordCount);
+  }
+
   @Test
   public void testUpsertsCOWContinuousMode() throws Exception {
     testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
@@ -959,6 +1002,30 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
     }
   }
 
+  /**
+   * Add new field evoluted_optional_union_field with value of the field rider.
+   */
+  public static class TripsWithEvolutedOptionalFieldTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+        TypedProperties properties) {
+      return rowDataset.withColumn("evoluted_optional_union_field", functions.col("rider"));
+    }
+  }
+
+  /**
+   * Applies no transformation.
+   */
+  public static class IdentityTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+        TypedProperties properties) {
+      return rowDataset;
+    }
+  }
+
   public static class TestGenerator extends SimpleKeyGenerator {
 
     public TestGenerator(TypedProperties props) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source_evoluted.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source_evoluted.avsc
new file mode 100644
index 0000000000000..dfebfcc9b16ac
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_evoluted.avsc
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+    }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long",
+      "logicalType" : "timestamp-micros"
+    }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "abc",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+    }
+  }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
+    "name" : "fare",
+    "type" : {
+      "type" : "record",
+      "name" : "fare",
+      "fields" : [
+        {
+          "name" : "amount",
+          "type" : "double"
+        },
+        {
+          "name" : "currency",
+          "type" : "string"
+        }
+      ]
+    }
+  },
+  {
+    "name" : "tip_history",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "tip_history",
+        "fields" : [
+          {
+            "name" : "amount",
+            "type" : "double"
+          },
+          {
+            "name" : "currency",
+            "type" : "string"
+          }
+        ]
+      }
+    }
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  },
+  {
+    "name": "evoluted_optional_union_field",
+    "type": [
+      "null",
+      {
+        "type": "string",
+        "avro.java.string": "String"
+      }
+    ],
+    "default": null
+  } ]
+}
+