Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public String showLogFileRecords(
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the indentation.

for (HoodieRecord hoodieRecord : scanner) {
Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();

Iterator<HoodieRecord> records = scanner.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
Expand Down Expand Up @@ -188,11 +189,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value. "
+ "This is effective only when log compaction is enabled via " + INLINE_LOG_COMPACT.key());

public static final ConfigProperty<String> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
.key("hoodie.log.record.reader.use.scanV2")
public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
.key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
.defaultValue("false")
.sinceVersion("0.13.0")
.withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
.withDocumentation("New optimized scan for log blocks that handles all multi-writer use-cases while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");

/** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
Expand Down Expand Up @@ -432,8 +433,8 @@ public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThresh
return this;
}

public Builder withLogRecordReaderScanV2(String useLogRecordReaderScanV2) {
compactionConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, useLogRecordReaderScanV2);
public Builder withEnableOptimizedLogBlocksScan(String enableOptimizedLogBlocksScan) {
compactionConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, enableOptimizedLogBlocksScan);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,8 +1306,8 @@ public int getLogCompactionBlocksThreshold() {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}

public boolean useScanV2ForLogRecordReader() {
return getBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2);
public boolean enableOptimizedLogBlocksScan() {
return getBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}

public HoodieCleaningPolicy getCleanerPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
// by default, the HFile does not keep the metadata fields, set up as false
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
.withLogRecordReaderScanV2(String.valueOf(writeConfig.useScanV2ForLogRecordReader()))
.withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ protected Iterator<List<WriteStatus>> writeFileAndGetWriteStats(HoodieCompaction
return result;
}

protected boolean useScanV2(HoodieWriteConfig writeConfig) {
return writeConfig.useScanV2ForLogRecordReader();
protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
return writeConfig.enableOptimizedLogBlocksScan();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.withUseScanV2(executionHelper.useScanV2(config))
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
.withRecordMerger(config.getRecordMerger())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected Iterator<List<WriteStatus>> writeFileAndGetWriteStats(HoodieCompaction
}

@Override
protected boolean useScanV2(HoodieWriteConfig writeConfig) {
protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String
.collect(Collectors.toList()))
.withLatestInstantTime(maxInstantTime)
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withUseScanV2(true)
.withOptimizedLogBlocksScan(true)
.withRecordMerger(writeConfig.getRecordMerger())
.build();
scanner.scan(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withUseScanV2(config.useScanV2ForLogRecordReader())
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void testLogCompactionOnMORTable() throws Exception {
public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
.withLogCompactionBlocksThreshold("1")
.withLogRecordReaderScanV2("true")
.withEnableOptimizedLogBlocksScan("true")
.build();
HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
Expand Down Expand Up @@ -447,7 +447,7 @@ private void validateBlockInstantsBeforeAndAfterRollback(HoodieWriteConfig confi
.collect(Collectors.toList()))
.withLatestInstantTime(instant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withUseScanV2(true)
.withOptimizedLogBlocksScan(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner.scan(true);
Expand All @@ -461,7 +461,7 @@ private void validateBlockInstantsBeforeAndAfterRollback(HoodieWriteConfig confi
.collect(Collectors.toList()))
.withLatestInstantTime(currentInstant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withUseScanV2(true)
.withOptimizedLogBlocksScan(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner2.scan(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public final class HoodieMetadataConfig extends HoodieConfig {

public static final String METADATA_PREFIX = "hoodie.metadata";
public static final String OPTIMIZED_LOG_BLOCKS_SCAN = ".optimized.log.blocks.scan.enable";

// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> ENABLE = ConfigProperty
Expand Down Expand Up @@ -237,12 +238,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "metadata table which are never added before. This config determines how to handle "
+ "such spurious deletes");

public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
.key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
public static final ConfigProperty<Boolean> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
.key(METADATA_PREFIX + OPTIMIZED_LOG_BLOCKS_SCAN)
.defaultValue(false)
.sinceVersion("0.13.0")
.withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
.withDocumentation("Optimized log blocks scanner that addresses all the multiwriter use-cases while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written by log compaction.");

private HoodieMetadataConfig() {
super();
Expand Down Expand Up @@ -328,8 +329,8 @@ public boolean ignoreSpuriousDeletes() {
return getBoolean(IGNORE_SPURIOUS_DELETES);
}

public boolean getUseLogRecordReaderScanV2() {
return getBoolean(USE_LOG_RECORD_READER_SCAN_V2);
public boolean doEnableOptimizedLogBlocksScan() {
return getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}

/**
Expand Down Expand Up @@ -478,8 +479,8 @@ public Builder withProperties(Properties properties) {
return this;
}

public Builder withLogRecordReaderScanV2(boolean useLogRecordReaderScanV2) {
metadataConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, String.valueOf(useLogRecordReaderScanV2));
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
metadataConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, String.valueOf(enableOptimizedLogBlocksScan));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Collect all the block instants after scanning all the log files.
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean useScanV2;
private final boolean enableOptimizedLogBlocksScan;

protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
Expand All @@ -158,7 +158,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
Option<String> partitionNameOverride,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean useScanV2,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
Expand All @@ -184,7 +184,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.useScanV2 = useScanV2;
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;

if (keyFieldOverride.isPresent()) {
// NOTE: This branch specifically is leveraged handling Metadata Table
Expand Down Expand Up @@ -217,7 +217,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
*/
protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
synchronized (this) {
if (useScanV2) {
if (enableOptimizedLogBlocksScan) {
scanInternalV2(keySpecOpt, skipProcessingBlocks);
} else {
scanInternalV1(keySpecOpt);
Expand Down Expand Up @@ -894,7 +894,7 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
throw new UnsupportedOperationException();
}

public Builder withUseScanV2(boolean useScanV2) {
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean useScanV2, HoodieRecordMerger recordMerger) {
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, useScanV2, recordMerger);
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
Expand Down Expand Up @@ -333,7 +333,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
// By default, we're doing a full-scan
private boolean forceFullScan = true;
// Use scanV2 method.
private boolean useScanV2 = false;
private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger;

@Override
Expand Down Expand Up @@ -430,8 +430,8 @@ public Builder withPartition(String partitionName) {
}

@Override
public Builder withUseScanV2(boolean useScanV2) {
this.useScanV2 = useScanV2;
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
return this;
}

Expand Down Expand Up @@ -462,7 +462,7 @@ public HoodieMergedLogRecordScanner build() {
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), useScanV2, recordMerger);
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean useScanV2, HoodieRecordMerger recordMerger) {
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange,
false, true, Option.empty(), internalSchema, Option.empty(), useScanV2, recordMerger);
false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger);
this.callback = callback;
}

Expand Down Expand Up @@ -105,7 +105,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private Option<InstantRange> instantRange = Option.empty();
// specific configurations
private LogRecordScannerCallback callback;
private boolean useScanV2;
private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger;

public Builder withFileSystem(FileSystem fs) {
Expand Down Expand Up @@ -167,8 +167,8 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
}

@Override
public Builder withUseScanV2(boolean useScanV2) {
this.useScanV2 = useScanV2;
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
return this;
}

Expand All @@ -184,7 +184,7 @@ public HoodieUnMergedLogRecordScanner build() {

return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange,
internalSchema, useScanV2, recordMerger);
internalSchema, enableOptimizedLogBlocksScan, recordMerger);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<Hoodie
.withLogBlockTimestamps(validInstantTimestamps)
.enableFullScan(allowFullScan)
.withPartition(partitionName)
.withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
.withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
.build();

Long logScannerOpenMs = timer.endTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ public Builder enableFullScan(boolean enableFullScan) {
return this;
}

public Builder withUseScanV2(boolean useScanV2) {
scannerBuilder.withUseScanV2(useScanV2);
public Builder withEnableOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
scannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
return this;
}

Expand Down
Loading