Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final String logWriteToken;
private final String rolloverLogWriteToken;
private FSDataOutputStream output;
private boolean closed = false;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";

/**
Expand All @@ -64,7 +65,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
* @param sizeThreshold
*/
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold,
String logWriteToken, String rolloverLogWriteToken) throws IOException, InterruptedException {
String logWriteToken, String rolloverLogWriteToken) {
this.fs = fs;
this.logFile = logFile;
this.sizeThreshold = sizeThreshold;
Expand All @@ -73,40 +74,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
this.logWriteToken = logWriteToken;
this.rolloverLogWriteToken = rolloverLogWriteToken;
addShutDownHook();
Path path = logFile.getPath();
if (fs.exists(path)) {
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
LOG.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
LOG.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
/*
* Before throwing an exception, close the outputstream,
* to ensure that the lease on the log file is released.
*/
close();
throw ioe;
}
}
}
if (!isAppendSupported) {
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
LOG.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
LOG.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
}
}

public FileSystem getFs() {
Expand All @@ -122,16 +89,64 @@ public long getSizeThreshold() {
return sizeThreshold;
}

/**
* Lazily opens (or creates) the underlying output stream on first use.
*
* <p>If the log file already exists and the filesystem scheme supports append, the
* existing file is appended to; if append is unsupported (or the filesystem reports
* "not supported" at append time, e.g. viewfs), the writer rolls over to a new log
* file. If the file does not exist, a new one is created.
*
* @return the open stream positioned for writing to the current log file.
* @throws IOException if the filesystem check, append, or file creation fails.
* @throws InterruptedException propagated from file creation / lease recovery.
*/
private FSDataOutputStream getOutputStream() throws IOException, InterruptedException {
if (this.output == null) {
Path path = logFile.getPath();
if (fs.exists(path)) {
// Whether this filesystem scheme supports append at all (e.g. HDFS does, S3 does not).
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
LOG.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
LOG.warn("Remote Exception, attempting to handle or recover lease", e);
// Presumably re-attempts the append (or rolls over) after recovering the
// lease, leaving this.output usable — TODO confirm against the helper's body.
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
// NOTE(review): IOException.getMessage() may be null, which would NPE here
// before the intended handling — confirm and guard if reachable.
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
/*
* Before throwing an exception, close the outputstream,
* to ensure that the lease on the log file is released.
*/
close();
throw ioe;
}
}
}
if (!isAppendSupported) {
// Fall back: start a fresh file with the rollover write token instead of appending.
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
LOG.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
LOG.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
}
}
return output;
}

@Override
public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {

// Find current version
HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
long currentSize = this.output.size();

FSDataOutputStream outputStream = getOutputStream();
long currentSize = outputStream.size();

// 1. Write the magic header for the start of the block
this.output.write(HoodieLogFormat.MAGIC);
outputStream.write(HoodieLogFormat.MAGIC);

// bytes for header
byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
Expand All @@ -141,27 +156,27 @@ public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedE
byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());

// 2. Write the total size of the block (excluding Magic)
this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));

// 3. Write the version of this log block
this.output.writeInt(currentLogFormatVersion.getVersion());
outputStream.writeInt(currentLogFormatVersion.getVersion());
// 4. Write the block type
this.output.writeInt(block.getBlockType().ordinal());
outputStream.writeInt(block.getBlockType().ordinal());

// 5. Write the headers for the log block
this.output.write(headerBytes);
outputStream.write(headerBytes);
// 6. Write the size of the content block
this.output.writeLong(content.length);
outputStream.writeLong(content.length);
// 7. Write the contents of the data block
this.output.write(content);
outputStream.write(content);
// 8. Write the footers for the log block
this.output.write(footerBytes);
outputStream.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is everything written
// until now (for reverse pointer)
// Update: this information is now used in determining if a block is corrupt by comparing to the
// block size in header. This change assumes that the block size will be the last data written
// to a block. Read will break if any data is written past this point for a block.
this.output.writeLong(this.output.size() - currentSize);
outputStream.writeLong(outputStream.size() - currentSize);
// Flush every block to disk
flush();

Expand Down Expand Up @@ -207,9 +222,12 @@ private void createNewFile() throws IOException {

/**
* Flushes any buffered data and closes the underlying output stream, if one was opened.
*
* <p>Always marks the writer as closed — even when the lazily-created stream was never
* opened — so that subsequent calls to {@code getCurrentSize()} fail fast with an
* {@code IllegalStateException} instead of silently reporting a size of 0.
*
* @throws IOException if flushing or closing the stream fails.
*/
@Override
public void close() throws IOException {
if (output != null) {
flush();
output.close();
output = null;
}
// Set unconditionally: a writer closed before its first append is still closed and
// must not be treated as reusable.
closed = true;
}

private void flush() throws IOException {
Expand All @@ -224,9 +242,13 @@ private void flush() throws IOException {

@Override
public long getCurrentSize() throws IOException {
if (output == null) {
if (closed) {
throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
}

if (output == null) {
return 0;
}
return output.getPos();
}

Expand Down Expand Up @@ -302,5 +324,4 @@ private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void testBasicAppend(HoodieLogBlockType dataBlockType) throws IOException
assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(),
"Write should be auto-flushed. The size reported by FileStatus and the writer should match");
writer.close();

}

@ParameterizedTest
Expand Down Expand Up @@ -174,6 +175,8 @@ public void testRollover() throws IOException, InterruptedException, URISyntaxEx
writer = writer.appendBlock(dataBlock);
assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
Path logFilePath = writer.getLogFile().getPath();
assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not exist");
writer.close();
}

Expand Down Expand Up @@ -216,16 +219,16 @@ private void testConcurrentAppend(boolean logFileExists, boolean newLogFileForma
builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
}
Writer writer = builder1.build();
Writer writer2 = builder2.build();
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
Writer writer2 = builder2.build();
writer2 = writer2.appendBlock(dataBlock);
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
writer.close();
writer2.close();
assertNotNull(logFile1.getLogWriteToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
Expand Down Expand Up @@ -139,6 +140,11 @@ public void testFailedToGetAppendStreamFromHDFSNameNode()
writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(fs).build();
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));

writer.appendBlock(new HoodieCommandBlock(header));
// The log version should be different for this new writer
assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion);
}
Expand Down