Skip to content

Commit

Permalink
[#9595] Improve async state propagation in NonSampling state
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jan 20, 2023
1 parent 4045b7c commit ca82b1a
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.SpanRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
Expand Down Expand Up @@ -86,79 +87,68 @@ public Trace continueTraceObject(final TraceId traceId) {
final TraceSampler.State state = traceSampler.isContinueSampled();
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.continueTraceRoot(traceId, state.nextId());
final Span span = spanFactory.newSpan(traceRoot);
final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);
final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final CloseListener closeListener = new DefaultCloseListener(handle, uriStatStorage);
return new DefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, closeListener);
return newDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
}

private ActiveTraceHandle registerActiveTrace(TraceRoot traceRoot) {
return activeTraceRepository.register(traceRoot);
}

private ActiveTraceHandle registerActiveTrace(LocalTraceRoot localTraceRoot) {
return activeTraceRepository.register(localTraceRoot);
}

@Override
public Trace newTraceObject() {
// TODO need to modify how to inject a datasender
final TraceSampler.State state = traceSampler.isNewSampled();
return newTraceObject(state);
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.newTraceRoot(state.nextId());
return newDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
}

@Override
public Trace newTraceObject(String urlPath) {
final TraceSampler.State state = traceSampler.isNewSampled(urlPath);
return newTraceObject(state);
}

Trace newTraceObject(TraceSampler.State state) {
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.newTraceRoot(state.nextId());
final Span span = spanFactory.newSpan(traceRoot);
final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final CloseListener closeListener = new DefaultCloseListener(handle, uriStatStorage);
return new DefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, closeListener);
return newDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
}

private DefaultTrace newDefaultTrace(TraceRoot traceRoot) {
final Span span = spanFactory.newSpan(traceRoot);
final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final CloseListener closeListener = new DefaultCloseListener(handle, uriStatStorage);
return new DefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, closeListener);
}

// internal async trace.
@Override
public Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId) {
final SpanChunkFactory spanChunkFactory = new AsyncSpanChunkFactory(traceRoot, localAsyncId);
final Storage storage = storageFactory.createStorage(spanChunkFactory);

final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final SpanRecorder spanRecorder = recorderFactory.newTraceRootSpanRecorder(traceRoot);

final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);

return new AsyncChildTrace(traceRoot, callStack, storage, spanRecorder, wrappedSpanEventRecorder, localAsyncId);
}

@Override
public Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot) {
return new DisableAsyncChildTrace(traceRoot);
final AsyncState asyncState = new DisableAsyncState();
SpanRecorder spanRecorder = recorderFactory.newDisableSpanRecorder(traceRoot);
SpanEventRecorder spanEventRecorder = recorderFactory.newDisableSpanEventRecorder(traceRoot, asyncState);
return new DisableAsyncChildTrace(traceRoot, spanRecorder, spanEventRecorder);
}

// entry point async trace.
Expand All @@ -167,21 +157,8 @@ public Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot) {
public Trace continueAsyncTraceObject(final TraceId traceId) {
final TraceSampler.State state = traceSampler.isContinueSampled();
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.continueTraceRoot(traceId, state.nextId());
final Span span = spanFactory.newSpan(traceRoot);

final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final AsyncState asyncState = new ListenableAsyncState(traceRoot, asyncStateListener, handle, uriStatStorage);

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot, asyncState);

return new AsyncDefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, asyncState);
TraceRoot traceRoot = traceRootFactory.continueTraceRoot(traceId, state.nextId());
return newAsyncDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
Expand All @@ -192,36 +169,42 @@ public Trace continueAsyncTraceObject(final TraceId traceId) {
@Override
public Trace newAsyncTraceObject() {
final TraceSampler.State state = traceSampler.isNewSampled();
return newAsyncTraceObject(state);
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.newTraceRoot(state.nextId());
return newAsyncDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
}

@Override
public Trace newAsyncTraceObject(String urlPath) {
final TraceSampler.State state = traceSampler.isNewSampled(urlPath);
return newAsyncTraceObject(state);
}

Trace newAsyncTraceObject(TraceSampler.State state) {
if (state.isSampled()) {
final TraceRoot traceRoot = traceRootFactory.newTraceRoot(state.nextId());
final Span span = spanFactory.newSpan(traceRoot);
final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final AsyncState asyncState = new ListenableAsyncState(traceRoot, asyncStateListener, handle, uriStatStorage);

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot, asyncState);

return new AsyncDefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, asyncState);
return newAsyncDefaultTrace(traceRoot);
} else {
return newLocalTrace(state.nextId());
}
}

