68 changes: 34 additions & 34 deletions docker/demo/sparksql-incremental.commands
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val beginInstantTime = HoodieDataSourceHelpers.listCommitsSince(fs, "/user/hive/warehouse/stock_ticks_cow", "00000").get(0)
val hoodieIncQueryDF = spark.read.format("org.apache.hudi").
-option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
-option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstantTime).
+option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key(), beginInstantTime).
load("/user/hive/warehouse/stock_ticks_cow");
hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr where symbol = 'GOOG'").show(100, false);
@@ -37,30 +37,30 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism","2").
-option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
-option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
-option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key").
-option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr").
-option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
-option(HoodieWriteConfig.TABLE_NAME, "stock_ticks_derived_mor").
-option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "stock_ticks_derived_mor").
-option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default").
-option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
-option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "hive").
-option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
-option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
-option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
-option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
-option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
+option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
+option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
+option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key(), "key").
+option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key(), "datestr").
+option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key(), "ts").
+option(HoodieWriteConfig.TABLE_NAME.key(), "stock_ticks_derived_mor").
+option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY.key(), "stock_ticks_derived_mor").
+option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY.key(), "default").
+option(DataSourceWriteOptions.HIVE_URL_OPT_KEY.key(), "jdbc:hive2://hiveserver:10000").
+option(DataSourceWriteOptions.HIVE_USER_OPT_KEY.key(), "hive").
+option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY.key(), "hive").
+option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY.key(), "true").
+option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key(), "datestr").
+option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
+option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");

spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)

val hoodieIncQueryBsDF = spark.read.format("org.apache.hudi").
-option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
-option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "00000000000001").
+option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
+option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key(), "00000000000001").
load("/user/hive/warehouse/stock_ticks_cow_bs");
hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);
@@ -69,21 +69,21 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism","2").
-option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
-option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
-option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key").
-option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr").
-option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
-option(HoodieWriteConfig.TABLE_NAME, "stock_ticks_derived_mor_bs").
-option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "stock_ticks_derived_mor_bs").
-option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default").
-option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
-option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "hive").
-option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
-option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
-option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
-option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
-option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
+option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
+option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
+option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key(), "key").
+option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key(), "datestr").
+option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key(), "ts").
+option(HoodieWriteConfig.TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
+option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY.key(), "stock_ticks_derived_mor_bs").
+option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY.key(), "default").
+option(DataSourceWriteOptions.HIVE_URL_OPT_KEY.key(), "jdbc:hive2://hiveserver:10000").
+option(DataSourceWriteOptions.HIVE_USER_OPT_KEY.key(), "hive").
+option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY.key(), "hive").
+option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY.key(), "true").
+option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key(), "datestr").
+option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
+option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");

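The demo commands above switch from passing bare String constants to passing CONSTANT.key(), because the read and write options are now ConfigProperty objects rather than plain strings. As a rough sketch of that abstraction (the property name and default value below are made up for illustration and are not the project's real definitions, which live in classes such as DataSourceReadOptions, DataSourceWriteOptions and HoodieWriteConfig):

import org.apache.hudi.common.config.ConfigProperty;

public class ExampleHudiConfig {
  // Hypothetical property for illustration only. key() returns the plain config
  // name that used to be a bare String constant; defaultValue() carries the fallback.
  public static final ConfigProperty<String> EXAMPLE_QUERY_TYPE = ConfigProperty
      .key("hoodie.example.query.type")
      .defaultValue("snapshot");
}

Call sites that only need the config name read EXAMPLE_QUERY_TYPE.key(), which is why the option(...) calls above gained a .key() suffix while their values stayed the same.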
@@ -203,14 +203,14 @@ public String showLogFileRecords(
.getCommitTimeline().lastInstant().get().getTimestamp())
.withReadBlocksLazily(
Boolean.parseBoolean(
-HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP.defaultValue()))
.withReverseReader(
Boolean.parseBoolean(
-HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
-.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue()))
+.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
.withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
@@ -153,7 +153,7 @@ public String overwriteHoodieProperties(
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Properties newProps = new Properties();
newProps.load(new FileInputStream(new File(overwriteFilePath)));
-Map<String, String> oldProps = client.getTableConfig().getProps();
+Map<String, String> oldProps = client.getTableConfig().propsMap();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.createHoodieProperties(client.getFs(), metaPathDir, newProps);

@@ -359,13 +359,13 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();

-properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourcePath);
-properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, keyGeneratorClass);
-properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER, fullBootstrapInputProvider);
-properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM, parallelism);
-properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, selectorClass);
-properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), recordKeyCols);
-properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), partitionFields);
+properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key(), sourcePath);
+properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key(), keyGeneratorClass);
+properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER.key(), fullBootstrapInputProvider);
+properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key(), parallelism);
+properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR.key(), selectorClass);
+properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY().key(), recordKeyCols);
+properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY().key(), partitionFields);

HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetTableName = tableName;
@@ -136,7 +136,7 @@ public String descTable() {
rows.add(new Comparable[] {"basePath", client.getBasePath()});
rows.add(new Comparable[] {"metaPath", client.getMetaPath()});
rows.add(new Comparable[] {"fileSystem", client.getFs().getScheme()});
-client.getTableConfig().getProps().entrySet().forEach(e -> {
+client.getTableConfig().propsMap().entrySet().forEach(e -> {
rows.add(new Comparable[] {e.getKey(), e.getValue()});
});
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
@@ -43,10 +43,10 @@ object SparkHelpers {
def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
-val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble,
-HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
+val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP.defaultValue.toDouble,
+HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, filter)
-val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
+val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE_BYTES.defaultValue.toInt, HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue.toDouble)

// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
@@ -207,12 +207,12 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withReadBlocksLazily(
Boolean.parseBoolean(
-HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP.defaultValue()))
.withReverseReader(
Boolean.parseBoolean(
-HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
-.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
-.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue()))
+.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
+.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
@@ -67,7 +67,7 @@ public void init() throws IOException {
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
-HoodieTableConfig.DEFAULT_ARCHIVELOG_FOLDER, TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue(), TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
}

/**
@@ -156,10 +156,10 @@ public void testOverwriteHoodieProperties() throws IOException {
CommandResult cr = getShell().executeCommand("repair overwrite-hoodie-props --new-props-file " + newProps.getPath());
assertTrue(cr.isSuccess());

-Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().getProps();
+Map<String, String> oldProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();

// after overwrite, the stored value in .hoodie is equals to which read from properties.
-Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
+Map<String, String> result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().propsMap();
Properties expectProps = new Properties();
expectProps.load(new FileInputStream(new File(newProps.getPath())));

@@ -20,6 +20,7 @@

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -37,7 +38,6 @@

import java.io.IOException;
import java.nio.file.Paths;
-import java.util.Properties;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -81,7 +81,7 @@ public void testDowngradeCommand() throws Exception {
// update hoodie.table.version to 1
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
try (FSDataOutputStream os = metaClient.getFs().create(new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) {
-metaClient.getTableConfig().getProperties().store(os, "");
+metaClient.getTableConfig().getProps().store(os, "");
}
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());

@@ -109,9 +109,9 @@ private void assertTableVersionFromPropertyFile() throws IOException {
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
// Load the properties and verify
FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile);
-Properties prop = new Properties();
-prop.load(fsDataInputStream);
+HoodieConfig hoodieConfig = HoodieConfig.create(fsDataInputStream);
fsDataInputStream.close();
-assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME));
+assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), hoodieConfig
+.getString(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP));
}
}
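For context, a minimal sketch of the HoodieConfig-based lookup the test above switches to, assuming an existing hoodie.properties file (the path below is illustrative; in the test it is metaClient.getMetaPath() plus HoodieTableConfig.HOODIE_PROPERTIES_FILE):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.table.HoodieTableConfig;

public class HoodieConfigLookupSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative location of the table's properties file.
    Path propertyFile = new Path("/tmp/hudi_table/.hoodie/hoodie.properties");
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(propertyFile)) {
      // HoodieConfig wraps the raw properties and resolves values by ConfigProperty,
      // replacing the java.util.Properties lookups used before this change.
      HoodieConfig hoodieConfig = HoodieConfig.create(in);
      System.out.println(hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP));
    }
  }
}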
@@ -33,7 +33,6 @@

import java.io.Closeable;
import java.io.IOException;
-import java.util.Properties;

/**
* Write commit callback http client.
@@ -47,10 +46,10 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable {
private final String apiKey;
private final String url;
private final CloseableHttpClient client;
-private Properties props;
+private HoodieWriteConfig writeConfig;

public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) {
-this.props = config.getProps();
+this.writeConfig = config;
this.apiKey = getApiKey();
this.url = getUrl();
this.client = getClient();
@@ -80,11 +79,11 @@ public void send(String callbackMsg) {
}

private String getApiKey() {
-return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
+return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
}

private String getUrl() {
-return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
+return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
}

private CloseableHttpClient getClient() {
@@ -98,7 +97,7 @@ private CloseableHttpClient getClient() {
}

private Integer getHttpTimeoutSeconds() {
-return Integer.parseInt(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS));
+return writeConfig.getInt(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS);
}

@Override
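A small sketch of the pattern the callback client adopts above: its settings now come from typed HoodieWriteConfig getters instead of raw Properties lookups. The helper class below is illustrative, not part of the codebase, and assumes the usual org.apache.hudi.config package for both config classes:

import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative wrapper that only forwards to the getString/getInt calls shown above.
class CallbackSettings {
  private final HoodieWriteConfig writeConfig;

  CallbackSettings(HoodieWriteConfig writeConfig) {
    this.writeConfig = writeConfig;
  }

  String url() {
    return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
  }

  String apiKey() {
    return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
  }

  Integer timeoutSeconds() {
    return writeConfig.getInt(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS);
  }
}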