@@ -217,6 +217,7 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
@@ -228,6 +228,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
@@ -209,6 +209,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
.defaultValue("5")
.withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value.");

public static final ConfigProperty<String> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
.key("hoodie.log.record.reader.use.scanV2")
.defaultValue("false")
.withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");

/** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
@Deprecated
public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
@@ -456,6 +462,11 @@ public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThresh
return this;
}

public Builder withLogRecordReaderScanV2(String useLogRecordReaderScanV2) {
compactionConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, useLogRecordReaderScanV2);
return this;
}

public HoodieCompactionConfig build() {
compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
return compactionConfig;
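For illustration, a minimal sketch of how a writer could opt into the new read path through this builder. Only USE_LOG_RECORD_READER_SCAN_V2 and withLogRecordReaderScanV2 come from this diff; the HoodieWriteConfig wiring around them is standard builder usage, and basePath is a placeholder:

// Hypothetical usage sketch, not part of this change. The flag is stored as a
// String and parsed with Boolean.parseBoolean at the call sites shown above.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // placeholder table path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withLogRecordReaderScanV2("true")
        .build())
    .build();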
@@ -1153,6 +1153,10 @@ public int getLogCompactionBlocksThreshold() {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}

public boolean useScanV2ForLogRecordReader() {
return getBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2);
}

public HoodieCleaningPolicy getCleanerPolicy() {
return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
}
@@ -278,6 +278,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
// by default, the HFile does not keep the metadata fields, set up as false
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
.withLogRecordReaderScanV2(String.valueOf(writeConfig.useScanV2ForLogRecordReader()))
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
@@ -185,6 +185,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.withUseScanV2(config.useScanV2ForLogRecordReader())
.build();

Option<HoodieBaseFile> oldDataFileOpt =
@@ -294,6 +294,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withUseScanV2(config.useScanV2ForLogRecordReader())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
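The hunks above repeat the same read-side wiring: each log-record scanner builder now threads the flag through withUseScanV2. A condensed, hypothetical sketch of that pattern, where fs, logFilePaths, readerSchema, and instantTime are assumed to be in scope:

// Hypothetical sketch of the scanner wiring shown in the hunks above;
// withUseScanV2 is the builder method this PR introduces.
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withFileSystem(fs)
    .withBasePath(config.getBasePath())
    .withLogFilePaths(logFilePaths)
    .withReaderSchema(readerSchema)
    .withLatestInstantTime(instantTime)
    .withUseScanV2(config.useScanV2ForLogRecordReader()) // new flag
    .build();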
@@ -234,6 +234,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "metadata table which are never added before. This config determines how to handle "
+ "such spurious deletes");

public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
.key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
.defaultValue(false)
.sinceVersion("0.10.10")
.withDocumentation("There are cases when extra files are requested to be deleted from metadata table which was never added before. This config"
+ "determines how to handle such spurious deletes");

private HoodieMetadataConfig() {
super();
}
@@ -318,6 +325,10 @@ public boolean ignoreSpuriousDeletes() {
return getBoolean(IGNORE_SPURIOUS_DELETES);
}

public boolean getUseLogRecordReaderScanV2() {
return getBoolean(USE_LOG_RECORD_READER_SCAN_V2);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
@@ -461,6 +472,11 @@ public Builder withProperties(Properties properties) {
return this;
}

public Builder withLogRecordReaderScanV2(boolean useLogRecordReaderScanV2) {
metadataConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, String.valueOf(useLogRecordReaderScanV2));
return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
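Again purely illustrative: the metadata-table variant of the same switch, via the builder method added above. Only withLogRecordReaderScanV2 comes from this diff; enable(...) is the existing metadata-enable toggle:

// Hypothetical sketch. Unlike the String-valued compaction config, this
// variant is a typed Boolean ConfigProperty, so no parsing is needed.
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
    .enable(true)
    .withLogRecordReaderScanV2(true)
    .build();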
@@ -504,6 +504,7 @@ public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<
.withLogBlockTimestamps(validInstantTimestamps)
.allowFullScan(allowFullScan)
.withPartition(partitionName)
.withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
.build();

Long logScannerOpenMs = timer.endTimer();
@@ -59,10 +59,10 @@ private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, Stri
String spillableMapBasePath,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
Option<InstantRange> instantRange, boolean allowFullScan) {
Option<InstantRange> instantRange, boolean allowFullScan, boolean useScanV2) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, true, false, bufferSize,
spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan,
Option.of(partitionName), InternalSchema.getEmptyInternalSchema(), false, false);
Option.of(partitionName), InternalSchema.getEmptyInternalSchema(), false, useScanV2);
}

@Override
@@ -138,8 +138,12 @@ protected String getKeyField() {
* Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {

private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();

// Whether to read with the scanV2 logic; defaults to the legacy scan.
private boolean useScanV2 = false;

@Override
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
@@ -226,11 +230,17 @@ public Builder allowFullScan(boolean enableFullScan) {
return this;
}

@Override
public Builder withUseScanV2(boolean useScanV2) {
this.useScanV2 = useScanV2;
return this;
}

@Override
public HoodieMetadataMergedLogRecordReader build() {
return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan);
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan, useScanV2);
}
}
