@@ -52,7 +52,6 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
-import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
@@ -64,7 +63,6 @@
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
@@ -116,7 +114,6 @@
import java.util.function.Function;
import java.util.function.Supplier;

-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;

@@ -547,12 +544,6 @@ public static SchemaProvider wrapSchemaProviderWithPostProcessor(SchemaProvider

String schemaPostProcessorClass = getStringWithAltKeys(
cfg, SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR, true);
-boolean enableSparkAvroPostProcessor =
-getBooleanWithAltKeys(cfg, HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE);
-if (transformerClassNames != null && !transformerClassNames.isEmpty()
-&& enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
-schemaPostProcessorClass = SparkAvroPostProcessor.class.getName();
-}

if (schemaPostProcessorClass == null || schemaPostProcessorClass.isEmpty()) {
return provider;
@@ -59,13 +59,6 @@ public class HoodieSchemaProviderConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("The class name of the custom schema converter to use.");

-public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_ENABLE = ConfigProperty
-.key(SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
-.defaultValue(true)
-.withAlternatives(OLD_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
-.markAdvanced()
-.withDocumentation("Whether to enable Spark Avro post processor.");
-
public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL = ConfigProperty
.key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
.noDefaultValue()

This file was deleted.

@@ -23,7 +23,6 @@
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
@@ -88,7 +87,6 @@ public void testPostProcessor() throws IOException {

@Test
public void testSparkAvro() throws IOException {
-properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), SparkAvroPostProcessor.class.getName());
List<String> transformerClassNames = new ArrayList<>();
transformerClassNames.add(FlatteningTransformer.class.getName());

@@ -185,11 +183,4 @@ public void testAddPrimitiveTypeColumn(String type) {
assertEquals(type, newColumn.schema().getType().getName());

}

-@Test
-public void testSparkAvroSchema() throws IOException {
-SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);
-Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
-assertEquals(RESULT_SCHEMA, processor.processSchema(schema).toString());
-}
}
@@ -187,7 +187,6 @@ protected static void prepareInitialConfigs(HoodieStorage storage, String dfsBas
dfsBasePath + "/sql-transformer.properties");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source.avsc", storage, dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_evolved.avsc", storage, dfsBasePath + "/source_evolved.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source_evolved_post_processed.avsc", storage, dfsBasePath + "/source_evolved_post_processed.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/source-flattened.avsc", storage, dfsBasePath + "/source-flattened.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target.avsc", storage, dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.copyToDFS("streamer-config/target-flattened.avsc", storage, dfsBasePath + "/target-flattened.avsc");
@@ -81,7 +81,6 @@
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
-import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -278,23 +277,15 @@ private static HoodieStreamer.Config getBaseConfig() {
*/
private static Stream<Arguments> schemaEvolArgs() {
return Stream.of(
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.AVRO),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.AVRO),
-
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, true, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, false, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, true, HoodieRecordType.SPARK),
-Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, false, HoodieRecordType.SPARK));
+Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.AVRO),
+Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.AVRO),
+Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.AVRO),
+Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.AVRO),
+
+Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.SPARK),
+Arguments.of(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.SPARK),
+Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), true, HoodieRecordType.SPARK),
+Arguments.of(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL(), false, HoodieRecordType.SPARK));
}

private static Stream<Arguments> provideValidCliArgs() {
@@ -569,8 +560,8 @@ private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer ex
// TODO add tests w/ disabled reconciliation
@ParameterizedTest
@MethodSource("schemaEvolArgs")
-public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor, HoodieRecordType recordType) throws Exception {
-String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
+public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, HoodieRecordType recordType) throws Exception {
+String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema;
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
// Insert data produced with Schema A, pass Schema A
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()),
@@ -579,9 +570,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
-if (!useSchemaPostProcessor) {
-cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-}
+
new HoodieDeltaStreamer(cfg, jsc).sync();
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -593,9 +582,6 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
-if (!useSchemaPostProcessor) {
-cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-}
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
@@ -619,9 +605,6 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
if (useUserProvidedSchema) {
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
}
-if (!useSchemaPostProcessor) {
-cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
-}
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
new HoodieDeltaStreamer(cfg, jsc).sync();
// again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
@@ -636,11 +619,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
assertNotNull(tableSchema);

Schema expectedSchema;
-if (!useSchemaPostProcessor) {
-expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved.avsc")));
-} else {
-expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved_post_processed.avsc")));
-}
+expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + "/source_evolved.avsc")));
assertEquals(expectedSchema, tableSchema);

// clean up and reinit