From 58319027a8475b52f374a4934ca68e8026f6a838 Mon Sep 17 00:00:00 2001 From: Surya Prasanna Kumar Yalla Date: Fri, 15 Apr 2022 14:07:56 -0700 Subject: [PATCH 1/2] [HUDI-3919] [UBER] Support out of order rollback blocks in AbstractHoodieLogRecordReader --- .../action/rollback/BaseRollbackHelper.java | 2 +- .../table/action/rollback/RollbackUtils.java | 2 +- .../log/AbstractHoodieLogRecordReader.java | 158 ++++++++---------- .../table/log/block/HoodieCommandBlock.java | 2 +- .../functional/TestHoodieLogFormat.java | 58 +++++-- .../TestHoodieLogFormatAppendFailure.java | 2 +- .../hadoop/testutils/InputFormatTestUtil.java | 4 +- 7 files changed, 119 insertions(+), 109 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 8475afe16eea0..8b96ca96f7431 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -210,7 +210,7 @@ protected Map generateHeader(String c header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); return header; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index ce7a18515137b..c37244f7fc2e3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -65,7 +65,7 @@ static Map generateHeader(String inst header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackInstantTime); header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); return header; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 9e56083b262e0..3a6c88ace4c5d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashSet; @@ -64,7 +65,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK; import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; +import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; @@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option keySpecOpt) { logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema); + /** + * Traversal of log blocks from log files can be done in two directions. + * 1. Forward traversal + * 2. Reverse traversal + * For example: BaseFile, LogFile1(LogBlock11,LogBlock12,LogBlock13), LogFile2(LogBlock21,LogBlock22,LogBlock23) + * Forward traversal looks like: + * LogBlock11, LogBlock12, LogBlock13, LogBlock21, LogBlock22, LogBlock23 + * If we consider reverse traversal including log blocks: + * LogBlock23, LogBlock22, LogBlock21, LogBlock13, LogBlock12, LogBlock11 + * Here, reverse traversal also traverses blocks in reverse order of creation. + * + * 1. Forward traversal + * Forward traversal is easy to do in single writer mode, where the rollback block is right after the affected data blocks. + * With multiwriter mode the blocks can be out of order. An example scenario: + * B1, B2, B3, B4, R1(B3), B5 + * In this case, rollback block R1 invalidates B3, which is not the previous block. + * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction. + * TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580 + * + * To solve this, do the traversal twice. + * In the first traversal, collect all the valid data and delete blocks that are not corrupted, along with the rollback blocks' target instant times. + * In the second traversal, go over the collected data blocks and filter out those invalidated by the rollback instants. + * 2. Reverse traversal + * Reverse traversal is more intuitive in multiwriter mode. Reverse traversal would mean not just traversing + * log files in reverse order, but also the log blocks within them. + * This is harder to achieve when there are corrupt blocks, since the block size information + * might not be stored at the end of the corrupt block. So, hopping to the start of the block is not possible. + * So, we default to forward traversal with lazy reading enabled. + */ + + // Collect targetRollbackInstants, using which we can determine which blocks are invalid. + Set targetRollbackInstants = new HashSet<>(); + // This will only contain data and delete blocks; corrupt blocks are ignored, and + // target instants from rollback blocks are collected in the targetRollbackInstants set. + List dataAndDeleteBlocks = new ArrayList<>(); + Set scannedLogFiles = new HashSet<>(); + + // Do a forward traversal for all files and blocks.
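    // Editor's note -- an illustrative walk-through, not part of the original patch: suppose the
    // log contains data blocks B1@t1, B2@t2, B3@t3, B4@t4, then a rollback block R1 targeting t3,
    // then B5@t5. The forward pass below is expected to end with
    //   dataAndDeleteBlocks    = [B1, B2, B3, B4, B5]
    //   targetRollbackInstants = {t3}
    // and the second pass then pushes only B1, B2, B4 and B5 onto currentInstantLogBlocks.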
while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); LOG.info("Scanning log file " + logFile); @@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option keySpecOpt) { continue; } } + if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) { + LOG.info("Found a corrupt block in " + logFile.getPath()); + totalCorruptBlocks.incrementAndGet(); + continue; + } + + // Rollback blocks contain information of instants that are failed, collect them in a set.. switch (logBlock.getBlockType()) { case HFILE_DATA_BLOCK: case AVRO_DATA_BLOCK: case PARQUET_DATA_BLOCK: - LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " - + logBlock.getLogBlockHeader().get(INSTANT_TIME)); - if (isNewInstantBlock(logBlock) && !readBlocksLazily) { - // If this is an avro data block belonging to a different commit/instant, - // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); - } - // store the current block - currentInstantLogBlocks.push(logBlock); - break; case DELETE_BLOCK: - LOG.info("Reading a delete block from file " + logFile.getPath()); - if (isNewInstantBlock(logBlock) && !readBlocksLazily) { - // If this is a delete data block belonging to a different commit/instant, - // then merge the last blocks and records into the main result - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); - } - // store deletes so can be rolled back - currentInstantLogBlocks.push(logBlock); + dataAndDeleteBlocks.add(logBlock); break; case COMMAND_BLOCK: - // Consider the following scenario - // (Time 0, C1, Task T1) -> Running - // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct - // DataBlock (B1) with commitTime C1 - // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2) - // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2) - // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same. - // Say, commit C1 eventually failed and a rollback is triggered. 
- // Rollback will write only 1 rollback block (R1) since it assumes one block is - // written per ingestion batch for a file but in reality we need to rollback (B1 & B2) - // The following code ensures the same rollback block (R1) is used to rollback - // both B1 & B2 LOG.info("Reading a command block from file " + logFile.getPath()); // This is a command block - take appropriate action based on the command HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; - String targetInstantForCommandBlock = - logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); - switch (commandBlock.getType()) { // there can be different types of command blocks - case ROLLBACK_PREVIOUS_BLOCK: - // Rollback the last read log block - // Get commit time from last record block, compare with targetCommitTime, - // rollback only if equal, this is required in scenarios of invalid/extra - // rollback blocks written due to failures during the rollback operation itself - // and ensures the same rollback block (R1) is used to rollback both B1 & B2 with - // same instant_time - int numBlocksRolledBack = 0; - totalRollbacks.incrementAndGet(); - while (!currentInstantLogBlocks.isEmpty()) { - HoodieLogBlock lastBlock = currentInstantLogBlocks.peek(); - // handle corrupt blocks separately since they may not have metadata - if (lastBlock.getBlockType() == CORRUPT_BLOCK) { - LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath()); - currentInstantLogBlocks.pop(); - numBlocksRolledBack++; - } else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) { - // rollback last data block or delete block - LOG.info("Rolling back the last log block read in " + logFile.getPath()); - currentInstantLogBlocks.pop(); - numBlocksRolledBack++; - } else if (!targetInstantForCommandBlock - .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) { - // invalid or extra rollback block - LOG.warn("TargetInstantTime " + targetInstantForCommandBlock - + " invalid or extra rollback command block in " + logFile.getPath()); - break; - } else { - // this should not happen ideally - LOG.warn("Unable to apply rollback command block in " + logFile.getPath()); - } - } - LOG.info("Number of applied rollback blocks " + numBlocksRolledBack); - break; - default: - throw new UnsupportedOperationException("Command type not yet supported."); + if (commandBlock.getType().equals(ROLLBACK_BLOCK)) { + totalRollbacks.incrementAndGet(); + String targetInstantForCommandBlock = + logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME); + targetRollbackInstants.add(targetInstantForCommandBlock); + } else { + throw new UnsupportedOperationException("Command type not yet supported."); } break; - case CORRUPT_BLOCK: - LOG.info("Found a corrupt block in " + logFile.getPath()); - totalCorruptBlocks.incrementAndGet(); - // If there is a corrupt block - we will assume that this was the next data block - currentInstantLogBlocks.push(logBlock); - break; default: - throw new UnsupportedOperationException("Block type not supported yet"); + throw new UnsupportedOperationException("Block type not yet supported."); } } + + int numBlocksRolledBack = 0; + // This is a reverse traversal on the collected data blocks. + for (HoodieLogBlock logBlock : dataAndDeleteBlocks) { + String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); + + // Exclude the blocks for rollback blocks exist. 
+ // Here, rollback can include instants affiliated to deltacommits or log compaction commits. + if (targetRollbackInstants.contains(blockInstantTime)) { + numBlocksRolledBack++; + continue; + } + currentInstantLogBlocks.push(logBlock); + } + LOG.info("Number of applied rollback blocks " + numBlocksRolledBack); + // merge the last read block when all the blocks are done reading if (!currentInstantLogBlocks.isEmpty()) { LOG.info("Merging the final data blocks"); @@ -361,15 +358,6 @@ protected synchronized void scanInternal(Option keySpecOpt) { } } - /** - * Checks if the current logblock belongs to a later instant. - */ - private boolean isNewInstantBlock(HoodieLogBlock logBlock) { - return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK - && !logBlock.getLogBlockHeader().get(INSTANT_TIME) - .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME)); - } - /** * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to * handle it. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java index 0ff3a77b5007b..c44f1950144b5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java @@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock { * Hoodie command block type enum. */ public enum HoodieCommandBlockTypeEnum { - ROLLBACK_PREVIOUS_BLOCK + ROLLBACK_BLOCK } public HoodieCommandBlock(Map header) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 4fa53bb41f9f8..2dea1c9431316 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -839,6 +839,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di writer.appendBlock(dataBlock); // Write 2 + header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); @@ -846,13 +847,16 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di writer.appendBlock(dataBlock); // Rollback the last write + header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101"); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); // Write 3 + header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List copyOfRecords3 = records3.stream() @@ -1010,17 +1014,17 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di dataBlock = 
getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header); writer.appendBlock(dataBlock); - copyOfRecords1.addAll(copyOfRecords2); - List originalKeys = - copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) - .collect(Collectors.toList()); - - // Delete 50 keys + // Delete 50 keys from 1st block List deletedRecords = copyOfRecords1.stream() .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); + copyOfRecords2.addAll(copyOfRecords1); + List originalKeys = + copyOfRecords2.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect(Collectors.toList()); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header); writer.appendBlock(deleteBlock); @@ -1068,15 +1072,15 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di Collections.sort(readKeys); assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 150 records from 2 versions"); - // Rollback the last block + // Rollback the 1st block i.e. a data block. header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103"); - header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102"); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); - FileCreateUtils.deleteDeltaCommit(basePath, "102", fs); + FileCreateUtils.deleteDeltaCommit(basePath, "101", fs); readKeys.clear(); scanner = HoodieMergedLogRecordScanner.newBuilder() @@ -1084,7 +1088,7 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di .withBasePath(basePath) .withLogFilePaths(allLogFiles) .withReaderSchema(schema) - .withLatestInstantTime("101") + .withLatestInstantTime("103") .withMaxMemorySizeInBytes(10240L) .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) @@ -1094,7 +1098,25 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); - assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); + assertEquals(100, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); + final List newEmptyPayloads = new ArrayList<>(); + scanner.forEach(s -> { + try { + if (!s.getData().getInsertValue(schema).isPresent()) { + newEmptyPayloads.add(true); + } + } catch (IOException io) { + throw new UncheckedIOException(io); + } + }); + assertEquals(100, readKeys.size(), "Stream collect should return 100 records, since 2nd block is rolled back"); + assertEquals(50, newEmptyPayloads.size(), "Stream collect should return all 50 records with empty payloads"); + List firstBlockRecords = + copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + 
.collect(Collectors.toList()); + Collections.sort(firstBlockRecords); + Collections.sort(readKeys); + assertEquals(firstBlockRecords, readKeys, "CompositeAvroLogReader should return 150 records from 2 versions"); } @ParameterizedTest @@ -1260,7 +1282,7 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk // Attempt 1 : Write rollback block for a failed write header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); try { writer.appendBlock(commandBlock); @@ -1336,7 +1358,7 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable // Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); writer.appendBlock(commandBlock); @@ -1388,7 +1410,7 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk // Write invalid rollback for a failed write (possible for in-flight commits) header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); @@ -1458,7 +1480,7 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); @@ -1563,7 +1585,7 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); writer.appendBlock(commandBlock); writer.close(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java index 6c4d69a05b296..d825c294cc6b2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java @@ -143,7 +143,7 @@ public void testFailedToGetAppendStreamFromHDFSNameNode() .overBaseCommit("").withFs(fs).build(); header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); writer.appendBlock(new HoodieCommandBlock(header)); // The log version should be different for this new writer diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index ccd85d382930a..ebfd267cd5a6c 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -344,7 +344,7 @@ public static HoodieLogFormat.Writer writeRollback(File partitionDir, FileSystem header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); // if update belongs to an existing log file writer.appendBlock(new HoodieCommandBlock(header)); return writer; @@ -398,7 +398,7 @@ public static HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionD header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit); header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header); writer.appendBlock(rollbackBlock); return writer; From 4e7a31686083748bb0f2a36f60dbfb963fab9708 Mon Sep 17 00:00:00 2001 From: Surya Prasanna Kumar Yalla Date: Wed, 11 May 2022 13:09:59 -0700 Subject: [PATCH 2/2] Remove reverseReader and readBlocksLazily configs --- .../log/AbstractHoodieLogRecordReader.java | 60 ++--- .../common/table/log/HoodieLogFileReader.java | 86 ++----- .../table/log/HoodieLogFormatReader.java | 22 +- .../log/HoodieMergedLogRecordScanner.java | 10 +- .../log/HoodieUnMergedLogRecordScanner.java | 8 +- .../table/log/block/HoodieAvroDataBlock.java | 6 +- .../table/log/block/HoodieCommandBlock.java | 6 +- .../table/log/block/HoodieCorruptBlock.java | 6 +- .../table/log/block/HoodieDataBlock.java | 10 +- .../table/log/block/HoodieDeleteBlock.java | 10 +- .../table/log/block/HoodieHFileDataBlock.java | 3 +- .../table/log/block/HoodieLogBlock.java | 22 +- .../log/block/HoodieParquetDataBlock.java | 3 +- .../HoodieMetadataMergedLogRecordReader.java | 2 +- .../functional/TestHoodieLogFormat.java | 209 +----------------- 15 files changed, 79 insertions(+), 384 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 
3a6c88ace4c5d..84a9f1c56f38d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -101,11 +101,6 @@ public abstract class AbstractHoodieLogRecordReader { private Option> simpleKeyGenFields = Option.empty(); // Log File Paths protected final List logFilePaths; - // Read Lazily flag - private final boolean readBlocksLazily; - // Reverse reader - Not implemented yet (NA -> Why do we need ?) - // but present here for plumbing for future implementation - private final boolean reverseReader; // Buffer Size for log file reader private final int bufferSize; // optional instant range for incremental block filtering @@ -141,19 +136,18 @@ public abstract class AbstractHoodieLogRecordReader { private boolean populateMetaFields = true; protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, - Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, + Schema readerSchema, String latestInstantTime, int bufferSize, Option instantRange, boolean withOperationField) { - this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, + this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, bufferSize, instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema()); } protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, - Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, - boolean reverseReader, int bufferSize, Option instantRange, - boolean withOperationField, boolean forceFullScan, - Option partitionName, InternalSchema internalSchema) { + Schema readerSchema, String latestInstantTime, int bufferSize, + Option instantRange, boolean withOperationField, + boolean forceFullScan, Option partitionName, + InternalSchema internalSchema) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); @@ -163,8 +157,6 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keySpecOpt) { boolean enableRecordLookups = !forceFullScan; logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), - readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema); + readerSchema, bufferSize, enableRecordLookups, keyField, internalSchema); /** - * Traversal of log blocks from log files can be done in two directions. - * 1. Forward traversal - * 2. Reverse traversal - * For example: BaseFile, LogFile1(LogBlock11,LogBlock12,LogBlock13), LogFile2(LogBlock21,LogBlock22,LogBlock23) - * Forward traversal looks like: - * LogBlock11, LogBlock12, LogBlock13, LogBlock21, LogBlock22, LogBlock23 - * If we consider reverse traversal including log blocks: - * LogBlock23, LogBlock22, LogBlock21, LogBlock13, LogBlock12, LogBlock11 - * Here, reverse traversal also traverses blocks in reverse order of creation. + * Scanning log blocks requires two traversals over the log blocks. + * First traversal to identify the rollback blocks and the valid data and delete blocks; second traversal to filter out the blocks invalidated by those rollbacks. * - * 1. Forward traversal - * Forward traversal is easy to do in single writer mode, where the rollback block is right after the affected data blocks.
+ * Scanning blocks is easy to do in single writer mode, where the rollback block is right after the affected data blocks. * With multiwriter mode the blocks can be out of order. An example scenario: * B1, B2, B3, B4, R1(B3), B5 * In this case, rollback block R1 invalidates B3, which is not the previous block. * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction. * TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580 * - * To solve this, do the traversal twice. + * To solve this, we need to do the traversal twice. * In the first traversal, collect all the valid data and delete blocks that are not corrupted, along with the rollback blocks' target instant times. * In the second traversal, go over the collected data blocks and filter out those invalidated by the rollback instants. - * 2. Reverse traversal - * Reverse traversal is more intuitive in multiwriter mode. Reverse traversal would mean not just traversing - * log files in reverse order, but also the log blocks within them. - * This is harder to achieve when there are corrupt blocks, since the block size information - * might not be stored at the end of the corrupt block. So, hopping to the start of the block is not possible. - * So, we default to forward traversal with lazy reading enabled. */ // Collect targetRollbackInstants, using which we can determine which blocks are invalid. @@ -273,7 +251,7 @@ protected synchronized void scanInternal(Option keySpecOpt) { && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime )) { // hit a block with instant time greater than should be processed, stop processing further - break; + continue; } if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) { if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) @@ -286,13 +264,8 @@ protected synchronized void scanInternal(Option keySpecOpt) { continue; } } - if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) { - LOG.info("Found a corrupt block in " + logFile.getPath()); - totalCorruptBlocks.incrementAndGet(); - continue; - } - // Rollback blocks contain information of instants that are failed, collect them in a set.. + // First traversal: collect data and delete blocks, and the rollback blocks' target instant times. switch (logBlock.getBlockType()) { case HFILE_DATA_BLOCK: case AVRO_DATA_BLOCK: case PARQUET_DATA_BLOCK: @@ -308,18 +281,23 @@ protected synchronized void scanInternal(Option keySpecOpt) { totalRollbacks.incrementAndGet(); String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME); + // Rollback blocks carry the instant times of failed writes; collect them in a set. targetRollbackInstants.add(targetInstantForCommandBlock); } else { throw new UnsupportedOperationException("Command type not yet supported."); } break; + case CORRUPT_BLOCK: + LOG.info("Found a corrupt block in " + logFile.getPath()); + totalCorruptBlocks.incrementAndGet(); + break; default: throw new UnsupportedOperationException("Block type not yet supported."); } } int numBlocksRolledBack = 0; - // This is a reverse traversal on the collected data blocks. + // Second traversal: filter out the blocks whose instant times are in the targetRollbackInstants set.
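    // Editor's sketch (illustrative, not part of the patch): the target instants collected above
    // come from rollback command blocks, which carry only headers, e.g. as written by
    // RollbackUtils#generateHeader:
    //   header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackInstantTime);
    //   header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback);
    //   header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
    //       String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
    //   writer.appendBlock(new HoodieCommandBlock(header));
    // Any data or delete block whose INSTANT_TIME matches a collected TARGET_INSTANT_TIME is
    // skipped by the loop below.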
for (HoodieLogBlock logBlock : dataAndDeleteBlocks) { String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index af9bcd27f933c..87aa87b347f32 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -78,10 +78,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private final Schema readerSchema; private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema(); private final String keyField; - private boolean readBlockLazily; - private long reverseLogFilePosition; - private long lastReverseLogFilePosition; - private boolean reverseReader; private boolean enableRecordLookups; private boolean closed = false; private transient Thread shutdownThread = null; @@ -93,19 +89,16 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, - HoodieRecord.RECORD_KEY_METADATA_FIELD); + this(fs, logFile, readerSchema, bufferSize, false, HoodieRecord.RECORD_KEY_METADATA_FIELD); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema()); + boolean enableRecordLookups, String keyField) throws IOException { + this(fs, logFile, readerSchema, bufferSize, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema()); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField, InternalSchema internalSchema) throws IOException { + boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException { this.hadoopConf = fs.getConf(); // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path // is prefixed with an appropriate scheme given that we're not propagating the FS @@ -113,15 +106,9 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize()); this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize); this.readerSchema = readerSchema; - this.readBlockLazily = readBlockLazily; - this.reverseReader = reverseReader; this.enableRecordLookups = enableRecordLookups; this.keyField = keyField; this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; - if (this.reverseReader) { - this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize(); - } - addShutDownHook(); } @@ -185,8 +172,8 @@ private HoodieLogBlock readBlock() throws IOException { // 6. 
Read the content or skip content based on IO vs Memory trade-off by client long contentPosition = inputStream.getPos(); - boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION; - Option content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily); + boolean shouldReadLazily = nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION; + Option content = HoodieLogBlock.tryReadContent(inputStream, contentLength); // 7. Read footer if any Map footer = @@ -209,7 +196,7 @@ private HoodieLogBlock readBlock() throws IOException { if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema); } else { - return new HoodieAvroDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, + return new HoodieAvroDataBlock(inputStream, content, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, keyField, internalSchema); } @@ -217,21 +204,21 @@ private HoodieLogBlock readBlock() throws IOException { checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); - return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, + return new HoodieHFileDataBlock(inputStream, content, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath()); case PARQUET_DATA_BLOCK: checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); - return new HoodieParquetDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc, + return new HoodieParquetDataBlock(inputStream, content, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, keyField); case DELETE_BLOCK: - return new HoodieDeleteBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieDeleteBlock(content, inputStream, Option.of(logBlockContentLoc), header, footer); case COMMAND_BLOCK: - return new HoodieCommandBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieCommandBlock(content, inputStream, Option.of(logBlockContentLoc), header, footer); default: throw new HoodieNotSupportedException("Unsupported Block " + blockType); @@ -258,10 +245,10 @@ private HoodieLogBlock createCorruptBlock() throws IOException { LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset); int corruptedBlockSize = (int) (nextBlockOffset - currentPos); long contentPosition = inputStream.getPos(); - Option corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily); + Option corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset); - return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); + return new HoodieCorruptBlock(corruptedBytes, inputStream, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); } private boolean isBlockCorrupted(int 
blocksize) throws IOException { @@ -390,24 +377,13 @@ public HoodieLogBlock next() { } } + // TODO: remove method to iterate in reverse order. /** * hasPrev is not idempotent. */ @Override public boolean hasPrev() { - try { - if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); - } - reverseLogFilePosition = lastReverseLogFilePosition; - reverseLogFilePosition -= Long.BYTES; - lastReverseLogFilePosition = reverseLogFilePosition; - inputStream.seek(reverseLogFilePosition); - } catch (Exception e) { - // Either reached EOF while reading backwards or an exception - return false; - } - return true; + throw new HoodieNotSupportedException("Reverse log reader is not supported."); } /** @@ -417,25 +393,7 @@ public boolean hasPrev() { */ @Override public HoodieLogBlock prev() throws IOException { - - if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); - } - long blockSize = inputStream.readLong(); - long blockEndPos = inputStream.getPos(); - // blocksize should read everything about a block including the length as well - try { - inputStream.seek(reverseLogFilePosition - blockSize); - } catch (Exception e) { - // this could be a corrupt block - inputStream.seek(blockEndPos); - throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, " - + "fallback to forward reading of logfile"); - } - boolean hasNext = hasNext(); - reverseLogFilePosition -= blockSize; - lastReverseLogFilePosition = reverseLogFilePosition; - return next(); + throw new HoodieNotSupportedException("Reverse log reader is not supported."); } /** @@ -444,17 +402,7 @@ public HoodieLogBlock prev() throws IOException { * position returned from the method to expect correct results */ public long moveToPrev() throws IOException { - - if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); - } - inputStream.seek(lastReverseLogFilePosition); - long blockSize = inputStream.readLong(); - // blocksize should be everything about a block including the length as well - inputStream.seek(reverseLogFilePosition - blockSize); - reverseLogFilePosition -= blockSize; - lastReverseLogFilePosition = reverseLogFilePosition; - return reverseLogFilePosition; + throw new HoodieNotSupportedException("Reverse log reader is not supported."); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index c48107e392515..10174b297ab7c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -44,22 +44,17 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final FileSystem fs; private final Schema readerSchema; private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema(); - private final boolean readBlocksLazily; - private final boolean reverseLogReader; private final String recordKeyField; private final boolean enableInlineReading; private int bufferSize; private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); - HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, - boolean reverseLogReader, int bufferSize, boolean enableRecordLookups, - String 
recordKeyField, InternalSchema internalSchema) throws IOException { + HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, int bufferSize, + boolean enableRecordLookups, String recordKeyField, InternalSchema internalSchema) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; - this.readBlocksLazily = readBlocksLazily; - this.reverseLogReader = reverseLogReader; this.bufferSize = bufferSize; this.prevReadersInOpenState = new ArrayList<>(); this.recordKeyField = recordKeyField; @@ -67,17 +62,17 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, enableRecordLookups, recordKeyField, internalSchema); } } - @Override /** * Note : In lazy mode, clients must ensure close() should be called only after processing all log-blocks as the * underlying inputstream will be closed. TODO: We can introduce invalidate() API at HoodieLogBlock and this object * can call invalidate on all returned log-blocks so that we check this scenario specifically in HoodieLogBlock */ + @Override public void close() throws IOException { for (HoodieLogFileReader reader : prevReadersInOpenState) { @@ -101,13 +96,8 @@ public boolean hasNext() { } else if (logFiles.size() > 0) { try { HoodieLogFile nextLogFile = logFiles.remove(0); - // First close previous reader only if readBlockLazily is true - if (!readBlocksLazily) { - this.currentReader.close(); - } else { - this.prevReadersInOpenState.add(currentReader); - } - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + this.prevReadersInOpenState.add(currentReader); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, enableInlineReading, recordKeyField, internalSchema); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e3d8554d00fd8..5019a396bafc9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -80,14 +80,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, - boolean reverseReader, int bufferSize, String spillableMapBasePath, - Option instantRange, + String latestInstantTime, Long maxMemorySizeInBytes,int bufferSize, + String spillableMapBasePath, Option instantRange, ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, boolean withOperationField, boolean forceFullScan, Option partitionName, InternalSchema internalSchema) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, 
reverseReader, bufferSize, + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, bufferSize, instantRange, withOperationField, forceFullScan, partitionName, internalSchema); try { @@ -318,8 +317,7 @@ public HoodieMergedLogRecordScanner build() { this.partitionName = getRelativePartitionPath(new Path(basePath), new Path(this.logFilePaths.get(0)).getParent()); } return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, - bufferSize, spillableMapBasePath, instantRange, + latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true, Option.ofNullable(partitionName), internalSchema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 8ea34d6f2fa0d..a93010ccbae52 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -36,9 +36,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private final LogRecordScannerCallback callback; private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback, Option instantRange) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false); + String latestInstantTime, int bufferSize, LogRecordScannerCallback callback, + Option instantRange) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, bufferSize, instantRange, false); this.callback = callback; } @@ -138,7 +138,7 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) { @Override public HoodieUnMergedLogRecordScanner build() { return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange); + latestInstantTime, bufferSize, callback, instantRange); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 491c6700c9067..33c8755e222fd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -65,24 +65,22 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { public HoodieAvroDataBlock(FSDataInputStream inputStream, Option content, - boolean readBlockLazily, HoodieLogBlockContentLocation logBlockContentLocation, Option readerSchema, Map header, Map footer, String keyField, InternalSchema internalSchema) { - super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false, internalSchema); + super(content, inputStream, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false, internalSchema); } public HoodieAvroDataBlock(FSDataInputStream inputStream, Option content, - 
                             boolean readBlockLazily,
                             HoodieLogBlockContentLocation logBlockContentLocation,
                             Option readerSchema,
                             Map header,
                             Map footer,
                             String keyField) {
-    super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
+    super(content, inputStream, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
   }

   public HoodieAvroDataBlock(@Nonnull List records,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index c44f1950144b5..eb63eefcf7111 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -40,13 +40,13 @@ public enum HoodieCommandBlockTypeEnum {
   }

   public HoodieCommandBlock(Map header) {
-    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
+    this(Option.empty(), null, Option.empty(), header, new HashMap<>());
   }

-  public HoodieCommandBlock(Option content, FSDataInputStream inputStream, boolean readBlockLazily,
+  public HoodieCommandBlock(Option content, FSDataInputStream inputStream,
                             Option blockContentLocation, Map header, Map footer) {
-    super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    super(header, footer, blockContentLocation, content, inputStream);
     this.type = HoodieCommandBlockTypeEnum.values()[Integer.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
index 3e4f571588684..7c274f7e089d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
@@ -31,15 +31,15 @@
  */
 public class HoodieCorruptBlock extends HoodieLogBlock {

-  public HoodieCorruptBlock(Option corruptedBytes, FSDataInputStream inputStream, boolean readBlockLazily,
+  public HoodieCorruptBlock(Option corruptedBytes, FSDataInputStream inputStream,
                             Option blockContentLocation, Map header, Map footer) {
-    super(header, footer, blockContentLocation, corruptedBytes, inputStream, readBlockLazily);
+    super(header, footer, blockContentLocation, corruptedBytes, inputStream);
   }

   @Override
   public byte[] getContentBytes() throws IOException {
-    if (!getContent().isPresent() && readBlockLazily) {
+    if (!getContent().isPresent()) {
       // read content from disk
       inflate();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index c83b3bc82d56c..a41ba0e4b40a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -70,7 +70,7 @@ public HoodieDataBlock(List records, Map header, Map footer, String keyFieldName) {
-    super(header, footer, Option.empty(), Option.empty(), null, false);
+    super(header, footer, Option.empty(), Option.empty(), null);
     this.records = Option.of(records);
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
@@ -83,14 +83,13 @@ public HoodieDataBlock(List records,
    */
   protected HoodieDataBlock(Option content,
                             FSDataInputStream inputStream,
-                            boolean readBlockLazily,
                             Option blockContentLocation,
                             Option readerSchema,
                             Map headers,
                             Map footer,
                             String keyFieldName,
                             boolean enablePointLookups) {
-    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    super(headers, footer, blockContentLocation, content, inputStream);
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
@@ -100,7 +99,6 @@ protected HoodieDataBlock(Option content,
   protected HoodieDataBlock(Option content,
                             FSDataInputStream inputStream,
-                            boolean readBlockLazily,
                             Option blockContentLocation,
                             Option readerSchema,
                             Map headers,
@@ -108,7 +106,7 @@ protected HoodieDataBlock(Option content,
                             String keyFieldName,
                             boolean enablePointLookups,
                             InternalSchema internalSchema) {
-    super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    super(headers, footer, blockContentLocation, content, inputStream);
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
@@ -180,7 +178,7 @@ public final ClosableIterator getRecordIterator(List keys
   }

   protected ClosableIterator readRecordsFromBlockPayload() throws IOException {
-    if (readBlockLazily && !getContent().isPresent()) {
+    if (!getContent().isPresent()) {
       // read log block contents from disk
       inflate();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index a5168072d014d..6a23d41615176 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -44,14 +44,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
   private DeleteRecord[] recordsToDelete;

   public HoodieDeleteBlock(DeleteRecord[] recordsToDelete, Map header) {
-    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
+    this(Option.empty(), null, Option.empty(), header, new HashMap<>());
     this.recordsToDelete = recordsToDelete;
   }

-  public HoodieDeleteBlock(Option content, FSDataInputStream inputStream, boolean readBlockLazily,
+  public HoodieDeleteBlock(Option content, FSDataInputStream inputStream,
                            Option blockContentLocation, Map header, Map footer) {
-    super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
+    super(header, footer, blockContentLocation, content, inputStream);
   }

   @Override
@@ -61,7 +61,7 @@ public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing keys from content
     if (content.isPresent()) {
       return content.get();
-    } else if (readBlockLazily && recordsToDelete == null) {
+    } else if (recordsToDelete == null) {
       // read block lazily
       getRecordsToDelete();
     }
@@ -78,7 +78,7 @@ public byte[] getContentBytes() throws IOException {
   public DeleteRecord[] getRecordsToDelete() {
     try {
       if (recordsToDelete == null) {
-        if (!getContent().isPresent() && readBlockLazily) {
+        if (!getContent().isPresent()) {
           // read content from disk
           inflate();
         }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 72cb3a0ef3b47..e1d0635355f69 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -74,14 +74,13 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   public HoodieHFileDataBlock(FSDataInputStream inputStream,
                               Option content,
-                              boolean readBlockLazily,
                               HoodieLogBlockContentLocation logBlockContentLocation,
                               Option readerSchema,
                               Map header,
                               Map footer,
                               boolean enablePointLookups,
                               Path pathForReader) {
-    super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieHFileReader.KEY_FIELD_NAME, enablePointLookups);
+    super(content, inputStream, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieHFileReader.KEY_FIELD_NAME, enablePointLookups);
     this.compressionAlgorithm = Option.empty();
     this.pathForReader = pathForReader;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 71336be883781..fefa6f13de40c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -64,22 +64,18 @@ public abstract class HoodieLogBlock {
   // create handlers to return specific type of inputstream based on FS
   // input stream corresponding to the log file where this logBlock belongs
   private final FSDataInputStream inputStream;
-  // Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory intensive)
-  protected boolean readBlockLazily;

   public HoodieLogBlock(
       @Nonnull Map logBlockHeader,
       @Nonnull Map logBlockFooter,
       @Nonnull Option blockContentLocation,
       @Nonnull Option content,
-      @Nullable FSDataInputStream inputStream,
-      boolean readBlockLazily) {
+      @Nullable FSDataInputStream inputStream) {
     this.logBlockHeader = logBlockHeader;
     this.logBlockFooter = logBlockFooter;
     this.blockContentLocation = blockContentLocation;
     this.content = content;
     this.inputStream = inputStream;
-    this.readBlockLazily = readBlockLazily;
   }

   // Return the bytes representation of the data belonging to a LogBlock
@@ -245,19 +241,11 @@ public static Map getLogMetadata(DataInputStream dis
    * Read or Skip block content of a log block in the log file. Depends on lazy reading enabled in
    * {@link HoodieMergedLogRecordScanner}
    */
-  public static Option tryReadContent(FSDataInputStream inputStream, Integer contentLength, boolean readLazily)
+  public static Option tryReadContent(FSDataInputStream inputStream, Integer contentLength)
       throws IOException {
-    if (readLazily) {
-      // Seek to the end of the content block
-      inputStream.seek(inputStream.getPos() + contentLength);
-      return Option.empty();
-    }
-
-    // TODO re-use buffer if stream is backed by buffer
-    // Read the contents in memory
-    byte[] content = new byte[contentLength];
-    inputStream.readFully(content, 0, contentLength);
-    return Option.of(content);
+    // Seek to the end of the content block
+    inputStream.seek(inputStream.getPos() + contentLength);
+    return Option.empty();
   }

   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 5e7bef90a08ba..e34da6affe0c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -59,13 +59,12 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
   public HoodieParquetDataBlock(FSDataInputStream inputStream,
                                 Option content,
-                                boolean readBlockLazily,
                                 HoodieLogBlockContentLocation logBlockContentLocation,
                                 Option readerSchema,
                                 Map header,
                                 Map footer,
                                 String keyField) {
-    super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
+    super(content, inputStream, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
     this.compressionCodecName = Option.empty();
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index cbd7e6c17511c..827cca07e2ace 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -60,7 +60,7 @@ private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, Stri
                                               ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
                                               Option instantRange, boolean allowFullScan) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, true, false, bufferSize,
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, bufferSize,
         spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan,
         Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 2dea1c9431316..7009cca0ced59 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.log.AppendResult;
-import org.apache.hudi.common.table.log.HoodieLogFileReader;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -50,7 +49,6 @@ import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;

 import org.apache.avro.Schema;
@@ -73,7 +71,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;

 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -414,7 +411,7 @@ public void testHugeLogFileWrite() throws IOException, URISyntaxException, Inter
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
     byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes();
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(new Configuration(), null, 0, dataBlockContentBytes.length, 0);
-    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false,
+    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes),
        logBlockContentLoc, Option.ofNullable(getSimpleSchema()), header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
     long writtenSize = 0;
     int logBlockWrittenNum = 0;
@@ -839,7 +836,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
     writer.appendBlock(dataBlock);

     // Write 2
-    header = new HashMap<>();
+    header.clear();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -847,7 +844,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
     writer.appendBlock(dataBlock);

     // Rollback the last write
-    header = new HashMap<>();
+    header.clear();
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
@@ -856,7 +853,7 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
     writer.appendBlock(commandBlock);

     // Write 3
-    header = new HashMap<>();
+    header.clear();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
     List copyOfRecords3 = records3.stream()
@@ -1730,204 +1727,6 @@ public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpi
         diskMapType, isCompressionEnabled, readBlocksLazily);
   }

-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
-      throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    Schema schema = getSimpleSchema();
-    List records1 = SchemaTestUtil.generateTestRecords(0, 100);
-    List copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    Map header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List records2 = SchemaTestUtil.generateTestRecords(0, 100);
-    List copyOfRecords2 = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    // Close and Open again and append 100 more records
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List records3 = SchemaTestUtil.generateTestRecords(0, 100);
-    List copyOfRecords3 = records3.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
-
-    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
-
-      assertTrue(reader.hasPrev(), "Last block should be available");
-      HoodieLogBlock prevBlock = reader.prev();
-      HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
-
-      List recordsRead1 = getRecords(dataBlockRead);
-      assertEquals(copyOfRecords3.size(), recordsRead1.size(),
-          "Third records size should be equal to the written records size");
-      assertEquals(copyOfRecords3, recordsRead1,
-          "Both records lists should be the same. (ordering guaranteed)");
-
-      assertTrue(reader.hasPrev(), "Second block should be available");
-      prevBlock = reader.prev();
-      dataBlockRead = (HoodieDataBlock) prevBlock;
-      List recordsRead2 = getRecords(dataBlockRead);
-      assertEquals(copyOfRecords2.size(), recordsRead2.size(),
-          "Read records size should be equal to the written records size");
-      assertEquals(copyOfRecords2, recordsRead2,
-          "Both records lists should be the same. (ordering guaranteed)");
-
-      assertTrue(reader.hasPrev(), "First block should be available");
-      prevBlock = reader.prev();
-      dataBlockRead = (HoodieDataBlock) prevBlock;
-      List recordsRead3 = getRecords(dataBlockRead);
-      assertEquals(copyOfRecords1.size(), recordsRead3.size(),
-          "Read records size should be equal to the written records size");
-      assertEquals(copyOfRecords1, recordsRead3,
-          "Both records lists should be the same. (ordering guaranteed)");
-
-      assertFalse(reader.hasPrev());
-    }
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
-      throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    Schema schema = getSimpleSchema();
-    List records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
-
-    // Append some arbit byte[] to thee end of the log (mimics a partially written commit)
-    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
-    // create a block with
-    outputStream.write(HoodieLogFormat.MAGIC);
-    outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
-    // Write out a length that does not confirm with the content
-    outputStream.writeInt(1000);
-    // Write out footer length
-    outputStream.writeInt(1);
-    // Write out some metadata
-    // TODO : test for failure to write metadata - NA ?
-    outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
-    outputStream.write("something-random".getBytes());
-    outputStream.flush();
-    outputStream.close();
-
-    // Should be able to append a new block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    // First round of reads - we should be able to read the first block and then EOF
-    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-
-    try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
-
-      assertTrue(reader.hasPrev(), "Last block should be available");
-      HoodieLogBlock block = reader.prev();
-      assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
-
-      assertTrue(reader.hasPrev(), "Last block should be available");
-      assertThrows(CorruptedLogFileException.class, () -> {
-        reader.prev();
-      });
-    }
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)
-      throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    Schema schema = getSimpleSchema();
-    List records1 = SchemaTestUtil.generateTestRecords(0, 100);
-    List copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
-    Map header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List records2 = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    // Close and Open again and append 100 more records
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List records3 = SchemaTestUtil.generateTestRecords(0, 100);
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
-
-    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
-
-    HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-    try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
-
-      assertTrue(reader.hasPrev(), "Third block should be available");
-      reader.moveToPrev();
-
-      assertTrue(reader.hasPrev(), "Second block should be available");
-      reader.moveToPrev();
-
-      // After moving twice, this last reader.prev() should read the First block written
-      assertTrue(reader.hasPrev(), "First block should be available");
-      HoodieLogBlock prevBlock = reader.prev();
-      HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
-      List recordsRead = getRecords(dataBlockRead);
-      assertEquals(copyOfRecords1.size(), recordsRead.size(),
-          "Read records size should be equal to the written records size");
-      assertEquals(copyOfRecords1, recordsRead,
-          "Both records lists should be the same. (ordering guaranteed)");
-
-      assertFalse(reader.hasPrev());
-    }
-  }
-
   @Test
   public void testV0Format() throws IOException, URISyntaxException {
     // HoodieLogFormatVersion.DEFAULT_VERSION has been deprecated so we cannot