diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java index 3e8fee8db1f1..37dc504f7e1c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java @@ -51,4 +51,8 @@ public static ImmutableByteArray wrap(byte[] b) { public String toStringUtf8() { return Bytes.toString(b); } + + public String toStringBinary() { + return Bytes.toStringBinary(b); + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 449c4b7985a7..4614c2593317 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -135,10 +135,10 @@ public void testPartialRead() throws Exception { long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, getWalKeyImpl(ts, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts, scopes), edit); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, getWalKeyImpl(ts+1, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts+1, scopes), edit); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -149,10 +149,10 @@ public void testPartialRead() throws Exception { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true); + log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -193,7 +193,7 @@ public void testWALRecordReader() throws Exception { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -201,9 +201,8 @@ public void testWALRecordReader() throws Exception { log.rollWriter(); edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); + txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); log.shutdown(); walfactory.shutdown(); @@ -253,17 +252,15 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception { WAL log = walfactory.getWAL(info); byte [] value = Bytes.toBytes("value"); WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); + long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); Thread.sleep(10); // make sure 2nd edit gets a later timestamp edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); + txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit); log.sync(txid); log.shutdown(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 59b8502d5b22..5d6cbf0ab291 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7995,7 +7995,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List ringBuffer) - throws IOException { + WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer ringBuffer) + throws IOException { if (this.closed) { throw new IOException( - "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); + "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); } MutableLong txidHolder = new MutableLong(); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { @@ -1066,7 +1071,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe ServerCall rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); + FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall); entry.stampRegionSequenceId(we); ringBuffer.get(txid).load(entry); } finally { @@ -1102,7 +1107,24 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) { } } + @Override + public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { + return append(info, key, edits, true, false); + } + + @Override + public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException { + return append(info, key, edits, false, closeRegion); + } + /** + * Append a set of edits to the WAL. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. + *

