@@ -197,10 +197,27 @@ private static synchronized Configuration addRequiredProjectionFields(Configurat
return configuration;
}

/**
 * Hive appends the read columns' ids to the old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
 * the read columns' id string is empty, and combining it with the Hoodie required projection ids produces a malformed
 * value such as ",2,0,3", which causes an error. This method strips the leading comma to avoid that situation.
 */
Contributor:
As discussed with you internally as well, this appears to be a bug in Hive. It manifests because Hudi needs to append its minimum set of projection columns, i.e. its metadata columns, even in the case of a count query.

But ideally this needs to be fixed in Hive so it does not happen in the first place: https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L119

Can we file a Jira with Hive and reference it in the comment here?

Contributor Author:

Yeah, after the discussion and some investigation, Hive is indeed where this bug originates; it creates projection column ids like ",2,0,3". What my code does is handle this bug inside Hudi.
Hive fixed this bug after 3.0.0, but before 3.0.0 we still face this problem. The Hive Jira is here: https://issues.apache.org/jira/browse/HIVE-22438.
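To make the failure mode concrete, here is a minimal sketch of how an unconditional string join produces the malformed id list. This is illustrative only, not the actual Hive code: the method name `appendReadColumnIds` and the ids 2, 0, 3 are hypothetical stand-ins for Hudi's metadata column ids.

```java
public class LeadingCommaDemo {

    // Sketch of the pre-HIVE-22438 behavior: the existing id string is joined
    // with the newly appended ids without checking whether it is empty.
    static String appendReadColumnIds(String oldIds, String newIds) {
        return oldIds + "," + newIds;
    }

    public static void main(String[] args) {
        // SELECT COUNT(*) leaves the read-column id string empty; Hudi then
        // appends its required projection ids, yielding a leading comma.
        System.out.println(appendReadColumnIds("", "2,0,3")); // prints ",2,0,3"
    }
}
```

The fix in Hive 3.0.0 is to skip the join when the old value is empty; `cleanProjectionColumnIds` below compensates on older Hive versions by stripping the leading comma after the fact.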

private static synchronized Configuration cleanProjectionColumnIds(Configuration conf) {
Contributor:

Is there a reason for using synchronized? (Is this for non-Hive-on-MR based jobs?)

Contributor Author:

Good question. Actually I am not sure about this, but I noticed that the HoodieParquetRealtimeInputFormat::addRequiredProjectionFields method is synchronized, so I assumed this method should behave similarly.

Contributor Author:

For example with Spark, multiple tasks run in the same executor, so I think that could be a use case.

Contributor:

@zhedoubushishi That makes sense. Although, the Hoodie projection column ids are added by the addRequiredProjectionFields method right below, via the realtime format (which is invoked by Hive). Can we perform this check before adding those projection columns themselves?

Contributor Author:

As you said, the stray comma is added in HiveInputFormat.java, which then directly calls getRecordReader on HoodieParquetRealtimeInputFormat.java. I don't see a way to do this check any earlier unless we do it in the Hive code.

Contributor:

@zhedoubushishi : You can synchronize on the passed conf object instead of static synchronization, which becomes a global lock at the JVM level.

You can do something like

    synchronized (conf) {
      ....
    }

inside your cleanProjectionColumnIds.
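A sketch of the difference between the two locking strategies, using `java.util.Properties` as a stand-in for the Hadoop Configuration object (the key name `"ids"` and both method names are hypothetical):

```java
import java.util.Properties;

public class ConfLockDemo {

    // Global lock: every caller in the JVM serializes on the class monitor,
    // even when they are cleaning unrelated conf objects.
    static synchronized void cleanGlobal(Properties conf) {
        stripLeadingComma(conf);
    }

    // Per-object lock: only callers sharing the same conf instance serialize.
    static void cleanPerConf(Properties conf) {
        synchronized (conf) {
            stripLeadingComma(conf);
        }
    }

    // The comma-stripping logic itself, shared by both variants.
    private static void stripLeadingComma(Properties conf) {
        String ids = conf.getProperty("ids", "");
        if (ids.startsWith(",")) {
            conf.setProperty("ids", ids.substring(1));
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("ids", ",2,0,3");
        cleanPerConf(conf);
        System.out.println(conf.getProperty("ids")); // prints "2,0,3"
    }
}
```

With the per-object lock, two Spark tasks in the same executor working on different job configurations no longer block each other; they only contend when they actually share a conf instance.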

Contributor:

Otherwise, looks ok.

Contributor Author:

> @zhedoubushishi : You can synchronize on the passed conf object instead of static synchronization which becomes a global lock at the JVM level.
>
> You can do something like
>
>     synchronized (conf) {
>       ....
>     }
>
> inside your cleanProjectionColumnIds.

That makes sense. Code changes are done.

    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
    // Guard against a missing key as well as the malformed leading comma.
    if (columnIds != null && !columnIds.isEmpty() && columnIds.charAt(0) == ',') {
      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
      if (LOG.isDebugEnabled()) {
        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
      }
    }
    return conf;
  }
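The comma-stripping logic can be exercised in isolation. A minimal sketch, using a `HashMap` in place of the Hadoop Configuration object and a hypothetical `clean` helper mirroring the method body (the key string is the real value of Hive's `READ_COLUMN_IDS_CONF_STR` constant):

```java
import java.util.HashMap;
import java.util.Map;

public class CleanIdsDemo {

    static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";

    // Mirrors cleanProjectionColumnIds: drop the leading comma left behind
    // by Hive < 3.0.0 (HIVE-22438) on count-style queries.
    static Map<String, String> clean(Map<String, String> conf) {
        String columnIds = conf.get(READ_COLUMN_IDS_CONF_STR);
        if (columnIds != null && !columnIds.isEmpty() && columnIds.charAt(0) == ',') {
            conf.put(READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(READ_COLUMN_IDS_CONF_STR, ",2,0,3"); // malformed value from Hive
        System.out.println(clean(conf).get(READ_COLUMN_IDS_CONF_STR)); // prints "2,0,3"
    }
}
```

A well-formed value such as `"2,0,3"` passes through unchanged, so the method is safe to call unconditionally before the projection ids are consumed.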

  @Override
  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
      final Reporter reporter) throws IOException {

    this.conf = cleanProjectionColumnIds(job);
    LOG.info("Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
        + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
