feat(pooling): Add dedicated async preprocessing support to PluginWithIOProcessorPlugins #40030

Closed

mgazz wants to merge 5 commits into vllm-project:main from mgazz:async_ioproc
Conversation

@mgazz
Contributor

@mgazz mgazz commented Apr 16, 2026

Purpose

Adds asynchronous preprocessing support to PluginWithIOProcessorPlugins to enable IOProcessor plugins that perform async operations, such as asynchronous data loading in Terratorch plugins. Here is an example of a plugin using pre_process_async.
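
For context, a minimal sketch of the plugin-side hooks this PR targets. The signature mirrors the Terratorch snippet quoted later in this thread; the class and its download/build helpers are hypothetical stand-ins, not actual Terratorch code.

import asyncio

class ExampleIOProcessor:
    """Hypothetical plugin whose preprocessing is dominated by I/O-bound downloads."""

    async def _download(self, prompt) -> bytes:
        # Stand-in for fetching remote data, e.g. satellite imagery.
        await asyncio.sleep(0.01)
        return b"raw-bytes"

    def _build_prompts(self, data: bytes) -> list[dict]:
        # Stand-in for turning raw data into engine prompts.
        return [{"prompt": data.decode(errors="ignore")}]

    async def pre_process_async(self, prompt, request_id: str | None = None, **kwargs):
        # Await network I/O without blocking the caller's event loop.
        return self._build_prompts(await self._download(prompt))

    def pre_process(self, prompt, request_id: str | None = None, **kwargs):
        # Sync fallback: run the async path in a fresh, private event loop.
        return asyncio.run(self.pre_process_async(prompt, request_id, **kwargs))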

Test Plan

The test is the same as before

python -m pytest tests/plugins_tests/test_terratorch_io_processor_plugins.py -v

Test Result


Essential Elements of an Effective PR Description Checklist
  • [x] The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • [x] The test plan, such as providing test command.
  • [ ] The test results, such as pasting the results comparison before and after, or e2e results.
  • [ ] (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

mgazz added 2 commits April 15, 2026 13:21
Signed-off-by: Michele Gazzetti <michele.gazzetti1@ibm.com>
Signed-off-by: Michele Gazzetti <michele.gazzetti1@ibm.com>
@mergify mergify Bot added the frontend label Apr 16, 2026
@mgazz mgazz marked this pull request as ready for review April 16, 2026 14:50
@mgazz mgazz requested a review from noooop as a code owner April 16, 2026 14:50
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces asynchronous preprocessing support by adding pre_process_async to the processor and pre_process_online_async to the IO processor. While the changes aim to improve concurrency, the current implementation of pre_process_online_async blocks the asyncio event loop by calling synchronous rendering logic, which negates the benefits of an async entry point. Additionally, the new async method is missing a request type assertion present in the synchronous version.

Comment on lines +72 to +79
async def pre_process_online_async(self, ctx: PoolingServeContext):
    validated_prompt = self.io_processor.parse_data(ctx.request.data)

    raw_prompts = await self.io_processor.pre_process_async(
        prompt=validated_prompt, request_id=ctx.request_id
    )

    self._set_engine_inputs_and_params(raw_prompts, ctx)
Contributor


Severity: high

The pre_process_online_async method is missing the assert isinstance(ctx.request, IOProcessorRequest) check that is present in pre_process_online. More importantly, it calls _set_engine_inputs_and_params, which is a synchronous helper that performs blocking operations like tokenization and multimodal processing via self.renderer.render_cmpl. This blocks the asyncio event loop, defeating the purpose of an async entry point. To maintain responsiveness, you should use self.renderer.render_cmpl_async in the async path. Consider refactoring the shared logic into smaller helpers to avoid duplication while allowing both sync and async rendering.
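
A rough sketch of what the suggested refactor could look like. The assertion and method names come from the comment above; _set_engine_inputs_and_params_async is a hypothetical async counterpart that would await self.renderer.render_cmpl_async instead of the blocking render.

async def pre_process_online_async(self, ctx: PoolingServeContext):
    # Mirror the validation done in the sync pre_process_online path.
    assert isinstance(ctx.request, IOProcessorRequest)
    validated_prompt = self.io_processor.parse_data(ctx.request.data)

    raw_prompts = await self.io_processor.pre_process_async(
        prompt=validated_prompt, request_id=ctx.request_id
    )

    # Hypothetical helper: like _set_engine_inputs_and_params, but awaiting
    # self.renderer.render_cmpl_async so the event loop is never blocked.
    await self._set_engine_inputs_and_params_async(raw_prompts, ctx)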

@noooop
Collaborator

noooop commented Apr 16, 2026

Thanks for your contribution!

But once #39763 lands, we will no longer need pre_process_async, as it will already achieve the fastest possible speed.

It also utilizes thread pool concurrency with almost no additional overhead.

Do you have any test data showing that using pre_process_async is faster than offloading blocking preprocessing and postprocessing ops to a thread pool?

Also try increasing renderer_num_workers and api-server-count together.

After all, async only uses a single thread, while renderer_num_workers uses multiple threads, and api-server-count uses multiple processes. They can effectively bypass the GIL.

@mgazz
Contributor Author

mgazz commented Apr 16, 2026

Thank you for the reference. I see the PR is merged; I will test against main and check whether we still have issues loading Terratorch IOProcessors.

@noooop
Collaborator

noooop commented Apr 16, 2026

PTAL #34789 (comment)

Because the GIL makes it nearly impossible for Python multi-threading to speed up preprocessing, we ultimately need api-server-count to use multi-processing to accelerate preprocessing.
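
A self-contained illustration of the point, in generic Python (not vLLM code): pure-Python CPU-bound work gains almost nothing from threads because only one thread can hold the GIL at a time, while a process pool runs truly in parallel.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # A pure-Python loop holds the GIL for its entire duration.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    work = [5_000_000] * 4
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.perf_counter()
        with pool_cls(max_workers=4) as pool:
            list(pool.map(cpu_bound, work))
        # Threads finish in roughly serial time; processes in roughly 1/4 of it.
        print(pool_cls.__name__, f"{time.perf_counter() - start:.2f}s")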

@DarkLight1337 DarkLight1337 added the ready (ONLY add when PR is ready to merge/full CI is needed) label Apr 16, 2026
@noooop
Collaborator

noooop commented Apr 16, 2026

Can you also fix https://buildkite.com/vllm/ci/builds/61639/steps/table?sid=019d96c9-ca44-4bed-92d4-140c1581a99e&tab=output in this PR? Thanks

@mgazz

Please help fix plugins_tests/test_terratorch_io_processor_plugins.py; it is caused by #39763, and the main branch is now failing.

@DarkLight1337
Member

Otherwise @noooop can fix it tomorrow; see who has time.

@noooop
Collaborator

noooop commented Apr 16, 2026

You could try having the entrypoint directly call pre_process_async again. Since pre_process is essentially running loop.run_until_complete(self.pre_process_async(prompt, request_id, **kwargs)), I don’t think it’s very compatible with the current thread pool.
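
The incompatibility can be seen with a generic sketch (the plugin's actual wrapper appears later in this thread): asyncio.get_event_loop() raises in a pool worker thread on modern Python, because no event loop has ever been created or set there.

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def pre_process_async() -> str:
    return "ok"

def pre_process() -> str:
    # In a worker thread there is no current event loop, so on modern
    # Python this raises RuntimeError instead of running the coroutine.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(pre_process_async())

with ThreadPoolExecutor(max_workers=1) as pool:
    try:
        print(pool.submit(pre_process).result())
    except RuntimeError as exc:
        print("failed in pool thread:", exc)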

…mpl_async

Signed-off-by: Michele Gazzetti <michele.gazzetti1@ibm.com>
@mgazz
Contributor Author

mgazz commented Apr 16, 2026

Do you have any test data showing that using pre_process_async is faster than offloading blocking preprocessing and postprocessing ops to a thread pool?

Good question; we did some performance testing with the previous IOProcessor implementation, but we have not evaluated the thread pool approach yet.

This PR aimed to extend the current IOProcessor wrapping layer to support the async interface that plugins like the Terratorch one already implement, so existing code continues to work properly.

Terratorch IOProcessors are designed for I/O-bound operations: downloading satellite imagery over the network and reading large geospatial files. For these workloads, async was the natural fit: when most time is spent downloading data (not CPU processing), async can handle many more concurrent operations efficiently. That said, our performance evaluation showed diminishing returns from the async implementation when downloads are fast. In that case, using a thread pool should give better performance, so it's great to have it already available as an option.
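
To make the I/O-bound argument concrete, a generic sketch (fetch_tile is a hypothetical stand-in for a satellite-imagery download): with async, many downloads overlap on a single thread, so total latency approaches that of the slowest download rather than the sum.

import asyncio
import time

async def fetch_tile(url: str) -> bytes:
    # Stand-in for an HTTP download; real code would use e.g. aiohttp.
    await asyncio.sleep(0.1)  # simulated network latency
    return b""

async def preprocess(urls: list[str]) -> list[bytes]:
    # All downloads run concurrently on one thread, no pool required.
    return await asyncio.gather(*(fetch_tile(u) for u in urls))

start = time.perf_counter()
asyncio.run(preprocess([f"https://example.com/tile/{i}" for i in range(100)]))
print(f"100 downloads in {time.perf_counter() - start:.2f}s")  # ~0.1s, not ~10s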

Another important aspect is that TerraTorch IOProcessors implement the plugin IOProcessor interface defined in vllm/plugins/io_processors/interface.py (which includes async hooks like pre_process_async). However, at runtime, the plugin is loaded and wrapped by PluginWithIOProcessorPlugins (a PoolingIOProcessor). This introduces two overlapping IOProcessor-style interfaces.

You could try having the entrypoint directly call pre_process_async again. Since pre_process is essentially running loop.run_until_complete(self.pre_process_async(prompt, request_id, **kwargs)), I don’t think it’s very compatible with the current thread pool.

Thank you for the suggestion, I’m happy to look into it.

FYI @christian-pinto

Signed-off-by: Michele Gazzetti <michele.gazzetti1@ibm.com>
@mergify
Contributor

mergify Bot commented Apr 17, 2026

Hi @mgazz, the pre-commit checks have failed. Please run:

uv pip install pre-commit>=4.5.1
pre-commit install
pre-commit run --all-files

Then, commit the changes and push to your branch.

For future commits, pre-commit will run automatically on changed files before each commit.

Tip

Is mypy failing?
mypy is run differently in CI. If the failure is related to this check, please use the following command to run it locally:
# For mypy (substitute "3.10" with the failing version if needed)
pre-commit run --hook-stage manual mypy-3.10

@noooop
Collaborator

noooop commented Apr 17, 2026

I cherry-picked your last commit to unblock the CI.

#40083

@noooop
Collaborator

noooop commented Apr 17, 2026

# Shared thread pool executor for preprocessing and postprocessing.
self._executor: Executor = models.renderer._executor
self._preprocessing_async = make_async(
    self._preprocessing, executor=self._executor
)
self._postprocessing_async = make_async(
    self._postprocessing, executor=self._executor
)

async def __call__(
    self,
    request: AnyPoolingRequest,
    raw_request: Request | None = None,
) -> Response:
    io_processor = self.get_io_processor(request)
    ctx = await self._init_ctx(io_processor, request, raw_request)
    await self._preprocessing_async(io_processor, ctx)
    await self._prepare_generators(ctx)
    await self._collect_batch(ctx)
    return await self._postprocessing_async(io_processor, ctx)

@abstractmethod
def get_io_processor(self, request: AnyPoolingRequest) -> PoolingIOProcessor:
    raise NotImplementedError

@torch.inference_mode()
def _preprocessing(
    self, io_processor: PoolingIOProcessor, ctx: PoolingServeContext
):
    return io_processor.pre_process_online(ctx)

Currently, _preprocessing only calls the sync interface (pre_process_online); the async interface (pre_process_async) is never invoked. You would therefore need to modify vllm/entrypoints/pooling/base/serving.py to re-enable this path; one possible dispatch is sketched below.
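
A hypothetical sketch of that dispatch, against the excerpt above (not the actual patch): fall back to the thread-pool-wrapped sync path unless the processor provides the dedicated async hook.

async def _preprocess_dispatch(self, io_processor, ctx):
    # Prefer a dedicated async hook when the processor implements one;
    # otherwise keep offloading the blocking sync path to the executor.
    if hasattr(io_processor, "pre_process_online_async"):
        await io_processor.pre_process_online_async(ctx)
    else:
        await self._preprocessing_async(io_processor, ctx)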

We will accept this if you find through testing that calling the async interface is faster.

We are optimizing the vLLM Observability system to make this kind of performance comparison easier.

#39979

@mgazz
Contributor Author

mgazz commented Apr 17, 2026

I tested the main branch and we are not experiencing the original error anymore. We can make it work with small changes on our side (I was testing a modified version of the IOProcessor).

Why does it work?
Calling loop.run_until_complete would fail with a "cannot run two event loops" error because vLLM's serving infrastructure already runs in an event loop.

https://github.com/terrastackai/terratorch/blob/13b932e2560174eb6dcc93db939f9677fbd7f4dd/terratorch/vllm/plugins/segmentation/segmentation_io_processor.py#L361-L362

def pre_process(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:

        # loop = asyncio.get_event_loop()
        # return loop.run_until_complete(self.pre_process_async(prompt, request_id, **kwargs))
        return asyncio.run(self.pre_process_async(prompt, request_id, **kwargs))

With the thread pool, pre_process executes in an isolated worker thread that has no pre-existing event loop, allowing asyncio.run() to safely create its own event loop. I am going to close this PR.
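
A minimal, self-contained demonstration of why this works (generic Python, not vLLM code):

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def work() -> str:
    await asyncio.sleep(0)
    return "done"

def sync_entry() -> str:
    # Safe here: the pool worker thread has no running event loop, so
    # asyncio.run can create its own and close it afterwards.
    return asyncio.run(work())

async def main():
    loop = asyncio.get_running_loop()
    # Calling asyncio.run directly in this thread would raise RuntimeError,
    # because an event loop is already running here, which is exactly the
    # situation inside vLLM's serving infrastructure.
    with ThreadPoolExecutor(max_workers=1) as pool:
        print(await loop.run_in_executor(pool, sync_entry))

asyncio.run(main())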

We are optimizing the vLLM Observability system to make this kind of performance comparison easier. #39979

This feature was also on our roadmap, and I am happy to help.

@mgazz mgazz closed this Apr 17, 2026

Labels

frontend, ready (ONLY add when PR is ready to merge/full CI is needed)
