Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 94 additions & 1 deletion packages/cloud/src/CloudService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { StaticSettingsService } from "./StaticSettingsService.js"
import { CloudTelemetryClient as TelemetryClient } from "./TelemetryClient.js"
import { CloudShareService } from "./CloudShareService.js"
import { CloudAPI } from "./CloudAPI.js"
import { RetryQueue } from "./retry-queue/index.js"

type AuthStateChangedPayload = CloudServiceEvents["auth-state-changed"][0]
type AuthUserInfoPayload = CloudServiceEvents["user-info"][0]
Expand Down Expand Up @@ -75,13 +76,21 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements Di
return this._cloudAPI
}

private _retryQueue: RetryQueue | null = null

public get retryQueue() {
return this._retryQueue
}

private constructor(context: ExtensionContext, log?: (...args: unknown[]) => void) {
super()

this.context = context
this.log = log || console.log

this.authStateListener = (data: AuthStateChangedPayload) => {
// Handle retry queue based on auth state changes
this.handleAuthStateChangeForRetryQueue(data)
this.emit("auth-state-changed", data)
}

Expand Down Expand Up @@ -131,7 +140,24 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements Di

this._cloudAPI = new CloudAPI(this._authService, this.log)

this._telemetryClient = new TelemetryClient(this._authService, this._settingsService)
// Initialize retry queue with auth header provider
this._retryQueue = new RetryQueue(
this.context,
undefined, // Use default config
this.log,
() => {
// Provide fresh auth headers for retries
const sessionToken = this._authService?.getSessionToken()
if (sessionToken) {
return {
Authorization: `Bearer ${sessionToken}`,
}
}
return undefined
},
)

this._telemetryClient = new TelemetryClient(this._authService, this._settingsService, this._retryQueue)

this._shareService = new CloudShareService(this._cloudAPI, this._settingsService, this.log)

Expand Down Expand Up @@ -298,6 +324,10 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements Di
this.settingsService.dispose()
}

if (this._retryQueue) {
this._retryQueue.dispose()
}

this.isInitialized = false
}

Expand Down Expand Up @@ -360,4 +390,67 @@ export class CloudService extends EventEmitter<CloudServiceEvents> implements Di
static isEnabled(): boolean {
return !!this._instance?.isAuthenticated()
}

/**
* Handle auth state changes for the retry queue
* - Pause queue when not in 'active-session' state
* - Clear queue when user logs out or logs in as different user
* - Resume queue when returning to active-session with same user
*/
private handleAuthStateChangeForRetryQueue(data: AuthStateChangedPayload): void {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleAuthStateChangeForRetryQueue cleanly manages pausing, clearing, and resuming the retry queue on different auth states. Consider refactoring the duplicate resume() calls (lines 417–424) for clarity.

if (!this._retryQueue) {
return
}

const newState = data.state
const userInfo = this.getUserInfo()
const newUserId = userInfo?.id

this.log(`[CloudService] Auth state changed to: ${newState}, user: ${newUserId}`)

// Handle different auth states
switch (newState) {
case "active-session": {
// Check if user changed (different user logged in)
const wasCleared = this._retryQueue.clearIfUserChanged(newUserId)

if (!wasCleared) {
// Same user or first login, resume the queue
this._retryQueue.resume()
this.log("[CloudService] Resuming retry queue for active session")
} else {
// Different user, queue was cleared, but we can resume processing
this._retryQueue.resume()
this.log("[CloudService] Retry queue cleared for new user, resuming processing")
}
break
}

case "logged-out":
// User is logged out, clear the queue
this._retryQueue.clearIfUserChanged(undefined)
this._retryQueue.pause()
this.log("[CloudService] Pausing and clearing retry queue for logged-out state")
break

case "initializing":
case "attempting-session":
// Transitional states, pause the queue but don't clear
this._retryQueue.pause()
this.log(`[CloudService] Pausing retry queue during ${newState}`)
break

case "inactive-session":
// Session is inactive (possibly expired), pause but don't clear
// The queue might resume if the session becomes active again
this._retryQueue.pause()
this.log("[CloudService] Pausing retry queue for inactive session")
break

default:
// Unknown state, pause as a safety measure
this._retryQueue.pause()
this.log(`[CloudService] Pausing retry queue for unknown state: ${newState}`)
}
}
}
84 changes: 59 additions & 25 deletions packages/cloud/src/TelemetryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "@roo-code/types"

import { getRooCodeApiUrl } from "./config.js"
import type { RetryQueue } from "./retry-queue/index.js"

