[WIP] Refactor vl video path to full async mode #12517
Conversation
Summary of Changes (Gemini Code Assist): This pull request significantly refactors the multimodal data preprocessing pipeline, particularly for VLM (Vision-Language Model) video handling, by integrating asynchronous programming patterns. The primary goal is to enhance performance and system responsiveness by delegating blocking I/O and computationally intensive video processing tasks to dedicated thread pools. This ensures that the main application flow remains unblocked, leading to a more efficient and scalable data processing architecture.
Code Review
This pull request successfully refactors the multimodal data preprocessing pipeline to be fully asynchronous, which is a great improvement for performance in an async environment. The changes are well-structured, introducing async versions of data loading methods and correctly utilizing asyncio features. The video-specific preprocessing in qwen_vl.py is also nicely refactored to leverage this new async infrastructure. My feedback includes suggestions for using modern asyncio APIs, improving performance by selecting the appropriate executor for CPU-bound tasks, and enhancing code clarity and robustness.
self.video_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=int(os.environ.get("SGLANG_VIDEO_WORKERS", 8))
)
The new video_executor is a ThreadPoolExecutor. Video preprocessing tasks, like those in qwen_vl.py::preprocess_video_task, often involve CPU-bound operations (e.g., decoding, resizing frames) that can be limited by Python's Global Interpreter Lock (GIL). For true parallelism with CPU-bound tasks, a ProcessPoolExecutor is typically more effective. Consider using the existing self.cpu_executor or creating a new ProcessPoolExecutor for video tasks to avoid potential performance bottlenecks. If ThreadPoolExecutor was chosen because the tasks are I/O-bound or release the GIL, a brief comment explaining this would be helpful for future maintainers.
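For illustration, here is a minimal sketch of dispatching CPU-bound video work to a process pool from async code. The class and method structure below is a placeholder, not the PR's actual code; only `preprocess_video_task`, `SGLANG_VIDEO_WORKERS`, and the executor attribute names come from the snippets and comments above.

```python
import asyncio
import concurrent.futures
import multiprocessing as mp
import os


def preprocess_video_task(video_path: str):
    # Placeholder for the CPU-bound work (frame decoding, resizing). Running it in a
    # separate process avoids contention on the parent interpreter's GIL.
    return video_path


class MultimodalProcessor:
    def __init__(self):
        # Hypothetical: a process pool for CPU-bound video work; the "spawn" context
        # avoids inheriting GPU/CUDA state from the parent process.
        self.cpu_executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=int(os.environ.get("SGLANG_VIDEO_WORKERS", 8)),
            mp_context=mp.get_context("spawn"),
        )

    async def preprocess_video(self, video_path: str):
        loop = asyncio.get_running_loop()
        # The event loop stays responsive while the worker process does the heavy lifting.
        return await loop.run_in_executor(
            self.cpu_executor, preprocess_video_task, video_path
        )
```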
| "Mismatch between image tokens and estimated frame counts." | ||
| ) | ||
|
|
||
| loop = asyncio.get_running_loop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| else: | ||
| prompt = prompt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
    image_factor: int = IMAGE_FACTOR,
) -> torch.Tensor:
    if self.video_executor is not None:
        loop = asyncio.get_event_loop()
asyncio.get_event_loop() is deprecated since Python 3.10 and its behavior can be surprising (e.g., creating a new loop if one is not running). It's recommended to use asyncio.get_running_loop() which is safer as it raises a RuntimeError if no loop is running. This also makes it consistent with the usage in base_processor.py.
Suggested change:
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
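A minimal self-contained sketch of the safer pattern; the class and method names below are placeholders, and only `self.video_executor` and the loop call are taken from the snippet above:

```python
import asyncio
import concurrent.futures


class VideoResizer:
    def __init__(self):
        self.video_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)

    def _resize_blocking(self, video, image_factor):
        return video  # placeholder for the blocking resize logic

    async def resize_async(self, video, image_factor: int):
        # get_running_loop() raises RuntimeError if no loop is running instead of
        # silently creating one, unlike the deprecated get_event_loop() behavior.
        loop = asyncio.get_running_loop()
        if self.video_executor is not None:
            return await loop.run_in_executor(
                self.video_executor, self._resize_blocking, video, image_factor
            )
        return self._resize_blocking(video, image_factor)
```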
Force-pushed from 123ec2d to 8c2234f
/gemini review
Code Review
This pull request refactors the video preprocessing pipeline to use asynchronous coroutines, aiming to resolve performance bottlenecks associated with the previous multiprocessing implementation. The changes introduce async versions of data loading and processing methods, currently targeting the Qwen VL model. The approach of using run_in_executor for blocking tasks is sound. My review focuses on improving maintainability by reducing code duplication in the new async methods and simplifying some of the logic. I've also suggested using a more modern and safer asyncio API for better consistency and robustness.
async def load_mm_data_async(
    self,
    prompt: str,
    multimodal_tokens: MultimodalSpecialTokens,
    image_data: Optional[list] = None,
    video_data: Optional[list] = None,
    audio_data: Optional[list] = None,
    return_text: Optional[bool] = True,
    discard_alpha_channel: bool = True,
    audio_sample_rate: Optional[int] = None,
) -> BaseMultiModalProcessorOutput:
There is significant code duplication between this new load_mm_data_async method and the existing load_mm_data method. The logic for processing the loaded data (from line 532 onwards) is nearly identical, with the main difference being await next(futures_iter) versus next(futures_iter).result().
To improve maintainability and reduce redundancy, consider refactoring the result-processing logic into a separate, private helper method. This helper could take the loaded data as an argument.
For example:

def _process_loaded_mm_data(self, text_parts, multimodal_tokens_pattern, task_info_iter, loaded_data):
    # ... common result processing logic ...

async def load_mm_data_async(self, ...):
    # ... submission logic ...
    results = await asyncio.gather(*futures)
    return self._process_loaded_mm_data(text_parts, multimodal_tokens, task_info_iter, results)

def load_mm_data(self, ...):
    # ... submission logic ...
    results = [f.result() for f in futures]
    return self._process_loaded_mm_data(text_parts, multimodal_tokens, task_info_iter, results)

This would make the code cleaner and easier to maintain, especially before extending this pattern to other models.
ele = {}
total_frames, video_fps = len(vr), vr.get_avg_fps()
nframes = smart_nframes({}, total_frames=total_frames, video_fps=video_fps)
idx = torch.linspace(0, total_frames - 1, nframes).round().long().tolist()
video = vr.get_batch(idx).asnumpy()
video = torch.tensor(video).permute(0, 3, 1, 2)  # Convert to TCHW format
nframes, _, height, width = video.shape
min_pixels = ele.get("min_pixels", VIDEO_MIN_PIXELS)
total_pixels = ele.get("total_pixels", VIDEO_TOTAL_PIXELS)
max_pixels = max(
    min(VIDEO_MAX_PIXELS, total_pixels / nframes * FRAME_FACTOR),
    int(min_pixels * 1.05),
)
max_pixels_supposed = ele.get("max_pixels", max_pixels)
if max_pixels_supposed > max_pixels:
    logger.warning(
        f"The given max_pixels[{max_pixels_supposed}] exceeds limit[{max_pixels}]."
    )
max_pixels = min(max_pixels_supposed, max_pixels)
if "resized_height" in ele and "resized_width" in ele:
    resized_height, resized_width = smart_resize(
        ele["resized_height"],
        ele["resized_width"],
        factor=image_factor,
    )
else:
    resized_height, resized_width = smart_resize(
        height,
        width,
        factor=image_factor,
        min_pixels=min_pixels,
        max_pixels=max_pixels,
    )
The ele dictionary is initialized as empty and never modified, making the subsequent calls to ele.get(...) and checks like if "resized_height" in ele effectively dead code. This makes the logic unnecessarily complex and potentially confusing for future maintenance.
Consider removing the ele dictionary and simplifying the code to use the default values directly.
Suggested change (replacing the block above with the simplified version):

total_frames, video_fps = len(vr), vr.get_avg_fps()
nframes = smart_nframes({}, total_frames=total_frames, video_fps=video_fps)
idx = torch.linspace(0, total_frames - 1, nframes).round().long().tolist()
video = vr.get_batch(idx).asnumpy()
video = torch.tensor(video).permute(0, 3, 1, 2)  # Convert to TCHW format
nframes, _, height, width = video.shape
min_pixels = VIDEO_MIN_PIXELS
total_pixels = VIDEO_TOTAL_PIXELS
max_pixels = max(
    min(VIDEO_MAX_PIXELS, total_pixels / nframes * FRAME_FACTOR),
    int(min_pixels * 1.05),
)
resized_height, resized_width = smart_resize(
    height,
    width,
    factor=image_factor,
    min_pixels=min_pixels,
    max_pixels=max_pixels,
)
Force-pushed from e3296fe to 653ca1a
Some child processes are terminated abruptly, so the process pool becomes unusable. Investigating.
The error occurs because decord cannot be imported in the parent process; moving the import into the sub-process fixes it. After that fix, a new error appears. Investigation shows it comes from the stacked import dependencies in qwen_vl.py: the subprocess (worker) imports a series of model layers, and linear and quantization import each other recursively, which breaks the worker process.
Force-pushed from 653ca1a to 3b55810
The error is printed when the subprocess terminates. The full log backtrace is:
The root cause of the error is that when a child process deserializes (unpickles) a callable object submitted to the ProcessPoolExecutor, it must import the module where that callable is defined. We put the callable object under qwen_vl. Importing that module immediately pulls in various Qwen models → which triggers layers.linear → which then imports quantization → and auto_round.py in turn imports layers.linear back, creating a circular dependency.
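A common way to break such a cycle, sketched here under assumptions (the module name video_worker.py and the function are hypothetical, not the PR's actual layout), is to define the submitted callable in a lightweight module with no model-layer imports, so unpickling it in the child never touches qwen_vl:

```python
# video_worker.py -- hypothetical lightweight module with no model-layer imports.
# When the child process unpickles the submitted callable, it only needs to import
# this file, so the qwen_vl -> layers.linear -> quantization cycle is never triggered.

def preprocess_video_worker(video_path: str):
    # Heavy or non-fork-safe dependencies are imported lazily, inside the worker.
    import decord

    vr = decord.VideoReader(video_path)
    # ... frame sampling and resizing would go here ...
    return len(vr)
```

The parent process would then import preprocess_video_worker from this module and submit it to the ProcessPoolExecutor, instead of submitting a callable defined inside qwen_vl.py.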
Motivation
Refactor the VLM's entire load/preprocess/process path to a fully asynchronous coroutine mechanism.
The solution has several issues to consider:
Using ThreadPoolExecutor for CPU-side Python preprocessing (especially the per-frame ops logic) is constrained by the GIL; adding more threads can actually make it slower.
If we switch to ProcessPoolExecutor, we would be passing a decord.VideoReader instance (vr) as an argument across processes. VideoReader isn't picklable, so it blows up once it crosses the process boundary. The following logs prove this:
The most robust and high-performance solution is to construct the VideoReader inside the worker process. The parent process should pass only a serializable "video input descriptor" (a file path), and use spawn or forkserver to create child processes (to avoid undefined behavior from inheriting GPU contexts). A sketch of this design is shown below.
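A minimal sketch of that design, under the assumption that only a file path crosses the process boundary and that a hypothetical decode_video worker constructs the VideoReader itself (the frame sampling here is deliberately simplified and does not reproduce the PR's smart_nframes/smart_resize logic):

```python
import asyncio
import concurrent.futures
import multiprocessing as mp


def decode_video(video_path: str):
    # Heavy imports happen inside the worker process only.
    import decord
    import torch

    # The VideoReader is constructed here because it is not picklable and
    # therefore cannot be passed across the process boundary.
    vr = decord.VideoReader(video_path)
    total_frames = len(vr)
    # Simplified uniform sampling; the real code would use smart_nframes/smart_resize.
    idx = list(range(0, total_frames, max(1, total_frames // 16)))
    frames = vr.get_batch(idx).asnumpy()
    return torch.tensor(frames).permute(0, 3, 1, 2)  # TCHW


async def preprocess_video(video_path: str, executor: concurrent.futures.Executor):
    loop = asyncio.get_running_loop()
    # Only the serializable path is sent to the child; the tensor comes back pickled.
    return await loop.run_in_executor(executor, decode_video, video_path)


if __name__ == "__main__":
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=4, mp_context=mp.get_context("spawn")
    )
    video = asyncio.run(preprocess_video("sample.mp4", executor))  # placeholder path
    print(video.shape)
```

Because the executor uses the spawn context, each child starts from a clean interpreter and does not inherit the parent's GPU context.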
[Update Nov 5] Working on moving the VideoReader construction from the parent process into each sub-process, passing only a serializable video_path. As this changes the whole processing logic, the ETA is set to Nov 7.
Currently only the Qwen VL model is modified; the change will be extended to the rest of the VL models once the solution is ready.
Modifications
The core modifications are:
Accuracy Tests
Benchmarking and Profiling
Checklist