Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ func main() {
os.Exit(1)
}

// Initialize Firebase client for Firestore (used for deep research tracking)
var firebaseClient *auth.FirebaseClient

if config.AppConfig.FirebaseCredJSON != "" {
firebaseClient, err = auth.NewFirebaseClient(context.Background(), config.AppConfig.FirebaseProjectID, config.AppConfig.FirebaseCredJSON)
if err != nil {
log.Error("failed to initialize firebase client", slog.String("error", err.Error()))
os.Exit(1)
}
log.Info("firebase client initialized")

// Ensure cleanup on shutdown
defer func() {
if err := firebaseClient.Close(); err != nil {
log.Error("failed to close firebase client", slog.String("error", err.Error()))
}
}()
} else {
log.Warn("firebase credentials not provided - deep research tracking will not work properly")
}

// Initialize services
oauthService := oauth.NewService(logger.WithComponent("oauth"))
composioService := composio.NewService(logger.WithComponent("composio"))
Expand Down Expand Up @@ -161,6 +182,7 @@ func main() {
router := setupRESTServer(restServerInput{
logger: logger,
firebaseAuth: firebaseAuth,
firebaseClient: firebaseClient,
requestTrackingService: requestTrackingService,
oauthHandler: oauthHandler,
composioHandler: composioHandler,
Expand Down Expand Up @@ -260,6 +282,7 @@ func getKeys(m map[string]string) []string {
type restServerInput struct {
logger *logger.Logger
firebaseAuth *auth.FirebaseAuthMiddleware
firebaseClient *auth.FirebaseClient
requestTrackingService *request_tracking.Service
oauthHandler *oauth.Handler
composioHandler *composio.Handler
Expand Down Expand Up @@ -341,7 +364,7 @@ func setupRESTServer(input restServerInput) *gin.Engine {
api.POST("/exa/search", input.searchHandler.PostExaSearchHandler) // POST /api/v1/exa/search (Exa AI)

// Deep Research WebSocket endpoint (protected)
api.GET("/deepresearch/ws", deepr.DeepResearchHandler(input.logger, input.requestTrackingService)) // WebSocket proxy for deep research
api.GET("/deepresearch/ws", deepr.DeepResearchHandler(input.logger, input.requestTrackingService, input.firebaseClient)) // WebSocket proxy for deep research
}

// Protected proxy routes
Expand Down
4 changes: 4 additions & 0 deletions deploy/enclaver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ egress:
# aws-0-us-east-2.pooler.supabase.com
- 3.13.175.194
- 3.139.14.59
# demo deepresearch endpoint (temporary)
- 165.232.133.47:3031
env:
- APPSTORE_API_KEY_ID
- APPSTORE_API_KEY_P8
Expand All @@ -58,6 +60,8 @@ env:
- DB_CONN_MAX_LIFETIME_MINUTES
- DB_MAX_IDLE_CONNS
- DB_MAX_OPEN_CONNS
- DEEPR_STORAGE_PATH
- DEEP_RESEARCH_WS
- DRIP_DAILY_MESSAGES
- ENABLE_TELEGRAM_SERVER
- EXA_API_KEY
Expand Down
18 changes: 18 additions & 0 deletions deploy/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,24 @@ static_resources:
upstream_wire_bytes_sent: '%UPSTREAM_WIRE_BYTES_SENT%'
access_log_options:
flush_access_log_on_connected: true
# demo deepresearch endpoint (temporary)
- name: egress_deepresearch
address:
socket_address:
address: 0.0.0.0
port_value: 3031
filter_chains:
- filters:
- name: envoy.filters.network.tcp_proxy
typed_config:
'@type': type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
stat_prefix: tcp_proxy
cluster: enclaver_odyn_egress
tunneling_config:
hostname: '%DOWNSTREAM_DIRECT_REMOTE_ADDRESS_WITHOUT_PORT%:3031'
access_log: *access_log_tcp
access_log_options:
flush_access_log_on_connected: true
- name: egress_postgresql
address:
socket_address:
Expand Down
253 changes: 253 additions & 0 deletions docs/deep-research-reconnection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
# Deep Research Reconnection Feature

## Overview

The Deep Research service now supports automatic session persistence and reconnection, allowing iOS clients to disconnect and reconnect without losing progress or messages.

## How It Works

### Architecture

1. **Session Persistence**: All messages from the deep research backend are stored in JSON files on the enchanted-proxy server
2. **Message State Tracking**: Each message is marked as "sent" or "unsent" (sent to iOS client)
3. **Backend Connection Persistence**: The connection between enchanted-proxy and deep research backend remains active even when iOS disconnects
4. **Automatic Reconnection**: When iOS reconnects, it receives all unsent messages and continues listening for new ones

### Message Flow

#### Initial Connection
```
iOS App → WebSocket → Enchanted Proxy → WebSocket → Deep Research Backend
JSON Storage
(Messages persisted)
```

#### iOS Disconnects
```
Deep Research Backend → Enchanted Proxy → JSON Storage (messages marked as "unsent")
(continues running) ↓
Backend stays connected
```

#### iOS Reconnects
```
iOS App → WebSocket → Enchanted Proxy
1. Loads session state
2. Sends unsent messages
3. Continues receiving new messages
```

## Key Components

### 1. Storage Layer (`storage.go`)

Handles persistence of messages and session state:

- **SessionState**: Tracks session metadata (backend connection status, completion status, etc.)
- **PersistedMessage**: Individual message with sent/unsent status
- **Storage Methods**:
- `LoadSession()`: Load session state from disk
- `SaveSession()`: Save session state to disk
- `AddMessage()`: Store new message with sent status
- `GetUnsentMessages()`: Retrieve messages not yet sent to client
- `MarkMessageAsSent()`: Mark message as delivered
- `IsSessionComplete()`: Check if research is complete

### 2. Session Manager (`session_manager.go`)

Manages active backend connections:

- **ActiveSession**: Represents a live backend connection with multiple client connections
- **SessionManager Methods**:
- `CreateSession()`: Create new backend session
- `GetSession()`: Retrieve active session
- `AddClientConnection()`: Add iOS client to session
- `RemoveClientConnection()`: Remove iOS client from session
- `BroadcastToClients()`: Send message to all connected clients
- `HasActiveBackend()`: Check if backend connection exists

### 3. Service Updates (`service.go`)

Enhanced service logic:

- **Reconnection Detection**: Checks if backend session exists for userID/chatID
- **Message Persistence**: Stores every message with sent/unsent status
- **Unsent Message Delivery**: Sends accumulated messages on reconnection
- **Session Completion Detection**: Recognizes final reports and errors

## Session States

### Message States

1. **sent: true**: Message successfully delivered to iOS client
2. **sent: false**: Message received from backend but not yet delivered to iOS

### Session Completion Conditions

A session is considered complete when:
1. **Final Report Received**: Message contains `final_report` field with content
2. **Error Occurred**: Message has `type: "error"` or contains `error` field

## Storage Format

### Session File Location

Default: `./deepr_sessions/session_{userID}_{chatID}.json`

Can be configured via environment variable: `DEEPR_STORAGE_PATH`

### Session File Structure

```json
{
"user_id": "abc123",
"chat_id": "chat456",
"messages": [
{
"id": "msg-uuid-1",
"user_id": "abc123",
"chat_id": "chat456",
"message": "{\"type\":\"status\",\"content\":\"Starting research...\"}",
"sent": true,
"timestamp": "2025-10-08T10:30:00Z",
"message_type": "status"
},
{
"id": "msg-uuid-2",
"user_id": "abc123",
"chat_id": "chat456",
"message": "{\"type\":\"update\",\"content\":\"Analyzing sources...\"}",
"sent": false,
"timestamp": "2025-10-08T10:31:00Z",
"message_type": "update"
}
],
"backend_connected": true,
"last_activity": "2025-10-08T10:31:00Z",
"final_report_received": false,
"error_occurred": false
}
```

## Reconnection Behavior

### On Reconnection

1. **Check Session**: Determine if backend session exists
2. **Send Unsent Messages**: Deliver all messages marked as unsent
3. **Check Completion**:
- If `final_report_received` or `error_occurred`: Send final message and close
- Otherwise: Continue listening for new messages
4. **Join Active Session**: Add client to existing session for real-time updates

### Multiple Clients

The system supports multiple iOS clients connected to the same session:

- Messages are broadcast to all connected clients
- Each client can disconnect/reconnect independently
- Backend connection persists as long as research is ongoing

## Configuration

### Environment Variables

```bash
# Storage path for session files (optional)
DEEPR_STORAGE_PATH=/path/to/sessions

# Deep research backend WebSocket URL (required)
DEEP_RESEARCH_WS=your-backend-host:port
```

### Default Values

- Storage Path: `./deepr_sessions`
- Session file naming: `session_{userID}_{chatID}.json`

## Cleanup

Session files should be cleaned up periodically using:

```go
storage.CleanupOldSessions(maxAge time.Duration)
```

Recommended: Clean up sessions older than 24-48 hours

## Error Handling

### Storage Failures

If storage operations fail:
- Error is logged
- Message delivery continues
- Reconnection may not have full history

### Backend Disconnection

If backend connection drops:
- Session marked as disconnected
- Existing messages remain available
- New connection attempts will create fresh session

### Client Disconnection

If iOS client disconnects:
- Backend connection remains active
- Messages continue to be stored as unsent
- Client can reconnect anytime

## Message Types

The system recognizes these message types:

1. **status**: Progress updates
2. **update**: Research updates
3. **error**: Error messages (marks session as complete)
4. **final**: Messages with `final_report` field (marks session as complete)

## Usage Example

### iOS Client Flow

```
1. Connect to /api/deepresearch/ws?chat_id=123
2. Send research query
3. Receive status updates
4. (App backgrounded/disconnected)
5. (Messages continue arriving at backend)
6. Reconnect to /api/deepresearch/ws?chat_id=123
7. Receive all unsent messages
8. Continue receiving new messages
9. Receive final report
10. Session complete
```

## Monitoring

Log messages indicate:
- Session creation/removal
- Client connections/disconnections
- Message persistence status
- Reconnection events
- Unsent message delivery

Example log:
```
INFO: created new session user_id=abc123 chat_id=chat456
INFO: message stored sent=false type=status
INFO: detected reconnection user_id=abc123 chat_id=chat456
INFO: sending unsent messages count=5
```

## Testing Recommendations

1. **Test Disconnection**: Kill iOS app during research
2. **Test Reconnection**: Reopen app and verify messages received
3. **Test Multiple Clients**: Connect from multiple devices
4. **Test Error Handling**: Verify error messages mark session complete
5. **Test Final Report**: Verify final report marks session complete
6. **Test Storage**: Check session files are created correctly
Loading