diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index b456f25676d..ca23202813c 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -184,6 +184,10 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) } KJ_UNREACHABLE; } + KJ_CASE_ONEOF(resume, tracing::Resume) { + // Resume events are not used with legacy trace + KJ_UNREACHABLE; + } KJ_CASE_ONEOF(custom, tracing::CustomEventInfo) { return kj::Maybe(jsg::alloc(trace, custom)); } diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index be11c142a16..09c3439926b 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -202,7 +202,7 @@ IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own context kj::Own ioChannelFactoryParam, kj::Own metricsParam, kj::Maybe> workerTracer, - kj::Rc invocationSpanContext) + tracing::InvocationSpanContext invocationSpanContext) : context(kj::mv(contextParam)), metrics(kj::mv(metricsParam)), workerTracer(kj::mv(workerTracer)), diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index d78d712419b..2bf23326817 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -111,7 +111,7 @@ class IoContext_IncomingRequest final { kj::Own ioChannelFactory, kj::Own metrics, kj::Maybe> workerTracer, - kj::Rc invocationSpanContext); + tracing::InvocationSpanContext invocationSpanContext); KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest); ~IoContext_IncomingRequest() noexcept(false); @@ -163,7 +163,7 @@ class IoContext_IncomingRequest final { // The invocation span context is a unique identifier for a specific // worker invocation. - kj::Rc& getInvocationSpanContext() { + tracing::InvocationSpanContext& getInvocationSpanContext() { return invocationSpanContext; } @@ -176,7 +176,7 @@ class IoContext_IncomingRequest final { // The invocation span context identifies the trace id, invocation id, and root // span for the current request. Every invocation of a worker function always // has a root span, even if it is not explicitly traced. - kj::Rc invocationSpanContext; + tracing::InvocationSpanContext invocationSpanContext; bool wasDelivered = false; diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 5fa0ec2b8da..12eb2be3a14 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -12,6 +12,25 @@ namespace workerd::tracing { namespace { +class FakeEntropySource final: public kj::EntropySource { + public: + void generate(kj::ArrayPtr buffer) override { + // Write the uint64_t value to the buffer + buffer[0] = counter & 0xff; + buffer[1] = (counter >> 8) & 0xff; + buffer[2] = (counter >> 16) & 0xff; + buffer[3] = (counter >> 24) & 0xff; + buffer[4] = (counter >> 32) & 0xff; + buffer[5] = (counter >> 40) & 0xff; + buffer[6] = (counter >> 48) & 0xff; + buffer[7] = (counter >> 56) & 0xff; + counter++; + } + + private: + uint64_t counter = 0; +}; + KJ_TEST("can read trace ID string format") { KJ_EXPECT(TraceId::fromGoString("z"_kj) == kj::none); @@ -66,72 +85,528 @@ KJ_TEST("can write trace ID protobuf format") { } KJ_TEST("InvocationSpanContext") { - - class FakeEntropySource final: public kj::EntropySource { - public: - void generate(kj::ArrayPtr buffer) override { - // Write the uint64_t value to the buffer - buffer[0] = counter & 0xff; - buffer[1] = (counter >> 8) & 0xff; - buffer[2] = (counter >> 16) & 0xff; - buffer[3] = (counter >> 24) & 0xff; - buffer[4] = (counter >> 32) & 0xff; - buffer[5] = (counter >> 40) & 0xff; - buffer[6] = (counter >> 48) & 0xff; - buffer[7] = (counter >> 56) & 0xff; - counter++; - } - - private: - uint64_t counter = 0; - }; - setPredictableModeForTest(); FakeEntropySource fakeEntropySource; auto sc = InvocationSpanContext::newForInvocation(kj::none, fakeEntropySource); // We can create an InvocationSpanContext... static constexpr auto kCheck = TraceId(0x2a2a2a2a2a2a2a2a, 0x2a2a2a2a2a2a2a2a); - KJ_EXPECT(sc->getTraceId() == kCheck); - KJ_EXPECT(sc->getInvocationId() == kCheck); - KJ_EXPECT(sc->getSpanId() == 0); + KJ_EXPECT(sc.getTraceId() == kCheck); + KJ_EXPECT(sc.getInvocationId() == kCheck); + KJ_EXPECT(sc.getSpanId() == SpanId(1)); // And serialize that to a capnp struct... capnp::MallocMessageBuilder builder; auto root = builder.initRoot(); - sc->toCapnp(root); + sc.toCapnp(root); // Then back again... auto sc2 = KJ_ASSERT_NONNULL(InvocationSpanContext::fromCapnp(root.asReader())); - KJ_EXPECT(sc2->getTraceId() == kCheck); - KJ_EXPECT(sc2->getInvocationId() == kCheck); - KJ_EXPECT(sc2->getSpanId() == 0); - KJ_EXPECT(sc2->isTrigger()); + KJ_EXPECT(sc2.getTraceId() == kCheck); + KJ_EXPECT(sc2.getInvocationId() == kCheck); + KJ_EXPECT(sc2.getSpanId() == SpanId(1)); + KJ_EXPECT(sc2.isTrigger()); // The one that has been deserialized from capnp cannot create children... try { - sc2->newChild(); + sc2.newChild(); KJ_FAIL_ASSERT("should not be able to create child span with SpanContext from capnp"); } catch (kj::Exception& ex) { - // KJ_EXPECT(ex.getDescription() == - // "expected !isTrigger(); unable to create child spans on this context"_kj); + KJ_EXPECT(ex.getDescription() == + "expected !isTrigger(); unable to create child spans on this context"_kj); } - auto sc3 = sc->newChild(); - KJ_EXPECT(sc3->getTraceId() == kCheck); - KJ_EXPECT(sc3->getInvocationId() == kCheck); - KJ_EXPECT(sc3->getSpanId() == 1); - - auto sc4 = InvocationSpanContext::newForInvocation(sc2); - KJ_EXPECT(sc4->getTraceId() == kCheck); - KJ_EXPECT(sc4->getInvocationId() == kCheck); - KJ_EXPECT(sc4->getSpanId() == 0); - - auto& sc5 = KJ_ASSERT_NONNULL(sc4->getParent()); - KJ_EXPECT(sc5->getTraceId() == kCheck); - KJ_EXPECT(sc5->getInvocationId() == kCheck); - KJ_EXPECT(sc5->getSpanId() == 0); - KJ_EXPECT(sc5->isTrigger()); + auto sc3 = sc.newChild(); + KJ_EXPECT(sc3.getTraceId() == kCheck); + KJ_EXPECT(sc3.getInvocationId() == kCheck); + KJ_EXPECT(sc3.getSpanId() == SpanId(2)); + + auto sc4 = InvocationSpanContext::newForInvocation(sc2, fakeEntropySource); + KJ_EXPECT(sc4.getTraceId() == kCheck); + KJ_EXPECT(sc4.getInvocationId() == kCheck); + KJ_EXPECT(sc4.getSpanId() == SpanId(3)); + + auto& sc5 = KJ_ASSERT_NONNULL(sc4.getParent()); + KJ_EXPECT(sc5.getTraceId() == kCheck); + KJ_EXPECT(sc5.getInvocationId() == kCheck); + KJ_EXPECT(sc5.getSpanId() == SpanId(1)); + KJ_EXPECT(sc5.isTrigger()); +} + +KJ_TEST("Read/Write FetchEventInfo works") { + capnp::MallocMessageBuilder builder; + auto fetchInfoBuilder = builder.initRoot(); + + kj::Vector headers; + headers.add(FetchEventInfo::Header(kj::str("foo"), kj::str("bar"))); + + tracing::FetchEventInfo info( + kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), headers.releaseAsArray()); + + info.copyTo(fetchInfoBuilder); + + auto reader = fetchInfoBuilder.asReader(); + + tracing::FetchEventInfo info2(reader); + KJ_ASSERT(info2.method == kj::HttpMethod::GET); + KJ_ASSERT(info2.url == "https://example.com"_kj); + KJ_ASSERT(info2.cfJson == "{}"_kj); + KJ_ASSERT(info2.headers.size() == 1); + KJ_ASSERT(info2.headers[0].name == "foo"_kj); + KJ_ASSERT(info2.headers[0].value == "bar"_kj); + + tracing::FetchEventInfo info3 = info.clone(); + KJ_ASSERT(info3.method == kj::HttpMethod::GET); + KJ_ASSERT(info3.url == "https://example.com"_kj); + KJ_ASSERT(info3.cfJson == "{}"_kj); + KJ_ASSERT(info3.headers.size() == 1); + KJ_ASSERT(info3.headers[0].name == "foo"_kj); + KJ_ASSERT(info3.headers[0].value == "bar"_kj); +} + +KJ_TEST("Read/Write JsRpcEventInfo works") { + capnp::MallocMessageBuilder builder; + auto jsRpcInfoBuilder = builder.initRoot(); + + tracing::JsRpcEventInfo info(kj::str("foo")); + + info.copyTo(jsRpcInfoBuilder); + + auto reader = jsRpcInfoBuilder.asReader(); + + tracing::JsRpcEventInfo info2(reader); + KJ_ASSERT(info2.methodName == "foo"_kj); + + tracing::JsRpcEventInfo info3 = info.clone(); + KJ_ASSERT(info3.methodName == "foo"_kj); +} + +KJ_TEST("Read/Write ScheduledEventInfo workers") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::ScheduledEventInfo info(1.2, kj::str("foo")); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::ScheduledEventInfo info2(reader); + KJ_ASSERT(info2.scheduledTime == 1.2); + KJ_ASSERT(info2.cron == "foo"_kj); + + tracing::ScheduledEventInfo info3 = info.clone(); + KJ_ASSERT(info3.scheduledTime == 1.2); + KJ_ASSERT(info3.cron == "foo"_kj); +} + +KJ_TEST("Read/Write AlarmEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::AlarmEventInfo info(kj::UNIX_EPOCH); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::AlarmEventInfo info2(reader); + KJ_ASSERT(info.scheduledTime == info2.scheduledTime); + + tracing::AlarmEventInfo info3 = info.clone(); + KJ_ASSERT(info.scheduledTime == info3.scheduledTime); +} + +KJ_TEST("Read/Write QueueEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::QueueEventInfo info(kj::str("foo"), 1); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::QueueEventInfo info2(reader); + KJ_ASSERT(info2.queueName == "foo"_kj); + KJ_ASSERT(info2.batchSize == 1); + + tracing::QueueEventInfo info3 = info.clone(); + KJ_ASSERT(info2.queueName == "foo"_kj); + KJ_ASSERT(info2.batchSize == 1); +} + +KJ_TEST("Read/Write EmailEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::EmailEventInfo info(kj::str("foo"), kj::str("bar"), 1); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::EmailEventInfo info2(reader); + KJ_ASSERT(info2.mailFrom == "foo"_kj); + KJ_ASSERT(info2.rcptTo == "bar"_kj); + KJ_ASSERT(info2.rawSize == 1); + + tracing::EmailEventInfo info3 = info.clone(); + KJ_ASSERT(info3.mailFrom == "foo"_kj); + KJ_ASSERT(info3.rcptTo == "bar"_kj); + KJ_ASSERT(info3.rawSize == 1); +} + +KJ_TEST("Read/Write TraceEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + kj::Vector> items(1); + items.add(kj::heap(kj::none, kj::str("foo"), kj::none, kj::none, kj::none, + kj::Array(), kj::none, ExecutionModel::STATELESS)); + + tracing::TraceEventInfo info(items.asPtr()); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::TraceEventInfo info2(reader); + KJ_ASSERT(info2.traces.size() == 1); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); + + tracing::TraceEventInfo info3 = info.clone(); + KJ_ASSERT(info2.traces.size() == 1); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); +} + +KJ_TEST("Read/Write HibernatableWebSocketEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::HibernatableWebSocketEventInfo info(tracing::HibernatableWebSocketEventInfo::Message{}); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::HibernatableWebSocketEventInfo info2(reader); + KJ_ASSERT(info2.type.is()); + + tracing::HibernatableWebSocketEventInfo info3 = info.clone(); + KJ_ASSERT(info3.type.is()); +} + +KJ_TEST("Read/Write FetchResponseInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchResponseInfo info(123); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::FetchResponseInfo info2(reader); + KJ_ASSERT(info2.statusCode == 123); + + tracing::FetchResponseInfo info3 = info.clone(); + KJ_ASSERT(info3.statusCode == 123); +} + +KJ_TEST("Read/Write DiagnosticChannelEvent works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::DiagnosticChannelEvent info(kj::UNIX_EPOCH, kj::str("foo"), kj::Array()); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::DiagnosticChannelEvent info2(reader); + KJ_ASSERT(info2.timestamp == info.timestamp); + KJ_ASSERT(info2.channel == "foo"_kj); + KJ_ASSERT(info2.message.size() == 0); + + tracing::DiagnosticChannelEvent info3 = info.clone(); + KJ_ASSERT(info3.timestamp == info.timestamp); + KJ_ASSERT(info3.channel == "foo"_kj); + KJ_ASSERT(info3.message.size() == 0); +} + +KJ_TEST("Read/Write Log works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Log info(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Log info2(reader); + KJ_ASSERT(info.timestamp == info2.timestamp); + KJ_ASSERT(info2.logLevel == LogLevel::INFO); + KJ_ASSERT(info2.message == "foo"_kj); + + tracing::Log info3 = info.clone(); + KJ_ASSERT(info.timestamp == info3.timestamp); + KJ_ASSERT(info3.logLevel == LogLevel::INFO); + KJ_ASSERT(info3.message == "foo"_kj); +} + +KJ_TEST("Read/Write Exception works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Exception info(kj::UNIX_EPOCH, kj::str("foo"), kj::str("bar"), kj::none); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Exception info2(reader); + KJ_ASSERT(info.timestamp == info2.timestamp); + KJ_ASSERT(info2.name == "foo"_kj); + KJ_ASSERT(info2.message == "bar"_kj); + KJ_ASSERT(info2.stack == kj::none); + + tracing::Exception info3 = info.clone(); + KJ_ASSERT(info.timestamp == info3.timestamp); + KJ_ASSERT(info3.name == "foo"_kj); + KJ_ASSERT(info3.message == "bar"_kj); + KJ_ASSERT(info3.stack == kj::none); +} + +KJ_TEST("Read/Write Resume works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Resume info(kj::arr(1, 2, 3)); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Resume info2(reader); + auto& attachment = KJ_ASSERT_NONNULL(info2.attachment); + KJ_ASSERT(attachment.size() == 3); + KJ_ASSERT(attachment[0] == 1); + KJ_ASSERT(attachment[1] == 2); + KJ_ASSERT(attachment[2] == 3); + + tracing::Resume info3 = info.clone(); + auto& attachment2 = KJ_ASSERT_NONNULL(info3.attachment); + KJ_ASSERT(attachment2.size() == 3); + KJ_ASSERT(attachment2[0] == 1); + KJ_ASSERT(attachment2[1] == 2); + KJ_ASSERT(attachment2[2] == 3); +} + +KJ_TEST("Read/Write Hibernate works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Hibernate info; + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Hibernate info2(reader); + + info.clone(); +} + +KJ_TEST("Read/Write Attribute works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Attribute attr(kj::str("foo"), {123.0, 321.2}); + attr.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Attribute info2(reader); + KJ_ASSERT(info2.name == "foo"_kj); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[0].tryGet()) == 123.0); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[1].tryGet()) == 321.2); +} + +KJ_TEST("Read/Write Return works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchResponseInfo fetchInfo(123); + tracing::Return info(tracing::Return::Info(kj::mv(fetchInfo))); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Return info2(reader); + auto& fetchInfo2 = + KJ_ASSERT_NONNULL(KJ_ASSERT_NONNULL(info2.info).tryGet()); + KJ_ASSERT(fetchInfo2.statusCode == 123); + + tracing::Return info3 = info.clone(); + auto& fetchInfo3 = + KJ_ASSERT_NONNULL(KJ_ASSERT_NONNULL(info3.info).tryGet()); + KJ_ASSERT(fetchInfo3.statusCode == 123); +} + +KJ_TEST("Read/Write SpanOpen works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::SpanOpen info(kj::str("foo"), kj::none); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::SpanOpen info2(reader); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.operationName) == "foo"_kj); + KJ_ASSERT(info2.info == kj::none); + + tracing::SpanOpen info3 = info.clone(); + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.operationName) == "foo"_kj); + KJ_ASSERT(info3.info == kj::none); +} + +KJ_TEST("Read/Write SpanClose works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::SpanClose info(EventOutcome::EXCEPTION); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::SpanClose info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); + + tracing::SpanClose info3 = info.clone(); + KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); +} + +KJ_TEST("Read/Write Onset works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchEventInfo fetchInfo( + kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr); + + FakeEntropySource entropy; + auto trigger = InvocationSpanContext::newForInvocation(kj::none, entropy); + + tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)), + { + .scriptName = kj::str("foo"), + }, + tracing::Onset::TriggerContext(trigger)); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Onset info2(reader); + tracing::FetchEventInfo& fetchInfo2 = + KJ_ASSERT_NONNULL(info2.info.tryGet()); + KJ_ASSERT(fetchInfo2.method == kj::HttpMethod::GET); + KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj); + KJ_ASSERT(info2.workerInfo.executionModel == ExecutionModel::STATELESS); + + auto& triggerCtx = KJ_ASSERT_NONNULL(info2.trigger); + KJ_ASSERT(triggerCtx.traceId == trigger.getTraceId()); + KJ_ASSERT(triggerCtx.invocationId == trigger.getInvocationId()); + KJ_ASSERT(triggerCtx.spanId == trigger.getSpanId()); + + tracing::Onset info3 = info.clone(); + tracing::FetchEventInfo& fetchInfo3 = + KJ_ASSERT_NONNULL(info3.info.tryGet()); + KJ_ASSERT(fetchInfo3.method == kj::HttpMethod::GET); + KJ_ASSERT(fetchInfo3.url == "https://example.com"_kj); + KJ_ASSERT(info3.workerInfo.executionModel == ExecutionModel::STATELESS); +} + +KJ_TEST("Read/Write Outcome works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Outcome info(EventOutcome::EXCEPTION, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Outcome info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); + KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS); + + tracing::Outcome info3 = info.clone(); + KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); + KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); +} + +KJ_TEST("Read/Write Link works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + + tracing::Link link(context, kj::str("foo")); + link.copyTo(infoBuilder); + + tracing::Link link2(infoBuilder.asReader()); + KJ_ASSERT(KJ_ASSERT_NONNULL(link2.label) == "foo"_kj); + KJ_ASSERT(link2.traceId == context.getTraceId()); + KJ_ASSERT(link2.invocationId == context.getInvocationId()); + KJ_ASSERT(link2.spanId == context.getSpanId()); +} + +KJ_TEST("Read/Write TailEvent works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + tracing::Log log(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + tracing::TailEvent info(context, kj::UNIX_EPOCH, 0, tracing::Mark(kj::mv(log))); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::TailEvent info2(reader); + KJ_ASSERT(info2.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(info2.sequence == 0); + KJ_ASSERT(info2.invocationId == context.getInvocationId()); + KJ_ASSERT(info2.traceId == context.getTraceId()); + KJ_ASSERT(info2.spanId == context.getSpanId()); + + auto& event = KJ_ASSERT_NONNULL(info2.event.tryGet()); + auto& log2 = KJ_ASSERT_NONNULL(event.tryGet()); + KJ_ASSERT(log2.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(log2.logLevel == LogLevel::INFO); + KJ_ASSERT(log2.message == "foo"_kj); + + tracing::TailEvent info3 = info.clone(); + KJ_ASSERT(info3.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(info3.sequence == 0); + KJ_ASSERT(info3.invocationId == context.getInvocationId()); + KJ_ASSERT(info3.traceId == context.getTraceId()); + KJ_ASSERT(info3.spanId == context.getSpanId()); + + auto& event2 = KJ_ASSERT_NONNULL(info3.event.tryGet()); + auto& log3 = KJ_ASSERT_NONNULL(event2.tryGet()); + KJ_ASSERT(log3.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(log3.logLevel == LogLevel::INFO); + KJ_ASSERT(log3.message == "foo"_kj); +} + +KJ_TEST("Read/Write TailEvent with Multiple Attributes") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + + // An attribute event can have one or more Attributes specified. + kj::Vector attrs(2); + attrs.add(tracing::Attribute(kj::str("foo"), true)); + attrs.add(tracing::Attribute(kj::str("bar"), 123)); + + tracing::TailEvent info(context, kj::UNIX_EPOCH, 0, tracing::Mark(attrs.releaseAsArray())); + info.copyTo(infoBuilder); + + tracing::TailEvent info2(infoBuilder.asReader()); + auto& mark = KJ_ASSERT_NONNULL(info2.event.tryGet()); + auto& attrs2 = KJ_ASSERT_NONNULL(mark.tryGet>()); + KJ_ASSERT(attrs2.size() == 2); + + KJ_ASSERT(attrs2[0].name == "foo"_kj); + KJ_ASSERT(attrs2[1].name == "bar"_kj); } } // namespace diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 8a284633c08..9c8450e3823 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -127,7 +127,7 @@ kj::String TraceId::toW3C() const { } namespace { -uint64_t getRandom64Bit(kj::Maybe& entropySource) { +uint64_t getRandom64Bit(const kj::Maybe& entropySource) { uint64_t ret = 0; uint8_t tries = 0; @@ -154,6 +154,21 @@ TraceId TraceId::fromEntropy(kj::Maybe entropySource) { return TraceId(getRandom64Bit(entropySource), getRandom64Bit(entropySource)); } +kj::String SpanId::toGoString() const { + kj::Vector s(16); + addHex(s, id); + s.add('\0'); + return kj::String(s.releaseAsArray()); +} + +SpanId SpanId::fromEntropy(kj::Maybe entropySource) { + return SpanId(getRandom64Bit(entropySource)); +} + +kj::String KJ_STRINGIFY(const SpanId& id) { + return id; +} + kj::String KJ_STRINGIFY(const TraceId& id) { return id; } @@ -162,31 +177,35 @@ InvocationSpanContext::InvocationSpanContext(kj::Badge, kj::Maybe entropySource, TraceId traceId, TraceId invocationId, - uint64_t spanId, - kj::Maybe> parentSpanContext) + SpanId spanId, + kj::Maybe parentSpanContext) : entropySource(entropySource), traceId(kj::mv(traceId)), invocationId(kj::mv(invocationId)), - spanId(spanId), - parentSpanContext(kj::mv(parentSpanContext)) {} + spanId(kj::mv(spanId)), + parentSpanContext(parentSpanContext.map([](const InvocationSpanContext& ctx) { + return kj::heap(ctx.clone()); + })) {} -kj::Rc InvocationSpanContext::newChild() { +InvocationSpanContext InvocationSpanContext::newChild() const { KJ_ASSERT(!isTrigger(), "unable to create child spans on this context"); - return kj::rc(kj::Badge(), entropySource, traceId, - invocationId, getRandom64Bit(entropySource), addRefToThis()); + kj::Maybe otherEntropySource = entropySource.map( + [](auto& es) -> kj::EntropySource& { return const_cast(es); }); + return InvocationSpanContext(kj::Badge(), otherEntropySource, traceId, + invocationId, SpanId::fromEntropy(otherEntropySource), *this); } -kj::Rc InvocationSpanContext::newForInvocation( - kj::Maybe&> triggerContext, +InvocationSpanContext InvocationSpanContext::newForInvocation( + kj::Maybe triggerContext, kj::Maybe entropySource) { - kj::Maybe> parent; + kj::Maybe parent; auto traceId = triggerContext - .map([&](kj::Rc& ctx) { - parent = ctx.addRef(); - return ctx->traceId; + .map([&](auto& ctx) mutable -> TraceId { + parent = ctx; + return ctx.traceId; }).orDefault([&] { return TraceId::fromEntropy(entropySource); }); - return kj::rc(kj::Badge(), entropySource, - kj::mv(traceId), TraceId::fromEntropy(entropySource), 0, kj::mv(parent)); + return InvocationSpanContext(kj::Badge(), entropySource, kj::mv(traceId), + TraceId::fromEntropy(entropySource), SpanId::fromEntropy(entropySource), kj::mv(parent)); } TraceId TraceId::fromCapnp(rpc::InvocationSpanContext::TraceId::Reader reader) { @@ -198,7 +217,7 @@ void TraceId::toCapnp(rpc::InvocationSpanContext::TraceId::Builder writer) const writer.setHigh(high); } -kj::Maybe> InvocationSpanContext::fromCapnp( +kj::Maybe InvocationSpanContext::fromCapnp( rpc::InvocationSpanContext::Reader reader) { if (!reader.hasTraceId() || !reader.hasInvocationId()) { // If the reader does not have a traceId or invocationId field then it is @@ -206,11 +225,11 @@ kj::Maybe> InvocationSpanContext::fromCapnp( return kj::none; } - auto sc = kj::rc(kj::Badge(), kj::none, + auto sc = InvocationSpanContext(kj::Badge(), kj::none, TraceId::fromCapnp(reader.getTraceId()), TraceId::fromCapnp(reader.getInvocationId()), reader.getSpanId()); // If the traceId or invocationId are invalid, then we'll ignore them. - if (!sc->getTraceId() || !sc->getInvocationId()) return kj::none; + if (!sc.getTraceId() || !sc.getInvocationId()) return kj::none; return kj::mv(sc); } @@ -218,11 +237,18 @@ void InvocationSpanContext::toCapnp(rpc::InvocationSpanContext::Builder writer) traceId.toCapnp(writer.initTraceId()); invocationId.toCapnp(writer.initInvocationId()); writer.setSpanId(spanId); - kj::mv(getParent()); // Just invalidating the parent. Not moving it anywhere. } -kj::String KJ_STRINGIFY(const kj::Rc& context) { - return kj::str(context->getTraceId(), "-", context->getInvocationId(), "-", context->getSpanId()); +InvocationSpanContext InvocationSpanContext::clone() const { + kj::Maybe otherEntropySource = entropySource.map( + [](auto& es) -> kj::EntropySource& { return const_cast(es); }); + return InvocationSpanContext(kj::Badge(), otherEntropySource, traceId, + invocationId, spanId, + parentSpanContext.map([](auto& ctx) -> const InvocationSpanContext& { return *ctx.get(); })); +} + +kj::String KJ_STRINGIFY(const InvocationSpanContext& context) { + return kj::str(context.getTraceId(), "-", context.getInvocationId(), "-", context.getSpanId()); } } // namespace tracing @@ -272,6 +298,11 @@ void tracing::FetchEventInfo::copyTo(rpc::Trace::FetchEventInfo::Builder builder } } +tracing::FetchEventInfo tracing::FetchEventInfo::clone() { + return FetchEventInfo( + method, kj::str(url), kj::str(cfJson), KJ_MAP(h, headers) { return h.clone(); }); +} + tracing::FetchEventInfo::Header::Header(kj::String name, kj::String value) : name(kj::mv(name)), value(kj::mv(value)) {} @@ -285,6 +316,10 @@ void tracing::FetchEventInfo::Header::copyTo(rpc::Trace::FetchEventInfo::Header: builder.setValue(value); } +tracing::FetchEventInfo::Header tracing::FetchEventInfo::Header::clone() { + return Header(kj::str(name), kj::str(value)); +} + tracing::JsRpcEventInfo::JsRpcEventInfo(kj::String methodName): methodName(kj::mv(methodName)) {} tracing::JsRpcEventInfo::JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader) @@ -294,6 +329,10 @@ void tracing::JsRpcEventInfo::copyTo(rpc::Trace::JsRpcEventInfo::Builder builder builder.setMethodName(methodName); } +tracing::JsRpcEventInfo tracing::JsRpcEventInfo::clone() { + return JsRpcEventInfo(kj::str(methodName)); +} + tracing::ScheduledEventInfo::ScheduledEventInfo(double scheduledTime, kj::String cron) : scheduledTime(scheduledTime), cron(kj::mv(cron)) {} @@ -307,6 +346,10 @@ void tracing::ScheduledEventInfo::copyTo(rpc::Trace::ScheduledEventInfo::Builder builder.setCron(cron); } +tracing::ScheduledEventInfo tracing::ScheduledEventInfo::clone() { + return ScheduledEventInfo(scheduledTime, kj::str(cron)); +} + tracing::AlarmEventInfo::AlarmEventInfo(kj::Date scheduledTime): scheduledTime(scheduledTime) {} tracing::AlarmEventInfo::AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader) @@ -316,6 +359,10 @@ void tracing::AlarmEventInfo::copyTo(rpc::Trace::AlarmEventInfo::Builder builder builder.setScheduledTimeMs((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS); } +tracing::AlarmEventInfo tracing::AlarmEventInfo::clone() { + return AlarmEventInfo(scheduledTime); +} + tracing::QueueEventInfo::QueueEventInfo(kj::String queueName, uint32_t batchSize) : queueName(kj::mv(queueName)), batchSize(batchSize) {} @@ -329,6 +376,10 @@ void tracing::QueueEventInfo::copyTo(rpc::Trace::QueueEventInfo::Builder builder builder.setBatchSize(batchSize); } +tracing::QueueEventInfo tracing::QueueEventInfo::clone() { + return QueueEventInfo(kj::str(queueName), batchSize); +} + tracing::EmailEventInfo::EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize) : mailFrom(kj::mv(mailFrom)), rcptTo(kj::mv(rcptTo)), @@ -345,6 +396,10 @@ void tracing::EmailEventInfo::copyTo(rpc::Trace::EmailEventInfo::Builder builder builder.setRawSize(rawSize); } +tracing::EmailEventInfo tracing::EmailEventInfo::clone() { + return EmailEventInfo(kj::str(mailFrom), kj::str(rcptTo), rawSize); +} + kj::Vector getTraceItemsFromTraces( kj::ArrayPtr> traces) { return KJ_MAP(t, traces) -> tracing::TraceEventInfo::TraceItem { @@ -373,6 +428,10 @@ void tracing::TraceEventInfo::copyTo(rpc::Trace::TraceEventInfo::Builder builder } } +tracing::TraceEventInfo tracing::TraceEventInfo::clone() { + return TraceEventInfo(KJ_MAP(item, traces) { return item.clone(); }); +} + tracing::TraceEventInfo::TraceItem::TraceItem(kj::Maybe scriptName) : scriptName(kj::mv(scriptName)) {} @@ -386,6 +445,10 @@ void tracing::TraceEventInfo::TraceItem::copyTo( } } +tracing::TraceEventInfo::TraceItem tracing::TraceEventInfo::TraceItem::clone() { + return TraceItem(scriptName.map([](auto& name) { return kj::str(name); })); +} + tracing::DiagnosticChannelEvent::DiagnosticChannelEvent( kj::Date timestamp, kj::String channel, kj::Array message) : timestamp(timestamp), @@ -404,6 +467,10 @@ void tracing::DiagnosticChannelEvent::copyTo(rpc::Trace::DiagnosticChannelEvent: builder.setMessage(message); } +tracing::DiagnosticChannelEvent tracing::DiagnosticChannelEvent::clone() { + return DiagnosticChannelEvent(timestamp, kj::str(channel), kj::heapArray(message)); +} + tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(Type type): type(type) {} tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo( @@ -428,6 +495,24 @@ void tracing::HibernatableWebSocketEventInfo::copyTo( } } +tracing::HibernatableWebSocketEventInfo tracing::HibernatableWebSocketEventInfo::clone() { + KJ_SWITCH_ONEOF(type) { + KJ_CASE_ONEOF(_, Message) { + return HibernatableWebSocketEventInfo(Message{}); + } + KJ_CASE_ONEOF(_, Error) { + return HibernatableWebSocketEventInfo(Error{}); + } + KJ_CASE_ONEOF(close, Close) { + return HibernatableWebSocketEventInfo(Close{ + .code = close.code, + .wasClean = close.wasClean, + }); + } + } + KJ_UNREACHABLE; +} + tracing::HibernatableWebSocketEventInfo::Type tracing::HibernatableWebSocketEventInfo::readFrom( rpc::Trace::HibernatableWebSocketEventInfo::Reader reader) { auto type = reader.getType(); @@ -457,6 +542,10 @@ void tracing::FetchResponseInfo::copyTo(rpc::Trace::FetchResponseInfo::Builder b builder.setStatusCode(statusCode); } +tracing::FetchResponseInfo tracing::FetchResponseInfo::clone() { + return FetchResponseInfo(statusCode); +} + tracing::Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) : timestamp(timestamp), logLevel(logLevel), @@ -576,6 +665,10 @@ void Trace::copyTo(rpc::Trace::Builder builder) { auto hibWsBuilder = eventInfoBuilder.initHibernatableWebSocket(); hibWs.copyTo(hibWsBuilder); } + KJ_CASE_ONEOF(resume, tracing::Resume) { + // Resume is not used in legacy trace. + KJ_UNREACHABLE; + } KJ_CASE_ONEOF(custom, tracing::CustomEventInfo) { eventInfoBuilder.initCustom(); } @@ -603,6 +696,10 @@ void tracing::Log::copyTo(rpc::Trace::Log::Builder builder) { builder.setMessage(message); } +tracing::Log tracing::Log::clone() { + return Log(timestamp, logLevel, kj::str(message)); +} + void tracing::Exception::copyTo(rpc::Trace::Exception::Builder builder) { builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setName(name); @@ -612,6 +709,11 @@ void tracing::Exception::copyTo(rpc::Trace::Exception::Builder builder) { } } +tracing::Exception tracing::Exception::clone() { + return Exception(timestamp, kj::str(name), kj::str(message), + stack.map([](auto& stack) { return kj::str(stack); })); +} + void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLevel) { // Sandboxed workers currently record their traces as if the pipeline log level were set to // "full", so we may need to filter out the extra data after receiving the traces back. @@ -714,6 +816,763 @@ tracing::Exception::Exception(rpc::Trace::Exception::Reader reader) } } +namespace { +kj::Maybe> readResumeAttachment(const auto& reader) { + if (reader.hasAttachment()) { + return kj::heapArray(reader.getAttachment()); + } + return kj::none; +} +} // namespace + +tracing::Resume::Resume(kj::Maybe> attachment) + : attachment(kj::mv(attachment)) {} + +tracing::Resume::Resume(rpc::Trace::Resume::Reader reader) + : attachment(readResumeAttachment(reader)) {} + +void tracing::Resume::copyTo(rpc::Trace::Resume::Builder builder) { + KJ_IF_SOME(attach, attachment) { + builder.setAttachment(attach); + } +} + +tracing::Resume tracing::Resume::clone() { + return Resume(attachment.map([](auto& attach) { return kj::heapArray(attach); })); +} + +tracing::Hibernate::Hibernate() {} + +tracing::Hibernate::Hibernate(rpc::Trace::Hibernate::Reader reader) {} + +void tracing::Hibernate::copyTo(rpc::Trace::Hibernate::Builder builder) {} + +tracing::Hibernate tracing::Hibernate::clone() { + return Hibernate(); +} + +tracing::Attribute::Attribute(kj::String name, Value&& value) + : name(kj::mv(name)), + value(kj::arr(kj::mv(value))) {} + +tracing::Attribute::Attribute(kj::String name, Values&& value) + : name(kj::mv(name)), + value(kj::mv(value)) {} + +namespace { +kj::Array readValues(const rpc::Trace::Attribute::Reader& reader) { + static auto readValue = + [](rpc::Trace::Attribute::Value::Reader reader) -> tracing::Attribute::Value { + auto inner = reader.getInner(); + switch (inner.which()) { + case rpc::Trace::Attribute::Value::Inner::TEXT: { + return kj::str(inner.getText()); + } + case rpc::Trace::Attribute::Value::Inner::BOOL: { + return inner.getBool(); + } + case rpc::Trace::Attribute::Value::Inner::FLOAT: { + return inner.getFloat(); + } + case rpc::Trace::Attribute::Value::Inner::INT: { + return static_cast(inner.getInt()); + } + } + KJ_UNREACHABLE; + }; + + // There should always be a value and it always have at least one entry in the list. + KJ_ASSERT(reader.hasValue()); + auto value = reader.getValue(); + kj::Vector values(value.size()); + for (auto v: value) { + values.add(readValue(v)); + } + return values.releaseAsArray(); +} +} // namespace + +tracing::Attribute::Attribute(rpc::Trace::Attribute::Reader reader) + : name(kj::str(reader.getName())), + value(readValues(reader)) {} + +void tracing::Attribute::copyTo(rpc::Trace::Attribute::Builder builder) { + static auto writeValue = [](auto builder, const auto& value) mutable { + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(str, kj::String) { + builder.initInner().setText(str.asPtr()); + } + KJ_CASE_ONEOF(b, bool) { + builder.initInner().setBool(b); + } + KJ_CASE_ONEOF(f, double) { + builder.initInner().setFloat(f); + } + KJ_CASE_ONEOF(i, int32_t) { + builder.initInner().setInt(i); + } + } + }; + builder.setName(name.asPtr()); + auto vec = builder.initValue(value.size()); + for (size_t n = 0; n < value.size(); n++) { + writeValue(vec[n], value[n]); + } +} + +tracing::Attribute tracing::Attribute::clone() { + constexpr auto cloneValue = [](const Value& value) -> Value { + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(str, kj::String) { + return kj::str(str); + } + KJ_CASE_ONEOF(b, bool) { + return b; + } + KJ_CASE_ONEOF(f, double) { + return f; + } + KJ_CASE_ONEOF(i, int32_t) { + return i; + } + } + KJ_UNREACHABLE; + }; + + return Attribute(kj::str(name), KJ_MAP(v, value) { return cloneValue(v); }); +} + +tracing::Return::Return(kj::Maybe info): info(kj::mv(info)) {} + +namespace { +kj::Maybe readReturnInfo(const rpc::Trace::Return::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::Return::Info::EMPTY: + return kj::none; + case rpc::Trace::Return::Info::CUSTOM: { + auto list = info.getCustom(); + kj::Vector attrs(list.size()); + for (size_t n = 0; n < list.size(); n++) { + attrs.add(tracing::Attribute(list[n])); + } + return kj::Maybe(attrs.releaseAsArray()); + } + case rpc::Trace::Return::Info::FETCH: { + return kj::Maybe(tracing::FetchResponseInfo(info.getFetch())); + } + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::Return::Return(rpc::Trace::Return::Reader reader): info(readReturnInfo(reader)) {} + +void tracing::Return::copyTo(rpc::Trace::Return::Builder builder) { + KJ_IF_SOME(i, info) { + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchResponseInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + auto attributes = infoBuilder.initCustom(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + custom[n].copyTo(attributes[n]); + } + } + } + } +} + +tracing::Return tracing::Return::clone() { + KJ_IF_SOME(i, info) { + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchResponseInfo) { + return Return(kj::Maybe(fetch.clone())); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + return Return(kj::Maybe(KJ_MAP(i, custom) { return i.clone(); })); + } + } + KJ_UNREACHABLE; + } + return Return(); +} + +tracing::SpanOpen::SpanOpen(kj::Maybe operationName, kj::Maybe info) + : operationName(kj::mv(operationName)), + info(kj::mv(info)) {} + +namespace { +kj::Maybe readSpanOpenOperationName(const rpc::Trace::SpanOpen::Reader& reader) { + if (!reader.hasOperationName()) return kj::none; + return kj::str(reader.getOperationName()); +} + +kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::SpanOpen::Info::EMPTY: + return kj::none; + case rpc::Trace::SpanOpen::Info::FETCH: { + return kj::Maybe(tracing::FetchEventInfo(info.getFetch())); + } + case rpc::Trace::SpanOpen::Info::JSRPC: { + return kj::Maybe(tracing::JsRpcEventInfo(info.getJsrpc())); + } + case rpc::Trace::SpanOpen::Info::CUSTOM: { + auto custom = info.getCustom(); + kj::Vector attrs(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + attrs.add(tracing::Attribute(custom[n])); + } + return kj::Maybe(attrs.releaseAsArray()); + } + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader) + : operationName(readSpanOpenOperationName(reader)), + info(readSpanOpenInfo(reader)) {} + +void tracing::SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) { + KJ_IF_SOME(name, operationName) { + builder.setOperationName(name.asPtr()); + } + KJ_IF_SOME(i, info) { + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + jsrpc.copyTo(infoBuilder.initJsrpc()); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + auto customBuilder = infoBuilder.initCustom(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + custom[n].copyTo(customBuilder[n]); + } + } + } + } +} + +tracing::SpanOpen tracing::SpanOpen::clone() { + constexpr auto cloneInfo = [](kj::Maybe& info) -> kj::Maybe { + return info.map([](Info& info) -> tracing::SpanOpen::Info { + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + return fetch.clone(); + } + KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + return jsrpc.clone(); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + kj::Vector attrs(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + attrs.add(custom[n].clone()); + } + return attrs.releaseAsArray(); + } + } + KJ_UNREACHABLE; + }); + }; + return SpanOpen(operationName.map([](auto& str) { return kj::str(str); }), cloneInfo(info)); +} + +tracing::SpanClose::SpanClose(EventOutcome outcome): outcome(outcome) {} + +tracing::SpanClose::SpanClose(rpc::Trace::SpanClose::Reader reader): outcome(reader.getOutcome()) {} + +void tracing::SpanClose::copyTo(rpc::Trace::SpanClose::Builder builder) { + builder.setOutcome(outcome); +} + +tracing::SpanClose tracing::SpanClose::clone() { + return SpanClose(outcome); +} + +namespace { +kj::Maybe readLabelFromReader(const rpc::Trace::Link::Reader& reader) { + if (!reader.hasLabel()) return kj::none; + return kj::str(reader.getLabel()); +} +tracing::TraceId readTraceIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::TraceId::fromCapnp(context.getTraceId()); +} +tracing::TraceId readInvocationIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::TraceId::fromCapnp(context.getInvocationId()); +} +tracing::SpanId readSpanIdFromReader(const rpc::Trace::Link::Reader& reader) { + KJ_ASSERT(reader.hasContext()); + auto context = reader.getContext(); + return tracing::SpanId(context.getSpanId()); +} +} // namespace + +tracing::Link::Link(const InvocationSpanContext& other, kj::Maybe label) + : Link(kj::mv(label), other.getTraceId(), other.getInvocationId(), other.getSpanId()) {} + +tracing::Link::Link( + kj::Maybe label, TraceId traceId, TraceId invocationId, SpanId spanId) + : label(kj::mv(label)), + traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)) {} + +tracing::Link::Link(rpc::Trace::Link::Reader reader) + : label(readLabelFromReader(reader)), + traceId(readTraceIdFromReader(reader)), + invocationId(readInvocationIdFromReader(reader)), + spanId(readSpanIdFromReader(reader)) {} + +void tracing::Link::copyTo(rpc::Trace::Link::Builder builder) { + KJ_IF_SOME(l, label) { + builder.setLabel(l); + } + auto ctx = builder.initContext(); + traceId.toCapnp(ctx.initTraceId()); + invocationId.toCapnp(ctx.initInvocationId()); + ctx.setSpanId(spanId.getId()); +} + +tracing::Link tracing::Link::clone() { + return Link( + label.map([](kj::String& str) { return kj::str(str); }), traceId, invocationId, spanId); +} + +namespace { +tracing::Onset::Info getInfoFromReader(const rpc::Trace::Onset::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::Onset::Info::FETCH: { + return tracing::FetchEventInfo(info.getFetch()); + } + case rpc::Trace::Onset::Info::JSRPC: { + return tracing::JsRpcEventInfo(info.getJsrpc()); + } + case rpc::Trace::Onset::Info::SCHEDULED: { + return tracing::ScheduledEventInfo(info.getScheduled()); + } + case rpc::Trace::Onset::Info::ALARM: { + return tracing::AlarmEventInfo(info.getAlarm()); + } + case rpc::Trace::Onset::Info::QUEUE: { + return tracing::QueueEventInfo(info.getQueue()); + } + case rpc::Trace::Onset::Info::EMAIL: { + return tracing::EmailEventInfo(info.getEmail()); + } + case rpc::Trace::Onset::Info::TRACE: { + return tracing::TraceEventInfo(info.getTrace()); + } + case rpc::Trace::Onset::Info::HIBERNATABLE_WEB_SOCKET: { + return tracing::HibernatableWebSocketEventInfo(info.getHibernatableWebSocket()); + } + case rpc::Trace::Onset::Info::RESUME: { + return tracing::Resume(info.getResume()); + } + case rpc::Trace::Onset::Info::CUSTOM: { + return tracing::CustomEventInfo(); + } + } + KJ_UNREACHABLE; +} + +kj::Maybe getScriptNameFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptName()) { + return kj::str(reader.getScriptName()); + } + return kj::none; +} + +kj::Maybe> getScriptVersionFromReader( + const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptVersion()) { + return capnp::clone(reader.getScriptVersion()); + } + return kj::none; +} + +kj::Maybe getDispatchNamespaceFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasDispatchNamespace()) { + return kj::str(reader.getDispatchNamespace()); + } + return kj::none; +} + +kj::Maybe> getScriptTagsFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptTags()) { + auto tags = reader.getScriptTags(); + kj::Vector scriptTags(tags.size()); + for (size_t i = 0; i < tags.size(); i++) { + scriptTags.add(kj::str(tags[i])); + } + return kj::Maybe(scriptTags.releaseAsArray()); + } + return kj::none; +} + +kj::Maybe getEntrypointFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasEntryPoint()) { + return kj::str(reader.getEntryPoint()); + } + return kj::none; +} +kj::Maybe getTriggerContextFromReader( + const rpc::Trace::Onset::Reader& reader) { + if (!reader.hasTrigger()) return kj::none; + auto trigger = reader.getTrigger(); + return tracing::Onset::TriggerContext(tracing::TraceId::fromCapnp(trigger.getTraceId()), + tracing::TraceId::fromCapnp(trigger.getInvocationId()), tracing::SpanId(trigger.getSpanId())); +} +tracing::Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reader) { + return tracing::Onset::WorkerInfo{ + .executionModel = reader.getExecutionModel(), + .scriptName = getScriptNameFromReader(reader), + .scriptVersion = getScriptVersionFromReader(reader), + .dispatchNamespace = getDispatchNamespaceFromReader(reader), + .scriptTags = getScriptTagsFromReader(reader), + .entrypoint = getEntrypointFromReader(reader), + }; +} +} // namespace + +tracing::Onset::Onset(tracing::Onset::Info&& info, + tracing::Onset::WorkerInfo&& workerInfo, + kj::Maybe maybeTrigger) + : info(kj::mv(info)), + workerInfo(kj::mv(workerInfo)), + trigger(kj::mv(maybeTrigger)) {} + +tracing::Onset::Onset(rpc::Trace::Onset::Reader reader) + : info(getInfoFromReader(reader)), + workerInfo(getWorkerInfoFromReader(reader)), + trigger(getTriggerContextFromReader(reader)) {} + +void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) { + builder.setExecutionModel(workerInfo.executionModel); + KJ_IF_SOME(name, workerInfo.scriptName) { + builder.setScriptName(name); + } + KJ_IF_SOME(version, workerInfo.scriptVersion) { + builder.setScriptVersion(*version); + } + KJ_IF_SOME(name, workerInfo.dispatchNamespace) { + builder.setDispatchNamespace(name); + } + KJ_IF_SOME(tags, workerInfo.scriptTags) { + auto list = builder.initScriptTags(tags.size()); + for (size_t i = 0; i < tags.size(); i++) { + list.set(i, tags[i]); + } + } + KJ_IF_SOME(e, workerInfo.entrypoint) { + builder.setEntryPoint(e); + } + KJ_IF_SOME(t, trigger) { + auto ctx = builder.initTrigger(); + t.traceId.toCapnp(ctx.initTraceId()); + t.invocationId.toCapnp(ctx.getInvocationId()); + ctx.setSpanId(t.spanId.getId()); + } + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { + jsrpc.copyTo(infoBuilder.initJsrpc()); + } + KJ_CASE_ONEOF(scheduled, ScheduledEventInfo) { + scheduled.copyTo(infoBuilder.initScheduled()); + } + KJ_CASE_ONEOF(alarm, AlarmEventInfo) { + alarm.copyTo(infoBuilder.initAlarm()); + } + KJ_CASE_ONEOF(queue, QueueEventInfo) { + queue.copyTo(infoBuilder.initQueue()); + } + KJ_CASE_ONEOF(email, EmailEventInfo) { + email.copyTo(infoBuilder.initEmail()); + } + KJ_CASE_ONEOF(trace, TraceEventInfo) { + trace.copyTo(infoBuilder.initTrace()); + } + KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { + hws.copyTo(infoBuilder.initHibernatableWebSocket()); + } + KJ_CASE_ONEOF(resume, Resume) { + resume.copyTo(infoBuilder.initResume()); + } + KJ_CASE_ONEOF(custom, CustomEventInfo) { + infoBuilder.initCustom(); + } + } +} + +tracing::Onset::WorkerInfo tracing::Onset::WorkerInfo::clone() const { + return WorkerInfo{ + .executionModel = executionModel, + .scriptName = scriptName.map([](auto& str) { return kj::str(str); }), + .scriptVersion = scriptVersion.map([](auto& version) { return capnp::clone(*version); }), + .dispatchNamespace = dispatchNamespace.map([](auto& str) { return kj::str(str); }), + .scriptTags = + scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }), + .entrypoint = entrypoint.map([](auto& str) { return kj::str(str); }), + }; +} + +tracing::Onset tracing::Onset::clone() { + constexpr auto cloneInfo = [](Info& info) -> tracing::Onset::Info { + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { + return fetch.clone(); + } + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { + return jsrpc.clone(); + } + KJ_CASE_ONEOF(scheduled, ScheduledEventInfo) { + return scheduled.clone(); + } + KJ_CASE_ONEOF(alarm, AlarmEventInfo) { + return alarm.clone(); + } + KJ_CASE_ONEOF(queue, QueueEventInfo) { + return queue.clone(); + } + KJ_CASE_ONEOF(email, EmailEventInfo) { + return email.clone(); + } + KJ_CASE_ONEOF(trace, TraceEventInfo) { + return trace.clone(); + } + KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { + return hws.clone(); + } + KJ_CASE_ONEOF(resume, Resume) { + return resume.clone(); + } + KJ_CASE_ONEOF(custom, CustomEventInfo) { + return CustomEventInfo(); + } + } + KJ_UNREACHABLE; + }; + return Onset(cloneInfo(info), workerInfo.clone(), trigger.map([](TriggerContext& ctx) { + return TriggerContext(ctx.traceId, ctx.invocationId, ctx.spanId); + })); +} + +tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) + : outcome(outcome), + cpuTime(cpuTime), + wallTime(wallTime) {} + +tracing::Outcome::Outcome(rpc::Trace::Outcome::Reader reader) + : outcome(reader.getOutcome()), + cpuTime(reader.getCpuTime() * kj::MILLISECONDS), + wallTime(reader.getWallTime() * kj::MILLISECONDS) {} + +void tracing::Outcome::copyTo(rpc::Trace::Outcome::Builder builder) { + builder.setOutcome(outcome); + builder.setCpuTime(cpuTime / kj::MILLISECONDS); + builder.setWallTime(wallTime / kj::MILLISECONDS); +} + +tracing::Outcome tracing::Outcome::clone() { + return Outcome(outcome, cpuTime, wallTime); +} + +tracing::TailEvent::TailEvent(const tracing::InvocationSpanContext& context, + kj::Date timestamp, + kj::uint sequence, + Event&& event) + : traceId(context.getTraceId()), + invocationId(context.getInvocationId()), + spanId(context.getSpanId()), + timestamp(timestamp), + sequence(sequence), + event(kj::mv(event)) {} + +tracing::TailEvent::TailEvent(TraceId traceId, + TraceId invocationId, + SpanId spanId, + kj::Date timestamp, + kj::uint sequence, + Event&& event) + : traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)), + timestamp(timestamp), + sequence(sequence), + event(kj::mv(event)) {} + +namespace { +tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Reader& reader) { + const auto event = reader.getEvent(); + switch (event.which()) { + case rpc::Trace::TailEvent::Event::ONSET: { + return tracing::Onset(event.getOnset()); + } + case rpc::Trace::TailEvent::Event::OUTCOME: { + return tracing::Outcome(event.getOutcome()); + } + case rpc::Trace::TailEvent::Event::HIBERNATE: { + return tracing::Hibernate(event.getHibernate()); + } + case rpc::Trace::TailEvent::Event::SPAN_OPEN: { + return tracing::SpanOpen(event.getSpanOpen()); + } + case rpc::Trace::TailEvent::Event::SPAN_CLOSE: { + return tracing::SpanClose(event.getSpanClose()); + } + case rpc::Trace::TailEvent::Event::ATTRIBUTE: { + auto listReader = event.getAttribute(); + kj::Vector attrs(listReader.size()); + for (size_t n = 0; n < listReader.size(); n++) { + attrs.add(tracing::Attribute(listReader[n])); + } + return tracing::Mark(attrs.releaseAsArray()); + } + case rpc::Trace::TailEvent::Event::RETURN: { + return tracing::Mark(tracing::Return(event.getReturn())); + } + case rpc::Trace::TailEvent::Event::DIAGNOSTIC_CHANNEL_EVENT: { + return tracing::Mark(tracing::DiagnosticChannelEvent(event.getDiagnosticChannelEvent())); + } + case rpc::Trace::TailEvent::Event::EXCEPTION: { + return tracing::Mark(tracing::Exception(event.getException())); + } + case rpc::Trace::TailEvent::Event::LOG: { + return tracing::Mark(tracing::Log(event.getLog())); + } + case rpc::Trace::TailEvent::Event::LINK: { + return tracing::Mark(tracing::Link(event.getLink())); + } + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::TailEvent::TailEvent(rpc::Trace::TailEvent::Reader reader) + : traceId(TraceId::fromCapnp(reader.getContext().getTraceId())), + invocationId(TraceId::fromCapnp(reader.getContext().getInvocationId())), + spanId(SpanId(reader.getContext().getSpanId())), + timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), + sequence(reader.getSequence()), + event(readEventFromTailEvent(reader)) {} + +void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) { + auto context = builder.initContext(); + traceId.toCapnp(context.initTraceId()); + invocationId.toCapnp(context.initInvocationId()); + context.setSpanId(spanId.getId()); + builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSequence(sequence); + auto eventBuilder = builder.initEvent(); + KJ_SWITCH_ONEOF(event) { + KJ_CASE_ONEOF(onset, Onset) { + onset.copyTo(eventBuilder.initOnset()); + } + KJ_CASE_ONEOF(outcome, Outcome) { + outcome.copyTo(eventBuilder.initOutcome()); + } + KJ_CASE_ONEOF(hibernate, Hibernate) { + hibernate.copyTo(eventBuilder.initHibernate()); + } + KJ_CASE_ONEOF(open, SpanOpen) { + open.copyTo(eventBuilder.initSpanOpen()); + } + KJ_CASE_ONEOF(close, SpanClose) { + close.copyTo(eventBuilder.initSpanClose()); + } + KJ_CASE_ONEOF(mark, Mark) { + KJ_SWITCH_ONEOF(mark) { + KJ_CASE_ONEOF(diag, DiagnosticChannelEvent) { + diag.copyTo(eventBuilder.initDiagnosticChannelEvent()); + } + KJ_CASE_ONEOF(ex, Exception) { + ex.copyTo(eventBuilder.initException()); + } + KJ_CASE_ONEOF(log, Log) { + log.copyTo(eventBuilder.initLog()); + } + KJ_CASE_ONEOF(ret, Return) { + ret.copyTo(eventBuilder.initReturn()); + } + KJ_CASE_ONEOF(link, Link) { + link.copyTo(eventBuilder.initLink()); + } + KJ_CASE_ONEOF(attrs, kj::Array) { + // Mark is a collection of attributes. + auto attrBuilder = eventBuilder.initAttribute(attrs.size()); + for (size_t n = 0; n < attrs.size(); n++) { + attrs[n].copyTo(attrBuilder[n]); + } + } + } + } + } +} + +tracing::TailEvent tracing::TailEvent::clone() { + constexpr auto cloneEvent = [](Event& event) -> Event { + KJ_SWITCH_ONEOF(event) { + KJ_CASE_ONEOF(onset, Onset) { + return onset.clone(); + } + KJ_CASE_ONEOF(outcome, Outcome) { + return outcome.clone(); + } + KJ_CASE_ONEOF(hibernate, Hibernate) { + return hibernate.clone(); + } + KJ_CASE_ONEOF(open, SpanOpen) { + return open.clone(); + } + KJ_CASE_ONEOF(close, SpanClose) { + return close.clone(); + } + KJ_CASE_ONEOF(mark, Mark) { + KJ_SWITCH_ONEOF(mark) { + KJ_CASE_ONEOF(diag, DiagnosticChannelEvent) { + return Mark(diag.clone()); + } + KJ_CASE_ONEOF(ex, Exception) { + return Mark(ex.clone()); + } + KJ_CASE_ONEOF(log, Log) { + return Mark(log.clone()); + } + KJ_CASE_ONEOF(ret, Return) { + return Mark(ret.clone()); + } + KJ_CASE_ONEOF(link, Link) { + return Mark(link.clone()); + } + KJ_CASE_ONEOF(attrs, tracing::CustomInfo) { + return Mark(KJ_MAP(attr, attrs) { return attr.clone(); }); + } + } + } + } + KJ_UNREACHABLE; + }; + return TailEvent(traceId, invocationId, spanId, timestamp, sequence, cloneEvent(event)); +} + +// ====================================================================================== + SpanBuilder& SpanBuilder::operator=(SpanBuilder&& other) { end(); observer = kj::mv(other.observer); diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index a76b2746dd5..d03aff90aa0 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -18,6 +18,9 @@ #include #include +#include +#include + namespace kj { enum class HttpMethod; class EntropySource; @@ -120,14 +123,63 @@ class TraceId final { }; constexpr TraceId TraceId::nullId = nullptr; +// A 64-bit span identifier. +class SpanId final { + public: + // A null span ID. This is only acceptable for use in tests. + constexpr SpanId(decltype(nullptr)): id(0) {} + + constexpr SpanId(uint64_t id): id(id) {} + constexpr SpanId(const SpanId& other) = default; + constexpr SpanId& operator=(const SpanId& other) = default; + constexpr SpanId(SpanId&& other): id(other.id) { + other.id = 0; + } + constexpr SpanId& operator=(SpanId&& other) { + id = other.id; + other.id = 0; + return *this; + } + constexpr operator bool() const { + return id != 0; + } + constexpr bool operator==(const SpanId& other) const { + return id == other.id; + } + constexpr bool operator==(decltype(nullptr)) const { + return id == 0; + } + + inline operator kj::String() const { + return toGoString(); + } + + inline operator uint64_t() const { + return id; + } + + kj::String toGoString() const; + + static const SpanId nullId; + + constexpr uint64_t getId() const { + return id; + } + + static SpanId fromEntropy(kj::Maybe entropy = kj::none); + + private: + uint64_t id; +}; +constexpr SpanId SpanId::nullId = nullptr; + // The InvocationSpanContext is a tuple of a trace id, invocation id, and span id. // The trace id represents a top-level request and should be shared across all // invocation spans and events within those spans. The invocation id identifies // a specific worker invocation. The span id identifies a specific span within an // invocation. Every invocation of every worker should have an InvocationSpanContext. // That may or may not have a trigger InvocationSpanContext. -class InvocationSpanContext final: public kj::Refcounted, - public kj::EnableAddRefToThis { +class InvocationSpanContext final { public: // The constructor is public only so kj::rc can see it and create a new instance. // User code should use the static factory methods or the newChild method. @@ -135,9 +187,17 @@ class InvocationSpanContext final: public kj::Refcounted, kj::Maybe entropySource, TraceId traceId, TraceId invocationId, - uint64_t spanId = 0ULL, - kj::Maybe> parentSpanContext = kj::none); - KJ_DISALLOW_COPY_AND_MOVE(InvocationSpanContext); + SpanId spanId, + kj::Maybe parentSpanContext = kj::none); + + KJ_DISALLOW_COPY(InvocationSpanContext); + + InvocationSpanContext(InvocationSpanContext&& other) = default; + InvocationSpanContext& operator=(InvocationSpanContext&& other) = default; + + inline bool operator==(const InvocationSpanContext& other) const { + return traceId == other.traceId && invocationId == other.invocationId && spanId == other.spanId; + } inline const TraceId& getTraceId() const { return traceId; @@ -147,18 +207,21 @@ class InvocationSpanContext final: public kj::Refcounted, return invocationId; } - inline const uint64_t getSpanId() const { + inline const SpanId& getSpanId() const { return spanId; } - inline const kj::Maybe>& getParent() const { - return parentSpanContext; + inline kj::Maybe getParent() const { + KJ_IF_SOME(p, parentSpanContext) { + return *p; + } + return kj::none; } // Creates a new child span. If the current context does not have an entropy // source this will assert. If isTrigger() is true then it will not have an // entropy source. - kj::Rc newChild(); + InvocationSpanContext newChild() const; // An InvocationSpanContext is a trigger context if it has no entropy source. // This generally means the SpanContext was create from a capnp message and @@ -172,52 +235,62 @@ class InvocationSpanContext final: public kj::Refcounted, // traceId is used as the traceId for the newly created context. Otherwise a new // traceId is generated. The invocationId is always generated new and the spanId // will be 0 with no parent span. - static kj::Rc newForInvocation( - kj::Maybe&> triggerContext = kj::none, + static InvocationSpanContext newForInvocation( + kj::Maybe triggerContext = kj::none, kj::Maybe entropySource = kj::none); // Creates a new InvocationSpanContext from a capnp message. The returned // InvocationSpanContext will not be capable of creating child spans and // is considered only a "trigger" span. - static kj::Maybe> fromCapnp( - rpc::InvocationSpanContext::Reader reader); + static kj::Maybe fromCapnp(rpc::InvocationSpanContext::Reader reader); void toCapnp(rpc::InvocationSpanContext::Builder writer) const; + InvocationSpanContext clone() const; private: // If there is no entropy source, then child spans cannot be created from // this InvocationSpanContext. kj::Maybe entropySource; - const TraceId traceId; - const TraceId invocationId; - const uint64_t spanId; + TraceId traceId; + TraceId invocationId; + SpanId spanId; // The parentSpanContext can be either a direct parent or a trigger // context. If it is a trigger context, then it should have the same // traceId but a different invocationId (unless predictable mode for // testing is enabled). The isTrigger() should also return true. - const kj::Maybe> parentSpanContext; + kj::Maybe> parentSpanContext; }; +kj::String KJ_STRINGIFY(const SpanId& id); kj::String KJ_STRINGIFY(const TraceId& id); -kj::String KJ_STRINGIFY(const kj::Rc& context); +kj::String KJ_STRINGIFY(const InvocationSpanContext& context); -class FetchEventInfo final { - public: - class Header; +// The various structs defined below are used in both legacy tail workers +// and streaming tail workers to report tail events. + +// Describes a fetch request +struct FetchEventInfo final { + struct Header; explicit FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers); FetchEventInfo(rpc::Trace::FetchEventInfo::Reader reader); + FetchEventInfo(FetchEventInfo&&) = default; + FetchEventInfo& operator=(FetchEventInfo&&) = default; + KJ_DISALLOW_COPY(FetchEventInfo); - class Header final { - public: + struct Header final { explicit Header(kj::String name, kj::String value); Header(rpc::Trace::FetchEventInfo::Header::Reader reader); + Header(Header&&) = default; + Header& operator=(Header&&) = default; + KJ_DISALLOW_COPY(Header); kj::String name; kj::String value; void copyTo(rpc::Trace::FetchEventInfo::Header::Builder builder); + Header clone(); JSG_MEMORY_INFO(Header) { tracker.trackField("name", name); @@ -232,122 +305,159 @@ class FetchEventInfo final { kj::Array
headers; void copyTo(rpc::Trace::FetchEventInfo::Builder builder); + FetchEventInfo clone(); }; -class JsRpcEventInfo final { - public: +// Describes a jsrpc request +struct JsRpcEventInfo final { explicit JsRpcEventInfo(kj::String methodName); JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader); + JsRpcEventInfo(JsRpcEventInfo&&) = default; + JsRpcEventInfo& operator=(JsRpcEventInfo&&) = default; + KJ_DISALLOW_COPY(JsRpcEventInfo); kj::String methodName; void copyTo(rpc::Trace::JsRpcEventInfo::Builder builder); + JsRpcEventInfo clone(); }; -class ScheduledEventInfo final { - public: +// Describes a scheduled request +struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); ScheduledEventInfo(rpc::Trace::ScheduledEventInfo::Reader reader); + ScheduledEventInfo(ScheduledEventInfo&&) = default; + ScheduledEventInfo& operator=(ScheduledEventInfo&&) = default; + KJ_DISALLOW_COPY(ScheduledEventInfo); double scheduledTime; kj::String cron; void copyTo(rpc::Trace::ScheduledEventInfo::Builder builder); + ScheduledEventInfo clone(); }; -class AlarmEventInfo final { - public: +// Describes a Durable Object alarm request +struct AlarmEventInfo final { explicit AlarmEventInfo(kj::Date scheduledTime); AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader); + AlarmEventInfo(AlarmEventInfo&&) = default; + AlarmEventInfo& operator=(AlarmEventInfo&&) = default; + KJ_DISALLOW_COPY(AlarmEventInfo); kj::Date scheduledTime; void copyTo(rpc::Trace::AlarmEventInfo::Builder builder); + AlarmEventInfo clone(); }; -class QueueEventInfo final { - public: +// Describes a queue worker request +struct QueueEventInfo final { explicit QueueEventInfo(kj::String queueName, uint32_t batchSize); QueueEventInfo(rpc::Trace::QueueEventInfo::Reader reader); + QueueEventInfo(QueueEventInfo&&) = default; + QueueEventInfo& operator=(QueueEventInfo&&) = default; + KJ_DISALLOW_COPY(QueueEventInfo); kj::String queueName; uint32_t batchSize; void copyTo(rpc::Trace::QueueEventInfo::Builder builder); + QueueEventInfo clone(); }; -class EmailEventInfo final { - public: +// Describes an email request +struct EmailEventInfo final { explicit EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize); EmailEventInfo(rpc::Trace::EmailEventInfo::Reader reader); + EmailEventInfo(EmailEventInfo&&) = default; + EmailEventInfo& operator=(EmailEventInfo&&) = default; + KJ_DISALLOW_COPY(EmailEventInfo); kj::String mailFrom; kj::String rcptTo; uint32_t rawSize; void copyTo(rpc::Trace::EmailEventInfo::Builder builder); + EmailEventInfo clone(); }; -class TraceEventInfo final { - public: - class TraceItem; +// Describes a legacy tail worker request +struct TraceEventInfo final { + struct TraceItem; explicit TraceEventInfo(kj::ArrayPtr> traces); + TraceEventInfo(kj::Array traces): traces(kj::mv(traces)) {} TraceEventInfo(rpc::Trace::TraceEventInfo::Reader reader); + TraceEventInfo(TraceEventInfo&&) = default; + TraceEventInfo& operator=(TraceEventInfo&&) = default; + KJ_DISALLOW_COPY(TraceEventInfo); - class TraceItem final { - public: + struct TraceItem final { explicit TraceItem(kj::Maybe scriptName); TraceItem(rpc::Trace::TraceEventInfo::TraceItem::Reader reader); + TraceItem(TraceItem&&) = default; + TraceItem& operator=(TraceItem&&) = default; + KJ_DISALLOW_COPY(TraceItem); kj::Maybe scriptName; void copyTo(rpc::Trace::TraceEventInfo::TraceItem::Builder builder); + TraceItem clone(); }; kj::Vector traces; void copyTo(rpc::Trace::TraceEventInfo::Builder builder); + TraceEventInfo clone(); }; -class HibernatableWebSocketEventInfo final { - public: - struct Message {}; - struct Close { +// Describes a hibernatable web socket event +struct HibernatableWebSocketEventInfo final { + struct Message final {}; + struct Close final { uint16_t code; bool wasClean; }; - struct Error {}; + struct Error final {}; using Type = kj::OneOf; explicit HibernatableWebSocketEventInfo(Type type); HibernatableWebSocketEventInfo(rpc::Trace::HibernatableWebSocketEventInfo::Reader reader); + HibernatableWebSocketEventInfo(HibernatableWebSocketEventInfo&&) = default; + HibernatableWebSocketEventInfo& operator=(HibernatableWebSocketEventInfo&&) = default; + KJ_DISALLOW_COPY(HibernatableWebSocketEventInfo); Type type; void copyTo(rpc::Trace::HibernatableWebSocketEventInfo::Builder builder); + HibernatableWebSocketEventInfo clone(); static Type readFrom(rpc::Trace::HibernatableWebSocketEventInfo::Reader reader); }; -class CustomEventInfo final { - public: +// Describes a custom event +struct CustomEventInfo final { explicit CustomEventInfo() {}; CustomEventInfo(rpc::Trace::CustomEventInfo::Reader reader) {}; }; -class FetchResponseInfo final { - public: +// Describes a fetch response +struct FetchResponseInfo final { explicit FetchResponseInfo(uint16_t statusCode); FetchResponseInfo(rpc::Trace::FetchResponseInfo::Reader reader); + FetchResponseInfo(FetchResponseInfo&&) = default; + FetchResponseInfo& operator=(FetchResponseInfo&&) = default; + KJ_DISALLOW_COPY(FetchResponseInfo); uint16_t statusCode; void copyTo(rpc::Trace::FetchResponseInfo::Builder builder); + FetchResponseInfo clone(); }; -class DiagnosticChannelEvent final { - public: +// Describes an event published using the node:diagnostics_channel API +struct DiagnosticChannelEvent final { explicit DiagnosticChannelEvent( kj::Date timestamp, kj::String channel, kj::Array message); DiagnosticChannelEvent(rpc::Trace::DiagnosticChannelEvent::Reader reader); @@ -359,10 +469,11 @@ class DiagnosticChannelEvent final { kj::Array message; void copyTo(rpc::Trace::DiagnosticChannelEvent::Builder builder); + DiagnosticChannelEvent clone(); }; -class Log final { - public: +// Describes a log event +struct Log final { explicit Log(kj::Date timestamp, LogLevel logLevel, kj::String message); Log(rpc::Trace::Log::Reader reader); Log(Log&&) = default; @@ -377,10 +488,11 @@ class Log final { kj::String message; void copyTo(rpc::Trace::Log::Builder builder); + Log clone(); }; -class Exception final { - public: +// Describes an exception event +struct Exception final { explicit Exception( kj::Date timestamp, kj::String name, kj::String message, kj::Maybe stack); Exception(rpc::Trace::Exception::Reader reader); @@ -397,6 +509,33 @@ class Exception final { kj::Maybe stack; void copyTo(rpc::Trace::Exception::Builder builder); + Exception clone(); +}; + +// Used to indicate that a previously hibernated tail stream is being resumed. +struct Resume final { + explicit Resume(kj::Maybe> attachment); + Resume(rpc::Trace::Resume::Reader reader); + Resume(Resume&&) = default; + KJ_DISALLOW_COPY(Resume); + ~Resume() noexcept(false) = default; + + kj::Maybe> attachment; + + void copyTo(rpc::Trace::Resume::Builder builder); + Resume clone(); +}; + +// Used to indicate that a tail stream is being hibernated. +struct Hibernate final { + explicit Hibernate(); + Hibernate(rpc::Trace::Hibernate::Reader reader); + Hibernate(Hibernate&&) = default; + KJ_DISALLOW_COPY(Hibernate); + ~Hibernate() noexcept(false) = default; + + void copyTo(rpc::Trace::Hibernate::Builder builder); + Hibernate clone(); }; // EventInfo types are used to describe the onset of an invocation. The FetchEventInfo @@ -409,7 +548,225 @@ using EventInfo = kj::OneOf; + +template +concept AttributeValue = kj::isSameType() || kj::isSameType() || + kj::isSameType() || kj::isSameType(); + +// An Attribute mark is used to add detail to a span over its lifetime. +// The Attribute struct can also be used to provide arbitrary additional +// properties for some other structs. +// Modeled after https://opentelemetry.io/docs/concepts/signals/traces/#attributes +struct Attribute final { + using Value = kj::OneOf; + using Values = kj::Array; + + explicit Attribute(kj::String name, Value&& value); + explicit Attribute(kj::String name, Values&& values); + + template + explicit Attribute(kj::String name, V v): Attribute(kj::mv(name), Value(kj::mv(v))) {} + + template + explicit Attribute(kj::String name, kj::Array vals) + : Attribute(kj::mv(name), KJ_MAP(v, vals) { return Value(kj::mv(v)); }) {} + + template + explicit Attribute(kj::String name, std::initializer_list list) + : Attribute(kj::mv(name), kj::heapArray(list)) {} + + Attribute(rpc::Trace::Attribute::Reader reader); + Attribute(Attribute&&) = default; + Attribute& operator=(Attribute&&) = default; + KJ_DISALLOW_COPY(Attribute); + + kj::String name; + Values value; + + void copyTo(rpc::Trace::Attribute::Builder builder); + Attribute clone(); +}; +using CustomInfo = kj::Array; + +// A Return mark is used to mark the point at which a span operation returned +// a value. For instance, when a fetch subrequest response is received, or when +// the fetch handler returns a Response. Importantly, it does not signal that the +// span has closed, which may not happen for some period of time after the return +// mark is recorded (e.g. due to things like waitUntils or waiting to fully ready +// the response body payload, etc). +struct Return final { + using Info = kj::OneOf; + + explicit Return(kj::Maybe info = kj::none); + Return(rpc::Trace::Return::Reader reader); + Return(Return&&) = default; + Return& operator=(Return&&) = default; + KJ_DISALLOW_COPY(Return); + + kj::Maybe info = kj::none; + + void copyTo(rpc::Trace::Return::Builder builder); + Return clone(); +}; + +// A Link mark is used to establish a link from one span to another. +// The optional label can be used to identify the link. +struct Link final { + explicit Link(const InvocationSpanContext& other, kj::Maybe label = kj::none); + explicit Link(kj::Maybe label, TraceId traceId, TraceId invocationId, SpanId spanId); + Link(rpc::Trace::Link::Reader reader); + Link(Link&&) = default; + Link& operator=(Link&&) = default; + KJ_DISALLOW_COPY(Link); + + kj::Maybe label; + TraceId traceId; + TraceId invocationId; + SpanId spanId; + + void copyTo(rpc::Trace::Link::Builder builder); + Link clone(); +}; + +using Mark = kj::OneOf>; + +// Marks the opening of a child span within the streaming tail session. +struct SpanOpen final { + // If the span represents a subrequest, then the info describes the + // details of that subrequest. + using Info = kj::OneOf; + + explicit SpanOpen( + kj::Maybe operationName = kj::none, kj::Maybe info = kj::none); + SpanOpen(rpc::Trace::SpanOpen::Reader reader); + SpanOpen(SpanOpen&&) = default; + SpanOpen& operator=(SpanOpen&&) = default; + KJ_DISALLOW_COPY(SpanOpen); + + kj::Maybe operationName = kj::none; + kj::Maybe info = kj::none; + + void copyTo(rpc::Trace::SpanOpen::Builder builder); + SpanOpen clone(); +}; + +// Marks the closing of a child span within the streaming tail session. +// Once emitted, no further mark events should occur within the closed +// span. +struct SpanClose final { + explicit SpanClose(EventOutcome outcome = EventOutcome::OK); + SpanClose(rpc::Trace::SpanClose::Reader reader); + SpanClose(SpanClose&&) = default; + SpanClose& operator=(SpanClose&&) = default; + KJ_DISALLOW_COPY(SpanClose); + + EventOutcome outcome = EventOutcome::OK; + + void copyTo(rpc::Trace::SpanClose::Builder builder); + SpanClose clone(); +}; + +// The Onset and Outcome event types are special forms of SpanOpen and +// SpanClose that explicitly mark the start and end of the root span. +// A streaming tail session will always begin with an Onset event, and +// always end with an Outcome event. +struct Onset final { + using Info = EventInfo; + + // Information about the worker that is being tailed. + struct WorkerInfo final { + ExecutionModel executionModel = ExecutionModel::STATELESS; + kj::Maybe scriptName; + kj::Maybe> scriptVersion; + kj::Maybe dispatchNamespace; + kj::Maybe> scriptTags; + kj::Maybe entrypoint; + + WorkerInfo clone() const; + }; + + struct TriggerContext final { + TraceId traceId; + TraceId invocationId; + SpanId spanId; + + TriggerContext(TraceId traceId, TraceId invocationId, SpanId spanId) + : traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)) {} + + TriggerContext(const InvocationSpanContext& ctx) + : TriggerContext(ctx.getTraceId(), ctx.getInvocationId(), ctx.getSpanId()) {} + }; + + explicit Onset( + Info&& info, WorkerInfo&& workerInfo, kj::Maybe maybeTrigger = kj::none); + + Onset(rpc::Trace::Onset::Reader reader); + Onset(Onset&&) = default; + Onset& operator=(Onset&&) = default; + KJ_DISALLOW_COPY(Onset); + + Info info; + WorkerInfo workerInfo; + kj::Maybe trigger; + + void copyTo(rpc::Trace::Onset::Builder builder); + Onset clone(); +}; + +struct Outcome final { + explicit Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime); + Outcome(rpc::Trace::Outcome::Reader reader); + Outcome(Outcome&&) = default; + Outcome& operator=(Outcome&&) = default; + KJ_DISALLOW_COPY(Outcome); + + EventOutcome outcome = EventOutcome::OK; + kj::Duration cpuTime; + kj::Duration wallTime; + + void copyTo(rpc::Trace::Outcome::Builder builder); + Outcome clone(); +}; + +// A streaming tail worker receives a series of Tail Events. Tail events always +// occur within an InvocationSpanContext. The first TailEvent delivered to a +// streaming tail session is always an Onset. The final TailEvent delivered is +// always an Outcome. Between those can be any number of SpanOpen, SpanClose, +// and Mark events. Every SpanOpen *must* be associated with a SpanClose unless +// the stream was abruptly terminated. +struct TailEvent final { + using Event = kj::OneOf; + + explicit TailEvent( + const InvocationSpanContext& context, kj::Date timestamp, kj::uint sequence, Event&& event); + TailEvent(TraceId traceId, + TraceId invocationId, + SpanId spanId, + kj::Date timestamp, + kj::uint sequence, + Event&& event); + TailEvent(rpc::Trace::TailEvent::Reader reader); + TailEvent(TailEvent&&) = default; + TailEvent& operator=(TailEvent&&) = default; + KJ_DISALLOW_COPY(TailEvent); + + // The invocation span context this event is associated with. + TraceId traceId; + TraceId invocationId; + SpanId spanId; + + kj::Date timestamp; // Unix epoch, spectre-mitigated resolution + kj::uint sequence; + + Event event; + + void copyTo(rpc::Trace::TailEvent::Builder builder); + TailEvent clone(); +}; } // namespace tracing enum class PipelineLogLevel { diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index b12de07e64e..ad8fa6089e5 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -54,7 +54,7 @@ class WorkerEntrypoint final: public WorkerInterface { bool tunnelExceptions, kj::Maybe> workerTracer, kj::Maybe cfBlobJson, - kj::Maybe> maybeTriggerInvocationSpan); + kj::Maybe maybeTriggerInvocationSpan); kj::Promise request(kj::HttpMethod method, kj::StringPtr url, @@ -98,7 +98,7 @@ class WorkerEntrypoint final: public WorkerInterface { kj::Own ioChannelFactory, kj::Own metrics, kj::Maybe> workerTracer, - kj::Rc invocationSpanContext); + tracing::InvocationSpanContext invocationSpanContext); template kj::Promise maybeAddGcPassForTest(IoContext& context, kj::Promise promise); @@ -161,13 +161,13 @@ kj::Own WorkerEntrypoint::construct(ThreadContext& threadContex bool tunnelExceptions, kj::Maybe> workerTracer, kj::Maybe cfBlobJson, - kj::Maybe> maybeTriggerInvocationSpan) { + kj::Maybe maybeTriggerInvocationSpan) { TRACE_EVENT("workerd", "WorkerEntrypoint::construct()"); // Create a new InvocationSpanContext for this worker invocation. auto invocationSpanContext = tracing::InvocationSpanContext::newForInvocation( maybeTriggerInvocationSpan.map( - [](auto& trigger) -> kj::Rc& { return trigger; }), + [](auto& trigger) -> tracing::InvocationSpanContext& { return trigger; }), threadContext.getEntropySource()); auto obj = kj::heap(kj::Badge(), threadContext, @@ -198,7 +198,7 @@ void WorkerEntrypoint::init(kj::Own worker, kj::Own ioChannelFactory, kj::Own metrics, kj::Maybe> workerTracer, - kj::Rc invocationSpanContext) { + tracing::InvocationSpanContext invocationSpanContext) { TRACE_EVENT("workerd", "WorkerEntrypoint::init()"); // We need to construct the IoContext -- unless this is an actor and it already has a // IoContext, in which case we reuse it. @@ -729,7 +729,7 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, bool tunnelExceptions, kj::Maybe> workerTracer, kj::Maybe cfBlobJson, - kj::Maybe> maybeTriggerInvocationSpan) { + kj::Maybe maybeTriggerInvocationSpan) { return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson), diff --git a/src/workerd/io/worker-entrypoint.h b/src/workerd/io/worker-entrypoint.h index d8eec5ad0ae..a5f4628e736 100644 --- a/src/workerd/io/worker-entrypoint.h +++ b/src/workerd/io/worker-entrypoint.h @@ -43,6 +43,6 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, // the implication is that this worker entrypoint is being created as a subrequest or // subtask of another request. If it is kj::none, then this invocation is a top-level // invocation. - kj::Maybe> maybeTriggerInvocationSpan = kj::none); + kj::Maybe maybeTriggerInvocationSpan = kj::none); } // namespace workerd diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 8a3263bb725..8a1588cb66a 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -157,6 +157,142 @@ struct Trace @0x8e8d911203762d34 { executionModel @25 :ExecutionModel; # the execution model of the worker being traced. Can be stateless for a regular worker, # durableObject for a DO worker or workflow for the upcoming Workflows feature. + + # ===================================================================================== + # Additional types for streaming tail workers + + struct Attribute { + # An Attribute mark is used to add detail to a span over its lifetime. + # The Attribute struct can also be used to provide arbitrary additional + # properties for some other structs. + # Modeled after https://opentelemetry.io/docs/concepts/signals/traces/#attributes + struct Value { + inner :union { + text @0 :Text; + bool @1 :Bool; + int @2 :Int32; + float @3 :Float64; + } + } + name @0 :Text; + value @1 :List(Value); + } + + struct Return { + # A Return mark is used to mark the point at which a span operation returned + # a value. For instance, when a fetch subrequest response is received, or when + # the fetch handler returns a Response. Importantly, it does not signal that the + # span has closed, which may not happen for some period of time after the return + # mark is recorded (e.g. due to things like waitUntils or waiting to fully ready + # the response body payload, etc). Not all spans will have a Return mark. + info :union { + empty @0 :Void; + custom @1 :List(Attribute); + fetch @2 :FetchResponseInfo; + } + } + + struct SpanOpen { + # Marks the opening of a child span within the streaming tail session. + operationName @0 :Text; + info :union { + empty @1 :Void; + custom @2 :List(Attribute); + fetch @3 :FetchEventInfo; + jsrpc @4 :JsRpcEventInfo; + } + } + + struct SpanClose { + # Marks the closing of a child span within the streaming tail session. + # Once emitted, no further mark events should occur within the closed + # span. + outcome @0 :EventOutcome; + } + + struct Resume { + # A resume event indicates that we are resuming a previously hibernated + # tail session. + + attachment @0 :Data; + # When a tail session is hibernated, the tail worker is given the opportunity + # to provide some additional data that will be serialized and stored with the + # hibernated state. When the stream is resumed, if the tail worker has provided + # such data, it will be passed back to the worker in the resume event. + } + + struct Onset { + # The Onset and Outcome event types are special forms of SpanOpen and + # SpanClose that explicitly mark the start and end of the root span. + # A streaming tail session will always begin with an Onset event, and + # always end with an Outcome event. + executionModel @0 :ExecutionModel; + scriptName @1 :Text; + scriptVersion @2 :ScriptVersion; + dispatchNamespace @3 :Text; + scriptTags @4 :List(Text); + entryPoint @5 :Text; + + trigger @6 :InvocationSpanContext; + # If this invocation was triggered by a different invocation that + # is being traced, the trigger will identify the triggering span. + # Propagation of the trigger context is not required, and in some + # cases is not desirable. + + info :union { + fetch @7 :FetchEventInfo; + jsrpc @8 :JsRpcEventInfo; + scheduled @9 :ScheduledEventInfo; + alarm @10 :AlarmEventInfo; + queue @11 :QueueEventInfo; + email @12 :EmailEventInfo; + trace @13 :TraceEventInfo; + hibernatableWebSocket @14 :HibernatableWebSocketEventInfo; + resume @15 :Resume; + custom @16 :CustomEventInfo; + } + } + + struct Outcome { + outcome @0 :EventOutcome; + cpuTime @1 :UInt64; + wallTime @2 :UInt64; + } + + struct Hibernate { + # A hibernate event indicates that the tail session is being hibernated. + } + + struct Link { + # A link to another invocation span context. + label @0 :Text; + context @1 :InvocationSpanContext; + } + + struct TailEvent { + # A streaming tail worker receives a series of Tail Events. Tail events always + # occur within an InvocationSpanContext. The first TailEvent delivered to a + # streaming tail session is always an Onset. The final TailEvent delivered is + # always an Outcome or Hibernate. Between those can be any number of SpanOpen, + # SpanClose, and Mark events. Every SpanOpen *must* be associated with a SpanClose + # unless the stream was abruptly terminated. + context @0 :InvocationSpanContext; + timestampNs @1 :Int64; + sequence @2 :UInt32; + event :union { + onset @3 :Onset; + outcome @4 :Outcome; + hibernate @5 :Hibernate; + spanOpen @6 :SpanOpen; + spanClose @7 :SpanClose; + attribute @8 :List(Attribute); + return @9 :Return; + diagnosticChannelEvent @10 :DiagnosticChannelEvent; + exception @11 :Exception; + log @12 :Log; + link @13 :Link; + } + } } struct SendTracesRun @0xde913ebe8e1b82a5 {