Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,15 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
}
try {
if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
// Last-minute check.
boolean decision = recordMerger.shouldFlush(combineRecord.get(), schema, config.getProps());

if (decision) { // CASE (1): Flush the merged record.
writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
} else { // CASE (2): A delete operation.
recordsDeleted++;
}
} else {
recordsDeleted++;
// Clear the new location as the record was deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.testutils;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
Expand Down Expand Up @@ -112,7 +113,30 @@ private static ExpressionEncoder getEncoder(StructType schema) {
/**
 * Generates a {@link Dataset} of {@code count} random Rows for the given partition path.
 *
 * @param sqlContext    SQL context used to create the DataFrame.
 * @param count         number of rows to generate.
 * @param partitionPath partition path to be set in each Row.
 * @param isError       whether to generate rows against the error schema.
 * @return the Dataset of Rows thus generated.
 */
public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, String partitionPath, boolean isError) {
  List<Row> records = new ArrayList<>();
  for (long recordNum = 0; recordNum < count; recordNum++) {
    // Empty commit time preserves the historical behavior of this overload.
    records.add(getRandomValue(partitionPath, isError, ""));
  }
  return sqlContext.createDataFrame(records, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}

/**
 * Generates a {@link Dataset} of {@code count} random Rows, each carrying the supplied
 * commit time in its commit-time field.
 *
 * @param sqlContext    SQL context used to create the DataFrame.
 * @param count         number of rows to generate.
 * @param partitionPath partition path to be set in each Row.
 * @param isError       whether to generate rows against the error schema.
 * @param commitTime    commit time to be set in each Row.
 * @return the Dataset of Rows thus generated.
 */
public static Dataset<Row> getRandomRowsWithCommitTime(SQLContext sqlContext,
                                                       int count,
                                                       String partitionPath,
                                                       boolean isError,
                                                       String commitTime) {
  List<Row> rows = new ArrayList<>(count);
  for (int i = 0; i < count; i++) {
    rows.add(getRandomValue(partitionPath, isError, commitTime));
  }
  return sqlContext.createDataFrame(rows, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}

/**
 * Generates a {@link Dataset} with one random Row per supplied {@link HoodieKey}; each Row
 * takes its record key and partition path from the corresponding key and carries the given
 * commit time.
 *
 * @param sqlContext SQL context used to create the DataFrame.
 * @param keys       keys supplying record key and partition path for each Row.
 * @param isError    whether to generate rows against the error schema.
 * @param commitTime commit time to be set in each Row.
 * @return the Dataset of Rows thus generated.
 */
public static Dataset<Row> getRandomRowsWithKeys(SQLContext sqlContext,
                                                 List<HoodieKey> keys,
                                                 boolean isError,
                                                 String commitTime) {
  List<Row> rows = new ArrayList<>(keys.size());
  keys.forEach(hoodieKey -> rows.add(getRandomValue(hoodieKey, isError, commitTime)));
  return sqlContext.createDataFrame(rows, isError ? ERROR_STRUCT_TYPE : STRUCT_TYPE);
}
Expand All @@ -123,10 +147,10 @@ public static Dataset<Row> getRandomRows(SQLContext sqlContext, int count, Strin
* @param partitionPath partition path to be set in the Row.
* @param isError whether to generate the Row against the error schema.
* @param commitTime commit time to be set in the Row.
* @return the Row thus generated.
*/
public static Row getRandomValue(String partitionPath, boolean isError) {
public static Row getRandomValue(String partitionPath, boolean isError, String commitTime) {
// order commit time, seq no, record key, partition path, file name
Object[] values = new Object[9];
values[0] = ""; //commit time
values[0] = commitTime; //commit time
if (!isError) {
values[1] = ""; // commit seq no
} else {
Expand All @@ -146,6 +170,35 @@ public static Row getRandomValue(String partitionPath, boolean isError) {
return new GenericRow(values);
}

/**
 * Generates a random Row whose record key and partition path are taken from the given
 * {@link HoodieKey}.
 *
 * @param key        key supplying the record key and partition path.
 * @param isError    whether to populate the error-schema variants of the random fields.
 * @param commitTime commit time to be set in the Row.
 * @return the Row thus generated.
 */
public static Row getRandomValue(HoodieKey key, boolean isError, String commitTime) {
  // Field order: commit time, seq no, record key, partition path, file name, then payload fields.
  Object[] fields = new Object[9];
  fields[0] = commitTime; // commit time
  fields[1] = isError ? (Object) RANDOM.nextLong() : ""; // commit seq no
  fields[2] = key.getRecordKey();
  fields[3] = key.getPartitionPath();
  fields[4] = ""; // filename
  fields[5] = UUID.randomUUID().toString();
  fields[6] = key.getPartitionPath();
  fields[7] = RANDOM.nextInt();
  fields[8] = isError ? (Object) UUID.randomUUID().toString() : RANDOM.nextLong();
  return new GenericRow(fields);
}

/**
* Convert Dataset<Row>s to List of {@link InternalRow}s.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ public interface HoodieRecordMerger extends Serializable {
*/
Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;


/**
 * In some cases business logic performs a check before flushing a merged record to disk.
 * This method performs that check; when {@code false} is returned, the merged record
 * should not be flushed (the write path treats it as a delete instead).
 *
 * @param record the merged record.
 * @param schema the schema of the merged record.
 * @param props  payload-related configuration properties for the check.
 * @return {@code true} if the merged record should be flushed, {@code false} otherwise.
 * @throws IOException if reading the record during the check fails.
 *
 * <p>This interface is experimental and might be evolved in the future.
 **/
default boolean shouldFlush(HoodieRecord record, Schema schema, TypedProperties props) throws IOException {
  return true;
}

/**
* The record type handled by the current merger.
* SPARK, AVRO, FLINK
Expand Down
Loading