Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 149 additions & 1 deletion packages/gateway/src/__tests__/base-deployment-grants.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestDeploymentManager extends BaseDeploymentManager {
async listDeployments(): Promise<DeploymentInfo[]> {
return [];
}
async createDeployment(): Promise<void> {
protected async spawnDeployment(): Promise<void> {
/* noop */
}
async scaleDeployment(): Promise<void> {
Expand Down Expand Up @@ -260,3 +260,151 @@ describe("BaseDeploymentManager.syncNetworkConfigGrants", () => {
).toBe(false);
});
});

/**
* In-flight coalescing for `ensureDeployment` lives in the base class so all
* orchestrators share one implementation. Subclass-specific concerns (Docker
* 409, K8s AlreadyExists, embedded `workers.has` short-circuit) are tested
* separately in their own files.
*/
describe("BaseDeploymentManager.ensureDeployment in-flight coalescing", () => {
class CountingManager extends BaseDeploymentManager {
spawnCalls = 0;
/** Resolver for the most recent spawn — lets tests hold spawn open. */
releaseSpawn: () => void = () => {
// replaced by each test before spawn runs
};

async listDeployments(): Promise<DeploymentInfo[]> {
return [];
}
protected async spawnDeployment(): Promise<void> {
this.spawnCalls++;
await new Promise<void>((resolve) => {
this.releaseSpawn = resolve;
});
}
async scaleDeployment(): Promise<void> {
/* noop */
}
async deleteDeployment(): Promise<void> {
/* noop */
}
async updateDeploymentActivity(): Promise<void> {
/* noop */
}
async validateWorkerImage(): Promise<void> {
/* noop */
}
protected getDispatcherHost(): string {
return "localhost";
}
}

test("concurrent calls for the same name share a single spawn", async () => {
const manager = new CountingManager(TEST_CONFIG);

const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));
const p2 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));
const p3 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));

// All three callers are blocked on the single in-flight spawn.
expect(manager.spawnCalls).toBe(1);

manager.releaseSpawn();
await Promise.all([p1, p2, p3]);

expect(manager.spawnCalls).toBe(1);
});

test("concurrent calls for different names spawn independently", async () => {
const manager = new CountingManager(TEST_CONFIG);

const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));
// Capture the first spawn's resolver before the second call overwrites it.
const release1 = manager.releaseSpawn;
const p2 = manager.ensureDeployment("worker-2", "u", "u", buildPayload({}));
const release2 = manager.releaseSpawn;

expect(manager.spawnCalls).toBe(2);

release1();
release2();
await Promise.all([p1, p2]);
});

test("after spawn resolves, a subsequent call re-spawns (cache cleared)", async () => {
const manager = new CountingManager(TEST_CONFIG);

const p1 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));
manager.releaseSpawn();
await p1;
expect(manager.spawnCalls).toBe(1);

// The base class only dedupes in-flight work — once it settles, a fresh
// call re-invokes spawn. Subclasses are responsible for their own
// post-completion idempotency (e.g. embedded's `workers.has` guard).
const p2 = manager.ensureDeployment("worker-1", "u", "u", buildPayload({}));
manager.releaseSpawn();
await p2;
expect(manager.spawnCalls).toBe(2);
});

test("rejected spawn clears the in-flight entry so the next call retries", async () => {
class FailingManager extends BaseDeploymentManager {
attempts = 0;
async listDeployments(): Promise<DeploymentInfo[]> {
return [];
}
protected async spawnDeployment(): Promise<void> {
this.attempts++;
if (this.attempts === 1) {
throw new Error("transient");
}
}
async scaleDeployment(): Promise<void> {
/* noop */
}
async deleteDeployment(): Promise<void> {
/* noop */
}
async updateDeploymentActivity(): Promise<void> {
/* noop */
}
async validateWorkerImage(): Promise<void> {
/* noop */
}
protected getDispatcherHost(): string {
return "localhost";
}
}

const manager = new FailingManager(TEST_CONFIG);
await expect(
manager.ensureDeployment("worker-1", "u", "u", buildPayload({}))
).rejects.toThrow("transient");
await expect(
manager.ensureDeployment("worker-1", "u", "u", buildPayload({}))
).resolves.toBeUndefined();
expect(manager.attempts).toBe(2);
});
});

// ---------------------------------------------------------------------------
// Known coverage gap: K8sDeploymentManager
// ---------------------------------------------------------------------------
//
// There is currently NO test file for `orchestration/impl/k8s/deployment.ts`.
// As a result, the following recently-added behavior is unverified by tests:
//
// 1. The `409 AlreadyExists` short-circuit added in `spawnDeployment`
// (treats concurrent multi-replica creates as benign success and returns
// without touching the PVC).
// 2. The PVC creation / cleanup paths in general.
// 3. The deployment env / pod-spec construction.
//
// In production this path is covered only by manual smoke tests via
// `make deploy` against a real cluster. Adding a `@kubernetes/client-node`
// mock layer is non-trivial (the SDK uses class-based watchers and dynamic
// API discovery), so coverage here was deferred. Track this gap before any
// further changes to k8s/deployment.ts.
143 changes: 141 additions & 2 deletions packages/gateway/src/__tests__/chat-response-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,23 @@ function createStreamingTarget() {
return { target, collected, plainPosts, drained };
}

