class EngineOperatorInputs(BaseModel):
    # NOTE(review): the field declarations of this model sit above the visible
    # hunk and are deliberately not reproduced here; only `join` and `Config`
    # are shown

    @classmethod
    def join(cls, inputs: List["EngineOperatorInputs"]) -> "EngineOperatorInputs":
        """
        Join several batch-size-1 inputs into a single multi-batch input.

        Only ``engine_inputs`` is passed to the constructor of the joined
        object; no other field of the given inputs is carried over.

        :param inputs: list of separate EngineOperatorInputs, every array of
            every input must have batch size 1 (size of its first dimension)
        :return: list of inputs joined into a single input with a multi batch size
        :raises RuntimeError: if an input holds no arrays or if any array of any
            input has a batch size other than 1
        """
        all_engine_inputs = [engine_input.engine_inputs for engine_input in inputs]

        for engine_inputs in all_engine_inputs:
            # validate every array, not only the first one, so that a mismatched
            # array cannot slip through to the join below
            batch_sizes = [array.shape[0] for array in engine_inputs]
            if len(batch_sizes) == 0:
                raise RuntimeError(
                    "join requires all inputs to have batch size 1, found input with "
                    "no arrays"
                )

            invalid_batch_size = next((size for size in batch_sizes if size != 1), None)
            if invalid_batch_size is not None:
                raise RuntimeError(
                    "join requires all inputs to have batch size 1, found input with "
                    f"batch size {invalid_batch_size}"
                )

        # use join_engine_outputs since dtype is the same
        joined_engine_inputs = join_engine_outputs(
            all_engine_inputs, len(all_engine_inputs)
        )

        return cls(engine_inputs=joined_engine_inputs)

    class Config:
        arbitrary_types_allowed = True


class EngineOperatorOutputs(BaseModel):
    engine_outputs: List = Field(description="engine outputs")

    def split(self) -> List["EngineOperatorOutputs"]:
        """
        :return: list of the current outputs split to a batch size of 1 each
        """
        # using split_engine_inputs since input/output dtypes
        # are the same (List[ndarray]); its second return value is not needed
        split_outputs, _ = split_engine_inputs(self.engine_outputs, batch_size=1)

        return [self.__class__(engine_outputs=outputs) for outputs in split_outputs]
under the License. from .continuous_batching_queues import * +from .continuous_batching_executor import * diff --git a/src/deepsparse/v2/schedulers/utils/continuous_batching_executor.py b/src/deepsparse/v2/schedulers/utils/continuous_batching_executor.py new file mode 100644 index 0000000000..86afdf309c --- /dev/null +++ b/src/deepsparse/v2/schedulers/utils/continuous_batching_executor.py @@ -0,0 +1,79 @@ +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
class ContinuousBatchingExecutorThread(Thread):
    """
    Thread that when started runs indefinitely, grabbing a valid batch from
    the queues when possible and running them in the correct engine

    Every future of a popped batch is resolved by the worker: with its own
    slice of the outputs on success, or with the raised exception when
    joining, running, or splitting the batch fails. A failing batch therefore
    neither terminates the worker nor leaves callers waiting on a future that
    will never complete.

    :param queues: ContinuousBatchingQueues object containing a queue for
        each valid engine
    :param operators_to_engines: dictionary mapping valid engine operators
        to a dictionary of its valid batch sizes mapped to an engine compiled
        for that batch size
    """

    def __init__(
        self,
        queues: ContinuousBatchingQueues,
        operators_to_engines: Dict[EngineOperator, Dict[int, Engine]],
    ):
        self._queues = queues
        self._operators_to_engines = operators_to_engines
        # read by the working loop between batches; nothing in this class
        # sets it to True
        self._should_stop = False

        super().__init__(target=self._working_loop)
        self.daemon = True  # worker thread should exit when main thread exits

    def _working_loop(self):
        # wait for batch, run batch, split and resolve futures; the stop flag is
        # only re-checked between batches because pop_batch blocks
        while not self._should_stop:
            # wait for next batch to be available
            engine_operator, batch = self._queues.pop_batch(block=True)

            # unpack batch of QueueEntry objects
            engine_inputs, futures, _ = list(zip(*batch))

            try:
                split_outputs = self._run_batch(engine_operator, engine_inputs)
            except Exception as error:
                # worker-thread boundary: hand the failure to the callers instead
                # of letting it end the thread with their futures unresolved
                for future in futures:
                    if not future.done():
                        future.set_exception(error)
                continue

            # return the results to their respective futures, skipping any that
            # were already resolved or cancelled by the caller
            for output, future in zip(split_outputs, futures):
                if not future.done():
                    future.set_result(output)

    def _run_batch(self, engine_operator, engine_inputs):
        # join the inputs, run them with the engine compiled for the popped batch
        # size, and split the outputs back into one entry per input
        batch_size = len(engine_inputs)

        # type is EngineOperatorInputs
        joined_inputs = engine_operator.input_schema.join(engine_inputs)

        # get engine for this operator compiled to the popped batch size
        # and set the inputs to execute with it
        engines_by_batch_size = self._operators_to_engines[engine_operator]
        joined_inputs.engine = engines_by_batch_size[batch_size]

        # run the engine operator with the given engine at the joined batch size
        joined_outputs = engine_operator(joined_inputs)

        return joined_outputs.split()
def test_continuous_batching_executor_thread():
    """
    Run two batch-size-1 inputs through a worker thread backed by a
    batch-size-2 engine and check each future receives a batch-size-1 output
    """
    # upper bound for waiting on a result; result() returns as soon as the
    # future resolves, so this only costs time when the worker is broken
    result_timeout_seconds = 30

    # mobilenet model with batch_size=2
    engine_operator = EngineOperator("zoo:mobilenet_v2-1.0-imagenet-base", batch_size=2)

    # create queues object and add operator
    queues = ContinuousBatchingQueues()
    queues.add_queue(engine_operator, batch_sizes=[2])

    # create engine map
    operators_to_engines = {engine_operator: {2: engine_operator.engine}}

    worker_thread = ContinuousBatchingExecutorThread(queues, operators_to_engines)

    # thread not started yet
    assert not worker_thread.is_alive()

    # start and assert thread is alive
    worker_thread.start()
    assert worker_thread.is_alive()

    # create first input and add it to queue
    input_1 = engine_operator.input_schema(
        engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
    )
    future_1 = Future()
    queues.add_queue_item(engine_operator, input_1, future=future_1)

    # assert that future is not yet resolved
    assert not future_1.done()

    # create second input and add it to queue
    input_2 = engine_operator.input_schema(
        engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
    )
    future_2 = Future()
    queues.add_queue_item(engine_operator, input_2, future=future_2)

    # block until the engine completes instead of sleeping for a fixed time;
    # raises if a future is not resolved within the timeout
    result_1 = future_1.result(timeout=result_timeout_seconds)
    result_2 = future_2.result(timeout=result_timeout_seconds)

    assert isinstance(result_1, engine_operator.output_schema)
    assert isinstance(result_2, engine_operator.output_schema)

    def assert_batch_size_one(arrays):
        for array in arrays:
            assert array.shape[0] == 1

    # make sure only a single batch item was returned to each future
    # TODO: test that the correct bs1 item is returned (can test against bs1 engine)
    assert_batch_size_one(result_1.engine_outputs)
    assert_batch_size_one(result_2.engine_outputs)