From 2c56853f2f982ac089c059761f13ce5b0b7f8295 Mon Sep 17 00:00:00 2001 From: Dan Lapid Date: Fri, 1 Nov 2024 13:38:41 +0000 Subject: [PATCH] Add FuturePromisedWorkerInterface This is a new WorkerInterface class that will be used in the internal PR, see comments to understand what it does. --- src/workerd/io/worker-interface.h | 99 +++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index 0048cdb3d6a..916030a5d6a 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -151,6 +151,105 @@ class WorkerInterface: public kj::HttpService { // for the promise, then invoke the destination object. kj::Own newPromisedWorkerInterface(kj::Promise> promise); +template +class LazyWorkerInterface final: public WorkerInterface { +public: + LazyWorkerInterface(Func func): func(kj::mv(func)) {} + + void ensureResolve() { + if (promise == kj::none) { + promise = KJ_ASSERT_NONNULL(func)() + .then([this](kj::Own result) { worker = kj::mv(result); }) + .eagerlyEvaluate(nullptr) + .fork(); + func = kj::none; + } + } + + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + Response& response) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_await w->request(method, url, headers, requestBody, response); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_await KJ_ASSERT_NONNULL(worker)->request(method, url, headers, requestBody, response); + } + } + + kj::Promise connect(kj::StringPtr host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + ConnectResponse& response, + kj::HttpConnectSettings settings) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_await w->connect(host, headers, connection, response, kj::mv(settings)); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_await KJ_ASSERT_NONNULL(worker)->connect( + host, headers, connection, response, kj::mv(settings)); + } + } + + kj::Promise prewarm(kj::StringPtr url) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_return co_await w->prewarm(url); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url); + } + } + + kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_return co_await w->runScheduled(scheduledTime, cron); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->runScheduled(scheduledTime, cron); + } + } + + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_return co_await w->runAlarm(scheduledTime, retryCount); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, retryCount); + } + } + + kj::Promise customEvent(kj::Own event) override { + ensureResolve(); + KJ_IF_SOME(w, worker) { + co_return co_await w->customEvent(kj::mv(event)); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->customEvent(kj::mv(event)); + } + } + +private: + kj::Maybe func; + kj::Maybe> promise; + kj::Maybe> worker; +}; +// Similar to newPromisedWorkerInterface but receives a function that returns a Promise for a +// WorkerInterface. This is useful when you are not sure if the worker will be used or not and +// you don't want it to be created in case it isn't used. If you just create a +// PromisedWorkerInterface then the async loop might run the promise before it is eventually +// destroyed even if it was never used. +template +kj::Own newLazyWorkerInterface(Func func) { + return kj::heap>(kj::mv(func)); +} + // Adapts WorkerInterface to HttpClient, including taking ownership. // // (Use kj::newHttpClient() if you don't want to take ownership.)