Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/workerd/api/hyperdrive.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#include "hyperdrive.h"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how much it really matters, but this file is missing a copyright header.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really just have a github action that checks this, adds the headers, and opens an automated pr adding them.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we still need to add a copyright header here

#include <openssl/rand.h>
#include <cstdint>
#include <kj/compat/http.h>
#include <kj/encoding.h>
#include "sockets.h"
#include "global-scope.h"
#include <kj/string.h>
#include <workerd/util/uuid.h>

namespace workerd::api {
Hyperdrive::Hyperdrive(uint clientIndex, kj::String database,
kj::String user, kj::String password, kj::String scheme)
: clientIndex(clientIndex), database(kj::mv(database)),
user(kj::mv(user)), password(kj::mv(password)), scheme(kj::mv(scheme)) {
kj::byte randomBytes[16];
KJ_ASSERT(RAND_bytes(randomBytes, sizeof(randomBytes)) == 1);
randomHost = kj::str(kj::encodeHex(randomBytes), ".hyperdrive.local");
}

jsg::Ref<Socket> Hyperdrive::connect(jsg::Lock& js) {
auto connPromise = connectToDb();

auto paf = kj::newPromiseAndFulfiller<kj::Maybe<kj::Exception>>();
auto conn = kj::newPromisedStream(connPromise.then(
[&f = *paf.fulfiller](kj::Own<kj::AsyncIoStream> stream) {
f.fulfill(kj::none);
return kj::mv(stream);
}, [&f = *paf.fulfiller](kj::Exception e) {
KJ_LOG(WARNING, "failed to connect to local database", e);
f.fulfill(kj::cp(e));
return kj::mv(e);
}).attach(kj::mv(paf.fulfiller)));

// TODO(someday): Support TLS? It's not at all necessary since we're connecting locally, but
// some users may want it anyway.
auto nullTlsStarter = kj::heap<kj::TlsStarterCallback>();
auto sock = setupSocket(js, kj::mv(conn), kj::none, kj::mv(nullTlsStarter),
false, kj::str(this->randomHost), false);
sock->handleProxyStatus(js, kj::mv(paf.promise));
return sock;
}

kj::StringPtr Hyperdrive::getDatabase() {
return this->database;
}

kj::StringPtr Hyperdrive::getUser() {
return this->user;
}
kj::StringPtr Hyperdrive::getPassword() {
return this->password;
}

kj::StringPtr Hyperdrive::getScheme() {
return this->scheme;
}

kj::StringPtr Hyperdrive::getHost() {
if (!registeredConnectOverride) {
IoContext::current().getCurrentLock().getWorker().setConnectOverride(
kj::str(this->randomHost, ":", getPort()), KJ_BIND_METHOD(*this, connect));
registeredConnectOverride = true;
}
return this->randomHost;
}

// Always returns the default postgres port
uint16_t Hyperdrive::getPort() {
return 5432;
Comment thread
jasnell marked this conversation as resolved.
}

kj::String Hyperdrive::getConnectionString() {
return kj::str(getScheme(), "://", getUser(), ":", getPassword(), "@", getHost(), ":", getPort(),
"/", getDatabase(), "?sslmode=disable");
}
Comment on lines +77 to +80
Copy link
Copy Markdown
Collaborator

@thomasgauvin thomasgauvin Aug 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tmthecoder @a-robinson this hardcodes sslmode=disable. This prevents connection to remote databases such as Azure, Neon, etc. when developing locally. Can we have a way to connect to these databases which needs sslmode=require, while retaining the ability to connect to local Postgres with does need sslmode=disable?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a way to connect to these databases which needs sslmode=require, while retaining the ability to connect to local Postgres with does need sslmode=disable?

If you want to support remote databases that require SSL but also local DBs that don't support it, it sounds like what we want for local mode is sslmode=prefer, which allows SSL but doesn't require it.

It should be possible to vary the default sslmode in workerd compared to on the edge, but I'd suggest bringing it up internally either in chat or as a ticket so we don't lose track of it.

And I wouldn't expect Tejas to respond given that his internship ended last year :)


kj::Promise<kj::Own<kj::AsyncIoStream>> Hyperdrive::connectToDb() {
auto& context = IoContext::current();
auto service = context.getSubrequestChannel(this->clientIndex,
true, kj::none, "hyperdrive_dev"_kjc);

kj::HttpHeaderTable headerTable;
kj::HttpHeaders headers(headerTable);

auto connectReq = kj::newHttpClient(*service)->connect(
kj::str(getHost(), ":", getPort()), headers, kj::HttpConnectSettings{});

auto status = co_await connectReq.status;

if (status.statusCode >= 200 && status.statusCode < 300) {
co_return kj::mv(connectReq.connection);
}

KJ_IF_SOME(e, status.errorBody) {
try {
auto errorBody = co_await e->readAllText();
kj::throwFatalException(KJ_EXCEPTION(
FAILED, kj::str("unexpected error connecting to database: ", errorBody)));
} catch (const kj::Exception& e) {
kj::throwFatalException(
KJ_EXCEPTION(FAILED, kj::str("unexpected error connecting to database "
"and couldn't read error details: ", e)));
}
}
else {
kj::throwFatalException(
KJ_EXCEPTION(FAILED, kj::str("unexpected error connecting to database: ",
status.statusText)));
}
}
} // namespace workerd::api::public_beta
62 changes: 62 additions & 0 deletions src/workerd/api/hyperdrive.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once

