@@ -284,7 +284,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
long contentPosition = inputStream.getPos();
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily,
-      contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<>(), new HashMap<>());
+      contentPosition, corruptedBlockSize, nextBlockOffset, new HashMap<>(), new HashMap<>());
}
Contributor:
Should this be nextBlockOffset directly?

Contributor Author:
Now, I also think so.


private boolean isBlockCorrupt(int blocksize) throws IOException {
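
For context on the thread above, here is a minimal sketch of how createCorruptBlock plausibly derives these values. The scanForNextAvailableBlockOffset helper and the surrounding locals are assumptions inferred from this hunk, not code confirmed by the diff:

private HoodieLogBlock createCorruptBlock() throws IOException {
  long currentPos = inputStream.getPos();
  // Assumed helper: seek forward to the next MAGIC marker (or EOF) and return its offset.
  long nextBlockOffset = scanForNextAvailableBlockOffset();
  // Rewind and treat everything up to the next block as corrupt content.
  inputStream.seek(currentPos);
  int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
  long contentPosition = inputStream.getPos();
  byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
  // The fix: pass the absolute offset where the next block starts (nextBlockOffset)
  // as the block end position, instead of reusing corruptedBlockSize, which is a length.
  return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily,
      contentPosition, corruptedBlockSize, nextBlockOffset, new HashMap<>(), new HashMap<>());
}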
@@ -67,10 +67,10 @@ public byte[] getContentBytes() {
}

public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
-    boolean readBlockLazily, long position, long blockSize, long blockEndpos, Map<HeaderMetadataType, String> header,
+    boolean readBlockLazily, long position, long blockSize, long blockEndPos, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {

return new HoodieCommandBlock(content, inputStream, readBlockLazily,
-    Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), header, footer);
+    Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}
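
A hedged usage sketch of why the stored end position matters downstream: the location recorded here is what a reader exposes for each scanned block. getBlockContentLocation and getBlockEndPos mirror the test further down; getContentPositionInLogFile and getBlockSize are assumptions about the same class:

HoodieLogBlock block = reader.next();
if (block.getBlockContentLocation().isPresent()) {
  HoodieLogBlockContentLocation loc = block.getBlockContentLocation().get();
  long contentStart = loc.getContentPositionInLogFile(); // offset where the block content begins
  long size = loc.getBlockSize();                        // length of the block content in bytes
  long end = loc.getBlockEndPos();                       // absolute offset at which the next block starts
}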
@@ -603,6 +603,63 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException
reader.close();
}

@Test
public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(records, header);
writer.appendBlock(dataBlock);
writer.close();

// Append an arbitrary byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// Start what looks like a new block with the magic marker
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a block size that does not match the content
outputStream.writeLong(474);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a content length that does not match the content
outputStream.writeLong(400);
// Write out incomplete content
outputStream.write("something-random".getBytes());
// Capture the end position of the partially written (corrupt) block
long corruptBlockEndPos = outputStream.getPos();
outputStream.flush();
outputStream.close();

// Append a proper block again
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 10);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = getDataBlock(records, header);
writer.appendBlock(dataBlock);
writer.close();

// Read back the data block and the corrupt block
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have a corrupt block next");
HoodieLogBlock block = reader.next();
assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
// Validate that the corrupt block's end position is recorded correctly
assertEquals(corruptBlockEndPos, block.getBlockContentLocation().get().getBlockEndPos());
assertTrue(reader.hasNext(), "Third block should be available");
reader.next();
assertFalse(reader.hasNext(), "There should be no more blocks left");

reader.close();
}
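
A back-of-the-envelope check of the expected end position in the test above, assuming HoodieLogFormat.MAGIC is a 6-byte marker (that length is an assumption, not stated in this diff; fragmentLength is a hypothetical name for illustration):

long fragmentLength = 6                     // MAGIC marker (assumed 6 bytes)
    + Long.BYTES                            // bogus block size (474)
    + Integer.BYTES                         // block type ordinal
    + Integer.BYTES                         // log format version
    + Long.BYTES                            // bogus content length (400)
    + "something-random".getBytes().length; // 16 bytes of incomplete content
// fragmentLength == 46, so corruptBlockEndPos is the first block's end offset plus 46,
// and the assertEquals above checks that the reader reports exactly that offset.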

@ParameterizedTest
@MethodSource("testArguments")
public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,