fix: session messages saved into database #36
Conversation
Walkthrough
This change introduces PostgreSQL-backed storage for deep research messages and Firestore-backed session state tracking. It replaces the prior concrete storage with a MessageStorage interface and implements a DBStorage using SQL migrations and query definitions. The WebSocket handler and service are updated to accept injected storage and to enforce session-state gating and freemium/pro access via Firebase. main.go wires up the new storage and passes it through to the REST/WebSocket layer. The service updates session state based on backend events and changes the default backend host to localhost:3031. The SessionState type is removed. .gitignore adds deepr_sessions/.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant C as Client
participant WS as REST WS Handler
participant S as DeepResearch Service
participant FB as Firebase (Sessions)
participant ST as MessageStorage (DB)
participant BE as Deep Research Backend
Note over C,BE: New WebSocket connection
C->>WS: Connect
WS->>S: Handle(conn, userID, chatID, storage)
S->>FB: validateFreemiumAccess(userID, chatID, isReconnection=false)
FB-->>S: OK or error
alt Access allowed
S->>ST: UpdateBackendConnectionStatus(userID, chatID, connected=true)
S->>BE: Dial WS (default localhost:3031)
BE-->>S: Connected
S-->>C: Connection ready
else Access denied
S-->>C: Error and close
end
sequenceDiagram
autonumber
participant C as Client
participant S as DeepResearch Service
participant ST as MessageStorage (DB)
participant BE as Deep Research Backend
participant FB as Firebase (Sessions)
Note over C,BE: Client -> Backend message flow with gating
C->>S: ClientMessage
S->>FB: canForwardMessage(userID, chatID) via session state
FB-->>S: Allowed? + reason
alt Allowed
S->>ST: AddMessage(..., sent=false, type=client)
S->>BE: Forward message
BE-->>S: BackendEvent(eventType, payload)
S->>ST: AddMessage(..., sent=false, type=server)
S->>FB: UpdateSessionState(mapEventTypeToState(eventType))
par Dispatch to client
S-->>C: Broadcast payload
S->>ST: MarkMessageAsSent(...) and/or MarkAllMessagesAsSent(...)
end
else Not allowed
S-->>C: Send state-aware error
end
Note over S,FB: On research_complete or error
S->>FB: UpdateSessionState("complete" | "error")
Estimated code review effort: 4 (Complex) | ~75 minutes
Pre-merge checks: 3 passed
Actionable comments posted: 2
🧹 Nitpick comments (1)
internal/auth/firebase_client.go (1)
290-302: Consider using aggregation query for count.
The current implementation retrieves all completed session documents to count them. For users with many completed sessions, this could be inefficient.
Consider using Firestore's aggregation queries (if available in your Firebase SDK version) for more efficient counting:
func (f *FirebaseClient) GetCompletedSessionCountForUser(ctx context.Context, userID string) (int, error) {
	query := f.firestoreClient.Collection("deep_research_sessions").
		Where("user_id", "==", userID).
		Where("state", "==", "complete")

	// Use aggregation query if SDK supports it
	aggregationQuery := query.NewAggregationQuery().WithCount("all")
	results, err := aggregationQuery.Get(ctx)
	if err != nil {
		return 0, fmt.Errorf("failed to get completed sessions count: %w", err)
	}

	count, ok := results["all"]
	if !ok {
		return 0, fmt.Errorf("count not returned in aggregation results")
	}

	return int(count.(*firestorepb.Value).GetIntegerValue()), nil
}
If aggregation queries are not available, the current implementation is acceptable but may need pagination for users with many sessions.
📒 Files selected for processing (10)
- .gitignore (1 hunks)
- cmd/server/main.go (4 hunks)
- internal/auth/firebase_client.go (1 hunks)
- internal/deepr/db_storage.go (1 hunks)
- internal/deepr/handlers.go (2 hunks)
- internal/deepr/models.go (1 hunks)
- internal/deepr/service.go (10 hunks)
- internal/deepr/storage.go (1 hunks)
- internal/storage/pg/migrations/006_create_deep_research_messages.sql (1 hunks)
- internal/storage/pg/queries/deep_research_messages.sql (1 hunks)
🔇 Additional comments (16)
.gitignore (1)
62-63: LGTM! Clear deprecation notice.
The comment effectively communicates that session storage has migrated from file-based to database-backed persistence, aligning with the broader architectural changes in this PR.
internal/deepr/models.go (1)
27-27: LGTM! Documentation accurately updated.
The comment correctly reflects the architectural shift from disk-based to database-backed message persistence.
cmd/server/main.go (2)
196-196: LGTM! Storage dependency properly wired.
The storage component is correctly injected through the dependency chain: initialized at startup, passed through restServerInput, and provided to the WebSocket handler.
Also applies to: 297-297, 372-372
132-133: The NewDBStorage function at internal/deepr/db_storage.go:21 returns only *DBStorage, not (*DBStorage, error). No error handling is needed or possible for this initialization. The code at lines 132-133 in cmd/server/main.go is correct as written.
Likely an incorrect or invalid review comment.
internal/storage/pg/migrations/006_create_deep_research_messages.sql (2)
2-12: LGTM! Well-designed schema.
The table structure appropriately supports the message persistence requirements:
- session_id using the userID__chatID pattern enables efficient querying
- Separate sent flag and sent_at timestamp properly track delivery state
- Nullable sent_at is correct (only populated when message is sent)
15-18: LGTM! Indexes align with query patterns.
The four indexes support the expected query patterns:
- idx_deep_research_messages_session_id: Session-based queries
- idx_deep_research_messages_user_chat: User/chat lookups
- idx_deep_research_messages_sent: Composite index for filtering unsent messages (matches GetUnsentMessages query)
- idx_deep_research_messages_created_at: Time-ordered retrieval within sessions
internal/deepr/handlers.go (1)
21-21: LGTM! Clean dependency injection.
The storage dependency is properly injected into the handler and passed to the service layer, following good architectural practices and enabling testability.
Also applies to: 77-78
internal/deepr/storage.go (1)
3-12: LGTM! Well-designed storage abstraction.
The MessageStorage interface provides a clean abstraction for message persistence with clear responsibilities:
- Message CRUD operations
- Delivery state tracking
- Session lifecycle management
The interface-based design enables multiple implementations and improves testability.
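To illustrate the testability point, here is a minimal sketch of how an in-memory implementation could stand in for the database during tests. The method signatures, the Message fields, and the names SessionID and MemoryStorage are assumptions made for illustration; the PR's actual interface in internal/deepr/storage.go is not shown in this review and may differ.

```go
package main

import (
	"sync"
	"time"
)

// Message is a hypothetical record shape; the PR's real columns may differ.
type Message struct {
	ID      string
	Payload string
	Type    string     // "client" or "server"
	Sent    bool       // delivery flag
	SentAt  *time.Time // nil until the message is delivered
}

// MessageStorage is an assumed subset of the interface described above.
type MessageStorage interface {
	AddMessage(sessionID string, m Message) error
	GetUnsentMessages(sessionID string) ([]Message, error)
	MarkMessageAsSent(sessionID, messageID string) error
}

// SessionID joins the two IDs using the userID__chatID convention
// mentioned in the review.
func SessionID(userID, chatID string) string {
	return userID + "__" + chatID
}

// MemoryStorage is an in-memory stand-in intended as a test double.
type MemoryStorage struct {
	mu       sync.Mutex
	sessions map[string][]Message
}

func NewMemoryStorage() *MemoryStorage {
	return &MemoryStorage{sessions: make(map[string][]Message)}
}

func (s *MemoryStorage) AddMessage(sessionID string, m Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sessions[sessionID] = append(s.sessions[sessionID], m)
	return nil
}

func (s *MemoryStorage) GetUnsentMessages(sessionID string) ([]Message, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []Message
	for _, m := range s.sessions[sessionID] {
		if !m.Sent {
			out = append(out, m)
		}
	}
	return out, nil
}

func (s *MemoryStorage) MarkMessageAsSent(sessionID, messageID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	msgs := s.sessions[sessionID]
	for i := range msgs {
		if msgs[i].ID == messageID {
			now := time.Now()
			msgs[i].Sent = true
			msgs[i].SentAt = &now
		}
	}
	return nil
}

// Compile-time check that MemoryStorage satisfies the interface.
var _ MessageStorage = (*MemoryStorage)(nil)
```

A service constructed with NewMemoryStorage() instead of the PostgreSQL-backed DBStorage could then exercise the unsent-message replay path without a database.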
internal/storage/pg/queries/deep_research_messages.sql (4)
1-3: LGTM! Insert query properly structured.
The insert includes all necessary fields with appropriate parameter ordering for the table schema.
5-9: LGTM! Query efficiently uses composite index.
The WHERE clause session_id = $1 AND sent = FALSE aligns with the idx_deep_research_messages_sent composite index defined in the migration, enabling efficient retrieval of unsent messages.
11-19: LGTM! Delivery tracking properly implemented.
Both MarkMessageAsSent and MarkAllMessagesAsSent correctly:
- Set sent = TRUE
- Populate sent_at with the current timestamp
This provides accurate delivery tracking for individual and bulk operations.
21-39: LGTM! Comprehensive session management queries.
The additional queries (GetSessionMessages, DeleteSessionMessages, GetSessionMessageCount, GetUnsentMessageCount) provide robust session lifecycle and observability capabilities.
internal/auth/firebase_client.go (4)
180-188: LGTM! Well-structured session state model.
The DeepResearchSessionState struct appropriately captures session lifecycle with distinct timestamps for creation, updates, and completion.
190-211: LGTM! Proper error handling for session retrieval.
The method correctly:
- Uses the userID__chatID format matching the PostgreSQL session_id convention
- Returns nil for non-existent sessions (NotFound case) rather than an error
- Propagates other errors appropriately
213-265: LGTM! Robust session state updates.
The implementation properly handles both session creation and updates:
- Creates new session documents when they don't exist
- Uses firestore.MergeAll to preserve existing fields during updates
- Sets completed_at only when transitioning to "complete" state
- Maintains updated_at on every state change
267-288: LGTM! Efficient active session query.
The query correctly filters for in-progress sessions using Firestore's in operator for states ["in_progress", "clarify"], enabling identification of concurrent sessions.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/deepr/service.go (1)
609-622: Escape user-provided path segments in WS URL (and consider wss)
Avoid path traversal/encoding issues by escaping userID/chatID. Optional: auto-use wss for non-localhost.
Apply:
wsURL := url.URL{
-	Scheme: "ws",
-	Host:   deepResearchHost,
-	Path:   "/deep_research/" + userID + "/" + chatID + "/",
+	Scheme: "ws",
+	Host:   deepResearchHost,
+	Path:   "/deep_research/" + url.PathEscape(userID) + "/" + url.PathEscape(chatID) + "/",
}
Optional (if you decide to switch schemes):
// import "strings"
scheme := "ws"
if !strings.HasPrefix(deepResearchHost, "localhost:") && !strings.HasPrefix(deepResearchHost, "127.0.0.1:") {
	scheme = "wss"
}
wsURL := url.URL{Scheme: scheme, Host: deepResearchHost, Path: "/deep_research/" + url.PathEscape(userID) + "/" + url.PathEscape(chatID) + "/"}
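One caveat worth checking before applying the suggestion above: url.URL.String() escapes the Path field itself, so a Path that already contains url.PathEscape output has its % signs encoded a second time (a/b becomes a%252Fb rather than a%2Fb). The sketch below shows one way to avoid that by keeping the decoded form in Path and the escaped form in RawPath. The helper name buildWSURL is hypothetical and not part of the PR.

```go
package main

import (
	"net/url"
	"strings"
)

// buildWSURL is a hypothetical helper that builds the backend WebSocket URL.
// Path holds the decoded path and RawPath holds the escaped form; net/url
// uses RawPath when it is a valid encoding of Path, which avoids escaping
// the percent signs twice.
func buildWSURL(host, userID, chatID string) *url.URL {
	// Use plain ws only for local development hosts; wss otherwise.
	scheme := "wss"
	if strings.HasPrefix(host, "localhost:") || strings.HasPrefix(host, "127.0.0.1:") {
		scheme = "ws"
	}
	return &url.URL{
		Scheme:  scheme,
		Host:    host,
		Path:    "/deep_research/" + userID + "/" + chatID + "/",
		RawPath: "/deep_research/" + url.PathEscape(userID) + "/" + url.PathEscape(chatID) + "/",
	}
}
```

With this shape, a userID containing a slash stays inside a single path segment in the serialized URL, and IDs without special characters serialize exactly as before.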
♻️ Duplicate comments (3)
internal/deepr/service.go (3)
470-492: Avoid per-message remote state check/log spam (see earlier comment)
Same concern as canForwardMessage: add timeout and reduce log level to avoid hot-path overhead.
558-579: Same: per-message Firebase read/logging in hot path
Mirror the caching/timeout/log-level recommendations from canForwardMessage to this flow as well.
883-889: Privacy fix confirmed in no-storage path as well
No raw payload logged; only metadata. Matches prior guidance.
🧹 Nitpick comments (4)
internal/deepr/service.go (4)
28-41: Normalize event types via constants/enums
Using raw strings risks typos. Define typed constants (or an enum) for "clarify", "in_progress", "complete", "error" and the known backend events ("research_complete", "clarification_needed", etc.) to keep mappings safe and discoverable.
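As a rough illustration of this suggestion, the sketch below defines a typed string for the four session states and constants for the backend events named in the review. The type and constant names are hypothetical, and the event-to-state mapping is an assumption: the review only confirms that research_complete leads to "complete" and an error leads to "error".

```go
package main

// SessionStateName is a hypothetical typed string for the session states
// named in the review.
type SessionStateName string

const (
	StateClarify    SessionStateName = "clarify"
	StateInProgress SessionStateName = "in_progress"
	StateComplete   SessionStateName = "complete"
	StateError      SessionStateName = "error"
)

// Backend event names. Only these appear in the review; the backend may
// emit others.
const (
	EventResearchComplete    = "research_complete"
	EventClarificationNeeded = "clarification_needed"
	EventError               = "error"
)

// mapEventToState translates a backend event type into a session state.
// Mapping clarification_needed to "clarify" and defaulting unknown events
// to "in_progress" are assumptions, not confirmed behavior of the PR.
func mapEventToState(eventType string) SessionStateName {
	switch eventType {
	case EventResearchComplete:
		return StateComplete
	case EventClarificationNeeded:
		return StateClarify
	case EventError:
		return StateError
	default:
		return StateInProgress
	}
}
```

With typed constants, a misspelled state such as "in_progres" becomes a compile error at the call site instead of a silent mismatch at runtime.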
43-79: Per-message Firebase read + Info logs will be hot and noisy; add timeout + lower log level
Each client message triggers a Firebase read and an Info log. This is high-churn and can throttle/raise costs.
- Add a short timeout to the state fetch.
- Downgrade the “message forwarding check” to Debug.
Apply:
- sessionState, err := s.firebaseClient.GetSessionState(ctx, userID, chatID)
+ // Short timeout to prevent slow downstreams from stalling the WS read loop
+ stateCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+ sessionState, err := s.firebaseClient.GetSessionState(stateCtx, userID, chatID)
+ cancel()
@@
- log.Info("message forwarding check",
+ log.Debug("message forwarding check",
  	slog.String("user_id", userID),
  	slog.String("chat_id", chatID),
  	slog.String("session_state", sessionState.State),
  	slog.Bool("can_forward", canForward))
81-205: Freemium gating: possible race + transport mixing
- TOCTOU: Two rapid new connections may both pass the “no active session” check before one registers, creating multiple active sessions. Consider a server-side lock/transaction (e.g., Firebase transaction or a storage-level uniqueness/lease per user) to atomically gate session creation.
- Keep business logic pure: writing error payloads to the WS within validateFreemiumAccess couples transport with policy. Prefer returning typed errors and let caller send WS responses.
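Both points can be sketched together: a check-and-register step that runs under a single lock, returning a typed error that the caller translates into a WebSocket response. This is an in-process stand-in only; it does not protect against races across multiple server instances, where a Firestore transaction or a database uniqueness constraint would be needed. SessionGate and ErrActiveSessionExists are hypothetical names, and the rule that a user may hold one active session is an assumption about the freemium policy.

```go
package main

import (
	"errors"
	"sync"
)

// ErrActiveSessionExists is a hypothetical typed error. The caller decides
// how to report it over the WebSocket; the gate itself never touches the
// connection.
var ErrActiveSessionExists = errors.New("user already has an active session")

// SessionGate is an in-process stand-in for an atomic check-and-register
// step.
type SessionGate struct {
	mu     sync.Mutex
	active map[string]string // userID -> chatID of the active session
}

func NewSessionGate() *SessionGate {
	return &SessionGate{active: make(map[string]string)}
}

// Acquire checks for an existing session and registers the new one under
// the same lock, so two concurrent callers cannot both pass the check.
// Re-acquiring the same chatID (a reconnection) is allowed.
func (g *SessionGate) Acquire(userID, chatID string) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if existing, ok := g.active[userID]; ok && existing != chatID {
		return ErrActiveSessionExists
	}
	g.active[userID] = chatID
	return nil
}

// Release frees the slot once the session completes or errors out.
func (g *SessionGate) Release(userID, chatID string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.active[userID] == chatID {
		delete(g.active, userID)
	}
}
```

The handler would call Acquire at connect time, check the result with errors.Is(err, ErrActiveSessionExists), and write the error payload itself, which keeps the policy code free of transport concerns.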
738-754: State write on every backend message; add timeout and update-on-change
This can flood Firebase. Use a short timeout and only write when state actually changes.
Apply:
- if err := s.firebaseClient.UpdateSessionState(ctx, userID, chatID, sessionState); err != nil {
+ // Best-effort, short timeout to avoid stalling the read loop
+ stCtx, stCancel := context.WithTimeout(ctx, 2*time.Second)
+ err := s.firebaseClient.UpdateSessionState(stCtx, userID, chatID, sessionState)
+ stCancel()
+ if err != nil {
  	log.Error("failed to update session state in Firebase",
Additionally, keep last state to skip redundant writes (outside this hunk):
// Before the for loop:
lastSessionState := ""
// Inside the loop after computing sessionState:
if sessionState != lastSessionState {
	// call UpdateSessionState(...)
	lastSessionState = sessionState
}
📒 Files selected for processing (2)
- internal/deepr/db_storage.go (1 hunks)
- internal/deepr/service.go (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/deepr/db_storage.go
🔇 Additional comments (6)
internal/deepr/service.go (6)
24-26: Good DI change for storage
Interface-based storage with nil checks enables a clean “no-storage” path. LGTM.
207-216: Constructor wiring looks good
NewService cleanly injects MessageStorage alongside existing deps. LGTM.
250-258: Freemium validation on new connections
Correctly gate at connect time; closes on failure. LGTM.
359-367: Freemium validation on reconnection
Re-use of the same validator for reconnections is consistent. LGTM.
764-772: Privacy fix confirmed: no raw payload in INFO logs
The previous “raw_message” field is removed. Thanks for addressing the privacy concern.
789-860: Track usage strictly on 'research_complete': confirm spec
Only increments/marks usage when msg.Type == "research_complete". If other terminal messages (e.g., FinalReport without that type) occur, usage won’t be tracked. Confirm that ‘research_complete’ is the only terminal trigger by contract.