From 3f506689efa84aa2dc5f344e9decdc710d01e067 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Fri, 6 Dec 2024 09:56:05 -0800 Subject: [PATCH] More refinements to types --- src/workerd/io/trace-test.c++ | 46 ++++++-- src/workerd/io/trace.c++ | 164 ++++++++++---------------- src/workerd/io/trace.h | 112 +++++------------- src/workerd/io/worker-interface.capnp | 38 +++--- 4 files changed, 145 insertions(+), 215 deletions(-) diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index 98147246485..2c9005b49b9 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -409,14 +409,14 @@ KJ_TEST("Read/Write Attribute works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::Attribute::Builder("foo", 1).add(123.0).add(true).finish().copyTo(infoBuilder); + tracing::Attribute attr(kj::str("foo"), {123.0, 321.2}); + attr.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); tracing::Attribute info2(reader); KJ_ASSERT(info2.name == "foo"_kj); - auto& vals = KJ_ASSERT_NONNULL(info2.value.tryGet()); - KJ_ASSERT(KJ_ASSERT_NONNULL(vals[0].tryGet()) == 123.0); - KJ_ASSERT(KJ_ASSERT_NONNULL(vals[1].tryGet())); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[0].tryGet()) == 123.0); + KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[1].tryGet()) == 321.2); } KJ_TEST("Read/Write Return works") { @@ -517,7 +517,7 @@ KJ_TEST("Read/Write Outcome works") { KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); } -KJ_TEST("Read/Write TraceEvent works") { +KJ_TEST("Read/Write TailEvent works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); @@ -532,9 +532,9 @@ KJ_TEST("Read/Write TraceEvent works") { tracing::TailEvent info2(reader); KJ_ASSERT(info2.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(info2.sequence == 0); - KJ_ASSERT(info2.context.invocationId == context.getInvocationId()); - KJ_ASSERT(info2.context.traceId == context.getTraceId()); - KJ_ASSERT(info2.context.spanId == context.getSpanId()); + KJ_ASSERT(info2.invocationId == context.getInvocationId()); + KJ_ASSERT(info2.traceId == context.getTraceId()); + KJ_ASSERT(info2.spanId == context.getSpanId()); auto& event = KJ_ASSERT_NONNULL(info2.event.tryGet()); auto& log2 = KJ_ASSERT_NONNULL(event.tryGet()); @@ -545,9 +545,9 @@ KJ_TEST("Read/Write TraceEvent works") { tracing::TailEvent info3 = info.clone(); KJ_ASSERT(info3.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(info3.sequence == 0); - KJ_ASSERT(info3.context.invocationId == context.getInvocationId()); - KJ_ASSERT(info3.context.traceId == context.getTraceId()); - KJ_ASSERT(info3.context.spanId == context.getSpanId()); + KJ_ASSERT(info3.invocationId == context.getInvocationId()); + KJ_ASSERT(info3.traceId == context.getTraceId()); + KJ_ASSERT(info3.spanId == context.getSpanId()); auto& event2 = KJ_ASSERT_NONNULL(info3.event.tryGet()); auto& log3 = KJ_ASSERT_NONNULL(event2.tryGet()); @@ -556,5 +556,29 @@ KJ_TEST("Read/Write TraceEvent works") { KJ_ASSERT(log3.message == "foo"_kj); } +KJ_TEST("Read/Write TailEvent with Multiple Attributes") { + capnp::MallocMessageBuilder builder; + auto infoBuilder = builder.initRoot(); + + FakeEntropySource entropy; + auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy); + + // An attribute event can have one or more Attributes specified. + kj::Vector attrs(2); + attrs.add(tracing::Attribute(kj::str("foo"), true)); + attrs.add(tracing::Attribute(kj::str("bar"), 123)); + + tracing::TailEvent info(context, kj::UNIX_EPOCH, 0, tracing::Mark(attrs.releaseAsArray())); + info.copyTo(infoBuilder); + + tracing::TailEvent info2(infoBuilder.asReader()); + auto& mark = KJ_ASSERT_NONNULL(info2.event.tryGet()); + auto& attrs2 = KJ_ASSERT_NONNULL(mark.tryGet>()); + KJ_ASSERT(attrs2.size() == 2); + + KJ_ASSERT(attrs2[0].name == "foo"_kj); + KJ_ASSERT(attrs2[1].name == "bar"_kj); +} + } // namespace } // namespace workerd::tracing diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 5ae3deb9e10..3b74c65a192 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -853,15 +853,14 @@ tracing::Hibernate tracing::Hibernate::clone() { tracing::Attribute::Attribute(kj::String name, Value&& value) : name(kj::mv(name)), - value(kj::mv(value)) {} + value(kj::arr(kj::mv(value))) {} -tracing::Attribute::Attribute(kj::String name, kj::Array&& value) +tracing::Attribute::Attribute(kj::String name, Values&& value) : name(kj::mv(name)), value(kj::mv(value)) {} namespace { -kj::OneOf> readValues( - const rpc::Trace::Attribute::Reader& reader) { +kj::Array readValues(const rpc::Trace::Attribute::Reader& reader) { static auto readValue = [](rpc::Trace::Attribute::Value::Reader reader) -> tracing::Attribute::Value { auto inner = reader.getInner(); @@ -876,7 +875,7 @@ kj::OneOf> readV return inner.getFloat(); } case rpc::Trace::Attribute::Value::Inner::INT: { - return static_cast(inner.getInt()); + return static_cast(inner.getInt()); } } KJ_UNREACHABLE; @@ -885,10 +884,6 @@ kj::OneOf> readV // There should always be a value and it always have at least one entry in the list. KJ_ASSERT(reader.hasValue()); auto value = reader.getValue(); - KJ_ASSERT(value.size() >= 1); - if (value.size() == 1) { - return readValue(value[0]); - } kj::Vector values(value.size()); for (auto v: value) { values.add(readValue(v)); @@ -913,22 +908,15 @@ void tracing::Attribute::copyTo(rpc::Trace::Attribute::Builder builder) { KJ_CASE_ONEOF(f, double) { builder.initInner().setFloat(f); } - KJ_CASE_ONEOF(i, uint32_t) { + KJ_CASE_ONEOF(i, int32_t) { builder.initInner().setInt(i); } } }; builder.setName(name.asPtr()); - KJ_SWITCH_ONEOF(value) { - KJ_CASE_ONEOF(value, Value) { - writeValue(builder.initValue(1)[0], value); - } - KJ_CASE_ONEOF(values, kj::Array) { - auto vec = builder.initValue(values.size()); - for (size_t n = 0; n < values.size(); n++) { - writeValue(vec[n], values[n]); - } - } + auto vec = builder.initValue(value.size()); + for (size_t n = 0; n < value.size(); n++) { + writeValue(vec[n], value[n]); } } @@ -944,22 +932,14 @@ tracing::Attribute tracing::Attribute::clone() { KJ_CASE_ONEOF(f, double) { return f; } - KJ_CASE_ONEOF(i, uint32_t) { + KJ_CASE_ONEOF(i, int32_t) { return i; } } KJ_UNREACHABLE; }; - KJ_SWITCH_ONEOF(value) { - KJ_CASE_ONEOF(value, Value) { - return Attribute(kj::str(name), cloneValue(value)); - } - KJ_CASE_ONEOF(values, kj::Array) { - return Attribute(kj::str(name), KJ_MAP(value, values) { return cloneValue(value); }); - } - } - KJ_UNREACHABLE; + return Attribute(kj::str(name), KJ_MAP(v, value) { return cloneValue(v); }); } tracing::Return::Return(kj::Maybe info): info(kj::mv(info)) {} @@ -1340,111 +1320,87 @@ tracing::Outcome tracing::Outcome::clone() { return Outcome(outcome, cpuTime, wallTime); } -namespace { -tracing::TailEvent::Context getContextFromSpan(const tracing::InvocationSpanContext& context) { - return tracing::TailEvent::Context( - context.getTraceId(), context.getInvocationId(), context.getSpanId()); -} - -kj::Maybe getParentContextFromSpan( - const tracing::InvocationSpanContext& context) { - return context.getParent().map( - [](const tracing::InvocationSpanContext& context) { return getContextFromSpan(context); }); -} -} // namespace - -tracing::TailEvent::Context::Context(TraceId traceId, TraceId invocationId, SpanId spanId) - : traceId(kj::mv(traceId)), - invocationId(kj::mv(invocationId)), - spanId(kj::mv(spanId)) {} - -tracing::TailEvent::Context::Context(rpc::InvocationSpanContext::Reader reader) - : traceId(TraceId::fromCapnp(reader.getTraceId())), - invocationId(TraceId::fromCapnp(reader.getInvocationId())), - spanId(reader.getSpanId()) {} - -void tracing::TailEvent::Context::copyTo(rpc::InvocationSpanContext::Builder builder) { - traceId.toCapnp(builder.initTraceId()); - invocationId.toCapnp(builder.initInvocationId()); - builder.setSpanId(spanId); -} - -tracing::TailEvent::Context tracing::TailEvent::Context::clone() { - return Context(traceId, invocationId, spanId); -} - tracing::TailEvent::TailEvent(const tracing::InvocationSpanContext& context, kj::Date timestamp, kj::uint sequence, Event&& event) - : context(getContextFromSpan(context)), - parentContext(getParentContextFromSpan(context)), + : traceId(context.getTraceId()), + invocationId(context.getInvocationId()), + spanId(context.getSpanId()), timestamp(timestamp), sequence(sequence), event(kj::mv(event)) {} -tracing::TailEvent::TailEvent(Context context, - kj::Maybe parentContext, +tracing::TailEvent::TailEvent(TraceId traceId, + TraceId invocationId, + SpanId spanId, kj::Date timestamp, kj::uint sequence, Event&& event) - : context(kj::mv(context)), - parentContext(kj::mv(parentContext)), + : traceId(kj::mv(traceId)), + invocationId(kj::mv(invocationId)), + spanId(kj::mv(spanId)), timestamp(timestamp), sequence(sequence), event(kj::mv(event)) {} namespace { -tracing::TailEvent::Context readContextFromTailEvent(const rpc::Trace::TailEvent::Reader& reader) { - return tracing::TailEvent::Context(reader.getContext()); -} - -kj::Maybe readParentContextFromTailEvent( - const rpc::Trace::TailEvent::Reader& reader) { - if (!reader.hasParentContext()) return kj::none; - return tracing::TailEvent::Context(reader.getParentContext()); -} - tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Reader& reader) { const auto event = reader.getEvent(); switch (event.which()) { - case rpc::Trace::TailEvent::Event::ONSET: + case rpc::Trace::TailEvent::Event::ONSET: { return tracing::Onset(event.getOnset()); - case rpc::Trace::TailEvent::Event::OUTCOME: + } + case rpc::Trace::TailEvent::Event::OUTCOME: { return tracing::Outcome(event.getOutcome()); - case rpc::Trace::TailEvent::Event::HIBERNATE: + } + case rpc::Trace::TailEvent::Event::HIBERNATE: { return tracing::Hibernate(event.getHibernate()); - case rpc::Trace::TailEvent::Event::SPAN_OPEN: + } + case rpc::Trace::TailEvent::Event::SPAN_OPEN: { return tracing::SpanOpen(event.getSpanOpen()); - case rpc::Trace::TailEvent::Event::SPAN_CLOSE: + } + case rpc::Trace::TailEvent::Event::SPAN_CLOSE: { return tracing::SpanClose(event.getSpanClose()); - case rpc::Trace::TailEvent::Event::ATTRIBUTE: - return tracing::Mark(tracing::Attribute(event.getAttribute())); - case rpc::Trace::TailEvent::Event::RETURN: + } + case rpc::Trace::TailEvent::Event::ATTRIBUTE: { + auto listReader = event.getAttribute(); + kj::Vector attrs(listReader.size()); + for (size_t n = 0; n < listReader.size(); n++) { + attrs.add(tracing::Attribute(listReader[n])); + } + return tracing::Mark(attrs.releaseAsArray()); + } + case rpc::Trace::TailEvent::Event::RETURN: { return tracing::Mark(tracing::Return(event.getReturn())); - case rpc::Trace::TailEvent::Event::DIAGNOSTIC_CHANNEL_EVENT: + } + case rpc::Trace::TailEvent::Event::DIAGNOSTIC_CHANNEL_EVENT: { return tracing::Mark(tracing::DiagnosticChannelEvent(event.getDiagnosticChannelEvent())); - case rpc::Trace::TailEvent::Event::EXCEPTION: + } + case rpc::Trace::TailEvent::Event::EXCEPTION: { return tracing::Mark(tracing::Exception(event.getException())); - case rpc::Trace::TailEvent::Event::LOG: + } + case rpc::Trace::TailEvent::Event::LOG: { return tracing::Mark(tracing::Log(event.getLog())); + } } KJ_UNREACHABLE; } } // namespace tracing::TailEvent::TailEvent(rpc::Trace::TailEvent::Reader reader) - : context(readContextFromTailEvent(reader)), - parentContext(readParentContextFromTailEvent(reader)), + : traceId(TraceId::fromCapnp(reader.getContext().getTraceId())), + invocationId(TraceId::fromCapnp(reader.getContext().getInvocationId())), + spanId(SpanId(reader.getContext().getSpanId())), timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), sequence(reader.getSequence()), event(readEventFromTailEvent(reader)) {} void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) { - context.copyTo(builder.initContext()); - KJ_IF_SOME(parent, parentContext) { - parent.copyTo(builder.initParentContext()); - } + auto context = builder.initContext(); + traceId.toCapnp(context.initTraceId()); + invocationId.toCapnp(context.initInvocationId()); + context.setSpanId(spanId.getId()); builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setSequence(sequence); auto eventBuilder = builder.initEvent(); @@ -1478,8 +1434,12 @@ void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) { KJ_CASE_ONEOF(ret, Return) { ret.copyTo(eventBuilder.initReturn()); } - KJ_CASE_ONEOF(attr, Attribute) { - attr.copyTo(eventBuilder.initAttribute()); + KJ_CASE_ONEOF(attrs, kj::Array) { + // Mark is a collection of attributes. + auto attrBuilder = eventBuilder.initAttribute(attrs.size()); + for (size_t n = 0; n < attrs.size(); n++) { + attrs[n].copyTo(attrBuilder[n]); + } } } } @@ -1518,17 +1478,15 @@ tracing::TailEvent tracing::TailEvent::clone() { KJ_CASE_ONEOF(ret, Return) { return Mark(ret.clone()); } - KJ_CASE_ONEOF(attr, Attribute) { - return Mark(attr.clone()); + KJ_CASE_ONEOF(attrs, tracing::CustomInfo) { + return Mark(KJ_MAP(attr, attrs) { return attr.clone(); }); } } } } KJ_UNREACHABLE; }; - return TailEvent(context.clone(), - parentContext.map([](Context& parent) { return parent.clone(); }), timestamp, sequence, - cloneEvent(event)); + return TailEvent(traceId, invocationId, spanId, timestamp, sequence, cloneEvent(event)); } // ====================================================================================== diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index bf37df4f0ca..5a2eb0daab3 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -18,6 +18,9 @@ #include #include +#include +#include + namespace kj { enum class HttpMethod; class EntropySource; @@ -548,75 +551,42 @@ using EventInfo = kj::OneOf; +template +concept AttributeValue = kj::isSameType() || kj::isSameType() || + kj::isSameType() || kj::isSameType(); + // An Attribute mark is used to add detail to a span over its lifetime. // The Attribute struct can also be used to provide arbitrary additional // properties for some other structs. // Modeled after https://opentelemetry.io/docs/concepts/signals/traces/#attributes struct Attribute final { - using Value = kj::OneOf; + using Value = kj::OneOf; using Values = kj::Array; explicit Attribute(kj::String name, Value&& value); explicit Attribute(kj::String name, Values&& values); + + template + explicit Attribute(kj::String name, V v): Attribute(kj::mv(name), Value(kj::mv(v))) {} + + template + explicit Attribute(kj::String name, kj::Array vals) + : Attribute(kj::mv(name), KJ_MAP(v, vals) { return Value(kj::mv(v)); }) {} + + template + explicit Attribute(kj::String name, std::initializer_list list) + : Attribute(kj::mv(name), kj::heapArray(list)) {} + Attribute(rpc::Trace::Attribute::Reader reader); Attribute(Attribute&&) = default; Attribute& operator=(Attribute&&) = default; KJ_DISALLOW_COPY(Attribute); kj::String name; - - kj::OneOf value; + Values value; void copyTo(rpc::Trace::Attribute::Builder builder); Attribute clone(); - - class Builder final { - public: - Builder(kj::String name): name(kj::mv(name)) {} - Builder(kj::String name, size_t n): name(kj::mv(name)), vec(n) {} - Builder(kj::StringPtr name): Builder(kj::str(name)) {} - Builder(kj::StringPtr name, size_t n): name(kj::str(name)), vec(n) {} - KJ_DISALLOW_COPY_AND_MOVE(Builder); - - Builder& add(kj::String value) KJ_LIFETIMEBOUND { - vec.add(kj::mv(value)); - return *this; - } - - Builder& add(kj::StringPtr value) KJ_LIFETIMEBOUND { - vec.add(kj::str(value)); - return *this; - } - - Builder& add(bool value) KJ_LIFETIMEBOUND { - vec.add(value); - return *this; - } - - Builder& add(double value) KJ_LIFETIMEBOUND { - vec.add(value); - return *this; - } - - Builder& add(uint32_t value) KJ_LIFETIMEBOUND { - vec.add(value); - return *this; - } - - Attribute finish() { - KJ_ASSERT(vec.size() >= 1); - if (vec.size() == 1) { - auto val = kj::mv(vec[0]); - vec.clear(); - return Attribute(kj::mv(name), kj::mv(val)); - } - return Attribute(kj::mv(name), vec.releaseAsArray()); - } - - private: - kj::String name; - kj::Vector vec; - }; }; using CustomInfo = kj::Array; @@ -641,7 +611,7 @@ struct Return final { Return clone(); }; -using Mark = kj::OneOf; +using Mark = kj::OneOf>; // Marks the opening of a child span within the streaming tail session. struct SpanOpen final { @@ -734,36 +704,11 @@ struct Outcome final { struct TailEvent final { using Event = kj::OneOf; - struct Context final { - explicit Context(TraceId traceId, TraceId invocationId, SpanId spanId); - Context(const InvocationSpanContext& context) - : Context(context.getTraceId(), context.getInvocationId(), context.getSpanId()) {} - Context(rpc::InvocationSpanContext::Reader reader); - Context(Context&&) = default; - Context& operator=(Context&&) = default; - KJ_DISALLOW_COPY(Context); - TraceId traceId; - TraceId invocationId; - SpanId spanId; - - inline bool operator==(const Context& other) { - return traceId == other.traceId && invocationId == other.invocationId && - spanId == other.spanId; - } - - inline bool operator==(const InvocationSpanContext& other) { - return traceId == other.getTraceId() && invocationId == other.getInvocationId() && - spanId == other.getSpanId(); - } - - void copyTo(rpc::InvocationSpanContext::Builder builder); - Context clone(); - }; - explicit TailEvent( const InvocationSpanContext& context, kj::Date timestamp, kj::uint sequence, Event&& event); - TailEvent(Context context, - kj::Maybe parentContext, + TailEvent(TraceId traceId, + TraceId invocationId, + SpanId spanId, kj::Date timestamp, kj::uint sequence, Event&& event); @@ -773,10 +718,9 @@ struct TailEvent final { KJ_DISALLOW_COPY(TailEvent); // The invocation span context this event is associated with. - Context context; - - // The parent or trigger span context, if any. - kj::Maybe parentContext = kj::none; + TraceId traceId; + TraceId invocationId; + SpanId spanId; kj::Date timestamp; // Unix epoch, spectre-mitigated resolution kj::uint sequence; diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 61fc7c658ee..80308a18c39 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -184,7 +184,7 @@ struct Trace @0x8e8d911203762d34 { # the fetch handler returns a Response. Importantly, it does not signal that the # span has closed, which may not happen for some period of time after the return # mark is recorded (e.g. due to things like waitUntils or waiting to fully ready - # the response body payload, etc). + # the response body payload, etc). Not all spans will have a Return mark. info :union { empty @0 :Void; custom @1 :List(Attribute); @@ -213,7 +213,12 @@ struct Trace @0x8e8d911203762d34 { struct Resume { # A resume event indicates that we are resuming a previously hibernated # tail session. + attachment @0 :Data; + # When a tail session is hibernated, the tail worker is given the opportunity + # to provide some additional data that will be serialized and stored with the + # hibernated state. When the stream is resumed, if the tail worker has provided + # such data, it will be passed back to the worker in the resume event. } struct Onset { @@ -256,24 +261,23 @@ struct Trace @0x8e8d911203762d34 { # A streaming tail worker receives a series of Tail Events. Tail events always # occur within an InvocationSpanContext. The first TailEvent delivered to a # streaming tail session is always an Onset. The final TailEvent delivered is - # always an Outcome. Between those can be any number of SpanOpen, SpanClose, - # and Mark events. Every SpanOpen *must* be associated with a SpanClose unless - # the stream was abruptly terminated. + # always an Outcome or Hibernate. Between those can be any number of SpanOpen, + # SpanClose, and Mark events. Every SpanOpen *must* be associated with a SpanClose + # unless the stream was abruptly terminated. context @0 :InvocationSpanContext; - parentContext @1 :InvocationSpanContext; - timestampNs @2 :Int64; - sequence @3 :UInt32; + timestampNs @1 :Int64; + sequence @2 :UInt32; event :union { - onset @4 :Onset; - outcome @5 :Outcome; - hibernate @6 :Hibernate; - spanOpen @7 :SpanOpen; - spanClose @8 :SpanClose; - attribute @9 :Attribute; - return @10 :Return; - diagnosticChannelEvent @11 :DiagnosticChannelEvent; - exception @12 :Exception; - log @13 :Log; + onset @3 :Onset; + outcome @4 :Outcome; + hibernate @5 :Hibernate; + spanOpen @6 :SpanOpen; + spanClose @7 :SpanClose; + attribute @8 :List(Attribute); + return @9 :Return; + diagnosticChannelEvent @10 :DiagnosticChannelEvent; + exception @11 :Exception; + log @12 :Log; } } }