private AsyncDefaultTrace newAsyncDefaultTrace(TraceRoot traceRoot) {
final Span span = spanFactory.newSpan(traceRoot);

final SpanChunkFactory spanChunkFactory = new DefaultSpanChunkFactory(traceRoot);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final ListenableAsyncState.AsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final AsyncState asyncState = new ListenableAsyncState(traceRoot, asyncStateListener, handle, uriStatStorage);

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot, asyncState);

return new AsyncDefaultTrace(span, callStack, storage, spanRecorder, wrappedSpanEventRecorder, asyncState);
}

@Override
public Trace disableSampling() {
final TraceSampler.State state = traceSampler.getContinueDisableState();
Expand All @@ -232,7 +215,22 @@ public Trace disableSampling() {
private Trace newLocalTrace(long nextDisabledId) {
final LocalTraceRoot traceRoot = traceRootFactory.newDisableTraceRoot(nextDisabledId);
final SpanRecorder spanRecorder = recorderFactory.newDisableSpanRecorder(traceRoot);

final ActiveTraceHandle activeTraceHandle = registerActiveTrace(traceRoot);
return new DisableTrace(traceRoot, spanRecorder, activeTraceHandle, uriStatStorage);

AsyncState asyncState = new ListenableAsyncState(traceRoot,
ListenableAsyncState.AsyncStateListener.EMPTY,
activeTraceHandle, uriStatStorage);

final SpanEventRecorder spanEventRecorder = recorderFactory.newDisableSpanEventRecorder(traceRoot, asyncState);
return new DisableTrace(traceRoot, spanRecorder, spanEventRecorder, activeTraceHandle, uriStatStorage);
}

private ActiveTraceHandle registerActiveTrace(TraceRoot traceRoot) {
return activeTraceRepository.register(traceRoot);
}

private ActiveTraceHandle registerActiveTrace(LocalTraceRoot localTraceRoot) {
return activeTraceRepository.register(localTraceRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,59 @@ public class DisableAsyncChildTrace implements Trace {
private DefaultTraceScopePool scopePool;

private final LocalTraceRoot traceRoot;
private int depth;

public DisableAsyncChildTrace(final LocalTraceRoot traceRoot) {
private final SpanRecorder spanRecorder;
private final SpanEventRecorder spanEventRecorder;

public DisableAsyncChildTrace(final LocalTraceRoot traceRoot, SpanRecorder spanRecorder, SpanEventRecorder spanEventRecorder) {
this.traceRoot = Objects.requireNonNull(traceRoot, "traceRoot");
this.spanRecorder = Objects.requireNonNull(spanRecorder, "spanRecorder");
this.spanEventRecorder = Objects.requireNonNull(spanEventRecorder, "spanEventRecorder");
}

@Override
public SpanEventRecorder traceBlockBegin() {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION);
return traceBlockBegin(DEFAULT_STACKID);
}

@Override
public SpanEventRecorder traceBlockBegin(int stackId) {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION);
push();
return getSpanEventRecorder();
}

@Override
public void traceBlockEnd() {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION);
traceBlockBegin(DEFAULT_STACKID);
}


@Override
public void traceBlockEnd(int stackId) {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION);
pop();
}

private SpanEventRecorder getSpanEventRecorder() {
return spanEventRecorder;
}

private int push() {
return this.depth++;
}

private void pop() {
this.depth--;
}

@Override
public boolean isRootStack() {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION);
return depth == 0;
}

@Override
public int getCallStackFrameId() {
return 0;
return DEFAULT_STACKID;
}

private LocalTraceRoot getTraceRoot() {
Expand All @@ -80,7 +100,7 @@ public long getId() {

@Override
public long getStartTime() {
return getTraceRoot().getTraceStartTime();
return traceRoot.getTraceStartTime();
}

@Override
Expand All @@ -95,7 +115,7 @@ public boolean canSampled() {

@Override
public boolean isRoot() {
return this.getTraceId().isRoot();
return false;
}

@Override
Expand All @@ -105,12 +125,12 @@ public boolean isAsync() {

@Override
public SpanRecorder getSpanRecorder() {
return null;
return spanRecorder;
}

@Override
public SpanEventRecorder currentSpanEventRecorder() {
return null;
return spanEventRecorder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* @author Woonduk Kang(emeroad)
*/
public class DisableAsyncState implements AsyncState {
private final Logger logger = LogManager.getLogger(this.getClass());

@Override
public void setup() {
logger.debug("setup");
}

@Override
public boolean await() {
logger.debug("await");
return false;
}

@Override
public void finish() {
logger.debug("finish");
}
}
Loading

0 comments on commit ca82b1a

Please sign in to comment.