Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
return new ImmutableByteArray(b);
}

public String toStringUtf8() {
return Bytes.toString(b);
public String toString() {
return Bytes.toStringBinary(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ public void testPartialRead() throws Exception {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
Expand All @@ -149,10 +149,10 @@ public void testPartialRead() throws Exception {

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts1+1, scopes), edit);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
log.appendData(info, getWalKeyImpl(ts1+2, scopes), edit);
log.sync();
log.shutdown();
walfactory.shutdown();
Expand Down Expand Up @@ -193,17 +193,16 @@ public void testWALRecordReader() throws Exception {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);

Thread.sleep(1); // make sure 2nd log gets a later timestamp
long secondTs = System.currentTimeMillis();
log.rollWriter();

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
Expand Down Expand Up @@ -253,17 +252,15 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
WAL log = walfactory.getWAL(info);
byte [] value = Bytes.toBytes("value");
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value));
long txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);

Thread.sleep(10); // make sure 2nd edit gets a later timestamp

edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
txid = log.appendData(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit);
log.sync(txid);
log.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7995,7 +7995,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
}
WriteEntry writeEntry = null;
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}

protected final boolean append(W writer, FSWALEntry entry) throws IOException {
protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
// TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
atHeadOfRingBufferEventHandlerAppend();
long start = EnvironmentEdgeManager.currentTime();
Expand All @@ -1001,8 +1001,13 @@ protected final boolean append(W writer, FSWALEntry entry) throws IOException {
doAppend(writer, entry);
assert highestUnsyncedTxid < entry.getTxid();
highestUnsyncedTxid = entry.getTxid();
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
entry.isInMemStore());
if (entry.isCloseRegion()) {
// let's clean all the records of this region
sequenceIdAccounting.onRegionClose(encodedRegionName);
} else {
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird that this is 'update' and not 'append' (not your doing; something for a follow-on).

entry.isInMemStore());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this flag.... isn't it false when not data? Could we use this instead of the appendData and appendMarker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not make sense... You do not want the closeRegion flag but still want the inMemstore flag? At least with appendData and appendMarker, we could remove at least one of the parameters in the methods of the WAL interface...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isInMemStore flag is there currently? No? I'm not advocating adding anything not already present.

Looking at this patch, the new methods appendData and appendMarker do not seem to buy us much. They do not go deep enough down into AbstractFSWAL.

}
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
Expand Down Expand Up @@ -1052,11 +1057,11 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
Expand All @@ -1066,7 +1071,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down Expand Up @@ -1102,7 +1107,24 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
}
}

/**
 * Appends a data edit by delegating to
 * {@code append(info, key, edits, inMemstore, closeRegion)} with {@code inMemstore} fixed to
 * {@code true} and {@code closeRegion} fixed to {@code false}.
 * @param info the region info forwarded unchanged to {@code append}
 * @param key the WAL key forwarded unchanged to {@code append}
 * @param edits the edits forwarded unchanged to {@code append}
 * @return whatever the delegated {@code append} call returns
 * @throws IOException if the delegated {@code append} call throws
 */
@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
  // Name the two flags so the delegation reads without having to count boolean literals.
  final boolean inMemstore = true;
  final boolean closeRegion = false;
  final long txid = append(info, key, edits, inMemstore, closeRegion);
  return txid;
}

/**
 * Appends a marker edit by delegating to
 * {@code append(info, key, edits, inMemstore, closeRegion)} with {@code inMemstore} fixed to
 * {@code false}; the caller-supplied {@code closeRegion} flag is passed through as-is.
 * @param info the region info forwarded unchanged to {@code append}
 * @param key the WAL key forwarded unchanged to {@code append}
 * @param edits the edits forwarded unchanged to {@code append}
 * @param closeRegion forwarded unchanged as the {@code closeRegion} argument of {@code append}
 * @return whatever the delegated {@code append} call returns
 * @throws IOException if the delegated {@code append} call throws
 */
@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
    throws IOException {
  // Markers are always appended with the inMemstore flag off.
  final boolean inMemstore = false;
  final long txid = append(info, key, edits, inMemstore, closeRegion);
  return txid;
}

