@@ -94,9 +94,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
}
}
long pos = out.getPos();
- if(pos > this.syncedLength) {
- this.syncedLength = pos;
- }
+ /**
+ * This flush0 method could only be called by single thread, so here we could
+ * safely overwrite without any synchronization.
+ */
+ this.syncedLength = pos;
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
@@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter

private final Class<? extends Channel> channelClass;

- private AsyncFSOutput output;
+ private volatile AsyncFSOutput output;
Contributor

Do we really need to make it volatile? IIRC, it is initialized in the init call, which we always invoke when constructing a new ProtobufLogWriter. Making it volatile may confuse developers...

Contributor Author

The output is set to null in AsyncProtobufLogWriter.close and is read in the getSyncedLength method, and there seems to be no explicit constraint on which thread may invoke AsyncProtobufLogWriter.close, so I use volatile here.

Contributor

We also use output in other methods where we do not test whether it is null; testing it only in this method seems a bit confusing...

Contributor Author

Yes, you are right, I overlooked that the other write methods do not test whether output is null either. The problem here is that it is meaningless to call AsyncProtobufLogWriter.getSyncedLength after AsyncProtobufLogWriter.close, which indeed occurred when #1970 was previously applied to branch-2, caused by problematic state synchronization.

If we do not test whether output is null, we can just leave some comments here.

Contributor

Oh, then this is another problem? I think it is allowed to call getSyncedLength after closing? We should not throw IllegalStateException...

Contributor Author (@comnetwork, Jul 15, 2020)

Yes, the problem is described here:
https://issues.apache.org/jira/browse/HBASE-24625?focusedCommentId=17152610&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17152610

In my opinion, AsyncProtobufLogWriter.getSyncedLength is used for WALFileLengthProvider.getLogFileSizeIfBeingWritten and is only meaningful while the WAL file is currently being written. Once AsyncProtobufLogWriter is closed (BTW, AsyncProtobufLogWriter.output is then null), upper-layer code should have no need to call this method, and if upper-layer code still calls it after close, that may indicate a synchronization error or some other problem.

If we allow getSyncedLength to be called after closing, we should save the syncedLength in AsyncProtobufLogWriter.close; otherwise getSyncedLength will throw NullPointerException or just return 0, both of which are unexpected and may cause errors in upper-layer code.

Contributor

In practice, since we call getSyncedLength from another thread, it is possible that we call getSyncedLength on a closed writer. Maybe we could introduce another field to save the final length of the file, and in getSyncedLength check whether this field is set: if so, we just return its value; otherwise we go to output. WDYT?

Contributor Author (@comnetwork, Jul 20, 2020)

@Apache9, yes, I agree with you, and will modify the PR following your suggestion.
BTW, for now AsyncProtobufLogWriter.getSyncedLength is only used while the WAL file is being written. Calling AsyncProtobufLogWriter.getSyncedLength on a closed writer may be useful in the future.

Contributor Author

@Apache9, I modified the PR to introduce finalSyncedLength, which saves the syncedLength in AsyncProtobufLogWriter.close.

But in the getSyncedLength method, I check whether output is null instead of checking whether finalSyncedLength is set, because I think the statement "this.output = null;" in AsyncProtobufLogWriter.close is a good sync point: if output is null, then finalSyncedLength must already be set, so we can return finalSyncedLength; otherwise we return output.getSyncedLength.

If we used finalSyncedLength as the sync point, it would be hard to decide whether output is null or not.
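
To make the ordering argument concrete, here is a minimal, self-contained sketch of the pattern. It is not the HBase code: the class name SyncPointSketch is hypothetical, and a LongSupplier stands in for AsyncFSOutput. It assumes both fields are volatile, so a reader that observes output == null is guaranteed by the Java memory model to also observe the earlier write to finalSyncedLength.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the writer; illustrates the ordering only. */
final class SyncPointSketch {
  // Stand-in for AsyncFSOutput; becomes null once the writer is closed.
  private volatile LongSupplier output;
  // Stays -1 until close() records the final synced length.
  private volatile long finalSyncedLength = -1;

  SyncPointSketch(LongSupplier output) {
    this.output = output;
  }

  synchronized void close() {
    LongSupplier out = this.output;
    if (out == null) {
      return; // already closed (an assumption of this sketch)
    }
    // Write order matters: save the final length first, then null the reference.
    this.finalSyncedLength = out.getAsLong();
    this.output = null;
  }

  long getSyncedLength() {
    // Read order mirrors close(): read output first, into a local.
    LongSupplier out = this.output;
    if (out == null) {
      // Seeing null means the earlier write to finalSyncedLength is visible.
      return this.finalSyncedLength;
    }
    return out.getAsLong();
  }

  public static void main(String[] args) {
    SyncPointSketch writer = new SyncPointSketch(() -> 128L);
    System.out.println(writer.getSyncedLength()); // read through the open output
    writer.close();
    System.out.println(writer.getSyncedLength()); // read from the saved final length
  }
}
```

Checking finalSyncedLength first would leave a window in which the reader sees it unset, close() then completes, and the reader dereferences a null output; reading output into a local first avoids that window.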

Contributor Author

@Apache9, please take a look, thanks.

+ /**
+ * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
+ */
+ private volatile long finalSyncedLength = -1;

private static final class OutputStreamWrapper extends OutputStream
implements ByteBufferWriter {
@@ -156,6 +160,13 @@ public synchronized void close() throws IOException {
LOG.warn("normal close failed, try recover", e);
output.recoverAndClose(null);
}
+ /**
+ * We have to call {@link AsyncFSOutput#getSyncedLength()}
+ * after {@link AsyncFSOutput#close()} to get the final length
+ * synced to underlying filesystem because {@link AsyncFSOutput#close()}
+ * may also flush some data to underlying filesystem.
+ */
+ this.finalSyncedLength = this.output.getSyncedLength();
this.output = null;
}

@@ -234,6 +245,17 @@ protected OutputStream getOutputStreamForCellEncoder() {

@Override
public long getSyncedLength() {
- return this.output.getSyncedLength();
+ /**
+ * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
+ * is a sync point, if output is null, then finalSyncedLength must set,
+ * so we can return finalSyncedLength, else we return output.getSyncedLength
+ */
+ AsyncFSOutput outputToUse = this.output;
+ if(outputToUse == null) {
+ long finalSyncedLengthToUse = this.finalSyncedLength;
+ assert finalSyncedLengthToUse >= 0;
+ return finalSyncedLengthToUse;
+ }
+ return outputToUse.getSyncedLength();
}
}