7 changes: 4 additions & 3 deletions 03-integrations/README.md
@@ -1,8 +1,9 @@
# Strands Agent Integrations

| Integration | Features showcased |
| ------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [A2A Protocol](./A2A-protocol/) | Demonstrates agent-to-agent communication protocol for collaborative problem-solving between specialized AI agents. |
| Integration | Features showcased |
| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [A2A Protocol](./A2A-protocol/) | Demonstrates agent-to-agent communication protocol for collaborative problem-solving between specialized AI agents. |
| [Aurora DSQL](./aurora-DSQL) | Demonstrates the Strands Agent integration with Amazon Aurora DSQL. |
| [Nova Act](./nova-act) | Nova Act integration with Strands. Amazon Nova Act is an AI model trained to perform actions within a web browser. |
| [Tavily](./tavily/) | This agent uses Tavily's web search, extract and crawl APIs to gather information from reliable sources, extract key insights, and save comprehensive research reports in Markdown format. |
| [Persistent Session With KurrentDB](./persistent-session-kurrentdb/) | A conversation and session manager that lets you store and restore agent state to resume execution. It also offers unique temporal windowing capabilities in addition to fixed window sizing. |
@@ -0,0 +1 @@
3.12
176 changes: 176 additions & 0 deletions 03-integrations/persistent-session-kurrentdb/README.md
@@ -0,0 +1,176 @@
# KurrentDB Conversation Manager for Strands Agent

A persistent conversation manager implementation for Strands Agent that uses KurrentDB as the storage backend. This manager enables conversation history persistence, state management, and recovery capabilities for AI agents.

## Overview

The `KurrentDBConversationManager` extends the Strands framework's `ConversationManager` to provide:

- **Persistent Message Storage**: All conversation messages are stored as events in KurrentDB streams
- **State Checkpointing**: Save and restore agent state at any point in the conversation
- **Conversation History Management**: Configure retention policies with maximum age or count limits
- **Recovery Capabilities**: Restore agent state and conversation history after restarts

## Installation

### Prerequisites

- Python 3.7+ (this sample pins 3.12 via `.python-version`)
- KurrentDB instance running (default: `localhost:2113`)
- Required Python packages:
```bash
pip install strands-agents kurrentdbclient
```

## Quick Start

```bash
pip install strands-agents[anthropic]
```

Set up a KurrentDB instance: https://console.kurrent.cloud/signup or https://aws.amazon.com/marketplace/pp/prodview-kxo6grvoovk2y?sr=0-1&ref_=beagle&applicationId=AWSMPContessa

```python
from strands import Agent
from strands.models.anthropic import AnthropicModel
from kurrentdb_session_manager import KurrentDBConversationManager

unique_run_id = "run-01"
kurrentdb_conversation_manager = (
KurrentDBConversationManager(unique_run_id, "connection string here")
) # replace with your actual connection string

# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
model = AnthropicModel(
client_args={
"api_key": "Your API KEY here", # Replace with your actual API key
},
    max_tokens=4096,
model_id="claude-3-5-haiku-latest",
params={
"temperature": 0.7,
}
)

poet_agent = Agent(
system_prompt="You are a hungry poet who loves to write haikus about everything.",
model=model,
    conversation_manager=kurrentdb_conversation_manager,  # persist messages to KurrentDB as the conversation runs
)
poet_agent("Write a haiku about the beauty of nature.")
kurrentdb_conversation_manager.save_agent_state(
    unique_run_id=unique_run_id,
    state={"messages": poet_agent.messages, "system_prompt": poet_agent.system_prompt},
)
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
# Roll the agent back to the previously saved checkpoint
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent, unique_run_id=unique_run_id)
poet_agent("What did we just talk about?")




```

## Features

### 1. Persistent Message Storage

Every message in the conversation is automatically stored as an event in KurrentDB:
- Each message is stored with its role (user/assistant/system) as the event type
- Messages are stored in order with stream positions for accurate replay
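
For illustration, here is a minimal sketch of reading those stored events back directly with `kurrentdbclient` (assuming a local, TLS-disabled node and a conversation stream named `run-01`; adjust both to your setup):

```python
import json

from kurrentdbclient import KurrentDBClient

# Connect to the same KurrentDB instance the conversation manager writes to.
client = KurrentDBClient("esdb://localhost:2113?Tls=false")

# Each event's type is the message role and its data is the JSON-encoded message.
for event in client.get_stream(stream_name="run-01"):
    message = json.loads(event.data.decode("utf-8"))
    print(event.stream_position, event.type, message.get("content"))
```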

### 2. State Management

Save and restore complete agent state:

```python
# Save current state
conversation_manager.save_agent_state(
unique_run_id="run-01",
state={
"messages": agent.messages,
"system_prompt": agent.system_prompt,
"custom_data": "any additional state"
}
)

# Restore state later
agent = conversation_manager.restore_agent_state(
agent=agent,
unique_run_id="run-01"
)
```

### 3. Conversation Retention Policies

Configure how long conversations are retained:

```python
# Set maximum age (in seconds)
conversation_manager.set_max_window_age(3600) # Keep messages for 1 hour

# Set maximum message count
conversation_manager.set_max_window_size(100) # Keep last 100 messages
```

### 4. Window Size Management

Control how many messages are loaded into memory:

```python
conversation_manager = KurrentDBConversationManager(
unique_run_id="run-01",
connection_string="esdb://localhost:2113?Tls=false",
window_size=40 # Load last 40 messages by default
)
```

## API Reference

### Constructor

```python
KurrentDBConversationManager(
unique_run_id: str,
connection_string: str = "esdb://localhost:2113?Tls=false",
window_size: int = 40,
reducer_function = lambda x: x
)
```

**Parameters:**
- `unique_run_id`: Unique identifier for the conversation stream
- `connection_string`: KurrentDB connection string
- `window_size`: Maximum number of messages to keep in memory
- `reducer_function`: Function to reduce messages if context limit is exceeded
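
As a sketch of how `reducer_function` could be used, the hypothetical reducer below simply keeps the most recent messages when the context window overflows; a production reducer may need to be smarter (for example, keeping tool-use and tool-result messages paired):

```python
from kurrentdb_session_manager import KurrentDBConversationManager


def keep_last_20(messages):
    """Hypothetical reducer: keep only the 20 most recent messages."""
    return messages[-20:]


conversation_manager = KurrentDBConversationManager(
    unique_run_id="run-01",
    connection_string="esdb://localhost:2113?Tls=false",
    reducer_function=keep_last_20,
)
```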

### Methods

#### `apply_management(messages: Messages) -> None`
Applies management strategies to the messages list and persists new messages to KurrentDB.

#### `reduce_context(messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]`
Reduces the context window size when it exceeds limits using the configured reducer function.

#### `set_max_window_age(max_age: int) -> None`
Sets the maximum age for messages in the conversation (KurrentDB stream metadata).

#### `set_max_window_size(max_count: int) -> None`
Sets the maximum number of messages to retain in the stream.

#### `save_agent_state(unique_run_id: str, state: dict) -> None`
Saves the current agent state to a checkpoint stream.

#### `restore_agent_state(agent: Agent, unique_run_id: str) -> Agent`
Restores agent state from the checkpoint stream.

## How It Works

### Stream Structure

The manager uses two types of streams in KurrentDB:

1. **Conversation Stream** (`{unique_run_id}`):
- Contains all conversation messages as events
- Event types: "user", "assistant", "system", "StateRestored"
- Messages stored in chronological order

2. **Checkpoint Stream** (`strands_checkpoint-{unique_run_id}`):
- Contains agent state snapshots
- Used for recovery and state restoration
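
As an illustration of the checkpoint stream layout, the sketch below reads the latest snapshot directly, much as `restore_agent_state` does (assuming a run id of `run-01`; the `system_prompt` key is only present if it was included when `save_agent_state` was called):

```python
import json

from kurrentdbclient import KurrentDBClient

client = KurrentDBClient("esdb://localhost:2113?Tls=false")

# The newest event in the checkpoint stream is the most recent state snapshot.
events = client.get_stream(
    stream_name="strands_checkpoint-run-01",
    backwards=True,
    limit=1,
)
state = json.loads(events[0].data.decode("utf-8"))
print(state["kurrentdb_checkpoint"], state.get("system_prompt"))
```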
183 changes: 183 additions & 0 deletions 03-integrations/persistent-session-kurrentdb/kurrentdb_session_manager.py
@@ -0,0 +1,183 @@
from strands.agent.conversation_manager import ConversationManager
from strands.agent import Agent
from strands.types.content import Messages
from typing import Optional
from kurrentdbclient import KurrentDBClient, NewEvent, StreamState
from kurrentdbclient.exceptions import NotFoundError
import json

"""
Example usage:
from strands import Agent
from strands.models.anthropic import AnthropicModel
from kurrentdb_session_manager import KurrentDBConversationManager

unique_run_id = "run-01"
kurrentdb_conversation_manager = (
KurrentDBConversationManager(unique_run_id, "esdb://localhost:2113?Tls=false")
) # replace with your actual connection string

# kurrentdb_conversation_manager.set_max_window_age(60) # Set max window age to 60 seconds
model = AnthropicModel(
client_args={
"api_key": "Your API KEY here", # Replace with your actual API key
},
    max_tokens=4096,
model_id="claude-3-5-haiku-latest",
params={
"temperature": 0.7,
}
)

poet_agent = Agent(
system_prompt="You are a hungry poet who loves to write haikus about everything.",
model=model,
    conversation_manager=kurrentdb_conversation_manager,  # persist messages to KurrentDB as the conversation runs
)
poet_agent("Write a haiku about the beauty of nature.")
kurrentdb_conversation_manager.save_agent_state(unique_run_id=unique_run_id,
state={"messages": poet_agent.messages,
"system_prompt": poet_agent.system_prompt})
poet_agent("Based on the previous haiku, write another one about the changing seasons.")
poet_agent = kurrentdb_conversation_manager.restore_agent_state(agent=poet_agent,unique_run_id=unique_run_id)
poet_agent("What did we just talk about?")
"""
class KurrentDBConversationManager(ConversationManager):
client: KurrentDBClient
def __init__(self, unique_run_id:str,
connection_string: str = "esdb://localhost:2113?Tls=false",
window_size: int = 40,
reducer_function = lambda x: x) -> None:
"""
Initializes the KurrentDB conversation manager with a connection string.
:param connection_string: The connection string for KurrentDB.
"""
self.client = KurrentDBClient(connection_string)
self.stream_id = unique_run_id
self.checkpoint = -1 # Default checkpoint value, no messages processed yet
self.window_size = window_size # Maximum number of messages to keep in the conversation
self.reducer_function = reducer_function # Function to reduce messages if needed

def apply_management(self, messages: Messages) -> None:
"""Apply management strategies to the messages list."""
justRestored = False
try:
events = self.client.get_stream(
stream_name=self.stream_id,
resolve_links=True,
backwards=True,
limit=1
) # Get the last event in the stream
if len(events) == 1 and events[0].type == "StateRestored":
# then we don't need to remove any message
justRestored = True
self.checkpoint = events[0].stream_position

        except NotFoundError:
            # The stream does not exist yet.
            if self.checkpoint != -1:
                # Surface the inconsistency to the caller outside the conversation manager.
                raise Exception("Inconsistent state: Stream not found but checkpoint exists.")
        if self.checkpoint != -1 and not justRestored:
            # Drop messages that were already persisted; keep only the new ones.
            messages = messages[self.checkpoint + 1:]
events = []
for message in messages:
metadata = {}
event = NewEvent(type=message["role"], data=bytes(json.dumps(message), 'utf-8'),
content_type='application/json',
metadata=bytes(json.dumps(metadata), 'utf-8'))
events.append(event)
self.client.append_to_stream(
stream_name=self.stream_id,
events=events,
            current_version=StreamState.ANY  # TODO: tighten this if the agent is called in parallel and message order matters
)
self.checkpoint += len(events) # Update checkpoint after appending messages


def reduce_context(self, messages: Messages, e: Optional[Exception] = None) -> Optional[Messages]:
"""Function to reduce the context window size when it exceeds the model's limit.
"""
return self.reducer_function(messages)

def set_max_window_age(self, max_age: int) -> None:
"""Set the maximum age for messages in the conversation inside KurrentDB."""
self.client.set_stream_metadata(self.stream_id,
metadata={"$maxAge": max_age},
current_version=StreamState.ANY
)

def set_max_window_size(self, max_count: int) -> None:
"""Set the maximum size for the conversation history inside KurrentDB."""
self.client.set_stream_metadata(self.stream_id,
metadata={"$maxCount": max_count},
current_version=StreamState.ANY
)

def save_agent_state(self, unique_run_id: str, state: dict) -> None:
"""
Saves the agent state variables to a checkpoint stream in KurrentDB.
This event contains which position in the stream the agent is at and other state variables.
"""
        # Messages are already kept in the conversation stream, so drop them from the snapshot.
        state.pop("messages", None)
        state["kurrentdb_checkpoint"] = self.checkpoint
        state["kurrentdb_checkpoint_stream_id"] = unique_run_id
event = NewEvent(type="agent_state", data=bytes(json.dumps(state), 'utf-8'),
content_type='application/json')
self.client.append_to_stream(
stream_name="strands_checkpoint-" + unique_run_id,
events=[event],
current_version=StreamState.ANY)


def restore_agent_state(self, agent: Agent, unique_run_id: str) -> Agent:
"""
Builds the agent state messages from a stream in KurrentDB.
"""
try:
checkpoint_event = self.client.get_stream(
stream_name="strands_checkpoint-" + unique_run_id,
resolve_links=True,
backwards=True,
limit=1
)
            if not checkpoint_event or len(checkpoint_event) == 0:
                return agent  # no checkpoint found; return the agent unchanged

state = json.loads(checkpoint_event[0].data.decode('utf-8'))
self.stream_id = state["kurrentdb_checkpoint_stream_id"]
self.checkpoint = state["kurrentdb_checkpoint"]

messages = []
message_events = self.client.get_stream(
stream_name=unique_run_id,
resolve_links=True,
backwards=True,
stream_position=self.checkpoint,
limit=self.window_size
)
for event in message_events:
if event.type == "StateRestored":
                    break  # stop at the last restore marker; older events belong to the previous window
message = json.loads(event.data.decode('utf-8'))
messages.insert(0,message)
state["messages"] = messages
agent.messages = messages

            # Append a marker event so later reads know state restoration happened here.
system_event = NewEvent(
type="StateRestored",
data=bytes("{}", 'utf-8'),
content_type='application/json',
metadata=bytes("{}", 'utf-8')
)
self.client.append_to_stream(
stream_name=unique_run_id,
events=[system_event],
current_version=StreamState.ANY
)
return agent
        except NotFoundError:
            return agent  # no checkpoint stream yet; return the agent unchanged