Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/relay/fly.staging.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
app = "superset-relay-staging"
primary_region = "sjc"
kill_signal = "SIGINT"
kill_timeout = "10s"

[build]
dockerfile = "Dockerfile"
Expand Down
2 changes: 2 additions & 0 deletions apps/relay/fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

app = "superset-relay"
primary_region = "sjc"
kill_signal = "SIGINT"
kill_timeout = "10s"

[build]
dockerfile = "Dockerfile"
Expand Down
32 changes: 32 additions & 0 deletions apps/relay/scripts/deploy-staging.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bash
# Deploy the relay to the staging Fly app (superset-relay-staging). Mirrors
# deploy.sh but targets fly.staging.toml + the staging app, so we can iterate
# on multi-region without risking prod. Edit REGIONS to grow the fleet.
#
# Steps: fly deploy -> scale to one machine per region -> fly status ->
# smoke-test every region. Any failing step aborts the script (set -e).
set -euo pipefail

APP=superset-relay-staging
REGIONS=(sjc iad fra)
COUNT=${#REGIONS[@]}
# Comma-joined region list for `fly scale count --region`; IFS is only
# changed inside the command-substitution subshell.
REGION_LIST=$(IFS=, ; echo "${REGIONS[*]}")

# Resolve this script's directory to an absolute path BEFORE changing
# directory. "$0" may be relative to the caller's cwd (e.g. ./deploy-staging.sh
# run from apps/relay/scripts); resolving it after the `cd` below would look
# for smoke-test.sh relative to the repo root and fail after the deploy has
# already gone out.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Paths passed to fly below are repo-root relative. Anchor the lookup at the
# script's own location so the script works from any working directory.
cd "$(git -C "$SCRIPT_DIR" rev-parse --show-toplevel)"

echo "==> fly deploy ($APP)"
fly deploy \
  --config apps/relay/fly.staging.toml \
  --dockerfile apps/relay/Dockerfile \
  --app "$APP" \
  .

echo "==> fly scale count: $COUNT machines, 1 per region across $REGION_LIST"
fly scale count "app=$COUNT" \
  --region "$REGION_LIST" \
  --max-per-region 1 \
  --app "$APP" \
  --yes

echo "==> Status"
fly status --app "$APP"

echo "==> Smoke test"
"$SCRIPT_DIR/smoke-test.sh" "${APP}.fly.dev" "${REGIONS[@]}"
3 changes: 3 additions & 0 deletions apps/relay/scripts/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ fly scale count "app=$COUNT" \

echo "==> Status"
fly status --app "$APP"

echo "==> Smoke test"
"$(dirname "$0")/smoke-test.sh" "${APP}.fly.dev" "${REGIONS[@]}"
45 changes: 45 additions & 0 deletions apps/relay/scripts/smoke-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env bash
# Post-deploy smoke test for the relay. Hits /health on the public hostname
# with `fly-prefer-region` for every region the fleet is supposed to span,
# then verifies the response is HTTP 200 with `region` matching the requested
# region — catches partial-deploy failures, missing regions, and machines
# that booted but aren't actually serving.
#
# Usage: smoke-test.sh <hostname> <region> [<region> ...]
#        smoke-test.sh superset-relay-staging.fly.dev sjc iad fra
#
# Exits non-zero on any failure so callers (deploy.sh, deploy-staging.sh)
# can halt the pipeline. Exit 64 = bad usage, exit 1 = at least one region
# failed its check.
set -euo pipefail

if [ $# -lt 2 ]; then
  echo "usage: $0 <hostname> <region> [<region> ...]" >&2
  exit 64
fi

# Deliberately not named HOSTNAME: bash maintains that variable itself (the
# local machine's name), and overwriting it is confusing at best.
host="$1"
shift
REGIONS=("$@")

fail=0
for region in "${REGIONS[@]}"; do
  printf " %-4s " "$region"
  # -w appends the HTTP status code on its own trailing line, so the body and
  # the status come back from a single request and can be split apart below.
  response=$(curl -sS --max-time 8 -w '\n%{http_code}' \
    -H "fly-prefer-region: $region" "https://${host}/health" 2>&1) || {
    # Transport-level failure (DNS, TLS, timeout, ...): curl's own error text
    # was captured via 2>&1.
    printf " ✗ curl failed: %s\n" "$response"
    fail=$((fail + 1))
    continue
  }
  status=${response##*$'\n'}   # last line: the status code written by -w
  body=${response%$'\n'*}      # everything before that last line
  # A non-200 /health must fail even if its body happens to name the region.
  if [ "$status" != "200" ]; then
    printf " ✗ wanted HTTP 200, got=%s: %s\n" "$status" "$body"
    fail=$((fail + 1))
    continue
  fi
  # Pull the value of "region" out of the JSON body; empty when absent.
  got=$(printf "%s" "$body" | sed -nE 's/.*"region":"([^"]+)".*/\1/p')
  if [ "$got" = "$region" ]; then
    printf " ✓ %s\n" "$body"
  else
    printf " ✗ wanted region=%s, got=%s: %s\n" "$region" "$got" "$body"
    fail=$((fail + 1))
  fi
done

if [ "$fail" -ne 0 ]; then
  echo "==> smoke test FAILED: $fail region(s) did not respond as expected" >&2
  exit 1
fi
echo "==> smoke test OK across ${#REGIONS[@]} region(s)"
40 changes: 40 additions & 0 deletions apps/relay/src/directory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ end
return removed
`;

// Called on relay startup. Removes any directory entries the prior process
// generation left behind (SIGKILL / crash / drain race) before we begin
// accepting connections. The owner value includes the Fly machineId, which
// stays the same across restarts of a given VM — so any pre-existing entry
// with our owner string is necessarily stale.
//
// Batched as a single Lua eval so startup time stays bounded at one Redis
// round-trip regardless of how many tunnels the previous generation owned;
// per-host serial unregister calls would scale with directory size and eat
// directly into deploy recovery.
//
// Script contract (as invoked by clearStaleEntriesForMachine):
//   KEYS[1]  hash of hostId -> owner; scanned in full with HGETALL
//   KEYS[2]  hash keyed by hostId; the matching field is HDEL'd
//   KEYS[3]  sorted set of hostIds; the matching member is ZREM'd
//   ARGV[1]  owner string to match exactly against KEYS[1] values
// Returns the number of hostIds whose entries were removed.
const CLEAR_STALE_SCRIPT = `
local owner = ARGV[1]
local entries = redis.call('HGETALL', KEYS[1])
local cleared = 0
for i = 1, #entries, 2 do
local hostId = entries[i]
local current = entries[i + 1]
if current == owner then
redis.call('HDEL', KEYS[1], hostId)
redis.call('HDEL', KEYS[2], hostId)
redis.call('ZREM', KEYS[3], hostId)
cleared = cleared + 1
end
end
return cleared
`;

/**
 * Removes every directory entry owned by the given region/machine pair.
 *
 * Runs CLEAR_STALE_SCRIPT once against the owner, meta and TTL keys, with
 * the encoded owner string as its only argument.
 *
 * @param region - Region component of the owner string.
 * @param machineId - Machine component of the owner string.
 * @returns The count reported by the script, or 0 when the reply is not a
 *   number.
 */
export async function clearStaleEntriesForMachine(
  region: string,
  machineId: string,
): Promise<number> {
  const ownerArgs = [encodeOwner(region, machineId)];
  const directoryKeys = [OWNER_KEY, META_KEY, TTL_KEY];
  const reply = await redis.eval(CLEAR_STALE_SCRIPT, directoryKeys, ownerArgs);
  if (typeof reply !== "number") {
    return 0;
  }
  return reply;
}

export async function sweepStale(): Promise<number> {
const now = Date.now();
const result = await redis.eval(
Expand Down
67 changes: 66 additions & 1 deletion apps/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,43 @@ const app = new Hono<AppContext>();
const tunnelManager = new TunnelManager();
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app });

// Graceful drain on Fly's pre-stop signal. Fly's init sends SIGINT (not
// SIGTERM) to the main process during rolling deploys; listen on both to
// also cover hand-rolled `fly machine stop` and local Ctrl-C. Without this,
// deploys kill the process while tunnels are open and host-services see
// TCP-RST'd sockets, triggering their long exponential backoff.
//
// Sequence: stop accepting new TCP connections (server.close), then close
// every open tunnel with the app-defined drain code so hosts reconnect
// promptly, and clear this machine's directory ownership before process exit.
// server is assigned at the bottom of this file — by signal time, the closure
// has it.
let server: ReturnType<typeof serve> | null = null;
let draining = false;
const handleDrain = async (signal: string) => {
  // Only the first signal starts a drain; later ones are ignored.
  if (draining) {
    return;
  }
  draining = true;
  console.log(`[relay] ${signal} received, draining tunnels`);

  // Handed to the tunnel manager so it decides when, within the drain
  // sequence, this machine's directory entries are removed.
  const clearDirectory = () =>
    directory.clearStaleEntriesForMachine(env.FLY_REGION, env.FLY_MACHINE_ID);

  try {
    // Stop accepting new connections before touching the open tunnels.
    server?.close();
    const clearedCount = await tunnelManager.drain({ clearDirectory });
    if (clearedCount > 0) {
      console.log(
        `[relay] cleared ${clearedCount} directory entries during drain`,
      );
    }
  } catch (err) {
    console.error("[relay] drain failed", err);
  }
  // Exit with status 0 whether or not the drain succeeded.
  process.exit(0);
};
process.on("SIGINT", () => void handleDrain("SIGINT"));
process.on("SIGTERM", () => void handleDrain("SIGTERM"));

app.use("*", redactingLogger);
app.use("*", cors());

Expand Down Expand Up @@ -144,6 +181,11 @@ app.get(

return {
onOpen: async (_event, ws) => {
if (draining) {
ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy");
return;
}

if (!hostId || !token) {
ws.close(1008, "Missing hostId or token");
return;
Expand All @@ -161,6 +203,11 @@ app.get(
return;
}

if (draining) {
ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy");
return;
}

await tunnelManager.register(hostId, token, ws);
// register closes ws itself on directory failure; only mark
// authorized if the socket is still usable.
Expand Down Expand Up @@ -319,7 +366,25 @@ if (env.RELAY_SYNTHETIC_JWT) {

// ── Start ───────────────────────────────────────────────────────────

const server = serve({ fetch: app.fetch, port: env.RELAY_PORT }, (info) => {
// Clear any directory entries our previous process generation left behind
// (SIGKILL, drain race, etc.) before we begin accepting connections, so
// fly-replay doesn't route cross-region requests at us for tunnels we no
// longer have. Best-effort: relay still boots if Upstash is unreachable.
try {
const cleared = await directory.clearStaleEntriesForMachine(
env.FLY_REGION,
env.FLY_MACHINE_ID,
);
if (cleared > 0) {
console.log(
`[relay] cleared ${cleared} stale directory entries on startup`,
);
}
} catch (err) {
console.error("[relay] startup cleanup failed", err);
}

server = serve({ fetch: app.fetch, port: env.RELAY_PORT }, (info) => {
console.log(
`[relay] listening on http://localhost:${info.port} (region=${env.FLY_REGION} machine=${env.FLY_MACHINE_ID})`,
);
Expand Down
90 changes: 87 additions & 3 deletions apps/relay/src/tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ interface TunnelState {
missedPings: number;
}

/** Options accepted by TunnelManager.drain(). */
interface DrainOptions {
// Text sent in the in-band drain message and used as the WS close reason;
// drain() falls back to "Server draining for deploy" when omitted.
reason?: string;
// Callback that removes this machine's directory entries. drain() awaits it
// once and returns the number it resolves with; a rejection is rethrown by
// drain() after the tunnels have been closed.
clearDirectory: () => Promise<number>;
}

export class TunnelManager {
private readonly tunnels = new Map<string, TunnelState>();
private readonly requestTimeoutMs: number;
Expand All @@ -42,12 +47,18 @@ export class TunnelManager {
ReturnType<typeof setTimeout>
>();
private readonly onlineWriteVersions = new Map<string, number>();
private draining = false;

constructor(requestTimeoutMs = 30_000) {
this.requestTimeoutMs = requestTimeoutMs;
}

async register(hostId: string, token: string, ws: WsSocket): Promise<void> {
if (this.draining) {
ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy");
return;
}

// Last-write-wins: close the old socket so flaky clients don't get
// stuck behind a dead-but-not-yet-detected WS.
const existing = this.tunnels.get(hostId);
Expand All @@ -73,12 +84,15 @@ export class TunnelManager {
// hadn't returned yet), so it skipped unregister. Roll the directory
// entry back ourselves; otherwise other machines fly-replay traffic
// to a dead local tunnel for ~90s until the TTL ages out.
if (ws.readyState !== 1) {
if (ws.readyState !== 1 || this.draining) {
await directory
.unregister(hostId, env.FLY_REGION, env.FLY_MACHINE_ID)
.catch((err) => {
console.error("[relay] directory.unregister rollback failed", err);
});
if (this.draining) {
ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy");
}
return;
}

Expand Down Expand Up @@ -159,7 +173,77 @@ export class TunnelManager {
console.log(`[relay] tunnel unregistered: ${hostId}`);
}

private disposeTunnel(tunnel: TunnelState, reason: string): void {
// Application-defined WS close code (4xxx range) signaling "relay is
// draining for a deploy — reconnect immediately." Distinct from 1001
// ("Going Away") which the ping-timeout / dispose paths use; the host
// resets its backoff only on this code, so a mass ping-timeout doesn't
// trigger a thundering-herd of fast reconnects.
//
// NOTE(review): disposeTunnel's default close code in this file is 1000,
// not 1001 — confirm which code the ping-timeout path actually sends and
// align this comment with it.
static readonly WS_CLOSE_DRAIN = 4001;

/**
 * Graceful drain, invoked from the SIGINT/SIGTERM handler in index.ts.
 *
 * Marks the manager as draining, tells every currently open tunnel to
 * reconnect, clears this machine's directory entries, and closes the
 * sockets with the app-defined drain close code so the host-service can
 * tell a deploy drain apart from a hard disconnect or ping timeout.
 *
 * Directory cleanup is owned here rather than left to websocket close
 * callbacks: the process exits right after drain, so fire-and-forget
 * unregister work from onClose is not a reliable shutdown primitive.
 *
 * @param options - Optional close reason plus the directory-cleanup callback.
 * @returns The number of directory entries the callback reports as cleared.
 * @throws Whatever `options.clearDirectory` rejected with — rethrown only
 *   after the tunnels have been closed and the tail wait has finished.
 */
async drain(options: DrainOptions): Promise<number> {
  const pause = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));

  this.draining = true;
  const reason = options.reason ?? "Server draining for deploy";
  // Snapshot up front; the map is mutated while tunnels are disposed below.
  const snapshot = [...this.tunnels.values()];
  console.log(`[relay] draining ${snapshot.length} tunnels`);

  // Step 1 — in-band signal. Game-day testing showed the WS close frame
  // doesn't reliably reach the host within Fly's kill_timeout window
  // (the host's TCP socket stays ESTABLISHED with no onclose for 75+
  // seconds, until its watchdog fires). The message channel is already
  // exercised by ping/pong every 30s, so a {type:"drain"} message is the
  // primary signal and the close that follows is belt-and-suspenders.
  snapshot.forEach((tunnel) => {
    try {
      this.send(tunnel.ws, { type: "drain", reason });
    } catch {
      // best-effort
    }
  });

  // Let the message frames reach the hosts before sockets start closing.
  await pause(500);

  // Step 2 — directory cleanup. A failure is remembered, not thrown yet,
  // so the sockets still get closed.
  let cleared = 0;
  let clearError: unknown;
  try {
    cleared = await options.clearDirectory();
  } catch (err) {
    clearError = err;
  }

  // Step 3 — close every tunnel with the drain code and forget it.
  snapshot.forEach((tunnel) => {
    this.disposeTunnel(tunnel, reason, TunnelManager.WS_CLOSE_DRAIN);
    this.tunnels.delete(tunnel.hostId);
  });

  // Step 4 — brief tail wait so the close handshake can complete before
  // the process exits and RSTs the underlying TCP connections.
  const WS_CLOSED = 3;
  const deadline = Date.now() + 1_500;
  const allClosed = () =>
    snapshot.every((tunnel) => tunnel.ws.readyState === WS_CLOSED);
  while (Date.now() < deadline && !allClosed()) {
    await pause(50);
  }

  if (clearError) throw clearError;
  return cleared;
}

private disposeTunnel(
tunnel: TunnelState,
reason: string,
tunnelCloseCode = 1000,
): void {
if (tunnel.pingTimer) clearInterval(tunnel.pingTimer);

for (const [, pending] of tunnel.pendingRequests) {
Expand All @@ -172,7 +256,7 @@ export class TunnelManager {
}

try {
tunnel.ws.close(1000, reason);
tunnel.ws.close(tunnelCloseCode, reason);
} catch {
// already closed
}
Expand Down
Loading
Loading