diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index d8f0a01a911ed..6b827d3eec309 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -85,12 +85,12 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
     // latency incurred here due to the synchronization since get record reader is called once per spilt before the
     // actual heavy lifting of reading the parquet files happen.
-    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+    if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
       synchronized (jobConf) {
         LOG.info(
             "Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
                 + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
+        if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
           // hoodie additional projection columns are reset after calling setConf and only natural projections
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index ce770bad15b78..db8de64fec7a9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -218,6 +220,18 @@ public static void addRequiredProjectionFields(Configuration configuration) {
     addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
   }
 
+  public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
+    String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
+    return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+        && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+  }
+
+  public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
+    return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
+        || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
+  }
+
   /**
    * Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
    * the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
index a5b193126f8d0..5ec32d7150690 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
@@ -156,6 +156,73 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws
     assertEquals(3000, counter);
   }
 
+  @Test
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // to trigger the bug of HUDI-1722, only update fileid2:
+    // write 1000 update records to its log file, so that
+    // fileid0 and fileid1 have no log files while fileid2 has one
+    HoodieLogFormat.Writer writer =
+        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+            numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // add the input paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE large enough to create one split for all 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // since SPLIT_MAXSIZE is large enough, we should create only 1 split covering all 3 file groups
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =
+        combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
+    NullWritable nullWritable = recordReader.createKey();
+    ArrayWritable arrayWritable = recordReader.createValue();
+    int counter = 0;
+    while (recordReader.next(nullWritable, arrayWritable)) {
+      // read over all the records in the split
+      counter++;
+    }
+    // should read all 3 file groups (fileid0, fileid1, fileid2), 1000 records each
+    assertEquals(3000, counter);
+    recordReader.close();
+  }
+
   @Test
   @Disabled
   public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 19568938d7ed7..aecbae46a3f55 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -325,6 +325,33 @@ public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionD
     return writer;
   }
 
+  public static void setProjectFieldsForInputFormat(JobConf jobConf,
+      Schema schema, String hiveColumnTypes) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
+
+    String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
+        .map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",datestr";
+    String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
+    modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    // skip the hoodie meta columns and project only one original column to trigger HUDI-1722
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions.split(",")[5]);
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
+    jobConf.addResource(conf);
+  }
+
   public static void setPropsForInputFormat(JobConf jobConf, Schema schema, String hiveColumnTypes) {
     List<Schema.Field> fields = schema.getFields();