@@ -116,7 +116,7 @@ private HoodieWriteConfig.Builder getConfigBuilder(HoodieHBaseIndexConfig hoodie

private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Option<String> resourceAllocatorClass) {
HoodieHBaseIndexConfig.Builder builder = new HoodieHBaseIndexConfig.Builder()
- .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+ .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName).hbaseIndexGetBatchSize(100);
if (resourceAllocatorClass.isPresent()) {
builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
@@ -418,7 +418,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() {
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
- .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+ .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
.hbaseIndexGetBatchSize(100).build())
.build());
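Both hunks above make the same substitution. As a standalone sketch (hypothetical class and variable names, not part of this PR) of why Integer.parseInt is preferable when a primitive int is needed:

// ParsePortExample.java -- hypothetical illustration, not part of this PR.
public class ParsePortExample {
  public static void main(String[] args) {
    String clientPort = "2181"; // e.g. the value of hbase.zookeeper.property.clientPort

    // Integer.valueOf returns a boxed Integer; passing it where an int is
    // expected costs an allocation (or cache lookup) plus auto-unboxing.
    int viaValueOf = Integer.valueOf(clientPort);

    // Integer.parseInt returns the primitive int directly -- no boxing at all.
    int viaParseInt = Integer.parseInt(clientPort);

    System.out.println(viaValueOf == viaParseInt); // true: same value, only the boxing differs
  }
}

Behavior is identical for the caller (both throw NumberFormatException on malformed input); the change merely avoids a needless Integer box.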
@@ -188,15 +188,11 @@ private static Configuration addProjectionField(Configuration conf, String field
return conf;
}

- private static Configuration addRequiredProjectionFields(Configuration configuration) {
+ private static void addRequiredProjectionFields(Configuration configuration) {
// Need this to merge records in HoodieRealtimeRecordReader
- configuration =
-     addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
- configuration =
-     addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
- configuration =
-     addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
- return configuration;
+ addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
+ addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
+ addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
}
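Dropping the Configuration return type is safe because Hadoop's Configuration is mutated in place, so the old reassignments were no-ops. A minimal sketch of the idea (the helper below is a simplified, hypothetical stand-in for the PR's addProjectionField, and the property keys are assumed to be the usual Hive read-column ones):

// InPlaceConfExample.java -- hypothetical sketch, not part of this PR.
import org.apache.hadoop.conf.Configuration;

public class InPlaceConfExample {

  // Simplified stand-in for addProjectionField: appends one column name/id pair.
  // The real helper also guards against duplicate columns; omitted for brevity.
  static void addProjectionField(Configuration conf, String field, int pos) {
    String names = conf.get("hive.io.file.readcolumn.names", "");
    String ids = conf.get("hive.io.file.readcolumn.ids", "");
    conf.set("hive.io.file.readcolumn.names", names.isEmpty() ? field : names + "," + field);
    conf.set("hive.io.file.readcolumn.ids", ids.isEmpty() ? String.valueOf(pos) : ids + "," + pos);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    addProjectionField(conf, "_hoodie_record_key", 2);
    // The caller observes the mutation with no "conf = ..." reassignment needed:
    System.out.println(conf.get("hive.io.file.readcolumn.names")); // _hoodie_record_key
  }
}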

/**
@@ -205,53 +201,54 @@ private static Configuration addRequiredProjectionFields(Configura
* e.g. ",2,0,3", which will cause an error. This method is only a temporary solution, because the real bug is in
* Hive. Hive has fixed this bug after 3.0.0, but versions before that still face this problem. (HIVE-22438)
*/
- private static Configuration cleanProjectionColumnIds(Configuration conf) {
+ private static void cleanProjectionColumnIds(Configuration conf) {
String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
if (LOG.isDebugEnabled()) {
LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
}
}
- return conf;
}
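For reference, the symptom cleanProjectionColumnIds works around: Hive before 3.0.0 can hand over the read-column-id list with a spurious leading comma (HIVE-22438). A hypothetical standalone illustration using the same substring logic as the method above:

// LeadingCommaExample.java -- hypothetical illustration, not part of this PR.
public class LeadingCommaExample {
  public static void main(String[] args) {
    String columnIds = ",2,0,3"; // what a pre-3.0.0 Hive may produce
    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
      columnIds = columnIds.substring(1); // strip only the first comma
    }
    System.out.println(columnIds); // 2,0,3
  }
}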

@Override
- public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
+ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
// Hive on Spark invokes multiple getRecordReaders from different threads in the same Spark task (and hence the
// same JVM), unlike Hive on MR. Because of this, accesses to the JobConf, which is shared across all threads, are
// at risk of race conditions, so we synchronize on the JobConf object here. The synchronization incurs negligible
// latency, since getRecordReader is called once per split before the actual heavy lifting of reading the parquet
// files happens.
- if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
- synchronized (job) {
+ if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
+ synchronized (jobConf) {
LOG.info(
"Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// in this case, the projection fields get removed. Looking at the HiveInputFormat implementation, in some cases
// the additional hoodie projection columns are reset after calling setConf, and only the natural projections
// (the ones found in select queries) are set. Things would break because of this:
// e.g. _hoodie_record_key would be missing and the merge step would throw exceptions.
// To fix this, hoodie columns are appended late, at the time the record reader gets built, instead of at
// construction time.
- this.conf = cleanProjectionColumnIds(job);
- this.conf = addRequiredProjectionFields(job);
+ cleanProjectionColumnIds(jobConf);
+ addRequiredProjectionFields(jobConf);

+ this.conf = jobConf;
this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
}
}
}

LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// sanity check
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);

- return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
- super.getRecordReader(split, job, reporter));
+ return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
+ super.getRecordReader(split, jobConf, reporter));
}
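The guard-lock-guard shape in this method is classic double-checked locking. A minimal sketch of the pattern as applied to a shared JobConf (class and property names hypothetical):

// InitOnceExample.java -- hypothetical sketch of the locking pattern, not part of this PR.
import org.apache.hadoop.mapred.JobConf;

public class InitOnceExample {
  static final String INIT_DONE = "example.init.done"; // hypothetical flag property

  static void initOnce(JobConf jobConf) {
    if (jobConf.get(INIT_DONE) == null) {       // cheap, racy pre-check skips the lock on the hot path
      synchronized (jobConf) {                  // every thread in the task shares this JobConf instance
        if (jobConf.get(INIT_DONE) == null) {   // re-check under the lock: only one thread initializes
          // ... one-time JobConf mutation goes here ...
          jobConf.set(INIT_DONE, "true");       // publish completion to later callers
        }
      }
    }
  }
}

The second check matters: two threads can both pass the outer check before either acquires the lock, and only the inner check under synchronization guarantees the setup runs exactly once.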

@Override