@@ -264,7 +264,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E
.map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
.collect(Collectors.toList());
List<GenericRecord> recordsRead =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, false);
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), inputPaths, basePath(), new JobConf(hadoopConf()), true, populateMetaFields);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
}
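The hunk above threads the test's populateMetaFields parameter through to the reader instead of hardcoding false, so the read path projects the same schema the data was written with. A minimal, hypothetical JUnit 5 sketch of that pattern (the reader stub below is illustrative, not the actual HoodieMergeOnReadTestUtils API):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class PopulateMetaFieldsFlagTest {

  // Hypothetical stand-in for the input-format read helper. The point is that the same
  // populateMetaFields value used when writing the table must also be passed when reading,
  // otherwise the reader asks for _hoodie_* columns that were never written (or vice versa).
  private int countRecordsRead(boolean populateMetaFields) {
    // write 20 records, delete all 20, read back with the matching flag -> 0 remain
    return 0;
  }

  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) {
    assertEquals(0, countRecordsRead(populateMetaFields), "Must contain 0 records");
  }
}
```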
@@ -143,7 +143,7 @@ public Schema getTableAvroSchemaFromDataFile() {
* @throws Exception
*/
public Schema getTableAvroSchema() throws Exception {
return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
Review comment (Contributor): @aditiwari01 @codope folks, can you please elaborate why this has been changed? Why are we assuming that Table's schema should have meta-fields?
return getTableAvroSchema(true);
}
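
To make the review comment concrete: the boolean now hardcoded to true controls whether the resolved table schema includes the five _hoodie_* meta fields. A hedged sketch of the difference, assuming HoodieAvroUtils.addMetadataFields is representative of what happens when the flag is set (the resolver's internals are not shown in this diff):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.avro.HoodieAvroUtils;

public class MetaFieldsSchemaDemo {
  public static void main(String[] args) {
    // A plain data schema, roughly matching the User record used by the tests in this PR.
    Schema dataSchema = SchemaBuilder.record("User").namespace("example.avro")
        .fields()
        .requiredString("name")
        .requiredInt("favorite_number")
        .requiredString("favorite_color")
        .endRecord();

    // The same schema with _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
    // _hoodie_partition_path and _hoodie_file_name prepended. For a table written with
    // hoodie.populate.meta.fields=false those columns do not exist in the data files,
    // which is the mismatch the reviewer is asking about.
    Schema withMetaFields = HoodieAvroUtils.addMetadataFields(dataSchema);

    System.out.println("data fields:      " + dataSchema.getFields().size());
    System.out.println("with meta fields: " + withMetaFields.getFields().size());
  }
}
```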

/**
@@ -194,6 +194,10 @@ public static Schema getEvolvedSchema() throws IOException {
return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avsc"));
}

public static Schema getEvolvedCompatibleSchema() throws IOException {
return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved-compatible.avsc"));
}

public static List<IndexedRecord> generateEvolvedTestRecords(int from, int limit)
throws IOException, URISyntaxException {
return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit);
29 changes: 29 additions & 0 deletions hudi-common/src/test/resources/simple-test-evolved-compatible.avsc
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"},
{"name": "field1", "type": ["null", "string"], "default": null},
{"name": "field2", "type": ["null", "string"], "default": null}
]
}
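
The new simple-test-evolved-compatible.avsc only adds nullable fields with null defaults on top of the base User record, so a reader using it can still decode records written with the narrower schema. A quick self-contained check with Avro's compatibility API (the writer schema below is an assumption about what simple-test.avsc contains; that resource is not part of this diff):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class EvolvedCompatibleSchemaCheck {
  public static void main(String[] args) {
    // Assumed writer schema: the original User record before evolution.
    Schema writer = SchemaBuilder.record("User").namespace("example.avro")
        .fields()
        .requiredString("name")
        .requiredInt("favorite_number")
        .requiredString("favorite_color")
        .endRecord();

    // Reader schema: same shape as simple-test-evolved-compatible.avsc above. field1/field2
    // are ["null","string"] unions with a null default, so old records decode without them.
    Schema reader = SchemaBuilder.record("User").namespace("example.avro")
        .fields()
        .requiredString("name")
        .requiredInt("favorite_number")
        .requiredString("favorite_color")
        .optionalString("field1")
        .optionalString("field2")
        .endRecord();

    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
    System.out.println(result.getType()); // expected: COMPATIBLE
  }
}
```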
@@ -21,21 +21,18 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -57,6 +54,7 @@ public abstract class AbstractRealtimeRecordReader {
private Schema readerSchema;
private Schema writerSchema;
private Schema hiveSchema;
private HoodieTableMetaClient metaClient;

public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
this.split = split;
@@ -65,15 +63,15 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
if (metaClient.getTableConfig().getPreCombineField() != null) {
this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, metaClient.getTableConfig().getPreCombineField());
}
this.usesCustomPayload = usesCustomPayload(metaClient);
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
init();
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
} catch (Exception e) {
throw new HoodieException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
}
}

@@ -83,19 +81,14 @@ private boolean usesCustomPayload(HoodieTableMetaClient metaClient) {
}

/**
* Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
* Gets schema from HoodieTableMetaClient. If not, falls
* back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogFiles(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
LOG.info("Writer Schema From Log => " + writerSchema.toString(true));
}
private void init() throws Exception {
LOG.info("Getting writer schema from table avro schema ");
writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();

// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields =
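The net effect of the hunks above is that the realtime record reader no longer scans delta log blocks (LogReaderUtils) or the base parquet file to discover the writer schema; it asks the table itself. A condensed sketch of that lookup, using only the calls already visible in this diff:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

public class WriterSchemaLookup {

  // Rough shape of what the new init() relies on: build a meta client for the table base
  // path and let TableSchemaResolver produce the table-level Avro schema, instead of
  // reading the latest schema out of the log files or the base file split.
  public static Schema resolveWriterSchema(Configuration conf, String basePath) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath(basePath)
        .build();
    // Note: after this PR the no-arg overload always includes meta fields
    // (getTableAvroSchema(true)), which is exactly what the review comment questions.
    return new TableSchemaResolver(metaClient).getTableAvroSchema();
  }
}
```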
@@ -21,12 +21,18 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit;
@@ -58,6 +64,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -105,7 +112,9 @@ public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Excep
// Create 3 partitions, each partition holds one parquet file and 1000 records
List<File> partitionDirs = InputFormatTestUtil
.prepareMultiPartitionedParquetTable(tempDir, schema, 3, numRecords, commitTime, HoodieTableType.MERGE_ON_READ);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));

TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
@@ -185,7 +194,9 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));

TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
@@ -255,7 +266,9 @@ public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));

String newCommitTime = "101";
// to trigger the bug of HUDI-1772, only update fileid2
@@ -323,7 +336,9 @@ public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
schema.toString(), HoodieTimeline.COMMIT_ACTION);
FileCreateUtils.createCommit(tempDir.toString(), commitTime, Option.of(commitMetadata));

// insert 1000 update records to log file 0
String newCommitTime = "101";
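Each of the four tests above replaces the bare InputFormatTestUtil.commit call with a commit whose metadata carries the writer schema, because the reader now resolves the schema from the table rather than from the files. A hedged sketch of what a schema lookup from the latest completed commit could look like (the resolver's actual implementation is not part of this diff, and the timeline calls below are assumptions):

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class SchemaFromCommitMetadata {

  // Illustrative only: if the completed commit carries no schema string in its metadata,
  // a TableSchemaResolver-style lookup has nothing to return and the realtime reader
  // cannot be initialized -- hence the schema.toString() passed to buildMetadata above.
  public static Option<Schema> latestCommitSchema(HoodieTableMetaClient metaClient) throws Exception {
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    Option<HoodieInstant> latest = timeline.lastInstant();
    if (!latest.isPresent()) {
      return Option.empty();
    }
    HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(latest.get()).get(), HoodieCommitMetadata.class);
    String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
    return schemaStr == null || schemaStr.isEmpty()
        ? Option.empty()
        : Option.of(new Schema.Parser().parse(schemaStr));
  }
}
```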