/**
* Append a set of edits to the WAL.
* <p/>
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
* sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
* <p/>
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
Expand All @@ -1113,10 +1135,21 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
* passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
* immediately available on return from this method. It WILL be available subsequent to a sync of
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
* @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a region event marker, for
* example, a compaction completion record into the WAL; in this case the entry is just
* so we can finish an unfinished compaction -- it is not an edit for memstore.
* @param closeRegion Whether this is a region close marker, i.e., the last WAL edit for this
* region on this region server. The WAL implementation should remove all the state it
* keeps for the region, for example, the sequence id accounting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you answer why we can't write a special marker type instead? A close region marker and avoid the flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is what tells us that this is a special marker type... We have FlushDescriptor, RegionEventMarker, so we cannot pass the marker directly; we just get a WALEdit here. Another option is to deserialize the cells in the WALEdit and check whether it is a close marker, but deserializing can throw an IOException, which makes the code a bit ugly if we just ignore it. It is also not good for performance, as we would serialize it in the upper layer and then deserialize it again right away...

Copy link
Contributor

@saintstack saintstack Oct 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing back some. Doesn't the inMemstore flag do this? It is false when we are writing markers? Markers are rare, so checking the WALEdit to see whether it is a marker shouldn't slow us down.

The IOE is a problem though.

Copy link
Contributor Author

@Apache9 Apache9 Oct 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inMemstore flag just tells us that it is a marker, but does not tell us which kind of marker it is...

* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
@Override
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException;
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException;

protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ private void appendAndSync() {
FSWALEntry entry = iter.next();
boolean appended;
try {
appended = append(writer, entry);
appended = appendEntry(writer, entry);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
Expand Down Expand Up @@ -615,13 +615,13 @@ protected boolean markerEditOnly() {
}

@Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,12 +433,10 @@ protected void doShutdown() throws IOException {
}
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
justification = "Will never be null")
@Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore, boolean closeRegion) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
disruptor.getRingBuffer());
}

Expand Down Expand Up @@ -1100,7 +1098,7 @@ private void attainSafePoint(final long currentSequence) {
*/
void append(final FSWALEntry entry) throws Exception {
try {
FSHLog.this.append(writer, entry);
FSHLog.this.appendEntry(writer, entry);
} catch (Exception e) {
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
+ ", requesting roll of WAL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer.
private final transient long txid;
private final transient boolean inMemstore;
private final transient boolean closeRegion;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient ServerCall<?> rpcCall;

FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) {
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.closeRegion = closeRegion;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wonder if we can avoid this flag and have a special close marker instead, the same way we have a flush marker, etc.?

this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
Expand Down Expand Up @@ -98,6 +100,10 @@ boolean isInMemStore() {
return this.inMemstore;
}

/** Returns the {@code closeRegion} flag that this entry was constructed with. */
boolean isCloseRegion() {
  return this.closeRegion;
}

/** Returns the {@code RegionInfo} held in this entry's {@code regionInfo} field. */
RegionInfo getRegionInfo() {
return this.regionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
Expand Down Expand Up @@ -184,6 +185,30 @@ void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
}
}

/**
* Clear all the records of the given region as it is going to be closed.
* <p/>
* We will call this once we get the region close marker. We need this because, if we use
* Durability.ASYNC_WAL, after calling startCacheFlush we may still get some ongoing WAL entries
* that have not been processed yet; this will lead to orphan records in the
* lowestUnflushedSequenceIds and then cause too many WAL files.
* <p/>
* See HBASE-23157 for more details.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment.

void onRegionClose(byte[] encodedRegionName) {
  synchronized (tieLock) {
    // Drop the unflushed bookkeeping for the region first, then whatever is still marked as
    // flushing; both removals happen while holding tieLock.
    this.lowestUnflushedSequenceIds.remove(encodedRegionName);
    final Map<ImmutableByteArray, Long> leftover =
      this.flushingSequenceIds.remove(encodedRegionName);
    if (leftover != null) {
      // A non-null result means flush records were still registered for this region; report
      // them as "{family->seqId,...}" so they can be diagnosed.
      final String regionName = Bytes.toString(encodedRegionName);
      final String pending = leftover.entrySet().stream()
        .map(e -> e.getKey().toString() + "->" + e.getValue())
        .collect(Collectors.joining(",", "{", "}"));
      LOG.warn("Still have flushing records when closing {}, {}", regionName, pending);
    }
  }
  // NOTE(review): highestSequenceIds is cleared outside tieLock, exactly as before; presumably
  // it is safe for unsynchronized access -- confirm against the field declaration.
  this.highestSequenceIds.remove(encodedRegionName);
}

/**
* Update the store sequence id, e.g., upon executing in-memory compaction
*/
Expand Down Expand Up @@ -364,7 +389,7 @@ void abortCacheFlush(final byte[] encodedRegionName) {
Long currentId = tmpMap.get(e.getKey());
if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family "
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
+ e.getKey().toString() + " acquired edits out of order current memstore seq="
+ currentId + ", previous oldest unflushed id=" + e.getValue();
LOG.error(errorStr);
Runtime.getRuntime().halt(1);
Expand Down
Loading