--- changed file ---
@@ -67,6 +67,9 @@
import scala.Tuple2;
import scala.Tuple3;

import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
* CLI command to display log file options.
*/
@@ -185,7 +188,7 @@ public String showLogFileRecords(
.collect(Collectors.toList());

// log file paths must not be empty
assert logFilePaths.size() > 0 : "There is no log file";
checkArgument(logFilePaths.size() > 0, "There is no log file");

// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
AvroSchemaConverter converter = new AvroSchemaConverter();
@@ -218,6 +221,7 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withPartition(getRelativePartitionPath(new Path(client.getBasePath()), new Path(logFilePaths.get(0)).getParent()))
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
--- changed file ---
@@ -65,6 +65,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -203,6 +204,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
// get expected result of 10 records.
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
assertTrue(logFilePaths.size() > 0);
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(tablePath)
@@ -221,6 +223,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withPartition(getRelativePartitionPath(new Path(tablePath), new Path(logFilePaths.get(0)).getParent()))
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
--- changed file ---
@@ -230,7 +230,7 @@ public static String getRelativePartitionPath(Path basePath, Path fullPartitionP

/**
* Obtain all the partition paths, that are present in this table, denoted by presence of
* {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}.
* {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE_PREFIX}.
*
* If the basePathStr is a subdirectory of .hoodie folder then we assume that the partitions of an internal
* table (a hoodie table within the .hoodie directory) are to be obtained.
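For context on the withPartition(...) values added throughout this PR, the sketch below illustrates the assumed behavior of FSUtils.getRelativePartitionPath: given the table base path and the parent folder of a log file, it returns the partition path relative to the base path. The class name, table layout, and expected output are illustrative assumptions, not taken from this change.

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;

// Illustrative example only; paths and expected value are assumptions.
public class RelativePartitionPathExample {
  public static void main(String[] args) {
    Path basePath = new Path("/tmp/hoodie_table");
    // Parent directory of a log file sitting in partition 2016/03/15.
    Path logFileFolder = new Path("/tmp/hoodie_table/2016/03/15");
    // Expected to print "2016/03/15" -- the value the builders in this PR pass to withPartition(...).
    System.out.println(FSUtils.getRelativePartitionPath(basePath, logFileFolder));
  }
}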
--- changed file ---
@@ -89,6 +89,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -574,12 +575,13 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
writer.close();
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
// scan all log blocks (across multiple log files)
List<String> logFilePaths = logFiles.stream()
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
assertTrue(logFilePaths.size() > 0);
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(
logFiles.stream()
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
.withLogFilePaths(logFilePaths)
.withReaderSchema(schema)
.withLatestInstantTime("100")
.withMaxMemorySizeInBytes(10240L)
@@ -589,6 +591,7 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logFilePaths.get(0)).getParent()))
.build();
Contributor review comment: Can we inline the fix into the HoodieMergedLogRecordScanner so that there is no need to change these classes?
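One possible way to address this comment, sketched purely as an illustration and not as part of this change: encapsulate the repeated derivation so the scanner builder could infer the partition itself whenever withPartition(...) is not supplied, leaving call sites untouched. The helper class name below is hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import java.util.List;

// Hypothetical helper capturing the pattern repeated across the call sites in this PR.
public final class PartitionPathHelper {
  private PartitionPathHelper() {}

  // Returns the relative partition of the first log file, or null when no log files are given,
  // mirroring the isNullOrEmpty guard used in FormatUtils later in this diff.
  public static String inferPartition(String basePath, List<String> logFilePaths) {
    if (logFilePaths == null || logFilePaths.isEmpty()) {
      return null;
    }
    return FSUtils.getRelativePartitionPath(
        new Path(basePath), new Path(logFilePaths.get(0)).getParent());
  }
}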


List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -803,6 +806,7 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
@@ -881,6 +885,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
Set<String> readKeys = new HashSet<>(200);
@@ -968,6 +973,7 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
Set<String> readKeys = new HashSet<>(200);
@@ -1046,6 +1052,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1092,6 +1099,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
@@ -1187,6 +1195,7 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1290,6 +1299,7 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");

@@ -1358,6 +1368,7 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1409,6 +1420,7 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
final List<String> readKeys = new ArrayList<>(100);
@@ -1479,6 +1491,7 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@@ -1585,6 +1598,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1659,6 +1673,7 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
--- changed file ---
@@ -57,6 +57,8 @@
import java.util.stream.IntStream;

import static junit.framework.TestCase.assertEquals;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -346,7 +348,7 @@ private static HoodieMergedLogRecordScanner getScanner(
List<String> logPaths,
Schema readSchema,
String instant) {
return HoodieMergedLogRecordScanner.newBuilder()
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(logPaths)
@@ -358,8 +360,12 @@ private static HoodieMergedLogRecordScanner getScanner(
.withMaxMemorySizeInBytes(1024 * 1024L)
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
if (!isNullOrEmpty(logPaths)) {
logRecordScannerBuilder
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
}
return logRecordScannerBuilder.build();
}

/**
--- changed file ---
@@ -18,7 +18,6 @@

package org.apache.hudi.table.format;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -43,6 +42,7 @@
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.Arrays;
@@ -52,6 +52,10 @@
import java.util.Map;
import java.util.function.Function;

import static org.apache.hudi.common.fs.FSUtils.getFs;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;

/**
* Utilities for format.
*/
@@ -124,11 +128,13 @@ public static HoodieMergedLogRecordScanner logScanner(
Schema logSchema,
Configuration config,
boolean withOperationField) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
return HoodieMergedLogRecordScanner.newBuilder()
String basePath = split.getTablePath();
List<String> logPaths = split.getLogPaths().get();
FileSystem fs = getFs(basePath, config);
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
.withLogFilePaths(split.getLogPaths().get())
.withBasePath(basePath)
.withLogFilePaths(logPaths)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
@@ -144,16 +150,20 @@ public static HoodieMergedLogRecordScanner logScanner(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withInstantRange(split.getInstantRange())
.withOperationField(withOperationField)
.build();
.withOperationField(withOperationField);
if (!isNullOrEmpty(logPaths)) {
logRecordScannerBuilder
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
}
return logRecordScannerBuilder.build();
}

private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config,
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
FileSystem fs = getFs(split.getTablePath(), config);
return HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
@@ -234,8 +244,8 @@ public static HoodieMergedLogRecordScanner logScanner(
HoodieWriteConfig writeConfig,
Configuration hadoopConf) {
String basePath = writeConfig.getBasePath();
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(FSUtils.getFs(basePath, hadoopConf))
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(getFs(basePath, hadoopConf))
.withBasePath(basePath)
.withLogFilePaths(logPaths)
.withReaderSchema(logSchema)
Expand All @@ -246,8 +256,12 @@ public static HoodieMergedLogRecordScanner logScanner(
.withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
if (!isNullOrEmpty(logPaths)) {
logRecordScannerBuilder
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
}
return logRecordScannerBuilder.build();
}

private static Boolean string2Boolean(String s) {
--- changed file ---
@@ -67,6 +67,8 @@
import java.util.stream.IntStream;

import static junit.framework.TestCase.assertEquals;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -661,7 +663,7 @@ private static HoodieMergedLogRecordScanner getScanner(
List<String> logPaths,
Schema readSchema,
String instant) {
return HoodieMergedLogRecordScanner.newBuilder()
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(logPaths)
@@ -673,8 +675,12 @@ private static HoodieMergedLogRecordScanner getScanner(
.withMaxMemorySizeInBytes(1024 * 1024L)
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
if (!isNullOrEmpty(logPaths)) {
logRecordScannerBuilder
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
}
return logRecordScannerBuilder.build();
}

/**