diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 3c520b80e3a1..059ca00b02cc 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable { */ @Override void close() throws IOException; + + /** + * @return byteSize successfully synced to underlying filesystem. + */ + long getSyncedLength(); } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index ed5bbf0f7dac..457b7c165024 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -574,4 +574,9 @@ public void close() throws IOException { public boolean isBroken() { return state == State.BROKEN; } + + @Override + public long getSyncedLength() { + return this.ackedBlockLength; + } } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java index bbb4e5468936..39f1f71e2473 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput { private final ExecutorService executor; + private volatile long syncedLength = 0; + public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) { this.out = out; this.executor = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setDaemon(true) @@ -91,7 +93,11 @@ private void flush0(CompletableFuture future, ByteArrayOutputStream buffer out.hflush(); } } - future.complete(out.getPos()); + long pos = out.getPos(); + if(pos > this.syncedLength) { + this.syncedLength = pos; + } + future.complete(pos); } catch (IOException e) { future.completeExceptionally(e); return; @@ -124,4 +130,9 @@ public void close() throws IOException { public boolean isBroken() { return false; } + + @Override + public long getSyncedLength() { + return this.syncedLength; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index bf53352c5c0e..a978dbedeae3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -1061,7 +1061,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) { Path currentPath = getOldPath(); if (path.equals(currentPath)) { W writer = this.writer; - return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); + return writer != null ? 
OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); } else { return OptionalLong.empty(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index ab709d453034..3c799bf887e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -671,13 +671,12 @@ private void waitForSafePoint() { } } - private long closeWriter() { - AsyncWriter oldWriter = this.writer; - if (oldWriter != null) { - long fileLength = oldWriter.getLength(); + protected final long closeWriter(AsyncWriter writer) { + if (writer != null) { + long fileLength = writer.getLength(); closeExecutor.execute(() -> { try { - oldWriter.close(); + writer.close(); } catch (IOException e) { LOG.warn("close old writer failed", e); } @@ -693,7 +692,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite throws IOException { Preconditions.checkNotNull(nextWriter); waitForSafePoint(); - long oldFileLen = closeWriter(); + long oldFileLen = closeWriter(this.writer); logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter instanceof AsyncProtobufLogWriter) { @@ -719,7 +718,8 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite @Override protected void doShutdown() throws IOException { waitForSafePoint(); - closeWriter(); + closeWriter(this.writer); + this.writer = null; closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e731611b5c7b..8c944b1bdf57 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws protected OutputStream getOutputStreamForCellEncoder() { return asyncOutputWrapper; } + + @Override + public long getSyncedLength() { + return this.output.getSyncedLength(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ff08da8f4434..4bbc13d3ab88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -19,11 +19,14 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.FSHLogProvider; @@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter protected FSDataOutputStream output; + private final AtomicLong syncedLength = new AtomicLong(0); + @Override public void append(Entry entry) throws IOException { entry.getKey().getBuilder(compressor). 
@@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException { } else { fsdos.hflush(); } + AtomicUtils.updateMax(this.syncedLength, fsdos.getPos()); + } + + @Override + public long getSyncedLength() { + return this.syncedLength.get(); } public FSDataOutputStream getStream() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 6f0b983444ce..c3bd14995077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -74,6 +75,22 @@ public interface WALProvider { interface WriterBase extends Closeable { long getLength(); + /** + * NOTE: We add this method for {@link WALFileLengthProvider} used for replication, + * considering the case if we use {@link AsyncFSWAL}, we write to 3 DNs concurrently, + * according to the visibility guarantee of HDFS, the data will be available immediately + * when arriving at DN since all the DNs will be considered as the last one in pipeline. + * This means replication may read uncommitted data and replicate it to the remote cluster + * and cause data inconsistency. + * The method {@link WriterBase#getLength} may return a length which is just in the hdfs client + * buffer and not successfully synced to HDFS, so we use this method to return the length + * successfully synced to HDFS and replication thread could only read writing WAL file + * limited by this length.
+ * see also HBASE-14004 and this document for more details: + * https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit# + * @return byteSize successfully synced to underlying filesystem. + */ + long getSyncedLength(); } // Writers are used internally. Users outside of the WAL should be relying on the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 25ea11216c14..198e64bb7a88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -130,35 +130,40 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce @Override protected Writer createWriterInstance(Path path) throws IOException { final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } - @Override - public void sync(boolean forceSync) throws IOException { - if (throwSyncException) { - throw new IOException("FAKE! Failed to replace a bad datanode..."); - } - w.sync(forceSync); + @Override + public void sync(boolean forceSync) throws IOException { + if (throwSyncException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); } + w.sync(forceSync); + } - @Override - public void append(Entry entry) throws IOException { - if (throwAppendException) { - throw new IOException("FAKE! Failed to replace a bad datanode..."); - } - w.append(entry); + @Override + public void append(Entry entry) throws IOException { + if (throwAppendException) { + throw new IOException("FAKE! 
Failed to replace a bad datanode..."); } + w.append(entry); + } - @Override - public long getLength() { - return w.getLength(); - } - }; + @Override + public long getLength() { + return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } + }; } + } // Make up mocked server and services. RegionServerServices services = mock(RegionServerServices.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 3448eb77f1cc..41ca8a855535 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1251,6 +1251,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index a50ef7800452..21f1774245c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return w.getLength(); } + + @Override + public long getSyncedLength() { + return w.getSyncedLength(); + } }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 704cdfa726b4..f31a908000b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -155,6 +155,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public CompletableFuture sync(boolean forceSync) { CompletableFuture result = writer.sync(forceSync); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java index 353f54935437..a482d934e948 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java @@ -109,6 +109,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public CompletableFuture sync(boolean forceSync) { writerSyncFlag = forceSync; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java index 9c460588fdbc..3c250446bec9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java @@ -84,6 +84,11 @@ public long getLength() { return writer.getLength(); } + @Override + public long getSyncedLength() { + return writer.getSyncedLength(); + } + @Override public void sync(boolean forceSync) throws IOException { writerSyncFlag = 
forceSync; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 691250a56092..0712b594c34d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return oldWriter1.getLength(); } + + @Override + public long getSyncedLength() { + return oldWriter1.getSyncedLength(); + } }; log.setWriter(newWriter1); @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException { public long getLength() { return oldWriter2.getLength(); } + + @Override + public long getSyncedLength() { + return oldWriter2.getSyncedLength(); + } }; log.setWriter(newWriter2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 6b76a95392fa..2a21660dd47b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -448,9 +448,7 @@ public void testReplicationSourceWALReaderRecovered() throws Exception { batch = reader.take(); assertEquals(walPath, batch.getLastWalPath()); assertEquals(5, batch.getNbEntries()); - // Actually this should be true but we haven't handled this yet since for a normal queue the - // last one is always open... Not a big deal for now. - assertFalse(batch.isEndOfFile()); + assertTrue(batch.isEndOfFile()); assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); }