@@ -85,12 +85,12 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
// latency incurred here due to the synchronization, since getRecordReader is called once per split before the
// actual heavy lifting of reading the parquet files happens.
- if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+ if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
synchronized (jobConf) {
LOG.info(
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+ if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// In this case, the projection fields get removed. Looking at HiveInputFormat implementation, in some cases
// hoodie additional projection columns are reset after calling setConf and only natural projections
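Taken together, the hunk above swaps the bare HOODIE_READ_COLUMNS_PROP null check for the new canAddProjectionToJobConf(...) guard both outside and inside the synchronized block, keeping the double-checked pattern. A minimal sketch of how the updated method plausibly reads in full; the lines that actually add the projection columns sit outside the visible part of the hunk, so they are only indicated by a comment here:

void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
  // First check without locking, so the common case avoids synchronization entirely.
  if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
    synchronized (jobConf) {
      LOG.info("Before adding Hoodie columns, Projections :"
          + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
          + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
      // Second check under the lock: another record reader may have added the columns already.
      if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
        // ... add the Hudi meta columns to the projection and set HOODIE_READ_COLUMNS_PROP
        // (elided in this diff) ...
      }
    }
  }
}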
@@ -18,6 +18,7 @@

package org.apache.hudi.hadoop.utils;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +44,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -218,6 +220,18 @@ public static void addRequiredProjectionFields(Configuration configuration) {
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
}

public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
}

public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
}

/**
* Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
* the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
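For orientation, here is a small illustrative sketch (not part of the patch) of how the two helpers above behave. It assumes the same imports as the file in this diff, and the "rider" column is a hypothetical user projection:

static void demoRequiredProjectionCheck() {
  Configuration conf = new Configuration();
  // A query that projects only a regular data column, no Hudi meta columns.
  conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "rider");
  // false: none of _hoodie_record_key, _hoodie_commit_time, _hoodie_partition_path is projected yet.
  boolean before = HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf);
  HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(conf);
  // Should now be true: the three meta columns have been appended to the projection.
  boolean after = HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(conf);
}

canAddProjectionToJobConf then returns true either when no reader has touched the conf yet (HOODIE_READ_COLUMNS_PROP is null), or when the split has delta log files and the projection is still missing the meta columns; once HOODIE_READ_COLUMNS_PROP is set, only a split with delta log files whose required meta columns are still missing re-enters the synchronized block.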
@@ -156,6 +156,73 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws
assertEquals(3000, counter);
}

@Test
public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
// test for HUDI-1722
Configuration conf = new Configuration();
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);

String newCommitTime = "101";
// to trigger the bug of HUDI-1722, only update fileid2
// insert 1000 update records to log file 2
// now fileid0 and fileid1 have no log files, fileid2 has a log file
HoodieLogFormat.Writer writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();

TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
ArrayList<String> alias = new ArrayList<>();
alias.add(tempDir.toAbsolutePath().toString());
tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);

MapredWork mrwork = new MapredWork();
mrwork.getMapWork().setPathToPartitionInfo(pt);
mrwork.getMapWork().setPathToAliases(tableAlias);
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
jobConf = new JobConf(conf);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
jobConf.set(HAS_MAP_WORK, "true");
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
// set SPLIT_MAXSIZE large enough to create one split for all 3 file groups
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");

HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
// Since SPLIT_MAXSIZE is large enough, we should create only 1 split containing all 3 file groups
assertEquals(1, splits.length);
RecordReader<NullWritable, ArrayWritable> recordReader =
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);

Review thread on this line:

Contributor: Hello, just to confirm: is only one record reader created here?

Contributor Author (@xiarixiaoyao, Apr 19, 2021): Yes, we only create one combine record reader, but this reader holds three RealtimeCompactedRecordReaders, and the creation order of those RealtimeCompactedRecordReaders is what leads to the NPE. For this test, the combine reader holds three RealtimeCompactedRecordReaders; call them creader1, creader2, and creader3:
creader1: only has a base file
creader2: only has a base file
creader3: has a base file and a log file
If creader3 is created first, the Hoodie additional projection columns are added to the jobConf and the query succeeds. However, if creader1 or creader2 is created first, no Hoodie additional projection columns are added to the jobConf and the query fails.

Contributor: got it, thanks
NullWritable nullWritable = recordReader.createKey();
ArrayWritable arrayWritable = recordReader.createValue();
int counter = 0;
while (recordReader.next(nullWritable, arrayWritable)) {
// read over all the splits
counter++;
}
// should read out all 3 file groups (fileid0, fileid1, fileid2), each containing 1000 records
assertEquals(3000, counter);
recordReader.close();
}

@Test
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
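As the review thread above explains, the combined input format builds one RealtimeCompactedRecordReader per file group, and under the old HOODIE_READ_COLUMNS_PROP-only check the first child reader to be created decided, once and for all, whether the Hudi meta columns got projected. The sketch below is a greatly simplified, hypothetical rendering of that ordering problem; splitsInCombineOrder and jobConf are illustrative names, not identifiers from the patch:

for (RealtimeSplit split : splitsInCombineOrder) {
  // With the old guard (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null), only the first split
  // ever passed this check; if that was creader1 or creader2 (base file only), the meta
  // columns were never projected and merging creader3's log records later failed with an NPE.
  // The new guard also lets a later split with delta log files re-add the missing
  // _hoodie_record_key / _hoodie_commit_time / _hoodie_partition_path columns.
  if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(split, jobConf)) {
    HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
  }
}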
@@ -325,6 +325,33 @@ public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionD
return writer;
}

public static void setProjectFieldsForInputFormat(JobConf jobConf,
Schema schema, String hiveColumnTypes) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
Configuration conf = HoodieTestUtils.getDefaultHadoopConf();

String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
.map(Schema.Field::name).collect(Collectors.joining(","));
hiveColumnNames = hiveColumnNames + ",datestr";
String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
// skip the hoodie meta columns and project only one original column to trigger HUDI-1722
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
// skip the hoodie meta columns and project only one original column to trigger HUDI-1722
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
jobConf.addResource(conf);
}

public static void setPropsForInputFormat(JobConf jobConf,
Schema schema, String hiveColumnTypes) {
List<Schema.Field> fields = schema.getFields();
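Finally, a condensed, illustrative view of how this test helper feeds the fix; schema and tripsHiveColumnTypes are the values used in the test above, and the variable names here are only for illustration:

JobConf jobConf = new JobConf();
InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
// Only the first non-meta column (schema position 5) is projected, plus the "datestr" partition column.
String projected = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
// Since none of the Hudi meta columns is in the projection, requiredProjectionFieldsExistInConf
// returns false, so canAddProjectionToJobConf returns true for the file group that has a log file,
// and the realtime reader re-adds the meta columns before merging, which is the behavior the
// HUDI-1722 test above asserts on.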