This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
36 changes: 35 additions & 1 deletion src/deepsparse/v2/schedulers/continuous_batching_scheduler.py
@@ -28,11 +28,32 @@
__all__ = ["ContinuousBatchingScheduler"]


_GLOBAL_SCHEDULER = None


class ContinuousBatchingScheduler(OperatorScheduler):
"""
Manages EngineOperator jobs that should be run with continuous batching.
Groups requests for the same engine into larger batches and returns
the result to the respeictive request threads after scheduled completion
the result to the respective request threads after scheduled completion

Example code for getting or creating a shared instance for scheduling
between pipelines and adding an engine operator to the scheduler
within a pipeline

```python

class MyPipeline(Pipeline):

    def __init__(self):
        ...
        engine_operator = EngineOperator(...)
        ...
        continuous_batching_scheduler = ContinuousBatchingScheduler.get_instance()
        continuous_batching_scheduler.add_engine_operator(engine_operator)

Contributor:
If this is the lifecycle we're proposing, doesn't that mean we can only start with one worker at any given point?

Contributor (author):
Yes, I think in general we will only want one worker so that for a given batch the engine will have full access to all resources. Otherwise we'd need to run the engines in multistream mode, which I am not sure is supported for KV cache right now.

        super().__init__(...)
```

:param max_workers: maximum number of threads to execute at once, default 1
"""
@@ -54,6 +75,19 @@ def __init__(self, max_workers: int = 1):
        for worker_thread in self._threads:
            worker_thread.start()
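For context, a minimal sketch of the queue-plus-worker-threads pattern this startup loop implies; this is a hypothetical illustration, not the actual deepsparse worker implementation.

```python
import threading
from queue import Queue

job_queue: Queue = Queue()


def _worker_loop(queue: Queue) -> None:
    # Run queued jobs until a None sentinel is received
    while True:
        job = queue.get()
        try:
            if job is None:
                break
            job()  # a job is assumed to be a zero-argument callable
        finally:
            queue.task_done()


threads = [
    threading.Thread(target=_worker_loop, args=(job_queue,), daemon=True)
    for _ in range(1)  # mirrors the default of max_workers=1
]
for worker_thread in threads:
    worker_thread.start()

job_queue.put(lambda: print("batched job executed"))
job_queue.put(None)  # stop the single worker
job_queue.join()  # wait for the submitted job and the sentinel
```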

    @classmethod
    def get_instance(cls) -> "ContinuousBatchingScheduler":
        """
        :return: global instance of the continuous batching scheduler. If one
            does not exist yet, a scheduler with a single worker thread to
            schedule all jobs is created and started
        """
        global _GLOBAL_SCHEDULER

        if _GLOBAL_SCHEDULER is not None:
            return _GLOBAL_SCHEDULER

        _GLOBAL_SCHEDULER = cls(max_workers=1)
        return _GLOBAL_SCHEDULER
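A short usage sketch of the singleton behavior documented above; the import path is assumed from this file's location, and the commented add_engine_operator call mirrors the class docstring example.

```python
# Assumed import path, based on this file's location in the repo
from deepsparse.v2.schedulers.continuous_batching_scheduler import (
    ContinuousBatchingScheduler,
)

# Repeated calls return the same global scheduler, so separate pipelines
# share one set of worker threads
scheduler_a = ContinuousBatchingScheduler.get_instance()
scheduler_b = ContinuousBatchingScheduler.get_instance()
assert scheduler_a is scheduler_b

# Following the class docstring example, a pipeline would then register its
# engine operator with the shared scheduler:
# scheduler_a.add_engine_operator(engine_operator)
```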

    @property
    def max_workers(self) -> int:
        """