diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 7b79f61e49bcf..c168924e65fe0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -76,6 +76,9 @@ public RecordReader getRecordReader(final InputSpli
       return createBootstrappingRecordReader(split, job, reporter);
     }
 
+    // adapt schema evolution
+    new SchemaEvolutionContext(split, job).doEvolutionForParquetFormat();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
new file mode 100644
index 0000000000000..59184d14b350b
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for calculating the names and types of fields that were valid at a certain point in time for Hive.
+ * If a field was renamed in the queried schema, its old name, i.e. the one valid at the provided time, is returned.
+ * If the type of a field was changed, its old type is returned, and a projection is created that converts the old type to the queried one.
+ */
+public class SchemaEvolutionContext {
+
+  private static final Logger LOG = LogManager.getLogger(SchemaEvolutionContext.class);
+
+  private static final String HIVE_TMP_READ_COLUMN_NAMES_CONF_STR = "hive.tmp.io.file.readcolumn.ids";
+  private static final String HIVE_TMP_COLUMNS = "hive.tmp.columns";
+  private static final String HIVE_EVOLUTION_ENABLE = "hudi.hive.schema.evolution";
+
+  private final InputSplit split;
+  private final JobConf job;
+  private HoodieTableMetaClient metaClient;
+  public Option<InternalSchema> internalSchemaOption;
+
+  public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
+    this(split, job, Option.empty());
+  }
+
+  public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
+    this.split = split;
+    this.job = job;
+    this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
+    if (this.metaClient == null) {
+      internalSchemaOption = Option.empty();
+      return;
+    }
+    try {
+      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+      this.internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata();
+    } catch (Exception e) {
+      internalSchemaOption = Option.empty();
+      LOG.warn(String.format("Failed to get InternalSchema from hudi table: %s", metaClient.getBasePathV2()), e);
+    }
+    LOG.info(String.format("Finished initializing schema evolution for split: %s", split));
+  }
+
+  private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
+    try {
+      Path inputPath = ((FileSplit)split).getPath();
+      FileSystem fs = inputPath.getFileSystem(job);
+      Option<Path> tablePath = TablePathUtils.getTablePath(fs, inputPath);
+      return HoodieTableMetaClient.builder().setBasePath(tablePath.get().toString()).setConf(job).build();
+    } catch (Exception e) {
+      LOG.warn(String.format("Not a valid hoodie table, table path: %s", ((FileSplit)split).getPath()), e);
+      return null;
+    }
+  }
+
+  /**
+   * Do schema evolution for RealtimeInputFormat.
+   *
+   * @param realtimeRecordReader recordReader for RealtimeInputFormat.
+   */
+  public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception {
+    if (!(split instanceof RealtimeSplit)) {
+      LOG.warn(String.format("Expected a realtime split for a MOR table, but found another split type: %s", split));
+      return;
+    }
+    if (internalSchemaOption.isPresent()) {
+      Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      List<String> requiredColumns = getRequireColumn(job);
+      InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
+          requiredColumns);
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      String partitionFields = job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+      List<String> partitioningFields = partitionFields.length() > 0 ?
+          Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+          : new ArrayList<>();
+      Schema writerSchema = AvroInternalSchemaConverter.convert(internalSchemaOption.get(), tableAvroSchema.getName());
+      writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+      Map<String, Schema.Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
+      // we should read HoodieParquetInputFormat#HIVE_TMP_COLUMNS, since serdeConstants#LIST_COLUMNS may be changed by HoodieParquetInputFormat#setColumnNameList
+      Schema hiveSchema = realtimeRecordReader.constructHiveOrderedSchema(writerSchema, schemaFieldsMap, job.get(HIVE_TMP_COLUMNS));
+      Schema readerSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName());
+      // set up the evolved schemas
+      realtimeRecordReader.setWriterSchema(writerSchema);
+      realtimeRecordReader.setReaderSchema(readerSchema);
+      realtimeRecordReader.setHiveSchema(hiveSchema);
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
+          realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
+    }
+  }
+
+  /**
+   * Do schema evolution for ParquetFormat.
+   */
+  public void doEvolutionForParquetFormat() {
+    if (internalSchemaOption.isPresent()) {
+      // reading a hoodie table with schema evolution enabled
+      job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
+      Path finalPath = ((FileSplit)split).getPath();
+      InternalSchema prunedSchema;
+      List<String> requiredColumns = getRequireColumn(job);
+      // No need to trigger schema evolution for count(*)/count(1) operations
+      boolean disableSchemaEvolution =
+          requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+      if (!disableSchemaEvolution) {
+        prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+        InternalSchema querySchema = prunedSchema;
+        Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+        InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
+            true).mergeSchema();
+        List<Types.Field> fields = mergedInternalSchema.columns();
+        setColumnNameList(job, fields);
+        setColumnTypeList(job, fields);
+        pushDownFilter(job, querySchema, fileSchema);
+      }
+    }
+  }
+
+  public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
+    List<TypeInfo> fullTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString(job.get(serdeConstants.LIST_COLUMN_TYPES));
+    List<Integer> tmpColIdList = Arrays.stream(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","))
+        .map(Integer::parseInt).collect(Collectors.toList());
+    if (tmpColIdList.size() != fields.size()) {
+      throw new HoodieException(String.format("The size of hive.io.file.readcolumn.ids: %s is not equal to projection columns: %s",
+          job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), fields.stream().map(Types.Field::name).collect(Collectors.joining(","))));
+    }
+    List<TypeInfo> fieldTypes = new ArrayList<>();
+    for (int i = 0; i < tmpColIdList.size(); i++) {
+      Types.Field field = fields.get(i);
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfosFromTypeString(fullTypeInfos.get(tmpColIdList.get(i)).getQualifiedName()).get(0);
+      TypeInfo fieldType = constructHiveSchemaFromType(field.type(), typeInfo);
+      fieldTypes.add(fieldType);
+    }
+    for (int i = 0; i < tmpColIdList.size(); i++) {
+      TypeInfo typeInfo = fieldTypes.get(i);
+      if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+        int index = tmpColIdList.get(i);
+        fullTypeInfos.remove(index);
+        fullTypeInfos.add(index, typeInfo);
+      }
+    }
+    List<String> fullColTypeList = TypeInfoUtils.getTypeStringsFromTypeInfo(fullTypeInfos);
+    String fullColTypeListString = String.join(",", fullColTypeList);
+    job.set(serdeConstants.LIST_COLUMN_TYPES, fullColTypeListString);
+  }
+
+  private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
+    switch (type.typeId()) {
+      case RECORD:
+        Types.RecordType record = (Types.RecordType) type;
+        List<Types.Field> fields = record.fields();
+        ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        for (int index = 0; index < fields.size(); index++) {
+          StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+          TypeInfo subTypeInfo = getSchemaSubTypeInfo(structTypeInfo.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
+          fieldTypes.add(subTypeInfo);
+          String name = fields.get(index).name();
+          fieldNames.add(name);
+        }
+        StructTypeInfo structTypeInfo = new StructTypeInfo();
+        structTypeInfo.setAllStructFieldNames(fieldNames);
+        structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
+        return structTypeInfo;
+      case ARRAY:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        Types.ArrayType array = (Types.ArrayType) type;
+        TypeInfo subTypeInfo = getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
+        listTypeInfo.setListElementTypeInfo(subTypeInfo);
+        return listTypeInfo;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        TypeInfo keyType = getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
+        TypeInfo valueType = getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
+        MapTypeInfo mapType = new MapTypeInfo();
+        mapType.setMapKeyTypeInfo(keyType);
+        mapType.setMapValueTypeInfo(valueType);
+        return mapType;
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case TIMESTAMP:
+      case STRING:
+      case UUID:
+      case FIXED:
+      case BINARY:
+      case DECIMAL:
+        return typeInfo;
+      case TIME:
+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] {type}));
+      default:
+        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] {type}));
+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] {type}));
+    }
+  }
+
+  private TypeInfo getSchemaSubTypeInfo(TypeInfo hoodieTypeInfo, Type hiveType) {
+    TypeInfo subTypeInfo = TypeInfoUtils.getTypeInfosFromTypeString(hoodieTypeInfo.getQualifiedName()).get(0);
+    TypeInfo typeInfo;
+    if (subTypeInfo instanceof PrimitiveTypeInfo) {
+      typeInfo = subTypeInfo;
+    } else {
+      typeInfo = constructHiveSchemaFromType(hiveType, subTypeInfo);
+    }
+    return typeInfo;
+  }
+
+  private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSchema fileSchema) {
+    String filterExprSerialized = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+    if (filterExprSerialized != null) {
+      ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
+      LinkedList<ExprNodeDesc> exprNodes = new LinkedList<>();
+      exprNodes.add(filterExpr);
+      while (!exprNodes.isEmpty()) {
+        int size = exprNodes.size();
+        for (int i = 0; i < size; i++) {
+          ExprNodeDesc expr = exprNodes.poll();
+          if (expr instanceof ExprNodeColumnDesc) {
+            String oldColumn = ((ExprNodeColumnDesc) expr).getColumn();
+            String newColumn = InternalSchemaUtils.reBuildFilterName(oldColumn, fileSchema, querySchema);
+            ((ExprNodeColumnDesc) expr).setColumn(newColumn);
+          }
+          List<ExprNodeDesc> children = expr.getChildren();
+          if (children != null) {
+            exprNodes.addAll(children);
+          }
+        }
+      }
+      String filterText = filterExpr.getExprString();
+      String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = "
+            + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr);
+      }
+      job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
+      job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr);
+    }
+  }
+
+  private void setColumnNameList(JobConf job, List<Types.Field> fields) {
+    if (fields == null) {
+      return;
+    }
+    List<String> tmpColIdList = Arrays.asList(job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR).split(","));
+    if (fields.size() != tmpColIdList.size()) {
+      return;
+    }
+    StringBuilder readColumnNames = new StringBuilder();
+    List<String> tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(","));
+    List<String> fullColNamelist = new ArrayList<>(tmpColNameList);
+    for (int index = 0; index < fields.size(); index++) {
+      String colName = fields.get(index).name();
+      if (readColumnNames.length() > 0) {
+        readColumnNames.append(',');
+      }
+      readColumnNames.append(colName);
+      int id = Integer.parseInt(tmpColIdList.get(index));
+      if (!colName.equals(fullColNamelist.get(id))) {
+        fullColNamelist.remove(id);
+        fullColNamelist.add(id, colName);
+      }
+    }
+    String readColumnNamesString = readColumnNames.toString();
+    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNamesString);
+    String fullColNamelistString = String.join(",", fullColNamelist);
+    job.set(serdeConstants.LIST_COLUMNS, fullColNamelistString);
+  }
+
+  public static List<String> getRequireColumn(JobConf jobConf) {
+    String originColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(originColumnString)) {
+      jobConf.set(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    }
+    String hoodieFullColumnString = jobConf.get(HIVE_TMP_COLUMNS);
+    if (StringUtils.isNullOrEmpty(hoodieFullColumnString)) {
+      jobConf.set(HIVE_TMP_COLUMNS, jobConf.get(serdeConstants.LIST_COLUMNS));
+    }
+    String tableColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR);
+    List<String> tableColumns = Arrays.asList(tableColumnString.split(","));
+    return new ArrayList<>(tableColumns);
+  }
+}
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index dfdda9dfc8259..47da956578f7c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.SchemaEvolutionContext;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.avro.Schema;
@@ -55,6 +57,7 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema writerSchema;
   private Schema hiveSchema;
   private HoodieTableMetaClient metaClient;
+  protected SchemaEvolutionContext schemaEvolutionContext;
 
   public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
     this.split = split;
@@ -69,7 +72,12 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
       }
       this.usesCustomPayload = usesCustomPayload(metaClient);
       LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
-      init();
+      schemaEvolutionContext = new SchemaEvolutionContext(split, job, Option.of(metaClient));
+      if (schemaEvolutionContext.internalSchemaOption.isPresent()) {
+        schemaEvolutionContext.doEvolutionForRealtimeInputFormat(this);
+      } else {
+        init();
+      }
     } catch (Exception e) {
       throw new HoodieException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
     }
@@ -99,7 +107,7 @@ private void init() throws Exception {
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
 
     Map<String, Schema.Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
-    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
+    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS));
 
     // TODO(vc): In the future, the reader schema should be updated based on log files & be able
     // to null out fields not present before
@@ -108,10 +116,7 @@ private void init() throws Exception {
         split.getDeltaLogPaths(), split.getPath(), projectionFields));
   }
 
-  private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Schema.Field> schemaFieldsMap) {
-    // Get all column names of hive table
-    String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
-    LOG.info("Hive Columns : " + hiveColumnString);
+  public Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Schema.Field> schemaFieldsMap, String hiveColumnString) {
     String[] hiveColumns = hiveColumnString.split(",");
     LOG.info("Hive Columns : " + hiveColumnString);
     List<Schema.Field> hiveSchemaFields = new ArrayList<>();
@@ -154,4 +159,16 @@ public RealtimeSplit getSplit() {
   public JobConf getJobConf() {
     return jobConf;
   }
+
+  public void setReaderSchema(Schema readerSchema) {
+    this.readerSchema = readerSchema;
+  }
+
+  public void setWriterSchema(Schema writerSchema) {
+    this.writerSchema = writerSchema;
+  }
+
+  public void setHiveSchema(Schema hiveSchema) {
+    this.hiveSchema = hiveSchema;
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b1bd3df50f3ba..8e1f8530b4748 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -44,7 +45,7 @@ import java.util.Map;
 import java.util.Set;
 
-class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
+public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
 
   private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
 
@@ -92,6 +93,7 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
             HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
         .withUseScanV2(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
+        .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .build();
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
new file mode 100644
index 0000000000000..071b954a17f75
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.SchemaEvolutionContext;
+import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
+
+import java.io.File;
+import java.util.Date;
+
+@Tag("functional")
+public class TestHiveTableSchemaEvolution {
+
+  private SparkSession sparkSession = null;
+
+  @BeforeEach
+  public void setUp() {
+    initSparkContexts("HiveSchemaEvolution");
+  }
+
+  private void initSparkContexts(String appName) {
+    SparkConf sparkConf = new SparkConf();
+    if (HoodieSparkUtils.gteqSpark3_2()) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+          "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
+    }
+    sparkSession = SparkSession.builder().appName(appName)
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .withExtensions(new HoodieSparkSessionExtension())
+        .config("hoodie.insert.shuffle.parallelism", "4")
+        .config("hoodie.upsert.shuffle.parallelism", "4")
+        .config("hoodie.delete.shuffle.parallelism", "4")
+        .config("hoodie.support.write.lock", "false")
+        .config("spark.sql.session.timeZone", "CTT")
+        .config("spark.sql.hive.convertMetastoreParquet", "false")
+        .config(sparkConf)
+        .master("local[1]").getOrCreate();
+    sparkSession.sparkContext().setLogLevel("ERROR");
+  }
+
+  @Test
+  public void testCopyOnWriteTableForHive() throws Exception {
+    String tableName = "huditest" + new Date().getTime();
+    File file = new File(System.getProperty("java.io.tmpdir") + tableName);
+    if (HoodieSparkUtils.gteqSpark3_1()) {
+      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String path = new Path(file.getCanonicalPath()).toUri().toString();
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
+      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
+      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
+      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
+
+      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+      JobConf jobConf = new JobConf();
+      inputFormat.setConf(jobConf);
+      FileInputFormat.setInputPaths(jobConf, path);
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEvolutionResult("cow", splits[0], jobConf);
+    }
+  }
+
+  @Test
+  public void testMergeOnReadTableForHive() throws Exception {
+    String tableName = "huditest" + new Date().getTime();
+    File file = new File(System.getProperty("java.io.tmpdir") + tableName);
+    if (HoodieSparkUtils.gteqSpark3_1()) {
+      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String path = new Path(file.getCanonicalPath()).toUri().toString();
+      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
+      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
+      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
+      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
+      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
+
+      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
+      JobConf jobConf = new JobConf();
+      inputFormat.setConf(jobConf);
+      FileInputFormat.setInputPaths(jobConf, path);
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEvolutionResult("mor", splits[0], jobConf);
+    }
+  }
+
+  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
+
+    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
+    if ("cow".equals(tableType)) {
+      schemaEvolutionContext.doEvolutionForParquetFormat();
+    } else {
+      // mor table
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      RecordReader recordReader;
+      // for log only split, set the parquet reader as empty.
+      if (FSUtils.isLogFile(realtimeSplit.getPath())) {
+        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+      } else {
+        // create a RecordReader to be used by HoodieRealtimeRecordReader
+        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
+      }
+      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
+      // a mor table also runs doEvolutionForParquetFormat in HoodieParquetInputFormat
+      schemaEvolutionContext.doEvolutionForParquetFormat();
+      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
+    }
+
+    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
+    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
+        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
+    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+  }
+}
\ No newline at end of file
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 340152d19099a..92859002d7a2c 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -88,6 +88,7 @@
                   <include>com.yammer.metrics:metrics-core</include>
                   <include>commons-io:commons-io</include>
                   <include>org.openjdk.jol:jol-core</include>
+                  <include>com.github.ben-manes.caffeine:caffeine</include>
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml
index e29f6b709ef95..709029336a8c8 100644
--- a/packaging/hudi-presto-bundle/pom.xml
+++ b/packaging/hudi-presto-bundle/pom.xml
@@ -71,6 +71,7 @@
                   <include>org.apache.parquet:parquet-avro</include>
                   <include>org.apache.avro:avro</include>
+                  <include>com.github.ben-manes.caffeine:caffeine</include>
                   <include>org.codehaus.jackson:*</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>org.apache.hbase:hbase-common</include>
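Reviewer note (not part of the patch): the caffeine entries added to the hadoop-mr and presto bundle poms are presumably needed because the new SchemaEvolutionContext path calls InternalSchemaCache.searchSchemaAndCache, whose cache implementation is backed by Caffeine, so the library has to be shaded into the bundles the Hive and Presto readers run with.

The sketch below shows how the COW read path is expected to behave end to end; it is a minimal sketch that mirrors TestHiveTableSchemaEvolution above and assumes a table that was created and evolved exactly as in that test (col1 float -> double, col2 renamed to aaa). The location /tmp/huditest and the projected column ids 6,7 are placeholders.

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.SchemaEvolutionContext;

public class CowSchemaEvolutionSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Hive sets the projection with the *latest* column names; "aaa" is the renamed col2.
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");

    HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
    inputFormat.setConf(jobConf);
    FileInputFormat.setInputPaths(jobConf, "/tmp/huditest"); // placeholder table path
    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);

    // Rewrites the projection so it matches the names and types that were valid when the
    // base file was written (aaa -> col2); getRecordReader performs the same call internally.
    new SchemaEvolutionContext(splits[0], jobConf).doEvolutionForParquetFormat();
    System.out.println(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); // col1,col2
  }
}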
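For review convenience, here is a dependency-free sketch of the index mapping that setColumnNameList performs: hive.io.file.readcolumn.ids indexes into the full LIST_COLUMNS list, and each projected position is replaced with the field name resolved against the file schema, which is why the assertions above expect aaa to come back as col2. The resolved names are hard-coded here for illustration; in the patch they come from the InternalSchemaMerger result.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnNameRewriteSketch {
  public static void main(String[] args) {
    // Conf values as prepared in assertEvolutionResult(...) above.
    String listColumns = "_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,"
        + "_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa";
    String readColumnIds = "6,7";
    // Names resolved against the file schema for the projected positions (hard-coded here):
    // position 6 keeps its name, position 7 maps back to the pre-rename name col2.
    List<String> fileFieldNames = Arrays.asList("col1", "col2");

    List<String> fullNames = new ArrayList<>(Arrays.asList(listColumns.split(",")));
    String[] ids = readColumnIds.split(",");
    StringBuilder readColumnNames = new StringBuilder();
    for (int i = 0; i < ids.length; i++) {
      String fileName = fileFieldNames.get(i);
      if (readColumnNames.length() > 0) {
        readColumnNames.append(',');
      }
      readColumnNames.append(fileName);
      fullNames.set(Integer.parseInt(ids[i]), fileName);
    }
    System.out.println(readColumnNames);             // col1,col2
    System.out.println(String.join(",", fullNames)); // ..._hoodie_file_name,col0,col1,col2
  }
}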