@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
     return Collections.emptyList();
   }

+  /**
+   * Fetches the latest {@link FileSlice}s for the partition of interest.
+   *
+   * @param partition Partition of interest
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of the latest {@link FileSlice}s in the partition
+   */
+  public static List<FileSlice> getLatestFileSlicesForPartition(
+      final String partition,
+      final HoodieTable hoodieTable) {
+    Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
+    if (latestCommitTime.isPresent()) {
+      return hoodieTable.getHoodieView()
+          .getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
    *

Review thread on getLatestFileSlicesForPartition:

Contributor: Seems a reasonable fix, can we add a test case for it?

Contributor: +1

Contributor Author: Ok, I will add a test for this pr.

Contributor Author: Hi @danny0405, I have added a test for log type, please review again.
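Not part of the patch, but to make the fix concrete: a minimal sketch contrasting the two lookups, assuming a ready HoodieTable handle, the partition path used in the test further down, and the imports from the hunk above plus HoodieBaseFile. A file group that so far holds only log files has no base file yet, so the base-file view skips it, while the slice view still reports the file id and base instant time the bucket index needs.

```java
// Illustrative sketch only (not in this PR): contrast the base-file view with
// the file-slice view for one partition.
static void compareViews(HoodieTable hoodieTable) {
  String partition = "2016/01/31"; // partition path borrowed from the test below

  // Misses file groups that only have log files so far (e.g. a bucket-index MOR
  // table before any base file has been written for that file group).
  List<HoodieBaseFile> baseFiles =
      HoodieIndexUtils.getLatestBaseFilesForPartition(partition, hoodieTable);

  // Sees every latest file slice, including log-only ones.
  List<FileSlice> fileSlices =
      HoodieIndexUtils.getLatestFileSlicesForPartition(partition, hoodieTable);

  System.out.println("base files: " + baseFiles.size() + ", file slices: " + fileSlices.size());
  fileSlices.forEach(slice ->
      System.out.println(slice.getFileId() + " @ " + slice.getBaseInstantTime()
          + ", hasBaseFile=" + slice.getBaseFile().isPresent()));
}
```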
@@ -25,7 +25,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -52,10 +51,11 @@ private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
     hoodieTable.getMetaClient().reloadActiveTimeline();
     HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
+        .getLatestFileSlicesForPartition(partition, hoodieTable)
+        .forEach(fileSlice -> {
+          String fileId = fileSlice.getFileId();
+          String commitTime = fileSlice.getBaseInstantTime();
+
           int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));

Review thread on lines +56 to +57:

Contributor Author: Hi @danny0405, thanks for your suggestion, I have corrected it, please review again :)
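Reading the interleaved hunk back as one piece, the mapping routine after this change looks roughly like the sketch below. The parameter list and the trailing return are not visible in the diff, so they are assumptions; the body mirrors the added lines.

```java
// Sketch reconstructed from the hunk above; not the exact file contents.
private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
    String partition, HoodieTable hoodieTable) { // parameter names/order assumed
  Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
  hoodieTable.getMetaClient().reloadActiveTimeline();
  HoodieIndexUtils
      .getLatestFileSlicesForPartition(partition, hoodieTable)
      .forEach(fileSlice -> {
        // A log-only file group still carries a file id and base instant time,
        // so records written only to log files can now be tagged with a location.
        String fileId = fileSlice.getFileId();
        String commitTime = fileSlice.getBaseInstantTime();
        int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
        if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
          bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
        }
      });
  return bucketIdToFileIdMapping; // assumed; the diff does not show the method tail
}
```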
@@ -32,7 +32,6 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
     return baseFilePath;
   }

-  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
-    for (List<HoodieRecord> groupedRecords : records.stream()
-        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
-      partitionToLogfilesMap.computeIfAbsent(
-          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
-    }
+    final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(partition, fileId, records);
+    partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     return partitionToLogfilesMap;
   }

-  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
-    String partitionPath = groupedRecords.get(0).getPartitionPath();
-    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(String partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(currentInstantTime).withFs(fs).build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+      logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
         try {
           GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");

Review thread on the withLogAppends changes:

Contributor: Is there any special reason we rename the method from withLogAppends to withAppends?

Contributor Author: withAppends seemed more consistent with the former withInserts; I will revert to withLogAppends following your advice.
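A note on why the helper's signature changed (a reading of the diff, not text from the PR): the old withLogAppends grouped records by getCurrentLocation(), so it only worked for records that already carried a HoodieRecordLocation; the new form names the target partition and file id explicitly and stamps blocks with currentInstantTime, which is what lets the bucket index test below build log-only file groups for records that were never tagged. A sketch of the difference, with identifiers mirroring that test:

```java
// Old shape: records had to be pre-tagged so the helper could derive the
// file id and base instant from record.getCurrentLocation().
//   testTable.withLogAppends(preTaggedRecords);
//
// New shape: the caller picks the file group up front; untagged records go
// straight into a log-only file group (usage mirrors the test hunk below).
testTable.addCommit("001").withLogAppends("2016/01/31", fileId, record1);
```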
@@ -42,6 +42,8 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.util.Arrays;
 import java.util.Properties;
@@ -89,8 +91,9 @@ public void testBucketIndexValidityCheck() {
         .withBucketNum("8").build();
   }

-  @Test
-  public void testTagLocation() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testTagLocation(boolean isInsert) throws Exception {
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@ public void testTagLocation() throws Exception {
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));

     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
-    testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
-    testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
-    testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+
+    if (isInsert) {
+      testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+    } else {
+      testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withLogAppends("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withLogAppends("2016/01/31", getRecordFileId(record3), record3);
+    }
+
     taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context,
         HoodieSparkTable.create(config, context, metaClient));
     assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown())
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@

 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;

 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public HoodieSparkWriteableTestTable withInserts(String partition, String fileId
   public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
   }
+
+  public HoodieSparkWriteableTestTable withLogAppends(String partition, String fileId, HoodieRecord... records) throws Exception {
+    withLogAppends(partition, fileId, Arrays.asList(records));
+    return this;
+  }
+
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
+    return super.withLogAppends(partition, fileId, records);
+  }
 }