* NOTE: This append, at a time that is usually after this call returns, starts an mvcc * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must @@ -1113,10 +1135,21 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) { * passed in WALKey walKey parameter. Be warned that the WriteEntry is not * immediately available on return from this method. It WILL be available subsequent to a sync of * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. + * @param info the regioninfo associated with append + * @param key Modified by this call; we add to it this edits region edit/sequence id. + * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit + * sequence id that is after all currently appended edits. + * @param inMemstore Always true except for case where we are writing a region event marker, for + * example, a compaction completion record into the WAL; in this case the entry is just + * so we can finish an unfinished compaction -- it is not an edit for memstore. + * @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this + * region on this region server. The WAL implementation should remove all the related + * stuff, for example, the sequence id accounting. + * @return Returns a 'transaction id' and key will have the region edit/sequence id + * in it. */ - @Override - public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException; + protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException; protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index e93642dfda9b..14edaaee964d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -434,7 +434,7 @@ private void appendAndSync() { FSWALEntry entry = iter.next(); boolean appended; try { - appended = append(writer, entry); + appended = appendEntry(writer, entry); } catch (IOException e) { throw new AssertionError("should not happen", e); } @@ -615,13 +615,13 @@ protected boolean markerEditOnly() { } @Override - public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException { + protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { if (markerEditOnly() && !edits.isMetaEdit()) { throw new IOException("WAL is closing, only marker edit is allowed"); } - long txid = - stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); + long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion, + waitingConsumePayloads); if (shouldScheduleConsumer()) { consumeExecutor.execute(consumer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b61a0768670f..d5187233feb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -433,12 +433,10 @@ protected void doShutdown() throws IOException { } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION", - justification = "Will never be null") @Override - public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, - final boolean inMemstore) throws IOException { - return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, + protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, + final boolean inMemstore, boolean closeRegion) throws IOException { + return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion, disruptor.getRingBuffer()); } @@ -1100,7 +1098,7 @@ private void attainSafePoint(final long currentSequence) { */ void append(final FSWALEntry entry) throws Exception { try { - FSHLog.this.append(writer, entry); + FSHLog.this.appendEntry(writer, entry); } catch (Exception e) { String msg = "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 5bdec137d545..acb49b7cca37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -51,14 +51,16 @@ class FSWALEntry extends Entry { // they are only in memory and held here while passing over the ring buffer. private final transient long txid; private final transient boolean inMemstore; + private final transient boolean closeRegion; private final transient RegionInfo regionInfo; private final transient Set familyNames; private final transient ServerCall rpcCall; FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo, - final boolean inMemstore, ServerCall rpcCall) { + final boolean inMemstore, boolean closeRegion, ServerCall rpcCall) { super(key, edit); this.inMemstore = inMemstore; + this.closeRegion = closeRegion; this.regionInfo = regionInfo; this.txid = txid; if (inMemstore) { @@ -98,6 +100,10 @@ boolean isInMemStore() { return this.inMemstore; } + boolean isCloseRegion() { + return closeRegion; + } + RegionInfo getRegionInfo() { return this.regionInfo; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index 9736d8bd1f8e..3fc41a316bc3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ImmutableByteArray; @@ -184,6 +185,30 @@ void update(byte[] encodedRegionName, Set families, long sequenceid, } } + /** + * Clear all the records of the given region as it is going to be closed. + *

+ * We will call this once we get the region close marker. We need this because that, if we use + * Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries + * that has not been processed yet, this will lead to orphan records in the + * lowestUnflushedSequenceIds and then cause too many WAL files. + *

+ * See HBASE-23157 for more details. + */ + void onRegionClose(byte[] encodedRegionName) { + synchronized (tieLock) { + this.lowestUnflushedSequenceIds.remove(encodedRegionName); + Map flushing = this.flushingSequenceIds.remove(encodedRegionName); + if (flushing != null) { + LOG.warn("Still have flushing records when closing {}, {}", + Bytes.toString(encodedRegionName), + flushing.entrySet().stream().map(e -> e.getKey().toStringBinary() + "->" + e.getValue()) + .collect(Collectors.joining(",", "{", "}"))); + } + } + this.highestSequenceIds.remove(encodedRegionName); + } + /** * Update the store sequence id, e.g., upon executing in-memory compaction */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 1808cd67b1a1..027e412d38c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -59,20 +59,19 @@ private WALUtil() { } /** - * Write the marker that a compaction has succeeded and is about to be committed. - * This provides info to the HMaster to allow it to recover the compaction if this regionserver - * dies in the middle. It also prevents the compaction from finishing if this regionserver has - * already lost its lease on the log. - * - *

This write is for internal use only. Not for external client consumption. + * Write the marker that a compaction has succeeded and is about to be committed. This provides + * info to the HMaster to allow it to recover the compaction if this regionserver dies in the + * middle. It also prevents the compaction from finishing if this regionserver has already lost + * its lease on the log. + *

+ * This write is for internal use only. Not for external client consumption. * @param mvcc Used by WAL to get sequence Id for the waledit. */ public static WALKeyImpl writeCompactionMarker(WAL wal, - NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, - MultiVersionConcurrencyControl mvcc) - throws IOException { + NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, + MultiVersionConcurrencyControl mvcc) throws IOException { WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null); + writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -81,14 +80,14 @@ public static WALKeyImpl writeCompactionMarker(WAL wal, /** * Write a flush marker indicating a start / abort or a complete of a region flush - * - *

This write is for internal use only. Not for external client consumption. + *

+ * This write is for internal use only. Not for external client consumption. */ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap replicationScope, - RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri, - WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync); + RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri, + WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); } @@ -96,15 +95,15 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap } /** - * Write a region open marker indicating that the region is opened. - * This write is for internal use only. Not for external client consumption. + * Write a region open marker indicating that the region is opened. This write is for internal use + * only. Not for external client consumption. */ public static WALKeyImpl writeRegionEventMarker(WAL wal, - NavigableMap replicationScope, RegionInfo hri, - final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, - WALEdit.createRegionEventWALEdit(hri, r), mvcc, null); + NavigableMap replicationScope, RegionInfo hri, final RegionEventDescriptor r, + final MultiVersionConcurrencyControl mvcc) throws IOException { + WALKeyImpl walKey = + writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), + r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); } @@ -122,11 +121,11 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal, * @throws IOException We will throw an IOException if we can not append to the HLog. */ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, - final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) - throws IOException { - WALKeyImpl walKey = - writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null); + final NavigableMap replicationScope, final RegionInfo hri, + final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) + throws IOException { + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, + WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); } @@ -134,36 +133,32 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, } private static WALKeyImpl writeMarker(final WAL wal, - final NavigableMap replicationScope, - final RegionInfo hri, - final WALEdit edit, - final MultiVersionConcurrencyControl mvcc, - final Map extendedAttributes) - throws IOException { + final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, + boolean closeRegion, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes) throws IOException { // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT - return doFullAppendTransaction(wal, replicationScope, hri, - edit, mvcc, extendedAttributes, true); + return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc, + extendedAttributes, true); } /** - * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, - * an optional sync, and then a call to complete the mvcc transaction. This method does it all. - * Good for case of adding a single edit or marker to the WAL. - * - *

This write is for internal use only. Not for external client consumption. + * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an + * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good + * for case of adding a single edit or marker to the WAL. + *

+ * This write is for internal use only. Not for external client consumption. * @return WALKeyImpl that was added to the WAL. */ - public static WALKeyImpl doFullAppendTransaction(final WAL wal, - final NavigableMap replicationScope, final RegionInfo hri, - final WALEdit edit, final MultiVersionConcurrencyControl mvcc, - final Map extendedAttributes, final boolean sync) - throws IOException { + private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, + final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, + boolean closeRegion, final MultiVersionConcurrencyControl mvcc, + final Map extendedAttributes, final boolean sync) throws IOException { // TODO: Pass in current time to use? WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), - System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); + System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes); long trx = MultiVersionConcurrencyControl.NONE; try { - trx = wal.append(hri, walKey, edit, false); + trx = wal.appendMarker(hri, walKey, edit, closeRegion); if (sync) { wal.sync(trx); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 5f787fef5990..7e894aedc896 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -161,8 +161,18 @@ public void close() { } @Override - public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException { + public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { + return append(info, key, edits, true, false); + } + + @Override + public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException { + return append(info, key, edits, false, closeRegion); + } + + private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { WriteEntry writeEntry = key.getMvcc().begin(); if (!edits.isReplay()) { for (Cell cell : edits.getCells()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index cf367cde159d..74dab64646da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -56,7 +56,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { /** * Roll the log writer. That is, start writing log messages to a new file. * - *

+ *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. * @@ -69,7 +69,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { /** * Roll the log writer. That is, start writing log messages to a new file. * - *

+ *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. * @@ -97,44 +97,59 @@ public interface WAL extends Closeable, WALFileLengthProvider { void close() throws IOException; /** - * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction - * completes BUT on return this edit must have its region edit/sequence id assigned - * else it messes up our unification of mvcc and sequenceid. On return key will - * have the region edit/sequence id filled in. + * Append a set of data edits to the WAL. 'Data' here means that the content in the edits will + * also be added to memstore. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. * @param info the regioninfo associated with append * @param key Modified by this call; we add to it this edits region edit/sequence id. * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit - * sequence id that is after all currently appended edits. - * @param inMemstore Always true except for case where we are writing a compaction completion - * record into the WAL; in this case the entry is just so we can finish an unfinished compaction - * -- it is not an edit for memstore. + * sequence id that is after all currently appended edits. * @return Returns a 'transaction id' and key will have the region edit/sequence id - * in it. + * in it. + * @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit, boolean) */ - long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; + long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException; + + /** + * Append a marker edit to the WAL. A marker could be a FlushDescriptor, a compaction marker, or + * region event marker. The difference here is that, a marker will not be added to memstore. + *

+ * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must + * have its region edit/sequence id assigned else it messes up our unification of mvcc and + * sequenceid. On return key will have the region edit/sequence id filled in. + * @param info the regioninfo associated with append + * @param key Modified by this call; we add to it this edits region edit/sequence id. + * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit + * sequence id that is after all currently appended edits. + * @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this + * region on this region server. The WAL implementation should remove all the related + * stuff, for example, the sequence id accounting. + * @return Returns a 'transaction id' and key will have the region edit/sequence id + * in it. + * @see #appendData(RegionInfo, WALKeyImpl, WALEdit) + */ + long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion) + throws IOException; /** * updates the seuence number of a specific store. * depending on the flag: replaces current seq number if the given seq id is bigger, * or even if it is lower than existing one - * @param encodedRegionName - * @param familyName - * @param sequenceid - * @param onlyIfGreater */ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, boolean onlyIfGreater); /** * Sync what we have in the WAL. - * @throws IOException */ void sync() throws IOException; /** * Sync the WAL if the txId was not already sync'd. * @param txid Transaction id to sync to. - * @throws IOException */ void sync(long txid) throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index dde020d4326b..9381ef36eb84 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -239,9 +239,8 @@ private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. - long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, - new MultiVersionConcurrencyControl(), scopes), - edit, true); + long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, + new MultiVersionConcurrencyControl(), scopes), edit); log.sync(txid); // the edit shall have been change now by the coprocessor. @@ -291,9 +290,9 @@ public void testEmptyWALEditAreNotSeen() throws Exception { assertFalse(cp.isPostWALWriteCalled()); final long now = EnvironmentEdgeManager.currentTime(); - long txid = log.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), - new WALEdit(), true); + long txid = log.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), + new WALEdit()); log.sync(txid); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); @@ -340,8 +339,8 @@ public void testWALCoprocessorReplay() throws Exception { addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); } - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // sync to fs. wal.sync(); @@ -456,8 +455,8 @@ private void addWALEdits(final TableName tableName, final RegionInfo hri, final edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors - txid = wal.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); + txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit); } if (-1 != txid) { wal.sync(txid); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index db15ca61ea0c..a576adc8d192 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -604,9 +604,8 @@ public void makeWAL(HRegionServer hrs, List regions, int numEdits, i // HBaseTestingUtility.createMultiRegions use 5 bytes key byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); - log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc), - e, true); + log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), + tableName, System.currentTimeMillis(), mvcc), e); if (0 == i % syncEvery) { log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 4893982f9373..f2db9464a47e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -115,7 +115,7 @@ public void verifyBulkLoadEvent() throws IOException { storeFileName = (new Path(storeFileName)).getName(); List storeFileNames = new ArrayList<>(); storeFileNames.add(storeFileName); - when(log.append(any(), any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)), anyBoolean())).thenAnswer(new Answer() { @@ -142,7 +142,7 @@ public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOEx @Test public void shouldBulkLoadSingleFamilyHLog() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override @@ -162,7 +162,7 @@ public Object answer(InvocationOnMock invocation) { @Test public void shouldBulkLoadManyFamilyHLog() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override @@ -183,7 +183,7 @@ public Object answer(InvocationOnMock invocation) { @Test public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { - when(log.append(any(), + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4e7c5ad66c41..9db353f42795 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -4514,10 +4513,9 @@ private void durabilityTest(String method, Durability tableDurability, put.setDurability(mutationDurability); region.put(put); - //verify append called or not - verify(wal, expectAppend ? times(1) : never()) - .append((HRegionInfo)any(), (WALKeyImpl)any(), - (WALEdit)any(), Mockito.anyBoolean()); + // verify append called or not + verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(), + (WALKeyImpl) any(), (WALEdit) any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -5653,8 +5651,8 @@ public void testOpenRegionWrittenToWAL() throws Exception { region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null); - verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any() - , editCaptor.capture(), anyBoolean()); + verify(wal, times(1)).appendData((HRegionInfo)any(), (WALKeyImpl)any() + , editCaptor.capture()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5726,9 +5724,8 @@ public void testFlushedFileWithNoTags() throws Exception { */ private WAL mockWAL() throws IOException { WAL wal = mock(WAL.class); - Mockito.when(wal.append((HRegionInfo)Mockito.any(), - (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). - thenAnswer(new Answer() { + when(wal.appendData((HRegionInfo) any(), (WALKeyImpl) any(), (WALEdit) any())) + .thenAnswer(new Answer() { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKeyImpl key = invocation.getArgument(1); @@ -5737,7 +5734,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable { return 1L; } - }); + }); return wal; } @@ -5773,8 +5770,8 @@ public void testCloseRegionWrittenToWAL() throws Exception { region.close(false); // 2 times, one for region open, the other close region - verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(), - editCaptor.capture(), anyBoolean()); + verify(wal, times(2)).appendData((HRegionInfo)any(), (WALKeyImpl)any(), + editCaptor.capture()); WALEdit edit = editCaptor.getAllValues().get(1); assertNotNull(edit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index b08877e4fa62..770ee8d1ca30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -1164,8 +1163,8 @@ public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOExceptio // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); // test for replay prepare flush putDataByReplay(secondaryRegion, 0, 10, cq, families); @@ -1180,12 +1179,12 @@ public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOExceptio primaryRegion.getRegionInfo().getRegionName())) .build()); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); secondaryRegion.close(); - verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class), - any(WALEdit.class), anyBoolean()); + verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), + any(WALEdit.class)); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 37270abad462..5bd342288ecd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -249,7 +249,7 @@ public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException { LOG.info("SET throwing of exception on append"); dodgyWAL.throwException = true; // This append provokes a WAL roll request - dodgyWAL.append(region.getRegionInfo(), key, edit, true); + dodgyWAL.appendData(region.getRegionInfo(), key, edit); boolean exception = false; try { dodgyWAL.sync(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 68eebc17a45d..ec3c28d3eda3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -25,17 +25,22 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -46,8 +51,10 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -57,13 +64,17 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -168,7 +179,7 @@ protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); - log.append(hri, key, cols, true); + log.appendData(hri, key, cols); } log.sync(); } @@ -417,7 +428,7 @@ public void run() { final RegionInfo info = region.getRegionInfo(); final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); - wal.append(info, logkey, edits, true); + wal.append(info, logkey, edits, true, false); region.getMVCC().completeAndWait(logkey.getWriteEntry()); } region.flush(true); @@ -466,7 +477,7 @@ public void testWriteEntryCanBeNull() throws IOException { new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); try { - wal.append(ri, key, cols, true); + wal.append(ri, key, cols, true, false); fail("Should fail since the wal has already been closed"); } catch (IOException e) { // expected @@ -484,4 +495,94 @@ public void testRollWriterForClosedWAL() throws IOException { wal.close(); wal.rollWriter(); } + + @Test + public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { + final String testName = currentTest.getMethodName(); + final byte[] b = Bytes.toBytes("b"); + + final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + final CountDownLatch holdAppend = new CountDownLatch(1); + final CountDownLatch closeFinished = new CountDownLatch(1); + final CountDownLatch putFinished = new CountDownLatch(1); + + try (AbstractFSWAL wal = newWAL(FS, FSUtils.getRootDir(CONF), testName, + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { + wal.init(); + wal.registerWALActionsListener(new WALActionsListener() { + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + if (startHoldingForAppend.get()) { + try { + holdAppend.await(); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + } + } + }); + + // open a new region which uses this WAL + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); + RegionServerServices rsServices = mock(RegionServerServices.class); + when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); + when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); + final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, + TEST_UTIL.getConfiguration(), rsServices, null); + + ExecutorService exec = Executors.newFixedThreadPool(2); + + // do a regular write first because of memstore size calculation. + region.put(new Put(b).addColumn(b, b, b)); + + startHoldingForAppend.set(true); + exec.submit(new Runnable() { + @Override + public void run() { + try { + region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL)); + putFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the put a chance to start + Threads.sleep(3000); + + exec.submit(new Runnable() { + @Override + public void run() { + try { + Map closeResult = region.close(); + LOG.info("Close result:" + closeResult); + closeFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the flush a chance to start. Flush should have got the region lock, and + // should have been waiting on the mvcc complete after this. + Threads.sleep(3000); + + // let the append to WAL go through now that the flush already started + holdAppend.countDown(); + putFinished.await(); + closeFinished.await(); + + // now check the region's unflushed seqIds. + long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); + assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM, + seqId); + + wal.close(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index a20748ae662d..a71011347ccf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -801,15 +801,15 @@ public void testReplayEditsWrittenIntoWAL() throws Exception { long now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, - true); + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), + edit); // Sync. wal.sync(); @@ -1154,10 +1154,10 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ } private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, - byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, - int index, NavigableMap scopes) throws IOException { + byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, + int index, NavigableMap scopes) throws IOException { FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), - createWALEdit(rowName, family, ee, index), hri, true, null); + createWALEdit(rowName, family, ee, index), hri, true, false, null); entry.stampRegionSequenceId(mvcc.begin()); return entry; } @@ -1167,8 +1167,8 @@ private void addWALEdits(final TableName tableName, final HRegionInfo hri, final final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) throws IOException { for (int j = 0; j < count; j++) { - wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), - createWALEdit(rowName, family, ee, j), true); + wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), + createWALEdit(rowName, family, ee, j)); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java index 420585f448c7..cf4862b2c335 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java @@ -83,7 +83,7 @@ public static void doWrite(WAL wal, RegionInfo hri, TableName tableName, int col throws IOException { for (int i = 0; i < recordCount; i++) { WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc); - wal.append(hri, entry.getKey(), entry.getEdit(), true); + wal.appendData(hri, entry.getKey(), entry.getEdit()); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 4cfe49e9e9e0..d156c5ecd433 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -196,7 +196,7 @@ public void run() { SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); try { - wal.append(ri, key, cols, true); + wal.append(ri, key, cols, true, false); } catch (IOException e) { // should not happen throw new UncheckedIOException(e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 3eed1372a1ef..a7d4a55bccd2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -207,9 +207,8 @@ public void testLogRollAfterSplitStart() throws IOException { kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(Bytes.toBytes("column"), 0); - log.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), - kvs, true); + log.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 819df673c94e..63c3de18ef13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -166,8 +166,8 @@ public void run() { for(byte[] fam : htd.getColumnFamilyNames()) { scopes.put(fam, 0); } - final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); + final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), + TableName.META_TABLE_NAME, now, mvcc, scopes), edit); Threads.sleep(ThreadLocalRandom.current().nextInt(5)); wal.sync(txid); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 0967a756f45f..dd83c7c1ce93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -111,9 +111,8 @@ public void testActionListener() throws Exception { edit.add(kv); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(b, 0); - long txid = wal.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit, - true); + long txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 40a3f2a80f04..8178a236c5af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -423,7 +423,7 @@ public void testReplicationInReplay() throws Exception { long now = EnvironmentEdgeManager.currentTime(); edit.add(new KeyValue(rowName, famName, qualifier, now, value)); WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); - wal.append(hri, walKey, edit, true); + wal.appendData(hri, walKey, edit); wal.sync(); Get get = new Get(rowName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 3a1320cf66ef..0cdf1cf66b85 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -300,11 +300,9 @@ public void testLogRoll() throws Exception { wal.rollWriter(); } LOG.info(Long.toString(i)); - final long txid = wal.append( - hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, - true); + final long txid = wal.appendData(hri, + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + edit); wal.sync(txid); } @@ -316,9 +314,9 @@ public void testLogRoll() throws Exception { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - wal.append(hri, + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, true); + edit); } wal.sync(); @@ -338,9 +336,9 @@ public void testLogRoll() throws Exception { manager.logPositionAndCleanOldLogs(source, new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); - wal.append(hri, + wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, true); + edit); wal.sync(); assertEquals(1, manager.getWALs().size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index a873320ef919..2a21660dd47b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -556,9 +556,9 @@ private String getRow(WAL.Entry entry) { } private void appendToLog(String key) throws IOException { - final long txid = log.append(info, + final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), - mvcc, scopes), getWALEdit(key), true); + mvcc, scopes), getWALEdit(key)); log.sync(txid); } @@ -580,8 +580,8 @@ private void appendToLogAndSync(int count) throws IOException { } private long appendToLog(int count) throws IOException { - return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); + return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), getWALEdits(count)); } private WALEdit getWALEdits(int count) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index c7f1c4114480..01de1f47731d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -63,12 +63,12 @@ public void sync(long txid, boolean forceSync) throws IOException { } @Override - public long append(RegionInfo info, WALKeyImpl key, - WALEdit edits, boolean inMemstore) throws IOException { + protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore, + boolean closeRegion) throws IOException { if (this.ft == FailureType.APPEND) { throw new IOException("append"); } - return super.append(info, key, edits, inMemstore); + return super.append(info, key, edits, inMemstore, closeRegion); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index 3205d7328e33..be84eabbf06f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -156,8 +156,8 @@ private void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), - cols, true); + log.appendData(hri, + getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes), cols); } log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 81938065bbcb..eebc11c0b95f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -129,8 +129,8 @@ public void testSecureWAL() throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 8fbe09dd30ba..97a755121a38 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -208,7 +208,7 @@ public void testSplit() throws IOException { LOG.info("Region " + i + ": " + edit); WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes); - log.append(infos[i], walKey, edit, true); + log.appendData(infos[i], walKey, edit); walKey.getWriteEntry(); } log.sync(); @@ -270,8 +270,8 @@ public void Broken_testSync() throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -289,8 +289,8 @@ public void Broken_testSync() throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); reader = wals.createReader(fs, walPath); @@ -311,8 +311,8 @@ public void Broken_testSync() throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); - wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now I should have written out lots of blocks. Sync then read. wal.sync(); @@ -388,9 +388,8 @@ public void testAppendClose() throws Exception { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), - kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -522,10 +521,8 @@ public void testEditAdd() throws IOException { .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build(); final WAL log = wals.getWAL(info); - final long txid = log.append(info, - new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc, scopes), - cols, true); + final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); log.sync(txid); log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.completeCacheFlush(info.getEncodedNameAsBytes()); @@ -580,10 +577,8 @@ public void testAppend() throws IOException { } RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); final WAL log = wals.getWAL(hri); - final long txid = log.append(hri, - new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), - mvcc, scopes), - cols, true); + final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols); log.sync(txid); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); @@ -634,8 +629,8 @@ public void testVisitors() throws Exception { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), cols, true); + log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols); } log.sync(); assertEquals(COL_COUNT, visitor.increments); @@ -644,8 +639,8 @@ public void testVisitors() throws Exception { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), cols, true); + log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), cols); log.sync(); assertEquals(COL_COUNT, visitor.increments); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index bc21a65b1633..83ad5fa677fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -118,8 +118,8 @@ private Path writeWAL(final WALFactory wals, final String tblName, boolean offhe } else { kvs.add(kv); } - wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), kvs, true); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); } wal.sync(); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java index 40fad6ad5208..6ea1daf47a6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java @@ -98,8 +98,8 @@ public void testWALRootDir() throws Exception { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(regionInfo, - getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true); + long txid = + log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit); log.sync(txid); assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size()); @@ -109,8 +109,7 @@ public void testWALRootDir() throws Exception { HConstants.HREGION_LOGDIR_NAME)).size()); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), - edit, true); + txid = log.appendData(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit); log.sync(txid); log.rollWriter(); log.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 861b289f1443..7e6ed8fc4267 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -184,7 +184,7 @@ public void run() { RegionInfo hri = region.getRegionInfo(); final WALKeyImpl logkey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); - wal.append(hri, logkey, walEdit, true); + wal.appendData(hri, logkey, walEdit); if (!this.noSync) { if (++lastSync >= this.syncInterval) { wal.sync();