diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 9b3dc8df0098a..61be856d3662c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
     return Collections.emptyList();
   }
 
+  /**
+   * Fetches the latest {@link FileSlice}s for the partition of interest.
+   *
+   * @param partition   Partition of interest
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link FileSlice}s in the partition
+   */
+  public static List<FileSlice> getLatestFileSlicesForPartition(
+      final String partition,
+      final HoodieTable hoodieTable) {
+    Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
+    if (latestCommitTime.isPresent()) {
+      return hoodieTable.getHoodieView()
+          .getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index 2ccebb472f277..aae50e1f956c0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -52,10 +51,11 @@ private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
     hoodieTable.getMetaClient().reloadActiveTimeline();
     HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
+        .getLatestFileSlicesForPartition(partition, hoodieTable)
+        .forEach(fileSlice -> {
+          String fileId = fileSlice.getFileId();
+          String commitTime = fileSlice.getBaseInstantTime();
+
           int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
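Note: switching the bucket-id mapping from latest base files to latest file slices is what makes the simple bucket index usable on merge-on-read tables, where a file group whose only writes so far are log appends has no base file yet and would otherwise be missed. For reference, the bucket id is recovered from the file id prefix. Below is a standalone, hedged sketch of that convention; the zero-padded 8-digit prefix is an assumption about the bucket-index file naming that BucketIdentifier.bucketIdFromFileId is understood to parse, not code from this patch.

import java.util.stream.Stream;

// Sketch only: assumes bucket-index file ids begin with a zero-padded
// 8-digit bucket number, e.g. "00000004-<uuid>-0".
public class BucketIdSketch {
  static int bucketIdFromFileId(String fileId) {
    // Parse the leading eight characters as the bucket number.
    return Integer.parseInt(fileId.substring(0, 8));
  }

  public static void main(String[] args) {
    Stream.of("00000004-9f12-4e1d-0", "00000011-ab34-4c0f-0")
        .forEach(id -> System.out.println(id + " -> bucket " + bucketIdFromFileId(id)));
  }
}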
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 2f00b82772395..8e7df833cc5df 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -32,7 +32,6 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
     return baseFilePath;
   }
 
-  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
-    for (List<HoodieRecord> groupedRecords : records.stream()
-        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
-      partitionToLogfilesMap.computeIfAbsent(
-          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
-    }
+    final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(partition, fileId, records);
+    partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     return partitionToLogfilesMap;
   }
 
-  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
-    String partitionPath = groupedRecords.get(0).getPartitionPath();
-    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(String partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(currentInstantTime).withFs(fs).build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+      logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
         try {
           GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
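With the reworked helper, a test names the target partition and file group explicitly instead of relying on each record carrying a currentLocation to group by. A hedged call-site sketch (testTable, fileId, and records are assumed to be in scope in a test class; only the method names come from this patch):

// Before (roughly, per the removed code): every record needed a location first, e.g.
//   record.unseal();
//   record.setCurrentLocation(new HoodieRecordLocation("001", fileId));
//   record.seal();
//   testTable.withLogAppends(records);
// After: the target file group is spelled out, and the result is still keyed by partition.
Map<String, List<HoodieLogFile>> logFilesByPartition =
    testTable.withLogAppends("2016/01/31", fileId, records);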
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index a96ce04077088..ea6418696c694 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -42,6 +42,8 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Properties;
@@ -89,8 +91,9 @@ public void testBucketIndexValidityCheck() {
         .withBucketNum("8").build();
   }
 
-  @Test
-  public void testTagLocation() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testTagLocation(boolean isInsert) throws Exception {
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@ public void testTagLocation() throws Exception {
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));
 
     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
-    testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
-    testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
-    testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+
+    if (isInsert) {
+      testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+    } else {
+      testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withLogAppends("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withLogAppends("2016/01/31", getRecordFileId(record3), record3);
+    }
+
     taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context,
         HoodieSparkTable.create(config, context, metaClient));
     assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown())
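The @ParameterizedTest/@ValueSource pair runs testTagLocation twice, once per write path: base-file inserts and log appends. A self-contained sketch of the same JUnit 5 mechanism, with illustrative class and method names:

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class WritePathParamSketch {
  // JUnit invokes this once per supplied value: isInsert = true, then false.
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void taggingWorksForEitherWritePath(boolean isInsert) {
    String mode = isInsert ? "base-file insert" : "log append";
    assertNotNull(mode); // stand-in for the real tagging assertions
  }
}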
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
index 8940223926089..3b50d1b29b04f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public HoodieSparkWriteableTestTable withInserts(String partition, String fileId
   public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
   }
+
+  public HoodieSparkWriteableTestTable withLogAppends(String partition, String fileId, HoodieRecord... records) throws Exception {
+    withLogAppends(partition, fileId, Arrays.asList(records));
+    return this;
+  }
+
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
+    return super.withLogAppends(partition, fileId, records);
+  }
 }
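The two overloads mirror the existing withInserts helpers: the varargs form returns the table so commits can be chained fluently, while the List form surfaces the log files that were written. A hedged usage fragment (testTable, fileId, and record1/record2 assumed in scope):

// Fluent form, as used by the parameterized test above:
testTable.addCommit("001").withLogAppends("2016/01/31", fileId, record1, record2);

// List form, when the test wants to assert on the produced log files:
Map<String, List<HoodieLogFile>> produced =
    testTable.withLogAppends("2016/01/31", fileId, Arrays.asList(record1, record2));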