diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java new file mode 100644 index 0000000000000..4a41b75589a3c --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DropColumnSchemaPostProcessor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException; + +import org.apache.avro.Schema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link SchemaPostProcessor} that support to delete column(s) from given schema. + *

+ * Multiple columns are separated by commas. + * For example: + *

+ * properties.put("hoodie.deltastreamer.schemaprovider.schema_post_processor.delete.columns", "column1,column2"). + */ +public class DropColumnSchemaPostProcessor extends SchemaPostProcessor { + + private static final Logger LOG = LogManager.getLogger(DropColumnSchemaPostProcessor.class); + + public DropColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + public static class Config { + public static final String DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = + "hoodie.deltastreamer.schemaprovider.schema_post_processor.delete.columns"; + } + + @Override + public Schema processSchema(Schema schema) { + + String columnToDeleteStr = this.config.getString(Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP); + + if (StringUtils.isNullOrEmpty(columnToDeleteStr)) { + LOG.warn(String.format("Param %s is null or empty, return original schema", Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP)); + } + + // convert field to lowerCase for compare purpose + Set columnsToDelete = Arrays.stream(columnToDeleteStr.split(",")) + .map(filed -> filed.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()); + + List sourceFields = schema.getFields(); + List targetFields = new LinkedList<>(); + + for (Schema.Field sourceField : sourceFields) { + if (!columnsToDelete.contains(sourceField.name().toLowerCase(Locale.ROOT))) { + targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal())); + } + } + + if (targetFields.isEmpty()) { + throw new HoodieSchemaPostProcessException("Target schema is empty, you can not remove all columns!"); + } + + return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields); + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java index 108890060cba7..19b09a8901ec3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java @@ -19,7 +19,9 @@ package org.apache.hudi.utilities; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException; import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor; +import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor; import org.apache.hudi.utilities.schema.SchemaPostProcessor; import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -29,6 +31,7 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -116,6 +119,28 @@ public void testChainedSchemaPostProcessor() { assertNotNull(targetSchema.getField("testString")); } + @Test + public void testDeleteColumn() { + // remove column ums_id_ from source schema + properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "ums_id_"); + DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null); + Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA); + Schema targetSchema = processor.processSchema(schema); + + assertNull(targetSchema.getField("ums_id_")); + assertNotNull(targetSchema.getField("ums_ts_")); + } + + @Test + public void testDeleteColumnThrows() { + // remove all columns from source schema + properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "ums_id_,ums_ts_"); + DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null); + Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA); + + Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema)); + } + @Test public void testSparkAvroSchema() throws IOException { SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);