From 0cfc3e67ce052873ddc81ea5a9cd85eca27f521e Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Fri, 17 Oct 2025 20:37:40 -0400 Subject: [PATCH 01/17] wip --- examples/survey/survey_agent.py | 370 ++++++++++++++++++++++++++++++++ 1 file changed, 370 insertions(+) create mode 100644 examples/survey/survey_agent.py diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py new file mode 100644 index 0000000000..13bc4f0dcf --- /dev/null +++ b/examples/survey/survey_agent.py @@ -0,0 +1,370 @@ +import csv +import logging +import os +from dataclasses import dataclass + +from dotenv import load_dotenv + +from livekit.agents import ( + Agent, + AgentSession, + AgentTask, + JobContext, + JobProcess, + RoomInputOptions, + RunContext, + WorkerOptions, + cli, + metrics, +) +from livekit.agents.beta.workflows import GetEmailTask, Task, TaskOrchestrator +from livekit.agents.llm import function_tool +from livekit.plugins import deepgram, openai, silero + + +@dataclass +class Userdata: + task_results: dict + + +def write_to_csv(filename: str, data: dict): + with open(filename, "a", newline="") as csvfile: + fieldnames = data.keys() + csv_writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + if not os.path.exists(filename): + csv_writer.writeheader() + csv_writer.writerow(data) + + +@function_tool() +async def disqualify(context: RunContext, disqualification_reason: str) -> None: + """Call if the candidate refuses to explicitly answer or if they do not meet the prerequisites for the position. + This function will terminate the interview and hang up. + + Args: + disqualification_reason (str): The justification for ending the interview (ex. Refuses to answer question) + """ + context.session.generate_reply( + instructions=f"The interview is ending now, inform the candidate that the reason was {disqualification_reason}" + ) + # in this scenario, disqualified candidates do not get recorded into the csv file, is that ideal? + context.session.shutdown() + + +class WorkDistributionTask(AgentTask[str]): + def __init__(self, team_size: int) -> None: + super().__init__( + instructions="""You will be asking the candidate about their project in relation to their team size.""" + ) + self._team_size = team_size + + async def on_enter(self) -> None: + if self._team_size == 1: + self.session.generate_reply( + instructions="Have the candidate walk you through their thought process on splitting the project work between a team of four. If unspecified, probe further on why they made their decisions." + ) + + else: + self.session.generate_reply( + instructions="Inquire how the work was divided up between the team. If the candidate believes that there was a better way to distribute work, probe further on what they would change." + ) + + @function_tool() + async def record_work_division_response(self, overall_response: str): + """Call once the candidate has provided a complete overview to their perspective on work division. + + Args: + overall_response (str): The candidate's response to approaching work division, especially regarding their aforementioned project + """ + self.complete(overall_response) + + +class ExpandProjectTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions="You will be asking the candidate to expand upon their project intentions, whether it be to scale their current one or to create a new one." + ) + self._result = {} + + async def on_enter(self) -> None: + self.session.generate_reply( + instructions="Allow the candidate to choose a scenario between expanding upon the project they are currently speaking of or creating a new project entirely. Dissect their thought process and decisions." + ) + + @function_tool() + async def record_old_project_expansion_response( + self, new_features: str, scale_plan: str, overall_response: str + ): + """Call if the candidate decides to scale their previous project. + + Args: + new_features (str): Record any new features the candidate expressed adding, such as improving GUI or adding an AI component. + scale_plan (str): Record how the candidate expressed scaling their project, such as deploying it if not already + overall_response (str): An overview of the candidate's response + """ + self.session.generate_reply( + instructions="Express interest in seeing the candidate scale their project as they described in the future." + ) + self._result["new_features"] = ( + new_features # MIGHT OMIT THIS AND BELOW BECAUSE WE JUST WANTED LLM TO PROMPT THE CANDIDATE AS WELL. IDK + ) + self._result["scale_plan"] = scale_plan + self._result["overall_response"] = overall_response + self.complete(self._result) + + @function_tool() + async def record_new_project_creation_response( + self, new_project_type: str, development_plan: str, overall_response: str + ): + """Call if the candidate decides to create a new project entirely. + + Args: + new_project_type (str): The type of project, such as mobile app or AI program + development_plan (str): Record how the candidate plans to develop this project from start to finish with detail + overall_response (str): An overview of the candidate's response + """ + self.session.generate_reply( + instructions="Respond to the candidate's project idea and express support for pursuing it in the future." + ) + self._result["new_project_type"] = new_project_type + self._result["development_plan"] = development_plan + self._result["overall_response"] = overall_response + self.complete(self._result) + + +class ProjectTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. + Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used + if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position.""", + tools=[disqualify], + ) + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Learn about the candidate's most technically difficult project, be inquisitive on their design choices and thinking process." + ) + + @function_tool() + async def record_project_details( + self, context: RunContext, team_size: int, project_description: str + ) -> None: + """Call to record team size, a categorization of the project, and any additional notes before another round of questions. + + Args: + team_size (int): The size of the project team, minimum of 1 + project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. + """ + tasks = [ + Task( + lambda: WorkDistributionTask(team_size=team_size), + id="work_distribution_task", + description="Collects the candidate's perspective on work distribution regarding their project", + ), + Task( + lambda: ExpandProjectTask(), + id="expand_project_task", + description="Collects the candidate's response on either scaling a previous project or creating a new one", + ), + ] + task_group = TaskOrchestrator(tasks=tasks) + results = await task_group + self.complete(results) + + +class ExperienceTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions=""" + You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. + Record how many years of experience the candidate has and the descriptions of their previous jobs if any. There is no set required amount for this position. + Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. If the candidate wishes to change a previous answer, call out_of_scope. + """, + tools=[disqualify], + ) + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Gather the candidate's work experience including how many years of experience they have and a general overview of their career.", + tool_choice="none", + ) + + @function_tool() + async def record_experience(self, context: RunContext, experience_description: str) -> None: + """Call to record the years of experience the candidate has and its descriptions. + + Args: + experience_description (str): The years of experience the candidate has and a description of each role they previously held. Take note of the corresponding companies as well. + """ + self.complete(experience_description) + + +class CommuteTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions=""" + You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. + Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. + """, + tools=[disqualify], + ) + self._result = {} + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Gather the candidate's commute flexibility, specfically whether or not they are able to commute to the office. Be brief and to the point.", + tool_choice="none", + ) + + @function_tool() + async def record_commute_flexibility(self, context: RunContext, can_commute: bool) -> None: + """Call to record whether or not the candidate can commute to the office. + + Args: + can_commute (bool): If the candidate can commute or not + """ + self._result["can_commute"] = can_commute + if can_commute: + commute_method = await CommuteMethodTask(commute_task=self) + self._result["commute_method"] = commute_method + + self.complete(self._result) + + +class CommuteMethodTask(AgentTask[str]): + def __init__(self, commute_task: CommuteTask) -> None: + out_of_scope_tool = None + for tool in commute_task.tools: + if tool.__name__ == "out_of_scope": + out_of_scope_tool = tool + super().__init__( + instructions="You will now be collecting the candidate's method of transportation.", + chat_ctx=commute_task.chat_ctx, + tools=[disqualify, out_of_scope_tool], + ) + self._commute_task = commute_task + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Gather their transportation method.", + tool_choice="none", + ) + + @function_tool() + async def record_commute_method(self, context: RunContext, commute_method: str) -> None: + """Call to record the candidate's method of transportation for their commute. + + Args: + commute_method (str): The candidate's method of transportation for their commute. + """ + self.complete(commute_method) + + +class IntroTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions=""" + You are Alex, an interviewer screening a candidate for a software engineering position. You both have just started the call. + Welcome the candidate to the interview, remain positive and concise. + You will also be collecting their name and introduction. + """, + ) + self._results = {} + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Welcome the candidate by introducing yourself and gather their name after their introduction.", + tool_choice="none", + ) + + @function_tool() + async def record_name(self, context: RunContext, name: str, intro_notes: str) -> None: + """Call to record the candidate's name and any notes about their response + + Args: + name (str): The candidate's name + intro_notes (str): The candidate's introduction and any additional notes + """ + self._results["name"] = name + self._results["intro_notes"] = intro_notes + self.complete(self._results) + + +class SurveyAgent(Agent): + def __init__(self) -> None: + super().__init__( + instructions=""" + You are a Survey agent screening candidates for a Software Engineer position. + """ + ) + + async def on_enter(self) -> AgentTask: + tasks = [ + Task( + lambda: ProjectTask(), + id="project_task", + description="Probes the user about their thought process on projects", + ), + Task( + lambda: ExperienceTask(), + id="experience_task", + description="Collects years of experience", + ), + Task(lambda: CommuteTask(), id="commute_task", description="Asks about commute"), + Task(lambda: GetEmailTask(), id="get_email_task", description="Collects email"), + Task(lambda: IntroTask(), id="get_name_intro_task", description="Collects name"), + ] + results = await TaskOrchestrator(tasks) + # TaskOrchestrator returns a dictionary with Task IDs as the keys and the results as the values + r = await self.chat_ctx.copy().summarize(llm_v=self.session.llm) + results["summary"] = r.content[0] + self.session.userdata = results + write_to_csv(filename="results.csv", data=results) + + async def on_exit(self) -> None: + await self.session.generate_reply( + instructions="The interview is now complete, alert the user and thank them for their time. They will hear back within 3 days." + ) + + +logger = logging.getLogger("SurveyAgent") + +load_dotenv(".env.local") + + +def prewarm(proc: JobProcess): + proc.userdata["vad"] = silero.VAD.load() + + +async def entrypoint(ctx: JobContext): + session = AgentSession[Userdata]( + userdata=Userdata(task_results={}), + llm=openai.LLM(model="gpt-4.1"), + stt=deepgram.STT(model="nova-3", language="multi"), + tts=openai.TTS(), + vad=ctx.proc.userdata["vad"], + preemptive_generation=True, + ) + + usage_collector = metrics.UsageCollector() + + async def log_usage(): + summary = usage_collector.get_summary() + logger.info(f"Usage: {summary}") + + ctx.add_shutdown_callback(log_usage) + + await session.start( + agent=SurveyAgent(), + room=ctx.room, + room_input_options=RoomInputOptions( + delete_room_on_close=True, + ), + ) + + await ctx.connect() + + +if __name__ == "__main__": + cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)) From 9fe85b8fe22555babffe40c95e10dd983964b34f Mon Sep 17 00:00:00 2001 From: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> Date: Fri, 17 Oct 2025 20:41:51 -0400 Subject: [PATCH 02/17] wip --- examples/survey/survey_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 13bc4f0dcf..27467c5013 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -106,7 +106,7 @@ async def record_old_project_expansion_response( instructions="Express interest in seeing the candidate scale their project as they described in the future." ) self._result["new_features"] = ( - new_features # MIGHT OMIT THIS AND BELOW BECAUSE WE JUST WANTED LLM TO PROMPT THE CANDIDATE AS WELL. IDK + new_features ) self._result["scale_plan"] = scale_plan self._result["overall_response"] = overall_response From e609993590327eb7d11d43ff38bf98a4890fbba3 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Sun, 19 Oct 2025 19:19:32 -0400 Subject: [PATCH 03/17] adapted to taskgroup --- examples/survey/survey_agent.py | 83 +++++++++++++++++---------------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 27467c5013..fa8102b327 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -17,9 +17,9 @@ cli, metrics, ) -from livekit.agents.beta.workflows import GetEmailTask, Task, TaskOrchestrator +from livekit.agents.beta.workflows import GetEmailTask, TaskGroup from livekit.agents.llm import function_tool -from livekit.plugins import deepgram, openai, silero +from livekit.plugins import cartesia, deepgram, openai, silero @dataclass @@ -47,7 +47,6 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: context.session.generate_reply( instructions=f"The interview is ending now, inform the candidate that the reason was {disqualification_reason}" ) - # in this scenario, disqualified candidates do not get recorded into the csv file, is that ideal? context.session.shutdown() @@ -105,9 +104,7 @@ async def record_old_project_expansion_response( self.session.generate_reply( instructions="Express interest in seeing the candidate scale their project as they described in the future." ) - self._result["new_features"] = ( - new_features - ) + self._result["new_features"] = new_features self._result["scale_plan"] = scale_plan self._result["overall_response"] = overall_response self.complete(self._result) @@ -136,7 +133,7 @@ class ProjectTask(AgentTask[dict]): def __init__(self) -> None: super().__init__( instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used + Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position.""", tools=[disqualify], ) @@ -156,21 +153,20 @@ async def record_project_details( team_size (int): The size of the project team, minimum of 1 project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. """ - tasks = [ - Task( - lambda: WorkDistributionTask(team_size=team_size), - id="work_distribution_task", - description="Collects the candidate's perspective on work distribution regarding their project", - ), - Task( - lambda: ExpandProjectTask(), - id="expand_project_task", - description="Collects the candidate's response on either scaling a previous project or creating a new one", - ), - ] - task_group = TaskOrchestrator(tasks=tasks) + task_group = TaskGroup() + task_group.add( + lambda: WorkDistributionTask(team_size=team_size), + id="work_distribution_task", + description="Collects the candidate's perspective on work distribution regarding their project", + ) + task_group.add( + lambda: ExpandProjectTask(), + id="expand_project_task", + description="Collects the candidate's response on either scaling a previous project or creating a new one", + ) + results = await task_group - self.complete(results) + self.complete(results.task_results) class ExperienceTask(AgentTask[dict]): @@ -300,25 +296,30 @@ def __init__(self) -> None: ) async def on_enter(self) -> AgentTask: - tasks = [ - Task( - lambda: ProjectTask(), - id="project_task", - description="Probes the user about their thought process on projects", - ), - Task( - lambda: ExperienceTask(), - id="experience_task", - description="Collects years of experience", - ), - Task(lambda: CommuteTask(), id="commute_task", description="Asks about commute"), - Task(lambda: GetEmailTask(), id="get_email_task", description="Collects email"), - Task(lambda: IntroTask(), id="get_name_intro_task", description="Collects name"), - ] - results = await TaskOrchestrator(tasks) - # TaskOrchestrator returns a dictionary with Task IDs as the keys and the results as the values - r = await self.chat_ctx.copy().summarize(llm_v=self.session.llm) - results["summary"] = r.content[0] + task_group = TaskGroup() + task_group.add( + lambda: IntroTask(), + id="get_name_intro_task", + description="Collects name and introduction", + ) + task_group.add(lambda: GetEmailTask(), id="get_email_task", description="Collects email") + task_group.add(lambda: CommuteTask(), id="commute_task", description="Asks about commute") + task_group.add( + lambda: ExperienceTask(), + id="experience_task", + description="Collects years of experience", + ) + task_group.add( + lambda: ProjectTask(), + id="project_task", + description="Probes the user about their thought process on projects", + ) + + results = await task_group + results = results.task_results + # TaskGroup returns a TaskGroupResult object. The task_results field holds a dictionary with Task IDs as the keys and the results as the values + summary = self.chat_ctx.items[-1] + results["summary"] = summary.content self.session.userdata = results write_to_csv(filename="results.csv", data=results) @@ -342,7 +343,7 @@ async def entrypoint(ctx: JobContext): userdata=Userdata(task_results={}), llm=openai.LLM(model="gpt-4.1"), stt=deepgram.STT(model="nova-3", language="multi"), - tts=openai.TTS(), + tts=cartesia.TTS(), vad=ctx.proc.userdata["vad"], preemptive_generation=True, ) From a22fc612d3e0b04f0c7ea08fbb95c09775af8512 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Mon, 20 Oct 2025 17:42:37 -0400 Subject: [PATCH 04/17] refactor commute task and disqualify --- examples/survey/survey_agent.py | 75 ++++++++++++--------------------- 1 file changed, 28 insertions(+), 47 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index fa8102b327..de69853f04 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -19,11 +19,12 @@ ) from livekit.agents.beta.workflows import GetEmailTask, TaskGroup from livekit.agents.llm import function_tool -from livekit.plugins import cartesia, deepgram, openai, silero +from livekit.plugins import deepgram, openai, silero @dataclass class Userdata: + filename: str task_results: dict @@ -38,8 +39,8 @@ def write_to_csv(filename: str, data: dict): @function_tool() async def disqualify(context: RunContext, disqualification_reason: str) -> None: - """Call if the candidate refuses to explicitly answer or if they do not meet the prerequisites for the position. - This function will terminate the interview and hang up. + """Call if the candidate refuses to cooperate, provides an unsatisfactory or inappropriate answer, or do not meet the prerequisites for the position. + This function will terminate the interview, record their disqualification, and hang up. Args: disqualification_reason (str): The justification for ending the interview (ex. Refuses to answer question) @@ -47,6 +48,12 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: context.session.generate_reply( instructions=f"The interview is ending now, inform the candidate that the reason was {disqualification_reason}" ) + disqualification_reason = "[DISQUALIFIED] " + disqualification_reason + data = { + "name": context.session.userdata.task_results["name"], + "disqualification reason": disqualification_reason, + } + write_to_csv(context.session.userdata.filename, data) context.session.shutdown() @@ -183,7 +190,6 @@ def __init__(self) -> None: async def on_enter(self) -> None: await self.session.generate_reply( instructions="Gather the candidate's work experience including how many years of experience they have and a general overview of their career.", - tool_choice="none", ) @function_tool() @@ -201,7 +207,7 @@ def __init__(self) -> None: super().__init__( instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. + Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. Disqualify the candidate if they cannot commute at all. """, tools=[disqualify], ) @@ -209,54 +215,29 @@ def __init__(self) -> None: async def on_enter(self) -> None: await self.session.generate_reply( - instructions="Gather the candidate's commute flexibility, specfically whether or not they are able to commute to the office. Be brief and to the point.", - tool_choice="none", + instructions="Gather the candidate's commute flexibility, specfically whether or not they are able to commute to the office. If they are able to commute, collect their commute method. Be brief and to the point.", ) @function_tool() - async def record_commute_flexibility(self, context: RunContext, can_commute: bool) -> None: - """Call to record whether or not the candidate can commute to the office. + async def record_commute_flexibility( + self, context: RunContext, office_flexibility: str, commute_method: str + ) -> None: + """Call to record the candidate's flexibility of going into office and notes about their commute. If they are able to commute, record their method of transportation. Args: - can_commute (bool): If the candidate can commute or not + office_flexibility (str): How often the candidate can commute to the office + commute_method (str): The method of transportation the candidate will take to commute (e.g. personal car, bus, subway) """ - self._result["can_commute"] = can_commute - if can_commute: - commute_method = await CommuteMethodTask(commute_task=self) - self._result["commute_method"] = commute_method + self._result["office_flexibility"] = office_flexibility + self._result["commute_method"] = commute_method + if commute_method.lower() == "personal car": + self.session.generate_reply( + instructions="The candidate noted that they will drive to work. Inform them that there is no designated parking lot for the office, but there is metered street parking." + ) self.complete(self._result) -class CommuteMethodTask(AgentTask[str]): - def __init__(self, commute_task: CommuteTask) -> None: - out_of_scope_tool = None - for tool in commute_task.tools: - if tool.__name__ == "out_of_scope": - out_of_scope_tool = tool - super().__init__( - instructions="You will now be collecting the candidate's method of transportation.", - chat_ctx=commute_task.chat_ctx, - tools=[disqualify, out_of_scope_tool], - ) - self._commute_task = commute_task - - async def on_enter(self) -> None: - await self.session.generate_reply( - instructions="Gather their transportation method.", - tool_choice="none", - ) - - @function_tool() - async def record_commute_method(self, context: RunContext, commute_method: str) -> None: - """Call to record the candidate's method of transportation for their commute. - - Args: - commute_method (str): The candidate's method of transportation for their commute. - """ - self.complete(commute_method) - - class IntroTask(AgentTask[dict]): def __init__(self) -> None: super().__init__( @@ -271,7 +252,6 @@ def __init__(self) -> None: async def on_enter(self) -> None: await self.session.generate_reply( instructions="Welcome the candidate by introducing yourself and gather their name after their introduction.", - tool_choice="none", ) @function_tool() @@ -282,6 +262,7 @@ async def record_name(self, context: RunContext, name: str, intro_notes: str) -> name (str): The candidate's name intro_notes (str): The candidate's introduction and any additional notes """ + self.session.userdata.task_results["name"] = name self._results["name"] = name self._results["intro_notes"] = intro_notes self.complete(self._results) @@ -321,7 +302,7 @@ async def on_enter(self) -> AgentTask: summary = self.chat_ctx.items[-1] results["summary"] = summary.content self.session.userdata = results - write_to_csv(filename="results.csv", data=results) + write_to_csv(filename=self.session.userdata.filename, data=results) async def on_exit(self) -> None: await self.session.generate_reply( @@ -340,10 +321,10 @@ def prewarm(proc: JobProcess): async def entrypoint(ctx: JobContext): session = AgentSession[Userdata]( - userdata=Userdata(task_results={}), + userdata=Userdata(filename="results.csv", task_results={}), llm=openai.LLM(model="gpt-4.1"), stt=deepgram.STT(model="nova-3", language="multi"), - tts=cartesia.TTS(), + tts=openai.TTS(), vad=ctx.proc.userdata["vad"], preemptive_generation=True, ) From 5df0f143599271d5ff92d96b306adb842d663a10 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Tue, 21 Oct 2025 13:49:24 -0400 Subject: [PATCH 05/17] adapt to AgentServer --- examples/survey/survey_agent.py | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index de69853f04..89f168a904 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -7,15 +7,13 @@ from livekit.agents import ( Agent, + AgentServer, AgentSession, AgentTask, JobContext, - JobProcess, RoomInputOptions, RunContext, - WorkerOptions, cli, - metrics, ) from livekit.agents.beta.workflows import GetEmailTask, TaskGroup from livekit.agents.llm import function_tool @@ -314,29 +312,20 @@ async def on_exit(self) -> None: load_dotenv(".env.local") +server = AgentServer() -def prewarm(proc: JobProcess): - proc.userdata["vad"] = silero.VAD.load() - -async def entrypoint(ctx: JobContext): +@server.realtime_session() +async def survey_agent(ctx: JobContext) -> None: session = AgentSession[Userdata]( userdata=Userdata(filename="results.csv", task_results={}), llm=openai.LLM(model="gpt-4.1"), stt=deepgram.STT(model="nova-3", language="multi"), tts=openai.TTS(), - vad=ctx.proc.userdata["vad"], + vad=silero.VAD.load(), preemptive_generation=True, ) - usage_collector = metrics.UsageCollector() - - async def log_usage(): - summary = usage_collector.get_summary() - logger.info(f"Usage: {summary}") - - ctx.add_shutdown_callback(log_usage) - await session.start( agent=SurveyAgent(), room=ctx.room, @@ -344,9 +333,8 @@ async def log_usage(): delete_room_on_close=True, ), ) - - await ctx.connect() + ctx.connect() if __name__ == "__main__": - cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)) + cli.run_app(server) From 8eabbbb2c9ee7e239f706d4e676fb883b52f59b0 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Tue, 21 Oct 2025 15:39:12 -0400 Subject: [PATCH 06/17] condense project task --- examples/survey/survey_agent.py | 167 +++++++++++++------------------- 1 file changed, 67 insertions(+), 100 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 89f168a904..5c7d61ac05 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -11,9 +11,11 @@ AgentSession, AgentTask, JobContext, + JobProcess, RoomInputOptions, RunContext, cli, + metrics, ) from livekit.agents.beta.workflows import GetEmailTask, TaskGroup from livekit.agents.llm import function_tool @@ -55,123 +57,68 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: context.session.shutdown() -class WorkDistributionTask(AgentTask[str]): - def __init__(self, team_size: int) -> None: +class ProjectTask(AgentTask[dict]): + def __init__(self) -> None: super().__init__( - instructions="""You will be asking the candidate about their project in relation to their team size.""" + instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. + Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used + if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position.""", + tools=[disqualify], ) - self._team_size = team_size + self._results = {} async def on_enter(self) -> None: - if self._team_size == 1: - self.session.generate_reply( - instructions="Have the candidate walk you through their thought process on splitting the project work between a team of four. If unspecified, probe further on why they made their decisions." - ) - - else: - self.session.generate_reply( - instructions="Inquire how the work was divided up between the team. If the candidate believes that there was a better way to distribute work, probe further on what they would change." - ) + await self.session.generate_reply( + instructions="Learn about the candidate's most technically difficult project, be inquisitive on their design choices and thinking process." + ) @function_tool() - async def record_work_division_response(self, overall_response: str): - """Call once the candidate has provided a complete overview to their perspective on work division. + async def record_project_details(self, context: RunContext, project_description: str) -> None: + """Call to record and gradually update the description of their project as they anwer more questions. Args: - overall_response (str): The candidate's response to approaching work division, especially regarding their aforementioned project + project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. """ - self.complete(overall_response) + self._results["project_description"] = project_description + if not self._results["work_division_response"]: + self.session.generate_reply( + instructions="Have the candidate walk you through their thought process on splitting the project work. If they already worked in a team for that project, gather their thoughts on what they would do differently." + ) -class ExpandProjectTask(AgentTask[dict]): - def __init__(self) -> None: - super().__init__( - instructions="You will be asking the candidate to expand upon their project intentions, whether it be to scale their current one or to create a new one." - ) - self._result = {} + elif ( + self._results["work_division_response"] + and not self._results["scaling_project_response"] + ): + self.session.generate_reply( + instructions="Allow the candidate to choose a scenario between expanding upon the project they are currently speaking of or creating a new project entirely. Dissect their thought process and decisions." + ) - async def on_enter(self) -> None: - self.session.generate_reply( - instructions="Allow the candidate to choose a scenario between expanding upon the project they are currently speaking of or creating a new project entirely. Dissect their thought process and decisions." - ) + else: + self.complete(self._results) @function_tool() - async def record_old_project_expansion_response( - self, new_features: str, scale_plan: str, overall_response: str - ): - """Call if the candidate decides to scale their previous project. + async def record_work_division_response(self, work_division_response: str): + """Call once the candidate has provided a complete overview to their perspective on work division. Args: - new_features (str): Record any new features the candidate expressed adding, such as improving GUI or adding an AI component. - scale_plan (str): Record how the candidate expressed scaling their project, such as deploying it if not already - overall_response (str): An overview of the candidate's response + work_division_response (str): The candidate's response to approaching work division, especially regarding their aforementioned project """ - self.session.generate_reply( - instructions="Express interest in seeing the candidate scale their project as they described in the future." - ) - self._result["new_features"] = new_features - self._result["scale_plan"] = scale_plan - self._result["overall_response"] = overall_response - self.complete(self._result) + self._results["work_division_response"] = work_division_response @function_tool() - async def record_new_project_creation_response( - self, new_project_type: str, development_plan: str, overall_response: str - ): - """Call if the candidate decides to create a new project entirely. + async def record_project_scale_response(self, chosen_scenario: str, scale_response: str): + """Call to record the candidate's response to scaling their project, either the aforementioned or a new one Args: - new_project_type (str): The type of project, such as mobile app or AI program - development_plan (str): Record how the candidate plans to develop this project from start to finish with detail - overall_response (str): An overview of the candidate's response + chosen_scenario (str): The scenario the candidate chose, either 'old_project' or 'new_project' + old_project_scale_response (str): An overview of the candidate's response """ self.session.generate_reply( - instructions="Respond to the candidate's project idea and express support for pursuing it in the future." - ) - self._result["new_project_type"] = new_project_type - self._result["development_plan"] = development_plan - self._result["overall_response"] = overall_response - self.complete(self._result) - - -class ProjectTask(AgentTask[dict]): - def __init__(self) -> None: - super().__init__( - instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used - if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position.""", - tools=[disqualify], - ) - - async def on_enter(self) -> None: - await self.session.generate_reply( - instructions="Learn about the candidate's most technically difficult project, be inquisitive on their design choices and thinking process." - ) - - @function_tool() - async def record_project_details( - self, context: RunContext, team_size: int, project_description: str - ) -> None: - """Call to record team size, a categorization of the project, and any additional notes before another round of questions. - - Args: - team_size (int): The size of the project team, minimum of 1 - project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. - """ - task_group = TaskGroup() - task_group.add( - lambda: WorkDistributionTask(team_size=team_size), - id="work_distribution_task", - description="Collects the candidate's perspective on work distribution regarding their project", - ) - task_group.add( - lambda: ExpandProjectTask(), - id="expand_project_task", - description="Collects the candidate's response on either scaling a previous project or creating a new one", + instructions="Express interest in seeing the candidate scale their project as they described in the future." ) - - results = await task_group - self.complete(results.task_results) + results = {"scenario": chosen_scenario, "response": scale_response} + self._result["scaling_project_response"] = results class ExperienceTask(AgentTask[dict]): @@ -180,10 +127,11 @@ def __init__(self) -> None: instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. Record how many years of experience the candidate has and the descriptions of their previous jobs if any. There is no set required amount for this position. - Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. If the candidate wishes to change a previous answer, call out_of_scope. + Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. """, tools=[disqualify], ) + self._results = {} async def on_enter(self) -> None: await self.session.generate_reply( @@ -191,13 +139,18 @@ async def on_enter(self) -> None: ) @function_tool() - async def record_experience(self, context: RunContext, experience_description: str) -> None: - """Call to record the years of experience the candidate has and its descriptions. + async def record_experience( + self, context: RunContext, years_of_experience: int, experience_description: str + ) -> None: + """Call to record the work experience the candidate has and a description of each role with a clear timeline. Args: - experience_description (str): The years of experience the candidate has and a description of each role they previously held. Take note of the corresponding companies as well. + years_of_experience (int): The years of experience the candidate has + experience_description (str): A description of each role they previously held. Take note of the corresponding companies as well. """ - self.complete(experience_description) + self._results["years_of_experience"] = years_of_experience + self._results["experience_description"] = experience_description + self.complete(self._results) class CommuteTask(AgentTask[dict]): @@ -312,11 +265,16 @@ async def on_exit(self) -> None: load_dotenv(".env.local") + +def prewarm(proc: JobProcess): + proc.userdata["vad"] = silero.VAD.load() + + server = AgentServer() @server.realtime_session() -async def survey_agent(ctx: JobContext) -> None: +async def entrypoint(ctx: JobContext): session = AgentSession[Userdata]( userdata=Userdata(filename="results.csv", task_results={}), llm=openai.LLM(model="gpt-4.1"), @@ -326,6 +284,14 @@ async def survey_agent(ctx: JobContext) -> None: preemptive_generation=True, ) + usage_collector = metrics.UsageCollector() + + async def log_usage(): + summary = usage_collector.get_summary() + logger.info(f"Usage: {summary}") + + ctx.add_shutdown_callback(log_usage) + await session.start( agent=SurveyAgent(), room=ctx.room, @@ -333,7 +299,8 @@ async def survey_agent(ctx: JobContext) -> None: delete_room_on_close=True, ), ) - ctx.connect() + + await ctx.connect() if __name__ == "__main__": From 7a43d8e81dc2c606e39b4207adb88b3fd270f928 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Tue, 21 Oct 2025 17:02:52 -0400 Subject: [PATCH 07/17] added evaluation method and behaviorial task --- examples/survey/survey_agent.py | 89 ++++++++++++++++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 5c7d61ac05..de2dfa5999 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -15,6 +15,7 @@ RoomInputOptions, RunContext, cli, + llm, metrics, ) from livekit.agents.beta.workflows import GetEmailTask, TaskGroup @@ -37,6 +38,31 @@ def write_to_csv(filename: str, data: dict): csv_writer.writerow(data) +async def evaluate_candidate(llm_model, summary) -> str: + """Analyzes the full conversation to determine if the candidate is a good fit for the role and position""" + conversation_text = summary.content + chat_ctx = llm.ChatContext() + chat_ctx.add_message( + role="system", + content=( + "Evaluate whether or not this candidate is a good fit for the company and role based on the conversation summary provided.\n" + "Take into account their holistic and professional profile and the quality of their responses.\n" + "Be concise and firm in the evaluation." + ), + ) + chat_ctx.add_message( + role="user", + content=f"Conversation to evaluate:\n\n{conversation_text}", + ) + + chunks: list[str] = [] + async for chunk in llm_model.chat(chat_ctx=chat_ctx): + if chunk.delta and chunk.delta.content: + chunks.append(chunk.delta.content) + evaluation = "".join(chunks).strip() + return evaluation + + @function_tool() async def disqualify(context: RunContext, disqualification_reason: str) -> None: """Call if the candidate refuses to cooperate, provides an unsatisfactory or inappropriate answer, or do not meet the prerequisites for the position. @@ -57,6 +83,57 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: context.session.shutdown() +class BehaviorialTask(AgentTask[dict]): + def __init__(self) -> None: + super().__init__( + instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. + You will now be learning more about the candidate holistically. This includes their strengths, weaknesses, and work and communication style. You are testing the candidate for a good fit in the company. + The ideal candidate would be well spoken, energetic, and thorough in their answers. Do not mention the prerequisites. If the candidate does not fulfill the description or refuses to answer, call disqualify(). + """, + tools=[disqualify], + ) + self._results = {} + + async def on_enter(self) -> None: + await self.session.generate_reply( + instructions="Approach this task in a natural conversational manner and incrementally gather the candidate's strengths, weaknesses, and work style in no particular order." + ) + + @function_tool() + async def record_strengths(self, strengths_summary: str): + """Call to record a summary of the candidate's strengths. + + Args: + strengths_summary (str): A summary of the candidate's strengths + """ + self._results["strengths"] = strengths_summary + self._check_completion() + + @function_tool() + async def record_weaknesses(self, weaknesses_summary): + """Call to record a summary of the candidate's weaknesses. + + Args: + weaknesses_summary (str): A summary of the candidate's weaknesses + """ + self._results["weaknesses"] = weaknesses_summary + self._check_completion() + + @function_tool() + async def record_work_style(self, work_style: str): + """Call to record a summary of the candidate's work and communication style. + + Args: + work_style (str): The candidate's work and communication style + """ + self._results["work_style"] = work_style + self._check_completion() + + def _check_completion(self): + if self._results.keys() == {"strengths", "weaknesses", "work_style"}: + self.complete(self._results) + + class ProjectTask(AgentTask[dict]): def __init__(self) -> None: super().__init__( @@ -74,7 +151,7 @@ async def on_enter(self) -> None: @function_tool() async def record_project_details(self, context: RunContext, project_description: str) -> None: - """Call to record and gradually update the description of their project as they anwer more questions. + """Call to record and gradually update the description of their project as they answer more questions. Args: project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. @@ -241,6 +318,11 @@ async def on_enter(self) -> AgentTask: id="experience_task", description="Collects years of experience", ) + task_group.add( + lambda: BehaviorialTask(), + id="behavorial_task", + description="Gathers a holistic view of the candidate, including their strengths, weaknesses, and work style", + ) task_group.add( lambda: ProjectTask(), id="project_task", @@ -251,8 +333,11 @@ async def on_enter(self) -> AgentTask: results = results.task_results # TaskGroup returns a TaskGroupResult object. The task_results field holds a dictionary with Task IDs as the keys and the results as the values summary = self.chat_ctx.items[-1] + evaluation = await evaluate_candidate(llm_model=self.session.llm, summary=summary) results["summary"] = summary.content - self.session.userdata = results + results["evaluation"] = evaluation + + self.session.userdata.task_results = results write_to_csv(filename=self.session.userdata.filename, data=results) async def on_exit(self) -> None: From 4723dd25c3546106c81aeef45eb8c8a33171aa40 Mon Sep 17 00:00:00 2001 From: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> Date: Wed, 22 Oct 2025 17:21:44 -0400 Subject: [PATCH 08/17] add type hint --- examples/survey/survey_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index de2dfa5999..bca1b259bf 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -110,7 +110,7 @@ async def record_strengths(self, strengths_summary: str): self._check_completion() @function_tool() - async def record_weaknesses(self, weaknesses_summary): + async def record_weaknesses(self, weaknesses_summary: str): """Call to record a summary of the candidate's weaknesses. Args: From 1d092e2321b81f8ce914985f7851ee7ea86bdbc8 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Thu, 23 Oct 2025 00:46:16 -0400 Subject: [PATCH 09/17] wip --- examples/survey/survey_agent.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index de2dfa5999..27633d2536 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -89,6 +89,7 @@ def __init__(self) -> None: instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. You will now be learning more about the candidate holistically. This includes their strengths, weaknesses, and work and communication style. You are testing the candidate for a good fit in the company. The ideal candidate would be well spoken, energetic, and thorough in their answers. Do not mention the prerequisites. If the candidate does not fulfill the description or refuses to answer, call disqualify(). + Avoid listing out questions with bullet points or numbers, use a natural conversational tone. """, tools=[disqualify], ) @@ -110,7 +111,7 @@ async def record_strengths(self, strengths_summary: str): self._check_completion() @function_tool() - async def record_weaknesses(self, weaknesses_summary): + async def record_weaknesses(self, weaknesses_summary: str): """Call to record a summary of the candidate's weaknesses. Args: @@ -139,7 +140,7 @@ def __init__(self) -> None: super().__init__( instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used - if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position.""", + if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position. Avoid listing out questions with bullet points or numbers, use a natural conversational tone.""", tools=[disqualify], ) self._results = {} @@ -158,14 +159,14 @@ async def record_project_details(self, context: RunContext, project_description: """ self._results["project_description"] = project_description - if not self._results["work_division_response"]: + if "work_division_response" not in self._results.keys(): self.session.generate_reply( instructions="Have the candidate walk you through their thought process on splitting the project work. If they already worked in a team for that project, gather their thoughts on what they would do differently." ) elif ( self._results["work_division_response"] - and not self._results["scaling_project_response"] + and "scaling_project_response" not in self._results.keys() ): self.session.generate_reply( instructions="Allow the candidate to choose a scenario between expanding upon the project they are currently speaking of or creating a new project entirely. Dissect their thought process and decisions." @@ -195,7 +196,7 @@ async def record_project_scale_response(self, chosen_scenario: str, scale_respon instructions="Express interest in seeing the candidate scale their project as they described in the future." ) results = {"scenario": chosen_scenario, "response": scale_response} - self._result["scaling_project_response"] = results + self._results["scaling_project_response"] = results class ExperienceTask(AgentTask[dict]): @@ -204,7 +205,7 @@ def __init__(self) -> None: instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. Record how many years of experience the candidate has and the descriptions of their previous jobs if any. There is no set required amount for this position. - Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. + Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. Avoid listing out questions with bullet points or numbers, use a natural conversational tone. """, tools=[disqualify], ) @@ -258,9 +259,9 @@ async def record_commute_flexibility( """ self._result["office_flexibility"] = office_flexibility self._result["commute_method"] = commute_method - if commute_method.lower() == "personal car": + if commute_method.lower() == "subway" or commute_method.lower() == "bus": self.session.generate_reply( - instructions="The candidate noted that they will drive to work. Inform them that there is no designated parking lot for the office, but there is metered street parking." + instructions="Inform the candidate that the company may sponsor their transportation expenses." ) self.complete(self._result) @@ -323,11 +324,12 @@ async def on_enter(self) -> AgentTask: id="behavorial_task", description="Gathers a holistic view of the candidate, including their strengths, weaknesses, and work style", ) - task_group.add( - lambda: ProjectTask(), - id="project_task", - description="Probes the user about their thought process on projects", - ) + # TODO refactor to safely complete task + # task_group.add( + # lambda: ProjectTask(), + # id="project_task", + # description="Probes the user about their thought process on projects", + # ) results = await task_group results = results.task_results @@ -336,7 +338,6 @@ async def on_enter(self) -> AgentTask: evaluation = await evaluate_candidate(llm_model=self.session.llm, summary=summary) results["summary"] = summary.content results["evaluation"] = evaluation - self.session.userdata.task_results = results write_to_csv(filename=self.session.userdata.filename, data=results) From 5a1be6f4d0d4c669dd94585c03bd9126ee557843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Wed, 22 Oct 2025 22:50:13 -0700 Subject: [PATCH 10/17] add load_dotenv --- examples/survey/survey_agent.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 27633d2536..4c6742b671 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -22,6 +22,7 @@ from livekit.agents.llm import function_tool from livekit.plugins import deepgram, openai, silero +load_dotenv() @dataclass class Userdata: From 5c0d03b7350a0b890f702160ce15297aad904c8c Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Thu, 23 Oct 2025 13:00:28 -0400 Subject: [PATCH 11/17] change results to dataclasses and shorten --- examples/survey/survey_agent.py | 170 ++++++++++++-------------------- 1 file changed, 64 insertions(+), 106 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 4c6742b671..9c4271497f 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -2,8 +2,10 @@ import logging import os from dataclasses import dataclass +from typing import Annotated from dotenv import load_dotenv +from pydantic import Field from livekit.agents import ( Agent, @@ -11,7 +13,6 @@ AgentSession, AgentTask, JobContext, - JobProcess, RoomInputOptions, RunContext, cli, @@ -22,14 +23,45 @@ from livekit.agents.llm import function_tool from livekit.plugins import deepgram, openai, silero +logger = logging.getLogger("SurveyAgent") + load_dotenv() +CommuteMethods = ["driving", "bus", "subway", "none"] + + @dataclass class Userdata: filename: str + candidate_name: str task_results: dict +@dataclass +class IntroResults: + name: str + intro: str + + +@dataclass +class CommuteResults: + can_commute: bool + commute_method: str + + +@dataclass +class ExperienceResults: + years_of_experience: int + experience_description: str + + +@dataclass +class BehavioralResults: + strengths: str + weaknesses: str + work_style: str + + def write_to_csv(filename: str, data: dict): with open(filename, "a", newline="") as csvfile: fieldnames = data.keys() @@ -84,7 +116,7 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: context.session.shutdown() -class BehaviorialTask(AgentTask[dict]): +class BehavioralTask(AgentTask[BehavioralResults]): def __init__(self) -> None: super().__init__( instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. @@ -133,84 +165,24 @@ async def record_work_style(self, work_style: str): def _check_completion(self): if self._results.keys() == {"strengths", "weaknesses", "work_style"}: - self.complete(self._results) - - -class ProjectTask(AgentTask[dict]): - def __init__(self) -> None: - super().__init__( - instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Gather information about the most technical project the candidate has attempted and probe for their thinking process. Note specificities such as if they worked solo or in a team, and the technology stack they used - if applicable. If they have no projects to dissect, call disqualify(). Do not mention any prerequisites for this position. Avoid listing out questions with bullet points or numbers, use a natural conversational tone.""", - tools=[disqualify], - ) - self._results = {} - - async def on_enter(self) -> None: - await self.session.generate_reply( - instructions="Learn about the candidate's most technically difficult project, be inquisitive on their design choices and thinking process." - ) - - @function_tool() - async def record_project_details(self, context: RunContext, project_description: str) -> None: - """Call to record and gradually update the description of their project as they answer more questions. - - Args: - project_description (str): A description of the project, including the type of project being described, such as "full stack application." Include the technology stack they used and their reasoning behind it. - """ - - self._results["project_description"] = project_description - if "work_division_response" not in self._results.keys(): - self.session.generate_reply( - instructions="Have the candidate walk you through their thought process on splitting the project work. If they already worked in a team for that project, gather their thoughts on what they would do differently." - ) - - elif ( - self._results["work_division_response"] - and "scaling_project_response" not in self._results.keys() - ): - self.session.generate_reply( - instructions="Allow the candidate to choose a scenario between expanding upon the project they are currently speaking of or creating a new project entirely. Dissect their thought process and decisions." + results = BehavioralResults( + strengths=self._results["strengths"], + weaknesses=self._results["weaknesses"], + work_style=self._results["work_style"], ) - - else: - self.complete(self._results) - - @function_tool() - async def record_work_division_response(self, work_division_response: str): - """Call once the candidate has provided a complete overview to their perspective on work division. - - Args: - work_division_response (str): The candidate's response to approaching work division, especially regarding their aforementioned project - """ - self._results["work_division_response"] = work_division_response - - @function_tool() - async def record_project_scale_response(self, chosen_scenario: str, scale_response: str): - """Call to record the candidate's response to scaling their project, either the aforementioned or a new one - - Args: - chosen_scenario (str): The scenario the candidate chose, either 'old_project' or 'new_project' - old_project_scale_response (str): An overview of the candidate's response - """ - self.session.generate_reply( - instructions="Express interest in seeing the candidate scale their project as they described in the future." - ) - results = {"scenario": chosen_scenario, "response": scale_response} - self._results["scaling_project_response"] = results + self.complete(results) -class ExperienceTask(AgentTask[dict]): +class ExperienceTask(AgentTask[ExperienceResults]): def __init__(self) -> None: super().__init__( instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. Record how many years of experience the candidate has and the descriptions of their previous jobs if any. There is no set required amount for this position. - Focus on the frameworks they have experience in and any gaps between jobs. Be sure to confirm details. Avoid listing out questions with bullet points or numbers, use a natural conversational tone. + Be sure to confirm details. Avoid listing out questions with bullet points or numbers, use a natural conversational tone. """, tools=[disqualify], ) - self._results = {} async def on_enter(self) -> None: await self.session.generate_reply( @@ -227,12 +199,13 @@ async def record_experience( years_of_experience (int): The years of experience the candidate has experience_description (str): A description of each role they previously held. Take note of the corresponding companies as well. """ - self._results["years_of_experience"] = years_of_experience - self._results["experience_description"] = experience_description - self.complete(self._results) + results = ExperienceResults( + years_of_experience=years_of_experience, experience_description=experience_description + ) + self.complete(results) -class CommuteTask(AgentTask[dict]): +class CommuteTask(AgentTask[CommuteResults]): def __init__(self) -> None: super().__init__( instructions=""" @@ -241,7 +214,6 @@ def __init__(self) -> None: """, tools=[disqualify], ) - self._result = {} async def on_enter(self) -> None: await self.session.generate_reply( @@ -250,25 +222,28 @@ async def on_enter(self) -> None: @function_tool() async def record_commute_flexibility( - self, context: RunContext, office_flexibility: str, commute_method: str + self, + context: RunContext, + can_commute: bool, + commute_method: Annotated[str, Field(json_schema_extra={"enum": CommuteMethods})], ) -> None: """Call to record the candidate's flexibility of going into office and notes about their commute. If they are able to commute, record their method of transportation. Args: - office_flexibility (str): How often the candidate can commute to the office - commute_method (str): The method of transportation the candidate will take to commute (e.g. personal car, bus, subway) + can_commute (bool): If the candidate can commute to the office + commute_method (str): The method of transportation the candidate will take to commute, either ['driving', 'bus', 'subway', 'none'] """ - self._result["office_flexibility"] = office_flexibility - self._result["commute_method"] = commute_method - if commute_method.lower() == "subway" or commute_method.lower() == "bus": + results = CommuteResults(can_commute=can_commute, commute_method=commute_method) + + if commute_method == "subway" or commute_method == "bus": self.session.generate_reply( - instructions="Inform the candidate that the company may sponsor their transportation expenses." + instructions="Inform the candidate that the company will sponsor their transportation expenses." ) - self.complete(self._result) + self.complete(results) -class IntroTask(AgentTask[dict]): +class IntroTask(AgentTask[IntroResults]): def __init__(self) -> None: super().__init__( instructions=""" @@ -277,7 +252,6 @@ def __init__(self) -> None: You will also be collecting their name and introduction. """, ) - self._results = {} async def on_enter(self) -> None: await self.session.generate_reply( @@ -285,17 +259,16 @@ async def on_enter(self) -> None: ) @function_tool() - async def record_name(self, context: RunContext, name: str, intro_notes: str) -> None: + async def record_intro(self, context: RunContext, name: str, intro_notes: str) -> None: """Call to record the candidate's name and any notes about their response Args: name (str): The candidate's name intro_notes (str): The candidate's introduction and any additional notes """ - self.session.userdata.task_results["name"] = name - self._results["name"] = name - self._results["intro_notes"] = intro_notes - self.complete(self._results) + self.session.userdata.candidate_name = name + results = IntroResults(name=name, intro=intro_notes) + self.complete(results) class SurveyAgent(Agent): @@ -321,16 +294,10 @@ async def on_enter(self) -> AgentTask: description="Collects years of experience", ) task_group.add( - lambda: BehaviorialTask(), + lambda: BehavioralTask(), id="behavorial_task", description="Gathers a holistic view of the candidate, including their strengths, weaknesses, and work style", ) - # TODO refactor to safely complete task - # task_group.add( - # lambda: ProjectTask(), - # id="project_task", - # description="Probes the user about their thought process on projects", - # ) results = await task_group results = results.task_results @@ -348,22 +315,13 @@ async def on_exit(self) -> None: ) -logger = logging.getLogger("SurveyAgent") - -load_dotenv(".env.local") - - -def prewarm(proc: JobProcess): - proc.userdata["vad"] = silero.VAD.load() - - server = AgentServer() @server.realtime_session() async def entrypoint(ctx: JobContext): session = AgentSession[Userdata]( - userdata=Userdata(filename="results.csv", task_results={}), + userdata=Userdata(filename="results.csv", candidate_name="", task_results={}), llm=openai.LLM(model="gpt-4.1"), stt=deepgram.STT(model="nova-3", language="multi"), tts=openai.TTS(), From 0f278cb75689c85b265cdf5039e806f7a9444648 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Thu, 23 Oct 2025 15:32:17 -0400 Subject: [PATCH 12/17] wip --- examples/survey/survey_agent.py | 36 +++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 9c4271497f..4fdd9eaa04 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -25,9 +25,10 @@ logger = logging.getLogger("SurveyAgent") -load_dotenv() +load_dotenv(".env.local") CommuteMethods = ["driving", "bus", "subway", "none"] +WorkStyles = ["independent", "team_player"] @dataclass @@ -121,7 +122,7 @@ def __init__(self) -> None: super().__init__( instructions="""You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. You will now be learning more about the candidate holistically. This includes their strengths, weaknesses, and work and communication style. You are testing the candidate for a good fit in the company. - The ideal candidate would be well spoken, energetic, and thorough in their answers. Do not mention the prerequisites. If the candidate does not fulfill the description or refuses to answer, call disqualify(). + If the candidate refuses to answer, call disqualify(). Be concise and to the point. Avoid listing out questions with bullet points or numbers, use a natural conversational tone. """, tools=[disqualify], @@ -154,11 +155,13 @@ async def record_weaknesses(self, weaknesses_summary: str): self._check_completion() @function_tool() - async def record_work_style(self, work_style: str): - """Call to record a summary of the candidate's work and communication style. + async def record_work_style( + self, work_style: Annotated[str, Field(json_schema_extra={"enum": WorkStyles})] + ): + """Call to record a summary of the candidate's work style. Args: - work_style (str): The candidate's work and communication style + work_style (str): The candidate's work style """ self._results["work_style"] = work_style self._check_completion() @@ -171,6 +174,10 @@ def _check_completion(self): work_style=self._results["work_style"], ) self.complete(results) + else: + self.session.generate_reply( + instructions="Continue incrementally collecting the remaining answers for the behavioral stage. Maintain a conversational tone." + ) class ExperienceTask(AgentTask[ExperienceResults]): @@ -210,7 +217,7 @@ def __init__(self) -> None: super().__init__( instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. Disqualify the candidate if they cannot commute at all. + Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. """, tools=[disqualify], ) @@ -228,18 +235,13 @@ async def record_commute_flexibility( commute_method: Annotated[str, Field(json_schema_extra={"enum": CommuteMethods})], ) -> None: """Call to record the candidate's flexibility of going into office and notes about their commute. If they are able to commute, record their method of transportation. + If the candidate chooses public transportation, (bus, subway) then their travel expenses may be subsidized by the company. Args: can_commute (bool): If the candidate can commute to the office commute_method (str): The method of transportation the candidate will take to commute, either ['driving', 'bus', 'subway', 'none'] """ results = CommuteResults(can_commute=can_commute, commute_method=commute_method) - - if commute_method == "subway" or commute_method == "bus": - self.session.generate_reply( - instructions="Inform the candidate that the company will sponsor their transportation expenses." - ) - self.complete(results) @@ -275,7 +277,7 @@ class SurveyAgent(Agent): def __init__(self) -> None: super().__init__( instructions=""" - You are a Survey agent screening candidates for a Software Engineer position. + You are a Survey agent screening candidates for a Software Engineer position. When the interview is concluded, call end_screening to hang up. """ ) @@ -308,12 +310,16 @@ async def on_enter(self) -> AgentTask: results["evaluation"] = evaluation self.session.userdata.task_results = results write_to_csv(filename=self.session.userdata.filename, data=results) - - async def on_exit(self) -> None: await self.session.generate_reply( instructions="The interview is now complete, alert the user and thank them for their time. They will hear back within 3 days." ) + @function_tool() + async def end_screening(self): + """Call when the interview/screening is concluded to hang up.""" + await self.session.generate_reply(instructions="Close out the interview and say goodbye.") + self.session.shutdown() + server = AgentServer() From 55a01489e7a07d5fd461d17c9f6f093620e6e3f8 Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Thu, 23 Oct 2025 23:46:52 -0400 Subject: [PATCH 13/17] async csv write function --- examples/survey/survey_agent.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 4fdd9eaa04..a3f66c3ad2 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -1,9 +1,10 @@ -import csv import logging import os from dataclasses import dataclass from typing import Annotated +import aiofiles +from aiocsv import AsyncWriter from dotenv import load_dotenv from pydantic import Field @@ -25,7 +26,7 @@ logger = logging.getLogger("SurveyAgent") -load_dotenv(".env.local") +load_dotenv() CommuteMethods = ["driving", "bus", "subway", "none"] WorkStyles = ["independent", "team_player"] @@ -63,13 +64,12 @@ class BehavioralResults: work_style: str -def write_to_csv(filename: str, data: dict): - with open(filename, "a", newline="") as csvfile: - fieldnames = data.keys() - csv_writer = csv.DictWriter(csvfile, fieldnames=fieldnames) +async def write_to_csv(filename: str, data: dict): + async with aiofiles.open(filename, "a", newline="") as csvfile: + writer = AsyncWriter(csvfile, data.keys()) if not os.path.exists(filename): - csv_writer.writeheader() - csv_writer.writerow(data) + await writer.writeheader() + await writer.writerow(data.values()) async def evaluate_candidate(llm_model, summary) -> str: @@ -113,7 +113,7 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: "name": context.session.userdata.task_results["name"], "disqualification reason": disqualification_reason, } - write_to_csv(context.session.userdata.filename, data) + await write_to_csv(context.session.userdata.filename, data) context.session.shutdown() @@ -235,11 +235,10 @@ async def record_commute_flexibility( commute_method: Annotated[str, Field(json_schema_extra={"enum": CommuteMethods})], ) -> None: """Call to record the candidate's flexibility of going into office and notes about their commute. If they are able to commute, record their method of transportation. - If the candidate chooses public transportation, (bus, subway) then their travel expenses may be subsidized by the company. Args: can_commute (bool): If the candidate can commute to the office - commute_method (str): The method of transportation the candidate will take to commute, either ['driving', 'bus', 'subway', 'none'] + commute_method (str): The method of transportation the candidate will take to commute """ results = CommuteResults(can_commute=can_commute, commute_method=commute_method) self.complete(results) @@ -309,7 +308,7 @@ async def on_enter(self) -> AgentTask: results["summary"] = summary.content results["evaluation"] = evaluation self.session.userdata.task_results = results - write_to_csv(filename=self.session.userdata.filename, data=results) + await write_to_csv(filename=self.session.userdata.filename, data=results) await self.session.generate_reply( instructions="The interview is now complete, alert the user and thank them for their time. They will hear back within 3 days." ) From 72972126be5379eb01c5439df6548c6f4b53911d Mon Sep 17 00:00:00 2001 From: tina nguyen Date: Fri, 24 Oct 2025 00:59:12 -0400 Subject: [PATCH 14/17] fix --- examples/survey/survey_agent.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index a3f66c3ad2..4910e42406 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -110,7 +110,7 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: ) disqualification_reason = "[DISQUALIFIED] " + disqualification_reason data = { - "name": context.session.userdata.task_results["name"], + "name": context.session.userdata.candidate_name, "disqualification reason": disqualification_reason, } await write_to_csv(context.session.userdata.filename, data) @@ -287,6 +287,8 @@ async def on_enter(self) -> AgentTask: id="get_name_intro_task", description="Collects name and introduction", ) + # dependent on PR #3711 + # task_group.add(lambda: GetEmailTask(extra_instructions="If the user refuses to provide their email, call disqualify() insted of decline_email_capture().", tools=[disqualify]), id="get_email_task", description="Collects email") task_group.add(lambda: GetEmailTask(), id="get_email_task", description="Collects email") task_group.add(lambda: CommuteTask(), id="commute_task", description="Asks about commute") task_group.add( From fc6ad3fa32c0389eec6cc38ca96992dc0ec02fb7 Mon Sep 17 00:00:00 2001 From: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> Date: Mon, 27 Oct 2025 15:18:41 -0400 Subject: [PATCH 15/17] uncomment emailtask --- examples/survey/survey_agent.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 4910e42406..30e24c51f5 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -287,9 +287,7 @@ async def on_enter(self) -> AgentTask: id="get_name_intro_task", description="Collects name and introduction", ) - # dependent on PR #3711 - # task_group.add(lambda: GetEmailTask(extra_instructions="If the user refuses to provide their email, call disqualify() insted of decline_email_capture().", tools=[disqualify]), id="get_email_task", description="Collects email") - task_group.add(lambda: GetEmailTask(), id="get_email_task", description="Collects email") + task_group.add(lambda: GetEmailTask(extra_instructions="If the user refuses to provide their email, call disqualify() insted of decline_email_capture().", tools=[disqualify]), id="get_email_task", description="Collects email") task_group.add(lambda: CommuteTask(), id="commute_task", description="Asks about commute") task_group.add( lambda: ExperienceTask(), From 38272e35e3fffca09ac9daaf68ca35a1d32fa2ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctinalenguyen=E2=80=9D?= Date: Thu, 30 Oct 2025 01:30:14 -0400 Subject: [PATCH 16/17] update --- examples/survey/survey_agent.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 30e24c51f5..1a14d43e65 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -23,6 +23,7 @@ from livekit.agents.beta.workflows import GetEmailTask, TaskGroup from livekit.agents.llm import function_tool from livekit.plugins import deepgram, openai, silero +from livekit.plugins.turn_detector.multilingual import MultilingualModel logger = logging.getLogger("SurveyAgent") @@ -106,7 +107,7 @@ async def disqualify(context: RunContext, disqualification_reason: str) -> None: disqualification_reason (str): The justification for ending the interview (ex. Refuses to answer question) """ context.session.generate_reply( - instructions=f"The interview is ending now, inform the candidate that the reason was {disqualification_reason}" + instructions=f"The interview is ending now, inform the candidate that the reason was {disqualification_reason}. Be respectful and natural." ) disqualification_reason = "[DISQUALIFIED] " + disqualification_reason data = { @@ -217,7 +218,7 @@ def __init__(self) -> None: super().__init__( instructions=""" You are an interviewer screening a candidate for a software engineering position. You have already been asking a series of questions, and this is another stage of the process. - Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. + Record if the candidate is able to commute to the office and their flexibility. Ideally, the candidate should commute to the office three days a week. Avoiding using parentheses in your response. """, tools=[disqualify], ) @@ -287,7 +288,14 @@ async def on_enter(self) -> AgentTask: id="get_name_intro_task", description="Collects name and introduction", ) - task_group.add(lambda: GetEmailTask(extra_instructions="If the user refuses to provide their email, call disqualify() insted of decline_email_capture().", tools=[disqualify]), id="get_email_task", description="Collects email") + task_group.add( + lambda: GetEmailTask( + extra_instructions="If the user refuses to provide their email, call disqualify() insted of decline_email_capture().", + tools=[disqualify], + ), + id="get_email_task", + description="Collects email", + ) task_group.add(lambda: CommuteTask(), id="commute_task", description="Asks about commute") task_group.add( lambda: ExperienceTask(), @@ -316,21 +324,21 @@ async def on_enter(self) -> AgentTask: @function_tool() async def end_screening(self): """Call when the interview/screening is concluded to hang up.""" - await self.session.generate_reply(instructions="Close out the interview and say goodbye.") self.session.shutdown() server = AgentServer() -@server.realtime_session() +@server.rtc_session() async def entrypoint(ctx: JobContext): session = AgentSession[Userdata]( userdata=Userdata(filename="results.csv", candidate_name="", task_results={}), - llm=openai.LLM(model="gpt-4.1"), + llm=openai.LLM(), stt=deepgram.STT(model="nova-3", language="multi"), tts=openai.TTS(), vad=silero.VAD.load(), + turn_detection=MultilingualModel(), preemptive_generation=True, ) From 01745a39b00d3dfed9796c4d462defbc702b8289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Ctinalenguyen=E2=80=9D?= Date: Thu, 30 Oct 2025 01:43:55 -0400 Subject: [PATCH 17/17] update task descriptions --- examples/survey/survey_agent.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/survey/survey_agent.py b/examples/survey/survey_agent.py index 1a14d43e65..bea3c196f2 100644 --- a/examples/survey/survey_agent.py +++ b/examples/survey/survey_agent.py @@ -296,11 +296,15 @@ async def on_enter(self) -> AgentTask: id="get_email_task", description="Collects email", ) - task_group.add(lambda: CommuteTask(), id="commute_task", description="Asks about commute") + task_group.add( + lambda: CommuteTask(), + id="commute_task", + description="Asks about commute and corresponding method of transportation.", + ) task_group.add( lambda: ExperienceTask(), id="experience_task", - description="Collects years of experience", + description="Collects years of experience and a description of their professionl work history.", ) task_group.add( lambda: BehavioralTask(),