diff --git a/src/deepsparse/v2/schedulers/continuous_batching_scheduler.py b/src/deepsparse/v2/schedulers/continuous_batching_scheduler.py index 96e0a502b6..669c5922a0 100644 --- a/src/deepsparse/v2/schedulers/continuous_batching_scheduler.py +++ b/src/deepsparse/v2/schedulers/continuous_batching_scheduler.py @@ -28,11 +28,32 @@ __all__ = ["ContinuousBatchingScheduler"] +_GLOBAL_SCHEDULER = None + + class ContinuousBatchingScheduler(OperatorScheduler): """ Manages EngineOperator jobs that should be run with continuous batching. Groups requests for the same engine into larger batches and returns - the result to the respeictive request threads after scheduled completion + the result to the respective request threads after scheduled completion + + Example code for getting or creating a shared instance for scheduling + between pipelines and adding an engine operator to the scheduler + within a pipeline + + ```python + + class MyPipeline(Pipeline): + + def __init__(self): + ... + engine_operator = EngineOperator(...) + ... + continuous_batching_scheduler = ContinuousBatchingScheduler.get_instance() + continuous_batching_scheduler.add_engine_operator(engine_operator) + + super.__init__(...) + ``` :param max_workers: maximum number of threads to execute at once, default 1 """ @@ -54,6 +75,19 @@ def __init__(self, max_workers: int = 1): for worker_thread in self._threads: worker_thread.start() + @classmethod + def get_instance(cls) -> "ContinuousBatchingScheduler": + """ + :return: global instance of the continuous batching scheduler. If one + does not exist yet, a scheduler with a single worker thread to + schedule all jobs is created and started + """ + if _GLOBAL_SCHEDULER is not None: + return _GLOBAL_SCHEDULER # noqa: F823 + + _GLOBAL_SCHEDULER = cls(max_workers=1) + return _GLOBAL_SCHEDULER + @property def max_workers(self) -> int: """