diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 1f2a30c26656a..5c76b4c997bdf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -18,7 +18,6 @@ package org.apache.hudi.utilities; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -44,6 +43,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; +import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException; +import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaPostProcessor; @@ -57,6 +58,7 @@ import org.apache.hudi.utilities.transform.Transformer; import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -133,7 +135,7 @@ public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(St } public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, - JavaSparkContext jssc) throws IOException { + JavaSparkContext jssc) throws IOException { try { return StringUtils.isNullOrEmpty(schemaProviderClass) ? 
null : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); @@ -143,10 +145,22 @@ public static SchemaProvider createSchemaProvider(String schemaProviderClass, Ty } public static SchemaPostProcessor createSchemaPostProcessor( - String schemaPostProcessorClass, TypedProperties cfg, JavaSparkContext jssc) { - return schemaPostProcessorClass == null - ? null - : (SchemaPostProcessor) ReflectionUtils.loadClass(schemaPostProcessorClass, cfg, jssc); + String schemaPostProcessorClassNames, TypedProperties cfg, JavaSparkContext jssc) { + + if (StringUtils.isNullOrEmpty(schemaPostProcessorClassNames)) { + return null; + } + + try { + List processors = new ArrayList<>(); + for (String className : (schemaPostProcessorClassNames.split(","))) { + processors.add((SchemaPostProcessor) ReflectionUtils.loadClass(className, cfg, jssc)); + } + return new ChainedSchemaPostProcessor(cfg, jssc, processors); + } catch (Throwable e) { + throw new HoodieSchemaPostProcessException("Could not load schemaPostProcessorClassNames class(es) " + schemaPostProcessorClassNames, e); + } + } public static Option createTransformer(List classNames) throws IOException { @@ -164,7 +178,7 @@ public static Option createTransformer(List classNames) thr public static InitialCheckPointProvider createInitialCheckpointProvider( String className, TypedProperties props) throws IOException { try { - return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[] {TypedProperties.class}, props); + return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[]{TypedProperties.class}, props); } catch (Throwable e) { throw new IOException("Could not load initial checkpoint provider class " + className, e); } @@ -421,21 +435,21 @@ public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProv } public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider, - TypedProperties cfg, JavaSparkContext 
jssc, List transformerClassNames) { + TypedProperties cfg, JavaSparkContext jssc, List transformerClassNames) { if (provider == null) { return null; } - if (provider instanceof SchemaProviderWithPostProcessor) { - return (SchemaProviderWithPostProcessor)provider; + if (provider instanceof SchemaProviderWithPostProcessor) { + return (SchemaProviderWithPostProcessor) provider; } String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null); boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "true")); if (transformerClassNames != null && !transformerClassNames.isEmpty() - && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) { + && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) { schemaPostProcessorClass = SparkAvroPostProcessor.class.getName(); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaPostProcessException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaPostProcessException.java new file mode 100644 index 0000000000000..dd765203d371c --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSchemaPostProcessException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.exception; + +import org.apache.hudi.exception.HoodieException; + +/** + * Exception thrown during schema post processing. + */ +public class HoodieSchemaPostProcessException extends HoodieException { + public HoodieSchemaPostProcessException(String msg) { + super(msg); + } + + public HoodieSchemaPostProcessException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java new file mode 100644 index 0000000000000..5a88d9fd92f3d --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ChainedSchemaPostProcessor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; + +/** + * A {@link SchemaPostProcessor} to chain other {@link SchemaPostProcessor}s and process sequentially. + */ +public class ChainedSchemaPostProcessor extends SchemaPostProcessor { + + private List processors; + + protected ChainedSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + public ChainedSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc, List processors) { + super(props, jssc); + this.processors = processors; + } + + @Override + public Schema processSchema(Schema schema) { + Schema targetSchema = schema; + for (SchemaPostProcessor processor : processors) { + targetSchema = processor.processSchema(targetSchema); + } + return targetSchema; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaPostProcessor.java new file mode 100644 index 0000000000000..2f8d3eee7b62b --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaPostProcessor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.schema.SchemaPostProcessor; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.spark.api.java.JavaSparkContext; + +public class DummySchemaPostProcessor extends SchemaPostProcessor { + + public DummySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema processSchema(Schema schema) { + return SchemaBuilder.record("test").fields().optionalString("testString").endRecord(); + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java index 68cd994ba10ea..108890060cba7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java @@ -29,8 +29,6 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; -import org.apache.avro.SchemaBuilder; -import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -39,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; public class TestSchemaPostProcessor extends UtilitiesTestBase { @@ -91,21 +90,36 @@ public void 
testDeleteSupport() { } @Test - public void testSparkAvroSchema() throws IOException { - SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null); + public void testChainedSchemaPostProcessor() { + // DeleteSupportSchemaPostProcessor first, DummySchemaPostProcessor second + properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, + "org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor"); + + SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc); Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA); - assertEquals(processor.processSchema(schema).toString(), RESULT_SCHEMA); - } + Schema targetSchema = processor.processSchema(schema); - public static class DummySchemaPostProcessor extends SchemaPostProcessor { + assertNull(targetSchema.getField("ums_id_")); + assertNull(targetSchema.getField("_hoodie_is_deleted")); + assertNotNull(targetSchema.getField("testString")); - public DummySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { - super(props, jssc); - } + // DummySchemaPostProcessor first, DeleteSupportSchemaPostProcessor second + properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, + "org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor"); - @Override - public Schema processSchema(Schema schema) { - return SchemaBuilder.record("test").fields().optionalString("testString").endRecord(); - } + processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc); + schema = new Schema.Parser().parse(ORIGINAL_SCHEMA); + targetSchema = processor.processSchema(schema); + + assertNull(targetSchema.getField("ums_id_")); + assertNotNull(targetSchema.getField("_hoodie_is_deleted")); + assertNotNull(targetSchema.getField("testString")); + } + + @Test + public void 
testSparkAvroSchema() throws IOException { + SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null); + Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA); + assertEquals(processor.processSchema(schema).toString(), RESULT_SCHEMA); } }