Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,25 @@ public String showLogFileRecords(
if (shouldMerge) {
System.out.println("===========================> MERGING RECORDS <===================");
HoodieMergedLogRecordScanner scanner =
new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(client.getBasePath())
.withLogFilePaths(logFilePaths)
.withReaderSchema(readerSchema)
.withLatestInstantTime(
client.getActiveTimeline()
.getCommitTimeline().lastInstant().get().getTimestamp())
.withReadBlocksLazily(
Boolean.parseBoolean(
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
.withReverseReader(
Boolean.parseBoolean(
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
.withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() < limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,23 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
// get expected result of 10 records.
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
HoodieMergedLogRecordScanner scanner =
new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, INSTANT_TIME,
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(tablePath)
.withLogFilePaths(logFilePaths)
.withReaderSchema(schema)
.withLatestInstantTime(INSTANT_TIME)
.withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withReadBlocksLazily(
Boolean.parseBoolean(
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
.withReverseReader(
Boolean.parseBoolean(
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
int num = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,18 @@ private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteT
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
readerSchema, maxInstantTime, maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
config.getSpillableMapBasePath());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(metaClient.getBasePath())
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(maxInstantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public abstract class AbstractHoodieLogRecordScanner {
// Progress
private float progress = 0.0f;

// TODO (NA) - Change this to a builder, this constructor is too long
public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
this.readerSchema = readerSchema;
Expand Down Expand Up @@ -358,4 +357,28 @@ public long getTotalRollbacks() {
/**
 * Returns the value currently held by the {@code totalCorruptBlocks} counter.
 *
 * @return the counter value at the time of the call
 */
public long getTotalCorruptBlocks() {
  final long corruptBlockCount = totalCorruptBlocks.get();
  return corruptBlockCount;
}

/**
 * Builder used to build {@code AbstractHoodieLogRecordScanner}.
 *
 * <p>Declares the configuration setters shared by scanner builders. Every setter returns a
 * {@code Builder} so that calls can be chained, and {@link #build()} produces the scanner.
 * Concrete subclasses supply the implementations and may add setters of their own.
 */
public abstract static class Builder {

/**
 * Supplies the file system to hand to the scanner being built.
 *
 * @param fs file system instance
 * @return a builder, for call chaining
 */
public abstract Builder withFileSystem(FileSystem fs);

/**
 * Supplies the base path to hand to the scanner being built.
 *
 * @param basePath base path string
 * @return a builder, for call chaining
 */
public abstract Builder withBasePath(String basePath);

/**
 * Supplies the list of log file paths to hand to the scanner being built.
 *
 * @param logFilePaths log file paths
 * @return a builder, for call chaining
 */
public abstract Builder withLogFilePaths(List<String> logFilePaths);

/**
 * Supplies the reader schema to hand to the scanner being built.
 *
 * @param schema reader schema
 * @return a builder, for call chaining
 */
public abstract Builder withReaderSchema(Schema schema);

/**
 * Supplies the latest instant time to hand to the scanner being built.
 *
 * @param latestInstantTime latest instant time, as a string
 * @return a builder, for call chaining
 */
public abstract Builder withLatestInstantTime(String latestInstantTime);

/**
 * Supplies the "read blocks lazily" flag to hand to the scanner being built.
 *
 * @param readBlocksLazily flag value
 * @return a builder, for call chaining
 */
public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);

/**
 * Supplies the "reverse reader" flag to hand to the scanner being built.
 *
 * @param reverseReader flag value
 * @return a builder, for call chaining
 */
public abstract Builder withReverseReader(boolean reverseReader);

/**
 * Supplies the buffer size to hand to the scanner being built.
 *
 * @param bufferSize buffer size
 * @return a builder, for call chaining
 */
public abstract Builder withBufferSize(int bufferSize);

/**
 * Creates the scanner from the values supplied so far.
 *
 * @return the constructed scanner
 */
public abstract AbstractHoodieLogRecordScanner build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public long getNumMergedRecordsInLog() {
return numMergedRecordsInLog;
}

/**
 * Creates a fresh, unconfigured builder for assembling a {@code HoodieMergedLogRecordScanner}.
 *
 * @return a new {@code HoodieMergedLogRecordScanner.Builder}; each call yields a distinct instance
 */
public static HoodieMergedLogRecordScanner.Builder newBuilder() {
  return new HoodieMergedLogRecordScanner.Builder();
}

@Override
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
String key = hoodieRecord.getRecordKey();
Expand All @@ -128,5 +135,79 @@ protected void processNextDeletedKey(HoodieKey hoodieKey) {
/**
 * Returns the stored {@code totalTimeTakenToReadAndMergeBlocks} value.
 *
 * @return the current value of the field
 */
public long getTotalTimeTakenToReadAndMergeBlocks() {
  return this.totalTimeTakenToReadAndMergeBlocks;
}

/**
 * Builder used to build {@code HoodieMergedLogRecordScanner}.
 *
 * <p>Each {@code with*} setter stores its argument and returns this builder so calls can be
 * chained; {@link #build()} forwards the stored values to the scanner constructor. A value that
 * is never set keeps its Java default ({@code null}, {@code false} or {@code 0}).
 */
public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
  private FileSystem fs;
  private String basePath;
  private List<String> logFilePaths;
  private Schema readerSchema;
  private String latestInstantTime;
  private boolean readBlocksLazily;
  private boolean reverseReader;
  private int bufferSize;
  // specific configurations
  private Long maxMemorySizeInBytes;
  private String spillableMapBasePath;

  /**
   * Stores the file system passed to the scanner constructor.
   *
   * @param fs file system instance
   * @return this builder
   */
  @Override
  public Builder withFileSystem(FileSystem fs) {
    this.fs = fs;
    return this;
  }

  /**
   * Stores the base path passed to the scanner constructor.
   *
   * @param basePath base path string
   * @return this builder
   */
  @Override
  public Builder withBasePath(String basePath) {
    this.basePath = basePath;
    return this;
  }

  /**
   * Stores the log file paths passed to the scanner constructor. The list is kept by reference,
   * not copied.
   *
   * @param logFilePaths log file paths
   * @return this builder
   */
  @Override
  public Builder withLogFilePaths(List<String> logFilePaths) {
    this.logFilePaths = logFilePaths;
    return this;
  }

  /**
   * Stores the reader schema passed to the scanner constructor.
   *
   * @param schema reader schema
   * @return this builder
   */
  @Override
  public Builder withReaderSchema(Schema schema) {
    this.readerSchema = schema;
    return this;
  }

  /**
   * Stores the latest instant time passed to the scanner constructor.
   *
   * @param latestInstantTime latest instant time, as a string
   * @return this builder
   */
  @Override
  public Builder withLatestInstantTime(String latestInstantTime) {
    this.latestInstantTime = latestInstantTime;
    return this;
  }

  /**
   * Stores the "read blocks lazily" flag passed to the scanner constructor.
   *
   * @param readBlocksLazily flag value
   * @return this builder
   */
  @Override
  public Builder withReadBlocksLazily(boolean readBlocksLazily) {
    this.readBlocksLazily = readBlocksLazily;
    return this;
  }

  /**
   * Stores the "reverse reader" flag passed to the scanner constructor.
   *
   * @param reverseReader flag value
   * @return this builder
   */
  @Override
  public Builder withReverseReader(boolean reverseReader) {
    this.reverseReader = reverseReader;
    return this;
  }

  /**
   * Stores the buffer size passed to the scanner constructor.
   *
   * @param bufferSize buffer size
   * @return this builder
   */
  @Override
  public Builder withBufferSize(int bufferSize) {
    this.bufferSize = bufferSize;
    return this;
  }

  /**
   * Stores the maximum memory size, in bytes, passed to the scanner constructor. This setter is
   * specific to the merged scanner and is not part of the abstract builder contract.
   *
   * @param maxMemorySizeInBytes maximum memory size in bytes
   * @return this builder
   */
  public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
    this.maxMemorySizeInBytes = maxMemorySizeInBytes;
    return this;
  }

  /**
   * Stores the spillable map base path passed to the scanner constructor. This setter is
   * specific to the merged scanner and is not part of the abstract builder contract.
   *
   * @param spillableMapBasePath base path for the spillable map
   * @return this builder
   */
  public Builder withSpillableMapBasePath(String spillableMapBasePath) {
    this.spillableMapBasePath = spillableMapBasePath;
    return this;
  }

  /**
   * Creates a {@code HoodieMergedLogRecordScanner} from the values stored in this builder.
   *
   * @return the constructed scanner
   */
  @Override
  public HoodieMergedLogRecordScanner build() {
    // Constructor order differs from setter order: the memory limit precedes the boolean flags.
    return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
        latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
        bufferSize, spillableMapBasePath);
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<Strin
this.callback = callback;
}

/**
 * Creates a fresh, unconfigured builder for assembling a {@code HoodieUnMergedLogRecordScanner}.
 *
 * @return a new {@code HoodieUnMergedLogRecordScanner.Builder}; each call yields a distinct
 *     instance
 */
public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
  return new HoodieUnMergedLogRecordScanner.Builder();
}

@Override
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws Exception {
// Just call callback without merging
Expand All @@ -60,4 +67,71 @@ public static interface LogRecordScannerCallback {

public void apply(HoodieRecord<? extends HoodieRecordPayload> record) throws Exception;
}

/**
 * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
 *
 * <p>Each {@code with*} setter stores its argument and returns this builder so calls can be
 * chained; {@link #build()} forwards the stored values to the scanner constructor. A value that
 * is never set keeps its Java default ({@code null}, {@code false} or {@code 0}).
 */
public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
  private FileSystem fs;
  private String basePath;
  private List<String> logFilePaths;
  private Schema readerSchema;
  private String latestInstantTime;
  private boolean readBlocksLazily;
  private boolean reverseReader;
  private int bufferSize;
  // specific configurations
  private LogRecordScannerCallback callback;

  /**
   * Stores the file system passed to the scanner constructor.
   *
   * @param fs file system instance
   * @return this builder
   */
  @Override
  public Builder withFileSystem(FileSystem fs) {
    this.fs = fs;
    return this;
  }

  /**
   * Stores the base path passed to the scanner constructor.
   *
   * @param basePath base path string
   * @return this builder
   */
  @Override
  public Builder withBasePath(String basePath) {
    this.basePath = basePath;
    return this;
  }

  /**
   * Stores the log file paths passed to the scanner constructor. The list is kept by reference,
   * not copied.
   *
   * @param logFilePaths log file paths
   * @return this builder
   */
  @Override
  public Builder withLogFilePaths(List<String> logFilePaths) {
    this.logFilePaths = logFilePaths;
    return this;
  }

  /**
   * Stores the reader schema passed to the scanner constructor.
   *
   * @param schema reader schema
   * @return this builder
   */
  @Override
  public Builder withReaderSchema(Schema schema) {
    this.readerSchema = schema;
    return this;
  }

  /**
   * Stores the latest instant time passed to the scanner constructor.
   *
   * @param latestInstantTime latest instant time, as a string
   * @return this builder
   */
  @Override
  public Builder withLatestInstantTime(String latestInstantTime) {
    this.latestInstantTime = latestInstantTime;
    return this;
  }

  /**
   * Stores the "read blocks lazily" flag passed to the scanner constructor.
   *
   * @param readBlocksLazily flag value
   * @return this builder
   */
  @Override
  public Builder withReadBlocksLazily(boolean readBlocksLazily) {
    this.readBlocksLazily = readBlocksLazily;
    return this;
  }

  /**
   * Stores the "reverse reader" flag passed to the scanner constructor.
   *
   * @param reverseReader flag value
   * @return this builder
   */
  @Override
  public Builder withReverseReader(boolean reverseReader) {
    this.reverseReader = reverseReader;
    return this;
  }

  /**
   * Stores the buffer size passed to the scanner constructor.
   *
   * @param bufferSize buffer size
   * @return this builder
   */
  @Override
  public Builder withBufferSize(int bufferSize) {
    this.bufferSize = bufferSize;
    return this;
  }

  /**
   * Stores the callback passed to the scanner constructor. This setter is specific to the
   * unmerged scanner and is not part of the abstract builder contract.
   *
   * @param callback record callback
   * @return this builder
   */
  public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
    this.callback = callback;
    return this;
  }

  /**
   * Creates a {@code HoodieUnMergedLogRecordScanner} from the values stored in this builder.
   *
   * @return the constructed scanner
   */
  @Override
  public HoodieUnMergedLogRecordScanner build() {
    return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
        latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback);
  }
}
}
Loading