diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b426cb1509621..309cdfa761b47 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -106,17 +106,17 @@ private FlinkOptions() {
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do actual read, default is 4");
 
-  public static final ConfigOption<String> READ_AVRO_SCHEMA_PATH = ConfigOptions
-      .key("read.avro-schema.path")
+  public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
+      .key("source.avro-schema.path")
       .stringType()
       .noDefaultValue()
-      .withDescription("Avro schema file path, the parsed schema is used for deserialization");
+      .withDescription("Source avro schema file path, the parsed schema is used for deserialization");
 
-  public static final ConfigOption<String> READ_AVRO_SCHEMA = ConfigOptions
-      .key("read.avro-schema")
+  public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions
+      .key("source.avro-schema")
       .stringType()
       .noDefaultValue()
-      .withDescription("Avro schema string, the parsed schema is used for deserialization");
+      .withDescription("Source avro schema string, the parsed schema is used for deserialization");
 
   public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
   public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
index edcd928b22c08..1443a68cf0fc2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
@@ -64,10 +64,10 @@ public FilebasedSchemaProvider(TypedProperties props) {
   }
 
   public FilebasedSchemaProvider(Configuration conf) {
-    final String readSchemaPath = conf.getString(FlinkOptions.READ_AVRO_SCHEMA_PATH);
-    final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
+    final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
+    final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf());
     try {
-      this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
+      this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
     } catch (IOException ioe) {
       throw new HoodieIOException("Error reading schema", ioe);
     }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 73c3448c20241..0b4533f7c606e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -63,11 +63,6 @@ public class FlinkStreamerConfig extends Configuration {
       required = true)
   public String targetBasePath;
 
-  @Parameter(names = {"--read-schema-path"},
-      description = "Avro schema file path, the parsed schema is used for deserializing.",
-      required = true)
-  public String readSchemaFilePath;
-
   @Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.",
       required = true)
   public String targetTableName;
 
@@ -150,11 +145,11 @@ public class FlinkStreamerConfig extends Configuration {
       description = "Whether to load partitions in state if partition path matching, default *")
   public String indexPartitionRegex = ".*";
 
-  @Parameter(names = {"--avro-schema-path"}, description = "Avro schema file path, the parsed schema is used for deserialization")
-  public String avroSchemaPath = "";
+  @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
+  public String sourceAvroSchemaPath = "";
 
-  @Parameter(names = {"--avro-schema"}, description = "Avro schema string, the parsed schema is used for deserialization")
-  public String avroSchema = "";
+  @Parameter(names = {"--source-avro-schema"}, description = "Source avro schema string, the parsed schema is used for deserialization")
+  public String sourceAvroSchema = "";
 
   @Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
       + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
@@ -292,7 +287,6 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
     org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
 
     conf.setString(FlinkOptions.PATH, config.targetBasePath);
-    conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
@@ -316,8 +310,8 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
     conf.setDouble(FlinkOptions.INDEX_STATE_TTL, config.indexStateTtl);
     conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, config.indexGlobalEnabled);
     conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, config.indexPartitionRegex);
-    conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
-    conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, config.sourceAvroSchemaPath);
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, config.sourceAvroSchema);
     conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
     conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
     conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 808e9c7e7255c..2eeb8f58b82a2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -203,17 +203,17 @@ private static void setupCleaningOptions(Configuration conf) {
 
   /**
    * Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
-   * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
-   * {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
+   * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
+   * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
    *
    * @param conf    The configuration
    * @param rowType The specified table row type
    */
   private static void inferAvroSchema(Configuration conf, LogicalType rowType) {
-    if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
-        && !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
+    if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
+        && !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
       String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString();
-      conf.setString(FlinkOptions.READ_AVRO_SCHEMA, inferredSchema);
+      conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
     }
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index e8927dc7f34f3..8c882abe5dc21 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -70,7 +70,7 @@ public static String getCompactionInstantTime(HoodieTableMetaClient metaClient)
   public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
-    conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7530f08c204a7..66dd934867fd6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -86,15 +86,15 @@ public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
   }
 
   public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
-    if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()) {
+    if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
       return new FilebasedSchemaProvider(conf).getSourceSchema();
-    } else if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
-      final String schemaStr = conf.get(FlinkOptions.READ_AVRO_SCHEMA);
+    } else if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
+      final String schemaStr = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
       return new Schema.Parser().parse(schemaStr);
     } else {
       final String errorMsg = String.format("Either option '%s' or '%s' "
           + "should be specified for avro schema deserialization",
-          FlinkOptions.READ_AVRO_SCHEMA_PATH.key(), FlinkOptions.READ_AVRO_SCHEMA.key());
+          FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key());
       throw new HoodieException(errorMsg);
     }
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index d7ec69321a918..e40741ce1c947 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -120,14 +120,14 @@ void testInferAvroSchemaForSource() {
     final HoodieTableSource tableSource1 =
         (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
     final Configuration conf1 = tableSource1.getConf();
-    assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
+    assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
 
     // set up the explicit schema using the file path
-    this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
+    this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
     HoodieTableSource tableSource2 =
         (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
     Configuration conf2 = tableSource2.getConf();
-    assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
+    assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
   }
 
   @Test
@@ -207,14 +207,14 @@ void testInferAvroSchemaForSink() {
     final HoodieTableSink tableSink1 =
         (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
     final Configuration conf1 = tableSink1.getConf();
-    assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
+    assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
 
     // set up the explicit schema using the file path
-    this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
+    this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
     HoodieTableSink tableSink2 =
         (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
     Configuration conf2 = tableSink2.getConf();
-    assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
+    assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
   }
 
   @Test
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index b21f8bb1cb1a6..d0ff469c24c1a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -144,7 +144,7 @@ public static String getCollectSinkDDL(String tableName, TableSchema tableSchema
   public static Configuration getDefaultConf(String tablePath) {
     Configuration conf = new Configuration();
     conf.setString(FlinkOptions.PATH, tablePath);
-    conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH,
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
         Objects.requireNonNull(Thread.currentThread()
             .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
@@ -155,7 +155,7 @@ public static Configuration getDefaultConf(String tablePath) {
   public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
     FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
     streamerConf.targetBasePath = tablePath;
-    streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread()
+    streamerConf.sourceAvroSchemaPath = Objects.requireNonNull(Thread.currentThread()
         .getContextClassLoader().getResource("test_read_schema.avsc")).toString();
     streamerConf.targetTableName = "TestHoodieTable";
     streamerConf.partitionPathField = "partition";
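
For reference, a minimal sketch of how the renamed options are consumed, assuming only the classes touched in this diff (FlinkOptions, StreamerUtil) on a Flink/Hudi classpath; the table path, table name, and inline schema string are illustrative placeholders, not values from this patch:

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;

public class SourceSchemaExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi_table");  // illustrative table path
    conf.setString(FlinkOptions.TABLE_NAME, "demo_table"); // illustrative table name

    // Per StreamerUtil.getSourceSchema above: SOURCE_AVRO_SCHEMA_PATH wins when set,
    // SOURCE_AVRO_SCHEMA is the fallback, and a HoodieException is thrown when
    // neither option is present.
    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"}]}");

    Schema sourceSchema = StreamerUtil.getSourceSchema(conf);
    System.out.println(sourceSchema.toString(true));
  }
}

The Flink streamer flags follow the same rename: the required --read-schema-path flag is removed, and --source-avro-schema-path / --source-avro-schema map onto these two options in FlinkStreamerConfig.toFlinkConfig.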