Skip to content

Commit

Permalink
Merge pull request #3261 from cloudflare/felix/subrequest-client-span…
Browse files Browse the repository at this point in the history
…s-kv

[o11y] Add KV span tags
  • Loading branch information
fhanau authored Jan 8, 2025
2 parents 8ba75c9 + 0b852c0 commit 827685a
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 54 deletions.
85 changes: 72 additions & 13 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ constexpr auto FLPROD_405_HEADER = "CF-KV-FLPROD-405"_kj;
kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrUnknown,
kj::StringPtr urlStr) {
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options) {
const auto operationName = [&] {
KJ_SWITCH_ONEOF(opTypeOrUnknown) {
KJ_CASE_ONEOF(name, kj::LiteralStringConst) {
Expand All @@ -82,6 +83,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
switch (opType) {
case LimitEnforcer::KvOpType::GET:
return "kv_get"_kjc;
case LimitEnforcer::KvOpType::GET_WITH:
return "kv_getWithMetadata"_kjc;
case LimitEnforcer::KvOpType::PUT:
return "kv_put"_kjc;
case LimitEnforcer::KvOpType::LIST:
Expand All @@ -95,8 +98,55 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
KJ_UNREACHABLE;
}();

kj::Vector<Span::Tag> tags;
tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc));
tags.add("cloudflare.kv.operation.name"_kjc, kj::str(operationName.slice(3)));

KJ_IF_SOME(_options, options) {
KJ_SWITCH_ONEOF(_options) {
KJ_CASE_ONEOF(o2, kj::OneOf<kj::String, GetOptions>) {
KJ_SWITCH_ONEOF(o2) {
KJ_CASE_ONEOF(type, kj::String) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_CASE_ONEOF(o, GetOptions) {
KJ_IF_SOME(type, o.type) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_IF_SOME(cacheTtl, o.cacheTtl) {
tags.add("cloudflare.kv.query.parameter.cacheTtl"_kjc, (int64_t)cacheTtl);
}
}
}
}
KJ_CASE_ONEOF(o, ListOptions) {
KJ_IF_SOME(l, o.limit) {
tags.add("cloudflare.kv.query.parameter.limit"_kjc, (int64_t)l);
}
KJ_IF_SOME(prefix, o.prefix) {
KJ_IF_SOME(p, prefix) {
tags.add("cloudflare.kv.query.parameter.prefix"_kjc, kj::mv(p));
}
}
KJ_IF_SOME(cursor, o.cursor) {
KJ_IF_SOME(c, cursor) {
tags.add("cloudflare.kv.query.parameter.cursor"_kjc, kj::mv(c));
}
}
}
KJ_CASE_ONEOF(o, PutOptions) {
KJ_IF_SOME(expiration, o.expiration) {
tags.add("cloudflare.kv.query.parameter.expiration"_kjc, (int64_t)expiration);
}
KJ_IF_SOME(expirationTtl, o.expirationTtl) {
tags.add("cloudflare.kv.query.parameter.expirationTtl"_kjc, (int64_t)expirationTtl);
}
}
}
}
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}});
subrequestChannel, true, kj::none, operationName, kj::mv(tags));

headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand All @@ -105,19 +155,25 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

