Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Larger outputs cause streaming errors: peer closed connection without sending complete message body #2055

Open
5 tasks done
antoremin opened this issue Oct 8, 2024 · 1 comment

Comments

@antoremin
Copy link

antoremin commented Oct 8, 2024

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangGraph/LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangGraph/LangChain rather than my code.
  • I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.

Example Code

Run notebook testing.ipynb from the repo below. 

To reproduce this on the demo langgraph, I 10x'd search volume in a single search iteration, and then this cell kicks off 10 threads, half of which end up breaking with the issue I'm describing, pretty much every time. 


from langgraph_sdk import get_client
import asyncio

# Initialize the SDK client pointed at the locally running langgraph server.
# NOTE(review): 8123 is assumed to be the port `langgraph up` exposed — confirm.
client = get_client(url="http://localhost:8123")  # Update this URL as needed

async def run_search_agent_for_company(company):
    """Run the enrichment agent for one company and return its final thread state.

    Looks up the default (unconfigured) assistant, creates a dedicated thread,
    streams ``updates`` events until the run finishes or reports an error, then
    fetches and returns the thread's final state. On any exception the error is
    printed and the function returns None.
    """
    try:
        # Pick the first assistant with no custom config — the default one.
        candidates = [a for a in await client.assistants.search() if not a["config"]]
        assistant_id = candidates[0]["assistant_id"]
        print(f"Using assistant with ID: {assistant_id} for {company}")

        # One thread per company so the runs proceed independently.
        thread_id = (await client.threads.create())["thread_id"]
        print(f"Created thread with ID: {thread_id} for {company}")

        # NOTE(review): `topic` and `schema` are assumed to be defined in an
        # earlier notebook cell — confirm before running this cell standalone.
        input_data = {
            "topic": topic.format(company=company),
            "extraction_schema": schema,
            "configurable": {
                "model_name": "anthropic/claude-3-5-sonnet-20240620",
                "max_loops": 50,
                "max_info_tool_calls": 10,
                "max_search_results": 200,
            },
            "messages": [
                {
                    "role": "user",
                    "content": f"Make a plan to complete the request for {company}",
                }
            ],
            "current_plan": f"Initial plan: Analyze the request for {company}",
            "iteration_number": 0,
        }

        # Stream the run; bail out early if the server sends an error payload.
        stream = client.runs.stream(
            thread_id,
            assistant_id,
            input=input_data,
            stream_mode="updates",
            config={"recursion_limit": 50},
        )
        async for chunk in stream:
            if not chunk.data or chunk.event == "metadata":
                continue
            print(f"{company}: {chunk.data}")
            if 'error' in chunk.data:
                print(f"Error encountered for {company}: {chunk.data['error']}")
                print(f"Error message: {chunk.data['message']}")
                break

        # Stream ended (normally or via the error break) — fetch what remains.
        final_state = await client.threads.get_state(thread_id)
        print(f"Final results for {company}:", final_state)
        return final_state

    except Exception as e:
        print(f"An error occurred for {company}: {str(e)}")

async def run_all_companies():
    """Fan out one agent run per company and collect the final states.

    Every run is scheduled up front with ``asyncio.create_task`` so the agents
    execute concurrently; results are then awaited and returned as a mapping of
    company name to final thread state, in the original list order.
    """
    companies = [
        "Mars", "Hershey's", "Nestlé", "Ferrero", "Mondelez International",
        "Lindt & Sprüngli", "Perfetti Van Melle", "Haribo", "Meiji", "Tootsie Roll Industries",
        "Ghirardelli Chocolate Company", "Godiva Chocolatier", "Cadbury", "Jelly Belly Candy Company",
        "Russell Stover Chocolates", "Storck", "Pladis", "Arcor", "Lotte Confectionery", "Ezaki Glico",
        "Fazer", "Cloetta", "Ritter Sport", "Tony's Chocolonely", "Guylian", "Cemoi", "Leaf Brands",
        "Ferrara Candy Company", "Just Born", "Bahlsen"
    ]

    # Creating the tasks first starts them all; awaiting afterwards only
    # gathers results, so the runs overlap rather than execute serially.
    scheduled = {
        name: asyncio.create_task(run_search_agent_for_company(name))
        for name in companies
    }
    return {name: await task for name, task in scheduled.items()}

# Run the async function for all companies and get the results
# (bare top-level `await` works here because this is a notebook cell,
# which runs inside an already-running event loop).
search_results = await run_all_companies()


### Error Message and Stack Trace (if applicable)

