diff --git a/src/deepsparse/v2/operators/__init__.py b/src/deepsparse/v2/operators/__init__.py index 9d1a9812ac..bf58018493 100644 --- a/src/deepsparse/v2/operators/__init__.py +++ b/src/deepsparse/v2/operators/__init__.py @@ -1,4 +1,5 @@ # flake8: noqa +# isort: skip_file # Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. # @@ -14,3 +15,4 @@ # See the License for the specific language governing permissions and # limitations under the License. from .operator import * +from .engine_operator import * diff --git a/src/deepsparse/v2/schedulers/utils/__init__.py b/src/deepsparse/v2/schedulers/utils/__init__.py new file mode 100644 index 0000000000..e2e25b1c90 --- /dev/null +++ b/src/deepsparse/v2/schedulers/utils/__init__.py @@ -0,0 +1,18 @@ +# flake8: noqa +# isort: skip_file + +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .continuous_batching_queues import * diff --git a/src/deepsparse/v2/schedulers/utils/continuous_batching_queues.py b/src/deepsparse/v2/schedulers/utils/continuous_batching_queues.py new file mode 100644 index 0000000000..84d4f38e3d --- /dev/null +++ b/src/deepsparse/v2/schedulers/utils/continuous_batching_queues.py @@ -0,0 +1,220 @@ +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
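+
+# Overview (grounded in the code below): ContinuousBatchingQueue extends
+# queue.Queue with batch-size-aware helpers, and ContinuousBatchingQueues
+# groups one such queue per operator key, selecting the next job by the
+# largest fillable batch size unless the oldest waiting entry has exceeded
+# _MAX_WAIT_MS (see _default_select_fn)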
+
+from concurrent.futures import Future
+from queue import Queue
+from threading import Condition, Lock
+from time import time
+from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple
+
+
+__all__ = [
+    "ContinuousBatchingQueue",
+    "ContinuousBatchingQueues",
+    "QueueEntry",
+]
+
+
+# maximum wait time of the longest-waiting item in a queue before it is prioritized
+_MAX_WAIT_MS = 100
+
+
+class QueueEntry(NamedTuple):
+    value: Any
+    future: Optional[Future]
+    entry_time_ms: float
+
+    def time_elapsed(self) -> float:
+        return _current_time_ms() - self.entry_time_ms
+
+
+class ContinuousBatchingQueue(Queue):
+    """
+    Extension of queue.Queue with helper functions for dequeuing batches
+    of valid sizes for continuous batching
+
+    :param batch_sizes: valid batch sizes that can be grouped for continuous
+        batching
+    """
+
+    def __init__(self, batch_sizes: List[int], *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._batch_sizes = batch_sizes
+        self._min_batch_size = min(self.batch_sizes)
+
+    @property
+    def batch_sizes(self) -> List[int]:
+        """
+        :return: valid batch sizes that this queue can return
+        """
+        return self._batch_sizes
+
+    def pop_batch(self) -> List[Any]:
+        """
+        :return: a list of entries forming the largest batch size currently
+            fillable by this queue; raises RuntimeError if no valid batch
+            size can be filled
+        """
+        batch_size = self.max_queued_batch_size()
+        if batch_size == 0:
+            raise RuntimeError(
+                f"Cannot create a batch with {self.qsize()} entries and valid "
+                f"batch sizes: {self.batch_sizes}"
+            )
+
+        return [self.get() for _ in range(batch_size)]
+
+    def has_batch(self) -> bool:
+        """
+        :return: True if a batch of valid size can be filled with the current qsize
+        """
+        return self.qsize() >= self._min_batch_size
+
+    def max_queued_batch_size(self) -> int:
+        """
+        :return: the maximum batch size that can be filled by members of this queue
+        """
+        num_entries = self.qsize()
+        max_size = 0
+
+        for batch_size in self.batch_sizes:
+            if num_entries >= batch_size > max_size:
+                # current batch size can be satisfied and is the largest so far
+                max_size = batch_size
+
+        return max_size
+
+    def peek(self) -> Any:
+        """
+        :return: threadsafe peek of the first item in the queue
+        """
+        with self.mutex:
+            return self.queue[0]
+
+
+class ContinuousBatchingQueues:
+    """
+    Threadsafe collection of Queues designed to support continuous batching.
+    Each Queue should be keyed by an operator where possible; however, keys
+    are kept generic.
+
+    On request for the next job, an operator key and a batch of inputs
+    are returned.
+    The default heuristic combines the wait time of the oldest entry
+    with the largest batch size that can be run
+    """
+
+    def __init__(self):
+        self._queues = {}  # Dict[Any, ContinuousBatchingQueue]
+        self._mutex = Lock()
+
+        # condition for wait/notify when an item is added to any queue
+        self._item_added = Condition(self._mutex)
+
+    def __contains__(self, key: Any) -> bool:
+        """
+        :param key: key to look up
+        :return: True if the given key has a queue in this group
+        """
+        with self._mutex:
+            return key in self._queues
+
+    def add_queue(self, key: Any, batch_sizes: List[int]):
+        """
+        Adds a queue for a single operator that can be run at multiple batch sizes
+
+        :param key: key to identify the queue with, preferably the engine operator
+        :param batch_sizes: batch sizes that the operator can be run at
+        """
+        with self._mutex:
+            self._queues[key] = ContinuousBatchingQueue(batch_sizes=batch_sizes)
+
+    def add_queue_item(self, key: Any, item: Any, future: Optional[Future] = None):
+        """
+        Adds an item to the given queue
+
+        :param key: key of the queue to add to
+        :param item: item to add to the queue
+        :param future: optional Future that should be used for resolution of the value
+        """
+        if key not in self:
+            raise KeyError(f"Cannot add item to queue for unregistered key {key}")
+
+        entry = QueueEntry(value=item, future=future, entry_time_ms=_current_time_ms())
+
+        with self._mutex:
+            self._queues[key].put(entry)
+            self._item_added.notify()
+
+    def has_next_batch(self) -> bool:
+        """
+        :return: True if any queue has enough entries to fill a valid batch size
+        """
+        with self._mutex:
+            return any(queue.has_batch() for queue in self._queues.values())
+
+    def pop_batch(
+        self,
+        select_fn: Optional[Callable[[Dict[Any, ContinuousBatchingQueue]], Any]] = None,
+        block: bool = True,
+    ) -> Tuple[Any, List[QueueEntry]]:
+        """
+        :param select_fn: function that takes in a dictionary mapping queue keys
+            (i.e. EngineOperator) to their ContinuousBatchingQueue of QueueEntry
+            objects and returns the key of the queue that should be popped.
+            Only keys whose queues can fill a valid batch will be given.
+            If not provided, the default select_fn returns the queue whose
+            first item has waited longer than 100ms, if any; otherwise it
+            returns the queue that can fill the largest batch size.
+        :param block: if True, will wait for a valid batch to be in a queue before
+            popping and returning; if False, will raise an error if a full batch
+            cannot be popped. Default True
+        :return: Tuple of the queue key (i.e. EngineOperator) and the popped
+            batch of QueueEntry objects, as a list, that should be run
+            as a single batch
+        """
+        with self._mutex:
+            while not (valid_queues := self._filter_empty_queues()):
+                if block:
+                    # wait to search for a valid queue again until a new item is added
+                    self._item_added.wait()
+                else:
+                    raise RuntimeError(
+                        "Cannot pop_batch when no queues have enough items to fill "
+                        "a valid batch size, check with has_next_batch before calling "
+                        "pop_batch"
+                    )
+
+            select_fn = select_fn or _default_select_fn
+            selected_key = select_fn(valid_queues)
+
+            return selected_key, self._queues[selected_key].pop_batch()
+
+    def _filter_empty_queues(self) -> Dict[Any, ContinuousBatchingQueue]:
+        return {key: queue for key, queue in self._queues.items() if queue.has_batch()}
+
+
+def _default_select_fn(queues: Dict[Any, ContinuousBatchingQueue]) -> Any:
+    # find the queue whose first entry has the maximum wait time
+    wait_times = [(key, queue.peek().time_elapsed()) for key, queue in queues.items()]
+    max_wait_key, max_wait = max(wait_times, key=lambda x: x[1])  # key on time
+
+    if max_wait >= _MAX_WAIT_MS:
+        # if the max wait time exceeds the threshold, return that queue
+        return max_wait_key
+
+    # default to the largest batch size that can be satisfied
+    return max(queues.keys(), key=lambda key: queues[key].max_queued_batch_size())
+
+
+def _current_time_ms() -> float:
+    return time() * 1000
diff --git a/tests/deepsparse/v2/schedulers/__init__.py b/tests/deepsparse/v2/schedulers/__init__.py
new file mode 100644
index 0000000000..0c44f887a4
--- /dev/null
+++ b/tests/deepsparse/v2/schedulers/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/deepsparse/v2/schedulers/utils/__init__.py b/tests/deepsparse/v2/schedulers/utils/__init__.py
new file mode 100644
index 0000000000..0c44f887a4
--- /dev/null
+++ b/tests/deepsparse/v2/schedulers/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/deepsparse/v2/schedulers/utils/test_continuous_batching_queues.py b/tests/deepsparse/v2/schedulers/utils/test_continuous_batching_queues.py
new file mode 100644
index 0000000000..1713d54f82
--- /dev/null
+++ b/tests/deepsparse/v2/schedulers/utils/test_continuous_batching_queues.py
@@ -0,0 +1,177 @@
+# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from threading import Thread
+
+import pytest
+from deepsparse.v2.schedulers.utils import (
+    ContinuousBatchingQueue,
+    ContinuousBatchingQueues,
+    QueueEntry,
+)
+
+
+@pytest.mark.parametrize(
+    "batch_sizes,num_entries,expected_batch_size",
+    [
+        ([1, 4, 8], 20, 8),
+        ([1, 4, 8], 6, 4),
+        ([1, 4, 8], 4, 4),
+        ([1, 4, 8], 3, 1),
+        ([4], 5, 4),
+    ],
+)
+def test_queue_single_pop(batch_sizes, num_entries, expected_batch_size):
+    queue = ContinuousBatchingQueue(batch_sizes=batch_sizes)
+    assert not queue.has_batch()
+    for i in range(num_entries):
+        queue.put(i)
+
+    assert queue.has_batch()
+    assert queue.max_queued_batch_size() == expected_batch_size
+
+    batch = queue.pop_batch()
+    assert len(batch) == expected_batch_size
+    assert batch == list(range(expected_batch_size))
+
+
+def test_queue_multi_pop():
+    queue = ContinuousBatchingQueue(batch_sizes=[2, 4, 8])
+
+    for i in range(23):
+        if i < 2:
+            assert not queue.has_batch()
+        else:
+            assert queue.has_batch()
+        queue.put(i)
+
+    def assert_queue_size_and_pop(expected_qsize, expected_batch_size):
+        assert queue.qsize() == expected_qsize
+        assert queue.has_batch()
+        assert queue.max_queued_batch_size() == expected_batch_size
+        assert len(queue.pop_batch()) == expected_batch_size
+
+    # pop items from the queue, checking the remaining qsize and that the
+    # correct batch size is popped
+    assert_queue_size_and_pop(23, 8)
+    assert_queue_size_and_pop(15, 8)
+    assert_queue_size_and_pop(7, 4)
+    assert_queue_size_and_pop(3, 2)
+
+    assert not queue.has_batch()
+    queue.put(23)
+    assert_queue_size_and_pop(2, 2)
+
+    assert queue.empty()
+
+
+def test_queue_invalid_pop():
+    queue = ContinuousBatchingQueue(batch_sizes=[4, 8])
+    for i in range(3):
+        queue.put(i)
+
+    with pytest.raises(RuntimeError):
+        # queue size 3, min batch size 4
+        queue.pop_batch()
+
+
+def test_queues_pop_batch_max_valid_batch():
+    queues = ContinuousBatchingQueues()
+
+    queues.add_queue("key_1", [2, 4])
+    queues.add_queue("key_2", [3])
+
+    assert not queues.has_next_batch()
+
+    queues.add_queue_item("key_1", 1)
+    queues.add_queue_item("key_1", 2)
+    assert queues.has_next_batch()
+
+    queues.add_queue_item("key_2", 1)
+    queues.add_queue_item("key_2", 2)
+    queues.add_queue_item("key_2", 3)
+    # NOTE - if this block takes more than 100ms, the test may fail,
+    # as the wait-time threshold may lead to key_1 being popped first
+
+    # key_2 should be popped first because it has the larger loaded batch size
+    first_popped_key, first_popped_batch = queues.pop_batch()
+    assert first_popped_key == "key_2"
+    assert len(first_popped_batch) == 3
+    assert all(isinstance(item, QueueEntry) for item in first_popped_batch)
+
+    assert queues.has_next_batch()
+
+    second_popped_key, second_popped_batch = queues.pop_batch()
+    assert second_popped_key == "key_1"
+    assert len(second_popped_batch) == 2
+    assert all(isinstance(item, QueueEntry) for item in second_popped_batch)
+
+
+def test_queues_pop_batch_time_elapsed_priority():
+    queues = ContinuousBatchingQueues()
+
+    queues.add_queue("key_1", [2, 4])
+    queues.add_queue("key_2", [3])
+
+    assert not queues.has_next_batch()
+
+    queues.add_queue_item("key_1", 1)
+    queues.add_queue_item("key_1", 2)
+    assert queues.has_next_batch()
+
+    # sleep 150ms (time threshold is 100ms)
+    time.sleep(0.15)
+
+    queues.add_queue_item("key_2", 1)
+    queues.add_queue_item("key_2", 2)
+    queues.add_queue_item("key_2", 3)
+
+    # key_1 should be popped first because its first item has been waiting
+    # longer than the time threshold and key_2 was just added
+    popped_key, popped_batch = queues.pop_batch()
+    assert popped_key == "key_1"
+    assert len(popped_batch) == 2
+
+
+def test_queues_pop_batch_blocking():
+    queues = ContinuousBatchingQueues()
+    queues.add_queue("key_1", [2])
+
+    def test_fn():
+        # pop batch, blocking until a full batch is available
+        key, batch = queues.pop_batch(block=True)
+        # compare to expected results
+        assert key == "key_1"
+        assert [entry.value for entry in batch] == [1, 2]
+
+    # start a thread to pop a batch
+    # it should hang indefinitely because block=True and there are no items yet in queue
+    thread = Thread(target=test_fn)
+    thread.start()
+
+    # confirm thread is still running
+    assert thread.is_alive()
+    # sleep and confirm thread is still hanging
+    time.sleep(0.15)
+    assert thread.is_alive()
+
+    # confirm thread still runs after a single insertion (min batch size is 2)
+    queues.add_queue_item("key_1", 1)
+    assert thread.is_alive()
+
+    # add a second item and assert thread finishes
+    queues.add_queue_item("key_1", 2)
+    time.sleep(0.1)
+    assert not thread.is_alive()
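A minimal usage sketch (not part of the diff) of how a scheduler might drive these queues; the string key "engine_op" stands in for the EngineOperator instances the docstrings suggest as keys, and echoing the input back as the result is a placeholder for real engine output:

```python
from concurrent.futures import Future

from deepsparse.v2.schedulers.utils import ContinuousBatchingQueues

queues = ContinuousBatchingQueues()

# register a queue for a (hypothetical) operator that supports batch sizes 1 and 4
queues.add_queue("engine_op", batch_sizes=[1, 4])

# enqueue four work items, each paired with a Future the submitter can wait on
futures = [Future() for _ in range(4)]
for value, future in zip(range(4), futures):
    queues.add_queue_item("engine_op", value, future=future)

# the default heuristic picks the largest fillable batch -- here all four items
key, batch = queues.pop_batch(block=True)
assert key == "engine_op" and len(batch) == 4

# after running the batch through the operator, resolve each entry's Future
for entry in batch:
    entry.future.set_result(entry.value)  # placeholder: echo input as "result"
```

Under the default policy the largest fillable batch wins unless a queue's oldest entry has waited more than 100ms; a custom `select_fn` passed to `pop_batch` overrides that heuristic.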