diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 2955147b4053f..0c3434433aeee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -264,7 +264,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) .collect(Collectors.toList()); List recordsRead = - HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false); + HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, populateMetaFields); // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 assertEquals(0, recordsRead.size(), "Must contain 0 records"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index a84a9482a6707..f2db4d692866a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -143,7 +143,7 @@ public Schema getTableAvroSchemaFromDataFile() { * @throws Exception */ public Schema getTableAvroSchema() throws Exception { - return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields()); + return getTableAvroSchema(true); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index ab77caa1bcb83..70d5a1bb3e9d6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -194,6 +194,10 @@ public static Schema getEvolvedSchema() throws IOException { return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avsc")); } + public static Schema getEvolvedCompatibleSchema() throws IOException { + return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved-compatible.avsc")); + } + public static List generateEvolvedTestRecords(int from, int limit) throws IOException, URISyntaxException { return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit); diff --git a/hudi-common/src/test/resources/simple-test-evolved-compatible.avsc b/hudi-common/src/test/resources/simple-test-evolved-compatible.avsc new file mode 100644 index 0000000000000..09463fa310937 --- /dev/null +++ b/hudi-common/src/test/resources/simple-test-evolved-compatible.avsc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ +"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": "int"}, + {"name": "favorite_color", "type": "string"}, + {"name": "field1", "type": ["null", "string"], "default": null}, + {"name": "field2", "type": ["null", "string"], "default": null} + ] +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 030e20f2278b4..dfdda9dfc8259 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -21,21 +21,18 @@ import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.LogReaderUtils; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.InputSplitUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -57,6 +54,7 @@ public abstract class AbstractRealtimeRecordReader { private Schema readerSchema; private Schema writerSchema; private Schema hiveSchema; + private HoodieTableMetaClient metaClient; public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { this.split = split; @@ -65,15 +63,15 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); try { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); + metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); if (metaClient.getTableConfig().getPreCombineField() != null) { this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, metaClient.getTableConfig().getPreCombineField()); } this.usesCustomPayload = usesCustomPayload(metaClient); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); init(); - } catch (IOException e) { - throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); + } catch (Exception e) { + throw new HoodieException("Could 
not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); } } @@ -83,19 +81,14 @@ private boolean usesCustomPayload(HoodieTableMetaClient metaClient) { } /** - * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls + * Gets the writer schema via the HoodieTableMetaClient and TableSchemaResolver. If it is not available, falls * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the * job conf. */ - private void init() throws IOException { - Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFiles(), jobConf); - if (schemaFromLogFile == null) { - writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf); - LOG.info("Writer Schema From Parquet => " + writerSchema.getFields()); - } else { - writerSchema = schemaFromLogFile; - LOG.info("Writer Schema From Log => " + writerSchema.toString(true)); - } + private void init() throws Exception { + LOG.info("Getting writer schema from table avro schema "); + writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + // Add partitioning fields to writer schema for resulting row to contain null values for these fields String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); List partitioningFields = diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java index ec6ea0a8b3ec3..0a14af2212ac3 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java @@ -21,12 +21,18 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit; import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit; @@ -58,6 +64,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.stream.Collectors; @@ -105,7 +112,9 @@ public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Excep // Create 3 partitions, each partition holds one parquet file and 1000 records List partitionDirs = InputFormatTestUtil .prepareMultiPartitionedParquetTable(tempDir, schema, 3, numRecords, commitTime, HoodieTableType.MERGE_ON_READ); - InputFormatTestUtil.commit(tempDir, commitTime); + HoodieCommitMetadata commitMetadata =
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata)); TableDesc tblDesc = Utilities.defaultTd; // Set the input format @@ -185,7 +194,9 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws final int numRecords = 1000; // Create 3 parquet files with 1000 records each File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime); - InputFormatTestUtil.commit(tempDir, commitTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata)); TableDesc tblDesc = Utilities.defaultTd; // Set the input format @@ -255,7 +266,9 @@ public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception { final int numRecords = 1000; // Create 3 parquet files with 1000 records each File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime); - InputFormatTestUtil.commit(tempDir, commitTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata)); String newCommitTime = "101"; // to trigger the bug of HUDI-1772, only update fileid2 @@ -323,7 +336,9 @@ public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception { final int numRecords = 1000; // Create 3 parquet files with 1000 records each File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime); - InputFormatTestUtil.commit(tempDir, commitTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata)); // insert 1000 update records to log file 0 String newCommitTime = "101"; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index fc4eb7ce2c042..07a4a0250e5de 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -51,7 +51,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -60,6 +60,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import 
org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.Pair; @@ -85,7 +86,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -136,16 +136,6 @@ private void setHiveColumnNameProps(List fields, JobConf jobConf, jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames); } - protected Properties getPropertiesForKeyGen() { - Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); - properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); - properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); - properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); - properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path"); - return properties; - } - @ParameterizedTest @MethodSource("testArguments") public void testReader(ExternalSpillableMap.DiskMapType diskMapType, @@ -183,7 +173,10 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, HoodieTableType.MERGE_ON_READ) : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant, HoodieTableType.MERGE_ON_READ); - FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant); + + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); + FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant, commitMetadata); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -218,7 +211,7 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType, long size = writer.getCurrentSize(); writer.close(); assertTrue(size > 0, "block - size should be > 0"); - FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); // create a split with baseFile (parquet file written earlier) and new log file(s) fileSlice.addLogFile(writer.getLogFile()); @@ -291,7 +284,9 @@ public void testUnMergedReader() throws Exception { final int secondBatchLastRecordKey = 2 * numRecords - 1; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, HoodieTableType.MERGE_ON_READ); - FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); + FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -303,7 +298,7 @@ public void testUnMergedReader() throws Exception { long size = writer.getCurrentSize(); writer.close(); assertTrue(size > 0, "block - size should be > 0"); - FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime); + FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata); // create a split with 
baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( @@ -371,7 +366,10 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp int numberOfLogRecords = numberOfRecords / 2; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numberOfRecords, instantTime, HoodieTableType.MERGE_ON_READ); - InputFormatTestUtil.commit(basePath, instantTime); + + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -382,7 +380,9 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp long size = writer.getCurrentSize(); writer.close(); assertTrue(size > 0, "block - size should be > 0"); - InputFormatTestUtil.deltaCommit(basePath, newCommitTime); + commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); + FileCreateUtils.createDeltaCommit(basePath.toString(), newCommitTime, commitMetadata); // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( @@ -507,7 +507,9 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa File partitionDir = InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords, instantTime, HoodieTableType.MERGE_ON_READ); - InputFormatTestUtil.commit(basePath, instantTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); List firstSchemaFields = schema.getFields(); @@ -529,7 +531,10 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa newCommitTime, "101", 1); logFiles.add(writer.getLogFile()); writer.close(); - InputFormatTestUtil.deltaCommit(basePath, newCommitTime); + + commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION); + FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( @@ -572,6 +577,63 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa reader.close(); } + @Test + public void testSchemaEvolution() throws Exception { + ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK; + boolean isCompressionEnabled = true; + // initial commit + List logFiles = new ArrayList<>(); + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); + String instantTime = "100"; + int numberOfRecords = 
100; + int numberOfLogRecords = numberOfRecords / 2; + File partitionDir = + InputFormatTestUtil.prepareSimpleParquetTable(basePath, schema, 1, numberOfRecords, + instantTime, HoodieTableType.MERGE_ON_READ); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(basePath.toString(), instantTime, Option.of(commitMetadata)); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + List firstSchemaFields = schema.getFields(); + + // 2nd commit w/ evolved schema + Schema evolvedSchema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedCompatibleSchema()); + List secondSchemaFields = evolvedSchema.getFields(); + String newCommitTime = "101"; + File partitionDir1 = + InputFormatTestUtil.prepareSimpleParquetTable(basePath, evolvedSchema, 1, numberOfRecords, + instantTime, HoodieTableType.MERGE_ON_READ,"2017","05","01"); + HoodieCommitMetadata commitMetadata1 = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + evolvedSchema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createCommit(basePath.toString(), newCommitTime, Option.of(commitMetadata1)); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir1.getPath()); + + // create a split with baseFile from 1st commit. + HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( + new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), + basePath.toUri().toString(), logFiles, newCommitTime, false, Option.empty()); + + // create a RecordReader to be used by HoodieRealtimeRecordReader + RecordReader reader = new MapredParquetInputFormat().getRecordReader( + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), baseJobConf, null); + JobConf jobConf = new JobConf(baseJobConf); + + // Try to read all the fields passed by the new schema + setHiveColumnNameProps(secondSchemaFields, jobConf, true); + // This time read only the fields which are part of parquet + HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); + // use reader to read base Parquet File and log file + NullWritable key = recordReader.createKey(); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(key, value)) { + // keep reading + } + reader.close(); + } + private static Stream testArguments() { // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: partitioned return Stream.of( @@ -595,8 +657,7 @@ public void testIncrementalWithOnlylog() throws Exception { final int numRecords = 1000; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, HoodieTableType.MERGE_ON_READ); - //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); - createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -607,7 +668,7 @@ public void testIncrementalWithOnlylog() throws Exception { InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, 
newCommitTime, numRecords, numRecords, 0); writer.close(); - createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0"); + createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0", schema.toString()); InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1); @@ -644,8 +705,7 @@ public void testIncrementalWithReplace() throws Exception { String baseInstant = "100"; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, HoodieTableType.MERGE_ON_READ); - //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); - createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); @@ -727,13 +787,17 @@ private void createDeltaCommitFile( String commitNumber, String partitionPath, String filePath, - String fileId) throws IOException { + String fileId, + String schemaStr) throws IOException { List writeStats = new ArrayList<>(); HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId); writeStats.add(writeStat); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); + if (schemaStr != null) { + commitMetadata.getExtraMetadata().put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); + } File file = basePath.resolve(".hoodie").resolve(commitNumber + ".deltacommit").toFile(); file.createNewFile(); FileOutputStream fileOutputStream = new FileOutputStream(file); @@ -765,7 +829,9 @@ public void testLogOnlyReader() throws Exception { long size = writer.getCurrentSize(); writer.close(); assertTrue(size > 0, "block - size should be > 0"); - FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT, + schema.toString(), HoodieTimeline.COMMIT_ACTION); + FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, commitMetadata); // create a split with new log file(s) fileSlice.addLogFile(new HoodieLogFile(writer.getLogFile().getPath(), size)); RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus( @@ -807,7 +873,7 @@ public void testIncrementalWithCompaction() throws Exception { String baseInstant = "100"; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, HoodieTableType.MERGE_ON_READ); - createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0", schema.toString()); // Add the paths FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 836ad57121bd5..1185be65c196e 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -223,9 
+223,14 @@ public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception { + return prepareSimpleParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, tableType, "2016","05","01"); + } + + public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, + int numberOfRecords, String commitNumber, HoodieTableType tableType, String year, String month, String date) throws Exception { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); - java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get(year, month, date)); setupPartition(basePath, partitionPath); createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 96fe47e0219d4..4a4ea2cdd1be9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -19,12 +19,13 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieDataSourceHelper._ import org.apache.hudi.HoodieMergeOnReadRDD.resolveAvroSchemaNullability import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath +import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils @@ -309,10 +310,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } } - private def mergeRowWithLog(curRow: InternalRow, curKey: String) = { + private def mergeRowWithLog(curRow: InternalRow, curKey: String) : org.apache.hudi.common.util.Option[IndexedRecord] = { val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord] - logRecords.get(curKey).getData + val mergedRec = logRecords.get(curKey).getData .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps) + if (mergedRec.isPresent && mergedRec.get().getSchema != tableAvroSchema) { + org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord], tableAvroSchema).asInstanceOf[IndexedRecord]) + } else { + mergedRec + } } } }
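Editor's note on the schema handling introduced above: AbstractRealtimeRecordReader now resolves the writer schema through TableSchemaResolver instead of scanning log files, and HoodieMergeOnReadRDD rewrites a merged record into the table schema (HoodieAvroUtils.rewriteRecord) whenever the payload hands back a record in an older writer schema. That is the scenario the new simple-test-evolved-compatible.avsc covers: the evolved schema only adds nullable fields with null defaults. The stand-alone sketch below uses only the plain Avro API to illustrate that rewrite idea under the assumption of backward-compatible evolution; the class name SchemaRewriteSketch, the helper rewriteToSchema and the inline schemas are illustrative stand-ins, not part of this patch or of Hudi.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class SchemaRewriteSketch {

  // Copies every field of 'record' that also exists in 'targetSchema'; fields present only in
  // 'targetSchema' fall back to their declared defaults (null for the new optional columns),
  // which is what a reader needs when records were written with an older, compatible schema.
  static GenericRecord rewriteToSchema(GenericRecord record, Schema targetSchema) {
    GenericRecordBuilder builder = new GenericRecordBuilder(targetSchema);
    for (Schema.Field field : targetSchema.getFields()) {
      if (record.getSchema().getField(field.name()) != null) {
        builder.set(field, record.get(field.name()));
      }
    }
    return builder.build();
  }

  public static void main(String[] args) {
    // Old writer schema: no optional columns yet.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"favorite_number\",\"type\":\"int\"}]}");
    // Evolved table schema: adds a nullable column with a null default, mirroring
    // simple-test-evolved-compatible.avsc.
    Schema evolvedSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"favorite_number\",\"type\":\"int\"},"
            + "{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("name", "alice");
    oldRecord.put("favorite_number", 7);

    GenericRecord rewritten = rewriteToSchema(oldRecord, evolvedSchema);
    // Prints: {"name": "alice", "favorite_number": 7, "field1": null}
    System.out.println(rewritten);
  }
}

Guarding the rewrite behind a schema-identity check, as the Scala change does (mergedRec.get().getSchema != tableAvroSchema), keeps this extra record copy off the common path where the writer schema already matches the table schema.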