
Conversation

xyxinyang (Contributor)

Implements the asynchronous engine async_llm, with support for concurrent requests.


paddle-bot bot commented Oct 16, 2025

Thanks for your contribution!

@paddle-bot paddle-bot bot added the contributor External developers label Oct 16, 2025
@Jiang-Jia-Jun Jiang-Jia-Jun requested a review from Copilot October 16, 2025 11:26
Copilot AI left a comment

Pull Request Overview

Adds a new asynchronous LLM engine with streaming output and concurrency support, and wires it into the scheduling loop; also provides integration tests for concurrent generation.

  • Introduces AsyncLLMEngine with async request submission, output processing, and graceful shutdown.
  • Updates scheduler to be more robust when the request-fetch thread pool is shut down.
  • Adds tests validating single and concurrent prompt generation via the async engine.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.

  • tests/engine/test_async_llm.py: Adds integration-style async tests for single and concurrent generations, plus setup/teardown hygiene.
  • fastdeploy/engine/common_engine.py: Makes scheduler thread-pool submission resilient around shutdown; minor logging in the scheduler loop.
  • fastdeploy/engine/async_llm.py: Implements the new AsyncLLMEngine, async output routing, worker/process lifecycle management, and request handling.
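
As context for the tests listed above, a minimal sketch of driving such an engine concurrently; the names and the generate() signature are illustrative, not the PR's exact API:

import asyncio

# Hypothetical driver for concurrent generation; the real AsyncLLMEngine
# constructor and generate() signature may differ.
async def collect(engine, prompt):
    chunks = []
    async for output in engine.generate(prompt=prompt):
        chunks.append(output)  # streamed outputs; the last one reports finished=True
    return chunks

async def run_concurrently(engine, prompts):
    # asyncio.gather interleaves the per-request streams on one event loop
    results = await asyncio.gather(*(collect(engine, p) for p in prompts))
    for prompt, chunks in zip(prompts, results):
        print(f"{prompt!r}: {len(chunks)} streamed chunks")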

Comment on lines 382 to 386

# Create output queue
output_queue = AsyncRequestQueue(request_id)

if request_id is None:
    request_id = str(uuid.uuid4())
Copilot AI Oct 16, 2025

The request_id is generated after the AsyncRequestQueue is created, leaving output_queue.request_id as None while the registered key uses the generated UUID. This also causes abort_request(request_id) in generate() to use None when the caller doesn't pass a request_id, so the request cannot be aborted properly. Generate the request_id before creating the queue and ensure generate() uses the effective request_id.

Suggested change
-# Create output queue
-output_queue = AsyncRequestQueue(request_id)
-if request_id is None:
-    request_id = str(uuid.uuid4())
+# Ensure request_id is set before creating output queue
+if request_id is None:
+    request_id = str(uuid.uuid4())
+output_queue = AsyncRequestQueue(request_id)

current_file_path = os.path.abspath(__file__)
current_dir_path = os.path.split(current_file_path)[0]
# TODO
uncache_worker_stdout = "" if os.getenv("UNCACHE_WORKER_STDOUT", "0") == 1 else "-u"
Copilot AI Oct 16, 2025

os.getenv returns a string; comparing it to the integer 1 always evaluates to False, so -u is always used. Compare against the string '1' or cast to int.

Suggested change
-uncache_worker_stdout = "" if os.getenv("UNCACHE_WORKER_STDOUT", "0") == 1 else "-u"
+uncache_worker_stdout = "" if os.getenv("UNCACHE_WORKER_STDOUT", "0") == "1" else "-u"
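
More generally, since os.getenv always returns a string, a small helper (illustrative, not part of the PR) sidesteps this class of bug:

import os

def env_flag(name, default="0"):
    # os.getenv returns str; normalize before comparing
    return os.getenv(name, default).strip().lower() in ("1", "true", "yes")

uncache_worker_stdout = "" if env_flag("UNCACHE_WORKER_STDOUT") else "-u"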

Comment on lines +966 to +970

    self.worker_init_status["weight_loadding"] = eval(match.group(1)) * 1.0 / 100
elif (match := re.search(r"Start load layer (\d+)", line)) or (
    match := re.search(r"set state for layer (\d+)", line)
):
    progress = eval(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers
Copilot AI Oct 16, 2025

Avoid eval() on text parsed from logs; use int(match.group(1)) instead to prevent code execution risk.

Suggested change
-    self.worker_init_status["weight_loadding"] = eval(match.group(1)) * 1.0 / 100
+    self.worker_init_status["weight_loadding"] = int(match.group(1)) * 1.0 / 100
 elif (match := re.search(r"Start load layer (\d+)", line)) or (
     match := re.search(r"set state for layer (\d+)", line)
 ):
-    progress = eval(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers
+    progress = int(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers

Comment on lines 709 to 715

# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
try:
    if not get_request_pool._shutdown:
        get_request_pool.submit(_fetch_request)
except RuntimeError as e:
    if "shutdown" in str(e):
        llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
Copilot AI Oct 16, 2025

Accessing the private attribute _shutdown on ThreadPoolExecutor is brittle. Rely on catching RuntimeError from submit() and use the instance logger for consistency. Suggested change: remove the _shutdown check and replace llm_logger with self.llm_logger.

Suggested change
-# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
+# Try to submit the task; catch RuntimeError if the pool is shut down.
 try:
-    if not get_request_pool._shutdown:
-        get_request_pool.submit(_fetch_request)
+    get_request_pool.submit(_fetch_request)
 except RuntimeError as e:
     if "shutdown" in str(e):
-        llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
+        self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")

Comment on lines 191 to 197
Creates an LLM engine from the provided engine arguments.
Args:
engine_args (EngineArgs): Engine arguments object.
Returns:
LLMEngine: Instance of the LLMEngine class.
Copilot AI Oct 16, 2025

The return type in the docstring is incorrect; this factory returns AsyncLLMEngine, not LLMEngine.

Suggested change
-Creates an LLM engine from the provided engine arguments.
+Creates an AsyncLLMEngine from the provided engine arguments.
 Args:
     engine_args (EngineArgs): Engine arguments object.
 Returns:
-    LLMEngine: Instance of the LLMEngine class.
+    AsyncLLMEngine: Instance of the AsyncLLMEngine class.

self.checking_worker_status_thread.start()

# display weight loadding progress
with tqdm(total=100, desc="Loading Weights") as pbar:
Copilot AI Oct 16, 2025

Correct spelling of 'loadding' to 'loading' in related variable names within this section for clarity and consistency.

return False

# display layer loadding progress
with tqdm(total=100, desc="Loading Layers") as pbar:
Copilot AI Oct 16, 2025

Correct spelling of 'loadding' to 'loading' in related variable names within this section for clarity and consistency.

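For reference, a self-contained sketch of the log-driven progress pattern these snippets quote, with the spelling fixed and int() used as the eval() comments above suggest; log_lines and num_layers are placeholder names:

import re
from tqdm import tqdm

def track_layer_loading(log_lines, num_layers):
    # Drive a percentage bar from worker log lines
    shown = 0
    with tqdm(total=100, desc="Loading Layers") as pbar:
        for line in log_lines:
            if match := re.search(r"Start load layer (\d+)", line):
                progress = int(match.group(1)) * 100 // num_layers  # int(), not eval()
                pbar.update(progress - shown)
                shown = progress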

llm_logger.info(f"Request {request_id} generator exit (outer)")
return
except Exception as e:
await self.abort_request(request_id)
Copilot AI Oct 16, 2025

On exception, generate() aborts using the caller-supplied request_id, which may be None when the engine generated the ID internally. Capture and use the effective request_id (e.g., from the result of add_request or output_queue.request_id) to ensure the correct request is aborted.


@Jiang-Jia-Jun Jiang-Jia-Jun requested a review from Copilot October 16, 2025 12:03
Copilot AI left a comment

Pull Request Overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

Comment on lines +493 to +516
try:
    # Ensure output processor is running
    self._start_output_handler()

    # Async add request
    output_queue = await self.add_request(request_id, prompt, sampling_params, **kwargs)

    finished = False

    while not finished:
        # Prefer non-blocking get first
        output = output_queue.get_nowait() or await output_queue.get()
        finished = output.finished
        yield output

except EngineError:
    raise
except GeneratorExit:
    llm_logger.info(f"Request {request_id} generator exit (outer)")
    return
except Exception as e:
    await self.abort_request(request_id)
    llm_logger.error(f"Request {request_id} failed: {e}")
    raise EngineError(str(e), error_code=500) from e
Copilot AI Oct 16, 2025

If request_id is None (common case), add_request will generate a new ID, but generate() continues to log and abort using the original None value. This prevents aborting the actual request and produces misleading logs. Assign the actual ID after add_request, e.g., request_id = output_queue.request_id, and use it for logging/abort.

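A simplified, self-contained illustration of that fix; AsyncRequestQueue here is a minimal stand-in for the PR's class, not its real implementation:

import asyncio
import uuid
from typing import Optional

class AsyncRequestQueue:
    # Minimal stand-in: remembers its request_id and buffers outputs
    def __init__(self, request_id):
        self.request_id = request_id
        self.outputs = asyncio.Queue()

def add_request(request_id: Optional[str]) -> AsyncRequestQueue:
    # Generate the ID before creating the queue, as the review suggests
    if request_id is None:
        request_id = str(uuid.uuid4())
    return AsyncRequestQueue(request_id)

async def generate(request_id: Optional[str] = None):
    output_queue = add_request(request_id)
    request_id = output_queue.request_id  # effective ID; never None from here on
    try:
        yield await output_queue.outputs.get()  # stand-in for the streaming loop
    except Exception:
        print(f"aborting request {request_id}")  # correct ID even when the caller passed None
        raise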

Comment on lines +966 to +970

    self.worker_init_status["weight_loadding"] = eval(match.group(1)) * 1.0 / 100
elif (match := re.search(r"Start load layer (\d+)", line)) or (
    match := re.search(r"set state for layer (\d+)", line)
):
    progress = eval(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers
Copilot AI Oct 16, 2025

Using eval on log-derived text is unsafe and unnecessary. Replace eval(match.group(1)) with int(match.group(1)) to parse integers safely.

Suggested change
-    self.worker_init_status["weight_loadding"] = eval(match.group(1)) * 1.0 / 100
+    self.worker_init_status["weight_loadding"] = int(match.group(1)) * 1.0 / 100
 elif (match := re.search(r"Start load layer (\d+)", line)) or (
     match := re.search(r"set state for layer (\d+)", line)
 ):
-    progress = eval(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers
+    progress = int(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers

Comment on lines +881 to +888

def check_health(self, time_interval_threashold=30):
    """
    Check the health of the model server by checking whether all workers are alive.
    """
    if self.engine_service.worker_healthy_live_signal.value[0]:
        elapsed_time = time.time() - self.engine_service.worker_healthy_live_signal.value[0]
        if elapsed_time > time_interval_threashold:
Copilot AI Oct 16, 2025

Correct the parameter name typo 'time_interval_threashold' to 'time_interval_threshold'.

Suggested change
-def check_health(self, time_interval_threashold=30):
+def check_health(self, time_interval_threshold=30):
     """
     Check the health of the model server by checking whether all workers are alive.
     """
     if self.engine_service.worker_healthy_live_signal.value[0]:
         elapsed_time = time.time() - self.engine_service.worker_healthy_live_signal.value[0]
-        if elapsed_time > time_interval_threashold:
+        if elapsed_time > time_interval_threshold:


if not results:
    # No results, minimal delay to yield control
    await asyncio.sleep(0)
Copilot AI Oct 16, 2025

Using await asyncio.sleep(0) can create a near-busy loop and waste CPU. Consider a small backoff (e.g., await asyncio.sleep(0.001)) or a condition-based await to reduce spin.

Suggested change
-    await asyncio.sleep(0)
+    await asyncio.sleep(0.001)
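
An alternative to a time-based backoff is a condition-based wait: asyncio.Queue.get() parks the coroutine until a result arrives. A sketch of the pattern, not the PR's internal API:

import asyncio

async def consume(results: asyncio.Queue):
    # get() suspends this coroutine until a producer calls put(),
    # so no sleep-based polling loop is needed
    while True:
        result = await results.get()
        if result is None:  # hypothetical shutdown sentinel
            break
        print("got:", result)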

Comment on lines +855 to +860

pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
llm_logger.info(f"Launch worker service command: {pd_cmd}")
p = subprocess.Popen(
    pd_cmd,
    stdout=subprocess.PIPE,
    shell=True,
Copilot AI Oct 16, 2025

Avoid shell=True with string commands where possible to reduce injection risk. Build an argument list and invoke Popen with shell=False (e.g., use a list for python, -m, launch, flags, and script arguments) or carefully sanitize inputs.

Suggested change
-pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
-llm_logger.info(f"Launch worker service command: {pd_cmd}")
-p = subprocess.Popen(
-    pd_cmd,
-    stdout=subprocess.PIPE,
-    shell=True,
+# Build the command as a list of arguments
+pd_cmd_list = pd_cmd.split()
+if self.cfg.nnode > 1:
+    pd_cmd_list += ["--ips", str(ips), "--nnodes", str(len(self.cfg.ips))]
+for worker_flag, value in worker_append_flag.items():
+    if value:
+        pd_cmd_list.append(f"--{worker_flag}")
+# Open the log file for stderr redirection
+log_file = open(f"{log_dir}/launch_worker.log", "w")
+llm_logger.info(f"Launch worker service command: {' '.join(pd_cmd_list)}")
+p = subprocess.Popen(
+    pd_cmd_list,
+    stdout=subprocess.PIPE,
+    stderr=log_file,
+    shell=False,
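
One caveat with the suggestion above: pd_cmd.split() breaks arguments that contain quoted spaces. shlex.split tokenizes the way a POSIX shell would, without invoking one:

import shlex

cmd = 'python -m launch --log-dir "/tmp/my logs" --devices 0,1'
print(shlex.split(cmd))
# ['python', '-m', 'launch', '--log-dir', '/tmp/my logs', '--devices', '0,1']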

        get_request_pool.submit(_fetch_request)
except RuntimeError as e:
    if "shutdown" in str(e):
        llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
Copilot AI Oct 16, 2025

[nitpick] Use the instance logger self.llm_logger for consistency with the rest of EngineService instead of the module-level llm_logger.

Suggested change
-        llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
+        self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")

@Jiang-Jia-Jun Jiang-Jia-Jun changed the title add async_llm [Feature] Support AsyncLLM Oct 16, 2025