@@ -27,6 +27,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
@@ -82,12 +83,23 @@ private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inpu
String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
Configuration conf = HoodieTestUtils.getDefaultHadoopConf();

String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
.map(Schema.Field::name).collect(Collectors.joining(","));
hiveColumnNames = hiveColumnNames + ",datestr";

String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
hiveColumnTypes = hiveColumnTypes + ",string";
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
conf.set("partition_columns", "datestr");
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
inputFormat.setConf(conf);
jobConf.addResource(conf);
}
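For readers tracing how these properties fit together, here is a minimal sketch (not part of the diff) of the same wiring against a bare JobConf. The data column names are placeholders loosely based on the trip test schema; only the structure is the point: Hudi's five metadata columns lead, data columns follow, the partition column comes last, and the types line up one-to-one with the names.

// ---- illustrative sketch, not part of this change ----
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

public class HiveRealtimeConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();

    // Full table definition as Hive sees it: every column name and type, in declared order.
    // The five leading columns are Hudi's metadata columns (all strings); "datestr" is the
    // partition column and is appended last. Data column names are placeholders.
    String hiveColumnNames = "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,"
        + "_hoodie_partition_path,_hoodie_file_name,timestamp,rider,fare,datestr";
    String hiveColumnTypes = "string,string,string,string,string,double,string,double,string";
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");

    // Projection: just the columns (and their positions) this particular read touches.
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "timestamp,rider,fare");
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "5,6,7");
  }
}
// ---- end sketch ----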
@@ -78,6 +78,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);

@@ -130,6 +130,10 @@ public static Schema addMetadataFields(Schema schema) {
return mergedSchema;
}

public static String addMetadataColumnTypes(String hiveColumnTypes) {
return "string,string,string,string,string," + hiveColumnTypes;
}
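A quick worked example of the new helper (a sketch only; the literal strings mirror the TRIP_HIVE_COLUMN_TYPES constant added above, and the trailing ",string" is what the test appends for the "datestr" partition column):

// ---- illustrative sketch, not part of this change ----
public class MetadataColumnTypesDemo {
  public static void main(String[] args) {
    String tripTypes = "double,string,string,string,double,double,double,double,double";
    // Mirrors addMetadataColumnTypes: the five Hudi metadata columns are all strings.
    String merged = "string,string,string,string,string," + tripTypes;
    // The test then appends ",string" for the "datestr" partition column.
    String withPartition = merged + ",string";
    System.out.println(withPartition);
  }
}
// ---- end sketch ----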

private static Schema initRecordKeySchema() {
Schema.Field recordKeyField =
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
@@ -33,6 +33,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
@@ -89,13 +90,14 @@ public abstract class AbstractRealtimeRecordReader {
// Schema handles
private Schema readerSchema;
private Schema writerSchema;
private Schema hiveSchema;

public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
this.split = split;
this.jobConf = job;
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
@@ -179,7 +181,8 @@ private static List<String> orderFields(String fieldNameCsv, String fieldOrderCs
/**
* Generate a reader schema off the provided writeSchema, to just project out the provided columns
*/
public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
List<String> fieldNames) {
/**
* Avro & Presto field names seem to be case sensitive (they support fields differing only in case) whereas
* Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
@@ -191,8 +194,6 @@ public static Schema generateProjectionSchema(Schema writeSchema, List<String> f
*
*/
List<Schema.Field> projectedFields = new ArrayList<>();
Map<String, Schema.Field> schemaFieldsMap = writeSchema.getFields().stream()
.map(r -> Pair.of(r.name().toLowerCase(), r)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
for (String fn : fieldNames) {
Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
if (field == null) {
@@ -209,6 +210,11 @@ public static Schema generateProjectionSchema(Schema writeSchema, List<String> f
return projectedSchema;
}

public static Map<String, Field> getNameToFieldMap(Schema schema) {
return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}

(Review thread on getNameToFieldMap)
Member: Do we need toLowerCase()? Avro is case sensitive.
Member: I understand this is not related to the change here. Can you just add a TODO to consider case sensitivity down the line, or maybe file a JIRA? Maybe we need to understand what happens when fields differ only in case.
Contributor Author: Created a JIRA, https://issues.apache.org/jira/browse/HUDI-303, where we can follow up.
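To illustrate the reviewer's case-sensitivity concern, here is a sketch with invented field names: Avro treats field names as case sensitive, so a schema may legally contain fields that differ only in case, and a map keyed on lowercased names can then collide.

// ---- illustrative sketch, not part of this change ----
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CaseSensitivityDemo {
  public static void main(String[] args) {
    // Two fields that differ only in case -- legal in Avro, since names are case sensitive.
    Schema schema = SchemaBuilder.record("Demo").fields()
        .requiredString("userId")
        .requiredString("userid")
        .endRecord();

    // Collectors.toMap rejects duplicate keys, so lowercasing both names to "userid"
    // makes this throw IllegalStateException -- one way the concern could surface.
    Map<String, Schema.Field> byLowerName = schema.getFields().stream()
        .collect(Collectors.toMap(f -> f.name().toLowerCase(), f -> f));
    System.out.println(byLowerName.keySet());
  }
}
// ---- end sketch ----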

/**
* Convert the projected read from delta record into an array writable
*/
@@ -321,20 +327,48 @@ private void init() throws IOException {
LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get("partition_columns", "");
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);

Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
readerSchema = generateProjectionSchema(writerSchema, projectionFields);

readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
}

private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
// Get all column names of the Hive table
String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
String[] hiveColumns = hiveColumnString.split(",");
List<Field> hiveSchemaFields = new ArrayList<>();

for (String columnName : hiveColumns) {
Field field = schemaFieldsMap.get(columnName.toLowerCase());

if (field != null) {
hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
} else {
// Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in the table schema.
(Review comment from Member: ":) hive")
// They will get skipped as they won't be found in the original schema.
LOG.debug("Skipping Hive Column => " + columnName);
}
}

Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
writerSchema.isError());
hiveSchema.setFields(hiveSchemaFields);
return hiveSchema;
}
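A self-contained sketch (field names invented) of the reordering this method performs: given a writer schema in Avro field order and a META_TABLE_COLUMNS value in Hive's declared order, the result follows Hive's order and silently drops Hive virtual columns.

// ---- illustrative sketch, not part of this change ----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class HiveOrderDemo {
  public static void main(String[] args) {
    // Writer schema in Avro (parquet/log) field order.
    Schema writerSchema = SchemaBuilder.record("trip").fields()
        .requiredString("rider")
        .requiredDouble("fare")
        .requiredLong("timestamp")
        .requiredString("datestr")
        .endRecord();

    // Hive's declared column order, as it would arrive via META_TABLE_COLUMNS,
    // including a virtual column that is not part of the table schema.
    String hiveColumns = "timestamp,rider,fare,BLOCK__OFFSET__INSIDE__FILE,datestr";

    Map<String, Schema.Field> byName = writerSchema.getFields().stream()
        .collect(Collectors.toMap(f -> f.name().toLowerCase(), f -> f));

    List<String> hiveOrdered = new ArrayList<>();
    for (String col : hiveColumns.split(",")) {
      Schema.Field field = byName.get(col.toLowerCase());
      if (field != null) {
        hiveOrdered.add(field.name());
      }
    }
    // Prints [timestamp, rider, fare, datestr]: Hive's order, virtual column skipped.
    System.out.println(hiveOrdered);
  }
}
// ---- end sketch ----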

public Schema getReaderSchema() {
return readerSchema;
}
@@ -343,6 +377,10 @@ public Schema getWriterSchema() {
return writerSchema;
}

public Schema getHiveSchema() {
return hiveSchema;
}

public long getMaxCompactionMemoryInBytes() {
// jobConf.getMemoryForMapTask() returns in MB
return (long) Math
@@ -101,7 +101,7 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
}
// we assume a later safe record in the log is newer than what we have in the map &
// replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getWriterSchema());
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
@@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
@@ -76,6 +77,8 @@

public class HoodieRealtimeRecordReaderTest {

private static final String PARTITION_COLUMN = "datestr";

private JobConf jobConf;
private FileSystem fs;
private Configuration hadoopConf;
@@ -158,7 +161,22 @@ public void testNonPartitionedReader() throws Exception {
testReader(false);
}

public void testReader(boolean partitioned) throws Exception {
private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf jobConf, boolean isPartitioned) {
String names = fields.stream().map(Field::name).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);

String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN))
.map(Field::name).collect(Collectors.joining(","));
if (isPartitioned) {
hiveOrderedColumnNames += "," + PARTITION_COLUMN;
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, PARTITION_COLUMN);
}
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames);
}

private void testReader(boolean partitioned) throws Exception {
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
@@ -213,13 +231,7 @@ public void testReader(boolean partitioned) throws Exception {
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
if (partitioned) {
jobConf.set("partition_columns", "datestr");
}
setHiveColumnNameProps(fields, jobConf, partitioned);

// validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
@@ -277,11 +289,7 @@ public void testUnMergedReader() throws Exception {
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
jobConf.set("partition_columns", "datestr");
setHiveColumnNameProps(fields, jobConf, true);
// Enable merge skipping.
jobConf.set("hoodie.realtime.merge.skip", "true");

@@ -356,12 +364,7 @@ public void testReaderWithNestedAndComplexSchema() throws Exception {
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();

String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
setHiveColumnNameProps(fields, jobConf, true);

// validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
@@ -502,11 +505,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception
assert (firstSchemaFields.containsAll(fields) == false);

// Try to read all the fields passed by the new schema
String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
setHiveColumnNameProps(fields, jobConf, true);

HoodieRealtimeRecordReader recordReader = null;
try {
@@ -518,11 +517,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception
}

// Now try to read only the fields from the first (pre-evolution) schema
names = firstSchemaFields.stream().map(f -> f.name()).collect(Collectors.joining(","));
positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
setHiveColumnNameProps(firstSchemaFields, jobConf, true);
// This time read only the fields which are part of parquet
recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
// use reader to read base Parquet File and log file