diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c784684cc029c..2e2af79823c5d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -150,13 +150,14 @@ private void addShutDownHook() {
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
     int blockSize;
+    long blockStartPos = inputStream.getPos();
     try {
       // 1 Read the total size of the block
       blockSize = (int) inputStream.readLong();
     } catch (EOFException | CorruptedLogFileException e) {
       // An exception reading any of the above indicates a corrupt block
       // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // We may have had a crash which could have written this block partially
@@ -164,7 +165,7 @@ private HoodieLogBlock readBlock() throws IOException {
     // block) or EOF. If we did not find either of it, then this block is a corrupted block.
     boolean isCorrupted = isBlockCorrupted(blockSize);
     if (isCorrupted) {
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // 2. Read the version for this log format
@@ -253,14 +254,14 @@ private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blo
     return HoodieLogBlockType.values()[type];
   }
 
-  private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
-    long currentPos = inputStream.getPos();
+  private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
+    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
-    inputStream.seek(currentPos);
+    inputStream.seek(blockStartPos);
     LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
-    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+    int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 6d084f8e17869..f87e5a41b8439 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -700,20 +700,11 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
 
   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Append some arbit byte[] to the end of the log (mimics a partially written commit)
     fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -728,17 +719,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     outputStream.close();
 
     // Append a proper block that is of the missing length of the corrupted block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 10);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     reader.close();
 
     // Simulate another failure back to back
-    outputStream = fs.append(writer.getLogFile().getPath());
+    outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -766,17 +750,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     outputStream.close();
 
     // Should be able to append a new block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Second round of reads - we should be able to read the first and last block
-    reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     reader.close();
   }
 
+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just magic bytes and move onto next block
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
+    outputStream.write(HoodieLogFormat.MAGIC);
+    outputStream.flush();
+    outputStream.close();
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // First round of reads - we should be able to read the first block and then EOF
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+    assertTrue(reader.hasNext(), "We should have corrupted block next");
+    HoodieLogBlock block = reader.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+    assertTrue(reader.hasNext(), "Third block should be available");
+    reader.next();
+    assertFalse(reader.hasNext(), "There should be no more block left");
+
+    reader.close();
+  }
+
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
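Note on the fix: previously, createCorruptBlock() derived the corrupt block's start offset from inputStream.getPos(), but by the time corruption is detected the stream has already advanced past whatever bytes the failed reads consumed. The patch captures blockStartPos once at the top of readBlock(), just after the MAGIC has been consumed, and both seeks to and sizes the corrupt block from that recorded offset, so the corrupt block spans everything from the true block start to the next MAGIC marker or EOF. The new testMissingBlockExceptMagicBytes test covers the extreme case: a trailing "block" consisting of nothing but MAGIC bytes.

The following is a minimal standalone sketch of the position-bookkeeping pattern, not Hudi code: RandomAccessFile stands in for Hudi's seekable FSDataInputStream, the class name is illustrative, and a 4-byte file mimics a block whose size field was only partially written.

import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone sketch (not Hudi code): record the stream position before the
// fallible reads, then rewind to it when building the corrupt block.
public class CorruptBlockPositionSketch {
  public static void main(String[] args) throws IOException {
    Path log = Files.createTempFile("sketch", ".log");
    // Only 4 bytes on disk -- fewer than the 8 that readLong() needs,
    // mimicking a block whose size field was partially written.
    Files.write(log, new byte[] {1, 2, 3, 4});

    try (RandomAccessFile in = new RandomAccessFile(log.toFile(), "r")) {
      long blockStartPos = in.getFilePointer(); // capture BEFORE the fallible read
      try {
        long blockSize = in.readLong();         // throws EOFException mid-read
        System.out.println("block size = " + blockSize);
      } catch (EOFException e) {
        // The failed read still consumed bytes: the pointer is now at 4, not 0.
        // Deriving the corrupt block from it (the old behavior) would misreport
        // the start and shrink the size; rewinding to blockStartPos does not.
        System.out.println("pos after failed read = " + in.getFilePointer()); // 4
        in.seek(blockStartPos);
        System.out.println("rewound to block start = " + in.getFilePointer()); // 0
      }
    }
    Files.deleteIfExists(log);
  }
}

The point the sketch demonstrates: the post-failure position depends on how far the read got before the exception, so it is unreliable as a block boundary; rewinding to the recorded blockStartPos makes the corrupt block's offset and size independent of where the exception happened to be thrown.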