diff --git a/src/deepsparse/__init__.py b/src/deepsparse/__init__.py index 0c0e7e0d1b..83fc4d9632 100644 --- a/src/deepsparse/__init__.py +++ b/src/deepsparse/__init__.py @@ -32,7 +32,6 @@ ) from .engine import * from .tasks import * -from .timing import * from .pipeline import * from .loggers import * from .version import __version__, is_release diff --git a/src/deepsparse/pipeline.py b/src/deepsparse/pipeline.py index 88d5414992..9721148299 100644 --- a/src/deepsparse/pipeline.py +++ b/src/deepsparse/pipeline.py @@ -36,7 +36,7 @@ validate_identifier, ) from deepsparse.tasks import SupportedTasks, dynamic_import_task -from deepsparse.timing import InferencePhases, Timer +from deepsparse.utils import InferenceStages, StagedTimer, TimerManager __all__ = [ @@ -155,13 +155,16 @@ def __init__( context: Optional[Context] = None, executor: Optional[Union[ThreadPoolExecutor, int]] = None, logger: Optional[Union[BaseLogger, str]] = None, + benchmark: bool = False, _delay_engine_initialize: bool = False, # internal use only ): + self._benchmark = benchmark self._model_path_orig = model_path self._model_path = model_path self._engine_type = engine_type self._batch_size = batch_size self._alias = alias + self._timer_manager = TimerManager(enabled=True, multi=benchmark) self.context = context self.logger = ( logger @@ -213,111 +216,89 @@ def __init__( ) def __call__(self, *args, **kwargs) -> BaseModel: - if "engine_inputs" in kwargs: - raise ValueError( - "invalid kwarg engine_inputs. 
engine inputs determined " - f"by {self.__class__.__qualname__}.parse_inputs" - ) - timer = Timer() - - timer.start(InferencePhases.TOTAL_INFERENCE) - - # ------ PREPROCESSING ------ - timer.start(InferencePhases.PRE_PROCESS) - # parse inputs into input_schema - pipeline_inputs = self.parse_inputs(*args, **kwargs) - - self.log( - identifier="pipeline_inputs", - value=pipeline_inputs, - category=MetricCategories.DATA, - ) + with self.timer_manager.new_timer_context() as timer: + if "engine_inputs" in kwargs: + raise ValueError( + "invalid kwarg engine_inputs. engine inputs determined " + f"by {self.__class__.__qualname__}.parse_inputs" + ) - if not isinstance(pipeline_inputs, self.input_schema): - raise RuntimeError( - f"Unable to parse {self.__class__} inputs into a " - f"{self.input_schema} object. Inputs parsed to {type(pipeline_inputs)}" + # ------ PREPROCESSING ------ + timer.start(InferenceStages.PRE_PROCESS) + # parse inputs into input_schema + pipeline_inputs = self.parse_inputs(*args, **kwargs) + self.log( + identifier="pipeline_inputs", + value=pipeline_inputs, + category=MetricCategories.DATA, ) - # batch size of the inputs may be `> self._batch_size` at this point - engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs) - if isinstance(engine_inputs, tuple): - engine_inputs, postprocess_kwargs = engine_inputs - else: - postprocess_kwargs = {} - timer.stop(InferencePhases.PRE_PROCESS) - self.log( - identifier="engine_inputs", - value=engine_inputs, - category=MetricCategories.DATA, - ) - self.log( - identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.PRE_PROCESS}_seconds", # noqa E501 - value=timer.time_delta(InferencePhases.PRE_PROCESS), - category=MetricCategories.SYSTEM, - ) - - # ------ INFERENCE ------ - # split inputs into batches of size `self._batch_size` - timer.start(InferencePhases.ENGINE_FORWARD) - batches = self.split_engine_inputs(engine_inputs, self._batch_size) - - # submit split batches to engine threadpool 
- batch_outputs = list(self.executor.map(self.engine_forward, batches)) - - # join together the batches of size `self._batch_size` - engine_outputs = self.join_engine_outputs(batch_outputs) - timer.stop(InferencePhases.ENGINE_FORWARD) - - self.log( - identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total", - # to get the batch size of the inputs, we need to look - # to multiply the engine batch size (self._batch_size) - # by the number of batches processed by the engine during - # a single inference call - value=len(batch_outputs) * self._batch_size, - category=MetricCategories.SYSTEM, - ) + if not isinstance(pipeline_inputs, self.input_schema): + raise RuntimeError( + f"Unable to parse {self.__class__} inputs into a " + f"{self.input_schema} object. " + f"Inputs parsed to {type(pipeline_inputs)}" + ) + # batch size of the inputs may be `> self._batch_size` at this point + engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs) + if isinstance(engine_inputs, tuple): + engine_inputs, postprocess_kwargs = engine_inputs + else: + postprocess_kwargs = {} + + timer.stop(InferenceStages.PRE_PROCESS) + self.log( + identifier="engine_inputs", + value=engine_inputs, + category=MetricCategories.DATA, + ) - self.log( - identifier="engine_outputs", - value=engine_outputs, - category=MetricCategories.DATA, - ) - self.log( - identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.ENGINE_FORWARD}_seconds", # noqa E501 - value=timer.time_delta(InferencePhases.ENGINE_FORWARD), - category=MetricCategories.SYSTEM, - ) + # ------ INFERENCE ------ + # split inputs into batches of size `self._batch_size` + timer.start(InferenceStages.ENGINE_FORWARD) + batches = self.split_engine_inputs(engine_inputs, self._batch_size) + + # submit split batches to engine threadpool + batch_outputs = list(self.executor.map(self.engine_forward, batches)) + + # join together the batches of size `self._batch_size` + engine_outputs = 
self.join_engine_outputs(batch_outputs) + timer.stop(InferenceStages.ENGINE_FORWARD) + + self.log( + identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total", + # to get the batch size of the inputs, we need to look + # to multiply the engine batch size (self._batch_size) + # by the number of batches processed by the engine during + # a single inference call + value=len(batch_outputs) * self._batch_size, + category=MetricCategories.SYSTEM, + ) + self.log( + identifier="engine_outputs", + value=engine_outputs, + category=MetricCategories.DATA, + ) - # ------ POSTPROCESSING ------ - timer.start(InferencePhases.POST_PROCESS) - pipeline_outputs = self.process_engine_outputs( - engine_outputs, **postprocess_kwargs - ) - if not isinstance(pipeline_outputs, self.output_schema): - raise ValueError( - f"Outputs of {self.__class__} must be instances of " - f"{self.output_schema} found output of type {type(pipeline_outputs)}" + # ------ POSTPROCESSING ------ + timer.start(InferenceStages.POST_PROCESS) + pipeline_outputs = self.process_engine_outputs( + engine_outputs, **postprocess_kwargs + ) + if not isinstance(pipeline_outputs, self.output_schema): + raise ValueError( + f"Outputs of {self.__class__} must be instances of " + f"{self.output_schema} found output of type " + f"{type(pipeline_outputs)}" + ) + timer.stop(InferenceStages.POST_PROCESS) + self.log( + identifier="pipeline_outputs", + value=pipeline_outputs, + category=MetricCategories.DATA, ) - timer.stop(InferencePhases.POST_PROCESS) - timer.stop(InferencePhases.TOTAL_INFERENCE) - self.log( - identifier="pipeline_outputs", - value=pipeline_outputs, - category=MetricCategories.DATA, - ) - self.log( - identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.POST_PROCESS}_seconds", # noqa E501 - value=timer.time_delta(InferencePhases.POST_PROCESS), - category=MetricCategories.SYSTEM, - ) - self.log( - identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.TOTAL_INFERENCE}_seconds", # noqa 
E501 - value=timer.time_delta(InferencePhases.TOTAL_INFERENCE), - category=MetricCategories.SYSTEM, - ) + self.log_inference_times(timer) return pipeline_outputs @@ -704,6 +685,31 @@ def engine_type(self) -> str: """ return self._engine_type + @property + def timer_manager(self) -> TimerManager: + return self._timer_manager + + @property + def current_timer(self) -> Optional[StagedTimer]: + """ + :return: current timer for the pipeline, if any + """ + timer = self.timer_manager.current + + if timer is None: + timer = self.timer_manager.latest + + return timer + + @property + def benchmark(self) -> bool: + return self._benchmark + + @benchmark.setter + def benchmark(self, value: bool): + self._benchmark = value + self.timer_manager.multi = value + def to_config(self) -> "PipelineConfig": """ :return: PipelineConfig that can be used to reload this object @@ -739,7 +745,7 @@ def log( self, identifier: str, value: Any, - category: str, + category: Union[str, MetricCategories], ): """ Pass the logged data to the DeepSparse logger object (if present). @@ -791,6 +797,19 @@ def engine_forward(self, engine_inputs: List[numpy.ndarray]) -> List[numpy.ndarr """ return self.engine(engine_inputs) + def log_inference_times(self, timer: StagedTimer): + """ + logs stage times in the given timer + + :param timer: timer to log + """ + for stage, time in timer.times.items(): + self.log( + identifier=f"{SystemGroups.PREDICTION_LATENCY}/{stage}_seconds", + value=time, + category=MetricCategories.SYSTEM, + ) + def _initialize_engine(self) -> Union[Engine, ORTEngine]: engine_type = self.engine_type.lower() diff --git a/src/deepsparse/timing/__init__.py b/src/deepsparse/timing/__init__.py deleted file mode 100644 index 63cd072a0d..0000000000 --- a/src/deepsparse/timing/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# flake8: noqa - -from .inference_phases import * -from .timer import * diff --git a/src/deepsparse/timing/inference_phases.py b/src/deepsparse/timing/inference_phases.py deleted file mode 100644 index 0909eb439f..0000000000 --- a/src/deepsparse/timing/inference_phases.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from dataclasses import dataclass - - -__all__ = ["InferencePhases"] - - -@dataclass(frozen=True) -class InferencePhases: - PRE_PROCESS: str = "pre_process" - ENGINE_FORWARD: str = "engine_forward" - POST_PROCESS: str = "post_process" - TOTAL_INFERENCE: str = "total_inference" diff --git a/src/deepsparse/timing/timer.py b/src/deepsparse/timing/timer.py deleted file mode 100644 index f2077ffe2e..0000000000 --- a/src/deepsparse/timing/timer.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time - - -__all__ = ["Timer"] - - -class Timer: - """ - This object aggregates the durations - (time deltas in seconds) of various components of the inference - pipeline. 
- Example flow: - - ``` - timer = Timer() - - timer.start("total_inference") - - timer.start("pre_process") - do_something() - timer.stop("pre_process") - - # time delta can we fetched directly after - # record the stop time of the event - pre_process_time_delta = timer.time_delta("pre_process") - - timer.start("engine_forward") - do_something() - timer.stop("engine_forward") - - timer.start("post_process") - do_something() - timer.stop("post_process") - - timer.stop("total_inference") - - # alternatively, time delta can we fetched later if convenient - - engine_forward_time_delta = timer.time_delta("engine_forward") - post_process_time_delta = timer.time_delta("post_process") - total_inference_time_delta = timer.time_delta("total_inference") - ``` - The object may time the duration of an arbitrary number - of events (phases). Choice of naming for phases is left - for the user to decide. - """ - - def __init__(self): - self._start_times = {} - self._stop_times = {} - - def start(self, phase_name: str): - """ - Collect the starting time of the phase - - :param phase_name: The name of an event (phase), which duration - we are measuring - """ - if phase_name in self._start_times: - raise ValueError( - f"Attempting to overwrite the start time of the phase: {phase_name}" - ) - self._start_times[phase_name] = time.perf_counter() - - def stop(self, phase_name: str): - """ - Collect the finish time of the phase - - :param phase_name: The name of an event (phase), which duration - we are measuring - """ - if phase_name not in self._start_times: - raise ValueError( - f"Attempting to grab the stop time of the phase: {phase_name}," - f"but its start time is missing" - ) - if phase_name in self._stop_times: - raise ValueError( - f"Attempting to overwrite the stop time of the phase: {phase_name}" - ) - self._stop_times[phase_name] = time.perf_counter() - - def time_delta(self, phase_name: str) -> float: - """ - If available, get the time delta (in seconds) of the event (phase). 
- - :param phase_name: The name of an event (phase), which time delta - we want to get - :return: the time delta (in seconds) of the specified phase - """ - phase_start = self._start_times.get(phase_name) - phase_stop = self._stop_times.get(phase_name) - - if not phase_start: - raise ValueError( - f"Attempting to fetch the duration of the phase: {phase_name}, but" - f"its start time and stop time were not recorded" - ) - elif not phase_stop: - raise ValueError( - f"Attempting to fetch the duration of the phase: {phase_name}, but" - f"its stop time was not recorded" - ) - else: - return phase_stop - phase_start diff --git a/src/deepsparse/utils/__init__.py b/src/deepsparse/utils/__init__.py index aab66ff76f..8ad6b624da 100644 --- a/src/deepsparse/utils/__init__.py +++ b/src/deepsparse/utils/__init__.py @@ -17,3 +17,4 @@ from .cli_helpers import * from .data import * from .onnx import * +from .timer import * diff --git a/src/deepsparse/utils/timer.py b/src/deepsparse/utils/timer.py new file mode 100644 index 0000000000..b29bc35065 --- /dev/null +++ b/src/deepsparse/utils/timer.py @@ -0,0 +1,344 @@ +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import contextvars
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional


__all__ = ["timer_context", "InferenceStages", "StagedTimer", "TimerManager"]


# Module-level context variable holding the StagedTimer most recently installed
# by ``TimerManager.new_timer_context``. It is shared by every TimerManager
# instance; its value is local to the current thread / async context.
timer_context = contextvars.ContextVar("timer_context")


@dataclass(frozen=True)
class InferenceStages:
    """
    Names of the stages timed during a single inference call. The values are
    used as stage keys in StagedTimer and are read as class attributes
    (e.g. ``InferenceStages.PRE_PROCESS``).
    """

    PRE_PROCESS: str = "pre_process"
    ENGINE_FORWARD: str = "engine_forward"
    POST_PROCESS: str = "post_process"
    TOTAL_INFERENCE: str = "total_inference"


class StagedTimer:
    """
    Timer object that enables simultaneous starting and stopping of various stages.
    A stage may be started and stopped repeatedly; every start/stop pair is
    recorded as a separate measurement.

    example usage of measuring two operation times with the overall time:

    ```python
    timer = StagedTimer()

    timer.start("overall_time")

    timer.start("operation_1")
    # DO OPERATION 1
    timer.stop("operation_1")

    timer.start("operation_2")
    # DO OPERATION 2
    timer.stop("operation_2")

    timer.stop("overall_time")
    ```

    :param enabled: if False, start/stop become no-ops. default True
    """

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        # stage name -> perf_counter() readings, one entry per start()/stop()
        self._staged_start_times: Dict[str, List[float]] = {}
        self._staged_stop_times: Dict[str, List[float]] = {}

    def __repr__(self):
        """
        Provide a string representation of the StagedTimer object.

        :return: a string representing the timer object with its times. While
            a stage is still running (average times cannot be computed yet)
            the stage names are shown instead, so that repr never raises.
        """
        try:
            return f"StagedTimer({self.times})"
        except ValueError:
            # `times` raises while any stage is started but not yet stopped;
            # repr is used by debuggers and log formatting and must not raise
            return f"StagedTimer(stages={self.stages}, running=True)"

    @property
    def stages(self) -> List[str]:
        """
        Get the stages for the timer object.

        :return: list of stages as strings, in the order they were first started.
        """
        return list(self._staged_start_times)

    @property
    def times(self) -> Dict[str, float]:
        """
        Get the average time for each stage.

        :return: a dictionary with stage names as keys and their average time as values.
        :raises ValueError: if any stage has been started but not yet stopped.
        """
        return {stage: self.stage_average_time(stage) for stage in self.stages}

    @property
    def all_times(self) -> Dict[str, List[float]]:
        """
        Get the list of times for each stage.

        :return: a dictionary with stages as keys and their list of times as values.
        :raises ValueError: if any stage has been started but not yet stopped.
        """
        return {stage: self.stage_times(stage) for stage in self.stages}

    def clear(self):
        """
        Clear all the stored start and stop times for all stages.
        """
        self._staged_start_times.clear()
        self._staged_stop_times.clear()

    def has_stage(self, stage: str) -> bool:
        """
        Check if a stage exists in the timer.

        :param stage: the name of the stage to check.
        :return: True if the stage exists, False otherwise.
        """
        return stage in self._staged_start_times

    def start(self, stage: str):
        """
        Start the timer for a specific stage. If the stage doesn't exist,
        it's added to the timer.

        :param stage: the name of the stage to start.
        :raises ValueError: if trying to start a stage before a previous one
            has been stopped.
        """
        if not self.enabled:
            return

        starts = self._staged_start_times.setdefault(stage, [])
        stops = self._staged_stop_times.setdefault(stage, [])

        # every previous start must already have a matching stop
        if len(starts) != len(stops):
            raise ValueError(
                f"Attempting to start {stage} before a previous has been stopped:"
                f" start times len({len(starts)});"
                f" stop times len({len(stops)})"
            )

        starts.append(time.perf_counter())

    def stop(self, stage: str):
        """
        Stop the timer for a specific stage.

        :param stage: the name of the stage to stop.
        :raises ValueError: if trying to stop a stage that has not been started
            or if trying to stop a stage before a previous one
            has been started.
        """
        if not self.enabled:
            return

        if stage not in self._staged_start_times:
            raise ValueError(
                "Attempting to stop a stage that has not been started: " f"{stage}"
            )

        starts = self._staged_start_times[stage]
        stops = self._staged_stop_times[stage]

        # exactly one start must be waiting for its stop
        if len(starts) != len(stops) + 1:
            raise ValueError(
                f"Attempting to stop {stage} before a previous has been started:"
                f" start times len({len(starts)});"
                f" stop times len({len(stops)})"
            )

        stops.append(time.perf_counter())

    def stage_times(self, stage: str) -> List[float]:
        """
        Get the list of time deltas for a specific stage.

        :param stage: the name of the stage to get time deltas for.
        :return: a list of time deltas in seconds, one per start/stop pair.
        :raises ValueError: if trying to get time deltas for a stage that has not been
            started or a stage that has not been stopped.
        """
        if stage not in self._staged_start_times:
            raise ValueError(
                "Attempting to get time deltas for a stage that has not been started: "
                f"{stage}"
            )

        starts = self._staged_start_times[stage]
        stops = self._staged_stop_times[stage]

        if len(starts) != len(stops):
            raise ValueError(
                "Attempting to get time deltas for a stage that has not been stopped: "
                f"{stage}"
            )

        return [stopped - started for started, stopped in zip(starts, stops)]

    def stage_average_time(self, stage: str) -> float:
        """
        Get the average time for a specific stage.

        :param stage: the name of the stage to get the average time for.
        :return: the average time for the specified stage.
        :raises ValueError: if the stage has not been started or is still running.
        """
        times = self.stage_times(stage)

        return sum(times) / len(times)


class TimerManager:
    """
    Object to manage creation and aggregation of StagedTimers for benchmarking
    performance timings.

    Intended workflow:

    ```python
    timer_manager = TimerManager(multi=True)

    # process 1
    with timer_manager.new_timer_context() as timer_1:
        timer_1.start(...)
        ...

    # process 2
    with timer_manager.new_timer_context() as timer_2:
        timer_2.start(...)
        ...

    # aggregate times for benchmarking
    do_some_postprocessing(timer_manager.all_times)
    ```

    :param enabled: if False, no timings are measured by new staged timers. Default True
    :param multi: if True, keep track of all newly created staged timers. if False, only
        stores the latest created staged timer. Default False
    """

    def __init__(self, enabled: bool = True, multi: bool = False):
        self.multi = multi
        self.enabled = enabled
        self._timers: List[StagedTimer] = []

    def __repr__(self):
        """
        Provide a string representation of the TimerManager object.

        :return: a string representing the timer manager object with its times.
            While any tracked timer still has a running stage the stage names
            are shown instead, so that repr never raises.
        """
        try:
            return f"TimerManager({self.times})"
        except ValueError:
            # a tracked timer has a stage that is started but not stopped yet
            return f"TimerManager(stages={self.stages}, running=True)"

    @property
    def latest(self) -> Optional[StagedTimer]:
        """
        Get the latest created StagedTimer.

        :return: the latest created StagedTimer object or None if no timers are present.
        """
        return self._timers[-1] if self._timers else None

    @property
    def current(self) -> Optional[StagedTimer]:
        """
        Get the StagedTimer stored in the module-level ``timer_context``
        variable for the current context.

        NOTE: the variable is shared by all TimerManager instances and is not
        reset when ``new_timer_context`` exits, so the returned timer may
        already be finished or may have been created by another manager.

        :return: the StagedTimer set in the current context or
            None if no timer has been set in it.
        """
        try:
            return timer_context.get()
        except LookupError:
            # no timer in context, return None
            return None

    @property
    def timers(self) -> List[StagedTimer]:
        """
        Get the list of all StagedTimer objects.

        :return: a list of all StagedTimer objects.
        """
        return self._timers

    @property
    def stages(self) -> List[str]:
        """
        Get the unique list of stages from all StagedTimer objects.

        :return: a list of unique stages, in first-seen order.
        """
        # dict.fromkeys de-duplicates while keeping a deterministic order
        return list(
            dict.fromkeys(stage for timer in self._timers for stage in timer.stages)
        )

    @property
    def times(self) -> Dict[str, float]:
        """
        Get the average time for each stage across all StagedTimer objects.

        :return: a dictionary with stage names as keys and their average time as values.
        :raises ValueError: if any tracked timer has a stage that is still running.
        """
        return {
            stage: sum(stage_times) / len(stage_times)
            for stage, stage_times in self.all_times.items()
        }

    @property
    def all_times(self) -> Dict[str, List[float]]:
        """
        Get the list of times for each stage across all StagedTimer objects.

        :return: a dictionary with stages as keys and their list of times as values.
        :raises ValueError: if any tracked timer has a stage that is still running.
        """
        collected: Dict[str, List[float]] = {}

        for timer in self._timers:
            for stage, stage_times in timer.all_times.items():
                collected.setdefault(stage, []).extend(stage_times)

        return collected

    @contextmanager
    def new_timer_context(self) -> Iterator[StagedTimer]:
        """
        Create a new StagedTimer object, register it with this manager and set
        it as the timer of the current context. The TOTAL_INFERENCE stage is
        started on entry and stopped on exit, including when the body raises.

        :return: context manager yielding the new StagedTimer object.
        """
        timer = StagedTimer(enabled=self.enabled)
        timer.start(InferenceStages.TOTAL_INFERENCE)

        if self.multi:
            self._timers.append(timer)
        else:
            # single mode: only the most recent timer is kept
            self._timers = [timer]

        timer_context.set(timer)

        try:
            yield timer
        finally:
            timer.stop(InferenceStages.TOTAL_INFERENCE)
diff --git a/tests/deepsparse/timing/test_timer.py b/tests/deepsparse/timing/test_timer.py deleted file mode 100644 index 3344972f77..0000000000 --- a/tests/deepsparse/timing/test_timer.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time - -import pytest -from deepsparse.timing import Timer - - -def _sleep(sleep_time): - time.sleep(sleep_time) - - -@pytest.mark.parametrize("s1, s2, s3", [(0.1, 0.2, 0.3)]) -class TestTimer: - @pytest.fixture - def setup(self, s1, s2, s3): - timer = Timer() - yield s1, s2, s3, timer - - def test_happy_pathway(self, setup): - s1, s2, s3, timer = setup - - timer.start("total_inference") - timer.start("pre_process") - _sleep(s1) - timer.stop("pre_process") - - pre_process_time_delta = timer.time_delta("pre_process") - - timer.start("engine_forward") - _sleep(s2) - timer.stop("engine_forward") - - timer.start("post_process") - _sleep(s3) - timer.stop("post_process") - timer.stop("total_inference") - - engine_forward_time_delta = timer.time_delta("engine_forward") - post_process_time_delta = timer.time_delta("post_process") - total_inference_time_delta = timer.time_delta("total_inference") - - accuracy = 1.0e-02 - assert pre_process_time_delta == pytest.approx(s1, accuracy) - assert engine_forward_time_delta == pytest.approx(s2, accuracy) - assert post_process_time_delta == pytest.approx(s3, accuracy) - assert 
total_inference_time_delta == pytest.approx(s1 + s2 + s3, accuracy) - - def test_always_start_before_complete(self, setup): - _, _, _, timer = setup - - with pytest.raises(ValueError): - # builder.start("process") missing - timer.stop("process") - - def test_never_overwrite(self, setup): - s1, _, _, timer = setup - - timer.start("process") - _sleep(s1) - timer.stop("process") - with pytest.raises(ValueError): - timer.start("process") - - def test_cannot_compute_time_delta_before_start(self, setup): - _, _, _, timer = setup - - with pytest.raises(ValueError): - timer.time_delta("process") - - def test_cannot_compute_time_delta_before_stop(self, setup): - s1, _, _, timer = setup - - timer.start("process") - _sleep(s1) - with pytest.raises(ValueError): - timer.time_delta("process") diff --git a/tests/deepsparse/utils/test_timer.py b/tests/deepsparse/utils/test_timer.py new file mode 100644 index 0000000000..7aced4809f --- /dev/null +++ b/tests/deepsparse/utils/test_timer.py @@ -0,0 +1,106 @@ +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import concurrent.futures
import time

from deepsparse.utils import InferenceStages, StagedTimer, TimerManager


# every timed section in these tests sleeps for this long ...
_SLEEP_SECONDS = 1
# ... and must be measured within these bounds (accounts for minor time differences)
_MIN_SECONDS = 0.9
_MAX_SECONDS = 1.1


def _assert_stage_measured(times, all_times, stage, expected_count):
    """
    Assert that ``stage`` was measured ``expected_count`` times and that both
    its average and every individual measurement are close to the sleep time.

    :param times: mapping of stage name to average time
    :param all_times: mapping of stage name to list of measured times
    :param stage: name of the stage to check
    :param expected_count: number of measurements expected for the stage
    """
    assert stage in times
    assert _MIN_SECONDS <= times[stage] <= _MAX_SECONDS
    assert stage in all_times
    assert len(all_times[stage]) == expected_count
    for measured in all_times[stage]:
        assert _MIN_SECONDS <= measured <= _MAX_SECONDS


def test_staged_timer():
    """A single start/stop pair on a StagedTimer records one measurement."""
    timer = StagedTimer(enabled=True)

    timer.start(InferenceStages.ENGINE_FORWARD)
    time.sleep(_SLEEP_SECONDS)
    timer.stop(InferenceStages.ENGINE_FORWARD)

    _assert_stage_measured(
        timer.times, timer.all_times, InferenceStages.ENGINE_FORWARD, 1
    )


def test_timer_manager():
    """A timer created through the manager is visible in the manager's times."""
    timer_manager = TimerManager(enabled=True, multi=True)

    with timer_manager.new_timer_context() as timer:
        timer.start(InferenceStages.ENGINE_FORWARD)
        time.sleep(_SLEEP_SECONDS)
        timer.stop(InferenceStages.ENGINE_FORWARD)

    _assert_stage_measured(
        timer_manager.times,
        timer_manager.all_times,
        InferenceStages.ENGINE_FORWARD,
        1,
    )


def test_timer_manager_multithreaded():
    """
    Two worker threads each create their own timer; a nested function looks
    the timer up through ``timer_manager.current`` instead of receiving it.
    """
    timer_manager = TimerManager(enabled=True, multi=True)

    def nested_func():
        timer = timer_manager.current
        assert timer is not None
        timer.start(InferenceStages.POST_PROCESS)
        time.sleep(_SLEEP_SECONDS)
        timer.stop(InferenceStages.POST_PROCESS)

    def worker():
        with timer_manager.new_timer_context() as timer:
            timer.start(InferenceStages.ENGINE_FORWARD)
            time.sleep(_SLEEP_SECONDS)
            timer.stop(InferenceStages.ENGINE_FORWARD)
            nested_func()

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(worker) for _ in range(2)]

    # result() re-raises anything raised inside the worker thread; without it
    # a failing assert in worker/nested_func would be silently discarded
    for future in futures:
        future.result()

    times = timer_manager.times
    all_times = timer_manager.all_times

    _assert_stage_measured(times, all_times, InferenceStages.ENGINE_FORWARD, 2)
    _assert_stage_measured(times, all_times, InferenceStages.POST_PROCESS, 2)