function createHarness(target: unknown, platform = "slack") {
function createHarness(
target: unknown,
platform = "telegram",
slackPostMessage?: ReturnType<typeof mock>
) {
const state = new InMemoryStateAdapter();
const conversationState = new ConversationStateStore(state);
const slackAdapter = slackPostMessage
? { client: { chat: { postMessage: slackPostMessage } } }
: undefined;
const manager = {
getInstance: () => ({
connection: { platform },
chat: {
channel: () => target,
getAdapter: (name: string) =>
name === "slack" ? slackAdapter : undefined,
},
conversationState,
}),
Expand All @@ -55,7 +64,7 @@ const basePayload = {
userId: "u1",
teamId: "t1",
timestamp: 0,
platform: "slack",
platform: "telegram",
platformMetadata: {
connectionId: "conn-1",
chatId: "123",
Expand Down Expand Up @@ -271,6 +280,136 @@ describe("ChatResponseBridge.handleDelta — AsyncIterable streaming", () => {
});
});

describe("ChatResponseBridge.handleDelta — Slack markdown_text path", () => {
test("Slack accumulates deltas and posts via chat.postMessage with markdown_text", async () => {
const { target } = createStreamingTarget();
const slackPost = mock(async () => ({ ok: true, ts: "1.1" }));
const { manager } = createHarness(target, "slack", slackPost);
const bridge = new ChatResponseBridge(manager as any);

const slackPayload = {
...basePayload,
platform: "slack",
channelId: "slack:C123",
conversationId: "slack:C123:1700000000.123456",
platformMetadata: {
connectionId: "conn-1",
chatId: "slack:C123",
},
};

await bridge.handleDelta({ ...slackPayload, delta: "hello " }, "s");
await bridge.handleDelta({ ...slackPayload, delta: "world" }, "s");
await bridge.handleCompletion(slackPayload, "s");

// SDK target.post should NOT be called for Slack — we use markdown_text.
expect(target.post).not.toHaveBeenCalled();
expect(slackPost).toHaveBeenCalledTimes(1);
expect(slackPost.mock.calls[0]?.[0]).toMatchObject({
channel: "C123",
thread_ts: "1700000000.123456",
markdown_text: "hello world",
});
});

test("Slack decodes HTML entities and strips empty markdown links", async () => {
const { target } = createStreamingTarget();
const slackPost = mock(async () => ({ ok: true, ts: "1.1" }));
const { manager } = createHarness(target, "slack", slackPost);
const bridge = new ChatResponseBridge(manager as any);

const slackPayload = {
...basePayload,
platform: "slack",
channelId: "slack:C123",
conversationId: "slack:C123",
platformMetadata: {
connectionId: "conn-1",
chatId: "slack:C123",
},
};

await bridge.handleDelta(
{
...slackPayload,
delta: "&lt;summary&gt;hi&lt;/summary&gt; [foo:1, 2]() bar",
},
"s"
);
await bridge.handleCompletion(slackPayload, "s");

expect(slackPost).toHaveBeenCalledTimes(1);
expect(slackPost.mock.calls[0]?.[0]).toMatchObject({
markdown_text: "<summary>hi</summary> foo:1, 2 bar",
});
});

test("Slack chunks long content on paragraph boundaries", async () => {
const { target } = createStreamingTarget();
const slackPost = mock(async () => ({ ok: true, ts: "1.1" }));
const { manager } = createHarness(target, "slack", slackPost);
const bridge = new ChatResponseBridge(manager as any);

const slackPayload = {
...basePayload,
platform: "slack",
channelId: "slack:C123",
conversationId: "slack:C123",
platformMetadata: {
connectionId: "conn-1",
chatId: "slack:C123",
},
};

// Build content > 11000 chars with explicit paragraph boundaries every ~5000 chars.
const para = `${"x".repeat(5000)}`;
const body = `${para}\n\n${para}\n\n${para}`;

await bridge.handleDelta({ ...slackPayload, delta: body }, "s");
await bridge.handleCompletion(slackPayload, "s");

expect(slackPost).toHaveBeenCalledTimes(2);
// Each posted chunk must NOT split mid-paragraph.
for (const call of slackPost.mock.calls) {
const md = (call[0] as { markdown_text: string }).markdown_text;
expect(md.length).toBeLessThanOrEqual(11_000);
// Chunks must start/end at paragraph boundaries (no leading/trailing
// partial paragraph fragments split mid-content).
expect(md.startsWith("xxxx")).toBe(true);
}
});

test("Slack isFullReplacement discards prior buffered content", async () => {
const { target } = createStreamingTarget();
const slackPost = mock(async () => ({ ok: true, ts: "1.1" }));
const { manager } = createHarness(target, "slack", slackPost);
const bridge = new ChatResponseBridge(manager as any);

const slackPayload = {
...basePayload,
platform: "slack",
channelId: "slack:C123",
conversationId: "slack:C123",
platformMetadata: {
connectionId: "conn-1",
chatId: "slack:C123",
},
};

await bridge.handleDelta({ ...slackPayload, delta: "old streamed" }, "s");
await bridge.handleDelta(
{ ...slackPayload, delta: "new content", isFullReplacement: true },
"s"
);
await bridge.handleCompletion(slackPayload, "s");

expect(slackPost).toHaveBeenCalledTimes(1);
expect(slackPost.mock.calls[0]?.[0]).toMatchObject({
markdown_text: "new content",
});
});
});

describe("ChatResponseBridge.handleEphemeral", () => {
test("renders settings links as native buttons for Chat SDK targets", async () => {
const posts: unknown[] = [];
Expand Down
Loading
Loading