From f96a6f0eeff8576a400d9b853022d440923256d4 Mon Sep 17 00:00:00 2001 From: bonadio Date: Wed, 1 Nov 2023 01:30:22 -0300 Subject: [PATCH] Adding async support to get_human_input (#466) * Adding async support to get_human_input * Adjust code for Code formatting testing fail * Adjust the test_async_get_human_input.py to run async on test * Adjust the test_async_get_human_input.py for pre-commit-check error * Adjust the test_async_get_human_input.py for pre-commit-check error v2 * Adjust remove unnecessary register_reply * Adjust test to use asyncio call * Adjust go back to not use asyncio --- autogen/agentchat/conversable_agent.py | 85 ++++++++++++++++++++ test/agentchat/test_async_get_human_input.py | 35 ++++++++ 2 files changed, 120 insertions(+) create mode 100644 test/agentchat/test_async_get_human_input.py diff --git a/autogen/agentchat/conversable_agent.py b/autogen/agentchat/conversable_agent.py index a46cb03cb582..017ba4e848ac 100644 --- a/autogen/agentchat/conversable_agent.py +++ b/autogen/agentchat/conversable_agent.py @@ -755,6 +755,77 @@ def check_termination_and_human_reply( return False, None + async def a_check_termination_and_human_reply( + self, + messages: Optional[List[Dict]] = None, + sender: Optional[Agent] = None, + config: Optional[Any] = None, + ) -> Tuple[bool, Union[str, Dict, None]]: + """(async) Check if the conversation should be terminated, and if human reply is provided.""" + if config is None: + config = self + if messages is None: + messages = self._oai_messages[sender] + message = messages[-1] + reply = "" + no_human_input_msg = "" + if self.human_input_mode == "ALWAYS": + reply = await self.a_get_human_input( + f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: " + ) + no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" + # if the human input is empty, and the message is a termination message, then we will terminate the conversation + reply = reply if reply or not self._is_termination_msg(message) else "exit" + else: + if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]: + if self.human_input_mode == "NEVER": + reply = "exit" + else: + # self.human_input_mode == "TERMINATE": + terminate = self._is_termination_msg(message) + reply = await self.a_get_human_input( + f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: " + if terminate + else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: " + ) + no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" + # if the human input is empty, and the message is a termination message, then we will terminate the conversation + reply = reply if reply or not terminate else "exit" + elif self._is_termination_msg(message): + if self.human_input_mode == "NEVER": + reply = "exit" + else: + # self.human_input_mode == "TERMINATE": + reply = await self.a_get_human_input( + f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: " + ) + no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else "" + # if the human input is empty, and the message is a termination message, then we will terminate the conversation + reply = reply or "exit" + + # print the no_human_input_msg + if no_human_input_msg: + print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True) + + # stop the conversation + if reply == "exit": + # reset the consecutive_auto_reply_counter + self._consecutive_auto_reply_counter[sender] = 0 + return True, None + + # send the human reply + if reply or self._max_consecutive_auto_reply_dict[sender] == 0: + # reset the consecutive_auto_reply_counter + self._consecutive_auto_reply_counter[sender] = 0 + return True, reply + + # increment the consecutive_auto_reply_counter + self._consecutive_auto_reply_counter[sender] += 1 + if self.human_input_mode != "NEVER": + print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True) + + return False, None + def generate_reply( self, messages: Optional[List[Dict]] = None, @@ -891,6 +962,20 @@ def get_human_input(self, prompt: str) -> str: reply = input(prompt) return reply + async def a_get_human_input(self, prompt: str) -> str: + """(Async) Get human input. + + Override this method to customize the way to get human input. + + Args: + prompt (str): prompt for the human input. + + Returns: + str: human input. + """ + reply = input(prompt) + return reply + def run_code(self, code, **kwargs): """Run the code and return the result. diff --git a/test/agentchat/test_async_get_human_input.py b/test/agentchat/test_async_get_human_input.py new file mode 100644 index 000000000000..cdc7ea2aa7e8 --- /dev/null +++ b/test/agentchat/test_async_get_human_input.py @@ -0,0 +1,35 @@ +import asyncio +import autogen +import pytest +from test_assistant_agent import KEY_LOC, OAI_CONFIG_LIST + + +@pytest.mark.asyncio +async def test_async_get_human_input(): + try: + import openai + except ImportError: + return + config_list = autogen.config_list_from_json(OAI_CONFIG_LIST, KEY_LOC) + + # create an AssistantAgent instance named "assistant" + assistant = autogen.AssistantAgent( + name="assistant", + max_consecutive_auto_reply=2, + llm_config={"request_timeout": 600, "seed": 41, "config_list": config_list, "temperature": 0}, + ) + + user_proxy = autogen.UserProxyAgent(name="user", human_input_mode="ALWAYS", code_execution_config=False) + + async def custom_a_get_human_input(prompt): + return "This is a test" + + user_proxy.a_get_human_input = custom_a_get_human_input + + user_proxy.register_reply([autogen.Agent, None], autogen.ConversableAgent.a_check_termination_and_human_reply) + + await user_proxy.a_initiate_chat(assistant, clear_history=True, message="Hello.") + + +if __name__ == "__main__": + test_async_get_human_input()