diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 2c9005b49b9a..9fe211f18418 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -478,8 +478,13 @@ KJ_TEST("Read/Write Onset works") { tracing::FetchEventInfo fetchInfo( kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr); + + FakeEntropySource entropy; + auto trigger = InvocationSpanContext::newForInvocation(kj::none, entropy); + tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)), ExecutionModel::STATELESS, - kj::str("foo"), kj::none, kj::none, kj::none, kj::none); + kj::str("foo"), kj::none, kj::none, kj::none, kj::none, + tracing::Onset::TriggerContext(trigger)); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); @@ -490,6 +495,11 @@ KJ_TEST("Read/Write Onset works") { KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj); KJ_ASSERT(info2.executionModel == ExecutionModel::STATELESS); + auto& triggerCtx = KJ_ASSERT_NONNULL(info2.trigger); + KJ_ASSERT(triggerCtx.traceId == trigger.getTraceId()); + KJ_ASSERT(triggerCtx.invocationId == trigger.getInvocationId()); + KJ_ASSERT(triggerCtx.spanId == trigger.getSpanId()); + tracing::Onset info3 = info.clone(); tracing::FetchEventInfo& fetchInfo3 = KJ_ASSERT_NONNULL(info3.info.tryGet()); @@ -517,6 +527,23 @@ KJ_TEST("Read/Write Outcome works") { KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); } +KJ_TEST("Read/Write Link works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + + tracing::Link link(context, kj::str("foo")); + link.copyTo(infoBuilder); + + tracing::Link link2(infoBuilder.asReader()); + KJ_ASSERT(KJ_ASSERT_NONNULL(link2.label) == "foo"_kj); + KJ_ASSERT(link2.traceId == context.getTraceId()); + KJ_ASSERT(link2.invocationId == context.getInvocationId()); + KJ_ASSERT(link2.spanId == context.getSpanId()); +} + KJ_TEST("Read/Write TailEvent works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 3b74c65a1929..cf16c4f0a485 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -1097,6 +1097,59 @@ tracing::SpanClose tracing::SpanClose::clone() { return SpanClose(outcome); } +namespace { +kj::Maybe readLabelFromReader(const rpc::Trace::Link::Reader& reader) { + if (!reader.hasLabel()) return kj::none; + return kj::str(reader.getLabel()); +} +tracing::TraceId readTraceIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::TraceId::fromCapnp(context.getTraceId()); +} +tracing::TraceId readInvocationIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::TraceId::fromCapnp(context.getInvocationId()); +} +tracing::SpanId readSpanIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::SpanId(context.getSpanId()); +} +} // namespace + +tracing::Link::Link(const InvocationSpanContext& other, kj::Maybe label) + : Link(kj::mv(label), other.getTraceId(), other.getInvocationId(), other.getSpanId()) {} + +tracing::Link::Link( + kj::Maybe label, TraceId traceId, TraceId invocationId, SpanId spanId) + : label(kj::mv(label)), + traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)) {} + +tracing::Link::Link(rpc::Trace::Link::Reader reader) + : label(readLabelFromReader(reader)), + traceId(readTraceIdFromReader(reader)), + invocationId(readInvocationIdFromReader(reader)), + spanId(readSpanIdFromReader(reader)) {} + +void tracing::Link::copyTo(rpc::Trace::Link::Builder builder) { + KJ_IF_SOME(l, label) { + builder.setLabel(l); + } + auto ctx = builder.initContext(); + traceId.toCapnp(ctx.initTraceId()); + invocationId.toCapnp(ctx.initInvocationId()); + ctx.setSpanId(spanId.getId()); +} + +tracing::Link tracing::Link::clone() { + return Link( + label.map([](kj::String& str) { return kj::str(str); }), traceId, invocationId, spanId); +} + namespace { tracing::Onset::Info getInfoFromReader(const rpc::Trace::Onset::Reader& reader) { auto info = reader.getInfo(); @@ -1175,6 +1228,13 @@ kj::Maybe getEntrypointFromReader(const rpc::Trace::Onset::Reader& r } return kj::none; } +kj::Maybe getTriggerContextFromReader( + const rpc::Trace::Onset::Reader& reader) { + if (!reader.hasTrigger()) return kj::none; + auto trigger = reader.getTrigger(); + return tracing::Onset::TriggerContext(tracing::TraceId::fromCapnp(trigger.getTraceId()), + tracing::TraceId::fromCapnp(trigger.getInvocationId()), tracing::SpanId(trigger.getSpanId())); +} } // namespace tracing::Onset::Onset(tracing::Onset::Info&& info, @@ -1183,14 +1243,16 @@ tracing::Onset::Onset(tracing::Onset::Info&& info, kj::Maybe> scriptVersion, kj::Maybe dispatchNamespace, kj::Maybe> scriptTags, - kj::Maybe entrypoint) + kj::Maybe entrypoint, + kj::Maybe maybeTrigger) : info(kj::mv(info)), executionModel(executionModel), scriptName(kj::mv(scriptName)), scriptVersion(kj::mv(scriptVersion)), dispatchNamespace(kj::mv(dispatchNamespace)), scriptTags(kj::mv(scriptTags)), - entrypoint(kj::mv(entrypoint)) {} + entrypoint(kj::mv(entrypoint)), + trigger(kj::mv(maybeTrigger)) {} tracing::Onset::Onset(rpc::Trace::Onset::Reader reader) : info(getInfoFromReader(reader)), @@ -1199,7 +1261,8 @@ tracing::Onset::Onset(rpc::Trace::Onset::Reader reader) scriptVersion(getScriptVersionFromReader(reader)), dispatchNamespace(getDispatchNamespaceFromReader(reader)), scriptTags(getScriptTagsFromReader(reader)), - entrypoint(getEntrypointFromReader(reader)) {} + entrypoint(getEntrypointFromReader(reader)), + trigger(getTriggerContextFromReader(reader)) {} void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) { builder.setExecutionModel(executionModel); @@ -1221,6 +1284,12 @@ void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) { KJ_IF_SOME(e, entrypoint) { builder.setEntryPoint(e); } + KJ_IF_SOME(t, trigger) { + auto ctx = builder.initTrigger(); + t.traceId.toCapnp(ctx.initTraceId()); + t.invocationId.toCapnp(ctx.getInvocationId()); + ctx.setSpanId(t.spanId.getId()); + } auto infoBuilder = builder.initInfo(); KJ_SWITCH_ONEOF(info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { @@ -1297,7 +1366,9 @@ tracing::Onset tracing::Onset::clone() { scriptVersion.map([](auto& version) { return capnp::clone(*version); }), dispatchNamespace.map([](auto& ns) { return kj::str(ns); }), scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }), - entrypoint.map([](auto& e) { return kj::str(e); })); + entrypoint.map([](auto& e) { return kj::str(e); }), trigger.map([](TriggerContext& ctx) { + return TriggerContext(ctx.traceId, ctx.invocationId, ctx.spanId); + })); } tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) @@ -1383,6 +1454,9 @@ tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Re case rpc::Trace::TailEvent::Event::LOG: { return tracing::Mark(tracing::Log(event.getLog())); } + case rpc::Trace::TailEvent::Event::LINK: { + return tracing::Mark(tracing::Link(event.getLink())); + } } KJ_UNREACHABLE; } @@ -1434,6 +1508,9 @@ void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) { KJ_CASE_ONEOF(ret, Return) { ret.copyTo(eventBuilder.initReturn()); } + KJ_CASE_ONEOF(link, Link) { + link.copyTo(eventBuilder.initLink()); + } KJ_CASE_ONEOF(attrs, kj::Array) { // Mark is a collection of attributes. auto attrBuilder = eventBuilder.initAttribute(attrs.size()); @@ -1478,6 +1555,9 @@ tracing::TailEvent tracing::TailEvent::clone() { KJ_CASE_ONEOF(ret, Return) { return Mark(ret.clone()); } + KJ_CASE_ONEOF(link, Link) { + return Mark(link.clone()); + } KJ_CASE_ONEOF(attrs, tracing::CustomInfo) { return Mark(KJ_MAP(attr, attrs) { return attr.clone(); }); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 5a2eb0daab3a..7a50ae39f379 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -611,7 +611,26 @@ struct Return final { Return clone(); }; -using Mark = kj::OneOf>; +// A Link mark is used to establish a link from one span to another. +// The optional label can be used to identify the link. +struct Link final { + explicit Link(const InvocationSpanContext& other, kj::Maybe label = kj::none); + explicit Link(kj::Maybe label, TraceId traceId, TraceId invocationId, SpanId spanId); + Link(rpc::Trace::Link::Reader reader); + Link(Link&&) = default; + Link& operator=(Link&&) = default; + KJ_DISALLOW_COPY(Link); + + kj::Maybe label; + TraceId traceId; + TraceId invocationId; + SpanId spanId; + + void copyTo(rpc::Trace::Link::Builder builder); + Link clone(); +}; + +using Mark = kj::OneOf>; // Marks the opening of a child span within the streaming tail session. struct SpanOpen final { @@ -656,13 +675,29 @@ struct SpanClose final { struct Onset final { using Info = EventInfo; + struct TriggerContext { + TraceId traceId; + TraceId invocationId; + SpanId spanId; + + TriggerContext(TraceId traceId, TraceId invocationId, SpanId spanId) + : traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)) {} + + TriggerContext(const InvocationSpanContext& ctx) + : TriggerContext(ctx.getTraceId(), ctx.getInvocationId(), ctx.getSpanId()) {} + }; + explicit Onset(Info&& info, - ExecutionModel executionModel, - kj::Maybe scriptName, - kj::Maybe> scriptVersion, - kj::Maybe dispatchNamespace, - kj::Maybe> scriptTags, - kj::Maybe entrypoint); + ExecutionModel executionModel = ExecutionModel::STATELESS, + kj::Maybe scriptName = kj::none, + kj::Maybe> scriptVersion = kj::none, + kj::Maybe dispatchNamespace = kj::none, + kj::Maybe> scriptTags = kj::none, + kj::Maybe entrypoint = kj::none, + kj::Maybe maybeTrigger = kj::none); + Onset(rpc::Trace::Onset::Reader reader); Onset(Onset&&) = default; Onset& operator=(Onset&&) = default; @@ -676,6 +711,8 @@ struct Onset final { kj::Maybe> scriptTags; kj::Maybe entrypoint; + kj::Maybe trigger; + void copyTo(rpc::Trace::Onset::Builder builder); Onset clone(); }; diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 80308a18c394..8a1588cb66a1 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -233,17 +233,23 @@ struct Trace @0x8e8d911203762d34 { scriptTags @4 :List(Text); entryPoint @5 :Text; + trigger @6 :InvocationSpanContext; + # If this invocation was triggered by a different invocation that + # is being traced, the trigger will identify the triggering span. + # Propagation of the trigger context is not required, and in some + # cases is not desirable. + info :union { - fetch @6 :FetchEventInfo; - jsrpc @7 :JsRpcEventInfo; - scheduled @8 :ScheduledEventInfo; - alarm @9 :AlarmEventInfo; - queue @10 :QueueEventInfo; - email @11 :EmailEventInfo; - trace @12 :TraceEventInfo; - hibernatableWebSocket @13 :HibernatableWebSocketEventInfo; - resume @14 :Resume; - custom @15 :CustomEventInfo; + fetch @7 :FetchEventInfo; + jsrpc @8 :JsRpcEventInfo; + scheduled @9 :ScheduledEventInfo; + alarm @10 :AlarmEventInfo; + queue @11 :QueueEventInfo; + email @12 :EmailEventInfo; + trace @13 :TraceEventInfo; + hibernatableWebSocket @14 :HibernatableWebSocketEventInfo; + resume @15 :Resume; + custom @16 :CustomEventInfo; } } @@ -257,6 +263,12 @@ struct Trace @0x8e8d911203762d34 { # A hibernate event indicates that the tail session is being hibernated. } + struct Link { + # A link to another invocation span context. + label @0 :Text; + context @1 :InvocationSpanContext; + } + struct TailEvent { # A streaming tail worker receives a series of Tail Events. Tail events always # occur within an InvocationSpanContext. The first TailEvent delivered to a @@ -278,6 +290,7 @@ struct Trace @0x8e8d911203762d34 { diagnosticChannelEvent @10 :DiagnosticChannelEvent; exception @11 :Exception; log @12 :Log; + link @13 :Link; } } }