```shell
Client side error: 


peer closed connection without sending complete message body (incomplete chunked read)

Langsmith error:

CancelledError()Traceback (most recent call last):


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1502, in astream
    async for _ in runner.atick(


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/runner.py", line 130, in atick
    await arun_with_retry(t, retry_policy, stream=self.use_astream)


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/retry.py", line 102, in arun_with_retry
    await task.proc.ainvoke(task.input, config)


  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 452, in ainvoke
    input = await asyncio.create_task(coro, context=context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 148, in ainvoke
    return await super().ainvoke(input, config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 235, in ainvoke
    ret = await asyncio.create_task(coro, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 162, in _afunc
    outputs = await asyncio.gather(
              ^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langgraph/prebuilt/tool_node.py", line 192, in _arun_one
    tool_message: ToolMessage = await self.tools_by_name[call["name"]].ainvoke(
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/structured.py", line 58, in ainvoke
    return await super().ainvoke(input, config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 490, in ainvoke
    return await self.arun(tool_input, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 767, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/structured.py", line 96, in _arun
    return await self.coroutine(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/deps/data-enrichment/src/enrichment_agent/tools.py", line 55, in extensive_search
    result = await search(query, config=config)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/deps/data-enrichment/src/enrichment_agent/tools.py", line 33, in search
    result = await wrapped.ainvoke({"query": query})
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 490, in ainvoke
    return await self.arun(tool_input, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/tools/base.py", line 767, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_community/tools/tavily_search/tool.py", line 178, in _arun
    raw_results = await self.api_wrapper.raw_results_async(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_community/utilities/tavily_search.py", line 149, in raw_results_async
    results_json_str = await fetch()
                       ^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_community/utilities/tavily_search.py", line 142, in fetch
    async with session.post(f"{TAVILY_API_URL}/search", json=params) as res:


  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 1357, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 688, in _request
    await resp.start(conn)


  File "/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1058, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 643, in read
    await self._waiter


asyncio.exceptions.CancelledError


### Description

When I kick off several langgraph threads at once, each of which processes larger volumes, I get "peer closed connection .." errors for some of the threads pretty much every time. 

I was able to reproduce this with a sample langgraph by adding some more search calls on the search step and kicking off several threads at once (which I do with my langgraph as well). 


Here's the repo: https://github.com/antoremin/data-enrichment

To reproduce, start the graph with `langgraph up` and then run testing.ipynb locally.

https://github.com/antoremin/data-enrichment

### System Info

System Information
------------------
> OS:  Darwin
> OS Version:  Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:46 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6031
> Python Version:  3.9.11 (main, Dec 13 2023, 15:51:08) 
[Clang 14.0.3 (clang-1403.0.22.14.1)]

Langgraph's pyproject.toml: 

[project]
name = "enrichment-agent"
version = "0.0.1"
description = "An agent that populates and enriches custom schemas"
authors = [
    { name = "William Fu-Hinthorn", email = "[email protected]" },
]
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.9"
dependencies = [
    "langgraph>=0.2.19",
    "langchain-openai>=0.1.22",
    "langchain-anthropic>=0.1.23",
    "langchain>=0.2.14",
    "langchain-fireworks>=0.1.7",
    "python-dotenv>=1.0.1",
    "langchain-community>=0.2.13",
    "transformers",
]

[project.optional-dependencies]
dev = ["mypy>=1.11.1", "ruff>=0.6.1", "pytest-asyncio"]

[build-system]
requires = ["setuptools>=73.0.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
packages = ["enrichment_agent"]
[tool.setuptools.package-dir]
"enrichment_agent" = "src/enrichment_agent"
"langgraph.templates.enrichment_agent" = "src/enrichment_agent"


[tool.setuptools.package-data]
"*" = ["py.typed"]

[tool.ruff]
lint.select = [
    "E",    # pycodestyle
    "F",    # pyflakes
    "I",    # isort
    "D",    # pydocstyle
    "D401", # First line should be in imperative mood
    "T201",
    "UP",
]
include = ["*.py", "*.pyi", "*.ipynb"]
lint.ignore = ["UP006", "UP007", "UP035", "D417", "E501"]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["D", "UP"]
"ntbk/*" = ["D", "UP", "T201"]
[tool.ruff.lint.pydocstyle]
convention = "google"
@antoremin
Copy link
Author

Changing Tavily to Google search didn't solve the issue

Getting errors like this:

CancelledError()Traceback (most recent call last):


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1502, in astream
    async for _ in runner.atick(


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/runner.py", line 130, in atick
    await arun_with_retry(t, retry_policy, stream=self.use_astream)


  File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/retry.py", line 102, in arun_with_retry
    await task.proc.ainvoke(task.input, config)


  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 452, in ainvoke
    input = await asyncio.create_task(coro, context=context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langgraph/utils/runnable.py", line 235, in ainvoke
    ret = await asyncio.create_task(coro, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/deps/data-enrichment/src/enrichment_agent/graph.py", line 62, in call_agent_model
    response = cast(AIMessage, await model.ainvoke(messages))
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 5349, in ainvoke
    return await self.bound.ainvoke(
           ^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 305, in ainvoke
    llm_result = await self.agenerate_prompt(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 794, in agenerate_prompt
    return await self.agenerate(
           ^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 720, in agenerate
    results = await asyncio.gather(
              ^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_core/language_models/chat_models.py", line 924, in _agenerate_with_cache
    result = await self._agenerate(
             ^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/langchain_anthropic/chat_models.py", line 805, in _agenerate
    data = await self._async_client.messages.create(**payload)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anthropic/resources/messages.py", line 1811, in create
    return await self._post(
           ^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1838, in post
    return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1532, in request
    return await self._request(
           ^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anthropic/_base_client.py", line 1552, in _request
    self._platform = await asyncify(get_platform)()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anthropic/_utils/_sync.py", line 69, in wrapper
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2356, in run_sync_in_worker_thread
    await cls.checkpoint()


  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2264, in checkpoint
    await sleep(0)


  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 640, in sleep
    await __sleep0()


  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 634, in __sleep0
    yield


asyncio.exceptions.CancelledError

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant