Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class TraceUtil {
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
AttributeKey.booleanKey("db.hbase.rowlock.readlock");

public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");

private TraceUtil() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -571,6 +570,35 @@ public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IO
return rollWriter(false);
}

@Override
public final void sync() throws IOException {
sync(useHsync);
}

@Override
public final void sync(long txid) throws IOException {
sync(txid, useHsync);
}

@Override
public final void sync(boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(forceSync);
return null;
}, () -> createSpan("WAL.sync"));
}

@Override
public final void sync(long txid, boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(txid, forceSync);
return null;
}, () -> createSpan("WAL.sync"));
}

protected abstract void doSync(boolean forceSync) throws IOException;

protected abstract void doSync(long txid, boolean forceSync) throws IOException;
/**
* This is a convenience method that computes a new filename with a given file-number.
* @param filenum to use
Expand Down Expand Up @@ -672,7 +700,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
if (regions != null) {
List<String> listForPrint = new ArrayList();
List<String> listForPrint = new ArrayList<>();
for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
StringBuilder families = new StringBuilder();
for (int i = 0; i < r.getValue().size(); i++) {
Expand Down Expand Up @@ -815,6 +843,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
}
}

private Span createSpan(String name) {
return TraceUtil.createSpan(name).setAttribute(TraceUtil.WAL_IMPL, implClassName);
}

/**
* Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
* <p/>
Expand All @@ -832,13 +864,10 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
* @throws IOException if there is a problem flushing or closing the underlying FS
*/
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHFile.replaceWriter").startSpan();
try (Scope scope = span.makeCurrent()) {
return TraceUtil.trace(() -> {
doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
} finally {
span.end();
}
}, () -> createSpan("WAL.replaceWriter"));
}

protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
Expand Down Expand Up @@ -876,8 +905,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
return ioe;
}

@Override
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
rollWriterLock.lock();
try {
if (this.closed) {
Expand All @@ -888,8 +916,7 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
return null;
}
Map<byte[], List<byte[]>> regionsToFlush = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("FSHLog.rollWriter").startSpan();
try (Scope scope = span.makeCurrent()) {
try {
Path oldPath = getOldPath();
Path newPath = getNewPath();
// Any exception from here on is catastrophic, non-recoverable so we currently abort.
Expand All @@ -914,17 +941,20 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
// If the underlying FileSystem can't do what we ask, treat as IO failure so
// we'll abort.
throw new IOException(
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
} finally {
span.end();
"Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
exception);
}
return regionsToFlush;
} finally {
rollWriterLock.unlock();
}
}

@Override
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
}

// public only until class moves to o.a.h.h.wal
/** @return the size of log files in use */
public long getLogFileSize() {
Expand Down Expand Up @@ -1099,7 +1129,6 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
.append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
.append(" ms, current pipeline: ")
.append(Arrays.toString(getPipeline())).toString();
Span.current().addEvent(msg);
LOG.info(msg);
if (timeInNanos > this.rollOnSyncNs) {
// A single sync took too long.
Expand All @@ -1122,8 +1151,7 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
Expand All @@ -1135,14 +1163,12 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
Span span = TraceUtil.getGlobalTracer().spanBuilder(implClassName + ".append").startSpan();
try (Scope scope = span.makeCurrent()) {
try {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
ringBuffer.publish(txid);
span.end();
}
return txid;
}
Expand Down Expand Up @@ -1176,13 +1202,14 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {

@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true);
return TraceUtil.trace(() -> append(info, key, edits, true),
() -> createSpan("WAL.appendData"));
}

@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
throws IOException {
return append(info, key, edits, false);
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return TraceUtil.trace(() -> append(info, key, edits, false),
() -> createSpan("WAL.appendMarker"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -53,7 +51,6 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -345,7 +342,7 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
break;
}
}
postSync(System.nanoTime() - startTimeNs, finishSync(true));
postSync(System.nanoTime() - startTimeNs, finishSync());
if (trySetReadyForRolling()) {
// we have just finished a roll, then do not need to check for log rolling, the writer will be
// closed soon.
Expand Down Expand Up @@ -394,23 +391,14 @@ private void sync(AsyncWriter writer) {
}, consumeExecutor);
}

private void addTimeAnnotation(SyncFuture future, String annotation) {
Span.current().addEvent(annotation);
// TODO handle htrace API change, see HBASE-18895
// future.setSpan(scope.getSpan());
}

private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
private int finishSyncLowerThanTxid(long txid) {
int finished = 0;
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
sync.done(txid, null);
iter.remove();
finished++;
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
}
} else {
break;
}
Expand All @@ -419,7 +407,7 @@ private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
}

// try advancing the highestSyncedTxid as much as possible
private int finishSync(boolean addSyncTrace) {
private int finishSync() {
if (unackedAppends.isEmpty()) {
// All outstanding appends have been acked.
if (toWriteAppends.isEmpty()) {
Expand All @@ -428,9 +416,6 @@ private int finishSync(boolean addSyncTrace) {
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
sync.done(maxSyncTxid, null);
if (addSyncTrace) {
addTimeAnnotation(sync, "writer synced");
}
}
highestSyncedTxid.set(maxSyncTxid);
int finished = syncFutures.size();
Expand All @@ -444,23 +429,23 @@ private int finishSync(boolean addSyncTrace) {
assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
long doneTxid = lowestUnprocessedAppendTxid - 1;
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
return finishSyncLowerThanTxid(doneTxid);
}
} else {
// There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
// first unacked append minus 1.
long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
highestSyncedTxid.set(doneTxid);
return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
return finishSyncLowerThanTxid(doneTxid);
}
}

private void appendAndSync() {
final AsyncWriter writer = this.writer;
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
// finish some.
finishSync(false);
finishSync();
long newHighestProcessedAppendTxid = -1L;
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
Expand Down Expand Up @@ -501,7 +486,7 @@ private void appendAndSync() {
// stamped some region sequence id.
if (unackedAppends.isEmpty()) {
highestSyncedTxid.set(highestProcessedAppendTxid);
finishSync(false);
finishSync();
trySetReadyForRolling();
}
return;
Expand Down Expand Up @@ -648,74 +633,54 @@ protected boolean markerEditOnly() {

@Override
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
return txid;
}

@Override
public void sync() throws IOException {
sync(useHsync);
}

@Override
public void sync(long txid) throws IOException {
sync(txid, useHsync);
}

@Override
public void sync(boolean forceSync) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
try (Scope scope = span.makeCurrent()) {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
waitingConsumePayloads.publish(txid);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
protected void doSync(boolean forceSync) throws IOException {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
span.end();
waitingConsumePayloads.publish(txid);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}

@Override
public void sync(long txid, boolean forceSync) throws IOException {
protected void doSync(long txid, boolean forceSync) throws IOException {
if (highestSyncedTxid.get() >= txid) {
return;
}
Span span = TraceUtil.getGlobalTracer().spanBuilder("AsyncFSWAL.sync").startSpan();
try (Scope scope = span.makeCurrent()) {
// here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {
waitingConsumePayloads.publish(sequence);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
// here we do not use ring buffer sequence as txid
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {
span.end();
waitingConsumePayloads.publish(sequence);
}
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}
blockOnSync(future);
}

protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
Expand Down
Loading