Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ private FlinkOptions() {
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");

public static final ConfigOption<String> READ_AVRO_SCHEMA_PATH = ConfigOptions
.key("read.avro-schema.path")
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
.stringType()
.noDefaultValue()
.withDescription("Avro schema file path, the parsed schema is used for deserialization");
.withDescription("Source avro schema file path, the parsed schema is used for deserialization");

public static final ConfigOption<String> READ_AVRO_SCHEMA = ConfigOptions
.key("read.avro-schema")
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions
.key("source.avro-schema")
.stringType()
.noDefaultValue()
.withDescription("Avro schema string, the parsed schema is used for deserialization");
.withDescription("Source avro schema string, the parsed schema is used for deserialization");

public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public FilebasedSchemaProvider(TypedProperties props) {
}

public FilebasedSchemaProvider(Configuration conf) {
final String readSchemaPath = conf.getString(FlinkOptions.READ_AVRO_SCHEMA_PATH);
final String readSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ public class FlinkStreamerConfig extends Configuration {
required = true)
public String targetBasePath;

@Parameter(names = {"--read-schema-path"},
description = "Avro schema file path, the parsed schema is used for deserializing.",
required = true)
public String readSchemaFilePath;

@Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
public String targetTableName;

Expand Down Expand Up @@ -150,11 +145,11 @@ public class FlinkStreamerConfig extends Configuration {
description = "Whether to load partitions in state if partition path matching, default *")
public String indexPartitionRegex = ".*";

@Parameter(names = {"--avro-schema-path"}, description = "Avro schema file path, the parsed schema is used for deserialization")
public String avroSchemaPath = "";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
public String sourceAvroSchemaPath = "";

@Parameter(names = {"--avro-schema"}, description = "Avro schema string, the parsed schema is used for deserialization")
public String avroSchema = "";
@Parameter(names = {"--source-avro-schema"}, description = "Source avro schema string, the parsed schema is used for deserialization")
public String sourceAvroSchema = "";

@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
Expand Down Expand Up @@ -292,7 +287,6 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);

conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
Expand All @@ -316,8 +310,8 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setDouble(FlinkOptions.INDEX_STATE_TTL, config.indexStateTtl);
conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, config.indexGlobalEnabled);
conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, config.indexPartitionRegex);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, config.sourceAvroSchemaPath);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, config.sourceAvroSchema);
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,17 @@ private static void setupCleaningOptions(Configuration conf) {

/**
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
* if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
*
* @param conf The configuration
* @param rowType The specified table row type
*/
private static void inferAvroSchema(Configuration conf, LogicalType rowType) {
if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
&& !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
&& !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString();
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, inferredSchema);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static String getCompactionInstantTime(HoodieTableMetaClient metaClient)
public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
}

public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceSchema();
} else if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
final String schemaStr = conf.get(FlinkOptions.READ_AVRO_SCHEMA);
} else if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
final String schemaStr = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
return new Schema.Parser().parse(schemaStr);
} else {
final String errorMsg = String.format("Either option '%s' or '%s' "
+ "should be specified for avro schema deserialization",
FlinkOptions.READ_AVRO_SCHEMA_PATH.key(), FlinkOptions.READ_AVRO_SCHEMA.key());
FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key());
throw new HoodieException(errorMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ void testInferAvroSchemaForSource() {
final HoodieTableSource tableSource1 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));

// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSource tableSource2 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
Configuration conf2 = tableSource2.getConf();
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
}

@Test
Expand Down Expand Up @@ -207,14 +207,14 @@ void testInferAvroSchemaForSink() {
final HoodieTableSink tableSink1 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));

// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSink tableSink2 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
Configuration conf2 = tableSink2.getConf();
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static String getCollectSinkDDL(String tableName, TableSchema tableSchema
public static Configuration getDefaultConf(String tablePath) {
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, tablePath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH,
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
Expand All @@ -155,7 +155,7 @@ public static Configuration getDefaultConf(String tablePath) {
public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
streamerConf.targetBasePath = tablePath;
streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread()
streamerConf.sourceAvroSchemaPath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString();
streamerConf.targetTableName = "TestHoodieTable";
streamerConf.partitionPathField = "partition";
Expand Down