abstract class BaseTelemetryClient implements TelemetryClient {
protected providerRef: WeakRef<TelemetryPropertiesProvider> | null = null
Expand Down Expand Up @@ -82,21 +83,21 @@ abstract class BaseTelemetryClient implements TelemetryClient {
}

export class CloudTelemetryClient extends BaseTelemetryClient {
private retryQueue: RetryQueue | null = null

constructor(
private authService: AuthService,
private settingsService: SettingsService,
debug = false,
retryQueue?: RetryQueue,
) {
super(
{
type: "exclude",
events: [TelemetryEventName.TASK_CONVERSATION_MESSAGE],
},
debug,
)
super({
type: "exclude",
events: [TelemetryEventName.TASK_CONVERSATION_MESSAGE],
})
this.retryQueue = retryQueue || null
}

private async fetch(path: string, options: RequestInit) {
private async fetch(path: string, options: RequestInit, allowQueueing = true) {
if (!this.authService.isAuthenticated()) {
return
}
Expand All @@ -108,18 +109,46 @@ export class CloudTelemetryClient extends BaseTelemetryClient {
return
}

const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, {
const url = `${getRooCodeApiUrl()}/api/${path}`
const fetchOptions: RequestInit = {
...options,
headers: {
Authorization: `Bearer ${token}`,
"Content-Type": "application/json",
},
})
}

if (!response.ok) {
console.error(
`[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`,
)
try {
const response = await fetch(url, fetchOptions)

if (!response.ok) {
console.error(
`[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`,
)

// Queue for retry on server errors (5xx) or rate limiting (429)
// Do NOT retry on client errors (4xx) except 429 - they won't succeed
if (this.retryQueue && allowQueueing && (response.status >= 500 || response.status === 429)) {
await this.retryQueue.enqueue(url, fetchOptions, "telemetry")
}
}

return response
} catch (error) {
console.error(`[TelemetryClient#fetch] Network error for ${options.method} ${path}: ${error}`)

// Queue for retry on network failures (typically TypeError with "fetch failed" message)
// These are transient network issues that may succeed on retry
if (
this.retryQueue &&
allowQueueing &&
error instanceof TypeError &&
error.message.includes("fetch failed")
) {
await this.retryQueue.enqueue(url, fetchOptions, "telemetry")
}

throw error
}
}

Expand Down Expand Up @@ -158,6 +187,7 @@ export class CloudTelemetryClient extends BaseTelemetryClient {
})
} catch (error) {
console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`)
// Error is already queued for retry in the fetch method
}
}

Expand Down Expand Up @@ -199,22 +229,26 @@ export class CloudTelemetryClient extends BaseTelemetryClient {
)
}

// Custom fetch for multipart - don't set Content-Type header (let browser set it)
const response = await fetch(`${getRooCodeApiUrl()}/api/events/backfill`, {
const url = `${getRooCodeApiUrl()}/api/events/backfill`
const fetchOptions: RequestInit = {
method: "POST",
headers: {
Authorization: `Bearer ${token}`,
// Note: No Content-Type header - browser will set multipart/form-data with boundary
},
body: formData,
})
}

if (!response.ok) {
console.error(
`[TelemetryClient#backfillMessages] POST events/backfill -> ${response.status} ${response.statusText}`,
)
} else if (this.debug) {
console.info(`[TelemetryClient#backfillMessages] Successfully uploaded messages for task ${taskId}`)
try {
const response = await fetch(url, fetchOptions)

if (!response.ok) {
console.error(
`[TelemetryClient#backfillMessages] POST events/backfill -> ${response.status} ${response.statusText}`,
)
}
} catch (fetchError) {
console.error(`[TelemetryClient#backfillMessages] Network error: ${fetchError}`)
throw fetchError
}
} catch (error) {
console.error(`[TelemetryClient#backfillMessages] Error uploading messages: ${error}`)
Expand Down
22 changes: 0 additions & 22 deletions packages/cloud/src/__tests__/TelemetryClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,28 +684,6 @@ describe("TelemetryClient", () => {
)
})

it("should log debug information when debug is enabled", async () => {
const client = new TelemetryClient(mockAuthService, mockSettingsService, true)

const messages = [
{
ts: 1,
type: "say" as const,
say: "text" as const,
text: "test message",
},
]

await client.backfillMessages(messages, "test-task-id")

expect(console.info).toHaveBeenCalledWith(
"[TelemetryClient#backfillMessages] Uploading 1 messages for task test-task-id",
)
expect(console.info).toHaveBeenCalledWith(
"[TelemetryClient#backfillMessages] Successfully uploaded messages for task test-task-id",
)
})

it("should handle empty messages array", async () => {
const client = new TelemetryClient(mockAuthService, mockSettingsService)

Expand Down
3 changes: 3 additions & 0 deletions packages/cloud/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ export * from "./config.js"
export { CloudService } from "./CloudService.js"

export { BridgeOrchestrator } from "./bridge/index.js"

export { RetryQueue } from "./retry-queue/index.js"
export type { QueuedRequest, QueueStats, RetryQueueConfig, RetryQueueEvents } from "./retry-queue/index.js"
Loading
Loading