@@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";

// Allow duplicates with inserts while merging with existing records
private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
@@ -330,6 +334,10 @@ public boolean isMergeDataValidationCheckEnabled() {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}

public boolean allowDuplicateInserts() {
return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS));
}

public EngineType getEngineType() {
return engineType;
}
@@ -1180,6 +1188,11 @@ public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
return this;
}

public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) {
props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, String.valueOf(routeInsertsToNewFiles));
return this;
}

public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
@@ -1234,6 +1247,8 @@ protected void setDefaults() {
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);

// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
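
For context, a minimal sketch of how the new flag could be switched on from a writer's perspective, assuming the usual HoodieWriteConfig builder entry points; the base path below is a placeholder and not part of this change:

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")       // hypothetical base path, illustration only
    .withMergeAllowDuplicateOnInserts(true)     // flag introduced in this change
    .build();
// writeConfig.allowDuplicateInserts() now returns true, so inserts routed into existing
// small files are concatenated instead of merged (see HoodieConcatHandle below).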
@@ -58,17 +58,45 @@
import java.util.Set;

@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
* <p>
* Simplified Logic:
* For every existing record
* Check if there is a new record coming in. If yes, merge two records and write to file
* else write the record as is
* For all pending records from incoming batch, write to file.
*
* Illustration with simple data.
* Incoming data:
* rec1_2, rec4_2, rec5_1, rec6_1
* Existing data:
* rec1_1, rec2_1, rec3_1, rec4_1
*
* For every existing record, merge w/ incoming if required and write to storage.
* => rec1_1 and rec1_2 are merged to write rec1_2 to storage
* => rec2_1 is written as is
* => rec3_1 is written as is
* => rec4_2 and rec4_1 are merged to write rec4_2 to storage
* Write all pending records from incoming set to storage
* => rec5_1 and rec6_1
*
* Final snapshot in storage
* rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
*
* </p>
*/
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);

protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
- private HoodieFileWriter<IndexedRecord> fileWriter;
+ protected HoodieFileWriter<IndexedRecord> fileWriter;

- private Path newFilePath;
+ protected Path newFilePath;
private Path oldFilePath;
- private long recordsWritten = 0;
+ protected long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
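
To make the two-pass flow in the class Javadoc above concrete, here is a self-contained toy sketch using plain Java collections instead of Hudi types; all identifiers are invented for illustration, and where the real handle delegates merging to the record payload, the newer version simply wins here:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MergeIllustration {
  public static void main(String[] args) {
    // key -> record; the "_1"/"_2" suffix stands for the record version
    Map<String, String> existing = new LinkedHashMap<>();
    existing.put("rec1", "rec1_1");
    existing.put("rec2", "rec2_1");
    existing.put("rec3", "rec3_1");
    existing.put("rec4", "rec4_1");
    Map<String, String> incoming = new LinkedHashMap<>();
    incoming.put("rec1", "rec1_2");
    incoming.put("rec4", "rec4_2");
    incoming.put("rec5", "rec5_1");
    incoming.put("rec6", "rec6_1");

    List<String> newBaseFile = new ArrayList<>();
    Set<String> writtenKeys = new HashSet<>();
    // Pass 1: every existing record is either replaced by its incoming version or copied as is.
    existing.forEach((key, oldRecord) -> {
      newBaseFile.add(incoming.getOrDefault(key, oldRecord));
      writtenKeys.add(key);
    });
    // Pass 2: incoming records that matched nothing in storage are appended.
    incoming.forEach((key, newRecord) -> {
      if (!writtenKeys.contains(key)) {
        newBaseFile.add(newRecord);
      }
    });
    System.out.println(newBaseFile); // [rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1]
  }
}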
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

/**
* Handle to concatenate new records to old records w/o any merging. If the operation is "insert" and {@link HoodieWriteConfig#allowDuplicateInserts()}
* is set, this handle will be used instead of {@link HoodieMergeHandle}.
*
* Simplified Logic:
* For every existing record
* Write the record as is
* For all incoming records, write to file as is.
*
* Illustration with simple data.
* Incoming data:
* rec1_2, rec4_2, rec5_1, rec6_1
* Existing data:
* rec1_1, rec2_1, rec3_1, rec4_1
*
* For every existing record, write to storage as is.
* => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
* Write all records from incoming set to storage
* => rec1_2, rec4_2, rec5_1 and rec6_1
*
* Final snapshot in storage
* rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
*
* Users should ensure there are no duplicates when the "insert" operation is used with this config enabled. So, the above scenario should not
* happen in practice, and every batch should contain only new records to be inserted. The above example is for illustration purposes only.
*/
public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);

public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
}

public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
}

/**
* Write old record as is w/o merging with incoming record.
*/
@Override
public void write(GenericRecord oldRecord) {
Review comment (Contributor): @vinothchandar : Have added this new handle and tested that it works as expected. But I would like to call out that any new records would just get appended. For instance, if records to be deleted (with "_hoodie_is_deleted" set to true) are sent via the "Insert" operation, this handle will just append them and may not recognize the deleted records, as we don't do any combineAndUpdate.

String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
try {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
}
}
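
And for contrast, a toy sketch of the concatenation behaviour described in the Javadoc above, again with plain Java collections and invented identifiers; note that a record flagged with "_hoodie_is_deleted" would be appended verbatim like any other incoming record, which is the caveat raised in the review comment above:

import java.util.ArrayList;
import java.util.List;

class ConcatIllustration {
  public static void main(String[] args) {
    List<String> newBaseFile = new ArrayList<>();
    // Pass 1: existing records are copied over untouched.
    newBaseFile.addAll(List.of("rec1_1", "rec2_1", "rec3_1", "rec4_1"));
    // Pass 2: every incoming record is appended as is, with no key lookup or merge.
    newBaseFile.addAll(List.of("rec1_2", "rec4_2", "rec5_1", "rec6_1"));
    System.out.println(newBaseFile); // rec1 and rec4 now appear twice, as warned in the Javadoc
  }
}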
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;

import java.io.Serializable;
@@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable {
*/
protected final WorkloadStat globalStat;

/**
* Write operation type.
*/
private WriteOperationType operationType;

public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
}

public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
this(profile);
this.operationType = operationType;
}

public WorkloadStat getGlobalStat() {
return globalStat;
}
@@ -62,11 +73,16 @@ public WorkloadStat getWorkloadStat(String partitionPath) {
return partitionPathStatMap.get(partitionPath);
}

public WriteOperationType getOperationType() {
return operationType;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadProfile {");
sb.append("globalStat=").append(globalStat).append(", ");
sb.append("partitionStat=").append(partitionPathStatMap);
sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
sb.append("operationType=").append(operationType);
sb.append('}');
return sb.toString();
}
@@ -43,6 +43,7 @@
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.storage.HoodieConcatHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -119,7 +120,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
- profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
+ profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
LOG.info("Workload profile :" + profile);
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}
@@ -320,6 +321,8 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
if (table.requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier);
} else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
} else {
return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
}
@@ -190,7 +190,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
for (SmallFile smallFile : smallFiles) {
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
totalUnassignedInserts);
- if (recordsToAppend > 0) {
+ if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {