From 4cb5672791c93115d2d50d3cf3a1a9127489c469 Mon Sep 17 00:00:00 2001 From: Lokesh Lingarajan Date: Tue, 29 Aug 2023 16:16:10 -0700 Subject: [PATCH] Enabling comprehensive schema evolution in delta streamer code --- .../common/config/HoodieCommonConfig.java | 8 + .../org/apache/hudi/DataSourceOptions.scala | 2 + .../org/apache/hudi/HoodieSchemaUtils.scala | 51 +++ .../apache/hudi/HoodieSparkSqlWriter.scala | 35 +- .../org/apache/hudi/HoodieWriterUtils.scala | 1 + .../apache/hudi/TestHoodieSparkUtils.scala | 51 ++- .../utilities/streamer/HoodieStreamer.java | 10 +- .../hudi/utilities/streamer/StreamSync.java | 72 ++-- ...estHoodieDeltaStreamerSchemaEvolution.java | 334 ++++++++++++++++++ .../schema-evolution/endTypePromotion.json | 2 + .../resources/schema-evolution/start.json | 10 + .../schema-evolution/startTypePromotion.json | 10 + .../testAddColChangeOrderAllFiles.json | 3 + .../testAddColChangeOrderSomeFiles.json | 2 + .../schema-evolution/testAddColRoot.json | 2 + .../testAddColRootWithSchemaProvider.json | 2 + .../schema-evolution/testAddColStruct.json | 2 + .../schema-evolution/testAddComplexField.json | 2 + .../schema-evolution/testAddMetaCol.json | 2 + .../schema-evolution/testDelColRoot.json | 2 + .../schema-evolution/testDelColStruct.json | 2 + 21 files changed, 539 insertions(+), 66 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java create mode 100644 hudi-utilities/src/test/resources/schema-evolution/endTypePromotion.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/start.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/startTypePromotion.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderAllFiles.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderSomeFiles.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddColRoot.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddColRootWithSchemaProvider.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddColStruct.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddComplexField.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testAddMetaCol.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testDelColRoot.json create mode 100644 hudi-utilities/src/test/resources/schema-evolution/testDelColStruct.json diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index 45b1ff7f6463e..a5446cbfc598f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -70,6 +70,14 @@ public class HoodieCommonConfig extends HoodieConfig { + " operation will fail schema compatibility check. Set this option to true will make the newly added " + " column nullable to successfully complete the write operation."); + public static final ConfigProperty ADD_NULL_FOR_DELETED_COLUMNS = ConfigProperty + .key("hoodie.datasource.add.null.for.deleted.columns") + .defaultValue(false) + .markAdvanced() + .withDocumentation("When a non-nullable column is deleted in datasource during a write operation, the write " + + " operation will fail schema compatibility check. Set this option to true will make the deleted " + + " column be filled with null values to successfully complete the write operation."); + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.common.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ff7f33d8bebb6..15f5f4058f8ac 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -536,6 +536,8 @@ object DataSourceWriteOptions { val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA + val ADD_NULL_FOR_DELETED_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.ADD_NULL_FOR_DELETED_COLUMNS + val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala new file mode 100644 index 0000000000000..ef89ee9afb4ca --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.common.config.HoodieConfig +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.internal.schema.InternalSchema + +/** + * Util methods for Schema evolution in Hudi + */ +object HoodieSchemaUtils { + /** + * get latest internalSchema from table + * + * @param config instance of {@link HoodieConfig} + * @param tableMetaClient instance of HoodieTableMetaClient + * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. + */ + def getLatestTableInternalSchema(config: HoodieConfig, + tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { + if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + Option.empty[InternalSchema] + } else { + try { + val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) + val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata + if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None + } catch { + case _: Exception => None + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9231472a799a7..93cd9b00c5caf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -28,7 +28,7 @@ import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProper import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ -import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, resolveNullableSchema} +import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, isStrictProjectionOf, resolveNullableSchema} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields import org.apache.hudi.client.common.HoodieSparkEngineContext @@ -283,7 +283,7 @@ object HoodieSparkSqlWriter { .getOrElse(getAvroRecordNameAndNamespace(tblName)) val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace) - val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse { + val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse { // In case we need to reconcile the schema and schema evolution is enabled, // we will force-apply schema evolution to the writer's schema if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { @@ -317,7 +317,7 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. - val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient) + val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient) val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) @@ -499,6 +499,7 @@ object HoodieSparkSqlWriter { latestTableSchemaOpt: Option[Schema], internalSchemaOpt: Option[InternalSchema], opts: Map[String, String]): Schema = { + val addNullForDeletedColumns = opts(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key()).toBoolean val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean @@ -579,7 +580,10 @@ object HoodieSparkSqlWriter { if (!shouldValidateSchemasCompatibility) { // if no validation is enabled, check for col drop // if col drop is allowed, go ahead. if not, check for projection, so that we do not allow dropping cols - if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) { + if (addNullForDeletedColumns && isStrictProjectionOf(latestTableSchema, canonicalizedSourceSchema) + && isSchemaCompatible(canonicalizedSourceSchema, latestTableSchema, true)) { + latestTableSchema + } else if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) { canonicalizedSourceSchema } else { log.error( @@ -717,29 +721,6 @@ object HoodieSparkSqlWriter { reconcileNullability(sourceSchema, latestTableSchema, opts) } - - /** - * get latest internalSchema from table - * - * @param config instance of {@link HoodieConfig} - * @param tableMetaClient instance of HoodieTableMetaClient - * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. - */ - def getLatestTableInternalSchema(config: HoodieConfig, - tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { - if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { - Option.empty[InternalSchema] - } else { - try { - val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata - if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None - } catch { - case _: Exception => None - } - } - } - private def registerAvroSchemasWithKryo(sparkContext: SparkContext, targetAvroSchemas: Schema*): Unit = { sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index b2c44cc333035..352747db50c3e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -83,6 +83,7 @@ object HoodieWriterUtils { hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE) hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER) hoodieConfig.setDefaultValue(RECONCILE_SCHEMA) + hoodieConfig.setDefaultValue(ADD_NULL_FOR_DELETED_COLUMNS) hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE) hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS) hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 51682119d23f9..1cf2ecf298b33 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -20,8 +20,8 @@ package org.apache.hudi import org.apache.avro.generic.GenericRecord import org.apache.hudi.testutils.DataSourceTestUtils -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types.{ArrayType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -133,9 +133,9 @@ class TestHoodieSparkUtils { .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .getOrCreate - val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) + val innerStruct1 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true) val structType1 = new StructType().add("key", "string", false) - .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true) + .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true) val schema1 = AvroConversionUtils.convertStructTypeToAvroSchema(structType1, "test_struct_name", "test_namespace") val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 2L))) @@ -146,8 +146,8 @@ class TestHoodieSparkUtils { // create schema2 which has one addition column at the root level compared to schema1 val structType2 = new StructType().add("key", "string", false) - .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true) - .add("nullableInnerStruct2",innerStruct1,true) + .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true) + .add("nullableInnerStruct2", innerStruct1, true) val schema2 = AvroConversionUtils.convertStructTypeToAvroSchema(structType2, "test_struct_name", "test_namespace") val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L), Row("innerKey2_3", 2L))) val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), structType2) @@ -161,12 +161,12 @@ class TestHoodieSparkUtils { assert(genRecRDD3.collect()(0).getSchema.equals(schema2)) genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2"))) - val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) - .add("new_nested_col","string",true) + val innerStruct3 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true) + .add("new_nested_col", "string", true) // create a schema which has one additional nested column compared to schema1, which is nullable val structType4 = new StructType().add("key", "string", false) - .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct3,true) + .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct3, true) val schema4 = AvroConversionUtils.convertStructTypeToAvroSchema(structType4, "test_struct_name", "test_namespace") val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L, "new_nested_col_val1"))) @@ -180,16 +180,16 @@ class TestHoodieSparkUtils { org.apache.hudi.common.util.Option.of(schema4)) assert(schema4.equals(genRecRDD4.collect()(0).getSchema)) val genRec = genRecRDD5.collect()(0) - val nestedRec : GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord] + val nestedRec: GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord] assertNull(nestedRec.get("new_nested_col")) assertNotNull(nestedRec.get("innerKey")) assertNotNull(nestedRec.get("innerValue")) - val innerStruct4 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) - .add("new_nested_col","string",false) + val innerStruct4 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true) + .add("new_nested_col", "string", false) // create a schema which has one additional nested column compared to schema1, which is non nullable val structType6 = new StructType().add("key", "string", false) - .add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct4,true) + .add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct4, true) val schema6 = AvroConversionUtils.convertStructTypeToAvroSchema(structType6, "test_struct_name", "test_namespace") // convert batch 1 with schema5. should fail since the missed out column is not nullable. @@ -212,3 +212,28 @@ class TestHoodieSparkUtils { def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } + +object TestHoodieSparkUtils { + + + def setNullableRec(structType: StructType, columnName: Array[String], index: Int): StructType = { + StructType(structType.map { + case StructField(name, StructType(fields), nullable, metadata) if name.equals(columnName(index)) => + StructField(name, setNullableRec(StructType(fields), columnName, index + 1), nullable, metadata) + case StructField(name, ArrayType(StructType(fields), _), nullable, metadata) if name.equals(columnName(index)) => + StructField(name, ArrayType(setNullableRec(StructType(fields), columnName, index + 1)), nullable, metadata) + case StructField(name, dataType, _, metadata) if name.equals(columnName(index)) => + StructField(name, dataType, nullable = false, metadata) + case y: StructField => y + }) + } + + def setColumnNotNullable(df: DataFrame, columnName: String): DataFrame = { + // get schema + val schema = df.schema + // modify [[StructField] with name `cn` + val newSchema = setNullableRec(schema, columnName.split('.'), 0) + // apply new schema + df.sqlContext.createDataFrame(df.rdd, newSchema) + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 0626ac3960fef..576726a6874e2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -93,6 +93,8 @@ import static java.lang.String.format; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.utilities.UtilHelpers.buildProperties; +import static org.apache.hudi.utilities.UtilHelpers.readConfig; /** * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target @@ -170,7 +172,7 @@ private static TypedProperties combineProperties(Config cfg, Option * NOTE: These properties are already consolidated w/ CLI provided config-overrides */ private final TypedProperties props; @@ -520,6 +522,7 @@ private Pair>> fetchFromSourc final Option> avroRDDOptional; final String checkpointStr; SchemaProvider schemaProvider; + boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key()); if (transformer.isPresent()) { // Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them // to generic records for writing @@ -533,7 +536,6 @@ private Pair>> fetchFromSourc ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE); checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); - boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key()); if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) { // If the target schema is specified through Avro schema, // pass in the schema for the Row-to-Avro conversion @@ -559,35 +561,18 @@ private Pair>> fetchFromSourc } schemaProvider = this.userProvidedSchemaProvider; } else { - Option latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath); - // Deduce proper target (writer's) schema for the transformed dataset, reconciling its - // schema w/ the table's one - Option targetSchemaOpt = transformed.map(df -> { - Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), - latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName))); - // Target (writer's) schema is determined based on the incoming source schema - // and existing table's one, reconciling the two (if necessary) based on configuration - return HoodieSparkSqlWriter.deduceWriterSchema( - sourceSchema, - HoodieConversionUtils.toScalaOption(latestTableSchemaOpt), - HoodieConversionUtils.toScalaOption(Option.empty()), - HoodieConversionUtils.fromProperties(props)); - }); - // Override schema provider with the reconciled target schema - schemaProvider = targetSchemaOpt.map(targetSchema -> - (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(), - new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props))) - .orElse(dataAndCheckpoint.getSchemaProvider()); + schemaProvider = getDeducedSchemaProvider(transformed, dataAndCheckpoint.getSchemaProvider()); // Rewrite transformed records into the expected target schema avroRDDOptional = transformed.map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema())); } } else { // Pull the data from the source & prepare the write - InputBatch> dataAndCheckpoint = - formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit); - avroRDDOptional = dataAndCheckpoint.getBatch(); + InputBatch> dataAndCheckpoint = + formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit); checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); - schemaProvider = dataAndCheckpoint.getSchemaProvider(); + schemaProvider = getDeducedSchemaProvider(dataAndCheckpoint.getBatch(), dataAndCheckpoint.getSchemaProvider()); + // Rewrite transformed records into the expected target schema + avroRDDOptional = dataAndCheckpoint.getBatch().map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema())); } if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { @@ -661,6 +646,43 @@ private Pair>> fetchFromSourc return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); } + /** + * Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider. + * + * @param dataset Dataset from which source schema is derived. + * @param sourceSchemaProvider Source schema provider. + * @return the SchemaProvider that can be used as writer schema. + */ + private SchemaProvider getDeducedSchemaProvider(Option> dataset, SchemaProvider sourceSchemaProvider) { + Option latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())) + .setBasePath(cfg.targetBasePath) + .setPayloadClassName(cfg.payloadClassName) + .build(); + Option internalSchemaOpt = HoodieConversionUtils.toJavaOption( + HoodieSchemaUtils.getLatestTableInternalSchema( + new HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg)), metaClient)); + // Deduce proper target (writer's) schema for the input dataset, reconciling its + // schema w/ the table's one + Option targetSchemaOpt = dataset.map(df -> { + Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), + latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName))); + // Target (writer's) schema is determined based on the incoming source schema + // and existing table's one, reconciling the two (if necessary) based on configuration + return HoodieSparkSqlWriter.deduceWriterSchema( + sourceSchema, + HoodieConversionUtils.toScalaOption(latestTableSchemaOpt), + HoodieConversionUtils.toScalaOption(internalSchemaOpt), + HoodieConversionUtils.fromProperties(props)); + }); + + // Override schema provider with the reconciled target schema + return targetSchemaOpt.map(targetSchema -> + (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), sourceSchemaProvider, + new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props))) + .orElse(sourceSchemaProvider); + } + private JavaRDD getTransformedRDD(Dataset rowDataset, boolean reconcileSchema, Schema readerSchema) { return HoodieSparkUtils.createRdd(rowDataset, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema, Option.ofNullable(readerSchema)).toJavaRDD(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java new file mode 100644 index 0000000000000..108fa5076eef0 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.deltastreamer; + +import org.apache.hudi.TestHoodieSparkUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.ParquetDFSSource; + +import org.apache.spark.SparkException; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Add test cases for out of the box schema evolution for deltastreamer: + * https://hudi.apache.org/docs/schema_evolution#out-of-the-box-schema-evolution + */ +public class TestHoodieDeltaStreamerSchemaEvolution extends HoodieDeltaStreamerTestBase { + + void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + long recordCount = sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).count(); + assertEquals(expected, recordCount); + } + + private HoodieDeltaStreamer.Config setupDeltaStreamer(String tableType, String tableBasePath, Boolean shouldCluster, Boolean shouldCompact, + boolean useSchemaProvider, boolean hasTransformer, + String sourceSchemaFile, String targetSchemaFile) throws IOException { + TypedProperties extraProps = new TypedProperties(); + extraProps.setProperty("hoodie.datasource.write.table.type", tableType); + extraProps.setProperty("hoodie.datasource.add.null.for.deleted.columns", "true"); + + if (shouldCompact) { + extraProps.setProperty(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); + extraProps.setProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1"); + } + + if (shouldCluster) { + extraProps.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true"); + extraProps.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2"); + extraProps.setProperty(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "_row_key"); + } + + if (useSchemaProvider) { + defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); + } + prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, PROPS_FILENAME_TEST_PARQUET, + PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps); + return TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), + null, PROPS_FILENAME_TEST_PARQUET, false, + useSchemaProvider, 100000, false, null, tableType, "timestamp", null); + } + + private void testBase(String tableType, Boolean shouldCluster, Boolean shouldCompact, String updateFile, String updateColumn, + String condition, int count, boolean isUpdateTypeDelete) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, updateFile, updateColumn, condition, count, true, false, false, null, null, isUpdateTypeDelete); + //adding non-nullable cols should fail, but instead it is adding nullable cols + //assertThrows(Exception.class, () -> testBase(tableType, shouldCluster, shouldCompact, updateFile, updateColumn, condition, count, false)); + } + + private void testReconcileWithSchemaProvider(String tableType, Boolean shouldCluster, Boolean shouldCompact, String updateFile, String updateColumn, + String condition, int count, boolean useSchemaProvider, boolean hasTransformer, + String sourceSchemaFile, String targetSchemaFile) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, updateFile, updateColumn, condition, count, true, useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, false); + } + + private void testBase(String tableType, Boolean shouldCluster, Boolean shouldCompact, String updateFile, String updateColumn, + String condition, int count, Boolean nullable, boolean useSchemaProvider, boolean hasTransformer, + String sourceSchemaFile, String targetSchemaFile, boolean isUpdateTypeDelete) throws Exception { + PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++; + String tableBasePath = basePath + "test_parquet_table" + testNum; + HoodieDeltaStreamer deltaStreamer = + new HoodieDeltaStreamer(setupDeltaStreamer(tableType, tableBasePath, shouldCluster, shouldCompact, useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile), + jsc); + + //first write + String datapath = String.class.getResource("/schema-evolution/start.json").getPath(); + sparkSession.read().json(datapath).write().format("parquet").save(PARQUET_SOURCE_ROOT); + deltaStreamer.sync(); + assertRecordCount(10, tableBasePath, sqlContext); + + if (!isUpdateTypeDelete) { + //second write + datapath = String.class.getResource("/schema-evolution/" + updateFile).getPath(); + Dataset df = sparkSession.read().json(datapath); + if (!nullable) { + df = TestHoodieSparkUtils.setColumnNotNullable(df, updateColumn); + } + df.write().format("parquet").mode(SaveMode.Append).save(PARQUET_SOURCE_ROOT); + deltaStreamer.sync(); + assertRecordCount(10, tableBasePath, sqlContext); + sparkSession.read().format("hudi").load(tableBasePath).select(updateColumn).show(10); + assertEquals(count, sparkSession.read().format("hudi").load(tableBasePath).filter(condition).count()); + } else { + //second write + datapath = String.class.getResource("/schema-evolution/" + updateFile).getPath(); + Dataset df = sparkSession.read().json(datapath); + df.write().format("parquet").mode(SaveMode.Append).save(PARQUET_SOURCE_ROOT); + deltaStreamer.sync(); + assertRecordCount(count, tableBasePath, sqlContext); + Dataset hudiDataFrame = sparkSession.read().format("hudi").load(tableBasePath); + boolean columnExists; + try { + hudiDataFrame.select(updateColumn); + columnExists = true; + } catch (RuntimeException e) { + columnExists = false; + } + assertTrue(columnExists); + } + } + + private static Stream testArgs() { + Stream.Builder b = Stream.builder(); + b.add(Arguments.of("COPY_ON_WRITE", false, false)); + b.add(Arguments.of("COPY_ON_WRITE", true, false)); + b.add(Arguments.of("MERGE_ON_READ", false, false)); + b.add(Arguments.of("MERGE_ON_READ", true, false)); + b.add(Arguments.of("MERGE_ON_READ", false, true)); + return b.build(); + } + + /** + * Add a new column at root level at the end + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testAddColRoot(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testAddColRoot.json", "zextra_col", "zextra_col = 'yes'", 2, false); + } + + /** + * Add a custom Hudi meta column + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testAddMetaCol(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testAddMetaCol.json", "_extra_col", "_extra_col = 'yes'", 2, false); + } + + /** + * Add a new column to inner struct (at the end) + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testAddColStruct(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testAddColStruct.json", "tip_history.zextra_col", "tip_history[0].zextra_col = 'yes'", 2, false); + } + + /** + * Add a new complex type field with default (array) + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testAddComplexField(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testAddComplexField.json", "zcomplex_array", "size(zcomplex_array) > 0", 2, false); + } + + /** + * Add a new column and change the ordering of fields + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testAddColChangeOrder(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testAddColChangeOrderAllFiles.json", "extra_col", "extra_col = 'yes'", 2, false); + //according to the docs, this should fail. But it doesn't + //assertThrows(Exception.class, () -> testBase(tableType, shouldCluster, shouldCompact, "testAddColChangeOrderSomeFiles.json", "extra_col", "extra_col = 'yes'", 1)); + } + + /** + * Delete a column at root level + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testDelColRoot(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testDelColRoot.json", "distance_in_meters", null, 12, true); + } + + /** + * Delete a column to inner struct (at the end) + */ + @ParameterizedTest + @MethodSource("testArgs") + public void testDelColStruct(String tableType, Boolean shouldCluster, Boolean shouldCompact) throws Exception { + testBase(tableType, shouldCluster, shouldCompact, "testDelColStruct.json", "tip_history.currency", null, 12, true); + } + + private void testTypePromotionBase(String tableType, Boolean shouldCluster, Boolean shouldCompact, Boolean reconcileSchema, String colName, DataType startType, DataType endType) throws Exception { + PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++; + String tableBasePath = basePath + "test_parquet_table" + testNum; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(setupDeltaStreamer(tableType, tableBasePath, shouldCluster, shouldCompact, false, false, null, null), jsc); + + //first write + String datapath = String.class.getResource("/schema-evolution/startTypePromotion.json").getPath(); + Dataset df = sparkSession.read().json(datapath); + Column col = df.col(colName); + df = df.withColumn(colName, col.cast(startType)); + df.write().format("parquet").save(PARQUET_SOURCE_ROOT); + deltaStreamer.sync(); + assertRecordCount(10, tableBasePath, sqlContext); + assertEquals(startType, sparkSession.read().format("hudi").load(tableBasePath).select(colName).schema().fields()[0].dataType()); + + //second write + datapath = Objects.requireNonNull(String.class.getResource("/schema-evolution/endTypePromotion.json")).getPath(); + df = sparkSession.read().json(datapath); + col = df.col(colName); + df = df.withColumn(colName, col.cast(endType)); + df.write().format("parquet").mode(SaveMode.Append).save(PARQUET_SOURCE_ROOT); + deltaStreamer.sync(); + assertRecordCount(10, tableBasePath, sqlContext); + sparkSession.read().format("hudi").load(tableBasePath).select(colName).show(10); + assertEquals(reconcileSchema ? startType : endType, sparkSession.read().format("hudi").load(tableBasePath).select(colName).schema().fields()[0].dataType()); + } + + private static Stream testTypePromotionArgs() { + Stream.Builder b = Stream.builder(); + b.add(Arguments.of("COPY_ON_WRITE", false, false, false)); + b.add(Arguments.of("MERGE_ON_READ", false, false, false)); + /* + clustering fails for MOR and COW + b.add(Arguments.of("COPY_ON_WRITE", true, false, false)); + b.add(Arguments.of("MERGE_ON_READ", true, false, false)); + */ + b.add(Arguments.of("MERGE_ON_READ", false, true, false)); + return b.build(); + } + + /** + * Test type promotion for root level fields + */ + @ParameterizedTest + @MethodSource("testTypePromotionArgs") + public void testTypePromotion(String tableType, Boolean shouldCluster, Boolean shouldCompact, Boolean reconcileSchema) throws Exception { + //root data type promotions + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.IntegerType, DataTypes.LongType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.IntegerType, DataTypes.FloatType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.IntegerType, DataTypes.DoubleType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.IntegerType, DataTypes.StringType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.FloatType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.DoubleType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.StringType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "begin_lat", DataTypes.FloatType, DataTypes.DoubleType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "begin_lat", DataTypes.FloatType, DataTypes.StringType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "rider", DataTypes.StringType, DataTypes.BinaryType); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "tip_history", + DataTypes.createArrayType(DataTypes.IntegerType), DataTypes.createArrayType(DataTypes.LongType)); + //Seems to be supported for datasource. See org.apache.hudi.TestAvroSchemaResolutionSupport.testDataTypePromotions + //testTypePromotionBase(tableType, shouldCluster, shouldCompact, "rider", DataTypes.BinaryType, DataTypes.StringType); + + //nested data type promotions + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "fare", createFareStruct(DataTypes.FloatType), createFareStruct(DataTypes.DoubleType)); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "fare", createFareStruct(DataTypes.FloatType), createFareStruct(DataTypes.StringType)); + + //complex data type promotion + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "tip_history", + DataTypes.createArrayType(DataTypes.IntegerType), DataTypes.createArrayType(DataTypes.LongType)); + testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "tip_history", + DataTypes.createArrayType(DataTypes.IntegerType), DataTypes.createArrayType(DataTypes.StringType)); + + //test illegal type promotions + if (tableType.equals("COPY_ON_WRITE")) { + //illegal root data type promotion + SparkException e = assertThrows(SparkException.class, + () -> testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.IntegerType)); + assertTrue(e.getCause().getCause().getMessage().contains("cannot support rewrite value for schema type: \"int\" since the old schema type is: \"long\"")); + //illegal nested data type promotion + e = assertThrows(SparkException.class, + () -> testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "fare", createFareStruct(DataTypes.DoubleType), createFareStruct(DataTypes.FloatType))); + assertTrue(e.getCause().getCause().getMessage().contains("cannot support rewrite value for schema type: \"float\" since the old schema type is: \"double\"")); + //illegal complex data type promotion + e = assertThrows(SparkException.class, + () -> testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "tip_history", + DataTypes.createArrayType(DataTypes.LongType), DataTypes.createArrayType(DataTypes.IntegerType))); + assertTrue(e.getCause().getCause().getMessage().contains("cannot support rewrite value for schema type: \"int\" since the old schema type is: \"long\"")); + } else { + //illegal root data type promotion + if (shouldCompact) { + assertThrows(HoodieCompactionException.class, + () -> testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.IntegerType)); + } else { + SparkException e = assertThrows(SparkException.class, + () -> testTypePromotionBase(tableType, shouldCluster, shouldCompact, reconcileSchema, "distance_in_meters", DataTypes.LongType, DataTypes.IntegerType)); + assertTrue(e.getCause() instanceof NullPointerException); + } + + //nested and complex do not fail even though they should + } + } + + private StructType createFareStruct(DataType amountType) { + return DataTypes.createStructType(new StructField[] {new StructField("amount", amountType, true, Metadata.empty()), + new StructField("currency", DataTypes.StringType, true, Metadata.empty())}); + } +} diff --git a/hudi-utilities/src/test/resources/schema-evolution/endTypePromotion.json b/hudi-utilities/src/test/resources/schema-evolution/endTypePromotion.json new file mode 100644 index 0000000000000..b1a59efff4b36 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/endTypePromotion.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-001","driver":"driver-001","begin_lat":0.21927838567235019,"begin_lon":0.5594020723452937,"end_lat":0.7161653985102948,"end_lon":0.4971679897910298,"distance_in_meters":9361439213,"seconds_since_epoch":3794145268659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":16.671341480371346,"currency":"USD"},"tip_history":[951],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-001","driver":"driver-001","begin_lat":0.74714076296948563,"begin_lon":0.8776437421094859,"end_lat":0.9648524370765467,"end_lon":0.3911456321548304,"distance_in_meters":1137123412,"seconds_since_epoch":5028479681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":75.97606478430822,"currency":"USD"},"tip_history":[138],"_hoodie_is_deleted":false} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/schema-evolution/start.json b/hudi-utilities/src/test/resources/schema-evolution/start.json new file mode 100644 index 0000000000000..7e90c52352f09 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/start.json @@ -0,0 +1,10 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"1f7f4473-8889-488a-86f8-aaa63319b4b4","partition_path":"2015/03/17","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.09283534365767165,"begin_lon":0.7406047279761032,"end_lat":0.259529402287365,"end_lon":0.3793829234810173,"distance_in_meters":-1289053159,"seconds_since_epoch":6540247735540261975,"weight":0.74709326,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1338290882,"height":0.474291,"city_to_state":{"LA":"CA"},"fare":{"amount":41.8217733941428,"currency":"USD"},"tip_history":[{"amount":91.54707889420283,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"b7000dbd-d80f-4024-905d-532977ae43f9","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.5931504793109675,"begin_lon":0.9886471058049089,"end_lat":0.006118306492296055,"end_lon":0.19266950151149498,"distance_in_meters":-1686525516,"seconds_since_epoch":4166715486945369394,"weight":0.8310657,"nation":"Q2FuYWRh","current_date":"1970-01-13","current_ts":1105887562,"height":0.557941,"city_to_state":{"LA":"CA"},"fare":{"amount":63.60969374104979,"currency":"USD"},"tip_history":[{"amount":87.00454921048154,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"07076280-5bab-4b0d-8930-94a1de5991cd","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.04245323335756779,"begin_lon":0.9152007089994821,"end_lat":0.6511125556291417,"end_lon":0.28444356863277487,"distance_in_meters":-480499072,"seconds_since_epoch":-4541489022232815692,"weight":0.8729432,"nation":"Q2FuYWRh","current_date":"1970-01-14","current_ts":1180252692,"height":0.321330,"city_to_state":{"LA":"CA"},"fare":{"amount":56.86865265269785,"currency":"USD"},"tip_history":[{"amount":30.2448146817467,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"d41c5703-6c86-4f4c-ab2c-51253b02deaf","partition_path":"2015/03/17","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5331332869796412,"begin_lon":0.11236032208831404,"end_lat":0.7610323238172235,"end_lon":0.6414706864249624,"distance_in_meters":1212983241,"seconds_since_epoch":7090335803227873266,"weight":0.40637594,"nation":"Q2FuYWRh","current_date":"1970-01-14","current_ts":1172551761,"height":0.183033,"city_to_state":{"LA":"CA"},"fare":{"amount":87.58991293970846,"currency":"USD"},"tip_history":[{"amount":11.69405524258501,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"bcea510f-aaf6-42f5-a490-c61b42f59784","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.7362562672182036,"begin_lon":0.4745041047602002,"end_lat":0.22777332842138953,"end_lon":0.10094789978439622,"distance_in_meters":60306142,"seconds_since_epoch":5390769490275546019,"weight":0.9655821,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":982643754,"height":0.982110,"city_to_state":{"LA":"CA"},"fare":{"amount":70.10088696225361,"currency":"USD"},"tip_history":[{"amount":96.79449667264703,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"ad5ab2be-769a-4c7b-98af-e2780d016a9c","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5390219572718705,"begin_lon":0.08683108180272892,"end_lat":0.7835345528085245,"end_lon":0.695364227220298,"distance_in_meters":1746406037,"seconds_since_epoch":-1859359059343187038,"weight":0.7024137,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1356858937,"height":0.189173,"city_to_state":{"LA":"CA"},"fare":{"amount":29.865323585321068,"currency":"USD"},"tip_history":[{"amount":19.760372723830354,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"6c8b77e5-7806-43f1-9ecc-706a999d49fe","partition_path":"2015/03/17","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5347242863334416,"begin_lon":0.03138005638340591,"end_lat":0.6037366738340498,"end_lon":0.49273899834224566,"distance_in_meters":-1370828602,"seconds_since_epoch":-4712777615466527378,"weight":0.580827,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":1009523468,"height":0.624823,"city_to_state":{"LA":"CA"},"fare":{"amount":71.77332900090153,"currency":"USD"},"tip_history":[{"amount":7.720702671399637,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"1294ff48-1903-4846-8050-63034826bac6","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.2837618805741604,"begin_lon":0.04250621065910509,"end_lat":0.71481983832847,"end_lon":0.439897330022758,"distance_in_meters":-659337267,"seconds_since_epoch":-1158726169754094421,"weight":0.61601305,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":990690984,"height":0.227301,"city_to_state":{"LA":"CA"},"fare":{"amount":11.535336301433786,"currency":"USD"},"tip_history":[{"amount":76.92066962925655,"currency":"USD"}],"_hoodie_is_deleted":false} diff --git a/hudi-utilities/src/test/resources/schema-evolution/startTypePromotion.json b/hudi-utilities/src/test/resources/schema-evolution/startTypePromotion.json new file mode 100644 index 0000000000000..354ea1d883443 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/startTypePromotion.json @@ -0,0 +1,10 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[90],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[13],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"1f7f4473-8889-488a-86f8-aaa63319b4b4","partition_path":"2015/03/17","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.09283534365767165,"begin_lon":0.7406047279761032,"end_lat":0.259529402287365,"end_lon":0.3793829234810173,"distance_in_meters":-1289053159,"seconds_since_epoch":6540247735540261975,"weight":0.74709326,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1338290882,"height":0.474291,"city_to_state":{"LA":"CA"},"fare":{"amount":41.8217733941428,"currency":"USD"},"tip_history":[91],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"b7000dbd-d80f-4024-905d-532977ae43f9","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.5931504793109675,"begin_lon":0.9886471058049089,"end_lat":0.006118306492296055,"end_lon":0.19266950151149498,"distance_in_meters":-1686525516,"seconds_since_epoch":4166715486945369394,"weight":0.8310657,"nation":"Q2FuYWRh","current_date":"1970-01-13","current_ts":1105887562,"height":0.557941,"city_to_state":{"LA":"CA"},"fare":{"amount":63.60969374104979,"currency":"USD"},"tip_history":[87],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"07076280-5bab-4b0d-8930-94a1de5991cd","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.04245323335756779,"begin_lon":0.9152007089994821,"end_lat":0.6511125556291417,"end_lon":0.28444356863277487,"distance_in_meters":-480499072,"seconds_since_epoch":-4541489022232815692,"weight":0.8729432,"nation":"Q2FuYWRh","current_date":"1970-01-14","current_ts":1180252692,"height":0.321330,"city_to_state":{"LA":"CA"},"fare":{"amount":56.86865265269785,"currency":"USD"},"tip_history":[30],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"d41c5703-6c86-4f4c-ab2c-51253b02deaf","partition_path":"2015/03/17","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5331332869796412,"begin_lon":0.11236032208831404,"end_lat":0.7610323238172235,"end_lon":0.6414706864249624,"distance_in_meters":1212983241,"seconds_since_epoch":7090335803227873266,"weight":0.40637594,"nation":"Q2FuYWRh","current_date":"1970-01-14","current_ts":1172551761,"height":0.183033,"city_to_state":{"LA":"CA"},"fare":{"amount":87.58991293970846,"currency":"USD"},"tip_history":[11],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"bcea510f-aaf6-42f5-a490-c61b42f59784","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.7362562672182036,"begin_lon":0.4745041047602002,"end_lat":0.22777332842138953,"end_lon":0.10094789978439622,"distance_in_meters":60306142,"seconds_since_epoch":5390769490275546019,"weight":0.9655821,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":982643754,"height":0.982110,"city_to_state":{"LA":"CA"},"fare":{"amount":70.10088696225361,"currency":"USD"},"tip_history":[96],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"ad5ab2be-769a-4c7b-98af-e2780d016a9c","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5390219572718705,"begin_lon":0.08683108180272892,"end_lat":0.7835345528085245,"end_lon":0.695364227220298,"distance_in_meters":1746406037,"seconds_since_epoch":-1859359059343187038,"weight":0.7024137,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1356858937,"height":0.189173,"city_to_state":{"LA":"CA"},"fare":{"amount":29.865323585321068,"currency":"USD"},"tip_history":[19],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"6c8b77e5-7806-43f1-9ecc-706a999d49fe","partition_path":"2015/03/17","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.5347242863334416,"begin_lon":0.03138005638340591,"end_lat":0.6037366738340498,"end_lon":0.49273899834224566,"distance_in_meters":-1370828602,"seconds_since_epoch":-4712777615466527378,"weight":0.580827,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":1009523468,"height":0.624823,"city_to_state":{"LA":"CA"},"fare":{"amount":71.77332900090153,"currency":"USD"},"tip_history":[7],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"1294ff48-1903-4846-8050-63034826bac6","partition_path":"2016/03/15","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.2837618805741604,"begin_lon":0.04250621065910509,"end_lat":0.71481983832847,"end_lon":0.439897330022758,"distance_in_meters":-659337267,"seconds_since_epoch":-1158726169754094421,"weight":0.61601305,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":990690984,"height":0.227301,"city_to_state":{"LA":"CA"},"fare":{"amount":11.535336301433786,"currency":"USD"},"tip_history":[76],"_hoodie_is_deleted":false} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderAllFiles.json b/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderAllFiles.json new file mode 100644 index 0000000000000..ecec12ed70a05 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderAllFiles.json @@ -0,0 +1,3 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"extra_col":"yes"} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false,"extra_col":"yes"} +{"timestamp":0,"_row_key":"1f7f4473-8889-488a-86f8-aaa63319b4b4","partition_path":"2015/03/17","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.09283534365767165,"begin_lon":0.7406047279761032,"end_lat":0.259529402287365,"end_lon":0.3793829234810173,"distance_in_meters":-1289053159,"seconds_since_epoch":6540247735540261975,"weight":0.74709326,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1338290882,"height":0.474291,"city_to_state":{"LA":"CA"},"fare":{"amount":41.8217733941428,"currency":"USD"},"tip_history":[{"amount":91.54707889420283,"currency":"USD"}],"_hoodie_is_deleted":false,"extra_col":"no"} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderSomeFiles.json b/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderSomeFiles.json new file mode 100644 index 0000000000000..f920b319e1322 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddColChangeOrderSomeFiles.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"extra_col":"yes"} +{"timestamp":0,"_row_key":"1f7f4473-8889-488a-86f8-aaa63319b4b4","partition_path":"2015/03/17","trip_type":"UBERX","rider":"rider-000","driver":"driver-000","begin_lat":0.09283534365767165,"begin_lon":0.7406047279761032,"end_lat":0.259529402287365,"end_lon":0.3793829234810173,"distance_in_meters":-1289053159,"seconds_since_epoch":6540247735540261975,"weight":0.74709326,"nation":"Q2FuYWRh","current_date":"1970-01-16","current_ts":1338290882,"height":0.474291,"city_to_state":{"LA":"CA"},"fare":{"amount":41.8217733941428,"currency":"USD"},"tip_history":[{"amount":91.54707889420283,"currency":"USD"}],"_hoodie_is_deleted":false,"extra_col":"no"} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddColRoot.json b/hudi-utilities/src/test/resources/schema-evolution/testAddColRoot.json new file mode 100644 index 0000000000000..73c48e9c99698 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddColRoot.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"zextra_col":"yes"} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false,"zextra_col":"yes"} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddColRootWithSchemaProvider.json b/hudi-utilities/src/test/resources/schema-evolution/testAddColRootWithSchemaProvider.json new file mode 100644 index 0000000000000..5a8dbab702aac --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddColRootWithSchemaProvider.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"evoluted_optional_union_field":"yes"} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false,"evoluted_optional_union_field":"yes"} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddColStruct.json b/hudi-utilities/src/test/resources/schema-evolution/testAddColStruct.json new file mode 100644 index 0000000000000..d1f0e94b094bf --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddColStruct.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD","zextra_col":"yes"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD","zextra_col":"yes"}],"_hoodie_is_deleted":false} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddComplexField.json b/hudi-utilities/src/test/resources/schema-evolution/testAddComplexField.json new file mode 100644 index 0000000000000..c5ab402752275 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddComplexField.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"zcomplex_array":["a","b","c"]} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false,"zcomplex_array":["d"]} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testAddMetaCol.json b/hudi-utilities/src/test/resources/schema-evolution/testAddMetaCol.json new file mode 100644 index 0000000000000..84bfa4e2b001e --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testAddMetaCol.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdd0a","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false,"_extra_col":"yes"} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610c956","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false,"_extra_col":"yes"} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testDelColRoot.json b/hudi-utilities/src/test/resources/schema-evolution/testDelColRoot.json new file mode 100644 index 0000000000000..cd2bb5a641aa7 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testDelColRoot.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdbbb","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568,"currency":"USD"}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610cbbb","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558,"currency":"USD"}],"_hoodie_is_deleted":false} diff --git a/hudi-utilities/src/test/resources/schema-evolution/testDelColStruct.json b/hudi-utilities/src/test/resources/schema-evolution/testDelColStruct.json new file mode 100644 index 0000000000000..a99734ff415a6 --- /dev/null +++ b/hudi-utilities/src/test/resources/schema-evolution/testDelColStruct.json @@ -0,0 +1,2 @@ +{"timestamp":0,"_row_key":"154fee81-6e2a-4c32-94f5-be5c456fdaaa","partition_path":"2016/03/15","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.21927838567558522,"begin_lon":0.5594020723099724,"end_lat":0.7161653985926594,"end_lon":0.49716798979953447,"distance_in_meters":936143957,"seconds_since_epoch":3794105168659998336,"weight":0.18520206,"nation":"Q2FuYWRh","current_date":"1970-01-15","current_ts":1244853103,"height":0.272661,"city_to_state":{"LA":"CA"},"fare":{"amount":12.671341480371346,"currency":"USD"},"tip_history":[{"amount":90.26735894145568}],"_hoodie_is_deleted":false} +{"timestamp":0,"_row_key":"c8c1bd1a-d58b-46c6-a38b-79a2a610cbbb","partition_path":"2015/03/16","trip_type":"BLACK","rider":"rider-000","driver":"driver-000","begin_lat":0.7471407629318884,"begin_lon":0.8776437421395643,"end_lat":0.9648524370990681,"end_lon":0.3911456751705831,"distance_in_meters":1137109733,"seconds_since_epoch":5028439681953251637,"weight":0.023411155,"nation":"Q2FuYWRh","current_date":"1970-01-12","current_ts":986645693,"height":0.898042,"city_to_state":{"LA":"CA"},"fare":{"amount":85.97606478430822,"currency":"USD"},"tip_history":[{"amount":13.7534224373558}],"_hoodie_is_deleted":false}