From e34ab7e73effe3fd0ab12e11e148b8005a038912 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Fri, 29 Nov 2024 16:50:39 -0600 Subject: [PATCH] Implement connection props. This adds `ctx.props` to the `ctx` object given to `WorkerEntrypoint`s. The property receives metadata about the particular service binding over which the entrypoint was invoked. ``` class MyEntrypoint extends WorkerEntrypoint { foo() { console.log("called by: " + this.ctx.props.caller); } } ``` Service binding declarations in the workerd config may specify what metadata to pass: ``` bindings = [ ( name = "FOO", service = ( name = "my-service", entrypoint = "MyEntrypoint", props = ( json = `{"caller": "my-calling-service"} ) ) ) ] ``` Note that "caller" is just an example. The props can contain anything. Use cases include: * Authentication of the caller's identity. * Authorization / permissions (independent of caller identity). * Specifying a particular resource. For example, if the `WorkerEntrypoint` represents a chat room, `props.roomId` could be the ID of the specific chat room to access. This allows service bindings to implement a deeper capability-based security model, where bindings point to specific resources with specific permissions, instead of general APIs. On Cloudflare, only users who have permission to modify your worker will have permission to create a binding containing arbitrary metadata. Meanwhile we will be creating a mechanism by which you can grant a service binding to your worker to someone, but where you specify the metadata. Thus, you can use the metadata to authenticate requests, without the need for any cryptography. --- src/workerd/api/global-scope.h | 19 +++++ src/workerd/api/hibernatable-web-socket.c++ | 13 +-- src/workerd/api/hibernatable-web-socket.h | 1 + src/workerd/api/queue.c++ | 10 ++- src/workerd/api/queue.h | 1 + src/workerd/api/trace.c++ | 13 +-- src/workerd/api/trace.h | 1 + src/workerd/api/worker-rpc.c++ | 16 ++-- src/workerd/api/worker-rpc.h | 1 + src/workerd/io/BUILD.bazel | 1 + src/workerd/io/worker-entrypoint.c++ | 44 ++++++---- src/workerd/io/worker-entrypoint.h | 2 + src/workerd/io/worker-interface.h | 2 + src/workerd/io/worker.c++ | 14 +++- src/workerd/io/worker.h | 9 +- src/workerd/server/server-test.c++ | 47 +++++++++++ src/workerd/server/server.c++ | 92 +++++++++++++++------ src/workerd/server/workerd.capnp | 10 +++ src/workerd/tests/test-fixture.c++ | 2 +- 19 files changed, 227 insertions(+), 71 deletions(-) diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index ee0d5bdf620..8a3b9f61f51 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -204,6 +204,9 @@ class TestController: public jsg::Object { class ExecutionContext: public jsg::Object { public: + ExecutionContext(jsg::Lock& js): props(js, js.obj()) {} + ExecutionContext(jsg::Lock& js, jsg::JsValue props): props(js, props) {} + void waitUntil(kj::Promise promise); void passThroughOnException(); @@ -211,9 +214,14 @@ class ExecutionContext: public jsg::Object { // and throwing an error at the client. void abort(jsg::Lock& js, jsg::Optional reason); + jsg::JsValue getProps(jsg::Lock& js) { + return props.getHandle(js); + } + JSG_RESOURCE_TYPE(ExecutionContext, CompatibilityFlags::Reader flags) { JSG_METHOD(waitUntil); JSG_METHOD(passThroughOnException); + JSG_LAZY_INSTANCE_PROPERTY(props, getProps); if (flags.getWorkerdExperimental()) { // TODO(soon): Before making this generally available we need to: @@ -229,6 +237,17 @@ class ExecutionContext: public jsg::Object { JSG_METHOD(abort); } } + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("props", props); + } + + private: + jsg::JsRef props; + + void visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(props); + } }; // AlarmEventInfo is a jsg::Object used to pass alarm invocation info to an alarm handler. diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index 2748c9c3ac9..6257c79c2f0 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -59,6 +59,7 @@ jsg::Ref HibernatableWebSocketEvent::claimWebSocket( kj::Promise HibernatableWebSocketCustomEventImpl::run( kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) { // Mark the request as delivered because we're about to run some JS. auto& context = incomingRequest->getContext(); @@ -100,28 +101,28 @@ kj::Promise HibernatableWebSocketCustomEve try { co_await context.run( - [entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters)]( - Worker::Lock& lock) mutable { + [entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters), + props = kj::mv(props)](Worker::Lock& lock) mutable { KJ_SWITCH_ONEOF(eventParameters.eventType) { KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) { return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(text.message), eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock, - lock.getExportedHandler(entrypointName, context.getActor())); + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); } KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) { return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(data.message), eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock, - lock.getExportedHandler(entrypointName, context.getActor())); + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); } KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) { return lock.getGlobalScope().sendHibernatableWebSocketClose(kj::mv(close), eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock, - lock.getExportedHandler(entrypointName, context.getActor())); + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); } KJ_CASE_ONEOF(e, HibernatableSocketParams::Error) { return lock.getGlobalScope().sendHibernatableWebSocketError(kj::mv(e.error), eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock, - lock.getExportedHandler(entrypointName, context.getActor())); + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); } KJ_UNREACHABLE; } diff --git a/src/workerd/api/hibernatable-web-socket.h b/src/workerd/api/hibernatable-web-socket.h index f1b82dc6744..93b5e83a94f 100644 --- a/src/workerd/api/hibernatable-web-socket.h +++ b/src/workerd/api/hibernatable-web-socket.h @@ -66,6 +66,7 @@ class HibernatableWebSocketCustomEventImpl final: public WorkerInterface::Custom kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) override; kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 8b7b95475a9..36bc07b1af6 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -513,6 +513,7 @@ jsg::Ref startQueueEvent(EventTarget& globalEventTarget, kj::Promise QueueCustomEventImpl::run( kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) { incomingRequest->delivered(); auto& context = incomingRequest->getContext(); @@ -546,13 +547,14 @@ kj::Promise QueueCustomEventImpl::run( // can't just wait on their addEventListener handler to resolve because it can't be async). context.addWaitUntil(context.run( [this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder), - &metrics = incomingRequest->getMetrics()](Worker::Lock& lock) mutable { + &metrics = incomingRequest->getMetrics(), + props = kj::mv(props)](Worker::Lock& lock) mutable { jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock); - queueEvent->event = - startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock, - lock.getExportedHandler(entrypointName, context.getActor()), typeHandler); + queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params), + context.addObject(result), lock, + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler); })); // TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index dfcbd20662e..567a69ae15b 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -341,6 +341,7 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) override; kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index ca23202813c..2ba7ebab889 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -601,6 +601,7 @@ jsg::Ref UnsafeTraceMetrics::fromTrace(jsg::Ref item) { namespace { kj::Promise sendTracesToExportedHandler(kj::Own incomingRequest, kj::Maybe entrypointNamePtr, + Frankenvalue props, kj::ArrayPtr> traces) { // Mark the request as delivered because we're about to run some JS. incomingRequest->delivered(); @@ -624,11 +625,12 @@ kj::Promise sendTracesToExportedHandler(kj::Own sendTracesToExportedHandler(kj::Own incomingRequest, kj::Maybe entrypointNamePtr, + Frankenvalue props, kj::TaskSet& waitUntilTasks) -> kj::Promise { // Don't bother to wait around for the handler to run, just hand it off to the waitUntil tasks. - waitUntilTasks.add( - sendTracesToExportedHandler(kj::mv(incomingRequest), entrypointNamePtr, traces)); + waitUntilTasks.add(sendTracesToExportedHandler( + kj::mv(incomingRequest), entrypointNamePtr, kj::mv(props), traces)); return Result{ .outcome = EventOutcome::OK, diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 812119ea2ad..dd9a804fbed 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -615,6 +615,7 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent { kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) override; kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 9d1dc2b29c2..ec304b53513 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1666,18 +1666,21 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase { public: EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> tracer) : JsRpcTargetBase(ioCtx), // Most of the time we don't really have to clone this but it's hard to fully prove, so // let's be safe. entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })), + props(kj::mv(props)), tracer(kj::mv(tracer)) {} TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override { jsg::Lock& js = lock; - auto handler = KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointName, ioCtx.getActor()), - "Failed to get handler to worker."); + auto handler = + KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointName, kj::mv(props), ioCtx.getActor()), + "Failed to get handler to worker."); if (handler->missingSuperclass) { // JS RPC is not enabled on the server side, we cannot call any methods. @@ -1709,6 +1712,7 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase { private: kj::Maybe entrypointName; + Frankenvalue props; kj::Maybe> tracer; bool isReservedName(kj::StringPtr name) override { @@ -1781,15 +1785,17 @@ class JsRpcSessionCustomEventImpl::ServerTopLevelMembrane final: public capnp::M kj::Promise JsRpcSessionCustomEventImpl::run( kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) { IoContext& ioctx = incomingRequest->getContext(); incomingRequest->delivered(); auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller(); - capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName, - mapAddRef(incomingRequest->getWorkerTracer())), - kj::refcounted(kj::mv(doneFulfiller)))); + capFulfiller->fulfill( + capnp::membrane(kj::heap(ioctx, entrypointName, kj::mv(props), + mapAddRef(incomingRequest->getWorkerTracer())), + kj::refcounted(kj::mv(doneFulfiller)))); KJ_DEFER({ // waitUntil() should allow extending execution on the server side even when the client diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index e32e2d40517..3cf88f1de16 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -420,6 +420,7 @@ class JsRpcSessionCustomEventImpl final: public WorkerInterface::CustomEvent { kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) override; kj::Promise sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index e7e5f43b876..6248e30f2fa 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -220,6 +220,7 @@ wd_cc_library( hdrs = ["worker-interface.h"], visibility = ["//visibility:public"], deps = [ + ":frankenvalue_capnp", ":worker-interface_capnp", "@capnp-cpp//src/capnp:capnp-rpc", "@capnp-cpp//src/capnp:capnpc", diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index ad8fa6089e5..83c7c5aed8d 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -45,6 +45,7 @@ class WorkerEntrypoint final: public WorkerInterface { static kj::Own construct(ThreadContext& threadContext, kj::Own worker, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> actor, kj::Own limitEnforcer, kj::Own ioContextDependency, @@ -82,6 +83,7 @@ class WorkerEntrypoint final: public WorkerInterface { kj::Maybe> incomingRequest; bool tunnelExceptions; kj::Maybe entrypointName; + Frankenvalue props; kj::Maybe cfBlobJson; // Hacky members used to hold some temporary state while processing a request. @@ -114,6 +116,7 @@ class WorkerEntrypoint final: public WorkerInterface { kj::TaskSet& waitUntilTasks, bool tunnelExceptions, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe cfBlobJson); }; @@ -152,6 +155,7 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo kj::Own WorkerEntrypoint::construct(ThreadContext& threadContext, kj::Own worker, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> actor, kj::Own limitEnforcer, kj::Own ioContextDependency, @@ -171,7 +175,7 @@ kj::Own WorkerEntrypoint::construct(ThreadContext& threadContex threadContext.getEntropySource()); auto obj = kj::heap(kj::Badge(), threadContext, - waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson)); + waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(props), kj::mv(cfBlobJson)); obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer), kj::mv(invocationSpanContext)); @@ -184,11 +188,13 @@ WorkerEntrypoint::WorkerEntrypoint(kj::Badge badge, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe cfBlobJson) : threadContext(threadContext), waitUntilTasks(waitUntilTasks), tunnelExceptions(tunnelExceptions), entrypointName(entrypointName), + props(kj::mv(props)), cfBlobJson(kj::mv(cfBlobJson)) {} void WorkerEntrypoint::init(kj::Own worker, @@ -288,7 +294,8 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); return lock.getGlobalScope().request(method, url, headers, requestBody, wrappedResponse, - cfBlobJson, lock, lock.getExportedHandler(entrypointName, context.getActor())); + cfBlobJson, lock, + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); }) .then([this](api::DeferredProxy deferredProxy) { TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step", @@ -494,14 +501,14 @@ kj::Promise WorkerEntrypoint::runScheduled( } // Scheduled handlers run entirely in waitUntil() tasks. - context.addWaitUntil( - context.run([scheduledTime, cron, entrypointName = entrypointName, &context, - &metrics = incomingRequest->getMetrics()](Worker::Lock& lock) mutable { + context.addWaitUntil(context.run( + [scheduledTime, cron, entrypointName = entrypointName, props = kj::mv(props), &context, + &metrics = incomingRequest->getMetrics()](Worker::Lock& lock) mutable { TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() run"); jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); - lock.getGlobalScope().startScheduled( - scheduledTime, cron, lock, lock.getExportedHandler(entrypointName, context.getActor())); + lock.getGlobalScope().startScheduled(scheduledTime, cron, lock, + lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor())); })); static auto constexpr waitForFinished = [](IoContext& context, @@ -569,7 +576,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( try { auto result = co_await context.run([scheduledTime, retryCount, entrypointName = entrypointName, - &context](Worker::Lock& lock) { + props = kj::mv(props), &context](Worker::Lock& lock) mutable { jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); // If we have an invalid timeout, set it to the default value of 15 minutes. @@ -579,7 +586,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( timeout = 15 * kj::MINUTES; } - auto handler = lock.getExportedHandler(entrypointName, context.getActor()); + auto handler = lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()); return lock.getGlobalScope().runAlarm(scheduledTime, timeout, retryCount, lock, handler); }); @@ -638,15 +645,15 @@ kj::Promise WorkerEntrypoint::test() { auto& context = incomingRequest->getContext(); - context.addWaitUntil(context.run( - [entrypointName = entrypointName, &context, &metrics = incomingRequest->getMetrics()]( - Worker::Lock& lock) mutable -> kj::Promise { + context.addWaitUntil(context.run([entrypointName = entrypointName, props = kj::mv(props), + &context, &metrics = incomingRequest->getMetrics()]( + Worker::Lock& lock) mutable -> kj::Promise { TRACE_EVENT("workerd", "WorkerEntrypoint::test() run"); jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); return context.awaitJs(lock, lock.getGlobalScope().test( - lock, lock.getExportedHandler(entrypointName, context.getActor()))); + lock, lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()))); })); static auto constexpr waitForFinished = @@ -669,8 +676,8 @@ kj::Promise WorkerEntrypoint::customEvent( this->incomingRequest = kj::none; auto& context = incomingRequest->getContext(); - auto promise = - event->run(kj::mv(incomingRequest), entrypointName, waitUntilTasks).attach(kj::mv(event)); + auto promise = event->run(kj::mv(incomingRequest), entrypointName, kj::mv(props), waitUntilTasks) + .attach(kj::mv(event)); // TODO(cleanup): In theory `context` may have been destroyed by now if `event->run()` dropped // the `incomingRequest` synchronously. No current implementation does that, and @@ -720,6 +727,7 @@ kj::Promise WorkerEntrypoint::maybeAddGcPassForTest(IoContext& context, kj::P kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own worker, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> actor, kj::Own limitEnforcer, kj::Own ioContextDependency, @@ -731,9 +739,9 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Maybe cfBlobJson, kj::Maybe maybeTriggerInvocationSpan) { return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName), - kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), - kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson), - kj::mv(maybeTriggerInvocationSpan)); + kj::mv(props), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), + kj::mv(ioChannelFactory), kj::mv(metrics), waitUntilTasks, tunnelExceptions, + kj::mv(workerTracer), kj::mv(cfBlobJson), kj::mv(maybeTriggerInvocationSpan)); } } // namespace workerd diff --git a/src/workerd/io/worker-entrypoint.h b/src/workerd/io/worker-entrypoint.h index a5f4628e736..f6ad7ccc388 100644 --- a/src/workerd/io/worker-entrypoint.h +++ b/src/workerd/io/worker-entrypoint.h @@ -4,6 +4,7 @@ #pragma once +#include #include namespace workerd { @@ -30,6 +31,7 @@ class InvocationSpanContext; kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own worker, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> actor, kj::Own limitEnforcer, kj::Own ioContextDependency, diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index d49d7b61026..8b70643bc11 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -12,6 +12,7 @@ namespace workerd { +class Frankenvalue; class IoContext_IncomingRequest; // An interface representing the services made available by a worker/pipeline to handle a @@ -111,6 +112,7 @@ class WorkerInterface: public kj::HttpService { // for this event. virtual kj::Promise run(kj::Own incomingRequest, kj::Maybe entrypointName, + Frankenvalue props, kj::TaskSet& waitUntilTasks) = 0; // Forward the event over RPC. diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 6ca0c3b3a1e..552467699c5 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -1675,7 +1676,12 @@ Worker::Worker(kj::Own scriptParam, KJ_SWITCH_ONEOF(handler.value) { KJ_CASE_ONEOF(obj, api::ExportedHandler) { obj.env = lock.v8Ref(bindingsScope.As()); - obj.ctx = jsg::alloc(); + // TODO(cleanup): Unfortunately, for non-class-based handlers, we have + // always created only a single `ctx` object and reused it for all + // requests. This is weird and obviously wrong but changing it probably + // requires a compat flag. Until then, connection properties will not be + // available for non-class handlers. + obj.ctx = jsg::alloc(lock); impl->namedHandlers.insert(kj::mv(handler.name), kj::mv(obj)); } @@ -1964,7 +1970,7 @@ static inline kj::Own fakeOwn(T& ref) { } kj::Maybe> Worker::Lock::getExportedHandler( - kj::Maybe name, kj::Maybe actor) { + kj::Maybe name, Frankenvalue props, kj::Maybe actor) { KJ_IF_SOME(a, actor) { KJ_IF_SOME(h, a.getHandler()) { return fakeOwn(h); @@ -1976,8 +1982,8 @@ kj::Maybe> Worker::Lock::getExportedHandler( return fakeOwn(h); } else KJ_IF_SOME(cls, worker.impl->statelessClasses.find(n)) { jsg::Lock& js = *this; - auto handler = kj::heap(cls( - js, jsg::alloc(), KJ_ASSERT_NONNULL(worker.impl->env).addRef(js))); + auto handler = kj::heap(cls(js, jsg::alloc(js, props.toJs(js)), + KJ_ASSERT_NONNULL(worker.impl->env).addRef(js))); // HACK: We set handler.env and handler.ctx to undefined because we already passed the real // env and ctx into the constructor, and we want the handler methods to act like they take diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index ab31be89224..beae2e9d483 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -8,6 +8,7 @@ #include // because we can't forward-declare ActorCache::SharedLru. #include #include +#include #include #include #include @@ -627,10 +628,12 @@ class Worker::Lock { // default handler. Returns null if this is not a modules-syntax worker (but `entrypointName` // must be null in that case). // - // If running in an actor, the name is ignored and the entrypoint originally used to construct - // the actor is returned. + // `props` is the value to place in `ctx.props`. + // + // If running in an actor, the name and props are ignored and the entrypoint originally used to + // construct the actor is returned. kj::Maybe> getExportedHandler( - kj::Maybe entrypointName, kj::Maybe actor); + kj::Maybe entrypointName, Frankenvalue props, kj::Maybe actor); // Get the C++ object representing the global scope. api::ServiceWorkerGlobalScope& getGlobalScope(); diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 51708b3de3c..54c8ea19748 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -3697,6 +3697,53 @@ KJ_TEST("Server: JS RPC over HTTP connections") { conn.httpGet200("/", "got: 35"); } +KJ_TEST("Server: Entrypoint binding with props") { + TestServer test(R"(( + services = [ + ( name = "hello", + worker = ( + compatibilityDate = "2024-02-23", + compatibilityFlags = ["experimental"], + modules = [ + ( name = "main.js", + esModule = + `import {WorkerEntrypoint} from "cloudflare:workers"; + `export default { + ` async fetch(request, env) { + ` return new Response("got: " + await env.MyRpc.getProps()); + ` } + `} + `export class MyRpc extends WorkerEntrypoint { + ` getProps() { return this.ctx.props.foo; } + `} + ) + ], + bindings = [ + ( name = "MyRpc", + service = ( + name = "hello", + entrypoint = "MyRpc", + props = ( + json = `{"foo": 123} + ) + ) + ) + ] + ) + ), + ], + sockets = [ + ( name = "main", address = "test-addr", service = "hello" ), + ] + ))"_kj); + + test.server.allowExperimental(); + test.start(); + + auto conn = test.connect("test-addr"); + conn.httpGet200("/", "got: 123"); +} + // ======================================================================================= // TODO(beta): Test TLS (send and receive) diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 873d6950ac4..9be9dc570ec 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1622,9 +1622,17 @@ class Server::WorkerService final: public Service, } } - kj::Maybe> getEntrypoint(kj::StringPtr name) { - auto& entry = KJ_UNWRAP_OR_RETURN(namedEntrypoints.findEntry(name), kj::none); - return kj::heap(*this, entry.key, entry.value); + kj::Maybe> getEntrypoint( + kj::Maybe name, kj::Maybe propsJson) { + kj::HashSet* handlers; + KJ_IF_SOME(n, name) { + auto& entry = KJ_UNWRAP_OR_RETURN(namedEntrypoints.findEntry(n), kj::none); + name = entry.key; // replace with more-permanent string + handlers = &entry.value; + } else { + handlers = &KJ_UNWRAP_OR_RETURN(defaultEntrypointHandlers, kj::none); + } + return kj::heap(*this, name, propsJson, *handlers); } kj::Array getEntrypointNames() { @@ -1658,7 +1666,7 @@ class Server::WorkerService final: public Service, } kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { - return startRequest(kj::mv(metadata), kj::none); + return startRequest(kj::mv(metadata), kj::none, {}); } bool hasHandler(kj::StringPtr handlerName) override { @@ -1671,6 +1679,7 @@ class Server::WorkerService final: public Service, kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata, kj::Maybe entrypointName, + Frankenvalue props, kj::Maybe> actor = kj::none) { TRACE_EVENT("workerd", "Server::WorkerService::startRequest()"); @@ -1720,7 +1729,7 @@ class Server::WorkerService final: public Service, auto observer = kj::refcounted(kj::addRef(*workerTracer)); return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName, - kj::mv(actor), kj::Own(this, kj::NullDisposer::instance), + kj::mv(props), kj::mv(actor), kj::Own(this, kj::NullDisposer::instance), {}, // ioContextDependency kj::Own(this, kj::NullDisposer::instance), kj::mv(observer), waitUntilTasks, @@ -1996,7 +2005,8 @@ class Server::WorkerService final: public Service, cleanupTask = cleanupLoop(); } - co_return service.startRequest(kj::mv(metadata), className, kj::mv(actor)) + // Actors always have empty `props`, at least for now. + co_return service.startRequest(kj::mv(metadata), className, {}, kj::mv(actor)) .attach(kj::mv(refTracker)); } @@ -2216,14 +2226,20 @@ class Server::WorkerService final: public Service, private: class EntrypointService final: public Service { public: - EntrypointService( - WorkerService& worker, kj::StringPtr entrypoint, kj::HashSet& handlers) + EntrypointService(WorkerService& worker, + kj::Maybe entrypoint, + kj::Maybe propsJson, + kj::HashSet& handlers) : worker(worker), entrypoint(entrypoint), - handlers(handlers) {} + handlers(handlers) { + KJ_IF_SOME(m, propsJson) { + props = Frankenvalue::fromJson(kj::str(m)); + } + } kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { - return worker.startRequest(kj::mv(metadata), entrypoint); + return worker.startRequest(kj::mv(metadata), entrypoint, props.clone()); } bool hasHandler(kj::StringPtr handlerName) override { @@ -2232,8 +2248,9 @@ class Server::WorkerService final: public Service, private: WorkerService& worker; - kj::StringPtr entrypoint; + kj::Maybe entrypoint; kj::HashSet& handlers; + Frankenvalue props; }; ThreadContext& threadContext; @@ -3312,25 +3329,50 @@ kj::Own Server::lookupService( return fakeOwn(*invalidConfigServiceSingleton); }); + kj::Maybe entrypointName; if (designator.hasEntrypoint()) { - kj::StringPtr entrypointName = designator.getEntrypoint(); - if (WorkerService* worker = dynamic_cast(service)) { - KJ_IF_SOME(ep, worker->getEntrypoint(entrypointName)) { - return kj::mv(ep); - } else { - reportConfigError(kj::str(errorContext, " refers to service \"", targetName, - "\" with a named entrypoint \"", entrypointName, "\", but \"", targetName, - "\" has no such named entrypoint.")); - return fakeOwn(*invalidConfigServiceSingleton); - } + entrypointName = designator.getEntrypoint(); + } + + auto propsJson = [&]() -> kj::Maybe { + auto props = designator.getProps(); + switch (props.which()) { + case config::ServiceDesignator::Props::EMPTY: + return kj::none; + case config::ServiceDesignator::Props::JSON: + return props.getJson(); + } + reportConfigError(kj::str(errorContext, + " has unrecognized props type. Was the config compiled with a " + "newer version of the schema?")); + return kj::none; + }(); + + if (WorkerService* worker = dynamic_cast(service)) { + KJ_IF_SOME(ep, worker->getEntrypoint(entrypointName, propsJson)) { + return kj::mv(ep); + } else KJ_IF_SOME(ep, entrypointName) { + reportConfigError(kj::str(errorContext, " refers to service \"", targetName, + "\" with a named entrypoint \"", ep, "\", but \"", targetName, + "\" has no such named entrypoint.")); + return fakeOwn(*invalidConfigServiceSingleton); } else { reportConfigError(kj::str(errorContext, " refers to service \"", targetName, - "\" with a named entrypoint \"", entrypointName, "\", but \"", targetName, - "\" is not a Worker, so does not have any " - "named entrypoints.")); + "\", but does not specify an entrypoint, and the service does not have a " + "default entrypoint.")); return fakeOwn(*invalidConfigServiceSingleton); } } else { + KJ_IF_SOME(ep, entrypointName) { + reportConfigError(kj::str(errorContext, " refers to service \"", targetName, + "\" with a named entrypoint \"", ep, "\", but \"", targetName, + "\" is not a Worker, so does not have any named entrypoints.")); + } else if (propsJson != kj::none) { + reportConfigError(kj::str(errorContext, " refers to service \"", targetName, + "\" and provides a `props` value, but \"", targetName, + "\" is not a Worker, so cannot accept `props`")); + } + return fakeOwn(*service); } } @@ -4072,7 +4114,7 @@ kj::Promise Server::test(jsg::V8System& v8System, if (WorkerService* worker = dynamic_cast(service.value.get())) { for (auto& name: worker->getEntrypointNames()) { if (entrypointGlob.matches(name)) { - kj::Own ep = KJ_ASSERT_NONNULL(worker->getEntrypoint(name)); + kj::Own ep = KJ_ASSERT_NONNULL(worker->getEntrypoint(name, kj::none)); if (ep->hasHandler("test"_kj)) { co_await doTest(*ep, kj::str(service.key, ':', name)); } diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 693a4f2cde0..cbf77bfc4ed 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -192,6 +192,16 @@ struct ServiceDesignator { # `entrypoint` is specified here, it names an alternate entrypoint to use on the target worker, # otherwise the default is used. + props :union { + # Value to provide in `ctx.props` in the target worker. + + empty @2 :Void; + # Empty object. (This is the default.) + + json @3 :Text; + # A JSON-encoded value. + } + # TODO(someday): Options to specify which event types are allowed. # TODO(someday): Allow adding an outgoing middleware stack here (see TODO in Service, above). } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 22aad93051f..8834f5c3123 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -425,7 +425,7 @@ TestFixture::Response TestFixture::runRequest( runInIoContext([&](const TestFixture::Environment& env) { auto& globalScope = env.lock.getGlobalScope(); return globalScope.request(method, url, requestHeaders, *requestBody, response, "{}"_kj, - env.lock, env.lock.getExportedHandler(kj::none, kj::none)); + env.lock, env.lock.getExportedHandler(kj::none, {}, kj::none)); }); return {.statusCode = response.statusCode, .body = response.body->str()};