#include "streams.h"
#include <capnp/compat/json.h>
#include <cstdint>
#include <kj/common.h>
#include <kj/async-io.h>
#include <workerd/jsg/jsg.h>
#include <workerd/util/http-util.h>

namespace workerd::api {

// A Hyperdrive resource for development integrations.
//
// Provides the same interface as Hyperdrive while sending connection
// traffic directly to postgres
class Hyperdrive : public jsg::Object {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sharing the same class name both here and in the internal repo had me worried, but I guess it works judging by the CI on your internal PR ¯\(ツ)

public:
// `clientIndex` is what to pass to IoContext::getHttpClient() to get an HttpClient
// representing this namespace.
explicit Hyperdrive(uint clientIndex, kj::String database,
kj::String user, kj::String password, kj::String scheme);
jsg::Ref<Socket> connect(jsg::Lock& js);
kj::StringPtr getDatabase();
kj::StringPtr getUser();
kj::StringPtr getPassword();
kj::StringPtr getScheme();

kj::StringPtr getHost();
uint16_t getPort();

kj::String getConnectionString();

JSG_RESOURCE_TYPE(Hyperdrive, CompatibilityFlags::Reader flags) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(database, getDatabase);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(user, getUser);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(password, getPassword);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(host, getHost);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(port, getPort);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(connectionString, getConnectionString);

JSG_METHOD(connect);
}

private:
uint clientIndex;
kj::String randomHost;
kj::String database;
kj::String user;
kj::String password;
kj::String scheme;
bool registeredConnectOverride = false;

kj::Promise<kj::Own<kj::AsyncIoStream>> connectToDb();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit, but it'd be nice to put an empty line in between the class's member variables and any private functions just to more cleanly visually separate them. When scanning over this I initially was trying to figure out why we needed a connectToDb variable before I stopped to take a closer look.

};
#define EW_HYPERDRIVE_ISOLATE_TYPES \
api::Hyperdrive
} // namespace workerd::api
22 changes: 22 additions & 0 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,10 @@ KJ_TEST("Server: capability bindings") {
` items.push(await (await env.r2.get("baz")).text());
` await env.queue.send("hello");
` items.push("Hello from Queue\n");
` const connection = await env.hyperdrive.connect();
` const encoded = new TextEncoder().encode("hyperdrive-test");
` await connection.writable.getWriter().write(new Uint8Array(encoded));
` items.push(`Hello from Hyperdrive(${env.hyperdrive.user})\n`);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a test that actually calls connect()? Or is that too difficult?

Have you at least confirmed locally that it's able to successfully connect to a real DB?

` return new Response(items.join(""));
` }
`}
Expand All @@ -1030,6 +1034,15 @@ KJ_TEST("Server: capability bindings") {
),
( name = "queue",
queue = "queue-outbound"
),
( name = "hyperdrive",
hyperdrive = (
designator = "hyperdrive-outbound",
database = "test-db",
user = "test-user",
password = "test-password",
scheme = "postgresql"
)
)
]
)
Expand All @@ -1038,6 +1051,10 @@ KJ_TEST("Server: capability bindings") {
( name = "kv-outbound", external = "kv-host" ),
( name = "r2-outbound", external = "r2-host" ),
( name = "queue-outbound", external = "queue-host" ),
( name = "hyperdrive-outbound", external = (
address = "hyperdrive-host",
tcp = ()
))
],
sockets = [
( name = "main",
Expand Down Expand Up @@ -1119,11 +1136,16 @@ KJ_TEST("Server: capability bindings") {
)"_blockquote);
}

{
auto subreq = test.receiveSubrequest("hyperdrive-host");
subreq.recv("hyperdrive-test");
}
conn.recvHttp200(R"(
Hello from HTTP
Hello from KV
Hello from R2
Hello from Queue
Hello from Hyperdrive(test-user)
)"_blockquote);
}

Expand Down
86 changes: 86 additions & 0 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,65 @@ private:
kj::Maybe<kj::Own<kj::NetworkAddress>> addr;
};

class Server::ExternalTcpService final: public Service, private WorkerInterface {
public:
ExternalTcpService(kj::Own<kj::NetworkAddress> addrParam)
: addr(kj::mv(addrParam)) {}

kj::Own<WorkerInterface> startRequest(IoChannelFactory::SubrequestMetadata metadata) override {
return { this, kj::NullDisposer::instance };
}

bool hasHandler(kj::StringPtr handlerName) override {
return handlerName == "fetch"_kj || handlerName == "connect"_kj;
}

private:
kj::Own<kj::NetworkAddress> addr;

kj::Promise<void> request(
kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, kj::HttpService::Response& response) override {
throwUnsupported();
}

kj::Promise<void> connect(
kj::StringPtr host, const kj::HttpHeaders& headers, kj::AsyncIoStream& connection,
ConnectResponse& tunnel, kj::HttpConnectSettings settings) override {
auto io_stream = co_await addr->connect();

auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);

promises.add(connection.pumpTo(*io_stream).then([&io_stream=*io_stream](uint64_t size) {
io_stream.shutdownWrite();
}));

promises.add(io_stream->pumpTo(connection).then([&connection](uint64_t size) {
connection.shutdownWrite();
}));

tunnel.accept(200, "OK", kj::HttpHeaders(kj::HttpHeaderTable{}));

co_await kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(io_stream));
}

