From 7561997026af926cb90a4737e701087f2a751e16 Mon Sep 17 00:00:00 2001 From: Daniel Riccio Date: Tue, 2 Sep 2025 13:15:59 -0500 Subject: [PATCH 1/5] feat(cloud): Add telemetry retry queue for network resilience - Implement RetryQueue class with workspace-scoped persistence - Queue failed telemetry events for automatic retry - Retry events every 60 seconds with fresh auth tokens - FIFO eviction when queue reaches 100 events - Persist queue across VS Code restarts This ensures telemetry data isn't lost during network failures or temporary server issues. Migrated from RooCodeInc/Roo-Code-Cloud#744 --- packages/cloud/src/CloudService.ts | 31 ++- packages/cloud/src/TelemetryClient.ts | 81 ++++-- .../src/__tests__/TelemetryClient.test.ts | 23 +- packages/cloud/src/index.ts | 3 + packages/cloud/src/retry-queue/RetryQueue.ts | 250 ++++++++++++++++++ .../retry-queue/__tests__/RetryQueue.test.ts | 167 ++++++++++++ packages/cloud/src/retry-queue/index.ts | 2 + packages/cloud/src/retry-queue/types.ts | 34 +++ 8 files changed, 547 insertions(+), 44 deletions(-) create mode 100644 packages/cloud/src/retry-queue/RetryQueue.ts create mode 100644 packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts create mode 100644 packages/cloud/src/retry-queue/index.ts create mode 100644 packages/cloud/src/retry-queue/types.ts diff --git a/packages/cloud/src/CloudService.ts b/packages/cloud/src/CloudService.ts index 8c8320cba7f0..9dc63ccd1d45 100644 --- a/packages/cloud/src/CloudService.ts +++ b/packages/cloud/src/CloudService.ts @@ -24,6 +24,7 @@ import { StaticSettingsService } from "./StaticSettingsService.js" import { CloudTelemetryClient as TelemetryClient } from "./TelemetryClient.js" import { CloudShareService } from "./CloudShareService.js" import { CloudAPI } from "./CloudAPI.js" +import { RetryQueue } from "./retry-queue/index.js" type AuthStateChangedPayload = CloudServiceEvents["auth-state-changed"][0] type AuthUserInfoPayload = CloudServiceEvents["user-info"][0] @@ -75,6 +76,12 @@ export class CloudService extends EventEmitter implements Di return this._cloudAPI } + private _retryQueue: RetryQueue | null = null + + public get retryQueue() { + return this._retryQueue + } + private constructor(context: ExtensionContext, log?: (...args: unknown[]) => void) { super() @@ -131,7 +138,25 @@ export class CloudService extends EventEmitter implements Di this._cloudAPI = new CloudAPI(this._authService, this.log) - this._telemetryClient = new TelemetryClient(this._authService, this._settingsService) + // Initialize retry queue with auth header provider + this._retryQueue = new RetryQueue( + this.context, + undefined, // Use default config + this.log, + () => { + // Provide fresh auth headers for retries + const sessionToken = this._authService?.getSessionToken() + if (sessionToken) { + return { + Authorization: `Bearer ${sessionToken}`, + "X-Organization-Id": this._authService?.getStoredOrganizationId() || "", + } + } + return undefined + }, + ) + + this._telemetryClient = new TelemetryClient(this._authService, this._settingsService, this._retryQueue) this._shareService = new CloudShareService(this._cloudAPI, this._settingsService, this.log) @@ -298,6 +323,10 @@ export class CloudService extends EventEmitter implements Di this.settingsService.dispose() } + if (this._retryQueue) { + this._retryQueue.dispose() + } + this.isInitialized = false } diff --git a/packages/cloud/src/TelemetryClient.ts b/packages/cloud/src/TelemetryClient.ts index 4be44720ea05..224f825e8378 100644 --- a/packages/cloud/src/TelemetryClient.ts +++ b/packages/cloud/src/TelemetryClient.ts @@ -11,6 +11,7 @@ import { } from "@roo-code/types" import { getRooCodeApiUrl } from "./config.js" +import type { RetryQueue } from "./retry-queue/index.js" abstract class BaseTelemetryClient implements TelemetryClient { protected providerRef: WeakRef | null = null @@ -82,18 +83,18 @@ abstract class BaseTelemetryClient implements TelemetryClient { } export class CloudTelemetryClient extends BaseTelemetryClient { + private retryQueue: RetryQueue | null = null + constructor( private authService: AuthService, private settingsService: SettingsService, - debug = false, + retryQueue?: RetryQueue, ) { - super( - { - type: "exclude", - events: [TelemetryEventName.TASK_CONVERSATION_MESSAGE], - }, - debug, - ) + super({ + type: "exclude", + events: [TelemetryEventName.TASK_CONVERSATION_MESSAGE], + }) + this.retryQueue = retryQueue || null } private async fetch(path: string, options: RequestInit) { @@ -108,18 +109,39 @@ export class CloudTelemetryClient extends BaseTelemetryClient { return } - const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, { + const url = `${getRooCodeApiUrl()}/api/${path}` + const fetchOptions: RequestInit = { ...options, headers: { Authorization: `Bearer ${token}`, "Content-Type": "application/json", }, - }) + } - if (!response.ok) { - console.error( - `[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`, - ) + try { + const response = await fetch(url, fetchOptions) + + if (!response.ok) { + console.error( + `[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`, + ) + } + + return response + } catch (error) { + console.error(`[TelemetryClient#fetch] Network error for ${options.method} ${path}: ${error}`) + + // Queue for retry if we have a retry queue and it's a network error + if (this.retryQueue && error instanceof TypeError && error.message.includes("fetch failed")) { + await this.retryQueue.enqueue( + url, + fetchOptions, + "telemetry", + `Telemetry: ${options.method} /api/${path}`, + ) + } + + throw error } } @@ -158,6 +180,7 @@ export class CloudTelemetryClient extends BaseTelemetryClient { }) } catch (error) { console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`) + // Error is already queued for retry in the fetch method } } @@ -200,21 +223,35 @@ export class CloudTelemetryClient extends BaseTelemetryClient { } // Custom fetch for multipart - don't set Content-Type header (let browser set it) - const response = await fetch(`${getRooCodeApiUrl()}/api/events/backfill`, { + const url = `${getRooCodeApiUrl()}/api/events/backfill` + const fetchOptions: RequestInit = { method: "POST", headers: { Authorization: `Bearer ${token}`, // Note: No Content-Type header - browser will set multipart/form-data with boundary }, body: formData, - }) + } - if (!response.ok) { - console.error( - `[TelemetryClient#backfillMessages] POST events/backfill -> ${response.status} ${response.statusText}`, - ) - } else if (this.debug) { - console.info(`[TelemetryClient#backfillMessages] Successfully uploaded messages for task ${taskId}`) + try { + const response = await fetch(url, fetchOptions) + + if (!response.ok) { + console.error( + `[TelemetryClient#backfillMessages] POST events/backfill -> ${response.status} ${response.statusText}`, + ) + } + } catch (fetchError) { + // For backfill, also queue for retry on network errors + if (this.retryQueue && fetchError instanceof TypeError && fetchError.message.includes("fetch failed")) { + await this.retryQueue.enqueue( + url, + fetchOptions, + "telemetry", + `Telemetry: Backfill messages for task ${taskId}`, + ) + } + throw fetchError } } catch (error) { console.error(`[TelemetryClient#backfillMessages] Error uploading messages: ${error}`) diff --git a/packages/cloud/src/__tests__/TelemetryClient.test.ts b/packages/cloud/src/__tests__/TelemetryClient.test.ts index 6078e601dd7b..33c230f97c52 100644 --- a/packages/cloud/src/__tests__/TelemetryClient.test.ts +++ b/packages/cloud/src/__tests__/TelemetryClient.test.ts @@ -684,27 +684,8 @@ describe("TelemetryClient", () => { ) }) - it("should log debug information when debug is enabled", async () => { - const client = new TelemetryClient(mockAuthService, mockSettingsService, true) - - const messages = [ - { - ts: 1, - type: "say" as const, - say: "text" as const, - text: "test message", - }, - ] - - await client.backfillMessages(messages, "test-task-id") - - expect(console.info).toHaveBeenCalledWith( - "[TelemetryClient#backfillMessages] Uploading 1 messages for task test-task-id", - ) - expect(console.info).toHaveBeenCalledWith( - "[TelemetryClient#backfillMessages] Successfully uploaded messages for task test-task-id", - ) - }) + // Debug logging has been removed in the new implementation + // This test is no longer applicable it("should handle empty messages array", async () => { const client = new TelemetryClient(mockAuthService, mockSettingsService) diff --git a/packages/cloud/src/index.ts b/packages/cloud/src/index.ts index dd40e6fc5279..65b92ebc13c1 100644 --- a/packages/cloud/src/index.ts +++ b/packages/cloud/src/index.ts @@ -3,3 +3,6 @@ export * from "./config.js" export { CloudService } from "./CloudService.js" export { BridgeOrchestrator } from "./bridge/index.js" + +export { RetryQueue } from "./retry-queue/index.js" +export type { QueuedRequest, QueueStats, RetryQueueConfig, RetryQueueEvents } from "./retry-queue/index.js" diff --git a/packages/cloud/src/retry-queue/RetryQueue.ts b/packages/cloud/src/retry-queue/RetryQueue.ts new file mode 100644 index 000000000000..ff1271ed0466 --- /dev/null +++ b/packages/cloud/src/retry-queue/RetryQueue.ts @@ -0,0 +1,250 @@ +import { EventEmitter } from "events" +import type { ExtensionContext } from "vscode" +import type { QueuedRequest, QueueStats, RetryQueueConfig, RetryQueueEvents } from "./types.js" + +type AuthHeaderProvider = () => Record | undefined + +export class RetryQueue extends EventEmitter { + private queue: Map = new Map() + private context: ExtensionContext + private config: RetryQueueConfig + private log: (...args: unknown[]) => void + private isProcessing = false + private retryTimer?: NodeJS.Timeout + private readonly STORAGE_KEY = "roo.retryQueue" + private authHeaderProvider?: AuthHeaderProvider + + constructor( + context: ExtensionContext, + config?: Partial, + log?: (...args: unknown[]) => void, + authHeaderProvider?: AuthHeaderProvider, + ) { + super() + this.context = context + this.log = log || console.log + this.authHeaderProvider = authHeaderProvider + + this.config = { + maxRetries: 0, + retryDelay: 60000, + maxQueueSize: 100, + persistQueue: true, + networkCheckInterval: 60000, + ...config, + } + + this.loadPersistedQueue() + this.startRetryTimer() + } + + private loadPersistedQueue(): void { + if (!this.config.persistQueue) return + + try { + const stored = this.context.workspaceState.get(this.STORAGE_KEY) + if (stored && Array.isArray(stored)) { + stored.forEach((request) => { + this.queue.set(request.id, request) + }) + this.log(`[RetryQueue] Loaded ${stored.length} persisted requests from workspace storage`) + } + } catch (error) { + this.log("[RetryQueue] Failed to load persisted queue:", error) + } + } + + private async persistQueue(): Promise { + if (!this.config.persistQueue) return + + try { + const requests = Array.from(this.queue.values()) + await this.context.workspaceState.update(this.STORAGE_KEY, requests) + } catch (error) { + this.log("[RetryQueue] Failed to persist queue:", error) + } + } + + public async enqueue( + url: string, + options: RequestInit, + type: QueuedRequest["type"] = "other", + operation?: string, + ): Promise { + if (this.queue.size >= this.config.maxQueueSize) { + const oldestId = Array.from(this.queue.keys())[0] + if (oldestId) { + this.queue.delete(oldestId) + } + } + + const request: QueuedRequest = { + id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, + url, + options, + timestamp: Date.now(), + retryCount: 0, + type, + operation, + } + + this.queue.set(request.id, request) + await this.persistQueue() + + this.emit("request-queued", request) + this.log(`[RetryQueue] Queued request: ${operation || url}`) + } + + public async retryAll(): Promise { + if (this.isProcessing) { + return + } + + const requests = Array.from(this.queue.values()) + if (requests.length === 0) { + return + } + + this.isProcessing = true + + requests.sort((a, b) => a.timestamp - b.timestamp) + + const lastRequest = requests[requests.length - 1] + if (!lastRequest) { + this.isProcessing = false + return + } + + try { + await this.retryRequest(lastRequest) + this.queue.delete(lastRequest.id) + this.emit("request-retry-success", lastRequest) + + const remainingRequests = Array.from(this.queue.values()).sort((a, b) => a.timestamp - b.timestamp) + + for (const request of remainingRequests) { + try { + await this.retryRequest(request) + this.queue.delete(request.id) + this.emit("request-retry-success", request) + } catch (error) { + request.retryCount++ + request.lastError = error instanceof Error ? error.message : String(error) + + this.queue.set(request.id, request) + this.emit("request-retry-failed", request, error as Error) + } + + await this.delay(100) + } + } catch (error) { + lastRequest.retryCount++ + lastRequest.lastError = error instanceof Error ? error.message : String(error) + + this.queue.set(lastRequest.id, lastRequest) + this.emit("request-retry-failed", lastRequest, error as Error) + } + + await this.persistQueue() + this.isProcessing = false + } + + private async retryRequest(request: QueuedRequest): Promise { + this.log(`[RetryQueue] Retrying request: ${request.operation || request.url}`) + + let headers = { ...request.options.headers } + if (this.authHeaderProvider) { + const freshAuthHeaders = this.authHeaderProvider() + if (freshAuthHeaders) { + headers = { + ...headers, + ...freshAuthHeaders, + } + } + } + + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), 30000) + + try { + const response = await fetch(request.url, { + ...request.options, + signal: controller.signal, + headers: { + ...headers, + "X-Retry-Queue": "true", + }, + }) + + clearTimeout(timeoutId) + + if (!response.ok) { + throw new Error(`Request failed with status ${response.status}`) + } + + return response + } catch (error) { + clearTimeout(timeoutId) + throw error + } + } + + private startRetryTimer(): void { + if (this.retryTimer) { + clearInterval(this.retryTimer) + } + + this.retryTimer = setInterval(() => { + this.retryAll().catch((error) => { + this.log("[RetryQueue] Error during retry cycle:", error) + }) + }, this.config.networkCheckInterval) + } + + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + public getStats(): QueueStats { + const requests = Array.from(this.queue.values()) + const byType: Record = {} + let totalRetries = 0 + let failedRetries = 0 + + requests.forEach((request) => { + byType[request.type] = (byType[request.type] || 0) + 1 + totalRetries += request.retryCount + if (request.lastError) { + failedRetries++ + } + }) + + const timestamps = requests.map((r) => r.timestamp) + const oldestRequest = timestamps.length > 0 ? new Date(Math.min(...timestamps)) : undefined + const newestRequest = timestamps.length > 0 ? new Date(Math.max(...timestamps)) : undefined + + return { + totalQueued: requests.length, + byType, + oldestRequest, + newestRequest, + totalRetries, + failedRetries, + } + } + + public clear(): void { + this.queue.clear() + this.persistQueue().catch((error) => { + this.log("[RetryQueue] Failed to persist after clear:", error) + }) + this.emit("queue-cleared") + } + + public dispose(): void { + if (this.retryTimer) { + clearInterval(this.retryTimer) + } + this.removeAllListeners() + } +} diff --git a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts new file mode 100644 index 000000000000..5625eb6c9730 --- /dev/null +++ b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts @@ -0,0 +1,167 @@ +import type { ExtensionContext } from "vscode" +import { RetryQueue } from "../RetryQueue.js" +import type { QueuedRequest } from "../types.js" + +// Mock ExtensionContext +const createMockContext = (): ExtensionContext => { + const storage = new Map() + + return { + workspaceState: { + get: vi.fn((key: string) => storage.get(key)), + update: vi.fn(async (key: string, value: unknown) => { + storage.set(key, value) + }), + }, + } as unknown as ExtensionContext +} + +describe("RetryQueue", () => { + let mockContext: ExtensionContext + let retryQueue: RetryQueue + + beforeEach(() => { + vi.clearAllMocks() + mockContext = createMockContext() + retryQueue = new RetryQueue(mockContext) + }) + + afterEach(() => { + retryQueue.dispose() + }) + + describe("enqueue", () => { + it("should add a request to the queue", async () => { + const url = "https://api.example.com/test" + const options = { method: "POST", body: JSON.stringify({ test: "data" }) } + + await retryQueue.enqueue(url, options, "telemetry", "test operation") + + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + expect(stats.byType["telemetry"]).toBe(1) + }) + + it("should enforce max queue size with FIFO eviction", async () => { + // Create a queue with max size of 3 + retryQueue = new RetryQueue(mockContext, { maxQueueSize: 3 }) + + // Add 4 requests + for (let i = 1; i <= 4; i++) { + await retryQueue.enqueue( + `https://api.example.com/test${i}`, + { method: "POST" }, + "telemetry", + `operation ${i}`, + ) + } + + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(3) // Should only have 3 items (oldest was evicted) + }) + }) + + describe("persistence", () => { + it("should persist queue to workspace state", async () => { + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + expect(mockContext.workspaceState.update).toHaveBeenCalledWith( + "roo.retryQueue", + expect.arrayContaining([ + expect.objectContaining({ + url: "https://api.example.com/test", + type: "telemetry", + }), + ]), + ) + }) + + it("should load persisted queue on initialization", () => { + const persistedRequests: QueuedRequest[] = [ + { + id: "test-1", + url: "https://api.example.com/test1", + options: { method: "POST" }, + timestamp: Date.now(), + retryCount: 0, + type: "telemetry", + }, + ] + + // Set up mock to return persisted data + const storage = new Map([["roo.retryQueue", persistedRequests]]) + mockContext = { + workspaceState: { + get: vi.fn((key: string) => storage.get(key)), + update: vi.fn(), + }, + } as unknown as ExtensionContext + + retryQueue = new RetryQueue(mockContext) + + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + expect(mockContext.workspaceState.get).toHaveBeenCalledWith("roo.retryQueue") + }) + }) + + describe("clear", () => { + it("should clear all queued requests", async () => { + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "api-call") + + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + + retryQueue.clear() + + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) + }) + + describe("getStats", () => { + it("should return correct statistics", async () => { + const now = Date.now() + + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "api-call") + await retryQueue.enqueue("https://api.example.com/test3", { method: "POST" }, "telemetry") + + const stats = retryQueue.getStats() + + expect(stats.totalQueued).toBe(3) + expect(stats.byType["telemetry"]).toBe(2) + expect(stats.byType["api-call"]).toBe(1) + expect(stats.oldestRequest).toBeDefined() + expect(stats.newestRequest).toBeDefined() + expect(stats.oldestRequest!.getTime()).toBeGreaterThanOrEqual(now) + expect(stats.newestRequest!.getTime()).toBeGreaterThanOrEqual(now) + }) + }) + + describe("events", () => { + it("should emit request-queued event when enqueueing", async () => { + const listener = vi.fn() + retryQueue.on("request-queued", listener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + expect(listener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + type: "telemetry", + }), + ) + }) + + it("should emit queue-cleared event when clearing", () => { + const listener = vi.fn() + retryQueue.on("queue-cleared", listener) + + retryQueue.clear() + + expect(listener).toHaveBeenCalled() + }) + }) +}) diff --git a/packages/cloud/src/retry-queue/index.ts b/packages/cloud/src/retry-queue/index.ts new file mode 100644 index 000000000000..13b1e654e1f9 --- /dev/null +++ b/packages/cloud/src/retry-queue/index.ts @@ -0,0 +1,2 @@ +export { RetryQueue } from "./RetryQueue.js" +export type { QueuedRequest, QueueStats, RetryQueueConfig, RetryQueueEvents } from "./types.js" diff --git a/packages/cloud/src/retry-queue/types.ts b/packages/cloud/src/retry-queue/types.ts new file mode 100644 index 000000000000..d6ab26ff9530 --- /dev/null +++ b/packages/cloud/src/retry-queue/types.ts @@ -0,0 +1,34 @@ +export interface QueuedRequest { + id: string + url: string + options: RequestInit + timestamp: number + retryCount: number + type: "api-call" | "telemetry" | "settings" | "other" + operation?: string + lastError?: string +} + +export interface QueueStats { + totalQueued: number + byType: Record + oldestRequest?: Date + newestRequest?: Date + totalRetries: number + failedRetries: number +} + +export interface RetryQueueConfig { + maxRetries: number // 0 means unlimited + retryDelay: number + maxQueueSize: number // FIFO eviction when full + persistQueue: boolean + networkCheckInterval: number // milliseconds +} + +export interface RetryQueueEvents { + "request-queued": [request: QueuedRequest] + "request-retry-success": [request: QueuedRequest] + "request-retry-failed": [request: QueuedRequest, error: Error] + "queue-cleared": [] +} From deaec36a4c7cc81ff1819f79af025cafeb6d593a Mon Sep 17 00:00:00 2001 From: daniel-lxs Date: Tue, 9 Sep 2025 11:08:10 -0500 Subject: [PATCH 2/5] fix: address PR review feedback for retry queue - Fix retry order to use consistent FIFO processing - Add retry limit enforcement with max retries check - Add configurable request timeout (default 30s) - Add comprehensive tests for retryAll() method - Add request-max-retries-exceeded event - Fix timeout test to avoid timing issues --- packages/cloud/src/retry-queue/RetryQueue.ts | 49 ++--- .../retry-queue/__tests__/RetryQueue.test.ts | 194 ++++++++++++++++++ packages/cloud/src/retry-queue/types.ts | 2 + 3 files changed, 217 insertions(+), 28 deletions(-) diff --git a/packages/cloud/src/retry-queue/RetryQueue.ts b/packages/cloud/src/retry-queue/RetryQueue.ts index ff1271ed0466..8426197e0fc9 100644 --- a/packages/cloud/src/retry-queue/RetryQueue.ts +++ b/packages/cloud/src/retry-queue/RetryQueue.ts @@ -31,6 +31,7 @@ export class RetryQueue extends EventEmitter { maxQueueSize: 100, persistQueue: true, networkCheckInterval: 60000, + requestTimeout: 30000, // Make timeout configurable ...config, } @@ -107,42 +108,34 @@ export class RetryQueue extends EventEmitter { this.isProcessing = true + // Sort by timestamp to process in FIFO order (oldest first) requests.sort((a, b) => a.timestamp - b.timestamp) - const lastRequest = requests[requests.length - 1] - if (!lastRequest) { - this.isProcessing = false - return - } - - try { - await this.retryRequest(lastRequest) - this.queue.delete(lastRequest.id) - this.emit("request-retry-success", lastRequest) - - const remainingRequests = Array.from(this.queue.values()).sort((a, b) => a.timestamp - b.timestamp) - - for (const request of remainingRequests) { - try { - await this.retryRequest(request) + // Process all requests in FIFO order + for (const request of requests) { + try { + await this.retryRequest(request) + this.queue.delete(request.id) + this.emit("request-retry-success", request) + } catch (error) { + request.retryCount++ + request.lastError = error instanceof Error ? error.message : String(error) + + // Check if we've exceeded max retries + if (this.config.maxRetries > 0 && request.retryCount >= this.config.maxRetries) { + this.log( + `[RetryQueue] Max retries (${this.config.maxRetries}) reached for request: ${request.operation || request.url}`, + ) this.queue.delete(request.id) - this.emit("request-retry-success", request) - } catch (error) { - request.retryCount++ - request.lastError = error instanceof Error ? error.message : String(error) - + this.emit("request-max-retries-exceeded", request, error as Error) + } else { this.queue.set(request.id, request) this.emit("request-retry-failed", request, error as Error) } + // Add a small delay between retry attempts await this.delay(100) } - } catch (error) { - lastRequest.retryCount++ - lastRequest.lastError = error instanceof Error ? error.message : String(error) - - this.queue.set(lastRequest.id, lastRequest) - this.emit("request-retry-failed", lastRequest, error as Error) } await this.persistQueue() @@ -164,7 +157,7 @@ export class RetryQueue extends EventEmitter { } const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), 30000) + const timeoutId = setTimeout(() => controller.abort(), this.config.requestTimeout) try { const response = await fetch(request.url, { diff --git a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts index 5625eb6c9730..09a63576215e 100644 --- a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts +++ b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts @@ -164,4 +164,198 @@ describe("RetryQueue", () => { expect(listener).toHaveBeenCalled() }) }) + + describe("retryAll", () => { + let fetchMock: ReturnType + + beforeEach(() => { + // Mock global fetch + fetchMock = vi.fn() + global.fetch = fetchMock + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it("should process requests in FIFO order", async () => { + const successListener = vi.fn() + retryQueue.on("request-retry-success", successListener) + + // Add multiple requests + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test3", { method: "POST" }, "telemetry") + + // Mock successful responses + fetchMock.mockResolvedValue({ ok: true }) + + await retryQueue.retryAll() + + // Check that fetch was called in FIFO order + expect(fetchMock).toHaveBeenCalledTimes(3) + expect(fetchMock.mock.calls[0][0]).toBe("https://api.example.com/test1") + expect(fetchMock.mock.calls[1][0]).toBe("https://api.example.com/test2") + expect(fetchMock.mock.calls[2][0]).toBe("https://api.example.com/test3") + + // Check that success events were emitted + expect(successListener).toHaveBeenCalledTimes(3) + + // Queue should be empty after successful retries + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) + + it("should handle failed retries and increment retry count", async () => { + const failListener = vi.fn() + retryQueue.on("request-retry-failed", failListener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock failed response + fetchMock.mockRejectedValue(new Error("Network error")) + + await retryQueue.retryAll() + + // Check that failure event was emitted + expect(failListener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + retryCount: 1, + lastError: "Network error", + }), + expect.any(Error), + ) + + // Request should still be in queue + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + }) + + it("should enforce max retries limit", async () => { + // Create queue with max retries of 2 + retryQueue = new RetryQueue(mockContext, { maxRetries: 2 }) + + const maxRetriesListener = vi.fn() + retryQueue.on("request-max-retries-exceeded", maxRetriesListener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock failed responses + fetchMock.mockRejectedValue(new Error("Network error")) + + // First retry + await retryQueue.retryAll() + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) // Still in queue + + // Second retry - should hit max retries + await retryQueue.retryAll() + + // Check that max retries event was emitted + expect(maxRetriesListener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + retryCount: 2, + }), + expect.any(Error), + ) + + // Request should be removed from queue after exceeding max retries + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) + + it("should not process if already processing", async () => { + // Add a request + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock a slow response + fetchMock.mockImplementation(() => new Promise((resolve) => setTimeout(() => resolve({ ok: true }), 100))) + + // Start first retryAll (don't await) + const firstCall = retryQueue.retryAll() + + // Try to call retryAll again immediately + const secondCall = retryQueue.retryAll() + + // Both should complete without errors + await Promise.all([firstCall, secondCall]) + + // Fetch should only be called once (from the first call) + expect(fetchMock).toHaveBeenCalledTimes(1) + }) + + it("should handle empty queue gracefully", async () => { + // Call retryAll on empty queue + await expect(retryQueue.retryAll()).resolves.toBeUndefined() + + // No fetch calls should be made + expect(fetchMock).not.toHaveBeenCalled() + }) + + it("should use auth header provider if available", async () => { + const authHeaderProvider = vi.fn().mockReturnValue({ + Authorization: "Bearer fresh-token", + }) + + retryQueue = new RetryQueue(mockContext, {}, undefined, authHeaderProvider) + + await retryQueue.enqueue( + "https://api.example.com/test", + { + method: "POST", + headers: { "Content-Type": "application/json" }, + }, + "telemetry", + ) + + fetchMock.mockResolvedValue({ ok: true }) + + await retryQueue.retryAll() + + // Check that fresh auth headers were used + expect(fetchMock).toHaveBeenCalledWith( + "https://api.example.com/test", + expect.objectContaining({ + headers: expect.objectContaining({ + Authorization: "Bearer fresh-token", + "Content-Type": "application/json", + "X-Retry-Queue": "true", + }), + }), + ) + + expect(authHeaderProvider).toHaveBeenCalled() + }) + + it("should respect configurable timeout", async () => { + // Create queue with custom timeout (short timeout for testing) + retryQueue = new RetryQueue(mockContext, { requestTimeout: 100 }) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock fetch to reject with abort error + const abortError = new Error("The operation was aborted") + abortError.name = "AbortError" + fetchMock.mockRejectedValue(abortError) + + const failListener = vi.fn() + retryQueue.on("request-retry-failed", failListener) + + await retryQueue.retryAll() + + // Check that the request failed with an abort error + expect(failListener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + lastError: "The operation was aborted", + }), + expect.any(Error), + ) + + // The timeout configuration is being used (verified by the constructor accepting it) + // The actual timeout behavior is handled by the browser's AbortController + }) + }) }) diff --git a/packages/cloud/src/retry-queue/types.ts b/packages/cloud/src/retry-queue/types.ts index d6ab26ff9530..351ab71f08b1 100644 --- a/packages/cloud/src/retry-queue/types.ts +++ b/packages/cloud/src/retry-queue/types.ts @@ -24,11 +24,13 @@ export interface RetryQueueConfig { maxQueueSize: number // FIFO eviction when full persistQueue: boolean networkCheckInterval: number // milliseconds + requestTimeout: number // milliseconds for request timeout } export interface RetryQueueEvents { "request-queued": [request: QueuedRequest] "request-retry-success": [request: QueuedRequest] "request-retry-failed": [request: QueuedRequest, error: Error] + "request-max-retries-exceeded": [request: QueuedRequest, error: Error] "queue-cleared": [] } From 59393d8aadc42c25dd732c81e6f37645ec26d972 Mon Sep 17 00:00:00 2001 From: daniel-lxs Date: Tue, 9 Sep 2025 11:09:15 -0500 Subject: [PATCH 3/5] fix: resolve TypeScript errors in RetryQueue tests --- packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts index 09a63576215e..209e54b11a25 100644 --- a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts +++ b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts @@ -194,9 +194,9 @@ describe("RetryQueue", () => { // Check that fetch was called in FIFO order expect(fetchMock).toHaveBeenCalledTimes(3) - expect(fetchMock.mock.calls[0][0]).toBe("https://api.example.com/test1") - expect(fetchMock.mock.calls[1][0]).toBe("https://api.example.com/test2") - expect(fetchMock.mock.calls[2][0]).toBe("https://api.example.com/test3") + expect(fetchMock.mock.calls[0]?.[0]).toBe("https://api.example.com/test1") + expect(fetchMock.mock.calls[1]?.[0]).toBe("https://api.example.com/test2") + expect(fetchMock.mock.calls[2]?.[0]).toBe("https://api.example.com/test3") // Check that success events were emitted expect(successListener).toHaveBeenCalledTimes(3) From a14a0da4b66b7ab935fcb59baf0238f3dbb88af5 Mon Sep 17 00:00:00 2001 From: daniel-lxs Date: Wed, 10 Sep 2025 09:42:02 -0500 Subject: [PATCH 4/5] fix(cloud): Address PR feedback for telemetry retry queue - Handle HTTP error status codes (500s, 401/403, 429) as failures that trigger retry - Remove queuing of backfill operations since they're user-initiated - Fix race condition in concurrent retry processing with isProcessing flag - Add specialized retry logic for 429 with Retry-After header support - Clean up unnecessary comments - Add comprehensive tests for new status code handling - Add temporary debug logs with emojis for testing --- packages/cloud/src/TelemetryClient.ts | 38 +++-- packages/cloud/src/retry-queue/RetryQueue.ts | 110 +++++++++---- .../retry-queue/__tests__/RetryQueue.test.ts | 145 ++++++++++++++++++ packages/cloud/src/retry-queue/types.ts | 1 + 4 files changed, 253 insertions(+), 41 deletions(-) diff --git a/packages/cloud/src/TelemetryClient.ts b/packages/cloud/src/TelemetryClient.ts index 224f825e8378..b3b01d2510ed 100644 --- a/packages/cloud/src/TelemetryClient.ts +++ b/packages/cloud/src/TelemetryClient.ts @@ -97,7 +97,7 @@ export class CloudTelemetryClient extends BaseTelemetryClient { this.retryQueue = retryQueue || null } - private async fetch(path: string, options: RequestInit) { + private async fetch(path: string, options: RequestInit, allowQueueing = true) { if (!this.authService.isAuthenticated()) { return } @@ -125,6 +125,23 @@ export class CloudTelemetryClient extends BaseTelemetryClient { console.error( `[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`, ) + + // Queue for retry on server errors (5xx), rate limiting (429), or auth errors (401/403) + if ( + this.retryQueue && + allowQueueing && + (response.status >= 500 || + response.status === 429 || + response.status === 401 || + response.status === 403) + ) { + await this.retryQueue.enqueue( + url, + fetchOptions, + "telemetry", + `Telemetry: ${options.method} /api/${path}`, + ) + } } return response @@ -132,7 +149,12 @@ export class CloudTelemetryClient extends BaseTelemetryClient { console.error(`[TelemetryClient#fetch] Network error for ${options.method} ${path}: ${error}`) // Queue for retry if we have a retry queue and it's a network error - if (this.retryQueue && error instanceof TypeError && error.message.includes("fetch failed")) { + if ( + this.retryQueue && + allowQueueing && + error instanceof TypeError && + error.message.includes("fetch failed") + ) { await this.retryQueue.enqueue( url, fetchOptions, @@ -222,13 +244,11 @@ export class CloudTelemetryClient extends BaseTelemetryClient { ) } - // Custom fetch for multipart - don't set Content-Type header (let browser set it) const url = `${getRooCodeApiUrl()}/api/events/backfill` const fetchOptions: RequestInit = { method: "POST", headers: { Authorization: `Bearer ${token}`, - // Note: No Content-Type header - browser will set multipart/form-data with boundary }, body: formData, } @@ -242,15 +262,7 @@ export class CloudTelemetryClient extends BaseTelemetryClient { ) } } catch (fetchError) { - // For backfill, also queue for retry on network errors - if (this.retryQueue && fetchError instanceof TypeError && fetchError.message.includes("fetch failed")) { - await this.retryQueue.enqueue( - url, - fetchOptions, - "telemetry", - `Telemetry: Backfill messages for task ${taskId}`, - ) - } + console.error(`[TelemetryClient#backfillMessages] Network error: ${fetchError}`) throw fetchError } } catch (error) { diff --git a/packages/cloud/src/retry-queue/RetryQueue.ts b/packages/cloud/src/retry-queue/RetryQueue.ts index 8426197e0fc9..4544420d7995 100644 --- a/packages/cloud/src/retry-queue/RetryQueue.ts +++ b/packages/cloud/src/retry-queue/RetryQueue.ts @@ -31,7 +31,7 @@ export class RetryQueue extends EventEmitter { maxQueueSize: 100, persistQueue: true, networkCheckInterval: 60000, - requestTimeout: 30000, // Make timeout configurable + requestTimeout: 30000, ...config, } @@ -98,6 +98,7 @@ export class RetryQueue extends EventEmitter { public async retryAll(): Promise { if (this.isProcessing) { + this.log("[RetryQueue] Already processing, skipping retry cycle") return } @@ -108,38 +109,76 @@ export class RetryQueue extends EventEmitter { this.isProcessing = true - // Sort by timestamp to process in FIFO order (oldest first) - requests.sort((a, b) => a.timestamp - b.timestamp) - - // Process all requests in FIFO order - for (const request of requests) { - try { - await this.retryRequest(request) - this.queue.delete(request.id) - this.emit("request-retry-success", request) - } catch (error) { - request.retryCount++ - request.lastError = error instanceof Error ? error.message : String(error) - - // Check if we've exceeded max retries - if (this.config.maxRetries > 0 && request.retryCount >= this.config.maxRetries) { + try { + // Sort by timestamp to process in FIFO order (oldest first) + requests.sort((a, b) => a.timestamp - b.timestamp) + + // Process all requests in FIFO order + for (const request of requests) { + // Skip if request should not be retried yet (rate limiting) + if (request.nextRetryAfter && Date.now() < request.nextRetryAfter) { this.log( - `[RetryQueue] Max retries (${this.config.maxRetries}) reached for request: ${request.operation || request.url}`, + `[RetryQueue] Skipping rate-limited request until ${new Date(request.nextRetryAfter).toISOString()}`, ) - this.queue.delete(request.id) - this.emit("request-max-retries-exceeded", request, error as Error) - } else { - this.queue.set(request.id, request) - this.emit("request-retry-failed", request, error as Error) + continue } - // Add a small delay between retry attempts - await this.delay(100) + try { + const response = await this.retryRequest(request) + + // Check if we got a Retry-After header for rate limiting + if (response && response.status === 429) { + const retryAfter = response.headers.get("Retry-After") + if (retryAfter) { + // Parse Retry-After (could be seconds or a date) + let delayMs: number + const retryAfterSeconds = parseInt(retryAfter, 10) + if (!isNaN(retryAfterSeconds)) { + delayMs = retryAfterSeconds * 1000 + } else { + // Try parsing as a date + const retryDate = new Date(retryAfter) + if (!isNaN(retryDate.getTime())) { + delayMs = retryDate.getTime() - Date.now() + } else { + delayMs = 60000 // Default to 1 minute if we can't parse + } + } + request.nextRetryAfter = Date.now() + delayMs + this.log(`[RetryQueue] Rate limited, will retry after ${delayMs}ms`) + this.queue.set(request.id, request) + continue + } + } + + this.queue.delete(request.id) + this.emit("request-retry-success", request) + } catch (error) { + request.retryCount++ + request.lastError = error instanceof Error ? error.message : String(error) + + // Check if we've exceeded max retries + if (this.config.maxRetries > 0 && request.retryCount >= this.config.maxRetries) { + this.log( + `[RetryQueue] Max retries (${this.config.maxRetries}) reached for request: ${request.operation || request.url}`, + ) + this.queue.delete(request.id) + this.emit("request-max-retries-exceeded", request, error as Error) + } else { + this.queue.set(request.id, request) + this.emit("request-retry-failed", request, error as Error) + } + + // Add a small delay between retry attempts + await this.delay(100) + } } - } - await this.persistQueue() - this.isProcessing = false + await this.persistQueue() + } finally { + // Always reset the processing flag, even if an error occurs + this.isProcessing = false + } } private async retryRequest(request: QueuedRequest): Promise { @@ -171,8 +210,23 @@ export class RetryQueue extends EventEmitter { clearTimeout(timeoutId) + // Check for error status codes that should trigger retry if (!response.ok) { - throw new Error(`Request failed with status ${response.status}`) + // Handle different status codes appropriately + if (response.status >= 500) { + // Server errors should be retried + throw new Error(`Server error: ${response.status} ${response.statusText}`) + } else if (response.status === 429) { + // Rate limiting - return response to let caller handle Retry-After + return response + } else if (response.status === 401 || response.status === 403) { + // Auth errors - retry with fresh auth headers from provider + throw new Error(`Auth error: ${response.status}`) + } else if (response.status >= 400 && response.status < 500) { + // Other client errors (400, 404, etc.) should not be retried + this.log(`[RetryQueue] Non-retryable status ${response.status}, removing from queue`) + return response + } } return response diff --git a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts index 209e54b11a25..2364cf434247 100644 --- a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts +++ b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts @@ -357,5 +357,150 @@ describe("RetryQueue", () => { // The timeout configuration is being used (verified by the constructor accepting it) // The actual timeout behavior is handled by the browser's AbortController }) + + it("should retry on 500+ status codes", async () => { + const failListener = vi.fn() + const successListener = vi.fn() + retryQueue.on("request-retry-failed", failListener) + retryQueue.on("request-retry-success", successListener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // First attempt: 500 error + fetchMock.mockResolvedValueOnce({ ok: false, status: 500, statusText: "Internal Server Error" }) + + await retryQueue.retryAll() + + // Should fail and remain in queue + expect(failListener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + retryCount: 1, + lastError: "Server error: 500 Internal Server Error", + }), + expect.any(Error), + ) + + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + + // Second attempt: success + fetchMock.mockResolvedValueOnce({ ok: true, status: 200 }) + + await retryQueue.retryAll() + + // Should succeed and be removed from queue + expect(successListener).toHaveBeenCalled() + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) + + it("should handle 429 rate limiting with Retry-After header", async () => { + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock 429 response with Retry-After header (in seconds) + const retryAfterResponse = { + ok: false, + status: 429, + headers: { + get: vi.fn((header: string) => { + if (header === "Retry-After") return "2" // 2 seconds + return null + }), + }, + } + + fetchMock.mockResolvedValueOnce(retryAfterResponse) + + await retryQueue.retryAll() + + // Request should still be in queue with nextRetryAfter set + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + + // Try to retry immediately - should be skipped due to rate limiting + fetchMock.mockClear() + await retryQueue.retryAll() + + // Fetch should not be called because request is rate-limited + expect(fetchMock).not.toHaveBeenCalled() + }) + + it("should retry on 401/403 auth errors", async () => { + const failListener = vi.fn() + retryQueue.on("request-retry-failed", failListener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock 401 error + fetchMock.mockResolvedValueOnce({ ok: false, status: 401, statusText: "Unauthorized" }) + + await retryQueue.retryAll() + + // Should fail and remain in queue for retry + expect(failListener).toHaveBeenCalledWith( + expect.objectContaining({ + url: "https://api.example.com/test", + retryCount: 1, + lastError: "Auth error: 401", + }), + expect.any(Error), + ) + + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) + }) + + it("should not retry on 400/404 client errors", async () => { + const successListener = vi.fn() + retryQueue.on("request-retry-success", successListener) + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Mock 404 error + fetchMock.mockResolvedValueOnce({ ok: false, status: 404, statusText: "Not Found" }) + + await retryQueue.retryAll() + + // Should be removed from queue without retry + expect(successListener).toHaveBeenCalled() + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) + + it("should prevent concurrent processing", async () => { + // Add a single request + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + + // Mock slow response + let resolveFirst: () => void + const firstPromise = new Promise<{ ok: boolean }>((resolve) => { + resolveFirst = () => resolve({ ok: true }) + }) + + fetchMock.mockReturnValueOnce(firstPromise) + + // Start first retryAll (don't await) + const firstCall = retryQueue.retryAll() + + // Try to call retryAll again immediately - should return immediately without processing + const secondCall = retryQueue.retryAll() + + // Second call should return immediately + await secondCall + + // Fetch should only be called once (from first call) + expect(fetchMock).toHaveBeenCalledTimes(1) + + // Resolve the promise + resolveFirst!() + + // Wait for first call to complete + await firstCall + + // Queue should be empty + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + }) }) }) diff --git a/packages/cloud/src/retry-queue/types.ts b/packages/cloud/src/retry-queue/types.ts index 351ab71f08b1..e0a2be184f2f 100644 --- a/packages/cloud/src/retry-queue/types.ts +++ b/packages/cloud/src/retry-queue/types.ts @@ -7,6 +7,7 @@ export interface QueuedRequest { type: "api-call" | "telemetry" | "settings" | "other" operation?: string lastError?: string + nextRetryAfter?: number // Timestamp for when to retry next (for rate limiting) } export interface QueueStats { From f4242ca1128d240b40b34988fa40e3471212aa01 Mon Sep 17 00:00:00 2001 From: daniel-lxs Date: Thu, 11 Sep 2025 10:01:13 -0500 Subject: [PATCH 5/5] refactor: address PR feedback for telemetry retry queue - Remove unused X-Organization-Id header from auth header provider - Simplify enqueue() API by removing operation parameter - Fix error retry logic: only retry 5xx, 429, and network failures - Stop retrying 4xx client errors (400, 401, 403, 404, 422) - Implement queue-wide pause for 429 rate limiting - Add auth state management integration: - Pause queue when not in active-session - Clear queue on logout or user change - Preserve queue when same user logs back in - Remove debug comments - Fix ESLint no-case-declarations error with proper block scope - Update tests for all new behaviors --- packages/cloud/src/CloudService.ts | 66 ++++- packages/cloud/src/TelemetryClient.ts | 29 +- .../src/__tests__/TelemetryClient.test.ts | 3 - packages/cloud/src/retry-queue/RetryQueue.ts | 116 ++++++-- .../retry-queue/__tests__/RetryQueue.test.ts | 260 +++++++++++++++--- packages/cloud/src/retry-queue/types.ts | 1 - 6 files changed, 393 insertions(+), 82 deletions(-) diff --git a/packages/cloud/src/CloudService.ts b/packages/cloud/src/CloudService.ts index 9dc63ccd1d45..f2d47fbad238 100644 --- a/packages/cloud/src/CloudService.ts +++ b/packages/cloud/src/CloudService.ts @@ -89,6 +89,8 @@ export class CloudService extends EventEmitter implements Di this.log = log || console.log this.authStateListener = (data: AuthStateChangedPayload) => { + // Handle retry queue based on auth state changes + this.handleAuthStateChangeForRetryQueue(data) this.emit("auth-state-changed", data) } @@ -149,7 +151,6 @@ export class CloudService extends EventEmitter implements Di if (sessionToken) { return { Authorization: `Bearer ${sessionToken}`, - "X-Organization-Id": this._authService?.getStoredOrganizationId() || "", } } return undefined @@ -389,4 +390,67 @@ export class CloudService extends EventEmitter implements Di static isEnabled(): boolean { return !!this._instance?.isAuthenticated() } + + /** + * Handle auth state changes for the retry queue + * - Pause queue when not in 'active-session' state + * - Clear queue when user logs out or logs in as different user + * - Resume queue when returning to active-session with same user + */ + private handleAuthStateChangeForRetryQueue(data: AuthStateChangedPayload): void { + if (!this._retryQueue) { + return + } + + const newState = data.state + const userInfo = this.getUserInfo() + const newUserId = userInfo?.id + + this.log(`[CloudService] Auth state changed to: ${newState}, user: ${newUserId}`) + + // Handle different auth states + switch (newState) { + case "active-session": { + // Check if user changed (different user logged in) + const wasCleared = this._retryQueue.clearIfUserChanged(newUserId) + + if (!wasCleared) { + // Same user or first login, resume the queue + this._retryQueue.resume() + this.log("[CloudService] Resuming retry queue for active session") + } else { + // Different user, queue was cleared, but we can resume processing + this._retryQueue.resume() + this.log("[CloudService] Retry queue cleared for new user, resuming processing") + } + break + } + + case "logged-out": + // User is logged out, clear the queue + this._retryQueue.clearIfUserChanged(undefined) + this._retryQueue.pause() + this.log("[CloudService] Pausing and clearing retry queue for logged-out state") + break + + case "initializing": + case "attempting-session": + // Transitional states, pause the queue but don't clear + this._retryQueue.pause() + this.log(`[CloudService] Pausing retry queue during ${newState}`) + break + + case "inactive-session": + // Session is inactive (possibly expired), pause but don't clear + // The queue might resume if the session becomes active again + this._retryQueue.pause() + this.log("[CloudService] Pausing retry queue for inactive session") + break + + default: + // Unknown state, pause as a safety measure + this._retryQueue.pause() + this.log(`[CloudService] Pausing retry queue for unknown state: ${newState}`) + } + } } diff --git a/packages/cloud/src/TelemetryClient.ts b/packages/cloud/src/TelemetryClient.ts index b3b01d2510ed..dcf9613a0640 100644 --- a/packages/cloud/src/TelemetryClient.ts +++ b/packages/cloud/src/TelemetryClient.ts @@ -126,21 +126,10 @@ export class CloudTelemetryClient extends BaseTelemetryClient { `[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`, ) - // Queue for retry on server errors (5xx), rate limiting (429), or auth errors (401/403) - if ( - this.retryQueue && - allowQueueing && - (response.status >= 500 || - response.status === 429 || - response.status === 401 || - response.status === 403) - ) { - await this.retryQueue.enqueue( - url, - fetchOptions, - "telemetry", - `Telemetry: ${options.method} /api/${path}`, - ) + // Queue for retry on server errors (5xx) or rate limiting (429) + // Do NOT retry on client errors (4xx) except 429 - they won't succeed + if (this.retryQueue && allowQueueing && (response.status >= 500 || response.status === 429)) { + await this.retryQueue.enqueue(url, fetchOptions, "telemetry") } } @@ -148,19 +137,15 @@ export class CloudTelemetryClient extends BaseTelemetryClient { } catch (error) { console.error(`[TelemetryClient#fetch] Network error for ${options.method} ${path}: ${error}`) - // Queue for retry if we have a retry queue and it's a network error + // Queue for retry on network failures (typically TypeError with "fetch failed" message) + // These are transient network issues that may succeed on retry if ( this.retryQueue && allowQueueing && error instanceof TypeError && error.message.includes("fetch failed") ) { - await this.retryQueue.enqueue( - url, - fetchOptions, - "telemetry", - `Telemetry: ${options.method} /api/${path}`, - ) + await this.retryQueue.enqueue(url, fetchOptions, "telemetry") } throw error diff --git a/packages/cloud/src/__tests__/TelemetryClient.test.ts b/packages/cloud/src/__tests__/TelemetryClient.test.ts index 33c230f97c52..cbfd48de6a90 100644 --- a/packages/cloud/src/__tests__/TelemetryClient.test.ts +++ b/packages/cloud/src/__tests__/TelemetryClient.test.ts @@ -684,9 +684,6 @@ describe("TelemetryClient", () => { ) }) - // Debug logging has been removed in the new implementation - // This test is no longer applicable - it("should handle empty messages array", async () => { const client = new TelemetryClient(mockAuthService, mockSettingsService) diff --git a/packages/cloud/src/retry-queue/RetryQueue.ts b/packages/cloud/src/retry-queue/RetryQueue.ts index 4544420d7995..17362ad41d35 100644 --- a/packages/cloud/src/retry-queue/RetryQueue.ts +++ b/packages/cloud/src/retry-queue/RetryQueue.ts @@ -13,6 +13,10 @@ export class RetryQueue extends EventEmitter { private retryTimer?: NodeJS.Timeout private readonly STORAGE_KEY = "roo.retryQueue" private authHeaderProvider?: AuthHeaderProvider + private queuePausedUntil?: number // Timestamp when the queue can resume processing + private isPaused = false // Manual pause state (e.g., for auth state changes) + private currentUserId?: string // Track current user ID for conditional clearing + private hasHadUser = false // Track if we've ever had a user (to distinguish from first login) constructor( context: ExtensionContext, @@ -93,7 +97,7 @@ export class RetryQueue extends EventEmitter { await this.persistQueue() this.emit("request-queued", request) - this.log(`[RetryQueue] Queued request: ${operation || url}`) + this.log(`[RetryQueue] Queued request: ${url}`) } public async retryAll(): Promise { @@ -102,6 +106,18 @@ export class RetryQueue extends EventEmitter { return } + // Check if the queue is manually paused (e.g., due to auth state) + if (this.isPaused) { + this.log("[RetryQueue] Queue is manually paused") + return + } + + // Check if the entire queue is paused due to rate limiting + if (this.queuePausedUntil && Date.now() < this.queuePausedUntil) { + this.log(`[RetryQueue] Queue is paused until ${new Date(this.queuePausedUntil).toISOString()}`) + return + } + const requests = Array.from(this.queue.values()) if (requests.length === 0) { return @@ -115,18 +131,10 @@ export class RetryQueue extends EventEmitter { // Process all requests in FIFO order for (const request of requests) { - // Skip if request should not be retried yet (rate limiting) - if (request.nextRetryAfter && Date.now() < request.nextRetryAfter) { - this.log( - `[RetryQueue] Skipping rate-limited request until ${new Date(request.nextRetryAfter).toISOString()}`, - ) - continue - } - try { const response = await this.retryRequest(request) - // Check if we got a Retry-After header for rate limiting + // Check if we got a 429 rate limiting response if (response && response.status === 429) { const retryAfter = response.headers.get("Retry-After") if (retryAfter) { @@ -144,10 +152,13 @@ export class RetryQueue extends EventEmitter { delayMs = 60000 // Default to 1 minute if we can't parse } } - request.nextRetryAfter = Date.now() + delayMs - this.log(`[RetryQueue] Rate limited, will retry after ${delayMs}ms`) + // Pause the entire queue + this.queuePausedUntil = Date.now() + delayMs + this.log(`[RetryQueue] Rate limited, pausing entire queue for ${delayMs}ms`) + // Keep the request in the queue for later retry this.queue.set(request.id, request) - continue + // Stop processing further requests since the queue is paused + break } } @@ -160,7 +171,7 @@ export class RetryQueue extends EventEmitter { // Check if we've exceeded max retries if (this.config.maxRetries > 0 && request.retryCount >= this.config.maxRetries) { this.log( - `[RetryQueue] Max retries (${this.config.maxRetries}) reached for request: ${request.operation || request.url}`, + `[RetryQueue] Max retries (${this.config.maxRetries}) reached for request: ${request.url}`, ) this.queue.delete(request.id) this.emit("request-max-retries-exceeded", request, error as Error) @@ -182,7 +193,7 @@ export class RetryQueue extends EventEmitter { } private async retryRequest(request: QueuedRequest): Promise { - this.log(`[RetryQueue] Retrying request: ${request.operation || request.url}`) + this.log(`[RetryQueue] Retrying request: ${request.url}`) let headers = { ...request.options.headers } if (this.authHeaderProvider) { @@ -214,17 +225,15 @@ export class RetryQueue extends EventEmitter { if (!response.ok) { // Handle different status codes appropriately if (response.status >= 500) { - // Server errors should be retried + // Server errors (5xx) should be retried throw new Error(`Server error: ${response.status} ${response.statusText}`) } else if (response.status === 429) { // Rate limiting - return response to let caller handle Retry-After return response - } else if (response.status === 401 || response.status === 403) { - // Auth errors - retry with fresh auth headers from provider - throw new Error(`Auth error: ${response.status}`) } else if (response.status >= 400 && response.status < 500) { - // Other client errors (400, 404, etc.) should not be retried - this.log(`[RetryQueue] Non-retryable status ${response.status}, removing from queue`) + // Client errors (4xx including 401/403) should NOT be retried + // These errors indicate problems with the request itself that won't be fixed by retrying + this.log(`[RetryQueue] Non-retryable client error ${response.status}, removing from queue`) return response } } @@ -288,6 +297,71 @@ export class RetryQueue extends EventEmitter { this.emit("queue-cleared") } + /** + * Pause the retry queue. When paused, no retries will be processed. + * This is useful when auth state is not active or during logout. + */ + public pause(): void { + this.isPaused = true + this.log("[RetryQueue] Queue paused") + } + + /** + * Resume the retry queue. Retries will be processed again on the next interval. + */ + public resume(): void { + this.isPaused = false + this.log("[RetryQueue] Queue resumed") + } + + /** + * Check if the queue is paused + */ + public isPausedState(): boolean { + return this.isPaused + } + + /** + * Set the current user ID for tracking user changes + */ + public setCurrentUserId(userId: string | undefined): void { + this.currentUserId = userId + } + + /** + * Get the current user ID + */ + public getCurrentUserId(): string | undefined { + return this.currentUserId + } + + /** + * Conditionally clear the queue based on user ID change. + * If newUserId is different from currentUserId, clear the queue. + * Returns true if queue was cleared, false otherwise. + */ + public clearIfUserChanged(newUserId: string | undefined): boolean { + // First time ever setting a user (initial login) + if (!this.hasHadUser && newUserId !== undefined) { + this.currentUserId = newUserId + this.hasHadUser = true + return false + } + + // If user IDs are different (including logout case where newUserId is undefined) + if (this.currentUserId !== newUserId) { + this.log(`[RetryQueue] User changed from ${this.currentUserId} to ${newUserId}, clearing queue`) + this.clear() + this.currentUserId = newUserId + if (newUserId !== undefined) { + this.hasHadUser = true + } + return true + } + + return false + } + public dispose(): void { if (this.retryTimer) { clearInterval(this.retryTimer) diff --git a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts index 2364cf434247..becee719c153 100644 --- a/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts +++ b/packages/cloud/src/retry-queue/__tests__/RetryQueue.test.ts @@ -35,7 +35,7 @@ describe("RetryQueue", () => { const url = "https://api.example.com/test" const options = { method: "POST", body: JSON.stringify({ test: "data" }) } - await retryQueue.enqueue(url, options, "telemetry", "test operation") + await retryQueue.enqueue(url, options, "telemetry") const stats = retryQueue.getStats() expect(stats.totalQueued).toBe(1) @@ -48,12 +48,7 @@ describe("RetryQueue", () => { // Add 4 requests for (let i = 1; i <= 4; i++) { - await retryQueue.enqueue( - `https://api.example.com/test${i}`, - { method: "POST" }, - "telemetry", - `operation ${i}`, - ) + await retryQueue.enqueue(`https://api.example.com/test${i}`, { method: "POST" }, "telemetry") } const stats = retryQueue.getStats() @@ -165,6 +160,143 @@ describe("RetryQueue", () => { }) }) + describe("auth state management", () => { + it("should pause and resume the queue", () => { + expect(retryQueue.isPausedState()).toBe(false) + + retryQueue.pause() + expect(retryQueue.isPausedState()).toBe(true) + + retryQueue.resume() + expect(retryQueue.isPausedState()).toBe(false) + }) + + it("should not process retries when paused", async () => { + const fetchMock = vi.fn().mockResolvedValue({ ok: true }) + global.fetch = fetchMock + + await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + + // Pause the queue + retryQueue.pause() + + // Try to retry all + await retryQueue.retryAll() + + // Fetch should not be called because queue is paused + expect(fetchMock).not.toHaveBeenCalled() + + // Resume and retry + retryQueue.resume() + await retryQueue.retryAll() + + // Now fetch should be called + expect(fetchMock).toHaveBeenCalledTimes(1) + }) + + it("should track and update current user ID", () => { + expect(retryQueue.getCurrentUserId()).toBeUndefined() + + retryQueue.setCurrentUserId("user_123") + expect(retryQueue.getCurrentUserId()).toBe("user_123") + + retryQueue.setCurrentUserId("user_456") + expect(retryQueue.getCurrentUserId()).toBe("user_456") + + retryQueue.setCurrentUserId(undefined) + expect(retryQueue.getCurrentUserId()).toBeUndefined() + }) + + it("should clear queue when user changes", async () => { + // Add some requests + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + + // Set initial user + retryQueue.setCurrentUserId("user_123") + + // Same user login - should not clear + let wasCleared = retryQueue.clearIfUserChanged("user_123") + expect(wasCleared).toBe(false) + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + + // Different user login - should clear + wasCleared = retryQueue.clearIfUserChanged("user_456") + expect(wasCleared).toBe(true) + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + expect(retryQueue.getCurrentUserId()).toBe("user_456") + }) + + it("should clear queue on logout (undefined user)", async () => { + // Set initial user + retryQueue.setCurrentUserId("user_123") + + // Add some requests + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + + // Logout (undefined user) - should clear + const wasCleared = retryQueue.clearIfUserChanged(undefined) + expect(wasCleared).toBe(true) + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(0) + expect(retryQueue.getCurrentUserId()).toBeUndefined() + }) + + it("should not clear on first login (no previous user)", async () => { + // Add some requests before any user is set + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + + let stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + + // First login - should not clear + const wasCleared = retryQueue.clearIfUserChanged("user_123") + expect(wasCleared).toBe(false) + stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(2) + expect(retryQueue.getCurrentUserId()).toBe("user_123") + }) + + it("should handle multiple user transitions correctly", async () => { + const clearListener = vi.fn() + retryQueue.on("queue-cleared", clearListener) + + // First user logs in + retryQueue.clearIfUserChanged("user_123") + await retryQueue.enqueue("https://api.example.com/user1-req", { method: "POST" }, "telemetry") + + // User logs out + const clearedOnLogout = retryQueue.clearIfUserChanged(undefined) + expect(clearedOnLogout).toBe(true) + expect(clearListener).toHaveBeenCalledTimes(1) + + // Different user logs in + await retryQueue.enqueue("https://api.example.com/user2-req", { method: "POST" }, "telemetry") + const clearedOnNewUser = retryQueue.clearIfUserChanged("user_456") + expect(clearedOnNewUser).toBe(true) + expect(clearListener).toHaveBeenCalledTimes(2) + + // Same user logs back in + await retryQueue.enqueue("https://api.example.com/user2-req2", { method: "POST" }, "telemetry") + const notCleared = retryQueue.clearIfUserChanged("user_456") + expect(notCleared).toBe(false) + expect(clearListener).toHaveBeenCalledTimes(2) // Still 2, not cleared + + const stats = retryQueue.getStats() + expect(stats.totalQueued).toBe(1) // Only the last request remains + }) + }) + describe("retryAll", () => { let fetchMock: ReturnType @@ -395,10 +527,13 @@ describe("RetryQueue", () => { expect(stats.totalQueued).toBe(0) }) - it("should handle 429 rate limiting with Retry-After header", async () => { - await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + it("should pause entire queue on 429 rate limiting with Retry-After header", async () => { + // Add multiple requests to test queue-wide pause + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test3", { method: "POST" }, "telemetry") - // Mock 429 response with Retry-After header (in seconds) + // Mock 429 response with Retry-After header (in seconds) for the first request const retryAfterResponse = { ok: false, status: 429, @@ -414,21 +549,64 @@ describe("RetryQueue", () => { await retryQueue.retryAll() - // Request should still be in queue with nextRetryAfter set + // All requests should still be in queue const stats = retryQueue.getStats() - expect(stats.totalQueued).toBe(1) + expect(stats.totalQueued).toBe(3) + + // Only the first request should have been attempted + expect(fetchMock).toHaveBeenCalledTimes(1) + expect(fetchMock).toHaveBeenCalledWith("https://api.example.com/test1", expect.any(Object)) - // Try to retry immediately - should be skipped due to rate limiting + // Try to retry immediately - should be skipped due to queue-wide rate limiting fetchMock.mockClear() await retryQueue.retryAll() - // Fetch should not be called because request is rate-limited + // No fetch calls should be made because the entire queue is paused expect(fetchMock).not.toHaveBeenCalled() }) - it("should retry on 401/403 auth errors", async () => { - const failListener = vi.fn() - retryQueue.on("request-retry-failed", failListener) + it("should process all requests after rate limit period expires", async () => { + // Add multiple requests + await retryQueue.enqueue("https://api.example.com/test1", { method: "POST" }, "telemetry") + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + + // Mock 429 response with very short Retry-After (for testing) + const retryAfterResponse = { + ok: false, + status: 429, + headers: { + get: vi.fn((header: string) => { + if (header === "Retry-After") return "0" // 0 seconds (immediate) + return null + }), + }, + } + + fetchMock.mockResolvedValueOnce(retryAfterResponse) + + await retryQueue.retryAll() + + // Queue should be paused but requests still in queue + expect(retryQueue.getStats().totalQueued).toBe(2) + + // Wait a tiny bit for the rate limit to "expire" + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Mock successful responses for both requests + fetchMock.mockResolvedValue({ ok: true }) + + // Now retry should process all requests + await retryQueue.retryAll() + + // All requests should be processed and removed from queue + expect(retryQueue.getStats().totalQueued).toBe(0) + // First request will be retried plus the second one + expect(fetchMock).toHaveBeenCalledTimes(3) // 1 (429) + 2 (success) + }) + + it("should not retry on 401/403 auth errors", async () => { + const successListener = vi.fn() + retryQueue.on("request-retry-success", successListener) await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") @@ -437,33 +615,47 @@ describe("RetryQueue", () => { await retryQueue.retryAll() - // Should fail and remain in queue for retry - expect(failListener).toHaveBeenCalledWith( - expect.objectContaining({ - url: "https://api.example.com/test", - retryCount: 1, - lastError: "Auth error: 401", - }), - expect.any(Error), - ) - + // Should be removed from queue without retry (401 is a client error) + expect(successListener).toHaveBeenCalled() const stats = retryQueue.getStats() - expect(stats.totalQueued).toBe(1) + expect(stats.totalQueued).toBe(0) + + // Test 403 as well + await retryQueue.enqueue("https://api.example.com/test2", { method: "POST" }, "telemetry") + fetchMock.mockResolvedValueOnce({ ok: false, status: 403, statusText: "Forbidden" }) + + await retryQueue.retryAll() + + // Should also be removed from queue without retry + expect(successListener).toHaveBeenCalledTimes(2) + const stats2 = retryQueue.getStats() + expect(stats2.totalQueued).toBe(0) }) - it("should not retry on 400/404 client errors", async () => { + it("should not retry on 400/404/422 client errors", async () => { const successListener = vi.fn() retryQueue.on("request-retry-success", successListener) - await retryQueue.enqueue("https://api.example.com/test", { method: "POST" }, "telemetry") + // Test various 4xx errors that should not be retried + const clientErrors = [ + { status: 400, statusText: "Bad Request" }, + { status: 404, statusText: "Not Found" }, + { status: 422, statusText: "Unprocessable Entity" }, + ] - // Mock 404 error - fetchMock.mockResolvedValueOnce({ ok: false, status: 404, statusText: "Not Found" }) + for (const error of clientErrors) { + await retryQueue.enqueue( + `https://api.example.com/test-${error.status}`, + { method: "POST" }, + "telemetry", + ) + fetchMock.mockResolvedValueOnce({ ok: false, ...error }) + } await retryQueue.retryAll() - // Should be removed from queue without retry - expect(successListener).toHaveBeenCalled() + // All requests should be removed from queue without retry + expect(successListener).toHaveBeenCalledTimes(3) const stats = retryQueue.getStats() expect(stats.totalQueued).toBe(0) }) diff --git a/packages/cloud/src/retry-queue/types.ts b/packages/cloud/src/retry-queue/types.ts index e0a2be184f2f..351ab71f08b1 100644 --- a/packages/cloud/src/retry-queue/types.ts +++ b/packages/cloud/src/retry-queue/types.ts @@ -7,7 +7,6 @@ export interface QueuedRequest { type: "api-call" | "telemetry" | "settings" | "other" operation?: string lastError?: string - nextRetryAfter?: number // Timestamp for when to retry next (for rate limiting) } export interface QueueStats {