Staging #33
Conversation
Walkthrough

This PR adds Firebase Firestore integration and a FirebaseClient; wires it through server initialization into the REST DeepResearchHandler and deepr Service; introduces a file-backed Storage subsystem and a SessionManager for per-session WebSocket/backend management; implements reconnection logic that replays unsent messages and evaluates session completion; extends message models with final_report and error fields; updates config defaults and Envoy/enclaver egress settings; and adds two Deep Research documentation pages.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Main as Server main
    participant FB as FirebaseClient
    participant REST as REST server
    participant Handler as DeepResearchHandler
    rect rgb(230,245,230)
        Main->>FB: NewFirebaseClient(ctx, projectID, credJSON)
        FB-->>Main: firebaseClient
        Main->>REST: NewRestServer(..., firebaseClient)
        REST->>Handler: DeepResearchHandler(..., firebaseClient)
    end
```
```mermaid
sequenceDiagram
    autonumber
    actor Client as iOS Client
    participant WS as REST WS Handler
    participant Svc as DeepR Service
    participant SM as SessionManager
    participant ST as Storage
    participant BE as DeepR Backend
    participant FB as Firebase
    rect rgba(220,235,245,0.5)
        Client->>WS: WebSocket connect (userID, chatID, JWT)
        WS->>Svc: HandleConnection(userID, chatID)
        Svc->>SM: HasActiveBackend(userID, chatID)?
        alt Reconnection (backend active)
            Svc->>ST: GetUnsentMessages / IsSessionComplete
            ST-->>Svc: unsent messages, complete?
            Svc->>SM: AddClientConnection(...)
            loop Replay unsent
                Svc->>SM: BroadcastToClients(message)
                Svc->>ST: MarkMessageAsSent(messageID)
            end
        else New session (no backend)
            Svc->>FB: checkAndTrackSubscription(userID)
            FB-->>Svc: ok / exhausted
            Svc->>BE: Dial backend WS
            BE-->>Svc: connected
            Svc->>SM: CreateSession(...)
            Svc->>ST: UpdateBackendConnectionStatus(true)
            par Backend -> Clients
                BE-->>Svc: message
                Svc->>ST: AddMessage(message, sent=false, type)
                Svc->>SM: BroadcastToClients(message)
                Svc->>ST: MarkMessageAsSent(lastMessageID)
            and Client -> Backend
                Client-->>WS: client message
                WS->>Svc: forward
                Svc->>SM: WriteToBackend(...) / BE
            end
        end
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant Svc as DeepR Service
    participant FB as Firebase
    note over Svc,FB: Subscription enforcement and tracking
    Svc->>FB: HasUsedFreeDeepResearch(userID)?
    alt Freemium unused
        FB-->>Svc: false
        Svc->>FB: MarkFreeDeepResearchUsed(userID)
    else Freemium used
        FB-->>Svc: true
        Svc-->>Client: error (freemium exhausted)
    end
    alt Pro user
        Svc->>FB: IncrementDeepResearchUsage(userID)
    end
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 4
🧹 Nitpick comments (2)
docs/deep-research.md (1)
16-52: Add language hints to the fenced blocks. markdownlint (MD040) is failing because these code fences have no language. Tag them (e.g., `text`) so the docs pass lint.
docs/deep-research-reconnection.md (1)
19-41: Add explicit fence languages for linter compliance. markdownlint is flagging the unlabeled code fences (architecture diagrams, flows, logs). Add an explicit language such as `text` to silence MD040 and improve highlighting. As per static analysis hints. Also applies to: 214-245
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
- cmd/server/main.go (4 hunks)
- docs/deep-research-reconnection.md (1 hunks)
- docs/deep-research.md (1 hunks)
- internal/auth/firebase_client.go (1 hunks)
- internal/config/config.go (2 hunks)
- internal/deepr/handlers.go (2 hunks)
- internal/deepr/models.go (2 hunks)
- internal/deepr/service.go (2 hunks)
- internal/deepr/session_manager.go (1 hunks)
- internal/deepr/storage.go (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
docs/deep-research-reconnection.md
19-19: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
27-27: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
34-34: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
216-216: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
239-239: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
docs/deep-research.md
19-19: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
27-27: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
34-34: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build Enclave Image
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/deepr/service.go (1)
183-209: Synchronize backend writes from multiple clients.

The reconnected client's reader goroutine (starting line 183) and any other client's `handleClientMessages` goroutine can both write to `session.BackendConn` concurrently (lines 202 and 236 respectively). Gorilla websocket requires that at most one goroutine writes to a connection at a time. Concurrent writes will trigger `websocket: concurrent write` and drop the connection. Based on learnings: gorilla/websocket requires one concurrent writer per connection. Serialize all writes to `session.BackendConn` by either:

- Option A (recommended): Add a `sync.Mutex` to the Session struct in `session_manager.go` to guard backend writes, and acquire it before every `session.BackendConn.WriteMessage()` call across the codebase.
- Option B: Route all client→backend messages through a single goroutine via a channel, similar to how backend→client broadcasts are centralized (a sketch of this approach follows below).

Apply Option A if Session struct can be modified:

In `internal/deepr/session_manager.go`, add a mutex to Session:

```go
type Session struct {
    // ... existing fields ...
    backendWriteMu sync.Mutex
}
```

Then, in both `handleReconnection` (line 202) and `handleClientMessages` (line 236), wrap backend writes:

```diff
 if session, exists := s.sessionManager.GetSession(userID, chatID); exists && session.BackendConn != nil {
+    session.backendWriteMu.Lock()
     if err := session.BackendConn.WriteMessage(websocket.TextMessage, message); err != nil {
+        session.backendWriteMu.Unlock()
         log.Error("error forwarding message to backend", slog.String("error", err.Error()))
         return
     }
+    session.backendWriteMu.Unlock()
 }
```
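For comparison, a minimal sketch of Option B: a single writer goroutine owns the backend connection and everything else sends through a channel. The `backendWriter` type and its method names are hypothetical illustrations, not code from this PR.

```go
package deepr

import (
	"context"
	"log"

	"github.com/gorilla/websocket"
)

// backendWriter (sketch): all client→backend traffic is funneled through one
// channel, and a single goroutine owns the backend connection, so the
// gorilla/websocket one-writer rule can never be violated.
type backendWriter struct {
	conn *websocket.Conn
	send chan []byte
}

// run is the only goroutine that ever calls WriteMessage on conn.
func (w *backendWriter) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-w.send:
			if !ok {
				return
			}
			if err := w.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
				log.Printf("backend write failed: %v", err)
				return
			}
		}
	}
}

// enqueue is what handleReconnection / handleClientMessages would call instead
// of writing to the connection directly.
func (w *backendWriter) enqueue(msg []byte) {
	w.send <- msg
}
```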
🧹 Nitpick comments (2)
internal/deepr/service.go (1)
329-342: Message sent flag may be inconsistent if storage fails.

The broadcast is attempted first (line 331), and the message is stored with `messageSent = true` only if broadcast succeeded and clients exist (line 332). If `AddMessage` fails (line 335), the message was broadcast but not persisted, leaving no record. On reconnection, the client won't receive this message. This is likely acceptable for transient status updates, but consider logging a warning if storage consistently fails to help diagnose issues.

Optionally, add a warning if storage fails repeatedly:

```diff
 if err := s.storage.AddMessage(userID, chatID, string(message), messageSent, messageType); err != nil {
-    log.Error("failed to store message", slog.String("error", err.Error()))
+    log.Warn("failed to store message; reconnecting clients may miss this message",
+        slog.String("error", err.Error()),
+        slog.String("message_type", messageType))
 }
```

internal/deepr/storage.go (1)
248-287: Cleanup holds write lock for extended duration.

`CleanupOldSessions` acquires the write lock at the start (line 250) and holds it while iterating all session files and deleting old ones. This blocks all session reads and writes during cleanup. For large session directories, this could cause noticeable latency.

Since this is likely a low-frequency background task and the file I/O should be fast, this is acceptable. If cleanup latency becomes an issue, consider:

- Collecting file paths to delete under read lock, then acquiring write lock only for deletions (see the sketch after this list).
- Using a separate lock for cleanup operations.
- Running cleanup during low-traffic periods.

For now, no changes needed unless profiling shows issues.
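A minimal sketch of the first option, assuming a store guarded by a `sync.RWMutex` with one file per session under a single directory; the `fileStore` type, `maxAge` parameter, and file layout are illustrative, not the PR's actual `Storage` implementation.

```go
package deepr

import (
	"os"
	"path/filepath"
	"sync"
	"time"
)

type fileStore struct {
	mu  sync.RWMutex
	dir string
}

// CleanupOldSessions removes session files older than maxAge. The scan runs
// under the read lock so normal reads and writes are not blocked; only the
// final deletions take the write lock, and only briefly.
func (s *fileStore) CleanupOldSessions(maxAge time.Duration) error {
	cutoff := time.Now().Add(-maxAge)

	// Phase 1: collect candidates under the read lock.
	s.mu.RLock()
	entries, err := os.ReadDir(s.dir)
	if err != nil {
		s.mu.RUnlock()
		return err
	}
	var stale []string
	for _, e := range entries {
		info, err := e.Info()
		if err == nil && !e.IsDir() && info.ModTime().Before(cutoff) {
			stale = append(stale, filepath.Join(s.dir, e.Name()))
		}
	}
	s.mu.RUnlock()

	// Phase 2: delete under the write lock, held only for the removals.
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, path := range stale {
		_ = os.Remove(path) // best effort; a file may already be gone
	}
	return nil
}
```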
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- internal/deepr/service.go (2 hunks)
- internal/deepr/storage.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build Enclave Image
🔇 Additional comments (7)
internal/deepr/service.go (5)
29-49: LGTM! Service initialization properly wired.

The constructor correctly initializes the storage subsystem with environment-based path fallback, handles initialization errors gracefully by logging and continuing with in-memory-only mode, and creates the session manager. The Firebase client integration is clean.

52-83: LGTM! Reconnection detection and routing implemented correctly.

The connection handler properly generates a unique client ID, differentiates between reconnections and new connections using `HasActiveBackend`, and routes accordingly. The subscription check is correctly placed only in the new connection path.

114-118: LGTM! Freemium quota enforcement now returns a proper error.

This addresses the critical issue from the previous review. The function now returns `fmt.Errorf` when the freemium quota is exhausted, preventing the caller from proceeding with the connection.

142-176: LGTM! Replay-before-registration pattern prevents concurrent writes.

This addresses the major concurrency issue from the previous review. By replaying all unsent messages before calling `AddClientConnection` (line 176), you ensure that the backend broadcast goroutine cannot write to this client during replay, eliminating the `websocket: concurrent write` race condition.
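As a rough illustration of that ordering, here is a simplified method fragment; the signature, the `StoredMessage` fields, and the storage/session-manager APIs are assumptions rather than the PR's exact code.

```go
// handleReconnection (sketch): replay persisted-but-unsent messages to the
// reconnecting client BEFORE registering it with the session manager. Until
// AddClientConnection runs, only this goroutine can write to conn, so the
// backend broadcast goroutine cannot race with the replay writes.
func (s *Service) handleReconnection(userID, chatID, clientID string, conn *websocket.Conn) error {
	unsent, err := s.storage.GetUnsentMessages(userID, chatID)
	if err != nil {
		return err
	}
	for _, m := range unsent {
		if err := conn.WriteMessage(websocket.TextMessage, []byte(m.Content)); err != nil {
			return err
		}
		if err := s.storage.MarkMessageAsSent(userID, chatID, m.ID); err != nil {
			return err
		}
	}
	// Only now does the broadcast goroutine learn about this client.
	s.sessionManager.AddClientConnection(userID, chatID, clientID, conn)
	return nil
}
```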
286-292: LGTM! Session lifecycle now independent of individual clients.

This addresses the critical issue from the previous review. By creating the session context with `context.Background()` (line 287) instead of the client's context, the backend session remains alive even when the original client disconnects. The deferred `RemoveSession` (line 292) only executes when the backend reader exits, not when a client disconnects. Reconnecting clients can now successfully rejoin and replay messages.
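For illustration, a minimal fragment showing that decoupling; the handler name, fields, and read loop are assumed for the sketch and are not the PR's exact code.

```go
// startBackendSession (sketch): the backend reader's lifetime is tied to a
// context derived from context.Background(), not from the connecting client's
// request context, so a client disconnect does not tear down the session.
func (s *Service) startBackendSession(userID, chatID string, backendConn *websocket.Conn) {
	sessionCtx, cancel := context.WithCancel(context.Background())

	go func() {
		// RemoveSession runs only when the backend reader exits,
		// not when any individual client goes away.
		defer s.sessionManager.RemoveSession(userID, chatID)
		defer cancel()

		for {
			select {
			case <-sessionCtx.Done():
				return
			default:
			}
			_, message, err := backendConn.ReadMessage()
			if err != nil {
				return
			}
			_ = s.sessionManager.BroadcastToClients(userID, chatID, message)
		}
	}()
}
```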
internal/deepr/storage.go (2)

129-144: LGTM! Atomic read-modify-write pattern prevents lost updates.

The `modifySession` helper correctly addresses the critical race condition from the previous review. By holding the write lock (line 131) for the entire load→mutate→save cycle, you prevent two goroutines from loading the same snapshot and overwriting each other's changes. All mutation methods (`AddMessage`, `MarkMessageAsSent`, `MarkAllMessagesAsSent`, `UpdateBackendConnectionStatus`) now use this pattern.
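A rough sketch of what such a helper can look like for a file-backed store; the `sessionStore` type, the data model, and the JSON-per-session layout are assumptions for illustration, not the PR's actual storage code.

```go
package deepr

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// Illustrative data types; field names are assumptions, not the PR's models.
type StoredMessage struct {
	ID      string `json:"id"`
	Content string `json:"content"`
	Sent    bool   `json:"sent"`
}

type SessionData struct {
	Messages []StoredMessage `json:"messages"`
}

type sessionStore struct {
	mu  sync.Mutex
	dir string
}

func (s *sessionStore) sessionPath(userID, chatID string) string {
	return filepath.Join(s.dir, fmt.Sprintf("%s_%s.json", userID, chatID))
}

// modifySession holds the lock across the whole load→mutate→save cycle so two
// goroutines can never load the same snapshot and overwrite each other's changes.
func (s *sessionStore) modifySession(userID, chatID string, mutate func(*SessionData) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	raw, err := os.ReadFile(s.sessionPath(userID, chatID))
	if err != nil {
		return err
	}
	var sess SessionData
	if err := json.Unmarshal(raw, &sess); err != nil {
		return err
	}
	if err := mutate(&sess); err != nil {
		return err
	}
	out, err := json.MarshalIndent(&sess, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(s.sessionPath(userID, chatID), out, 0o600)
}

// Example mutation: MarkMessageAsSent flips one message's Sent flag.
func (s *sessionStore) MarkMessageAsSent(userID, chatID, messageID string) error {
	return s.modifySession(userID, chatID, func(sess *SessionData) error {
		for i := range sess.Messages {
			if sess.Messages[i].ID == messageID {
				sess.Messages[i].Sent = true
				return nil
			}
		}
		return nil
	})
}
```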
146-174: LGTM! Message persistence correctly detects completion.

`AddMessage` properly uses the atomic `modifySession` pattern and correctly parses the message JSON to set `FinalReportReceived` and `ErrorOccurred` flags (lines 162-170). The UUID generation for message IDs is appropriate.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- internal/deepr/service.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build Enclave Image
Actionable comments posted: 5
🧹 Nitpick comments (3)
internal/deepr/service.go (2)
248-252: Validate or document DEEP_RESEARCH_WS format.

The backend host is read directly from the environment variable without validation (line 248). If the value contains unexpected characters or an invalid format, the URL construction will fail silently or create incorrect URLs.

Consider adding validation:

```diff
 deepResearchHost := os.Getenv("DEEP_RESEARCH_WS")
 if deepResearchHost == "" {
     deepResearchHost = "165.232.133.47:3031"
     log.Info("DEEP_RESEARCH_WS environment variable not set, using default",
         slog.String("default", deepResearchHost))
+} else {
+    // Validate format: should be "host:port"
+    if _, _, err := net.SplitHostPort(deepResearchHost); err != nil {
+        log.Error("invalid DEEP_RESEARCH_WS format, using default",
+            slog.String("invalid", deepResearchHost),
+            slog.String("error", err.Error()))
+        deepResearchHost = "165.232.133.47:3031"
+    }
 }
```

Remember to add `net` to the imports.
31-34: Consider validating storage path.

The storage path from the environment variable is used directly without validation. If the path points to an invalid location or the process lacks permissions, storage initialization will fail silently.

While the current fail-open design is acceptable, consider adding validation to help operators catch configuration issues:

```diff
 storagePath := os.Getenv("DEEPR_STORAGE_PATH")
 if storagePath == "" {
     storagePath = filepath.Join(".", "deepr_sessions")
+} else {
+    // Validate path is absolute or relative, and directory is accessible
+    if absPath, err := filepath.Abs(storagePath); err != nil {
+        logger.WithComponent("deepr").Warn("invalid storage path, using default",
+            slog.String("invalid", storagePath),
+            slog.String("error", err.Error()))
+        storagePath = filepath.Join(".", "deepr_sessions")
+    } else {
+        storagePath = absPath
+    }
 }
```

internal/deepr/session_manager.go (1)
150-187: Consider early exit on no clients.

The `BroadcastToClients` method iterates over `session.clientConns` even when the map is empty. While not incorrect, checking the client count before acquiring the read lock would be more efficient and avoid unnecessary logging for empty broadcasts.

Apply this diff to add an early check:

```diff
 func (sm *SessionManager) BroadcastToClients(userID, chatID string, message []byte) error {
     sm.mu.RLock()
     key := sm.getSessionKey(userID, chatID)
     session, exists := sm.sessions[key]
     sm.mu.RUnlock()
     if !exists {
         return nil // No active session, message will be stored as unsent
     }
     session.mu.RLock()
+    clientCount := len(session.clientConns)
+    if clientCount == 0 {
+        session.mu.RUnlock()
+        return nil // No clients connected
+    }
     defer session.mu.RUnlock()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- internal/deepr/service.go (2 hunks)
- internal/deepr/session_manager.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build Enclave Image
🔇 Additional comments (6)
internal/deepr/service.go (3)
29-49: LGTM! Storage initialization follows fail-open pattern.

The storage initialization properly handles failure by logging and continuing with in-memory operation. The nil checks throughout the codebase ensure graceful degradation when storage is unavailable.
85-130: Verify Firebase error handling consistency.

The error handling differs between Firebase operations:
- Lines 101-103: Logs Firebase increment errors but continues
- Lines 108-112, 120-124: Returns on Firebase check/mark errors
For Pro users (lines 101-103), if Firebase tracking fails, the usage isn't recorded but the session proceeds. This could allow unbounded usage if Firebase is down. Consider whether tracking failures should also abort the connection for Pro users to maintain usage integrity.
Should Pro user tracking failures also return an error to maintain usage accounting integrity, or is the current fail-open behavior intentional for availability?
317-323: Remove outdated verification: Message struct is defined in internal/deepr/models.go (lines 6–12) with fields Type, FinalReport, and Error. Likely an incorrect or invalid review comment.
internal/deepr/session_manager.go (3)
12-22: LGTM! Proper synchronization primitives.

The ActiveSession struct correctly uses separate mutexes:

- `mu` protects the `clientConns` map from concurrent access
- `backendWriteMu` serializes writes to the backend websocket (addresses the concurrent write issue mentioned in past reviews)

This design properly handles the multi-client, single-backend scenario.
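A rough sketch of what such a struct and its client-map guard can look like; the field and method shapes here are illustrative, and the PR's actual `ActiveSession` may differ.

```go
package deepr

import (
	"context"
	"sync"

	"github.com/gorilla/websocket"
)

// ActiveSession (sketch): one backend connection fanned out to many clients.
type ActiveSession struct {
	BackendConn *websocket.Conn
	Cancel      context.CancelFunc

	mu          sync.RWMutex               // guards clientConns
	clientConns map[string]*websocket.Conn // clientID -> client websocket

	backendWriteMu sync.Mutex // serializes writes to BackendConn
}

// AddClientConnection registers a client under the map lock only; it never
// touches the backend connection, so it does not need backendWriteMu.
func (s *ActiveSession) AddClientConnection(clientID string, conn *websocket.Conn) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.clientConns == nil {
		s.clientConns = make(map[string]*websocket.Conn)
	}
	s.clientConns[clientID] = conn
}
```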
86-110: LGTM! Proper cleanup order in RemoveSession.

The session removal correctly:
- Closes all client connections first (lines 87-97)
- Cancels the session context (lines 99-102)
- Removes from the sessions map (line 104)
This order ensures that client connections are cleanly closed before the session context is cancelled, preventing potential race conditions.
214-235: LGTM! Proper backend write synchronization.

The `WriteToBackend` method correctly:

- Checks for session existence
- Acquires `backendWriteMu` to serialize writes
- Checks for nil backend connection
- Performs the write

This implementation addresses the concurrent write issue from past reviews and ensures only one goroutine writes to the backend websocket at a time.
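Continuing the illustrative `ActiveSession` sketch above (and assuming `fmt` is added to its imports), a minimal `WriteToBackend` showing the check → lock → nil-check → write sequence described here; it is an approximation, not the PR's exact method.

```go
type SessionManager struct {
	mu       sync.RWMutex
	sessions map[string]*ActiveSession // key: userID + ":" + chatID
}

// WriteToBackend serializes all client→backend writes for a session.
func (sm *SessionManager) WriteToBackend(userID, chatID string, message []byte) error {
	sm.mu.RLock()
	session, exists := sm.sessions[userID+":"+chatID]
	sm.mu.RUnlock()
	if !exists {
		return fmt.Errorf("no active session for %s/%s", userID, chatID)
	}

	// Only one goroutine at a time may write to the backend websocket.
	session.backendWriteMu.Lock()
	defer session.backendWriteMu.Unlock()

	if session.BackendConn == nil {
		return fmt.Errorf("session %s/%s has no backend connection", userID, chatID)
	}
	return session.BackendConn.WriteMessage(websocket.TextMessage, message)
}
```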
```go
if msg.FinalReport != "" || msg.Type == "error" || msg.Error != "" {
    log.Info("session complete, backend will be closed after final message broadcast")
    // Continue to allow reconnecting clients to receive this final message
    // The backend connection will close naturally or via timeout
}
```
Complete session doesn't close backend connection.
When a final message is detected (line 342), the code logs but doesn't close the backend connection or cancel the session. The comment claims "backend will close naturally," but if it doesn't, this leaks the connection, session manager entry, and goroutines.
Apply this diff to explicitly close the session on completion:
```diff
 // Check if session is complete
 if msg.FinalReport != "" || msg.Type == "error" || msg.Error != "" {
     log.Info("session complete, backend will be closed after final message broadcast")
-    // Continue to allow reconnecting clients to receive this final message
-    // The backend connection will close naturally or via timeout
+    if s.storage != nil {
+        if err := s.storage.MarkSessionComplete(userID, chatID); err != nil {
+            log.Error("failed to mark session as complete", slog.String("error", err.Error()))
+        }
+    }
+    // Allow current message to be stored and broadcast, then exit
+    // Next iteration of the loop will not execute because we return below
+    cancel()
+    return
 }
```

This ensures the session is properly marked complete in storage, the session context is cancelled, and the backend connection is closed (via the defer on line 268).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// Check if session is complete
if msg.FinalReport != "" || msg.Type == "error" || msg.Error != "" {
    log.Info("session complete, backend will be closed after final message broadcast")
    if s.storage != nil {
        if err := s.storage.MarkSessionComplete(userID, chatID); err != nil {
            log.Error("failed to mark session as complete", slog.String("error", err.Error()))
        }
    }
    // Allow current message to be stored and broadcast, then exit
    // Next iteration of the loop will not execute because we return below
    cancel()
    return
}
```
🤖 Prompt for AI Agents
In internal/deepr/service.go around lines 342-346, the "final message" branch
only logs but never ends the session; update that if block to (1) mark the
session complete in persistent storage via the session manager/storage API used
elsewhere in this file, (2) invoke the session's cancel function or otherwise
cancel the session context so in-flight goroutines stop, and (3) close the
backend connection (or call the backend close helper) so the defer at line 268
runs and the connection is actually closed; perform these calls immediately when
a final message/error is detected.
```go
for clientID, conn := range session.clientConns {
    if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
        sm.logger.WithComponent("deepr-session").Error("failed to send message to client",
            slog.String("user_id", userID),
            slog.String("chat_id", chatID),
            slog.String("client_id", clientID),
            slog.String("error", err.Error()))
        lastErr = err
    } else {
        sentCount++
    }
}
```
Broadcast error handling could mask partial failures.
When broadcasting to multiple clients, the method returns only the last error (line 186). If the first client fails but subsequent clients succeed, the caller receives an error even though some messages were delivered. This could cause the caller to incorrectly mark messages as unsent in storage.
Consider returning a multi-error or nil if at least one client succeeded:
```diff
 var lastErr error
 sentCount := 0
 for clientID, conn := range session.clientConns {
     if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
         sm.logger.WithComponent("deepr-session").Error("failed to send message to client",
             slog.String("user_id", userID),
             slog.String("chat_id", chatID),
             slog.String("client_id", clientID),
             slog.String("error", err.Error()))
         lastErr = err
     } else {
         sentCount++
     }
 }
 sm.logger.WithComponent("deepr-session").Debug("broadcast message to clients",
     slog.String("user_id", userID),
     slog.String("chat_id", chatID),
     slog.Int("sent_count", sentCount),
     slog.Int("total_clients", len(session.clientConns)))
-return lastErr
+// Return error only if no messages were sent successfully
+if sentCount == 0 && lastErr != nil {
+    return lastErr
+}
+return nil
 }
```

This ensures that messages are marked as sent in storage if at least one client received them, which is the correct behavior for replay logic.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
for clientID, conn := range session.clientConns {
    if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
        sm.logger.WithComponent("deepr-session").Error("failed to send message to client",
            slog.String("user_id", userID),
            slog.String("chat_id", chatID),
            slog.String("client_id", clientID),
            slog.String("error", err.Error()))
        lastErr = err
    } else {
        sentCount++
    }
}
sm.logger.WithComponent("deepr-session").Debug("broadcast message to clients",
    slog.String("user_id", userID),
    slog.String("chat_id", chatID),
    slog.Int("sent_count", sentCount),
    slog.Int("total_clients", len(session.clientConns)))
// Return error only if no messages were sent successfully
if sentCount == 0 && lastErr != nil {
    return lastErr
}
return nil
}
```
🤖 Prompt for AI Agents
In internal/deepr/session_manager.go around lines 167 to 178, the broadcast loop
currently returns only the last error which can incorrectly signal failure even
if some clients received the message; change the error handling so that if at
least one WriteMessage succeeded (sentCount > 0) the function returns nil,
otherwise return the aggregated error (either the single lastErr or a
multi-error composed of all failures); implement by tracking sentCount and
collecting errors into a slice (or using errors.Join) and return nil when
sentCount > 0, otherwise return the joined/last error.
Summary by CodeRabbit
- New Features
- Documentation
- Chores