void prewarm(kj::StringPtr url) override {}
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime) override {
throwUnsupported();
}
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
throwUnsupported();
}

[[noreturn]] void throwUnsupported() {
JSG_FAIL_REQUIRE(Error, "External TCP servers don't support this event type.");
}

};

// Service used when the service is configured as external HTTP service.
class Server::ExternalHttpService final: public Service {
public:
Expand Down Expand Up @@ -637,6 +696,19 @@ kj::Own<Server::Service> Server::makeExternalService(
return kj::heap<ExternalHttpService>(
kj::mv(addr), kj::mv(rewriter), globalContext->headerTable, timer, entropySource);
}
case config::ExternalServer::TCP: {
auto tcpConf = conf.getTcp();
auto addr = kj::heap<PromisedNetworkAddress>(network.parseAddress(addrStr, 80));
if (tcpConf.hasTlsOptions()) {
kj::Maybe<kj::StringPtr> certificateHost;
if (tcpConf.hasCertificateHost()) {
certificateHost = tcpConf.getCertificateHost();
}
addr = kj::heap<PromisedNetworkAddress>(
makeTlsNetworkAddress(tcpConf.getTlsOptions(), addrStr, certificateHost, 0));
}
return kj::heap<ExternalTcpService>(kj::mv(addr));
}
}
reportConfigError(kj::str(
"External service named \"", name, "\" has unrecognized protocol. Was the config "
Expand Down Expand Up @@ -2064,6 +2136,20 @@ static kj::Maybe<WorkerdApiIsolate::Global> createBinding(
.version = 0,
});
}
case config::Worker::Binding::HYPERDRIVE: {
uint channel = (uint)subrequestChannels.size() + IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT;
subrequestChannels.add(FutureSubrequestChannel {
binding.getHyperdrive().getDesignator(),
kj::mv(errorContext)
});
return makeGlobal(Global::Hyperdrive{
.subrequestChannel = channel,
.database = kj::str(binding.getHyperdrive().getDatabase()),
.user = kj::str(binding.getHyperdrive().getUser()),
.password = kj::str(binding.getHyperdrive().getPassword()),
.scheme = kj::str(binding.getHyperdrive().getScheme()),
});
}
}
errorReporter.addError(kj::str(
errorContext, "has unrecognized type. Was the config compiled with a newer version of "
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class Server: private kj::TaskSet::ErrorHandler {

class InvalidConfigService;
class ExternalHttpService;
class ExternalTcpService;
class NetworkService;
class DiskDirectoryService;
class WorkerService;
Expand Down
11 changes: 11 additions & 0 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <workerd/api/encoding.h>
#include <workerd/api/global-scope.h>
#include <workerd/api/html-rewriter.h>
#include <workerd/api/hyperdrive.h>
#include <workerd/api/kv.h>
#include <workerd/api/modules.h>
#include <workerd/api/queue.h>
Expand Down Expand Up @@ -83,6 +84,7 @@ JSG_DECLARE_ISOLATE_TYPE(JsgWorkerdIsolate,
EW_SQL_ISOLATE_TYPES,
EW_NODE_ISOLATE_TYPES,
EW_RTTI_ISOLATE_TYPES,
EW_HYPERDRIVE_ISOLATE_TYPES,
#ifdef WORKERD_EXPERIMENTAL_ENABLE_WEBGPU
EW_WEBGPU_ISOLATE_TYPES,
#endif
Expand Down Expand Up @@ -615,6 +617,12 @@ static v8::Local<v8::Value> createBindingValue(
KJ_LOG(ERROR, "wrapped binding module can't be resolved (internal modules only)", moduleName);
}
}
KJ_CASE_ONEOF(hyperdrive, Global::Hyperdrive) {
value = lock.wrap(context, jsg::alloc<api::Hyperdrive>(
hyperdrive.subrequestChannel, kj::str(hyperdrive.database),
kj::str(hyperdrive.user), kj::str(hyperdrive.password),
kj::str(hyperdrive.scheme)));
}
}

return value;
Expand Down Expand Up @@ -685,6 +693,9 @@ WorkerdApiIsolate::Global WorkerdApiIsolate::Global::clone() const {
KJ_CASE_ONEOF(wrapped, Global::Wrapped) {
result.value = wrapped.clone();
}
KJ_CASE_ONEOF(hyperdrive, Global::Hyperdrive) {
result.value = hyperdrive.clone();
}
}

return result;
Expand Down
Loading