Pipeline stuck in loop when a feedback branch is added #8641
Comments
Here is another example with the pipeline running forever:

from typing import Optional, List, Union, Callable

from haystack import component, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.core.component.types import Variadic


@component
class Generator:
    def __init__(self):
        self.iteration_counter = 0

    @component.output_types(replies=str)
    def run(self, parts: Variadic[Union[str, int]],
            streaming_callback: Optional[Callable[[str], None]] = None):
        if self.iteration_counter:
            self.iteration_counter += 1
            return {"replies": ["Not OK"]}
        return {"replies": ["OK"]}


@component
class OutputValidator:
    def __init__(self):
        self.iteration_counter = 0

    # Define the component output
    @component.output_types(valid_replies=List[str], invalid_replies=Optional[str], error_message=Optional[str])
    def run(self, replies: str):
        # if replies == "OK":
        print("Executing OutputValidator")
        return {"valid_replies": replies}
        # else:
        #     return {"invalid_replies": "Something reaaaalllly bad happened"}


if __name__ == '__main__':
    prompt_builder = PromptBuilder("""
    {{valid_replies}}
    {{invalid_replies}}
    """)
    generator = Generator()
    output_validator = OutputValidator()

    pipeline = Pipeline()
    pipeline.add_component("prompt_builder", prompt_builder)
    pipeline.add_component("generator", generator)
    pipeline.add_component("output_validator", output_validator)

    pipeline.connect("prompt_builder", "generator")
    pipeline.connect("generator", "output_validator")
    pipeline.connect("output_validator.invalid_replies", "prompt_builder.invalid_replies")

    pipeline.run({})

The difference from the previous case is that generator has 2 inputs, which causes the component to never be seen as having enough inputs to run.
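To make the "never seen with enough input" idea concrete, here is a minimal toy scheduler. It is only a sketch under stated assumptions: it is not Haystack's `Pipeline.run` logic, and `run_toy`, the socket name `extra`, and the `max_steps` guard are all hypothetical names invented for illustration. A component that is not ready is put back in the queue, so a component waiting on an input that nothing ever delivers is re-queued until the guard trips.

```python
from collections import deque


def run_toy(components, edges, max_steps=50):
    """Toy scheduler (NOT Haystack's implementation).

    components: name -> (required input sockets, output sockets emitted on run)
    edges: (sender, output socket) -> (receiver, input socket)
    """
    inbox = {name: set() for name in components}
    queue = deque(components)
    ran = []
    steps = 0
    while queue:
        steps += 1
        if steps > max_steps:
            # Guard: without it, a starved component would be re-queued forever.
            return ran, "stuck"
        name = queue.popleft()
        required, outputs = components[name]
        if not required <= inbox[name]:
            queue.append(name)  # not ready yet: wait and try again later
            continue
        ran.append(name)
        for out in outputs:
            target = edges.get((name, out))
            if target is not None:
                receiver, socket = target
                inbox[receiver].add(socket)
    return ran, "finished"


edges = {
    ("prompt_builder", "prompt"): ("generator", "parts"),
    ("generator", "reply"): ("output_validator", "reply"),
    # Feedback branch: never fires here because the validator only emits valid_reply.
    ("output_validator", "invalid_reply"): ("prompt_builder", "invalid_reply"),
}

linear_components = {
    "prompt_builder": (set(), {"prompt"}),
    "generator": ({"parts"}, {"reply"}),
    "output_validator": ({"reply"}, {"valid_reply"}),
}

# Same graph, but the generator also waits for a second input ("extra",
# a made-up socket) that no component ever sends.
starved_components = dict(linear_components, generator=({"parts", "extra"}, {"reply"}))

ran_ok, status_ok = run_toy(linear_components, edges)
ran_stuck, status_stuck = run_toy(starved_components, edges)
print(status_ok, ran_ok)
print(status_stuck, ran_stuck)
```

In this toy model the first graph finishes after each component runs once, while the second stops at the step guard with only `prompt_builder` having run. Whether the real pipeline stalls for the same reason is exactly what this issue is trying to establish.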
Hi @marfago, thanks for opening the issue! I tried the code snippet you shared. It seems to work as it is, so I assumed that I need to restore the code you commented out:

@component
class OutputValidator:
    def __init__(self):
        self.iteration_counter = 0

    # Define the component output
    @component.output_types(valid_replies=List[str], invalid_replies=Optional[str], error_message=Optional[str])
    def run(self, replies: str):
        if replies[0] == "OK":
            return {"valid_replies": replies}
        else:
            return {"invalid_replies": "Something reaaaalllly bad happened"}

The reason the pipeline is stuck in the loop seems to be a small bug in the code. Since [...]

ℹ️ Even though that's not the point of this issue, setting [...]

Let me know if you have more info to share. Happy to investigate further if this isn't the issue 🙌
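One difference that is visible between the two versions of `OutputValidator.run` is that the commented-out check compares the whole `replies` value to `"OK"`, while the snippet above compares `replies[0]`. The sketch below, which assumes only that `replies` is the one-element list the `Generator` in this thread returns, shows why the two checks behave differently in plain Python.

```python
# The Generator above returns {"replies": ["OK"]}, so the validator receives a list.
replies = ["OK"]

# Comparing the list itself to a string is never true in Python.
whole_value_matches = replies == "OK"

# Comparing the first element to the string is true for this input.
first_item_matches = replies[0] == "OK"

print(whole_value_matches, first_item_matches)
```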
Hello bilgeyucel, thank you for your reply. I tried your suggested fix, but in my case, both the examples (with and without the comments) get stuck somewhere.
Is there a way for me to provide more details or help you with the investigation?

Update: I am currently using Python 3.11.9.
@marfago What are the logs when you run the pipeline? You can enable Real-Time Logging to see the inputs and outputs of each component:

import logging

from haystack import tracing
from haystack.tracing.logging_tracer import LoggingTracer

logging.basicConfig(format="%(levelname)s - %(name)s - %(message)s", level=logging.WARNING)
logging.getLogger("haystack").setLevel(logging.DEBUG)

tracing.tracer.is_content_tracing_enabled = True  # to enable tracing/logging content (inputs/outputs)
tracing.enable_tracing(LoggingTracer(tags_color_strings={"haystack.component.input": "\x1b[1;31m", "haystack.component.name": "\x1b[1;34m"}))
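The two `logging` lines in that snippet rely on the standard library's logger hierarchy: the root logger stays at WARNING, while everything under the `haystack` namespace inherits DEBUG. A minimal stdlib-only sketch of that behaviour follows. It sets the root level explicitly instead of calling `basicConfig`, and the logger names `haystack.core.pipeline` and `some_other_library` are only illustrative.

```python
import logging

# Root logger at WARNING (what basicConfig(level=logging.WARNING) sets up
# when the root logger has not been configured yet).
logging.getLogger().setLevel(logging.WARNING)

# Everything under the "haystack" namespace is lowered to DEBUG.
logging.getLogger("haystack").setLevel(logging.DEBUG)

# Child loggers have no level of their own, so they inherit from the
# nearest ancestor that does.
haystack_child = logging.getLogger("haystack.core.pipeline")
other = logging.getLogger("some_other_library")  # illustrative name

print(haystack_child.isEnabledFor(logging.DEBUG))
print(other.isEnabledFor(logging.DEBUG))
```

With this setup, debug records from loggers under `haystack` are emitted while other libraries remain limited to warnings and above, which keeps the trace focused on the pipeline.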
This is the log trace as you suggested.
It seems like my code gets stuck in this endless loop: haystack/haystack/core/pipeline/pipeline.py Line 426 in a5b57f4
Here is a simplified version of the code (no conditions, no lists):

import logging
from typing import Optional, List, Union, Callable

from haystack import component, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.core.component.types import Variadic


@component
class Generator:
    def __init__(self):
        self.iteration_counter = 0

    # Define the component output
    @component.output_types(reply=str)
    def run(self, parts: Variadic[Union[str, int]]):
        return {"reply": "OK"}


# Define the component input parameters
@component
class OutputValidator:
    def __init__(self):
        self.iteration_counter = 0

    # Define the component output
    @component.output_types(valid_reply=Optional[str], invalid_reply=Optional[str], error_message=Optional[str])
    def run(self, reply: str):
        return {"valid_reply": reply}


if __name__ == '__main__':
    import logging
    from haystack import tracing
    from haystack.tracing.logging_tracer import LoggingTracer

    logging.basicConfig(format="%(asctime)s %(levelname)s - %(name)s - %(message)s", level=logging.WARNING, datefmt='%Y-%m-%d %H:%M:%S')
    logging.getLogger("haystack").setLevel(logging.DEBUG)

    tracing.tracer.is_content_tracing_enabled = True  # to enable tracing/logging content (inputs/outputs)
    tracing.enable_tracing(LoggingTracer(tags_color_strings={"haystack.component.input": "\x1b[1;31m", "haystack.component.name": "\x1b[1;34m"}))

    prompt_builder = PromptBuilder("""
    {{valid_reply}}
    {{invalid_reply}}
    {{error_message}}
    """)
    generator = Generator()
    output_validator = OutputValidator()

    pipeline = Pipeline(max_runs_per_component=2)
    pipeline.add_component("prompt_builder", prompt_builder)
    pipeline.add_component("generator", generator)
    pipeline.add_component("output_validator", output_validator)

    pipeline.connect("prompt_builder", "generator")
    pipeline.connect("generator", "output_validator")
    pipeline.connect("output_validator.invalid_reply", "prompt_builder.invalid_reply")

    pipeline.run({})

And the logs:

From the logs you can see it was stuck for some time before I stopped it.
@marfago we are trying to understand the behavior and we're working on a fix for a range of potential issues in the pipeline.run logic. We have a potential solution here: #8707

We're doing extensive testing currently, to make sure that simple and complex use cases are covered. It would be great if you could check out the branch and test if your original issue is solved. We'd be happy about any feedback!
Describe the bug
I am following this tutorial which introduces a feedback branch.
However, it seems that the pipeline gets stuck in an infinite loop and never exits.
Error message
RuntimeWarning: Pipeline is stuck running in a loop. Partial outputs will be returned. Check the Pipeline graph for possible issues.
warn(RuntimeWarning(msg))
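Because the message above is emitted as a `RuntimeWarning` rather than raised, a script or test can easily miss it. One way to surface it, sketched here with the standard `warnings` module only, is to escalate `RuntimeWarning` to an exception around the call. `run_stub` is a hypothetical stand-in for the pipeline run that emits the same message, and `run_strict` is likewise an illustrative helper, not part of Haystack.

```python
import warnings

STUCK_MESSAGE = (
    "Pipeline is stuck running in a loop. Partial outputs will be returned. "
    "Check the Pipeline graph for possible issues."
)


def run_stub():
    # Hypothetical stand-in for a pipeline run that warns and returns partial outputs.
    warnings.warn(RuntimeWarning(STUCK_MESSAGE))
    return {"partial": True}


def run_strict(fn):
    # Turn any RuntimeWarning raised inside fn() into an exception,
    # restoring the previous warning filters afterwards.
    with warnings.catch_warnings():
        warnings.simplefilter("error", RuntimeWarning)
        return fn()


try:
    run_strict(run_stub)
    outcome = "completed"
except RuntimeWarning as exc:
    outcome = f"failed: {exc}"

print(outcome)
```

The trade-off is that partial outputs are lost when the warning becomes an exception, so this is better suited to tests and reproduction scripts than to production runs.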
Expected behavior
The pipeline should run once and exit nicely.
Additional context
N/A
To Reproduce
I have created a small example reported at the bottom.
FAQ Check
System:
Code to reproduce the bug