Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;

/**
* @return byteSize success synced to underlying filesystem.
*/
long getSyncedLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,9 @@ public void close() throws IOException {
public boolean isBroken() {
return state == State.BROKEN;
}

@Override
public long getSyncedLength() {
return this.ackedBlockLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {

private final ExecutorService executor;

private volatile long syncedLength = 0;

public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
Expand Down Expand Up @@ -91,7 +93,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
out.hflush();
}
}
future.complete(out.getPos());
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
}
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
Expand Down Expand Up @@ -124,4 +130,9 @@ public void close() throws IOException {
public boolean isBroken() {
return false;
}

@Override
public long getSyncedLength() {
return this.syncedLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,12 @@ private void waitForSafePoint() {
}
}

private long closeWriter() {
AsyncWriter oldWriter = this.writer;
if (oldWriter != null) {
long fileLength = oldWriter.getLength();
protected final long closeWriter(AsyncWriter writer) {
if (writer != null) {
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
oldWriter.close();
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
}
Expand All @@ -693,7 +692,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter();
long oldFileLen = closeWriter(this.writer);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
Expand All @@ -719,7 +718,8 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter();
closeWriter(this.writer);
this.writer = null;
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws
protected OutputStream getOutputStreamForCellEncoder() {
return asyncOutputWrapper;
}

@Override
public long getSyncedLength() {
return this.output.getSyncedLength();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
Expand All @@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter

protected FSDataOutputStream output;

private final AtomicLong syncedLength = new AtomicLong(0);

@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
Expand Down Expand Up @@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException {
} else {
fsdos.hflush();
}
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
}

@Override
public long getSyncedLength() {
return this.syncedLength.get();
}

public FSDataOutputStream getStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -74,6 +75,22 @@ public interface WALProvider {

interface WriterBase extends Closeable {
long getLength();
/**
* NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
* considering the case if we use {@link AsyncFSWAL},we write to 3 DNs concurrently,
* according to the visibility guarantee of HDFS, the data will be available immediately
* when arriving at DN since all the DNs will be considered as the last one in pipeline.
* This means replication may read uncommitted data and replicate it to the remote cluster
* and cause data inconsistency.
* The method {@link WriterBase#getLength} may return length which just in hdfs client
* buffer and not successfully synced to HDFS, so we use this method to return the length
* successfully synced to HDFS and replication thread could only read writing WAL file
* limited by this length.
* see also HBASE-14004 and this document for more details:
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
* @return byteSize successfully synced to underlying filesystem.
*/
long getSyncedLength();
}

// Writers are used internally. Users outside of the WAL should be relying on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,35 +130,40 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}

@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
@Override
public void sync(boolean forceSync) throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync(forceSync);
}

@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}

@Override
public long getLength() {
return w.getLength();
}
};
@Override
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}

// Make up mocked server and services.
RegionServerServices services = mock(RegionServerServices.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down Expand Up @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter1.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter1.getSyncedLength();
}
};
log.setWriter(newWriter1);

Expand Down Expand Up @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter2.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,7 @@ public void testReplicationSourceWALReaderRecovered() throws Exception {
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
// Actually this should be true but we haven't handled this yet since for a normal queue the
// last one is always open... Not a big deal for now.
assertFalse(batch.isEndOfFile());
assertTrue(batch.isEndOfFile());

assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
}
Expand Down