@@ -250,11 +250,17 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
}

private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
boolean isDelete = false;
if (indexedRecord.isPresent()) {
updatedRecordsWritten++;
GenericRecord record = (GenericRecord) indexedRecord.get();
if (oldRecord != record) {
// the incoming record is chosen
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
}
}
return writeRecord(hoodieRecord, indexedRecord);
return writeRecord(hoodieRecord, indexedRecord, isDelete);
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -264,24 +270,25 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
if (writeRecord(hoodieRecord, insertRecord)) {
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
insertRecordsWritten++;
}
}

protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
return writeRecord(hoodieRecord, indexedRecord, false);
}

protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+ hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
return false;
}
if (HoodieOperation.isDelete(hoodieRecord.getOperation())) {
indexedRecord = Option.empty();
}
try {
if (indexedRecord.isPresent()) {
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
@@ -321,7 +328,7 @@ public void write(GenericRecord oldRecord) {
if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
} else if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
} else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the the combined new
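Taken together, the hunks above thread an explicit delete flag through the merge handle: writeUpdateRecord now also receives the old base-file record and only treats the row as a delete when the incoming record won the merge (oldRecord != record), and writeRecord gains an isDelete parameter that decides whether the payload is copied into the new base file. The tail of writeRecord is collapsed in this view; below is a minimal sketch of how the flag is expected to play out there — recordsWritten, recordsDeleted and the markSuccess bookkeeping are assumptions, only the branch condition is taken from the hunk above.

// Hedged sketch of the tail of writeRecord(hoodieRecord, indexedRecord, isDelete).
try {
  if (indexedRecord.isPresent() && !isDelete) {
    // live insert/update: rewrite with Hudi metadata columns and copy into the new base file
    IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
    fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
    recordsWritten++;                                        // assumed counter
  } else {
    // empty payload or changelog delete: the row is simply not rewritten
    recordsDeleted++;                                        // assumed counter
  }
  writeStatus.markSuccess(hoodieRecord, recordMetadata);     // assumed bookkeeping
  return true;
} catch (Exception e) {
  writeStatus.markFailure(hoodieRecord, e, recordMetadata);
  return false;
}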
@@ -179,6 +179,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.build();
if (!scanner.iterator().hasNext()) {
scanner.close();
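The one-line addition above makes the compactor's merged log record scanner carry the per-record operation field, so a row written as a delete in changelog mode is still recognised as a delete when log files are compacted into a new base file. A minimal sketch of what this enables downstream — scanner and recordKey are placeholders here, and only getRecords(), getOperation() and HoodieOperation.isDelete(...) appear in this diff:

// With withOperationField(...) enabled, a replayed log record keeps its write-side operation.
HoodieRecord<?> logRecord = scanner.getRecords().get(recordKey);       // scanner, recordKey: placeholders
boolean isDelete = HoodieOperation.isDelete(logRecord.getOperation());
// isDelete == true  -> the merge handle skips rewriting the row (see writeRecord above)
// isDelete == false -> the row is merged into the new base file as usual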
@@ -44,6 +44,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -156,28 +157,52 @@ public Result inputSplits(
}

String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;

Set<String> writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());

Set<String> writePartitions;
final FileStatus[] fileStatuses;

if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
if (this.requiredPartitions != null) {
// apply partition push down
fileIndex.setPartitionPaths(this.requiredPartitions);
}
writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();
} else {
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;

writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
}
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());

if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;
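The rewritten block above splits split planning into two paths. Without an instantRange (a fresh read from the earliest commit) it no longer replays commit metadata at all: a FileIndex scans the table path, partition push down is applied on the index, and the file statuses come straight from the listed partitions. With an instantRange it keeps the previous behaviour of merging archived and active commit metadata and deriving the touched partitions and write paths from it. A condensed sketch of the branch, using only names that appear in the hunk:

// Condensed sketch of the two planning paths in inputSplits(...).
if (instantRange == null) {
  // fresh read from the earliest commit: list partitions and files directly
  FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
  if (requiredPartitions != null) {
    fileIndex.setPartitionPaths(requiredPartitions);           // partition push down
  }
  writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
  fileStatuses = fileIndex.getFilesInPartitions();
} else {
  // incremental read: derive partitions and files from the commit metadata
  writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
  fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}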
@@ -683,13 +683,18 @@ public boolean reachedEnd() throws IOException {
// deleted
continue;
} else {
final RowKind rowKind = FormatUtils.getRowKindSafely(mergedAvroRecord.get(), this.operationPos);
if (!emitDelete && rowKind == RowKind.DELETE) {
// deleted
continue;
}
GenericRecord avroRecord = buildAvroRecordBySchema(
mergedAvroRecord.get(),
requiredSchema,
requiredPos,
recordBuilder);
this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
this.currentRecord.setRowKind(rowKind);
return false;
}
}
@@ -746,9 +751,6 @@ private Option<IndexedRecord> mergeRowWithLog(
RowData curRow,
String curKey) throws IOException {
final HoodieRecord<?> record = scanner.getRecords().get(curKey);
if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
return Option.empty();
}
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
}
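The last two hunks move delete filtering out of mergeRowWithLog and into reachedEnd: the base row is merged with the log record first, the RowKind of the merged result is derived from the operation field, and only then does the reader decide whether to drop the row (when emitDelete is false) or emit it with that RowKind. A compact sketch of the per-record decision, using the helpers visible in the hunk:

// Sketch of the per-record decision in reachedEnd(), as rearranged above.
Option<IndexedRecord> mergedAvroRecord = mergeRowWithLog(curRow, curKey);
if (mergedAvroRecord.isPresent()) {
  RowKind rowKind = FormatUtils.getRowKindSafely(mergedAvroRecord.get(), operationPos);
  if (emitDelete || rowKind != RowKind.DELETE) {
    GenericRecord avroRecord = buildAvroRecordBySchema(mergedAvroRecord.get(), requiredSchema, requiredPos, recordBuilder);
    RowData row = (RowData) avroToRowDataConverter.convert(avroRecord);
    row.setRowKind(rowKind);   // propagate +I/+U/-D to the Flink runtime
    // emit row
  }
  // else: changelog delete and the source is not asked to emit deletes -> skip
}
// else: the payload resolved to nothing (hard delete) -> skip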
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
@@ -221,8 +222,9 @@ void testReadBaseAndLogFilesWithDeletes() throws Exception {
assertThat(actual2, is(expected2));
}

@Test
void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
beforeEach(HoodieTableType.MERGE_ON_READ, options);
@@ -233,7 +235,7 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);

// write another commit using logs and read again.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);

InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
@@ -242,9 +244,11 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
// when isEmitDelete is false.
List<RowData> result1 = readData(inputFormat);

final String rowKind = compact ? "I" : "U";
final String expected = "[+" + rowKind + "[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";

final String actual1 = TestData.rowDataToString(result1);
final String expected1 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
assertThat(actual1, is(expected1));
assertThat(actual1, is(expected));

// refresh the input format and set isEmitDelete to true.
this.tableSource.reset();
@@ -254,8 +258,7 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete() throws Exception {
List<RowData> result2 = readData(inputFormat);

final String actual2 = TestData.rowDataToString(result2);
final String expected2 = "[+U[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
assertThat(actual2, is(expected2));
assertThat(actual2, is(expected));
}

@Test
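The test is now parameterized over the compaction flag, and the expected change flag follows from where the record is read back: if the disorder update/delete batch has been compacted into the base file, the row presumably comes out of the base-file read path and surfaces as a plain insert (+I); if it is still only in the log files, the merged read keeps the operation field and emits it as an update (+U) — hence rowKind = compact ? "I" : "U". The isEmitDelete case expects the same row either way, since the out-of-order delete loses to the later update.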