diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 8d99c410d6b81..b1c5531a22fd0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -67,6 +67,9 @@
 import scala.Tuple2;
 import scala.Tuple3;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 /**
  * CLI command to display log file options.
  */
@@ -185,7 +188,7 @@ public String showLogFileRecords(
         .collect(Collectors.toList());
 
     // logFilePaths size must > 1
-    assert logFilePaths.size() > 0 : "There is no log file";
+    checkArgument(logFilePaths.size() > 0, "There is no log file");
 
     // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
     AvroSchemaConverter converter = new AvroSchemaConverter();
@@ -218,6 +221,7 @@ public String showLogFileRecords(
         .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+        .withPartition(getRelativePartitionPath(new Path(client.getBasePath()), new Path(logFilePaths.get(0)).getParent()))
         .build();
     for (HoodieRecord hoodieRecord : scanner) {
       Option record = hoodieRecord.getData().getInsertValue(readerSchema);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index ee7fbda11b783..621061ae71122 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -65,6 +65,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -203,6 +204,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
     // get expected result of 10 records.
     List logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
+    assertTrue(logFilePaths.size() > 0);
     HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(tablePath)
@@ -221,6 +223,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
         .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+        .withPartition(getRelativePartitionPath(new Path(tablePath), new Path(logFilePaths.get(0)).getParent()))
         .build();
 
     Iterator> records = scanner.iterator();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 57fa4acc901a8..1bde88d3bb647 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -230,7 +230,7 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP
 
   /**
    * Obtain all the partition paths, that are present in this table, denoted by presence of
-   * {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}.
+   * {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE_PREFIX}.
    *
    * If the basePathStr is a subdirectory of .hoodie folder then we assume that the partitions of an internal
    * table (a hoodie table within the .hoodie directory) are to be obtained.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 4fa53bb41f9f8..536fec609542f 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -89,6 +89,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -574,12 +575,13 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
     writer.close();
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
     // scan all log blocks (across multiple log files)
+    List logFilePaths = logFiles.stream()
+        .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
+    assertTrue(logFilePaths.size() > 0);
     HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(basePath)
-        .withLogFilePaths(
-            logFiles.stream()
-                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withLogFilePaths(logFilePaths)
         .withReaderSchema(schema)
         .withLatestInstantTime("100")
         .withMaxMemorySizeInBytes(10240L)
@@ -589,6 +591,7 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logFilePaths.get(0)).getParent()))
         .build();
 
     List scannedRecords = new ArrayList<>();
@@ -803,6 +806,7 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords());
     Set readKeys = new HashSet<>(200);
@@ -881,6 +885,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
     Set readKeys = new HashSet<>(200);
@@ -968,6 +973,7 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
     Set readKeys = new HashSet<>(200);
@@ -1046,6 +1052,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
 
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1092,6 +1099,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
@@ -1187,6 +1195,7 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
 
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1290,6 +1299,7 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
 
     assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
@@ -1358,6 +1368,7 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
     FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1409,6 +1420,7 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
     final List readKeys = new ArrayList<>(100);
@@ -1479,6 +1491,7 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
@@ -1585,6 +1598,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
     FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1659,6 +1673,7 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
         .withSpillableMapBasePath(BASE_OUTPUT_PATH)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
         .build();
 
     assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 67691a3ec7bd1..97a682c3a3903 100644
--- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -57,6 +57,8 @@
 import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -346,7 +348,7 @@ private static HoodieMergedLogRecordScanner getScanner(
       List logPaths,
       Schema readSchema,
       String instant) {
-    return HoodieMergedLogRecordScanner.newBuilder()
+    HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(basePath)
         .withLogFilePaths(logPaths)
@@ -358,8 +360,12 @@ private static HoodieMergedLogRecordScanner getScanner(
         .withMaxMemorySizeInBytes(1024 * 1024L)
         .withSpillableMapBasePath("/tmp/")
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        .build();
+        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    if (!isNullOrEmpty(logPaths)) {
+      logRecordScannerBuilder
+          .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
+    }
+    return logRecordScannerBuilder.build();
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index fce9b75f764ea..f01993edc61c4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.format;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -43,6 +42,7 @@
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +52,10 @@
 import java.util.Map;
 import java.util.function.Function;
 
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+
 /**
  * Utilities for format.
  */
@@ -124,11 +128,13 @@ public static HoodieMergedLogRecordScanner logScanner(
       Schema logSchema,
       Configuration config,
       boolean withOperationField) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
-    return HoodieMergedLogRecordScanner.newBuilder()
+    String basePath = split.getTablePath();
+    List logPaths = split.getLogPaths().get();
+    FileSystem fs = getFs(basePath, config);
+    HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
-        .withBasePath(split.getTablePath())
-        .withLogFilePaths(split.getLogPaths().get())
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
         .withReaderSchema(logSchema)
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
@@ -144,8 +150,12 @@ public static HoodieMergedLogRecordScanner logScanner(
             config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
                 HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
         .withInstantRange(split.getInstantRange())
-        .withOperationField(withOperationField)
-        .build();
+        .withOperationField(withOperationField);
+    if (!isNullOrEmpty(logPaths)) {
+      logRecordScannerBuilder
+          .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
+    }
+    return logRecordScannerBuilder.build();
   }
 
   private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
@@ -153,7 +163,7 @@ private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
       Schema logSchema,
       Configuration config,
       HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+    FileSystem fs = getFs(split.getTablePath(), config);
     return HoodieUnMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -234,8 +244,8 @@ public static HoodieMergedLogRecordScanner logScanner(
       HoodieWriteConfig writeConfig,
       Configuration hadoopConf) {
     String basePath = writeConfig.getBasePath();
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(FSUtils.getFs(basePath, hadoopConf))
+    HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(getFs(basePath, hadoopConf))
         .withBasePath(basePath)
         .withLogFilePaths(logPaths)
         .withReaderSchema(logSchema)
@@ -246,8 +256,12 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .build();
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+    if (!isNullOrEmpty(logPaths)) {
+      logRecordScannerBuilder
+          .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
+    }
+    return logRecordScannerBuilder.build();
   }
 
   private static Boolean string2Boolean(String s) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index c1e924056cfa2..f2439b4471d3c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -67,6 +67,8 @@
 import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -661,7 +663,7 @@ private static HoodieMergedLogRecordScanner getScanner(
       List logPaths,
       Schema readSchema,
       String instant) {
-    return HoodieMergedLogRecordScanner.newBuilder()
+    HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
        .withFileSystem(fs)
        .withBasePath(basePath)
        .withLogFilePaths(logPaths)
@@ -673,8 +675,12 @@ private static HoodieMergedLogRecordScanner getScanner(
         .withMaxMemorySizeInBytes(1024 * 1024L)
         .withSpillableMapBasePath("/tmp/")
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        .build();
+        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    if (!isNullOrEmpty(logPaths)) {
+      logRecordScannerBuilder
+          .withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
+    }
+    return logRecordScannerBuilder.build();
   }
 
   /**
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b917f004bcd06..9618f5f7caded 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -35,15 +29,27 @@
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader {
@@ -77,10 +83,11 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
     // but can return records for completed commits > the commit we are trying to read (if using
     // readCommit() API)
-    return HoodieMergedLogRecordScanner.newBuilder()
+    List logPaths = split.getDeltaLogPaths();
+    HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
        .withBasePath(split.getBasePath())
-        .withLogFilePaths(split.getDeltaLogPaths())
+        .withLogFilePaths(logPaths)
        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
        .withLatestInstantTime(split.getMaxCommitTime())
        .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
@@ -90,8 +97,12 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
         .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .build();
+            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+    if (!isNullOrEmpty(logPaths)) {
+      logRecordScannerBuilder
+          .withPartition(getRelativePartitionPath(new Path(split.getBasePath()), new Path(logPaths.get(0)).getParent()));
+    }
+    return logRecordScannerBuilder.build();
   }
 
   private Option buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
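
Every call site touched by this change applies the same recipe: first make sure the list of log file paths is non-empty (checkArgument in the CLI command, assertTrue in the tests, an isNullOrEmpty guard in the Flink and Hadoop readers), then derive the partition as the first log file's parent directory relative to the table base path and hand it to the scanner builder via withPartition(...). A minimal sketch of that recipe, factored into a standalone helper, is shown below; the class and method names and the sample paths are illustrative only, and the comment describing getRelativePartitionPath reflects how its result is consumed in this diff rather than its full contract.

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;

public class PartitionAwareScannerSketch {

  // Hypothetical helper mirroring the guard used at each call site in this change:
  // only set the partition when at least one log path is available to infer it from.
  static HoodieMergedLogRecordScanner.Builder withInferredPartition(
      HoodieMergedLogRecordScanner.Builder builder, String basePath, List<String> logPaths) {
    if (!CollectionUtils.isNullOrEmpty(logPaths)) {
      // Assumed behavior, based on how the result is used above:
      //   basePath           = "/tmp/hoodie_table"
      //   logPaths.get(0)    = "/tmp/hoodie_table/2016/03/15/.f1_100.log.1_1-0-1"
      //   parent directory   = "/tmp/hoodie_table/2016/03/15"
      //   relative partition = "2016/03/15"
      String partition = FSUtils.getRelativePartitionPath(
          new Path(basePath), new Path(logPaths.get(0)).getParent());
      builder.withPartition(partition);
    }
    return builder;
  }
}

The isNullOrEmpty guard matters at the production call sites (FormatUtils and RealtimeCompactedRecordReader) because a split may carry no delta log files at all; in that case there is nothing to infer the partition from, and the builder is left unchanged.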