diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java
new file mode 100644
index 0000000000000..e58dc4e20611b
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DeleteSupportSchemaPostProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An implementation of {@link SchemaPostProcessor} which adds a boolean column named "_hoodie_is_deleted" to the
+ * end of a given record schema.
+ *
+ * <p>A schema that already contains the column is returned as is, and a {@code null} schema is passed through as
+ * {@code null}.
+ */
+public class DeleteSupportSchemaPostProcessor extends SchemaPostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(DeleteSupportSchemaPostProcessor.class);
+
+  public DeleteSupportSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  /**
+   * Appends the "_hoodie_is_deleted" column to the given schema.
+   *
+   * @param schema the record schema to extend; may be {@code null}
+   * @return {@code null} when {@code schema} is {@code null}, {@code schema} itself when it already has the
+   *         column, otherwise a new record schema holding copies of the original fields followed by the new
+   *         boolean field, whose default value is {@code false}
+   */
+  @Override
+  public Schema processSchema(Schema schema) {
+    // A missing schema is passed through, matching the null handling of SparkAvroPostProcessor.
+    if (schema == null) {
+      return null;
+    }
+
+    if (schema.getField(HoodieRecord.HOODIE_IS_DELETED) != null) {
+      LOG.warn(String.format("column %s already exists!", HoodieRecord.HOODIE_IS_DELETED));
+      return schema;
+    }
+
+    List<Schema.Field> sourceFields = schema.getFields();
+    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+    // Copy existing columns; Avro does not allow a Field that already belongs to a record to be reused.
+    // NOTE(review): only name, schema, doc and default value are carried over, not aliases, order or props.
+    for (Schema.Field sourceField : sourceFields) {
+      targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal()));
+    }
+    // add _hoodie_is_deleted column, defaulting to false
+    targetFields.add(new Schema.Field(HoodieRecord.HOODIE_IS_DELETED, Schema.create(Schema.Type.BOOLEAN), null, false));
+
+    // The rebuilt record keeps the name, doc and namespace of the input and is never flagged as an error type.
+    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+  }
+
+}
+
+
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
index 34549885cf8c9..3322b7b010385 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
@@ -41,7 +41,7 @@ public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
   @Override
   public Schema processSchema(Schema schema) {
     return schema != null ? 
AvroConversionUtils.convertStructTypeToAvroSchema( - AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, + AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME, RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) : null; } } \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java index 2055406996f35..1ec3eb2f911f5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java @@ -18,9 +18,10 @@ package org.apache.hudi.utilities; -import org.apache.avro.Schema; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.avro.Schema; import org.apache.spark.api.java.JavaSparkContext; public class DummySchemaProvider extends SchemaProvider { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java index 90c0149b0906c..22dca7bd6389f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/SparkAvroSchemaProvider.java @@ -18,9 +18,10 @@ package org.apache.hudi.utilities; -import org.apache.avro.Schema; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.avro.Schema; import org.apache.spark.api.java.JavaSparkContext; public class SparkAvroSchemaProvider extends SchemaProvider { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index b9e9282923853..68cd994ba10ea 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -19,17 +19,17 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hudi.utilities.transform.FlatteningTransformer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaBuilder;
-
-import org.apache.hudi.utilities.transform.FlatteningTransformer;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.Test;
 
@@ -82,6 +82,22 @@ public void testSparkAvro() throws IOException {
     assertNotNull(schema.getField("day"));
   }
 
+  /**
+   * Checks that the processor appends a boolean "_hoodie_is_deleted" column and that a schema which
+   * already carries the column is handed back untouched.
+   */
+  @Test
+  public void testDeleteSupport() {
+    DeleteSupportSchemaPostProcessor processor = new DeleteSupportSchemaPostProcessor(properties, null);
+    Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
+    Schema targetSchema = processor.processSchema(schema);
+    Schema.Field deleteField = targetSchema.getField("_hoodie_is_deleted");
+    assertNotNull(deleteField);
+    org.junit.jupiter.api.Assertions.assertEquals(Type.BOOLEAN, deleteField.schema().getType());
+    // a second pass finds the column in place and must return the same instance
+    org.junit.jupiter.api.Assertions.assertSame(targetSchema, processor.processSchema(targetSchema));
+  }
+
   @Test
   public void testSparkAvroSchema() throws IOException {
     SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);