From ef6cc65799eb0da1cd75bb9cc7f47b82f47ab89e Mon Sep 17 00:00:00 2001 From: James M Snell Date: Thu, 21 Nov 2024 14:34:37 -0800 Subject: [PATCH] Implement the streaming tail types --- src/workerd/api/trace.c++ | 4 + src/workerd/io/trace-test.c++ | 472 ++++++++++++++- src/workerd/io/trace.c++ | 787 ++++++++++++++++++++++++++ src/workerd/io/trace.h | 139 ++++- src/workerd/io/worker-interface.capnp | 120 ++++ 5 files changed, 1494 insertions(+), 28 deletions(-) diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index b456f25676d..ca23202813c 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -184,6 +184,10 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) } KJ_UNREACHABLE; } + KJ_CASE_ONEOF(resume, tracing::Resume) { + // Resume events are not used with legacy trace + KJ_UNREACHABLE; + } KJ_CASE_ONEOF(custom, tracing::CustomEventInfo) { return kj::Maybe(jsg::alloc(trace, custom)); } diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 5fa0ec2b8da..712ed833e9d 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -12,6 +12,25 @@ namespace workerd::tracing { namespace { +class FakeEntropySource final: public kj::EntropySource { + public: + void generate(kj::ArrayPtr buffer) override { + // Write the uint64_t value to the buffer + buffer[0] = counter & 0xff; + buffer[1] = (counter >> 8) & 0xff; + buffer[2] = (counter >> 16) & 0xff; + buffer[3] = (counter >> 24) & 0xff; + buffer[4] = (counter >> 32) & 0xff; + buffer[5] = (counter >> 40) & 0xff; + buffer[6] = (counter >> 48) & 0xff; + buffer[7] = (counter >> 56) & 0xff; + counter++; + } + + private: + uint64_t counter = 0; +}; + KJ_TEST("can read trace ID string format") { KJ_EXPECT(TraceId::fromGoString("z"_kj) == kj::none); @@ -66,26 +85,6 @@ KJ_TEST("can write trace ID protobuf format") { } KJ_TEST("InvocationSpanContext") { - - class FakeEntropySource final: public kj::EntropySource { - public: - void generate(kj::ArrayPtr buffer) override { - // Write the uint64_t value to the buffer - buffer[0] = counter & 0xff; - buffer[1] = (counter >> 8) & 0xff; - buffer[2] = (counter >> 16) & 0xff; - buffer[3] = (counter >> 24) & 0xff; - buffer[4] = (counter >> 32) & 0xff; - buffer[5] = (counter >> 40) & 0xff; - buffer[6] = (counter >> 48) & 0xff; - buffer[7] = (counter >> 56) & 0xff; - counter++; - } - - private: - uint64_t counter = 0; - }; - setPredictableModeForTest(); FakeEntropySource fakeEntropySource; auto sc = InvocationSpanContext::newForInvocation(kj::none, fakeEntropySource); @@ -134,5 +133,438 @@ KJ_TEST("InvocationSpanContext") { KJ_EXPECT(sc5->isTrigger()); } +KJ_TEST("Read/Write FetchEventInfo works") { + capnp::MallocMessageBuilder builder; + auto fetchInfoBuilder = builder.initRoot(); + + kj::Vector headers; + headers.add(FetchEventInfo::Header(kj::str("foo"), kj::str("bar"))); + + tracing::FetchEventInfo info( + kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), headers.releaseAsArray()); + + info.copyTo(fetchInfoBuilder); + + auto reader = fetchInfoBuilder.asReader(); + + tracing::FetchEventInfo info2(reader); + KJ_ASSERT(info2.method == kj::HttpMethod::GET); + KJ_ASSERT(info2.url == "https://example.com"_kj); + KJ_ASSERT(info2.cfJson == "{}"_kj); + KJ_ASSERT(info2.headers.size() == 1); + KJ_ASSERT(info2.headers[0].name == "foo"_kj); + KJ_ASSERT(info2.headers[0].value == "bar"_kj); + + tracing::FetchEventInfo info3 = info.clone(); + KJ_ASSERT(info3.method == kj::HttpMethod::GET); + KJ_ASSERT(info3.url == "https://example.com"_kj); + KJ_ASSERT(info3.cfJson == "{}"_kj); + KJ_ASSERT(info3.headers.size() == 1); + KJ_ASSERT(info3.headers[0].name == "foo"_kj); + KJ_ASSERT(info3.headers[0].value == "bar"_kj); +} + +KJ_TEST("Read/Write JsRpcEventInfo works") { + capnp::MallocMessageBuilder builder; + auto jsRpcInfoBuilder = builder.initRoot(); + + tracing::JsRpcEventInfo info(kj::str("foo")); + + info.copyTo(jsRpcInfoBuilder); + + auto reader = jsRpcInfoBuilder.asReader(); + + tracing::JsRpcEventInfo info2(reader); + KJ_ASSERT(info2.methodName == "foo"_kj); + + tracing::JsRpcEventInfo info3 = info.clone(); + KJ_ASSERT(info3.methodName == "foo"_kj); +} + +KJ_TEST("Read/Write ScheduledEventInfo workers") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::ScheduledEventInfo info(1.2, kj::str("foo")); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::ScheduledEventInfo info2(reader); + KJ_ASSERT(info2.scheduledTime == 1.2); + KJ_ASSERT(info2.cron == "foo"_kj); + + tracing::ScheduledEventInfo info3 = info.clone(); + KJ_ASSERT(info3.scheduledTime == 1.2); + KJ_ASSERT(info3.cron == "foo"_kj); +} + +KJ_TEST("Read/Write AlarmEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::AlarmEventInfo info(kj::UNIX_EPOCH); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::AlarmEventInfo info2(reader); + KJ_ASSERT(info.scheduledTime == info2.scheduledTime); + + tracing::AlarmEventInfo info3 = info.clone(); + KJ_ASSERT(info.scheduledTime == info3.scheduledTime); +} + +KJ_TEST("Read/Write QueueEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::QueueEventInfo info(kj::str("foo"), 1); + + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::QueueEventInfo info2(reader); + KJ_ASSERT(info2.queueName == "foo"_kj); + KJ_ASSERT(info2.batchSize == 1); + + tracing::QueueEventInfo info3 = info.clone(); + KJ_ASSERT(info2.queueName == "foo"_kj); + KJ_ASSERT(info2.batchSize == 1); +} + +KJ_TEST("Read/Write EmailEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::EmailEventInfo info(kj::str("foo"), kj::str("bar"), 1); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::EmailEventInfo info2(reader); + KJ_ASSERT(info2.mailFrom == "foo"_kj); + KJ_ASSERT(info2.rcptTo == "bar"_kj); + KJ_ASSERT(info2.rawSize == 1); + + tracing::EmailEventInfo info3 = info.clone(); + KJ_ASSERT(info3.mailFrom == "foo"_kj); + KJ_ASSERT(info3.rcptTo == "bar"_kj); + KJ_ASSERT(info3.rawSize == 1); +} + +KJ_TEST("Read/Write TraceEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + kj::Vector> items(1); + items.add(kj::heap(kj::none, kj::str("foo"), kj::none, kj::none, kj::none, + kj::Array(), kj::none, ExecutionModel::STATELESS)); + + tracing::TraceEventInfo info(items.asPtr()); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::TraceEventInfo info2(reader); + KJ_ASSERT(info2.traces.size() == 1); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); + + tracing::TraceEventInfo info3 = info.clone(); + KJ_ASSERT(info2.traces.size() == 1); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); +} + +KJ_TEST("Read/Write HibernatableWebSocketEventInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::HibernatableWebSocketEventInfo info(tracing::HibernatableWebSocketEventInfo::Message{}); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::HibernatableWebSocketEventInfo info2(reader); + KJ_ASSERT(info2.type.is()); + + tracing::HibernatableWebSocketEventInfo info3 = info.clone(); + KJ_ASSERT(info3.type.is()); +} + +KJ_TEST("Read/Write FetchResponseInfo works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchResponseInfo info(123); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::FetchResponseInfo info2(reader); + KJ_ASSERT(info2.statusCode == 123); + + tracing::FetchResponseInfo info3 = info.clone(); + KJ_ASSERT(info3.statusCode == 123); +} + +KJ_TEST("Read/Write DiagnosticChannelEvent works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::DiagnosticChannelEvent info(kj::UNIX_EPOCH, kj::str("foo"), kj::Array()); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::DiagnosticChannelEvent info2(reader); + KJ_ASSERT(info2.timestamp == info.timestamp); + KJ_ASSERT(info2.channel == "foo"_kj); + KJ_ASSERT(info2.message.size() == 0); + + tracing::DiagnosticChannelEvent info3 = info.clone(); + KJ_ASSERT(info3.timestamp == info.timestamp); + KJ_ASSERT(info3.channel == "foo"_kj); + KJ_ASSERT(info3.message.size() == 0); +} + +KJ_TEST("Read/Write Log works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Log info(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Log info2(reader); + KJ_ASSERT(info.timestamp == info2.timestamp); + KJ_ASSERT(info2.logLevel == LogLevel::INFO); + KJ_ASSERT(info2.message == "foo"_kj); + + tracing::Log info3 = info.clone(); + KJ_ASSERT(info.timestamp == info3.timestamp); + KJ_ASSERT(info3.logLevel == LogLevel::INFO); + KJ_ASSERT(info3.message == "foo"_kj); +} + +KJ_TEST("Read/Write Exception works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Exception info(kj::UNIX_EPOCH, kj::str("foo"), kj::str("bar"), kj::none); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Exception info2(reader); + KJ_ASSERT(info.timestamp == info2.timestamp); + KJ_ASSERT(info2.name == "foo"_kj); + KJ_ASSERT(info2.message == "bar"_kj); + KJ_ASSERT(info2.stack == kj::none); + + tracing::Exception info3 = info.clone(); + KJ_ASSERT(info.timestamp == info3.timestamp); + KJ_ASSERT(info3.name == "foo"_kj); + KJ_ASSERT(info3.message == "bar"_kj); + KJ_ASSERT(info3.stack == kj::none); +} + +KJ_TEST("Read/Write Resume works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Resume info(kj::arr(1, 2, 3)); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Resume info2(reader); + auto& attachment = KJ_ASSERT_NONNULL(info2.attachment); + KJ_ASSERT(attachment.size() == 3); + KJ_ASSERT(attachment[0] == 1); + KJ_ASSERT(attachment[1] == 2); + KJ_ASSERT(attachment[2] == 3); + + tracing::Resume info3 = info.clone(); + auto& attachment2 = KJ_ASSERT_NONNULL(info3.attachment); + KJ_ASSERT(attachment2.size() == 3); + KJ_ASSERT(attachment2[0] == 1); + KJ_ASSERT(attachment2[1] == 2); + KJ_ASSERT(attachment2[2] == 3); +} + +KJ_TEST("Read/Write Hibernate works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Hibernate info(kj::arr(1, 2, 3)); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Hibernate info2(reader); + auto& attachment = KJ_ASSERT_NONNULL(info2.attachment); + KJ_ASSERT(attachment.size() == 3); + KJ_ASSERT(attachment[0] == 1); + KJ_ASSERT(attachment[1] == 2); + KJ_ASSERT(attachment[2] == 3); + + tracing::Hibernate info3 = info.clone(); + auto& attachment2 = KJ_ASSERT_NONNULL(info3.attachment); + KJ_ASSERT(attachment2.size() == 3); + KJ_ASSERT(attachment2[0] == 1); + KJ_ASSERT(attachment2[1] == 2); + KJ_ASSERT(attachment2[2] == 3); +} + +KJ_TEST("Read/Write Attribute works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Attribute info(kj::str("foo"), tracing::Attribute::Value((float)123.0)); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Attribute info2(reader); + KJ_ASSERT(info2.name == "foo"_kj); + auto& val = KJ_ASSERT_NONNULL(info2.value.tryGet()); + KJ_ASSERT(KJ_ASSERT_NONNULL(val.tryGet()) == 123.0); +} + +KJ_TEST("Read/Write Return works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchResponseInfo fetchInfo(123); + tracing::Return info(tracing::Return::Info(kj::mv(fetchInfo))); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Return info2(reader); + auto& fetchInfo2 = + KJ_ASSERT_NONNULL(KJ_ASSERT_NONNULL(info2.info).tryGet()); + KJ_ASSERT(fetchInfo2.statusCode == 123); + + tracing::Return info3 = info.clone(); + auto& fetchInfo3 = + KJ_ASSERT_NONNULL(KJ_ASSERT_NONNULL(info3.info).tryGet()); + KJ_ASSERT(fetchInfo3.statusCode == 123); +} + +KJ_TEST("Read/Write SpanOpen works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::SpanOpen info(kj::str("foo"), kj::none); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::SpanOpen info2(reader); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.operationName) == "foo"_kj); + KJ_ASSERT(info2.info == kj::none); + + tracing::SpanOpen info3 = info.clone(); + KJ_ASSERT(KJ_ASSERT_NONNULL(info3.operationName) == "foo"_kj); + KJ_ASSERT(info3.info == kj::none); +} + +KJ_TEST("Read/Write SpanClose works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::SpanClose info(EventOutcome::EXCEPTION); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::SpanClose info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); + + tracing::SpanClose info3 = info.clone(); + KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); +} + +KJ_TEST("Read/Write Onset works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::FetchEventInfo fetchInfo( + kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr); + tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)), ExecutionModel::STATELESS, + kj::str("foo"), kj::none, kj::none, kj::none, kj::none); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Onset info2(reader); + tracing::FetchEventInfo& fetchInfo2 = + KJ_ASSERT_NONNULL(info2.info.tryGet()); + KJ_ASSERT(fetchInfo2.method == kj::HttpMethod::GET); + KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj); + KJ_ASSERT(info2.executionModel == ExecutionModel::STATELESS); + + tracing::Onset info3 = info.clone(); + tracing::FetchEventInfo& fetchInfo3 = + KJ_ASSERT_NONNULL(info3.info.tryGet()); + KJ_ASSERT(fetchInfo3.method == kj::HttpMethod::GET); + KJ_ASSERT(fetchInfo3.url == "https://example.com"_kj); + KJ_ASSERT(info3.executionModel == ExecutionModel::STATELESS); +} + +KJ_TEST("Read/Write Outcome works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + tracing::Outcome info(EventOutcome::EXCEPTION, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + tracing::Outcome info2(reader); + KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); + KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS); + + tracing::Outcome info3 = info.clone(); + KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); + KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS); + KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); +} + +KJ_TEST("Read/Write TraceEvent works") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + tracing::Log log(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + tracing::TailEvent info(context, kj::UNIX_EPOCH, 0, tracing::Mark(kj::mv(log))); + info.copyTo(infoBuilder); + + auto reader = infoBuilder.asReader(); + + tracing::TailEvent info2(reader); + KJ_ASSERT(info2.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(info2.sequence == 0); + KJ_ASSERT(info2.context.invocationId == context->getInvocationId()); + KJ_ASSERT(info2.context.traceId == context->getTraceId()); + KJ_ASSERT(info2.context.spanId == context->getSpanId()); + + auto& event = KJ_ASSERT_NONNULL(info2.event.tryGet()); + auto& log2 = KJ_ASSERT_NONNULL(event.tryGet()); + KJ_ASSERT(log2.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(log2.logLevel == LogLevel::INFO); + KJ_ASSERT(log2.message == "foo"_kj); + + tracing::TailEvent info3 = info.clone(); + KJ_ASSERT(info3.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(info3.sequence == 0); + KJ_ASSERT(info3.context.invocationId == context->getInvocationId()); + KJ_ASSERT(info3.context.traceId == context->getTraceId()); + KJ_ASSERT(info3.context.spanId == context->getSpanId()); + + auto& event2 = KJ_ASSERT_NONNULL(info3.event.tryGet()); + auto& log3 = KJ_ASSERT_NONNULL(event2.tryGet()); + KJ_ASSERT(log3.timestamp == kj::UNIX_EPOCH); + KJ_ASSERT(log3.logLevel == LogLevel::INFO); + KJ_ASSERT(log3.message == "foo"_kj); +} + } // namespace } // namespace workerd::tracing diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 8a284633c08..dcc9288cc7d 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -272,6 +272,11 @@ void tracing::FetchEventInfo::copyTo(rpc::Trace::FetchEventInfo::Builder builder } } +tracing::FetchEventInfo tracing::FetchEventInfo::clone() { + return FetchEventInfo( + method, kj::str(url), kj::str(cfJson), KJ_MAP(h, headers) { return h.clone(); }); +} + tracing::FetchEventInfo::Header::Header(kj::String name, kj::String value) : name(kj::mv(name)), value(kj::mv(value)) {} @@ -285,6 +290,10 @@ void tracing::FetchEventInfo::Header::copyTo(rpc::Trace::FetchEventInfo::Header: builder.setValue(value); } +tracing::FetchEventInfo::Header tracing::FetchEventInfo::Header::clone() { + return Header(kj::str(name), kj::str(value)); +} + tracing::JsRpcEventInfo::JsRpcEventInfo(kj::String methodName): methodName(kj::mv(methodName)) {} tracing::JsRpcEventInfo::JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader) @@ -294,6 +303,10 @@ void tracing::JsRpcEventInfo::copyTo(rpc::Trace::JsRpcEventInfo::Builder builder builder.setMethodName(methodName); } +tracing::JsRpcEventInfo tracing::JsRpcEventInfo::clone() { + return JsRpcEventInfo(kj::str(methodName)); +} + tracing::ScheduledEventInfo::ScheduledEventInfo(double scheduledTime, kj::String cron) : scheduledTime(scheduledTime), cron(kj::mv(cron)) {} @@ -307,6 +320,10 @@ void tracing::ScheduledEventInfo::copyTo(rpc::Trace::ScheduledEventInfo::Builder builder.setCron(cron); } +tracing::ScheduledEventInfo tracing::ScheduledEventInfo::clone() { + return ScheduledEventInfo(scheduledTime, kj::str(cron)); +} + tracing::AlarmEventInfo::AlarmEventInfo(kj::Date scheduledTime): scheduledTime(scheduledTime) {} tracing::AlarmEventInfo::AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader) @@ -316,6 +333,10 @@ void tracing::AlarmEventInfo::copyTo(rpc::Trace::AlarmEventInfo::Builder builder builder.setScheduledTimeMs((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS); } +tracing::AlarmEventInfo tracing::AlarmEventInfo::clone() { + return AlarmEventInfo(scheduledTime); +} + tracing::QueueEventInfo::QueueEventInfo(kj::String queueName, uint32_t batchSize) : queueName(kj::mv(queueName)), batchSize(batchSize) {} @@ -329,6 +350,10 @@ void tracing::QueueEventInfo::copyTo(rpc::Trace::QueueEventInfo::Builder builder builder.setBatchSize(batchSize); } +tracing::QueueEventInfo tracing::QueueEventInfo::clone() { + return QueueEventInfo(kj::str(queueName), batchSize); +} + tracing::EmailEventInfo::EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize) : mailFrom(kj::mv(mailFrom)), rcptTo(kj::mv(rcptTo)), @@ -345,6 +370,10 @@ void tracing::EmailEventInfo::copyTo(rpc::Trace::EmailEventInfo::Builder builder builder.setRawSize(rawSize); } +tracing::EmailEventInfo tracing::EmailEventInfo::clone() { + return EmailEventInfo(kj::str(mailFrom), kj::str(rcptTo), rawSize); +} + kj::Vector getTraceItemsFromTraces( kj::ArrayPtr> traces) { return KJ_MAP(t, traces) -> tracing::TraceEventInfo::TraceItem { @@ -373,6 +402,10 @@ void tracing::TraceEventInfo::copyTo(rpc::Trace::TraceEventInfo::Builder builder } } +tracing::TraceEventInfo tracing::TraceEventInfo::clone() { + return TraceEventInfo(KJ_MAP(item, traces) { return item.clone(); }); +} + tracing::TraceEventInfo::TraceItem::TraceItem(kj::Maybe scriptName) : scriptName(kj::mv(scriptName)) {} @@ -386,6 +419,10 @@ void tracing::TraceEventInfo::TraceItem::copyTo( } } +tracing::TraceEventInfo::TraceItem tracing::TraceEventInfo::TraceItem::clone() { + return TraceItem(scriptName.map([](auto& name) { return kj::str(name); })); +} + tracing::DiagnosticChannelEvent::DiagnosticChannelEvent( kj::Date timestamp, kj::String channel, kj::Array message) : timestamp(timestamp), @@ -404,6 +441,10 @@ void tracing::DiagnosticChannelEvent::copyTo(rpc::Trace::DiagnosticChannelEvent: builder.setMessage(message); } +tracing::DiagnosticChannelEvent tracing::DiagnosticChannelEvent::clone() { + return DiagnosticChannelEvent(timestamp, kj::str(channel), kj::heapArray(message)); +} + tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(Type type): type(type) {} tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo( @@ -428,6 +469,24 @@ void tracing::HibernatableWebSocketEventInfo::copyTo( } } +tracing::HibernatableWebSocketEventInfo tracing::HibernatableWebSocketEventInfo::clone() { + KJ_SWITCH_ONEOF(type) { + KJ_CASE_ONEOF(_, Message) { + return HibernatableWebSocketEventInfo(Message{}); + } + KJ_CASE_ONEOF(_, Error) { + return HibernatableWebSocketEventInfo(Error{}); + } + KJ_CASE_ONEOF(close, Close) { + return HibernatableWebSocketEventInfo(Close{ + .code = close.code, + .wasClean = close.wasClean, + }); + } + } + KJ_UNREACHABLE; +} + tracing::HibernatableWebSocketEventInfo::Type tracing::HibernatableWebSocketEventInfo::readFrom( rpc::Trace::HibernatableWebSocketEventInfo::Reader reader) { auto type = reader.getType(); @@ -457,6 +516,10 @@ void tracing::FetchResponseInfo::copyTo(rpc::Trace::FetchResponseInfo::Builder b builder.setStatusCode(statusCode); } +tracing::FetchResponseInfo tracing::FetchResponseInfo::clone() { + return FetchResponseInfo(statusCode); +} + tracing::Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) : timestamp(timestamp), logLevel(logLevel), @@ -576,6 +639,10 @@ void Trace::copyTo(rpc::Trace::Builder builder) { auto hibWsBuilder = eventInfoBuilder.initHibernatableWebSocket(); hibWs.copyTo(hibWsBuilder); } + KJ_CASE_ONEOF(resume, tracing::Resume) { + // Resume is not used in legacy trace. + KJ_UNREACHABLE; + } KJ_CASE_ONEOF(custom, tracing::CustomEventInfo) { eventInfoBuilder.initCustom(); } @@ -603,6 +670,10 @@ void tracing::Log::copyTo(rpc::Trace::Log::Builder builder) { builder.setMessage(message); } +tracing::Log tracing::Log::clone() { + return Log(timestamp, logLevel, kj::str(message)); +} + void tracing::Exception::copyTo(rpc::Trace::Exception::Builder builder) { builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setName(name); @@ -612,6 +683,11 @@ void tracing::Exception::copyTo(rpc::Trace::Exception::Builder builder) { } } +tracing::Exception tracing::Exception::clone() { + return Exception(timestamp, kj::str(name), kj::str(message), + stack.map([](auto& stack) { return kj::str(stack); })); +} + void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLevel) { // Sandboxed workers currently record their traces as if the pipeline log level were set to // "full", so we may need to filter out the extra data after receiving the traces back. @@ -714,6 +790,717 @@ tracing::Exception::Exception(rpc::Trace::Exception::Reader reader) } } +namespace { +kj::Maybe> readResumeAttachment(const auto& reader) { + if (reader.hasAttachment()) { + return kj::heapArray(reader.getAttachment()); + } + return kj::none; +} +} // namespace + +tracing::Resume::Resume(kj::Maybe> attachment) + : attachment(kj::mv(attachment)) {} + +tracing::Resume::Resume(rpc::Trace::Resume::Reader reader) + : attachment(readResumeAttachment(reader)) {} + +void tracing::Resume::copyTo(rpc::Trace::Resume::Builder builder) { + KJ_IF_SOME(attach, attachment) { + builder.setAttachment(attach); + } +} + +tracing::Resume tracing::Resume::clone() { + return Resume(attachment.map([](auto& attach) { return kj::heapArray(attach); })); +} + +tracing::Hibernate::Hibernate(kj::Maybe> attachment) + : attachment(kj::mv(attachment)) {} + +tracing::Hibernate::Hibernate(rpc::Trace::Hibernate::Reader reader) + : attachment(readResumeAttachment(reader)) {} + +void tracing::Hibernate::copyTo(rpc::Trace::Hibernate::Builder builder) { + KJ_IF_SOME(attach, attachment) { + builder.setAttachment(attach); + } +} + +tracing::Hibernate tracing::Hibernate::clone() { + return Hibernate(attachment.map([](auto& attach) { return kj::heapArray(attach); })); +} + +tracing::Attribute::Attribute(kj::String name, kj::OneOf>&& value) + : name(kj::mv(name)), + value(kj::mv(value)) {} + +namespace { +kj::OneOf> readValues( + rpc::Trace::Attribute::Reader& reader) { + static auto readValue = + [](rpc::Trace::Attribute::Value::Reader reader) -> tracing::Attribute::Value { + auto inner = reader.getInner(); + switch (inner.which()) { + case rpc::Trace::Attribute::Value::Inner::TEXT: { + return kj::str(inner.getText()); + } + case rpc::Trace::Attribute::Value::Inner::BOOL: { + return inner.getBool(); + } + case rpc::Trace::Attribute::Value::Inner::FLOAT: { + return inner.getFloat(); + } + case rpc::Trace::Attribute::Value::Inner::INT: { + return static_cast(inner.getInt()); + } + } + KJ_UNREACHABLE; + }; + + // There should always be a value and it always have at least one entry in the list. + KJ_ASSERT(reader.hasValue()); + auto value = reader.getValue(); + KJ_ASSERT(value.size() >= 1); + if (value.size() == 1) { + return readValue(value[0]); + } + kj::Vector values(value.size()); + for (auto v: value) { + values.add(readValue(v)); + } + return values.releaseAsArray(); +} +} // namespace + +tracing::Attribute::Attribute(rpc::Trace::Attribute::Reader reader) + : name(kj::str(reader.getName())), + value(readValues(reader)) {} + +void tracing::Attribute::copyTo(rpc::Trace::Attribute::Builder builder) { + static auto writeValue = [](auto builder, const auto& value) mutable { + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(str, kj::String) { + builder[0].initInner().setText(str.asPtr()); + } + KJ_CASE_ONEOF(b, bool) { + builder[0].initInner().setBool(b); + } + KJ_CASE_ONEOF(f, float) { + builder[0].initInner().setFloat(f); + } + KJ_CASE_ONEOF(i, uint32_t) { + builder[0].initInner().setInt(i); + } + } + }; + builder.setName(name.asPtr()); + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(value, Value) { + writeValue(builder.initValue(1), value); + } + KJ_CASE_ONEOF(values, kj::Array) {} + } +} + +tracing::Attribute tracing::Attribute::clone() { + constexpr auto cloneValue = [](const Value& value) -> Value { + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(str, kj::String) { + return kj::str(str); + } + KJ_CASE_ONEOF(b, bool) { + return b; + } + KJ_CASE_ONEOF(f, float) { + return f; + } + KJ_CASE_ONEOF(i, uint32_t) { + return i; + } + } + KJ_UNREACHABLE; + }; + + KJ_SWITCH_ONEOF(value) { + KJ_CASE_ONEOF(value, Value) { + return Attribute(kj::str(name), cloneValue(value)); + } + KJ_CASE_ONEOF(values, kj::Array) { + return Attribute(kj::str(name), KJ_MAP(value, values) { return cloneValue(value); }); + } + } + KJ_UNREACHABLE; +} + +tracing::Return::Return(kj::Maybe info): info(kj::mv(info)) {} + +namespace { +kj::Maybe readReturnInfo(rpc::Trace::Return::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::Return::Info::EMPTY: + return kj::none; + case rpc::Trace::Return::Info::CUSTOM: { + auto list = info.getCustom(); + kj::Vector attrs(list.size()); + for (size_t n = 0; n < list.size(); n++) { + attrs.add(tracing::Attribute(list[n])); + } + return kj::Maybe(attrs.releaseAsArray()); + } + case rpc::Trace::Return::Info::FETCH: { + return kj::Maybe(tracing::FetchResponseInfo(info.getFetch())); + } + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::Return::Return(rpc::Trace::Return::Reader reader): info(readReturnInfo(reader)) {} + +void tracing::Return::copyTo(rpc::Trace::Return::Builder builder) { + KJ_IF_SOME(i, info) { + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchResponseInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + auto attributes = infoBuilder.initCustom(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + custom[n].copyTo(attributes[n]); + } + } + } + } +} + +tracing::Return tracing::Return::clone() { + KJ_IF_SOME(i, info) { + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchResponseInfo) { + return Return(kj::Maybe(fetch.clone())); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + return Return(kj::Maybe(KJ_MAP(i, custom) { return i.clone(); })); + } + } + KJ_UNREACHABLE; + } + return Return(); +} + +tracing::SpanOpen::SpanOpen(kj::Maybe operationName, kj::Maybe info) + : operationName(kj::mv(operationName)), + info(kj::mv(info)) {} + +namespace { +kj::Maybe readSpanOpenOperationName(rpc::Trace::SpanOpen::Reader& reader) { + if (!reader.hasOperationName()) return kj::none; + return kj::str(reader.getOperationName()); +} + +kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::SpanOpen::Info::EMPTY: + return kj::none; + case rpc::Trace::SpanOpen::Info::FETCH: { + return kj::Maybe(tracing::FetchEventInfo(info.getFetch())); + } + case rpc::Trace::SpanOpen::Info::JSRPC: { + return kj::Maybe(tracing::JsRpcEventInfo(info.getJsrpc())); + } + case rpc::Trace::SpanOpen::Info::CUSTOM: { + auto custom = info.getCustom(); + kj::Vector attrs(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + attrs.add(tracing::Attribute(custom[n])); + } + return kj::Maybe(attrs.releaseAsArray()); + } + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader) + : operationName(readSpanOpenOperationName(reader)), + info(readSpanOpenInfo(reader)) {} + +void tracing::SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) { + KJ_IF_SOME(name, operationName) { + builder.setOperationName(name.asPtr()); + } + KJ_IF_SOME(i, info) { + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(i) { + KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + jsrpc.copyTo(infoBuilder.initJsrpc()); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + auto customBuilder = infoBuilder.initCustom(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + custom[n].copyTo(customBuilder[n]); + } + } + } + } +} + +tracing::SpanOpen tracing::SpanOpen::clone() { + constexpr auto cloneInfo = [](kj::Maybe& info) -> kj::Maybe { + return info.map([](Info& info) -> tracing::SpanOpen::Info { + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + return fetch.clone(); + } + KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + return jsrpc.clone(); + } + KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + kj::Vector attrs(custom.size()); + for (size_t n = 0; n < custom.size(); n++) { + attrs.add(custom[n].clone()); + } + return attrs.releaseAsArray(); + } + } + KJ_UNREACHABLE; + }); + }; + return SpanOpen(operationName.map([](auto& str) { return kj::str(str); }), cloneInfo(info)); +} + +tracing::SpanClose::SpanClose(EventOutcome outcome): outcome(outcome) {} + +tracing::SpanClose::SpanClose(rpc::Trace::SpanClose::Reader reader): outcome(reader.getOutcome()) {} + +void tracing::SpanClose::copyTo(rpc::Trace::SpanClose::Builder builder) { + builder.setOutcome(outcome); +} + +tracing::SpanClose tracing::SpanClose::clone() { + return SpanClose(outcome); +} + +namespace { +tracing::Onset::Info getInfoFromReader(const rpc::Trace::Onset::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::Onset::Info::FETCH: { + return tracing::FetchEventInfo(info.getFetch()); + } + case rpc::Trace::Onset::Info::JSRPC: { + return tracing::JsRpcEventInfo(info.getJsrpc()); + } + case rpc::Trace::Onset::Info::SCHEDULED: { + return tracing::ScheduledEventInfo(info.getScheduled()); + } + case rpc::Trace::Onset::Info::ALARM: { + return tracing::AlarmEventInfo(info.getAlarm()); + } + case rpc::Trace::Onset::Info::QUEUE: { + return tracing::QueueEventInfo(info.getQueue()); + } + case rpc::Trace::Onset::Info::EMAIL: { + return tracing::EmailEventInfo(info.getEmail()); + } + case rpc::Trace::Onset::Info::TRACE: { + return tracing::TraceEventInfo(info.getTrace()); + } + case rpc::Trace::Onset::Info::HIBERNATABLE_WEB_SOCKET: { + return tracing::HibernatableWebSocketEventInfo(info.getHibernatableWebSocket()); + } + case rpc::Trace::Onset::Info::RESUME: { + return tracing::Resume(info.getResume()); + } + case rpc::Trace::Onset::Info::CUSTOM: { + return tracing::CustomEventInfo(); + } + } + KJ_UNREACHABLE; +} +kj::Maybe getScriptNameFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptName()) { + return kj::str(reader.getScriptName()); + } + return kj::none; +} +kj::Maybe> getScriptVersionFromReader( + const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptVersion()) { + return capnp::clone(reader.getScriptVersion()); + } + return kj::none; +} +kj::Maybe getDispatchNamespaceFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasDispatchNamespace()) { + return kj::str(reader.getDispatchNamespace()); + } + return kj::none; +} +kj::Maybe> getScriptTagsFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasScriptTags()) { + auto tags = reader.getScriptTags(); + kj::Vector scriptTags(tags.size()); + for (size_t i = 0; i < tags.size(); i++) { + scriptTags.add(kj::str(tags[i])); + } + return kj::Maybe(scriptTags.releaseAsArray()); + } + return kj::none; +} +kj::Maybe getEntrypointFromReader(const rpc::Trace::Onset::Reader& reader) { + if (reader.hasEntryPoint()) { + return kj::str(reader.getEntryPoint()); + } + return kj::none; +} +} // namespace + +tracing::Onset::Onset(tracing::Onset::Info&& info, + ExecutionModel executionModel, + kj::Maybe scriptName, + kj::Maybe> scriptVersion, + kj::Maybe dispatchNamespace, + kj::Maybe> scriptTags, + kj::Maybe entrypoint) + : info(kj::mv(info)), + executionModel(executionModel), + scriptName(kj::mv(scriptName)), + scriptVersion(kj::mv(scriptVersion)), + dispatchNamespace(kj::mv(dispatchNamespace)), + scriptTags(kj::mv(scriptTags)), + entrypoint(kj::mv(entrypoint)) {} + +tracing::Onset::Onset(rpc::Trace::Onset::Reader reader) + : info(getInfoFromReader(reader)), + executionModel(reader.getExecutionModel()), + scriptName(getScriptNameFromReader(reader)), + scriptVersion(getScriptVersionFromReader(reader)), + dispatchNamespace(getDispatchNamespaceFromReader(reader)), + scriptTags(getScriptTagsFromReader(reader)), + entrypoint(getEntrypointFromReader(reader)) {} + +void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) { + builder.setExecutionModel(executionModel); + KJ_IF_SOME(name, scriptName) { + builder.setScriptName(name); + } + KJ_IF_SOME(version, scriptVersion) { + builder.setScriptVersion(*version); + } + KJ_IF_SOME(name, dispatchNamespace) { + builder.setDispatchNamespace(name); + } + KJ_IF_SOME(tags, scriptTags) { + auto list = builder.initScriptTags(tags.size()); + for (size_t i = 0; i < tags.size(); i++) { + list.set(i, tags[i]); + } + } + KJ_IF_SOME(e, entrypoint) { + builder.setEntryPoint(e); + } + auto infoBuilder = builder.initInfo(); + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { + fetch.copyTo(infoBuilder.initFetch()); + } + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { + jsrpc.copyTo(infoBuilder.initJsrpc()); + } + KJ_CASE_ONEOF(scheduled, ScheduledEventInfo) { + scheduled.copyTo(infoBuilder.initScheduled()); + } + KJ_CASE_ONEOF(alarm, AlarmEventInfo) { + alarm.copyTo(infoBuilder.initAlarm()); + } + KJ_CASE_ONEOF(queue, QueueEventInfo) { + queue.copyTo(infoBuilder.initQueue()); + } + KJ_CASE_ONEOF(email, EmailEventInfo) { + email.copyTo(infoBuilder.initEmail()); + } + KJ_CASE_ONEOF(trace, TraceEventInfo) { + trace.copyTo(infoBuilder.initTrace()); + } + KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { + hws.copyTo(infoBuilder.initHibernatableWebSocket()); + } + KJ_CASE_ONEOF(resume, Resume) { + resume.copyTo(infoBuilder.initResume()); + } + KJ_CASE_ONEOF(custom, CustomEventInfo) { + infoBuilder.initCustom(); + } + } +} + +tracing::Onset tracing::Onset::clone() { + constexpr auto cloneInfo = [](Info& info) -> tracing::Onset::Info { + KJ_SWITCH_ONEOF(info) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { + return fetch.clone(); + } + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { + return jsrpc.clone(); + } + KJ_CASE_ONEOF(scheduled, ScheduledEventInfo) { + return scheduled.clone(); + } + KJ_CASE_ONEOF(alarm, AlarmEventInfo) { + return alarm.clone(); + } + KJ_CASE_ONEOF(queue, QueueEventInfo) { + return queue.clone(); + } + KJ_CASE_ONEOF(email, EmailEventInfo) { + return email.clone(); + } + KJ_CASE_ONEOF(trace, TraceEventInfo) { + return trace.clone(); + } + KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { + return hws.clone(); + } + KJ_CASE_ONEOF(resume, Resume) { + return resume.clone(); + } + KJ_CASE_ONEOF(custom, CustomEventInfo) { + return CustomEventInfo(); + } + } + KJ_UNREACHABLE; + }; + return Onset(cloneInfo(info), executionModel, + scriptName.map([](auto& name) { return kj::str(name); }), + scriptVersion.map([](auto& version) { return capnp::clone(*version); }), + dispatchNamespace.map([](auto& ns) { return kj::str(ns); }), + scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }), + entrypoint.map([](auto& e) { return kj::str(e); })); +} + +tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) + : outcome(outcome), + cpuTime(cpuTime), + wallTime(wallTime) {} + +tracing::Outcome::Outcome(rpc::Trace::Outcome::Reader reader) + : outcome(reader.getOutcome()), + cpuTime(reader.getCpuTime() * kj::MILLISECONDS), + wallTime(reader.getWallTime() * kj::MILLISECONDS) {} + +void tracing::Outcome::copyTo(rpc::Trace::Outcome::Builder builder) { + builder.setOutcome(outcome); + builder.setCpuTime(cpuTime / kj::MILLISECONDS); + builder.setWallTime(wallTime / kj::MILLISECONDS); +} + +tracing::Outcome tracing::Outcome::clone() { + return Outcome(outcome, cpuTime, wallTime); +} + +namespace { +tracing::TailEvent::Context getContextFromSpan( + const kj::Rc& context) { + return tracing::TailEvent::Context( + context->getTraceId(), context->getInvocationId(), context->getSpanId()); +} + +kj::Maybe getParentContextFromSpan( + kj::Rc& context) { + return context->getParent().map([](const kj::Rc& context) { + return getContextFromSpan(context); + }); +} +} // namespace + +tracing::TailEvent::Context::Context(TraceId traceId, TraceId invocationId, kj::uint spanId) + : traceId(traceId), + invocationId(invocationId), + spanId(spanId) {} + +tracing::TailEvent::Context::Context(rpc::InvocationSpanContext::Reader reader) + : traceId(TraceId::fromCapnp(reader.getTraceId())), + invocationId(TraceId::fromCapnp(reader.getInvocationId())), + spanId(reader.getSpanId()) {} + +void tracing::TailEvent::Context::copyTo(rpc::InvocationSpanContext::Builder builder) { + traceId.toCapnp(builder.initTraceId()); + invocationId.toCapnp(builder.initInvocationId()); + builder.setSpanId(spanId); +} + +tracing::TailEvent::Context tracing::TailEvent::Context::clone() { + return Context(traceId, invocationId, spanId); +} + +tracing::TailEvent::TailEvent(kj::Rc& context, + kj::Date timestamp, + kj::uint sequence, + Event&& event) + : context(getContextFromSpan(context)), + parentContext(getParentContextFromSpan(context)), + timestamp(timestamp), + sequence(sequence), + event(kj::mv(event)) {} + +tracing::TailEvent::TailEvent(Context context, + kj::Maybe parentContext, + kj::Date timestamp, + kj::uint sequence, + Event&& event) + : context(kj::mv(context)), + parentContext(kj::mv(parentContext)), + timestamp(timestamp), + sequence(sequence), + event(kj::mv(event)) {} + +namespace { +tracing::TailEvent::Context readContextFromTailEvent(rpc::Trace::TailEvent::Reader& reader) { + return tracing::TailEvent::Context(reader.getContext()); +} + +kj::Maybe readParentContextFromTailEvent( + rpc::Trace::TailEvent::Reader& reader) { + if (!reader.hasParentContext()) return kj::none; + return tracing::TailEvent::Context(reader.getParentContext()); +} + +tracing::TailEvent::Event readEventFromTailEvent(rpc::Trace::TailEvent::Reader& reader) { + auto event = reader.getEvent(); + switch (event.which()) { + case rpc::Trace::TailEvent::Event::ONSET: + return tracing::Onset(event.getOnset()); + case rpc::Trace::TailEvent::Event::OUTCOME: + return tracing::Outcome(event.getOutcome()); + case rpc::Trace::TailEvent::Event::HIBERNATE: + return tracing::Hibernate(event.getHibernate()); + case rpc::Trace::TailEvent::Event::SPAN_OPEN: + return tracing::SpanOpen(event.getSpanOpen()); + case rpc::Trace::TailEvent::Event::SPAN_CLOSE: + return tracing::SpanClose(event.getSpanClose()); + case rpc::Trace::TailEvent::Event::ATTRIBUTE: + return tracing::Mark(tracing::Attribute(event.getAttribute())); + case rpc::Trace::TailEvent::Event::RETURN: + return tracing::Mark(tracing::Return(event.getReturn())); + case rpc::Trace::TailEvent::Event::DIAGNOSTIC_CHANNEL_EVENT: + return tracing::Mark(tracing::DiagnosticChannelEvent(event.getDiagnosticChannelEvent())); + case rpc::Trace::TailEvent::Event::EXCEPTION: + return tracing::Mark(tracing::Exception(event.getException())); + case rpc::Trace::TailEvent::Event::LOG: + return tracing::Mark(tracing::Log(event.getLog())); + } + KJ_UNREACHABLE; +} +} // namespace + +tracing::TailEvent::TailEvent(rpc::Trace::TailEvent::Reader reader) + : context(readContextFromTailEvent(reader)), + parentContext(readParentContextFromTailEvent(reader)), + timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), + sequence(reader.getSequence()), + event(readEventFromTailEvent(reader)) {} + +void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) { + context.copyTo(builder.initContext()); + KJ_IF_SOME(parent, parentContext) { + parent.copyTo(builder.initParentContext()); + } + builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSequence(sequence); + auto eventBuilder = builder.initEvent(); + KJ_SWITCH_ONEOF(event) { + KJ_CASE_ONEOF(onset, Onset) { + onset.copyTo(eventBuilder.initOnset()); + } + KJ_CASE_ONEOF(outcome, Outcome) { + outcome.copyTo(eventBuilder.initOutcome()); + } + KJ_CASE_ONEOF(hibernate, Hibernate) { + hibernate.copyTo(eventBuilder.initHibernate()); + } + KJ_CASE_ONEOF(open, SpanOpen) { + open.copyTo(eventBuilder.initSpanOpen()); + } + KJ_CASE_ONEOF(close, SpanClose) { + close.copyTo(eventBuilder.initSpanClose()); + } + KJ_CASE_ONEOF(mark, Mark) { + KJ_SWITCH_ONEOF(mark) { + KJ_CASE_ONEOF(diag, DiagnosticChannelEvent) { + diag.copyTo(eventBuilder.initDiagnosticChannelEvent()); + } + KJ_CASE_ONEOF(ex, Exception) { + ex.copyTo(eventBuilder.initException()); + } + KJ_CASE_ONEOF(log, Log) { + log.copyTo(eventBuilder.initLog()); + } + KJ_CASE_ONEOF(ret, Return) { + ret.copyTo(eventBuilder.initReturn()); + } + KJ_CASE_ONEOF(attr, Attribute) { + attr.copyTo(eventBuilder.initAttribute()); + } + } + } + } +} + +tracing::TailEvent tracing::TailEvent::clone() { + constexpr auto cloneEvent = [](Event& event) -> Event { + KJ_SWITCH_ONEOF(event) { + KJ_CASE_ONEOF(onset, Onset) { + return onset.clone(); + } + KJ_CASE_ONEOF(outcome, Outcome) { + return outcome.clone(); + } + KJ_CASE_ONEOF(hibernate, Hibernate) { + return hibernate.clone(); + } + KJ_CASE_ONEOF(open, SpanOpen) { + return open.clone(); + } + KJ_CASE_ONEOF(close, SpanClose) { + return close.clone(); + } + KJ_CASE_ONEOF(mark, Mark) { + KJ_SWITCH_ONEOF(mark) { + KJ_CASE_ONEOF(diag, DiagnosticChannelEvent) { + return Mark(diag.clone()); + } + KJ_CASE_ONEOF(ex, Exception) { + return Mark(ex.clone()); + } + KJ_CASE_ONEOF(log, Log) { + return Mark(log.clone()); + } + KJ_CASE_ONEOF(ret, Return) { + return Mark(ret.clone()); + } + KJ_CASE_ONEOF(attr, Attribute) { + return Mark(attr.clone()); + } + } + } + } + KJ_UNREACHABLE; + }; + return TailEvent(context.clone(), + parentContext.map([](Context& parent) { return parent.clone(); }), timestamp, sequence, + cloneEvent(event)); +} + +// ====================================================================================== + SpanBuilder& SpanBuilder::operator=(SpanBuilder&& other) { end(); observer = kj::mv(other.observer); diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 1e9e70753b7..2873b46448b 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -207,15 +207,22 @@ struct FetchEventInfo final { explicit FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers); FetchEventInfo(rpc::Trace::FetchEventInfo::Reader reader); + FetchEventInfo(FetchEventInfo&&) = default; + FetchEventInfo& operator=(FetchEventInfo&&) = default; + KJ_DISALLOW_COPY(FetchEventInfo); struct Header final { explicit Header(kj::String name, kj::String value); Header(rpc::Trace::FetchEventInfo::Header::Reader reader); + Header(Header&&) = default; + Header& operator=(Header&&) = default; + KJ_DISALLOW_COPY(Header); kj::String name; kj::String value; void copyTo(rpc::Trace::FetchEventInfo::Header::Builder builder); + Header clone(); JSG_MEMORY_INFO(Header) { tracker.trackField("name", name); @@ -230,75 +237,105 @@ struct FetchEventInfo final { kj::Array
headers; void copyTo(rpc::Trace::FetchEventInfo::Builder builder); + FetchEventInfo clone(); }; struct JsRpcEventInfo final { explicit JsRpcEventInfo(kj::String methodName); JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader); + JsRpcEventInfo(JsRpcEventInfo&&) = default; + JsRpcEventInfo& operator=(JsRpcEventInfo&&) = default; + KJ_DISALLOW_COPY(JsRpcEventInfo); kj::String methodName; void copyTo(rpc::Trace::JsRpcEventInfo::Builder builder); + JsRpcEventInfo clone(); }; struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); ScheduledEventInfo(rpc::Trace::ScheduledEventInfo::Reader reader); + ScheduledEventInfo(ScheduledEventInfo&&) = default; + ScheduledEventInfo& operator=(ScheduledEventInfo&&) = default; + KJ_DISALLOW_COPY(ScheduledEventInfo); double scheduledTime; kj::String cron; void copyTo(rpc::Trace::ScheduledEventInfo::Builder builder); + ScheduledEventInfo clone(); }; struct AlarmEventInfo final { explicit AlarmEventInfo(kj::Date scheduledTime); AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader); + AlarmEventInfo(AlarmEventInfo&&) = default; + AlarmEventInfo& operator=(AlarmEventInfo&&) = default; + KJ_DISALLOW_COPY(AlarmEventInfo); kj::Date scheduledTime; void copyTo(rpc::Trace::AlarmEventInfo::Builder builder); + AlarmEventInfo clone(); }; struct QueueEventInfo final { explicit QueueEventInfo(kj::String queueName, uint32_t batchSize); QueueEventInfo(rpc::Trace::QueueEventInfo::Reader reader); + QueueEventInfo(QueueEventInfo&&) = default; + QueueEventInfo& operator=(QueueEventInfo&&) = default; + KJ_DISALLOW_COPY(QueueEventInfo); kj::String queueName; uint32_t batchSize; void copyTo(rpc::Trace::QueueEventInfo::Builder builder); + QueueEventInfo clone(); }; struct EmailEventInfo final { explicit EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize); EmailEventInfo(rpc::Trace::EmailEventInfo::Reader reader); + EmailEventInfo(EmailEventInfo&&) = default; + EmailEventInfo& operator=(EmailEventInfo&&) = default; + KJ_DISALLOW_COPY(EmailEventInfo); kj::String mailFrom; kj::String rcptTo; uint32_t rawSize; void copyTo(rpc::Trace::EmailEventInfo::Builder builder); + EmailEventInfo clone(); }; struct TraceEventInfo final { struct TraceItem; explicit TraceEventInfo(kj::ArrayPtr> traces); + TraceEventInfo(kj::Array traces): traces(kj::mv(traces)) {} TraceEventInfo(rpc::Trace::TraceEventInfo::Reader reader); + TraceEventInfo(TraceEventInfo&&) = default; + TraceEventInfo& operator=(TraceEventInfo&&) = default; + KJ_DISALLOW_COPY(TraceEventInfo); struct TraceItem final { explicit TraceItem(kj::Maybe scriptName); TraceItem(rpc::Trace::TraceEventInfo::TraceItem::Reader reader); + TraceItem(TraceItem&&) = default; + TraceItem& operator=(TraceItem&&) = default; + KJ_DISALLOW_COPY(TraceItem); kj::Maybe scriptName; void copyTo(rpc::Trace::TraceEventInfo::TraceItem::Builder builder); + TraceItem clone(); }; kj::Vector traces; void copyTo(rpc::Trace::TraceEventInfo::Builder builder); + TraceEventInfo clone(); }; struct HibernatableWebSocketEventInfo final { @@ -313,10 +350,14 @@ struct HibernatableWebSocketEventInfo final { explicit HibernatableWebSocketEventInfo(Type type); HibernatableWebSocketEventInfo(rpc::Trace::HibernatableWebSocketEventInfo::Reader reader); + HibernatableWebSocketEventInfo(HibernatableWebSocketEventInfo&&) = default; + HibernatableWebSocketEventInfo& operator=(HibernatableWebSocketEventInfo&&) = default; + KJ_DISALLOW_COPY(HibernatableWebSocketEventInfo); Type type; void copyTo(rpc::Trace::HibernatableWebSocketEventInfo::Builder builder); + HibernatableWebSocketEventInfo clone(); static Type readFrom(rpc::Trace::HibernatableWebSocketEventInfo::Reader reader); }; @@ -328,10 +369,14 @@ struct CustomEventInfo final { struct FetchResponseInfo final { explicit FetchResponseInfo(uint16_t statusCode); FetchResponseInfo(rpc::Trace::FetchResponseInfo::Reader reader); + FetchResponseInfo(FetchResponseInfo&&) = default; + FetchResponseInfo& operator=(FetchResponseInfo&&) = default; + KJ_DISALLOW_COPY(FetchResponseInfo); uint16_t statusCode; void copyTo(rpc::Trace::FetchResponseInfo::Builder builder); + FetchResponseInfo clone(); }; struct DiagnosticChannelEvent final { @@ -346,6 +391,7 @@ struct DiagnosticChannelEvent final { kj::Array message; void copyTo(rpc::Trace::DiagnosticChannelEvent::Builder builder); + DiagnosticChannelEvent clone(); }; struct Log final { @@ -363,6 +409,7 @@ struct Log final { kj::String message; void copyTo(rpc::Trace::Log::Builder builder); + Log clone(); }; struct Exception final { @@ -382,6 +429,33 @@ struct Exception final { kj::Maybe stack; void copyTo(rpc::Trace::Exception::Builder builder); + Exception clone(); +}; + +struct Resume final { + explicit Resume(kj::Maybe> attachment); + Resume(rpc::Trace::Resume::Reader reader); + Resume(Resume&&) = default; + KJ_DISALLOW_COPY(Resume); + ~Resume() noexcept(false) = default; + + kj::Maybe> attachment; + + void copyTo(rpc::Trace::Resume::Builder builder); + Resume clone(); +}; + +struct Hibernate final { + explicit Hibernate(kj::Maybe> attachment); + Hibernate(rpc::Trace::Hibernate::Reader reader); + Hibernate(Hibernate&&) = default; + KJ_DISALLOW_COPY(Hibernate); + ~Hibernate() noexcept(false) = default; + + kj::Maybe> attachment; + + void copyTo(rpc::Trace::Hibernate::Builder builder); + Hibernate clone(); }; // EventInfo types are used to describe the onset of an invocation. The FetchEventInfo @@ -394,23 +468,28 @@ using EventInfo = kj::OneOf; -// An Attribute mark is used to add detail to a span over it's lifetime. +// An Attribute mark is used to add detail to a span over its lifetime. // The Attribute struct can also be used to provide arbitrary additional // properties for some other structs. // Modeled after https://opentelemetry.io/docs/concepts/signals/traces/#attributes struct Attribute final { - using Value = kj::OneOf; + using Value = kj::OneOf; - explicit Attribute(kj::ConstString name, kj::OneOf>&& value); + explicit Attribute(kj::String name, kj::OneOf>&& value); + Attribute(rpc::Trace::Attribute::Reader reader); Attribute(Attribute&&) = default; Attribute& operator=(Attribute&&) = default; KJ_DISALLOW_COPY(Attribute); - kj::ConstString name; + kj::String name; kj::OneOf> value; + + void copyTo(rpc::Trace::Attribute::Builder builder); + Attribute clone(); }; using CustomInfo = kj::Array; @@ -424,11 +503,15 @@ struct Return final { using Info = kj::OneOf; explicit Return(kj::Maybe info = kj::none); + Return(rpc::Trace::Return::Reader reader); Return(Return&&) = default; Return& operator=(Return&&) = default; KJ_DISALLOW_COPY(Return); kj::Maybe info = kj::none; + + void copyTo(rpc::Trace::Return::Builder builder); + Return clone(); }; using Mark = kj::OneOf; @@ -440,13 +523,17 @@ struct SpanOpen final { using Info = kj::OneOf; explicit SpanOpen( - kj::Maybe operationName = kj::none, kj::Maybe info = kj::none); + kj::Maybe operationName = kj::none, kj::Maybe info = kj::none); + SpanOpen(rpc::Trace::SpanOpen::Reader reader); SpanOpen(SpanOpen&&) = default; SpanOpen& operator=(SpanOpen&&) = default; KJ_DISALLOW_COPY(SpanOpen); - kj::Maybe operationName = kj::none; + kj::Maybe operationName = kj::none; kj::Maybe info = kj::none; + + void copyTo(rpc::Trace::SpanOpen::Builder builder); + SpanOpen clone(); }; // Marks the closing of a child span within the streaming tail session. @@ -454,11 +541,15 @@ struct SpanOpen final { // span. struct SpanClose final { explicit SpanClose(EventOutcome outcome = EventOutcome::OK); + SpanClose(rpc::Trace::SpanClose::Reader reader); SpanClose(SpanClose&&) = default; SpanClose& operator=(SpanClose&&) = default; KJ_DISALLOW_COPY(SpanClose); EventOutcome outcome = EventOutcome::OK; + + void copyTo(rpc::Trace::SpanClose::Builder builder); + SpanClose clone(); }; // The Onset and Outcome event types are special forms of SpanOpen and @@ -468,17 +559,33 @@ struct SpanClose final { struct Onset final { using Info = EventInfo; - explicit Onset(Info&& info, ExecutionModel executionModel); + explicit Onset(Info&& info, + ExecutionModel executionModel, + kj::Maybe scriptName, + kj::Maybe> scriptVersion, + kj::Maybe dispatchNamespace, + kj::Maybe> scriptTags, + kj::Maybe entrypoint); + Onset(rpc::Trace::Onset::Reader reader); Onset(Onset&&) = default; Onset& operator=(Onset&&) = default; KJ_DISALLOW_COPY(Onset); Info info; ExecutionModel executionModel; + kj::Maybe scriptName; + kj::Maybe> scriptVersion; + kj::Maybe dispatchNamespace; + kj::Maybe> scriptTags; + kj::Maybe entrypoint; + + void copyTo(rpc::Trace::Onset::Builder builder); + Onset clone(); }; struct Outcome final { explicit Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime); + Outcome(rpc::Trace::Outcome::Reader reader); Outcome(Outcome&&) = default; Outcome& operator=(Outcome&&) = default; KJ_DISALLOW_COPY(Outcome); @@ -486,6 +593,9 @@ struct Outcome final { EventOutcome outcome = EventOutcome::OK; kj::Duration cpuTime; kj::Duration wallTime; + + void copyTo(rpc::Trace::Outcome::Builder builder); + Outcome clone(); }; // A streaming tail worker receives a series of Tail Events. Tail events always @@ -495,20 +605,30 @@ struct Outcome final { // and Mark events. Every SpanOpen *must* be associated with a SpanClose unless // the stream was abruptly terminated. struct TailEvent final { - using Event = kj::OneOf; + using Event = kj::OneOf; struct Context final { explicit Context(TraceId traceId, TraceId invocationId, kj::uint spanId); + Context(rpc::InvocationSpanContext::Reader reader); Context(Context&&) = default; Context& operator=(Context&&) = default; KJ_DISALLOW_COPY(Context); TraceId traceId; TraceId invocationId; kj::uint spanId; + + void copyTo(rpc::InvocationSpanContext::Builder builder); + Context clone(); }; explicit TailEvent( kj::Rc& context, kj::Date timestamp, kj::uint sequence, Event&& event); + TailEvent(Context context, + kj::Maybe parentContext, + kj::Date timestamp, + kj::uint sequence, + Event&& event); + TailEvent(rpc::Trace::TailEvent::Reader reader); TailEvent(TailEvent&&) = default; TailEvent& operator=(TailEvent&&) = default; KJ_DISALLOW_COPY(TailEvent); @@ -523,6 +643,9 @@ struct TailEvent final { kj::uint sequence; Event event; + + void copyTo(rpc::Trace::TailEvent::Builder builder); + TailEvent clone(); }; } // namespace tracing diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 8a3263bb725..293d72f6a7c 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -157,6 +157,126 @@ struct Trace @0x8e8d911203762d34 { executionModel @25 :ExecutionModel; # the execution model of the worker being traced. Can be stateless for a regular worker, # durableObject for a DO worker or workflow for the upcoming Workflows feature. + + # ===================================================================================== + # Additional types for streaming tail workers + + struct Attribute { + # An Attribute mark is used to add detail to a span over its lifetime. + # The Attribute struct can also be used to provide arbitrary additional + # properties for some other structs. + # Modeled after https://opentelemetry.io/docs/concepts/signals/traces/#attributes + struct Value { + inner :union { + text @0 :Text; + bool @1 :Bool; + int @2 :Int32; + float @3 :Float32; + } + } + name @0 :Text; + value @1 :List(Value); + } + + struct Return { + # A Return mark is used to mark the point at which a span operation returned + # a value. For instance, when a fetch subrequest response is received, or when + # the fetch handler returns a Response. Importantly, it does not signal that the + # span has closed, which may not happen for some period of time after the return + # mark is recorded (e.g. due to things like waitUntils or waiting to fully ready + # the response body payload, etc). + info :union { + empty @0 :Void; + custom @1 :List(Attribute); + fetch @2 :FetchResponseInfo; + } + } + + struct SpanOpen { + # Marks the opening of a child span within the streaming tail session. + operationName @0 :Text; + info :union { + empty @1 :Void; + custom @2 :List(Attribute); + fetch @3 :FetchEventInfo; + jsrpc @4 :JsRpcEventInfo; + } + } + + struct SpanClose { + # Marks the closing of a child span within the streaming tail session. + # Once emitted, no further mark events should occur within the closed + # span. + outcome @0 :EventOutcome; + } + + struct Resume { + # A resume event indicates that we are resuming a previously hibernated + # tail session. + attachment @0 :Data; + } + + struct Onset { + # The Onset and Outcome event types are special forms of SpanOpen and + # SpanClose that explicitly mark the start and end of the root span. + # A streaming tail session will always begin with an Onset event, and + # always end with an Outcome event. + executionModel @0 :ExecutionModel; + scriptName @1 :Text; + scriptVersion @2 :ScriptVersion; + dispatchNamespace @3 :Text; + scriptTags @4 :List(Text); + entryPoint @5 :Text; + + info :union { + fetch @6 :FetchEventInfo; + jsrpc @7 :JsRpcEventInfo; + scheduled @8 :ScheduledEventInfo; + alarm @9 :AlarmEventInfo; + queue @10 :QueueEventInfo; + email @11 :EmailEventInfo; + trace @12 :TraceEventInfo; + hibernatableWebSocket @13 :HibernatableWebSocketEventInfo; + resume @14 :Resume; + custom @15 :CustomEventInfo; + } + } + + struct Outcome { + outcome @0 :EventOutcome; + cpuTime @1 :UInt64; + wallTime @2 :UInt64; + } + + struct Hibernate { + # A hibernate event indicates that the tail session is being hibernated. + attachment @0 :Data; + } + + struct TailEvent { + # A streaming tail worker receives a series of Tail Events. Tail events always + # occur within an InvocationSpanContext. The first TailEvent delivered to a + # streaming tail session is always an Onset. The final TailEvent delivered is + # always an Outcome. Between those can be any number of SpanOpen, SpanClose, + # and Mark events. Every SpanOpen *must* be associated with a SpanClose unless + # the stream was abruptly terminated. + context @0 :InvocationSpanContext; + parentContext @1 :InvocationSpanContext; + timestampNs @2 :Int64; + sequence @3 :UInt32; + event :union { + onset @4 :Onset; + outcome @5 :Outcome; + hibernate @6 :Hibernate; + spanOpen @7 :SpanOpen; + spanClose @8 :SpanClose; + attribute @9 :Attribute; + return @10 :Return; + diagnosticChannelEvent @11 :DiagnosticChannelEvent; + exception @12 :Exception; + log @13 :Log; + } + } } struct SendTracesRun @0xde913ebe8e1b82a5 {