Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;

/**
* Returns the number of bytes that have been successfully synced to the underlying filesystem.
* @return byteSize successfully synced to underlying filesystem.
*/
long getSyncedLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,4 +575,9 @@ public void close() throws IOException {
public boolean isBroken() {
  // The output counts as broken exactly when its state field equals State.BROKEN.
  final boolean broken = (State.BROKEN == state);
  return broken;
}

/**
 * Reports the synced length of this output.
 *
 * @return the value currently held in {@code ackedBlockLength}
 */
@Override
public long getSyncedLength() {
  return ackedBlockLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {

private final ExecutorService executor;

private volatile long syncedLength = 0;

public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
Expand Down Expand Up @@ -91,7 +93,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
out.hflush();
}
}
future.complete(out.getPos());
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read-followed-by-update also needs to be atomic, yes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is just for tests, so it is not a big problem, but aligning with the other production implementations is better. Can have an addendum.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I reviewed the code again; actually, the flush0 method can only be executed in a single thread, so there is no need to use AtomicUtils.updateMax. The AtomicLong is in the ProtobufLogWriter, not the output stream. But the 'if(pos > this.syncedLength) {' is a bit confusing to developers; I would prefer we just remove this check...

}
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
Expand Down Expand Up @@ -124,4 +130,9 @@ public void close() throws IOException {
public boolean isBroken() {
// This implementation unconditionally reports itself as not broken.
return false;
}

/**
 * Reports the synced length of this output.
 *
 * @return the current value of the volatile {@code syncedLength} field
 */
@Override
public long getSyncedLength() {
  // Single read of the volatile field; the value is returned as-is.
  final long synced = syncedLength;
  return synced;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws
protected OutputStream getOutputStreamForCellEncoder() {
  // The cell encoder writes through the asyncOutputWrapper stream.
  final OutputStream encoderSink = asyncOutputWrapper;
  return encoderSink;
}

/**
 * Reports the synced length by delegating to the underlying {@code output}.
 *
 * @return whatever {@code output.getSyncedLength()} returns
 */
@Override
public long getSyncedLength() {
  final long synced = output.getSyncedLength();
  return synced;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public long getLength() {
return writers.get(0).getLength();
}

/**
 * Reports the synced length of the first writer in {@code writers}.
 *
 * @return the synced length of {@code writers.get(0)}
 */
@Override
public long getSyncedLength() {
  // Only the first writer is consulted.
  final long firstWriterSynced = writers.get(0).getSyncedLength();
  return firstWriterSynced;
}

@Override
public void close() throws IOException {
Exception error = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
Expand All @@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter

protected FSDataOutputStream output;

private final AtomicLong syncedLength = new AtomicLong(0);

@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
Expand Down Expand Up @@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException {
} else {
fsdos.hflush();
}
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
}

/**
 * Reports the synced length tracked by this writer.
 *
 * @return the current value of the {@code syncedLength} counter
 */
@Override
public long getSyncedLength() {
  final long synced = syncedLength.get();
  return synced;
}

public FSDataOutputStream getStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -74,6 +75,22 @@ public interface WALProvider {

interface WriterBase extends Closeable {
long getLength();
/**
* Returns the length of the WAL file that has been successfully synced to HDFS.
* <p>
* NOTE: This method was added for {@link WALFileLengthProvider}, which is used by replication.
* Consider the case where {@link AsyncFSWAL} is in use: we write to 3 DNs concurrently and,
* according to the visibility guarantee of HDFS, the data becomes available immediately
* when it arrives at a DN, since every DN is considered to be the last one in the pipeline.
* This means replication may read uncommitted data, replicate it to the remote cluster,
* and cause data inconsistency.
* <p>
* The method {@link WriterBase#getLength} may return a length that includes data which is
* only in the HDFS client buffer and not yet successfully synced to HDFS. This method instead
* returns the length successfully synced to HDFS, and the replication thread may only read
* the WAL file that is being written up to this length.
* <p>
* See also HBASE-14004 and this document for more details:
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
* @return byteSize successfully synced to underlying filesystem.
*/
long getSyncedLength();
}

// Writers are used internally. Users outside of the WAL should be relying on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // Forward to the wrapped writer and return its length unchanged.
  final long wrappedLength = w.getLength();
  return wrappedLength;
}

@Override
public long getSyncedLength() {
  // Forward to the wrapped writer and return its synced length unchanged.
  final long wrappedSynced = w.getSyncedLength();
  return wrappedSynced;
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // Pure delegation: the wrapped writer's length is the answer.
  final long delegateLength = w.getLength();
  return delegateLength;
}

@Override
public long getSyncedLength() {
  // Pure delegation: the wrapped writer's synced length is the answer.
  final long delegateSynced = w.getSyncedLength();
  return delegateSynced;
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // Ask the wrapped writer for its length and pass it through.
  final long len = w.getLength();
  return len;
}

@Override
public long getSyncedLength() {
  // Ask the wrapped writer for its synced length and pass it through.
  final long syncedLen = w.getSyncedLength();
  return syncedLen;
}
};
}
}
Expand Down Expand Up @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // The decorated writer owns the length; report it verbatim.
  final long reported = w.getLength();
  return reported;
}

