diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 26afe6aec740a..e6ead54a48d77 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -284,7 +284,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException { long contentPosition = inputStream.getPos(); byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily); return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily, - contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<>(), new HashMap<>()); + contentPosition, corruptedBlockSize, nextBlockOffset, new HashMap<>(), new HashMap<>()); } private boolean isBlockCorrupt(int blocksize) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java index 558053bc3f39f..08909233a576b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java @@ -67,10 +67,10 @@ public byte[] getContentBytes() { } public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, - boolean readBlockLazily, long position, long blockSize, long blockEndpos, Map header, + boolean readBlockLazily, long position, long blockSize, long blockEndPos, Map header, Map footer) { return new HoodieCommandBlock(content, inputStream, readBlockLazily, - Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), header, footer); + Option.of(new 
HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 17bf9135381e5..e95d2d00e3c77 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -603,6 +603,63 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep reader.close(); } + @Test + public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException { + Writer writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + HoodieDataBlock dataBlock = getDataBlock(records, header); + writer.appendBlock(dataBlock); + writer.close(); + + // Append some arbitrary byte[] to the end of the log (mimics a partially written commit) + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); + FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with the magic header + outputStream.write(HoodieLogFormat.MAGIC); + // Write out a length that does not conform with the content + outputStream.writeLong(474); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION); + // Write out a length that does not conform with the content + outputStream.writeLong(400); + // Write out incomplete content + 
outputStream.write("something-random".getBytes()); + // get corrupt block end position + long corruptBlockEndPos = outputStream.getPos(); + outputStream.flush(); + outputStream.close(); + + // Append a proper block again + writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 10); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + dataBlock = getDataBlock(records, header); + writer.appendBlock(dataBlock); + writer.close(); + + // Read data and corrupt block + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + assertTrue(reader.hasNext(), "First block should be available"); + reader.next(); + assertTrue(reader.hasNext(), "We should have corrupted block next"); + HoodieLogBlock block = reader.next(); + assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block"); + // validate that the corrupt block end position is read correctly. + assertEquals(corruptBlockEndPos, block.getBlockContentLocation().get().getBlockEndPos()); + assertTrue(reader.hasNext(), "Third block should be available"); + reader.next(); + assertFalse(reader.hasNext(), "There should be no more block left"); + + reader.close(); + } + @ParameterizedTest @MethodSource("testArguments") public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,