diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 74cc775718a2e..9c568858294d0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -52,7 +52,6 @@
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
-import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
@@ -64,7 +63,6 @@
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
@@ -116,7 +114,6 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
@@ -547,12 +544,6 @@ public static SchemaProvider wrapSchemaProviderWithPostProcessor(SchemaProvider
 
     String schemaPostProcessorClass = getStringWithAltKeys(
         cfg, SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR, true);
-    boolean enableSparkAvroPostProcessor =
-        getBooleanWithAltKeys(cfg, HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE);
-    if (transformerClassNames != null && !transformerClassNames.isEmpty()
-        && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
-      schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
-    }
 
     if (schemaPostProcessorClass == null || schemaPostProcessorClass.isEmpty()) {
       return provider;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
index e7ec7815b976c..6e1dbab8e9ae4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
@@ -59,13 +59,6 @@ public class HoodieSchemaProviderConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("The class name of the custom schema converter to use.");
 
-  public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_ENABLE = ConfigProperty
-      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
-      .defaultValue(true)
-      .withAlternatives(OLD_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
-      .markAdvanced()
-      .withDocumentation("Whether to enable Spark Avro post processor.");
-
   public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL = ConfigProperty
       .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
       .noDefaultValue()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
deleted file mode 100644
index 406f71872d59f..0000000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.schema;
-
-import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
-
-import org.apache.avro.Schema;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * HUDI-1343:Add standard schema postprocessor which would rewrite the schema using spark-avro conversion.
- */
-public class SparkAvroPostProcessor extends SchemaPostProcessor {
-
-  @Deprecated
-  public static class Config {
-    @Deprecated
-    public static final String SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE =
-        HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key();
-  }
-
-  public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
-    super(props, jssc);
-  }
-
-  @Override
-  public Schema processSchema(Schema schema) {
-    if (schema == null) {
-      return null;
-    }
-
-    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
-    // NOTE: It's critical that we preserve incoming schema's qualified record-name to make
-    //       sure we maintain schema's compatibility (as defined by [[AvroSchemaCompatibility]])
-    return AvroConversionUtils.convertStructTypeToAvroSchema(structType, schema.getFullName());
-  }
-}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 0818c43cc02fb..264332ec77ce1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -23,7 +23,6 @@
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
@@ -88,7 +87,6 @@ public void testPostProcessor() throws IOException {
 
   @Test
   public void testSparkAvro() throws IOException {
-    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), SparkAvroPostProcessor.class.getName());
     List<String> transformerClassNames = new ArrayList<>();
     transformerClassNames.add(FlatteningTransformer.class.getName());
 
@@ -185,11 +183,4 @@ public void testAddPrimitiveTypeColumn(String type) {
 
     assertEquals(type, newColumn.schema().getType().getName());
   }
-
-  @Test
-  public void testSparkAvroSchema() throws IOException {
-    SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);
-    Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
-    assertEquals(RESULT_SCHEMA, processor.processSchema(schema).toString());
-  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 51a8d26754a63..03ca6bd67815e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -187,7 +187,6 @@ protected static void prepareInitialConfigs(HoodieStorage storage, String dfsBas
         dfsBasePath + "/sql-transformer.properties");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source.avsc", storage, dfsBasePath + "/source.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_evolved.avsc", storage, dfsBasePath + "/source_evolved.avsc");
-    UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_evolved_post_processed.avsc", storage, dfsBasePath + "/source_evolved_post_processed.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source-flattened.avsc", storage, dfsBasePath + "/source-flattened.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target.avsc", storage, dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target-flattened.avsc", storage, dfsBasePath + "/target-flattened.avsc");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index e20875608de55..d2c5cfa148e21 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -81,7 +81,6 @@
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
-import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.SourceTestConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -278,23 +277,15 @@ private static HoodieStreamer.Config getBaseConfig() {
    */
   private static Stream<Arguments> schemaEvolArgs() {
     return Stream.of(
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.AVRO),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.AVRO),
-
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.SPARK),
-        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.SPARK));
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.AVRO),
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.AVRO),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.AVRO),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.AVRO),
+
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.SPARK),
+        Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.SPARK),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.SPARK),
+        Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.SPARK));
   }
 
   private static Stream<Arguments> provideValidCliArgs() {
@@ -569,8 +560,8 @@ private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer ex
   // TODO add tests w/ disabled reconciliation
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
-  public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor, HoodieRecordType recordType) throws Exception {
-    String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
+  public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, HoodieRecordType recordType) throws Exception {
+    String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema;
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
     // Insert data produced with Schema A, pass Schema A
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()),
@@ -579,9 +570,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
     cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
     cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
-    if (!useSchemaPostProcessor) {
-      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-    }
+
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -593,9 +582,6 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
     cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
     cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
-    if (!useSchemaPostProcessor) {
-      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-    }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
     assertRecordCount(1450, tableBasePath, sqlContext);
@@ -619,9 +605,6 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
     if (useUserProvidedSchema) {
       cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     }
-    if (!useSchemaPostProcessor) {
-      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-    }
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
@@ -636,11 +619,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
     assertNotNull(tableSchema);
 
     Schema expectedSchema;
-    if (!useSchemaPostProcessor) {
-      expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved.avsc")));
-    } else {
-      expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved_post_processed.avsc")));
-    }
+    expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved.avsc")));
     assertEquals(expectedSchema, tableSchema);
 
     // clean up and reinit
diff --git a/hudi-utilities/src/test/resources/streamer-config/source_evolved_post_processed.avsc b/hudi-utilities/src/test/resources/streamer-config/source_evolved_post_processed.avsc
deleted file mode 100644
index af9844218c489..0000000000000
--- a/hudi-utilities/src/test/resources/streamer-config/source_evolved_post_processed.avsc
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-{
-  "type": "record",
-  "name": "triprec",
-  "fields": [
-    {
-      "name": "timestamp",
-      "type": "long"
-    },
-    {
-      "name": "_row_key",
-      "type": "string"
-    },
-    {
-      "name": "partition_path",
-      "type": "string"
-    },
-    {
-      "name": "trip_type",
-      "type": "string"
-    },
-    {
-      "name": "rider",
-      "type": "string"
-    },
-    {
-      "name": "driver",
-      "type": "string"
-    },
-    {
-      "name": "begin_lat",
-      "type": "double"
-    },
-    {
-      "name": "begin_lon",
-      "type": "double"
-    },
-    {
-      "name": "end_lat",
-      "type": "double"
-    },
-    {
-      "name": "end_lon",
-      "type": "double"
-    },
-    {
-      "name": "distance_in_meters",
-      "type": "int"
-    },
-    {
-      "name": "seconds_since_epoch",
-      "type": "long"
-    },
-    {
-      "name": "weight",
-      "type": "float"
-    },
-    {
-      "name": "nation",
-      "type": "bytes"
-    },
-    {
-      "name": "current_date",
-      "type": {
-        "type": "int",
-        "logicalType": "date"
-      }
-    },
-    {
-      "name": "current_ts",
-      "type": "long"
-    },
-    {
-      "name": "height",
-      "type": {
-        "type": "fixed",
-        "name": "fixed",
-        "namespace": "triprec.height",
-        "size": 5,
-        "logicalType": "decimal",
-        "precision": 10,
-        "scale": 6
-      }
-    },
-    {
-      "name": "city_to_state",
-      "type": {
-        "type": "map",
-        "values": "string"
-      }
-    },
-    {
-      "name": "fare",
-      "type": {
-        "type": "record",
-        "name": "fare",
-        "namespace": "triprec",
-        "fields": [
-          {
-            "name": "amount",
-            "type": "double"
-          },
-          {
-            "name": "currency",
-            "type": "string"
-          }
-        ]
-      }
-    },
-    {
-      "name": "tip_history",
-      "type": {
-        "type": "array",
-        "items": {
-          "type": "record",
-          "name": "tip_history",
-          "namespace": "triprec",
-          "fields": [
-            {
-              "name": "amount",
-              "type": "double"
-            },
-            {
-              "name": "currency",
-              "type": "string"
-            }
-          ]
-        }
-      }
-    },
-    {
-      "name": "_hoodie_is_deleted",
-      "type": "boolean"
-    },
-    {
-      "name": "evoluted_optional_union_field",
-      "type": [
-        "null",
-        "string"
-      ],
-      "default": null
-    }
-  ]
-}
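
Note: the generic post-processor hook that this patch leaves in place (SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR, read in UtilHelpers.wrapSchemaProviderWithPostProcessor) can still apply the old Spark-Avro normalization if a job depends on it; the processor just has to be supplied explicitly now. Below is a minimal sketch of such a user-side processor. The package and class name (com.example.hudi.LegacySparkAvroPostProcessor) are hypothetical placeholders, not part of this patch; the body mirrors the processSchema logic of the deleted SparkAvroPostProcessor shown above.

package com.example.hudi; // hypothetical user package, not shipped with Hudi

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;

import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;

/**
 * Round-trips an Avro schema through Spark's StructType, reproducing the
 * rewrite the removed SparkAvroPostProcessor used to apply implicitly
 * whenever transformers were configured.
 */
public class LegacySparkAvroPostProcessor extends SchemaPostProcessor {

  public LegacySparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  @Override
  public Schema processSchema(Schema schema) {
    if (schema == null) {
      return null;
    }
    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
    // Preserve the incoming schema's qualified record name so the rewritten
    // schema stays compatible with the original (per AvroSchemaCompatibility).
    return AvroConversionUtils.convertStructTypeToAvroSchema(structType, schema.getFullName());
  }
}

It would then be wired in through the existing config, e.g.
--hoodie-conf hoodie.streamer.schemaprovider.schema_post_processor=com.example.hudi.LegacySparkAvroPostProcessor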