@@ -26,6 +26,7 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -148,7 +149,13 @@ public static <R> HoodieRecord<R> tagAsNewRecordIfNeeded(HoodieRecord<R> record,
// separate filenames that the record is found in. This will result in setting
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
HoodieRecord<R> newRecord = record.newInstance();
HoodieRecord<R> newRecord;
// HoodieEmptyRecord cannot be copied via the no-arg newInstance(), so pass its key and operation explicitly.
if (record instanceof HoodieEmptyRecord) {
newRecord = record.newInstance(record.getKey(), record.getOperation());
} else {
newRecord = record.newInstance();
}

newRecord.unseal();
newRecord.setCurrentLocation(location.get());
newRecord.seal();
@@ -30,6 +30,7 @@
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -139,23 +140,34 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
return;
}

MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
HoodieRecord populatedRecord =
record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());

if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
// Inject custom logic for the record.
Option<Pair<HoodieRecord, Schema>> processedRecord = config.getRecordMerger().insert(record, schema, config.getProps());
if (!processedRecord.isPresent()
|| HoodieOperation.isDelete(processedRecord.get().getLeft().getOperation())

Contributor:
We can revert all the changes to the create handle; the delete messages are already handled at line 137.

Collaborator (Author):
Should we allow the custom logic?

|| processedRecord.get().getLeft().isDelete(schema, config.getProps())) { // Decide to delete the record.
recordsDeleted++;
} else { // Decide to write the record.
record = processedRecord.get().getLeft();
schema = processedRecord.get().getRight();

MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
HoodieRecord populatedRecord =
record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());

if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
}

// Update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(newRecordLocation);
record.seal();

recordsWritten++;
insertRecordsWritten++;
}

// Update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(newRecordLocation);
record.seal();

recordsWritten++;
insertRecordsWritten++;
} else {
recordsDeleted++;
}
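
As a rough sketch of the custom logic the thread above asks about (not part of this PR), a custom merger could implement the insert hook that this handle invokes through config.getRecordMerger().insert(record, schema, props). The base class and the exact hook signature below are assumptions inferred from the call sites in this diff.

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

public class FilteringRecordMerger extends HoodieAvroRecordMerger {
  @Override
  public Option<Pair<HoodieRecord, Schema>> insert(HoodieRecord record, Schema schema, TypedProperties props) {
    // Returning an empty Option asks the handle to count the record as deleted instead of writing it.
    if (HoodieOperation.isDelete(record.getOperation())) {
      return Option.empty();
    }
    // Otherwise hand the (possibly transformed) record and its schema back to the write path.
    return Option.of(Pair.of(record, schema));
  }
}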
@@ -183,10 +195,12 @@ public void write() {
} else {
keyIterator = recordMap.keySet().stream().iterator();
}

Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
while (keyIterator.hasNext()) {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
write(record, useWriterSchema ? writeSchemaWithMetaFields : writeSchema, config.getProps());
write(record, schema, config.getProps());
}
}

@@ -22,7 +22,9 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -267,26 +269,82 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {

protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
boolean isDelete = false;
// Precondition: combined record exists.
if (combineRecordOpt.isPresent()) {
if (oldRecord.getData() != combineRecordOpt.get().getData()) {
// the incoming record is chosen
isDelete = HoodieOperation.isDelete(newRecord.getOperation());
} else {
// the incoming record is dropped
// Save old data.
if (oldRecord.getData() == combineRecordOpt.get().getData()) {
return false;
}
updatedRecordsWritten++;

// If the new record is a delete record, we do delete no matter what
// the combined record is.
if (isDeleteRecord(newRecord, writerSchema, config.getProps())) {
doDelete(newRecord);
return true;
}

// At this point we know this is not a delete case, so we inject custom logic to decide
// whether the combined record should be written. Three cases here:
// 1. If the output is empty, the old data is written for safety since empty means unknown.
// 2. If the output is a delete record, the delete count is increased.
// 3. Otherwise, the resulting record is passed to the downstream.
Option<Pair<HoodieRecord, Schema>> processedRecord = recordMerger.insert(
combineRecordOpt.get(), writerSchema, config.getProps());
if (!processedRecord.isPresent()) {
// Save old data.
return false;
} else if (isDeleteRecord(processedRecord.get().getLeft(), processedRecord.get().getRight(), config.getProps())) {
doDelete(newRecord);
return true;
} else {
updatedRecordsWritten++;
combineRecordOpt = Option.of(processedRecord.get().getLeft());
writerSchema = processedRecord.get().getRight();
}
}

// TODO: clean up the logic in the `writeRecord` function.
// Finally, write the record.
return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
}

protected void doDelete(HoodieRecord record) {
recordsDeleted++;
updatedRecordsWritten++;
record.unseal();
record.clearNewLocation();
record.seal();
record.deflate();
writeStatus.markSuccess(record, record.getMetadata());
}

protected boolean isDeleteRecord(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
return record.isDelete(schema, props)
|| record instanceof HoodieEmptyRecord
|| (record.getData() != null && record.getData() instanceof EmptyHoodieRecordPayload)
|| HoodieOperation.isDelete(record.getOperation());

Contributor:
What do we gain by binding all these checks together? Empty records and delete records are both meant to be dropped, but were they originally handled by the same logic?

Collaborator (Author) @linliu-code, Sep 28, 2023:
Based on my understanding, Hudi uses empty records and delete records interchangeably to mean delete, so I combined them here. I will modify this PR based on our discussion, so this logic should go away. But we should find a way to standardize these checks; otherwise the scattered delete logic is really confusing.

Contributor:
Revert all the changes to HoodieMergeHandle, then add a new interface to the Merger API:

/**
 * Checks the validity of the merged record before flushing it to disk; if this returns false, the given record is ignored.
 * In some scenarios the business logic needs to check the validity of the merged record, so this interface gives
 * the user a chance to do a sanity check.
 *
 * <p>This interface is experimental and may evolve in the future.
 */
@Experimental
default boolean isValid(HoodieRecord record, Schema schema) {
  return true;
}

This interface would be invoked before each merged record is flushed. Currently only merged records need this check.
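
A minimal sketch of how the proposed hook might be implemented (hypothetical merger class, not part of this PR, assuming the default method lands on HoodieRecordMerger and is inherited by the stock Avro merger):

import org.apache.avro.Schema;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;

public class ValidatingRecordMerger extends HoodieAvroRecordMerger {
  @Override
  public boolean isValid(HoodieRecord record, Schema schema) {
    // Veto merged records whose payload is missing or resolves to the empty (delete) payload;
    // everything else is flushed as usual.
    return record.getData() != null && !(record.getData() instanceof EmptyHoodieRecordPayload);
  }
}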

}

protected boolean isValidRecord(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
return !isDeleteRecord(record, schema, props) && !record.shouldIgnore(schema, props);
}

protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
// just skip the ignored record
if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
}
writeInsertRecord(newRecord, schema, config.getProps());

// Inject custom insert logic for the record.
Option<Pair<HoodieRecord, Schema>> processedRecord = recordMerger.insert(
newRecord, schema, config.getProps());
if (!processedRecord.isPresent()
|| !isValidRecord(processedRecord.get().getLeft(), schema, config.getProps())) {
return;
}

writeInsertRecord(processedRecord.get().getLeft(), schema, config.getProps());
}

protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
@@ -309,6 +367,10 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
return false;
}
try {
// Cases for DELETE:
// (1) The merged record is empty.
// (2) The merged record is a delete record.
// (3) The new record is a delete record.
if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
@@ -321,8 +383,7 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
}
writeStatus.markSuccess(newRecord, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
// part of marking record successful.
newRecord.deflate();
return true;
} catch (Exception e) {
@@ -346,9 +407,12 @@ public void write(HoodieRecord<T> oldRecord) {
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
try {
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
// Inject default/custom merging logic for update.
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(
oldRecord, oldSchema, newRecord, newSchema, props);
Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);

if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
@@ -398,12 +462,17 @@ protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema,

protected void writeIncomingRecords() throws IOException {
// write out any pending records (this can happen when inserts are turned into updates)
Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
TypedProperties props = config.getPayloadConfig().getProps();

Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();

while (newRecordsItr.hasNext()) {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
writeInsertRecord(hoodieRecord);
HoodieRecord<T> record = newRecordsItr.next();
if (!writtenRecordKeys.contains(record.getRecordKey())) {
writeInsertRecord(record);
}
}
}
@@ -72,7 +72,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
HoodieRecord<T> reducedRecord;
try {
reducedRecord = merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
reducedRecord = merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
} catch (IOException e) {
throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
}
@@ -74,6 +74,7 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -318,10 +319,47 @@ protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<Ho
assertFalse(commit.isPresent());
}

protected void deleteRecordsFromMORTable(HoodieTableMetaClient metaClient, List<HoodieKey> keys, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime,
boolean doExplicitCommit) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
JavaRDD<WriteStatus> statusesRdd = client.delete(jsc().parallelize(keys, 1), commitTime);
List<WriteStatus> statuses = statusesRdd.collect();
assertNoWriteErrors(statuses);
if (doExplicitCommit) {
client.commit(commitTime, statusesRdd);
}
assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));

Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals(commitTime, deltaCommit.get().getTimestamp(),
"Latest Delta commit should match specified time");

Option<HoodieInstant> commit = reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
}
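
A hypothetical usage of this helper from a MOR test; client, cfg, metaClient, and writtenRecords are placeholders for objects the surrounding test already holds, and the commit time is illustrative:

// Delete a batch of previously written records and commit explicitly.
String deleteTime = "002";
client.startCommitWithTime(deleteTime);
List<HoodieKey> keysToDelete = writtenRecords.stream()
    .map(HoodieRecord::getKey)
    .collect(Collectors.toList());
deleteRecordsFromMORTable(metaClient, keysToDelete, client, cfg, deleteTime, true);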

protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
}

protected List<FileStatus> listAllFilesInPartition(HoodieTable table, String partitionPath) throws IOException {
return Arrays.asList(HoodieTestTable.of(table.getMetaClient()).listAllFilesInPartition(partitionPath));
}

protected List<FileStatus> listAllBaseFilesInPartition(HoodieTable table, String partitionPath) throws IOException {
String baseFileExtension = table.getBaseFileExtension();
return listAllFilesInPartition(table, partitionPath).stream()
.filter(status -> status.getPath().getName().endsWith(baseFileExtension))
.collect(Collectors.toList());
}

protected List<FileStatus> listAllLogFilesInPartition(HoodieTable table, String partitionPath) throws IOException {
String logFileExtension = table.getLogFileFormat().getFileExtension();
return listAllFilesInPartition(table, partitionPath).stream()
.filter(status -> status.getPath().getName().endsWith(logFileExtension)).collect(Collectors.toList());
}

protected Properties getPropertiesForKeyGen() {
return getPropertiesForKeyGen(false);
}
@@ -18,6 +18,7 @@

package org.apache.hudi.testutils;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -117,6 +118,21 @@ public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, Strin
return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}

/**
* Generate random Rows based on a given list of keys.
*
* @param sqlContext the SQLContext used to create the Dataset.
* @param keys the list of {@link HoodieKey} to generate Rows for.
* @param isError whether to generate Rows against the error struct type.
* @return the Dataset<Row> thus generated.
*/
public static Dataset<Row> getRandomRowsWithKeys(SQLContext sqlContext, List<HoodieKey> keys, boolean isError) {
List<Row> records = new ArrayList<>();
for (HoodieKey key : keys) {
records.add(getRandomValue(key.getRecordKey(), key.getPartitionPath(), isError));
}
return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}
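
A hypothetical usage, for example to build an update batch that reuses the keys of an earlier insert; the key values and sqlContext are placeholders:

List<HoodieKey> existingKeys = Arrays.asList(
    new HoodieKey("id1", "2023/09/28"),
    new HoodieKey("id2", "2023/09/28"));
Dataset<Row> updateBatch = getRandomRowsWithKeys(sqlContext, existingKeys, false);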

/**
* Generate random Row.
*
Expand Down Expand Up @@ -146,6 +162,36 @@ public static Row getRandomValue(String partitionPath, boolean isError) {
return new GenericRow(values);
}

/**
* Generate a random Row based on the given record key and partition path.
*
* @param key the record key of the Row.
* @param partitionPath the partition path to be set in the Row.
* @param isError whether to generate the Row against the error struct type.
* @return the Row thus generated.
*/
public static Row getRandomValue(String key, String partitionPath, boolean isError) {
// order commit time, seq no, record key, partition path, file name
Object[] values = new Object[9];
values[0] = ""; //commit time
if (!isError) {
values[1] = ""; // commit seq no
} else {
values[1] = RANDOM.nextLong();
}
values[2] = key;
values[3] = partitionPath;
values[4] = ""; // filename
values[5] = UUID.randomUUID().toString();
values[6] = partitionPath;
values[7] = RANDOM.nextInt();
if (!isError) {
values[8] = RANDOM.nextLong();
} else {
values[8] = UUID.randomUUID().toString();
}
return new GenericRow(values);
}

/**
* Convert Dataset<Row>s to List of {@link InternalRow}s.
*
@@ -132,7 +132,7 @@ public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, Str

@Override
public boolean isDelete(Schema recordSchema, Properties props) {
return false;
return data == null || data.equals(SENTINEL);
}

@Override