diff --git a/apps/relay/fly.staging.toml b/apps/relay/fly.staging.toml index dfa5e237386..f575a1f5af0 100644 --- a/apps/relay/fly.staging.toml +++ b/apps/relay/fly.staging.toml @@ -1,5 +1,7 @@ app = "superset-relay-staging" primary_region = "sjc" +kill_signal = "SIGINT" +kill_timeout = "10s" [build] dockerfile = "Dockerfile" diff --git a/apps/relay/fly.toml b/apps/relay/fly.toml index ebb1c461fda..1dba7964739 100644 --- a/apps/relay/fly.toml +++ b/apps/relay/fly.toml @@ -6,6 +6,8 @@ app = "superset-relay" primary_region = "sjc" +kill_signal = "SIGINT" +kill_timeout = "10s" [build] dockerfile = "Dockerfile" diff --git a/apps/relay/scripts/deploy-staging.sh b/apps/relay/scripts/deploy-staging.sh new file mode 100755 index 00000000000..76a39e282cc --- /dev/null +++ b/apps/relay/scripts/deploy-staging.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# Deploy the relay to the staging Fly app (superset-relay-staging). Mirrors +# deploy.sh but targets fly.staging.toml + the staging app, so we can iterate +# on multi-region without risking prod. Edit REGIONS to grow the fleet. +set -euo pipefail + +APP=superset-relay-staging +REGIONS=(sjc iad fra) +COUNT=${#REGIONS[@]} +REGION_LIST=$(IFS=, ; echo "${REGIONS[*]}") + +cd "$(git rev-parse --show-toplevel)" + +echo "==> fly deploy ($APP)" +fly deploy \ + --config apps/relay/fly.staging.toml \ + --dockerfile apps/relay/Dockerfile \ + --app "$APP" \ + . + +echo "==> fly scale count: $COUNT machines, 1 per region across $REGION_LIST" +fly scale count "app=$COUNT" \ + --region "$REGION_LIST" \ + --max-per-region 1 \ + --app "$APP" \ + --yes + +echo "==> Status" +fly status --app "$APP" + +echo "==> Smoke test" +"$(dirname "$0")/smoke-test.sh" "${APP}.fly.dev" "${REGIONS[@]}" diff --git a/apps/relay/scripts/deploy.sh b/apps/relay/scripts/deploy.sh index c7e72da9c5b..483f39921ad 100755 --- a/apps/relay/scripts/deploy.sh +++ b/apps/relay/scripts/deploy.sh @@ -24,3 +24,6 @@ fly scale count "app=$COUNT" \ echo "==> Status" fly status --app "$APP" + +echo "==> Smoke test" +"$(dirname "$0")/smoke-test.sh" "${APP}.fly.dev" "${REGIONS[@]}" diff --git a/apps/relay/scripts/smoke-test.sh b/apps/relay/scripts/smoke-test.sh new file mode 100755 index 00000000000..6b1c6abdc07 --- /dev/null +++ b/apps/relay/scripts/smoke-test.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +# Post-deploy smoke test for the relay. Hits /health on the public hostname +# with `fly-prefer-region` for every region the fleet is supposed to span, +# then verifies the response 200s with `region` matching the requested +# region — catches partial-deploy failures, missing regions, and machines +# that booted but aren't actually serving. +# +# Usage: smoke-test.sh [ ...] +# smoke-test.sh superset-relay-staging.fly.dev sjc iad fra +# +# Exits non-zero on any failure so callers (deploy.sh, deploy-staging.sh) +# can halt the pipeline. +set -euo pipefail + +if [ $# -lt 2 ]; then + echo "usage: $0 [ ...]" >&2 + exit 64 +fi + +HOSTNAME="$1" +shift +REGIONS=("$@") + +fail=0 +for region in "${REGIONS[@]}"; do + printf " %-4s " "$region" + body=$(curl -sS --max-time 8 -H "fly-prefer-region: $region" "https://${HOSTNAME}/health" 2>&1) || { + printf " ✗ curl failed: %s\n" "$body" + fail=$((fail + 1)) + continue + } + got=$(printf "%s" "$body" | sed -nE 's/.*"region":"([^"]+)".*/\1/p') + if [ "$got" = "$region" ]; then + printf " ✓ %s\n" "$body" + else + printf " ✗ wanted region=%s, got=%s: %s\n" "$region" "$got" "$body" + fail=$((fail + 1)) + fi +done + +if [ "$fail" -ne 0 ]; then + echo "==> smoke test FAILED: $fail region(s) did not respond as expected" >&2 + exit 1 +fi +echo "==> smoke test OK across ${#REGIONS[@]} region(s)" diff --git a/apps/relay/src/directory.ts b/apps/relay/src/directory.ts index 4e780eb250c..25081cbcd2f 100644 --- a/apps/relay/src/directory.ts +++ b/apps/relay/src/directory.ts @@ -131,6 +131,46 @@ end return removed `; +// Called on relay startup. Removes any directory entries the prior process +// generation left behind (SIGKILL / crash / drain race) before we begin +// accepting connections. The owner value includes the Fly machineId, which +// stays the same across restarts of a given VM — so any pre-existing entry +// with our owner string is necessarily stale. +// +// Batched as a single Lua eval so startup time stays bounded at one Redis +// round-trip regardless of how many tunnels the previous generation owned; +// per-host serial unregister calls would scale with directory size and eat +// directly into deploy recovery. +const CLEAR_STALE_SCRIPT = ` +local owner = ARGV[1] +local entries = redis.call('HGETALL', KEYS[1]) +local cleared = 0 +for i = 1, #entries, 2 do + local hostId = entries[i] + local current = entries[i + 1] + if current == owner then + redis.call('HDEL', KEYS[1], hostId) + redis.call('HDEL', KEYS[2], hostId) + redis.call('ZREM', KEYS[3], hostId) + cleared = cleared + 1 + end +end +return cleared +`; + +export async function clearStaleEntriesForMachine( + region: string, + machineId: string, +): Promise { + const myOwner = encodeOwner(region, machineId); + const result = await redis.eval( + CLEAR_STALE_SCRIPT, + [OWNER_KEY, META_KEY, TTL_KEY], + [myOwner], + ); + return typeof result === "number" ? result : 0; +} + export async function sweepStale(): Promise { const now = Date.now(); const result = await redis.eval( diff --git a/apps/relay/src/index.ts b/apps/relay/src/index.ts index 8c4c7d03859..9444f59c702 100644 --- a/apps/relay/src/index.ts +++ b/apps/relay/src/index.ts @@ -47,6 +47,43 @@ const app = new Hono(); const tunnelManager = new TunnelManager(); const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); +// Graceful drain on Fly's pre-stop signal. Fly's init sends SIGINT (not +// SIGTERM) to the main process during rolling deploys; listen on both to +// also cover hand-rolled `fly machine stop` and local Ctrl-C. Without this, +// deploys kill the process while tunnels are open and host-services see +// TCP-RST'd sockets, triggering their long exponential backoff. +// +// Sequence: stop accepting new TCP connections (server.close), then close +// every open tunnel with the app-defined drain code so hosts reconnect +// promptly, and clear this machine's directory ownership before process exit. +// server is assigned at the bottom of this file — by signal time, the closure +// has it. +let server: ReturnType | null = null; +let draining = false; +const handleDrain = async (signal: string) => { + if (draining) return; + draining = true; + console.log(`[relay] ${signal} received, draining tunnels`); + try { + server?.close(); + const cleared = await tunnelManager.drain({ + clearDirectory: () => + directory.clearStaleEntriesForMachine( + env.FLY_REGION, + env.FLY_MACHINE_ID, + ), + }); + if (cleared > 0) { + console.log(`[relay] cleared ${cleared} directory entries during drain`); + } + } catch (err) { + console.error("[relay] drain failed", err); + } + process.exit(0); +}; +process.on("SIGINT", () => void handleDrain("SIGINT")); +process.on("SIGTERM", () => void handleDrain("SIGTERM")); + app.use("*", redactingLogger); app.use("*", cors()); @@ -144,6 +181,11 @@ app.get( return { onOpen: async (_event, ws) => { + if (draining) { + ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy"); + return; + } + if (!hostId || !token) { ws.close(1008, "Missing hostId or token"); return; @@ -161,6 +203,11 @@ app.get( return; } + if (draining) { + ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy"); + return; + } + await tunnelManager.register(hostId, token, ws); // register closes ws itself on directory failure; only mark // authorized if the socket is still usable. @@ -319,7 +366,25 @@ if (env.RELAY_SYNTHETIC_JWT) { // ── Start ─────────────────────────────────────────────────────────── -const server = serve({ fetch: app.fetch, port: env.RELAY_PORT }, (info) => { +// Clear any directory entries our previous process generation left behind +// (SIGKILL, drain race, etc.) before we begin accepting connections, so +// fly-replay doesn't route cross-region requests at us for tunnels we no +// longer have. Best-effort: relay still boots if Upstash is unreachable. +try { + const cleared = await directory.clearStaleEntriesForMachine( + env.FLY_REGION, + env.FLY_MACHINE_ID, + ); + if (cleared > 0) { + console.log( + `[relay] cleared ${cleared} stale directory entries on startup`, + ); + } +} catch (err) { + console.error("[relay] startup cleanup failed", err); +} + +server = serve({ fetch: app.fetch, port: env.RELAY_PORT }, (info) => { console.log( `[relay] listening on http://localhost:${info.port} (region=${env.FLY_REGION} machine=${env.FLY_MACHINE_ID})`, ); diff --git a/apps/relay/src/tunnel.ts b/apps/relay/src/tunnel.ts index 4a83b20d118..115a33879f3 100644 --- a/apps/relay/src/tunnel.ts +++ b/apps/relay/src/tunnel.ts @@ -33,6 +33,11 @@ interface TunnelState { missedPings: number; } +interface DrainOptions { + reason?: string; + clearDirectory: () => Promise; +} + export class TunnelManager { private readonly tunnels = new Map(); private readonly requestTimeoutMs: number; @@ -42,12 +47,18 @@ export class TunnelManager { ReturnType >(); private readonly onlineWriteVersions = new Map(); + private draining = false; constructor(requestTimeoutMs = 30_000) { this.requestTimeoutMs = requestTimeoutMs; } async register(hostId: string, token: string, ws: WsSocket): Promise { + if (this.draining) { + ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy"); + return; + } + // Last-write-wins: close the old socket so flaky clients don't get // stuck behind a dead-but-not-yet-detected WS. const existing = this.tunnels.get(hostId); @@ -73,12 +84,15 @@ export class TunnelManager { // hadn't returned yet), so it skipped unregister. Roll the directory // entry back ourselves; otherwise other machines fly-replay traffic // to a dead local tunnel for ~90s until the TTL ages out. - if (ws.readyState !== 1) { + if (ws.readyState !== 1 || this.draining) { await directory .unregister(hostId, env.FLY_REGION, env.FLY_MACHINE_ID) .catch((err) => { console.error("[relay] directory.unregister rollback failed", err); }); + if (this.draining) { + ws.close(TunnelManager.WS_CLOSE_DRAIN, "Server draining for deploy"); + } return; } @@ -159,7 +173,77 @@ export class TunnelManager { console.log(`[relay] tunnel unregistered: ${hostId}`); } - private disposeTunnel(tunnel: TunnelState, reason: string): void { + // Application-defined WS close code (4xxx range) signaling "relay is + // draining for a deploy — reconnect immediately." Distinct from 1001 + // ("Going Away") which the ping-timeout / dispose paths use; the host + // resets its backoff only on this code, so a mass ping-timeout doesn't + // trigger a thundering-herd of fast reconnects. + static readonly WS_CLOSE_DRAIN = 4001; + + // SIGTERM-driven graceful drain. Closes every open tunnel with the + // app-defined "drain" close code so the host-service can recognize this + // as a deploy drain (not a hard disconnect or ping timeout) and + // reconnect immediately. Called from the SIGINT/SIGTERM handler in + // index.ts. + // + // Owns directory cleanup directly instead of relying on websocket close + // callbacks. The process exits immediately after drain, so fire-and-forget + // unregister work from onClose is not a reliable shutdown primitive. + async drain(options: DrainOptions): Promise { + this.draining = true; + const reason = options.reason ?? "Server draining for deploy"; + const tunnels = Array.from(this.tunnels.values()); + console.log(`[relay] draining ${tunnels.length} tunnels`); + // In-band drain signal first: send a JSON {type:"drain"} message on + // the WS message channel before closing. Game-day testing showed + // the WS close frame doesn't reliably reach the host within Fly's + // kill_timeout window (host's TCP socket sees ESTABLISHED with no + // onclose for 75+ seconds, until its watchdog fires). The message + // channel is already exercised by ping/pong every 30s, so we know + // it flushes cleanly. Host's onmessage handler triggers a clean + // reconnect on receipt; the WS close after is just belt-and- + // suspenders. + for (const tunnel of tunnels) { + try { + this.send(tunnel.ws, { type: "drain", reason }); + } catch { + // best-effort + } + } + // Give the message frames a moment to reach the host before we + // start closing the underlying sockets. + await new Promise((resolve) => setTimeout(resolve, 500)); + + let cleared = 0; + let clearError: unknown; + try { + cleared = await options.clearDirectory(); + } catch (err) { + clearError = err; + } + + for (const tunnel of tunnels) { + this.disposeTunnel(tunnel, reason, TunnelManager.WS_CLOSE_DRAIN); + this.tunnels.delete(tunnel.hostId); + } + + // Brief tail wait so the close-handshake gets a chance to complete + // before the process exits and RSTs the underlying TCP. + const WS_CLOSED = 3; + const deadline = Date.now() + 1_500; + while (Date.now() < deadline) { + if (tunnels.every((t) => t.ws.readyState === WS_CLOSED)) break; + await new Promise((resolve) => setTimeout(resolve, 50)); + } + if (clearError) throw clearError; + return cleared; + } + + private disposeTunnel( + tunnel: TunnelState, + reason: string, + tunnelCloseCode = 1000, + ): void { if (tunnel.pingTimer) clearInterval(tunnel.pingTimer); for (const [, pending] of tunnel.pendingRequests) { @@ -172,7 +256,7 @@ export class TunnelManager { } try { - tunnel.ws.close(1000, reason); + tunnel.ws.close(tunnelCloseCode, reason); } catch { // already closed } diff --git a/packages/host-service/src/tunnel/tunnel-client.ts b/packages/host-service/src/tunnel/tunnel-client.ts index 06176f16d87..a8d16634f91 100644 --- a/packages/host-service/src/tunnel/tunnel-client.ts +++ b/packages/host-service/src/tunnel/tunnel-client.ts @@ -8,7 +8,10 @@ import type { } from "./types"; const RECONNECT_BASE_MS = 1_000; -const RECONNECT_MAX_MS = 30_000; +// 5s ceiling rather than 30s. Under a sustained outage this means slightly +// more retry traffic, but under transient relay restarts (the common case) +// it ensures we don't sit idle for 30s while the relay is back online. +const RECONNECT_MAX_MS = 5_000; const INBOUND_SILENCE_TIMEOUT_MS = 75_000; const WATCHDOG_INTERVAL_MS = 10_000; const CONNECT_TIMEOUT_MS = 20_000; @@ -130,6 +133,18 @@ export class TunnelClient { `[host-service:tunnel] relay rejected connection (code=${event.code}, reason=${event.reason ?? ""}); retrying`, ); } + // App-defined "relay draining for deploy" close code + // (4001). Distinct from 1001 ("Going Away") which the + // ping-timeout / dispose paths use — only reset on 4001 so + // a mass ping-timeout doesn't trigger a thundering-herd of + // instant reconnects. After reset, next attempt fires at + // the base delay instead of the 5s ceiling. + if (event.code === 4001) { + this.reconnectAttempts = 0; + console.log( + "[host-service:tunnel] relay draining; reconnecting immediately", + ); + } } catch (err) { console.warn( "[host-service:tunnel] error during onclose cleanup", @@ -189,6 +204,24 @@ export class TunnelClient { case "ping": this.send({ type: "pong" }); break; + case "drain": + // In-band drain signal from the relay before it + // SIGINT-shuts-down. Reset backoff and tear the socket + // down ourselves so the next reconnect attempt fires at + // the base delay — far more reliable than waiting for + // the WS close frame to arrive (which game-day testing + // showed sometimes doesn't, leaving the host idle until + // its 75s inactivity watchdog). + console.log( + `[host-service:tunnel] relay drain notice received${message.reason ? ` (${message.reason})` : ""}; reconnecting immediately`, + ); + this.reconnectAttempts = 0; + try { + this.socket?.close(); + } catch { + // onclose handler will schedule the reconnect + } + break; case "http": await this.handleHttpRequest(message); break; diff --git a/packages/shared/src/tunnel-protocol.ts b/packages/shared/src/tunnel-protocol.ts index 7bafd07dba7..17e2b50560f 100644 --- a/packages/shared/src/tunnel-protocol.ts +++ b/packages/shared/src/tunnel-protocol.ts @@ -33,12 +33,23 @@ export interface TunnelPing { type: "ping"; } +// In-band drain signal — relay sends this to every tunnel right before +// SIGINT-triggered shutdown so the host knows to reconnect immediately +// rather than waiting for the WS close frame (which doesn't reliably +// reach the host within the kill_timeout window) or the host-side +// inactivity watchdog. +export interface TunnelDrain { + type: "drain"; + reason?: string; +} + export type TunnelRequest = | TunnelHttpRequest | TunnelWsOpen | TunnelWsFrame | TunnelWsClose - | TunnelPing; + | TunnelPing + | TunnelDrain; // ── Host → Relay ────────────────────────────────────────────────────