diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 4d3f71859440c..2998373ab8b02 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -116,7 +116,7 @@ private HoodieWriteConfig.Builder getConfigBuilder(HoodieHBaseIndexConfig hoodie
 
   private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Option<String> resourceAllocatorClass) {
     HoodieHBaseIndexConfig.Builder builder = new HoodieHBaseIndexConfig.Builder()
-        .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+        .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
         .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName).hbaseIndexGetBatchSize(100);
     if (resourceAllocatorClass.isPresent()) {
       builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index f73c5c2508be3..a608d43239a0e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -418,7 +418,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() {
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
             .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
-                .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+                .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
                 .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
                 .hbaseIndexGetBatchSize(100).build())
             .build());
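
Note on the two test hunks above: Integer.parseInt returns a primitive int, while Integer.valueOf parses to a boxed Integer that is immediately auto-unboxed if the builder setter takes an int (assumed here; hbaseZkPort's exact signature is not shown in this diff). A minimal standalone sketch of the difference, where setPort is a hypothetical stand-in for such a setter:

    // Standalone sketch, not Hudi code.
    public class ParseVsValueOf {

      // Hypothetical stand-in for a setter like HoodieHBaseIndexConfig.Builder#hbaseZkPort.
      static void setPort(int port) {
        System.out.println("ZK client port = " + port);
      }

      public static void main(String[] args) {
        String raw = "2181";
        setPort(Integer.valueOf(raw));  // parses, boxes to Integer, then auto-unboxes back to int
        setPort(Integer.parseInt(raw)); // parses straight to the primitive, no boxing round trip
      }
    }

Both calls behave identically; parseInt simply skips the boxing round trip. The main file of the diff follows.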
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index e7f3f08312383..35d969e8de2b6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -188,15 +188,11 @@ private static Configuration addProjectionField(Configuration conf, String field
     return conf;
   }
 
-  private static Configuration addRequiredProjectionFields(Configuration configuration) {
+  private static void addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
-    configuration =
-        addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
-    configuration =
-        addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
-    configuration =
-        addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
-    return configuration;
+    addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
+    addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
+    addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
   }
 
   /**
@@ -205,7 +201,7 @@ private static Configuration addRequiredProjectionFields(Configura
    * e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
    * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
    */
-  private static Configuration cleanProjectionColumnIds(Configuration conf) {
+  private static void cleanProjectionColumnIds(Configuration conf) {
     String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
     if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
@@ -213,23 +209,22 @@ private static Configuration cleanProjectionColumnIds(Configuration conf) {
         LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
       }
     }
-    return conf;
   }
 
   @Override
-  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
       final Reporter reporter) throws IOException {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since get record reader is called once per spilt before the
     // actual heavy lifting of reading the parquet files happen.
-    if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
-      synchronized (job) {
+    if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+      synchronized (jobConf) {
         LOG.info(
-            "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-                + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+            "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+                + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+        if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
          // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
          // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
          // hoodie additional projection columns are reset after calling setConf and only natural projections
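
The getRecordReader hunk above keeps the existing check, lock, re-check discipline on the shared JobConf while renaming job to jobConf. Condensed to its skeleton (a sketch, not the Hudi source; "hoodie.read.columns.set" below is an illustrative stand-in for HOODIE_READ_COLUMNS_PROP), the pattern is:

    import org.apache.hadoop.mapred.JobConf;

    class ProjectionGuard {

      static void ensureHoodieColumns(JobConf jobConf) {
        if (jobConf.get("hoodie.read.columns.set") == null) {     // cheap unsynchronized probe
          synchronized (jobConf) {                                // serialize threads sharing this JobConf
            if (jobConf.get("hoodie.read.columns.set") == null) { // re-check once the lock is held
              // ... append the hoodie projection columns exactly once ...
              jobConf.set("hoodie.read.columns.set", "true");     // publish so later callers skip the lock body
            }
          }
        }
      }
    }

Only the first reader per JobConf pays for the lock; every later call returns on the unsynchronized probe. The diff continues with the final hunk of getRecordReader.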
@@ -237,21 +232,23 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSpli
          // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
          // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
          // time.
-          this.conf = cleanProjectionColumnIds(job);
-          this.conf = addRequiredProjectionFields(job);
+          cleanProjectionColumnIds(jobConf);
+          addRequiredProjectionFields(jobConf);
+
+          this.conf = jobConf;
           this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
         }
       }
     }
 
-    LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-        + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+        + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
     // sanity check
     Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
         "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
-    return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
-        super.getRecordReader(split, job, reporter));
+    return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
+        super.getRecordReader(split, jobConf, reporter));
   }
 
   @Override
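
The thread running through this file's hunks is that addRequiredProjectionFields and cleanProjectionColumnIds mutate the passed-in Configuration, so returning the argument (and reassigning this.conf from it twice) only implied a fresh object was coming back. Declaring them void documents the in-place mutation, and this.conf is then assigned once from jobConf. A self-contained illustration of why the returned value carried no information (not Hudi code; "demo.columns" is a made-up key for the demo):

    import org.apache.hadoop.conf.Configuration;

    public class VoidRefactorDemo {

      // Mutates the caller's Configuration; returning `conf` here would add nothing.
      static void addColumn(Configuration conf, String name) {
        String existing = conf.get("demo.columns", "");
        conf.set("demo.columns", existing.isEmpty() ? name : existing + "," + name);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        addColumn(conf, "_hoodie_record_key");
        addColumn(conf, "_hoodie_commit_time");
        // Prints: _hoodie_record_key,_hoodie_commit_time
        System.out.println(conf.get("demo.columns"));
      }
    }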