diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
index aa1948811e2cc..3d56ffa445cd8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
@@ -27,6 +27,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
@@ -82,12 +83,23 @@ private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inpu
     String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
     String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
     Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
+
+    String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
+        .map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",datestr";
+
+    String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
+    hiveColumnTypes = hiveColumnTypes + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    jobConf.set("partition_columns", "datestr");
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
     conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    conf.set("partition_columns", "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
     inputFormat.setConf(conf);
     jobConf.addResource(conf);
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 9514bcebe1031..68be7d33bca90 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -78,6 +78,7 @@ public class HoodieTestDataGenerator {
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
       + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+  public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
   public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
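Editorial note, not part of the patch: TRIP_HIVE_COLUMN_TYPES is expected to line up one-for-one with the columns of TRIP_EXAMPLE_SCHEMA; the test utility above then prepends the five metadata column types via the new HoodieAvroUtils.addMetadataColumnTypes helper (added in the next file) and appends the partition column. A rough, self-contained sketch of how those JobConf properties get assembled, mirroring setPropsForInputFormat; the class name is hypothetical and purely illustrative:

import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.HoodieAvroUtils;

public class HiveSchemaPropsSketch {
  public static void main(String[] args) {
    // Column names come straight from the Avro schema (metadata fields included);
    // the partition column "datestr" is appended last, as in setPropsForInputFormat.
    String hiveColumnNames = HoodieTestDataGenerator.avroSchemaWithMetadataFields.getFields().stream()
        .map(Schema.Field::name).collect(Collectors.joining(",")) + ",datestr";
    // Column types: five metadata column types prefixed by the new helper, plus "string" for datestr.
    String hiveColumnTypes =
        HoodieAvroUtils.addMetadataColumnTypes(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES) + ",string";

    JobConf jobConf = new JobConf();
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
  }
}

These are the same hive_metastoreConstants properties the realtime reader consults below, so the test now exercises the Hive-ordered path end to end.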
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
index 76ba1fddea201..4c0f983a59d41 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
@@ -130,6 +130,10 @@ public static Schema addMetadataFields(Schema schema) {
     return mergedSchema;
   }
 
+  public static String addMetadataColumnTypes(String hiveColumnTypes) {
+    return "string,string,string,string,string," + hiveColumnTypes;
+  }
+
   private static Schema initRecordKeySchema() {
     Schema.Field recordKeyField =
         new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index ee62ecd794f1b..fda13d4fde3c2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -33,6 +33,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.io.ArrayWritable;
@@ -89,13 +90,14 @@ public abstract class AbstractRealtimeRecordReader {
   // Schema handles
   private Schema readerSchema;
   private Schema writerSchema;
+  private Schema hiveSchema;
 
   public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
     this.split = split;
     this.jobConf = job;
     LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
     LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
+    LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
     try {
       this.usesCustomPayload = usesCustomPayload();
       LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
@@ -179,7 +181,8 @@ private static List<String> orderFields(String fieldNameCsv, String fieldOrderCs
   /**
    * Generate a reader schema off the provided writeSchema, to just project out the provided columns
    */
-  public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
+  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
+      List<String> fieldNames) {
     /**
      * Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
      * Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
@@ -191,8 +194,6 @@ public static Schema generateProjectionSchema(Schema writeSchema, List<String> f
      *
      */
     List<Schema.Field> projectedFields = new ArrayList<>();
-    Map<String, Field> schemaFieldsMap = writeSchema.getFields().stream()
-        .map(r -> Pair.of(r.name().toLowerCase(), r)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
     for (String fn : fieldNames) {
       Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
       if (field == null) {
@@ -209,6 +210,11 @@ public static Schema generateProjectionSchema(Schema writeSchema, List<String> f
     return projectedSchema;
   }
 
+  public static Map<String, Field> getNameToFieldMap(Schema schema) {
+    return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
+        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+  }
+
   /**
    * Convert the projected read from delta record into an array writable
    */
@@ -321,20 +327,48 @@ private void init() throws IOException {
       LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
     }
     // Add partitioning fields to writer schema for resulting row to contain null values for these fields
-    String partitionFields = jobConf.get("partition_columns", "");
+    String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
     List<String> partitioningFields =
         partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
             : new ArrayList<>();
     writerSchema = addPartitionFields(writerSchema, partitioningFields);
     List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
+
+    Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
+    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
     // TODO(vc): In the future, the reader schema should be updated based on log files & be able
     // to null out fields not present before
-    readerSchema = generateProjectionSchema(writerSchema, projectionFields);
+
+    readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
     LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         split.getDeltaFilePaths(), split.getPath(), projectionFields));
   }
 
+  private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
+    // Get all column names of hive table
+    String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
+    String[] hiveColumns = hiveColumnString.split(",");
+    List<Field> hiveSchemaFields = new ArrayList<>();
+
+    for (String columnName : hiveColumns) {
+      Field field = schemaFieldsMap.get(columnName.toLowerCase());
+
+      if (field != null) {
+        hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
+      } else {
+        // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
+        // They will get skipped as they won't be found in the original schema.
+ LOG.debug("Skipping Hive Column => " + columnName); + } + } + + Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(), + writerSchema.isError()); + hiveSchema.setFields(hiveSchemaFields); + return hiveSchema; + } + public Schema getReaderSchema() { return readerSchema; } @@ -343,6 +377,10 @@ public Schema getWriterSchema() { return writerSchema; } + public Schema getHiveSchema() { + return hiveSchema; + } + public long getMaxCompactionMemoryInBytes() { // jobConf.getMemoryForMapTask() returns in MB return (long) Math diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 1d6e6aa5ac1ad..766c702fa3d65 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -101,7 +101,7 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx } // we assume, a later safe record in the log, is newer than what we have in the map & // replace it. - ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getWriterSchema()); + ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema()); Writable[] replaceValue = aWritable.get(); if (LOG.isDebugEnabled()) { LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable), diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index 1b666288dc97d..eacbc23c1d951 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; @@ -76,6 +77,8 @@ public class HoodieRealtimeRecordReaderTest { + private static final String PARTITION_COLUMN = "datestr"; + private JobConf jobConf; private FileSystem fs; private Configuration hadoopConf; @@ -158,7 +161,22 @@ public void testNonPartitionedReader() throws Exception { testReader(false); } - public void testReader(boolean partitioned) throws Exception { + private void setHiveColumnNameProps(List fields, JobConf jobConf, boolean isPartitioned) { + String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); + String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); + + String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN)) + .map(Field::name).collect(Collectors.joining(",")); + if (isPartitioned) { + hiveOrderedColumnNames += "," + PARTITION_COLUMN; + 
+      jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, PARTITION_COLUMN);
+    }
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames);
+  }
+
+  private void testReader(boolean partitioned) throws Exception {
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
@@ -213,13 +231,7 @@ public void testReader(boolean partitioned) throws Exception {
         new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null);
     JobConf jobConf = new JobConf();
     List<Schema.Field> fields = schema.getFields();
-    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
-    String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    if (partitioned) {
-      jobConf.set("partition_columns", "datestr");
-    }
+    setHiveColumnNameProps(fields, jobConf, partitioned);
 
     // validate record reader compaction
     HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
@@ -277,11 +289,7 @@ public void testUnMergedReader() throws Exception {
         new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null);
     JobConf jobConf = new JobConf();
     List<Schema.Field> fields = schema.getFields();
-    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
-    String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    jobConf.set("partition_columns", "datestr");
+    setHiveColumnNameProps(fields, jobConf, true);
 
     // Enable merge skipping.
jobConf.set("hoodie.realtime.merge.skip", "true"); @@ -356,12 +364,7 @@ public void testReaderWithNestedAndComplexSchema() throws Exception { new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); JobConf jobConf = new JobConf(); List fields = schema.getFields(); - - String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(",")); - String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); - jobConf.set("partition_columns", "datestr"); + setHiveColumnNameProps(fields, jobConf, true); // validate record reader compaction HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); @@ -502,11 +505,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception assert (firstSchemaFields.containsAll(fields) == false); // Try to read all the fields passed by the new schema - String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(",")); - String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); - jobConf.set("partition_columns", "datestr"); + setHiveColumnNameProps(fields, jobConf, true); HoodieRealtimeRecordReader recordReader = null; try { @@ -518,11 +517,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception } // Try to read all the fields passed by the new schema - names = firstSchemaFields.stream().map(f -> f.name()).collect(Collectors.joining(",")); - positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); - jobConf.set("partition_columns", "datestr"); + setHiveColumnNameProps(firstSchemaFields, jobConf, true); // This time read only the fields which are part of parquet recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); // use reader to read base Parquet File and log file