@Override
public long getSyncedLength() {
  // The decorated writer owns the synced length; report it verbatim.
  final long reportedSynced = w.getSyncedLength();
  return reportedSynced;
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public long getLength() {
return writer.getLength();
}

/**
 * Reports the synced length by delegating to the wrapped {@code writer}.
 *
 * @return whatever {@code writer.getSyncedLength()} returns
 */
@Override
public long getSyncedLength() {
  final long synced = writer.getSyncedLength();
  return synced;
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long getLength() {
return writer.getLength();
}

/**
 * Passes through the synced length reported by the wrapped {@code writer}.
 *
 * @return the result of {@code writer.getSyncedLength()}
 */
@Override
public long getSyncedLength() {
  final long passthrough = writer.getSyncedLength();
  return passthrough;
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public long getLength() {
return writer.getLength();
}

/**
 * Forwards the synced-length query to the wrapped {@code writer}.
 *
 * @return the result of {@code writer.getSyncedLength()}
 */
@Override
public long getSyncedLength() {
  final long forwarded = writer.getSyncedLength();
  return forwarded;
}

@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // Length comes straight from the original first writer.
  final long originalLength = oldWriter1.getLength();
  return originalLength;
}

@Override
public long getSyncedLength() {
  // Synced length comes straight from the original first writer.
  final long originalSynced = oldWriter1.getSyncedLength();
  return originalSynced;
}
};
log.setWriter(newWriter1);

Expand Down Expand Up @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
  // Length comes straight from the original second writer.
  final long originalLength = oldWriter2.getLength();
  return originalLength;
}

@Override
public long getSyncedLength() {
  // Synced length comes straight from the original second writer.
  final long originalSynced = oldWriter2.getSyncedLength();
  return originalSynced;
}
};
log.setWriter(newWriter2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public long getLength() {
return asyncWriter.getLength();
}

/**
 * Reports the synced length by delegating to {@code asyncWriter}.
 *
 * @return the result of {@code asyncWriter.getSyncedLength()}
 */
@Override
public long getSyncedLength() {
  final long asyncSynced = asyncWriter.getSyncedLength();
  return asyncSynced;
}

@Override
public void append(Entry entry) throws IOException {
asyncWriter.append(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public long getLength() {
return localWriter.getLength();
}

/**
 * Reports the synced length by delegating to {@code localWriter}.
 *
 * @return the result of {@code localWriter.getSyncedLength()}
 */
@Override
public long getSyncedLength() {
  final long localSynced = localWriter.getSyncedLength();
  return localSynced;
}

@Override
public void close() throws IOException {
Closeables.close(localWriter, true);
Expand Down