@@ -150,21 +150,22 @@ private void addShutDownHook() {
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
     int blockSize;
+    long blockStartPos = inputStream.getPos();
     try {
       // 1. Read the total size of the block
       blockSize = (int) inputStream.readLong();
     } catch (EOFException | CorruptedLogFileException e) {
       // An exception reading any of the above indicates a corrupt block
       // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }

     // We may have had a crash which could have written this block partially
     // Skip blockSize in the stream and we should either find a sync marker (start of the next
     // block) or EOF. If we find neither, this block is corrupted.
     boolean isCorrupted = isBlockCorrupted(blockSize);

[Inline review comment from a Contributor]
nit: Frankly, isBlockCorrupted seems to be more idiomatic than corrupt

     if (isCorrupted) {
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }

     // 2. Read the version for this log format
@@ -253,14 +254,14 @@ private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blo
     return HoodieLogBlockType.values()[type];
   }

-  private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
-    long currentPos = inputStream.getPos();
+  private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
+    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
-    inputStream.seek(currentPos);
+    inputStream.seek(blockStartPos);
     LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
-    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+    int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
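The reader change above is the core of the fix: the block's start offset is now captured before any header bytes are consumed. Previously, createCorruptBlock() derived the corrupt region's start from inputStream.getPos() at the moment it was called, i.e. after the size field (or a partial read of it) had already advanced the stream, so the scan for the next MAGIC marker began a few bytes too late and could swallow the healthy block that follows. Below is a minimal, self-contained sketch of the recovery flow after the change. It is illustrative only: RandomAccessFile stands in for the actual FSDataInputStream, the MAGIC value is an assumption, and the signatures are simplified rather than the real Hudi reader.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

public class CorruptBlockRecoverySketch {

  // Assumed 6-byte block marker, standing in for HoodieLogFormat.MAGIC.
  static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'};

  // Invoked right after a MAGIC marker has been consumed, mirroring readBlock().
  // Returns the size of the block that was read, or of the corrupt region.
  static long readBlock(RandomAccessFile in) throws IOException {
    long blockStartPos = in.getFilePointer(); // captured BEFORE any header read
    try {
      long blockSize = in.readLong(); // a torn write can make this garbage or throw
      if (blockSize < 0 || blockStartPos + blockSize > in.length()) {
        // Simplified stand-in for isBlockCorrupted(): an implausible size means a torn block.
        return createCorruptBlock(in, blockStartPos);
      }
      // ... read version, block type, and blockSize bytes of content here ...
      return blockSize;
    } catch (EOFException e) {
      // The failed read may have advanced the stream past blockStartPos,
      // so recovery must start from the remembered offset, not getFilePointer().
      return createCorruptBlock(in, blockStartPos);
    }
  }

  // Everything from blockStartPos up to the next MAGIC (or EOF) is one corrupt block.
  static long createCorruptBlock(RandomAccessFile in, long blockStartPos) throws IOException {
    in.seek(blockStartPos);
    long nextBlockOffset = scanForNextAvailableBlockOffset(in);
    in.seek(blockStartPos); // rewind so the corrupt bytes can be read back as content
    return nextBlockOffset - blockStartPos;
  }

  // Byte-by-byte forward scan for the next MAGIC; returns the EOF offset if none exists.
  static long scanForNextAvailableBlockOffset(RandomAccessFile in) throws IOException {
    byte[] candidate = new byte[MAGIC.length];
    for (long pos = in.getFilePointer(); pos + MAGIC.length <= in.length(); pos++) {
      in.seek(pos);
      in.readFully(candidate);
      if (Arrays.equals(candidate, MAGIC)) {
        return pos;
      }
    }
    return in.length();
  }
}
```

Rewinding first also handles the degenerate case where nothing but a marker was written: the scan then finds the next block's MAGIC immediately at blockStartPos and reports a zero-length corrupt region instead of consuming the valid block that follows.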
@@ -700,20 +700,11 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType

   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);

     // Append some arbitrary byte[] to the end of the log (mimics a partially written commit)
     fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
     // create a block with a corrupt header
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not conform to the content
@@ -728,17 +719,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     outputStream.close();

     // Append a proper block that is of the missing length of the corrupted block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 10);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 10);

     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     reader.close();

     // Simulate another failure back to back
-    outputStream = fs.append(writer.getLogFile().getPath());
+    outputStream = fs.append(logFile.getPath());
     // create a block with a corrupt header
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not conform to the content
@@ -766,17 +750,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     outputStream.close();

     // Should be able to append a new block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 100);

     // Second round of reads - we should be able to read the first and last block
-    reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
     reader.close();
   }

+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just magic bytes and move onto next block
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
+    outputStream.write(HoodieLogFormat.MAGIC);
+    outputStream.flush();
+    outputStream.close();
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // First round of reads - we should be able to read the first block and then EOF
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+    assertTrue(reader.hasNext(), "We should have corrupted block next");
+    HoodieLogBlock block = reader.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+    assertTrue(reader.hasNext(), "Third block should be available");
+    reader.next();
+    assertFalse(reader.hasNext(), "There should be no more block left");
+
+    reader.close();
+  }
+
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
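The new testMissingBlockExceptMagicBytes pins down the regression scenario: a writer that died right after emitting the 6 magic bytes, with a healthy block appended later. When the reader consumes the orphan marker and calls readBlock(), readLong() reads the next block's own MAGIC plus two size bytes and returns a garbage length, leaving the stream 8 bytes past the block start; with the old getPos()-based recovery, the marker scan started from there, missed the marker, and swallowed the healthy block into the corrupt region. A small demonstration of that offset arithmetic, reusing the sketch above (the file name and the 16-byte dummy block are made up, the test's initial valid block is omitted, and whether FSDataInputStream consumes partial bytes exactly like RandomAccessFile is an assumption):

```java
import java.io.IOException;
import java.io.RandomAccessFile;

public class OrphanMagicDemo {
  public static void main(String[] args) throws IOException {
    try (RandomAccessFile f = new RandomAccessFile("orphan-magic.log", "rw")) {
      f.setLength(0);                            // start from an empty scratch file
      f.write(CorruptBlockRecoverySketch.MAGIC); // orphan marker: writer died here
      f.write(CorruptBlockRecoverySketch.MAGIC); // the next, healthy block begins at offset 6
      f.writeLong(16);                           // its size header
      f.write(new byte[16]);                     // its content (dummy bytes)

      long blockStartPos = 6;                    // just past the consumed orphan magic
      f.seek(blockStartPos);
      long bogusSize = f.readLong();             // reads MAGIC + 2 size bytes: garbage
      System.out.println("bogus size = " + bogusSize + ", pos now = " + f.getFilePointer()); // pos = 14

      // Old behavior: scanning from pos 14 misses the marker at offset 6 and swallows
      // the healthy block. New behavior: rewind to blockStartPos first, find it at once.
      f.seek(blockStartPos);
      long next = CorruptBlockRecoverySketch.scanForNextAvailableBlockOffset(f);
      System.out.println("corrupt region size = " + (next - blockStartPos)); // prints 0
    }
  }
}
```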