32 changes: 32 additions & 0 deletions src/deepsparse/v2/operators/engine_operator.py
@@ -39,13 +39,45 @@ class EngineOperatorInputs(BaseModel):
default=None,
)

@classmethod
def join(cls, inputs: List["EngineOperatorInputs"]) -> "EngineOperatorInputs":
"""
:param inputs: list of separate EngineOperatorInputs, each with batch size 1
:return: a single EngineOperatorInputs with the inputs joined into one
    multi-batch input
"""
all_engine_inputs = [engine_input.engine_inputs for engine_input in inputs]

for engine_inputs in all_engine_inputs:
if engine_inputs[0].shape[0] != 1:
raise RuntimeError(
"join requires all inputs to have batch size 1, found input with "
f"batch size {engine_inputs[0].shape[0]}"
)

# reuse join_engine_outputs since input and output dtypes are the same
joined_engine_inputs = join_engine_outputs(
all_engine_inputs, len(all_engine_inputs)
)

return cls(engine_inputs=joined_engine_inputs)

class Config:
arbitrary_types_allowed = True


class EngineOperatorOutputs(BaseModel):
engine_outputs: List = Field(description="engine outputs")

def split(self) -> List["EngineOperatorOutputs"]:
"""
:return: list of the current outputs split to a batch size of 1 each
"""
# using split_engine_inputs since input/output dtypes
# are the same (List[ndarray])
split_outputs, _ = split_engine_inputs(self.engine_outputs, batch_size=1)

return [self.__class__(engine_outputs=outputs) for outputs in split_outputs]


class EngineOperator(Operator):
input_schema = EngineOperatorInputs
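For context, a minimal hypothetical sketch (not part of this PR) of the round trip these two methods enable: batch-size-1 inputs are joined into one multi-batch input, and multi-batch outputs are split back into batch-size-1 pieces. It assumes `EngineOperatorInputs` and `EngineOperatorOutputs` are importable from this module and that engine inputs/outputs are lists of numpy arrays.

```python
import numpy

from deepsparse.v2.operators.engine_operator import (
    EngineOperatorInputs,
    EngineOperatorOutputs,
)

# three separate batch-size-1 inputs
singles = [
    EngineOperatorInputs(engine_inputs=[numpy.random.randn(1, 3, 224, 224)])
    for _ in range(3)
]

# join them into one multi-batch input (per the docstring, the batch
# dimension becomes 3)
joined = EngineOperatorInputs.join(singles)
assert joined.engine_inputs[0].shape[0] == 3

# simulate a multi-batch output and split it back to batch-size-1 pieces
outputs = EngineOperatorOutputs(engine_outputs=[numpy.random.randn(3, 1000)])
per_item = outputs.split()
assert len(per_item) == 3
assert per_item[0].engine_outputs[0].shape[0] == 1
```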
1 change: 1 addition & 0 deletions src/deepsparse/v2/schedulers/utils/__init__.py
@@ -16,3 +16,4 @@
# limitations under the License.

from .continuous_batching_queues import *
from .continuous_batching_executor import *
79 changes: 79 additions & 0 deletions src/deepsparse/v2/schedulers/utils/continuous_batching_executor.py
@@ -0,0 +1,79 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Thread
from typing import Dict

from deepsparse import Engine
from deepsparse.v2.operators import EngineOperator
from deepsparse.v2.schedulers.utils.continuous_batching_queues import (
ContinuousBatchingQueues,
)


__all__ = [
"ContinuousBatchingExecutorThread",
]


class ContinuousBatchingExecutorThread(Thread):
"""
Thread that, once started, runs indefinitely, grabbing a valid batch from
the queues when one becomes available and running it on the matching engine

:param queues: ContinuousBatchingQueues object containing a queue for
each valid engine
:param operators_to_engines: dictionary mapping valid engine operators
    to a dictionary of valid batch sizes, each mapped to an engine compiled
    for that batch size
"""

def __init__(
self,
queues: ContinuousBatchingQueues,
operators_to_engines: Dict[EngineOperator, Dict[int, Engine]],
):
self._queues = queues
self._operators_to_engines = operators_to_engines
self._should_stop = False

super().__init__(target=self._working_loop)
self.daemon = True # worker thread should exit when main thread exits

def _working_loop(self):
# indefinitely wait for batch, run batch, split and resolve futures
while True:
# wait for next batch to be available
engine_operator, batch = self._queues.pop_batch(block=True)

# unpack batch of QueueEntry objects
engine_inputs, futures, _ = list(zip(*batch))
batch_size = len(engine_inputs)

# type is EngineOperatorInputs
joined_inputs = engine_operator.input_schema.join(engine_inputs)

# get engine for this operator compiled to the popped batch size
# and set the inputs to execute with it
joined_inputs.engine = self._operators_to_engines[engine_operator][
batch_size
]

# run the engine operator with the given engine at the joined batch size
joined_outputs = engine_operator(joined_inputs)

# split outputs and return the results to their respective futures
split_outputs = joined_outputs.split()
for output, future in zip(split_outputs, futures):
future.set_result(output)
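The test below exercises this loop end to end. On the caller side, the pattern is: enqueue a batch-size-1 input together with a Future, then block on that Future until the worker thread joins a batch, runs the engine, splits the joined outputs, and resolves each per-item result. A minimal hypothetical sketch, assuming `queues` has a queue registered for `engine_operator`, an executor thread has been started over `queues`, and `single_input` is a batch-size-1 input:

```python
from concurrent.futures import Future

future = Future()
queues.add_queue_item(engine_operator, single_input, future=future)

# blocks until the worker thread pops a batch containing this item,
# runs the engine, and resolves the future with this item's output
result = future.result(timeout=5)  # an EngineOperatorOutputs with batch size 1
```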
@@ -0,0 +1,83 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from concurrent.futures import Future

import numpy

from deepsparse.v2.operators import EngineOperator
from deepsparse.v2.schedulers.utils import (
ContinuousBatchingExecutorThread,
ContinuousBatchingQueues,
)


def test_continuous_batching_executor_thread():
# mobilenet model with batch_size=2
engine_operator = EngineOperator("zoo:mobilenet_v2-1.0-imagenet-base", batch_size=2)

# create queues object and add operator
queues = ContinuousBatchingQueues()
queues.add_queue(engine_operator, batch_sizes=[2])

# create engine map
operators_to_engines = {engine_operator: {2: engine_operator.engine}}

worker_thread = ContinuousBatchingExecutorThread(queues, operators_to_engines)

# thread not started yet
assert not worker_thread.is_alive()

# start and assert thread is alive
worker_thread.start()
assert worker_thread.is_alive()

# create first input and add it to queue
input_1 = engine_operator.input_schema(
engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
)
future_1 = Future()
queues.add_queue_item(engine_operator, input_1, future=future_1)

# assert that future is not yet resolved
assert not future_1.done()

# create second input and add it to queue
input_2 = engine_operator.input_schema(
engine_inputs=[numpy.random.randn(1, 3, 224, 224).astype(numpy.float32)]
)
future_2 = Future()
queues.add_queue_item(engine_operator, input_2, future=future_2)

# wait 1 second to give engine time to complete
time.sleep(1)

assert future_1.done()
assert future_2.done()

result_1 = future_1.result()
result_2 = future_2.result()

assert isinstance(result_1, engine_operator.output_schema)
assert isinstance(result_2, engine_operator.output_schema)

def assert_batch_size_one(arrays):
for array in arrays:
assert array.shape[0] == 1

# make sure only a single batch item was returned to each future
# TODO: test that the correct bs1 item is returned (can test against bs1 engine)
assert_batch_size_one(result_1.engine_outputs)
assert_batch_size_one(result_2.engine_outputs)