Allow limiting num. of Session events fetched when calling Runner.run_async #3562

@nimanthadilz

Description

Is your feature request related to a problem? Please describe.
Yes.
I want to limit the number of events pulled from the session DB when Runner.run_async calls session_service.get_session. Without a limit, get_session can pull a large number of events from the DB.
I am using a Runner object to run an agent, as shown below:

runner = Runner(
    agent=orchestrator_agent, app_name=APP_NAME, session_service=session_service
)

async for event in runner.run_async(
    user_id=req.user_id, session_id=session.id, new_message=user_message
):
    if event.is_final_response():
        if event.content and event.content.parts:
            agent_response = event.content.parts[0].text

Inside Runner.run_async, the session and its events are fetched using the session service that was passed when constructing the Runner object:

      with tracer.start_as_current_span('invocation'):
        session = await self.session_service.get_session(
            app_name=self.app_name, user_id=user_id, session_id=session_id
        )

This call fetches all of the session's events, which is not desired. It would be better to have a way to limit the number of session events fetched.

Describe the solution you'd like

BaseSessionService.get_session already accepts an optional config parameter (config: Optional[GetSessionConfig] = None). DatabaseSessionService's get_session applies it when it is provided:

if config and config.after_timestamp:
  after_dt = datetime.fromtimestamp(config.after_timestamp)
  stmt = stmt.filter(StorageEvent.timestamp >= after_dt)

stmt = stmt.order_by(StorageEvent.timestamp.desc())

if config and config.num_recent_events:
  stmt = stmt.limit(config.num_recent_events)
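To make the effect of the two config fields concrete, here is a minimal in-memory sketch of the same filter, order, and limit steps. It does not use ADK or SQLAlchemy: `GetSessionConfigSketch`, `EventSketch`, and `select_events` are hypothetical stand-ins, and returning the result in chronological order is an assumption of this sketch, not something shown in the excerpt above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class GetSessionConfigSketch:
    """Hypothetical stand-in with the two fields used in the excerpt."""
    num_recent_events: Optional[int] = None
    after_timestamp: Optional[float] = None


@dataclass
class EventSketch:
    """Hypothetical stand-in for a stored session event."""
    id: str
    timestamp: float


def select_events(
    events: List[EventSketch],
    config: Optional[GetSessionConfigSketch] = None,
) -> List[EventSketch]:
    selected = list(events)
    # Step 1: keep only events at or after the cutoff, if one is set.
    if config and config.after_timestamp:
        selected = [e for e in selected if e.timestamp >= config.after_timestamp]
    # Step 2: newest first, so that a limit keeps the most recent events.
    selected.sort(key=lambda e: e.timestamp, reverse=True)
    # Step 3: apply the limit, if one is set (0 or None means no limit,
    # matching the truthiness check in the excerpt).
    if config and config.num_recent_events:
        selected = selected[: config.num_recent_events]
    # Assumption of this sketch: hand events back oldest first.
    selected.reverse()
    return selected


events = [EventSketch(id=f"e{i}", timestamp=float(i)) for i in range(1, 6)]
recent = select_events(events, GetSessionConfigSketch(num_recent_events=2))
since = select_events(events, GetSessionConfigSketch(after_timestamp=3.0))
everything = select_events(events)
print([e.id for e in recent])  # the two newest events
```

Without a config, every event is returned, which is the behavior this issue is about.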

My Solution:
Add an optional field to the RunConfig class, which is passed as the run_config parameter of Runner.run_async. Runner.run_async can then forward that field to SessionService.get_session as config.

class RunConfig(BaseModel):
  get_session_config: Optional[GetSessionConfig] = None  # New optional field


# Use the new field in the run_async method
class Runner:
  async def run_async(
      ..., run_config: Optional[RunConfig] = None,
  ) -> AsyncGenerator[Event, None]:
    # existing code ...
    # run_config is optional, so guard against None before reading the field
    get_session_config = run_config.get_session_config if run_config else None

    async def _run_with_trace(
        new_message: Optional[types.Content] = None,
        invocation_id: Optional[str] = None,
    ) -> AsyncGenerator[Event, None]:
      with tracer.start_as_current_span('invocation'):
        session = await self.session_service.get_session(
            app_name=self.app_name,
            user_id=user_id,
            session_id=session_id,
            config=get_session_config,
        )
    # existing code ....
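The proposal can be tried end to end with a small, self-contained sketch. Nothing here is ADK code: `RunnerSketch`, `RunConfigSketch`, `GetSessionConfigSketch`, and `InMemorySessionServiceSketch` are hypothetical stand-ins, and the runner simply yields the loaded events so the effect of the limit is visible.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Optional


@dataclass
class GetSessionConfigSketch:
    num_recent_events: Optional[int] = None


@dataclass
class RunConfigSketch:
    # The proposed new optional field.
    get_session_config: Optional[GetSessionConfigSketch] = None


@dataclass
class SessionSketch:
    id: str
    events: List[str] = field(default_factory=list)


class InMemorySessionServiceSketch:
    """Hypothetical session service holding events oldest first."""

    def __init__(self, stored_events: List[str]) -> None:
        self._stored = list(stored_events)

    async def get_session(self, *, app_name, user_id, session_id, config=None):
        events = self._stored
        if config and config.num_recent_events:
            events = events[-config.num_recent_events:]
        return SessionSketch(id=session_id, events=list(events))


class RunnerSketch:
    def __init__(self, app_name: str, session_service) -> None:
        self.app_name = app_name
        self.session_service = session_service

    async def run_async(
        self, *, user_id: str, session_id: str,
        run_config: Optional[RunConfigSketch] = None,
    ) -> AsyncGenerator[str, None]:
        # Guard against a missing run_config before reading the new field.
        get_session_config = run_config.get_session_config if run_config else None
        session = await self.session_service.get_session(
            app_name=self.app_name, user_id=user_id,
            session_id=session_id, config=get_session_config,
        )
        # Illustration only: yield what was loaded instead of running an agent.
        for event in session.events:
            yield event


async def collect(runner, run_config=None) -> List[str]:
    return [e async for e in runner.run_async(
        user_id="u1", session_id="s1", run_config=run_config)]


runner = RunnerSketch("demo_app", InMemorySessionServiceSketch(["e1", "e2", "e3", "e4"]))
limited_config = RunConfigSketch(GetSessionConfigSketch(num_recent_events=2))
limited = asyncio.run(collect(runner, limited_config))
unlimited = asyncio.run(collect(runner))
```

Callers that pass no run_config, or a run_config without the new field, keep the current behavior of loading every event.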

Describe alternatives you've considered
Without native support in ADK, I would have to implement a custom DatabaseSessionService that lets users set a limit, and pass an instance of that session service to the runner. I don't like this approach.
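For reference, the workaround could look roughly like the sketch below. It uses a wrapper rather than a subclass of DatabaseSessionService, which is a substitution made here to keep the example self-contained. All names (`DefaultLimitSessionServiceSketch`, `RecordingServiceSketch`, `GetSessionConfigSketch`) are hypothetical and only the get_session call shape is taken from the snippets above.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class GetSessionConfigSketch:
    num_recent_events: Optional[int] = None


class RecordingServiceSketch:
    """Hypothetical inner service that records the config it receives."""

    def __init__(self) -> None:
        self.last_config = None

    async def get_session(self, *, app_name, user_id, session_id, config=None):
        self.last_config = config
        return {"id": session_id}


class DefaultLimitSessionServiceSketch:
    """Wraps a session service and supplies a default config when none is given."""

    def __init__(self, inner, default_config: GetSessionConfigSketch) -> None:
        self._inner = inner
        self._default_config = default_config

    async def get_session(self, *, app_name, user_id, session_id, config=None):
        # An explicit config from the caller wins over the default.
        effective = config if config is not None else self._default_config
        return await self._inner.get_session(
            app_name=app_name, user_id=user_id,
            session_id=session_id, config=effective,
        )

    def __getattr__(self, name):
        # Delegate every other attribute to the wrapped service.
        return getattr(self._inner, name)


inner = RecordingServiceSketch()
service = DefaultLimitSessionServiceSketch(
    inner, GetSessionConfigSketch(num_recent_events=50))

asyncio.run(service.get_session(app_name="demo_app", user_id="u1", session_id="s1"))
default_limit = inner.last_config.num_recent_events

asyncio.run(service.get_session(
    app_name="demo_app", user_id="u1", session_id="s1",
    config=GetSessionConfigSketch(num_recent_events=5)))
explicit_limit = inner.last_config.num_recent_events
```

The drawback is that the limit is fixed per service instance instead of being chosen per run_async call, which is what the RunConfig field would allow.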

Additional context
I noticed there's a closed PR for this feature: #2843

Labels: services ([Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc.)