@@ -210,7 +210,7 @@ protected Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String c
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
return header;
}
}
@@ -65,7 +65,7 @@ static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String inst
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackInstantTime);
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
return header;
}
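The two hunks above make the same change at both call sites that write rollback command blocks. Note that the COMMAND_BLOCK_TYPE header stores the enum ordinal rather than the constant's name, so renaming ROLLBACK_PREVIOUS_BLOCK to ROLLBACK_BLOCK leaves existing log files readable as long as the constant keeps its position in HoodieCommandBlockTypeEnum. A minimal stand-alone sketch of that round-trip, using a stand-in enum and a plain string map instead of the real Hudi classes (the reader-side decoding shown is an assumption, not the actual Hudi implementation):

import java.util.HashMap;
import java.util.Map;

public class CommandBlockTypeRoundTrip {

  // Stand-in for HoodieCommandBlockTypeEnum; only the ordinal is persisted,
  // so the constant's name can change without affecting data already on disk.
  enum CommandBlockType {
    ROLLBACK_BLOCK
  }

  public static void main(String[] args) {
    // Writer side, mirroring generateHeader(): persist the ordinal as a string.
    Map<String, String> header = new HashMap<>();
    header.put("COMMAND_BLOCK_TYPE",
        String.valueOf(CommandBlockType.ROLLBACK_BLOCK.ordinal()));

    // Reader side: decode the ordinal back into the enum constant.
    int ordinal = Integer.parseInt(header.get("COMMAND_BLOCK_TYPE"));
    CommandBlockType decoded = CommandBlockType.values()[ordinal];
    System.out.println(decoded); // prints ROLLBACK_BLOCK
  }
}

The flip side of ordinal encoding is that reordering or deleting constants would silently change the meaning of previously written blocks, which is why the renamed constant must keep its original slot.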

@@ -56,6 +56,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
@@ -64,7 +65,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;

@@ -98,11 +101,6 @@ public abstract class AbstractHoodieLogRecordReader {
private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
protected final List<String> logFilePaths;
// Read Lazily flag
private final boolean readBlocksLazily;
// Reverse reader - Not implemented yet (NA -> Why do we need ?)
// but present here for plumbing for future implementation
private final boolean reverseReader;
// Buffer Size for log file reader
private final int bufferSize;
// optional instant range for incremental block filtering
@@ -138,19 +136,18 @@ public abstract class AbstractHoodieLogRecordReader {
private boolean populateMetaFields = true;

protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
Schema readerSchema, String latestInstantTime,
int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, bufferSize,
instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema());
}

protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
Schema readerSchema, String latestInstantTime, int bufferSize,
Option<InstantRange> instantRange, boolean withOperationField,
boolean forceFullScan, Option<String> partitionName,
InternalSchema internalSchema) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -160,8 +157,6 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
this.preCombineField = tableConfig.getPreCombineField();
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
this.readBlocksLazily = readBlocksLazily;
this.fs = fs;
this.bufferSize = bufferSize;
this.instantRange = instantRange;
@@ -216,9 +211,33 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
readerSchema, bufferSize, enableRecordLookups, keyField, internalSchema);

Review comment (Member): Let's also remove the reverseReader, as it is no longer supported.

/**
 * Scanning log blocks requires two traversals over the log blocks.
 * The first traversal identifies the rollback blocks; the second filters out the blocks they invalidate.
 *
 * Scanning blocks is easy in single-writer mode, where a rollback block sits right after the affected data blocks.
 * In multi-writer mode the blocks can be out of order. An example scenario:
 * B1, B2, B3, B4, R1(B3), B5
 * Here, rollback block R1 invalidates B3, which is not the previous block.
 * This becomes more complicated with compacted blocks, which are data blocks created using log compaction.
 * TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580
 *
 * To solve this, the traversal is done twice.
 * The first traversal collects all valid, non-corrupt data and delete blocks, along with the rollback blocks' target instant times.
 * The second traversal walks the collected data blocks, skipping those whose instants were rolled back.
 */

// Collect targetRollbackInstants, using which we can determine which blocks are invalid.
Set<String> targetRollbackInstants = new HashSet<>();
// This will only contain data and delete blocks; corrupt blocks are ignored, and
// target instants from rollback blocks are collected in the targetRollbackInstants set.
List<HoodieLogBlock> dataAndDeleteBlocks = new ArrayList<>();

Set<HoodieLogFile> scannedLogFiles = new HashSet<>();

// Do a forward traversal for all files and blocks.
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
LOG.info("Scanning log file " + logFile);
@@ -232,7 +251,7 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
continue;
Review comment (Contributor): Why continue? Blocks greater than the latest known instant time can be skipped altogether, right?

Reply (Contributor Author): Thanks for catching this. It is a mistake; I am removing it.
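For readers following this thread: the guard above compares the block's INSTANT_TIME header against latestInstantTime as timestamp strings. A stand-in illustration of the skip condition, assuming fixed-width instant times whose lexicographic order matches chronological order (this models, but is not, the real HoodieTimeline.compareTimestamps API):

import java.util.Arrays;
import java.util.List;

public class InstantTimeGuard {
  public static void main(String[] args) {
    String latestInstantTime = "20220605120000"; // hypothetical sample instant

    // Blocks whose instant time is <= latestInstantTime are processed;
    // later ones belong to instants the reader should not yet see.
    List<String> blockInstants =
        Arrays.asList("20220605110000", "20220605115900", "20220605130000");
    for (String instant : blockInstants) {
      boolean withinTimeline = instant.compareTo(latestInstantTime) <= 0;
      System.out.println(instant + (withinTimeline ? " -> process" : " -> skip"));
    }
  }
}

Whether a too-new block should end the scan (break) or merely be skipped (continue) is exactly what the exchange above settles.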

}
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
@@ -245,97 +264,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}

// First traversal: collect data and delete blocks, along with the rollback blocks' target instant times.
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store the current block
currentInstantLogBlocks.push(logBlock);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
dataAndDeleteBlocks.add(logBlock);
break;
case COMMAND_BLOCK:
// Consider the following scenario
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct
// DataBlock (B1) with commitTime C1
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
// Say, commit C1 eventually failed and a rollback is triggered.
// Rollback will write only 1 rollback block (R1) since it assumes one block is
// written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback
// both B1 & B2
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
// Get commit time from last record block, compare with targetCommitTime,
// rollback only if equal, this is required in scenarios of invalid/extra
// rollback blocks written due to failures during the rollback operation itself
// and ensures the same rollback block (R1) is used to rollback both B1 & B2 with
// same instant_time
int numBlocksRolledBack = 0;
totalRollbacks.incrementAndGet();
while (!currentInstantLogBlocks.isEmpty()) {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback last data block or delete block
LOG.info("Rolling back the last log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
// invalid or extra rollback block
LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
+ " invalid or extra rollback command block in " + logFile.getPath());
break;
} else {
// this should not happen ideally
LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
default:
throw new UnsupportedOperationException("Command type not yet supported.");
if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
totalRollbacks.incrementAndGet();
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
// Rollback blocks carry the instant times of failed instants; collect them in a set.
targetRollbackInstants.add(targetInstantForCommandBlock);
} else {
throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
case CORRUPT_BLOCK:
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(logBlock);
break;
default:
throw new UnsupportedOperationException("Block type not supported yet");
throw new UnsupportedOperationException("Block type not yet supported.");
}
}

int numBlocksRolledBack = 0;
// Second traversal: filter out the blocks whose instant times are part of the targetRollbackInstants set.
for (HoodieLogBlock logBlock : dataAndDeleteBlocks) {
String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);

// Exclude the blocks for which rollback blocks exist.
// Here, rollbacks can target instants belonging to delta commits or log compaction commits.
if (targetRollbackInstants.contains(blockInstantTime)) {
numBlocksRolledBack++;
continue;
}
currentInstantLogBlocks.push(logBlock);
}
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);

// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
@@ -361,15 +336,6 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
}
}
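To make the two-traversal scheme used by scanInternal concrete, here is a minimal, self-contained sketch of the same filtering idea on the multi-writer example from the comments above; Block and the sample timeline are hypothetical stand-ins, not the actual Hudi log-block types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TwoPassRollbackFilter {

  // Hypothetical stand-in for a log block: a data/delete block written at
  // instantTime, or a rollback command block when rollbackTarget is non-null.
  static class Block {
    final String name;           // label used only for printing
    final String instantTime;    // instant this block was written at
    final String rollbackTarget; // target instant; non-null only for rollbacks

    Block(String name, String instantTime, String rollbackTarget) {
      this.name = name;
      this.instantTime = instantTime;
      this.rollbackTarget = rollbackTarget;
    }

    boolean isRollback() {
      return rollbackTarget != null;
    }
  }

  public static void main(String[] args) {
    // The multi-writer ordering from the comment: B1, B2, B3, B4, R1(B3), B5.
    // R1 arrives after B4, so it does not invalidate "the previous block".
    List<Block> log = Arrays.asList(
        new Block("B1", "t1", null),
        new Block("B2", "t2", null),
        new Block("B3", "t3", null),
        new Block("B4", "t4", null),
        new Block("R1", "t5", "t3"),
        new Block("B5", "t6", null));

    // First traversal: set data and delete blocks aside, collect rollback targets.
    Set<String> targetRollbackInstants = new HashSet<>();
    List<Block> dataAndDeleteBlocks = new ArrayList<>();
    for (Block block : log) {
      if (block.isRollback()) {
        targetRollbackInstants.add(block.rollbackTarget);
      } else {
        dataAndDeleteBlocks.add(block);
      }
    }

    // Second traversal: drop every block whose instant time was rolled back.
    for (Block block : dataAndDeleteBlocks) {
      if (!targetRollbackInstants.contains(block.instantTime)) {
        System.out.println(block.name + " is valid");
      }
    }
    // Prints B1, B2, B4, B5; B3 is filtered out by R1's target instant.
  }
}

Because membership in targetRollbackInstants is checked by instant time rather than by stack position, the same filter also drops duplicate blocks written for one instant by retried tasks — the B1/B2 retry scenario that the old ROLLBACK_PREVIOUS_BLOCK handling had to special-case.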

/**
* Checks if the current logblock belongs to a later instant.
*/
private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
&& !logBlock.getLogBlockHeader().get(INSTANT_TIME)
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
}

/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.