jsg::Promise<KvNamespace::GetResult> KvNamespace::get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags) {
jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
auto resp = getWithMetadata(js, kj::mv(name), kj::mv(options));
auto resp =
getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET);
return resp.then(js,
[](jsg::Lock&, KvNamespace::GetWithMetadataResult result) { return kj::mv(result.value); });
});
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op) {
validateKeyName("GET", name);

auto& context = IoContext::current();
Expand All @@ -132,11 +188,11 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_IF_SOME(cacheTtl, options.cacheTtl) {
url.query.add(kj::Url::QueryParam{kj::str("cache_ttl"), kj::str(cacheTtl)});
Expand All @@ -148,7 +204,7 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET, urlStr);
auto client = getHttpClient(context, headers, op, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -260,7 +316,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::list(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -363,7 +420,8 @@ jsg::Promise<void> KvNamespace::put(jsg::Lock& js,

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[&context, client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
Expand Down Expand Up @@ -419,7 +477,8 @@ jsg::Promise<void> KvNamespace::delete_(jsg::Lock& js, kj::String name) {

kj::HttpHeaders headers(context.getHeaderTable());

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr, kj::none);

auto promise = context.waitForOutputLocks().then(
[headers = kj::mv(headers), client = kj::mv(client), urlStr = kj::mv(urlStr)]() mutable {
Expand Down
13 changes: 8 additions & 5 deletions src/workerd/api/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ class KvNamespace: public jsg::Object {
using GetResult = kj::Maybe<
kj::OneOf<jsg::Ref<ReadableStream>, kj::Array<byte>, kj::String, jsg::JsRef<jsg::JsValue>>>;

jsg::Promise<GetResult> get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags);
jsg::Promise<GetResult> get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

struct GetWithMetadataResult {
GetResult value;
Expand All @@ -68,6 +66,10 @@ class KvNamespace: public jsg::Object {
});
};

jsg::Promise<GetWithMetadataResult> getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op);
jsg::Promise<GetWithMetadataResult> getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

Expand Down Expand Up @@ -173,7 +175,8 @@ class KvNamespace: public jsg::Object {
kj::Own<kj::HttpClient> getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrName,
kj::StringPtr urlStr);
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options);

private:
kj::Array<AdditionalHeader> additionalHeaders;
Expand Down
14 changes: 6 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket here.
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_create"_kjc, {"rpc.method"_kjc, "CreateBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -59,8 +58,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags) {
auto& context = IoContext::current();
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}});
auto client = r2GetClient(
context, subrequestChannel, {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -116,9 +115,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
jsg::Promise<void> R2Admin::delete_(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
49 changes: 42 additions & 7 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
#include <regex>

namespace workerd::api::public_beta {
kj::Own<kj::HttpClient> r2GetClient(
IoContext& context, uint subrequestChannel, R2UserTracing user) {
kj::Vector<Span::Tag> tags;
tags.add("rpc.service"_kjc, kj::str("r2"_kjc));
tags.add(user.method.key, kj::str(user.method.value));
KJ_IF_SOME(b, user.bucket) {
tags.add("cloudflare.r2.bucket"_kjc, kj::str(b));
}
KJ_IF_SOME(tag, user.extraTag) {
tags.add(tag.key, kj::str(tag.value));
}

return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, user.op, kj::mv(tags));
}

static bool isWholeNumber(double x) {
double intpart;
return modf(x, &intpart) == 0;
Expand Down Expand Up @@ -341,7 +356,8 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(jsg::Lock
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -377,7 +393,9 @@ R2Bucket::get(jsg::Lock& js,
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.bucket"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -439,7 +457,9 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::put(jsg::Lock&
});

auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_put"_kjc, {"rpc.method"_kjc, "PutObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.key"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -630,8 +650,9 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client =
context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_createMultipartUpload"_kjc, {"rpc.method"_kjc, "CreateMultipartUpload"_kjc},
this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -729,7 +750,20 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc);
auto deleteKey = [&]() {
KJ_SWITCH_ONEOF(keys) {
KJ_CASE_ONEOF(ks, kj::Array<kj::String>) {
return kj::str(ks);
}
KJ_CASE_ONEOF(k, kj::String) {
return kj::str(k);
}
}
KJ_UNREACHABLE;
}();
auto client = r2GetClient(context, clientIndex,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.delete"_kjc, deleteKey.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -774,7 +808,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,
CompatibilityFlags::Reader flags) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ class Headers;

namespace workerd::api::public_beta {

struct StringTagParams {
kj::LiteralStringConst key;
kj::StringPtr value;
};

struct R2UserTracing {
kj::LiteralStringConst op;
StringTagParams method;
// Passing Maybe<kj::StringPtr> instead of Maybe<StringTagParams> here – this avoids a branch on
// the caller side when bucket is already a Maybe, which is more convenient.
kj::Maybe<kj::StringPtr> bucket;
kj::Maybe<StringTagParams> extraTag;
};

// Helper for creating R2 HTTP Client with the right span tags across operations. This is much
// cleaner than setting span tags directly in each function.
kj::Own<kj::HttpClient> r2GetClient(IoContext& context, uint subrequestChannel, R2UserTracing user);

kj::Array<kj::byte> cloneByteArray(const kj::Array<kj::byte>& arr);
kj::ArrayPtr<kj::StringPtr> fillR2Path(
kj::StringPtr pathStorage[1], const kj::Maybe<kj::String>& bucket);
Expand Down
15 changes: 9 additions & 6 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(jsg:
"Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber);

auto& context = IoContext::current();
auto client =
context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_uploadPart"_kjc, {"rpc.method"_kjc, "UploadPart"_kjc}, this->bucket->adminBucketName(),
{{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -95,8 +96,9 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(jsg::Lo
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_completeMultipartUpload"_kjc, {"rpc.method"_kjc, "CompleteMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -145,8 +147,9 @@ jsg::Promise<void> R2MultipartUpload::abort(
jsg::Lock& js, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_abortMultipartUpload"_kjc, {"rpc.method"_kjc, "AbortMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
Loading

0 comments on commit 827685a

Please sign in to comment.