Skip to content
This repository was archived by the owner on Jun 3, 2025. It is now read-only.
1 change: 0 additions & 1 deletion src/deepsparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
)
from .engine import *
from .tasks import *
from .timing import *
from .pipeline import *
from .loggers import *
from .version import __version__, is_release
Expand Down
75 changes: 40 additions & 35 deletions src/deepsparse/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
validate_identifier,
)
from deepsparse.tasks import SupportedTasks, dynamic_import_task
from deepsparse.timing import InferencePhases, Timer
from deepsparse.utils import InferenceStages, StagedTimer, TimerManager


__all__ = [
Expand Down Expand Up @@ -155,13 +155,16 @@ def __init__(
context: Optional[Context] = None,
executor: Optional[Union[ThreadPoolExecutor, int]] = None,
logger: Optional[Union[BaseLogger, str]] = None,
benchmark: bool = False,
_delay_engine_initialize: bool = False, # internal use only
):
self._benchmark = benchmark
self._model_path_orig = model_path
self._model_path = model_path
self._engine_type = engine_type
self._batch_size = batch_size
self._alias = alias
self.timer_manager = TimerManager(enabled=True, multi=benchmark)
self.context = context
self.logger = (
logger
Expand Down Expand Up @@ -212,21 +215,22 @@ def __init__(
category=MetricCategories.SYSTEM,
)

def __call__(self, *args, **kwargs) -> BaseModel:
def __call__(
self, *args, timer: Optional[StagedTimer] = None, **kwargs
) -> BaseModel:
if "engine_inputs" in kwargs:
raise ValueError(
"invalid kwarg engine_inputs. engine inputs determined "
f"by {self.__class__.__qualname__}.parse_inputs"
)
timer = Timer()

timer.start(InferencePhases.TOTAL_INFERENCE)
timer = timer or self.timer_manager.new_timer()
timer.start(InferenceStages.TOTAL_INFERENCE)

# ------ PREPROCESSING ------
timer.start(InferencePhases.PRE_PROCESS)
timer.start(InferenceStages.PRE_PROCESS)
# parse inputs into input_schema
pipeline_inputs = self.parse_inputs(*args, **kwargs)

self.log(
identifier="pipeline_inputs",
value=pipeline_inputs,
Expand All @@ -244,30 +248,25 @@ def __call__(self, *args, **kwargs) -> BaseModel:
engine_inputs, postprocess_kwargs = engine_inputs
else:
postprocess_kwargs = {}
timer.stop(InferencePhases.PRE_PROCESS)

timer.stop(InferenceStages.PRE_PROCESS)
self.log(
identifier="engine_inputs",
value=engine_inputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.PRE_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.PRE_PROCESS),
category=MetricCategories.SYSTEM,
)

# ------ INFERENCE ------
# split inputs into batches of size `self._batch_size`
timer.start(InferencePhases.ENGINE_FORWARD)
timer.start(InferenceStages.ENGINE_FORWARD)
batches = self.split_engine_inputs(engine_inputs, self._batch_size)

# submit split batches to engine threadpool
batch_outputs = list(self.executor.map(self.engine_forward, batches))

# join together the batches of size `self._batch_size`
engine_outputs = self.join_engine_outputs(batch_outputs)
timer.stop(InferencePhases.ENGINE_FORWARD)
timer.stop(InferenceStages.ENGINE_FORWARD)

self.log(
identifier=f"{SystemGroups.INFERENCE_DETAILS}/input_batch_size_total",
Expand All @@ -278,20 +277,14 @@ def __call__(self, *args, **kwargs) -> BaseModel:
value=len(batch_outputs) * self._batch_size,
category=MetricCategories.SYSTEM,
)

self.log(
identifier="engine_outputs",
value=engine_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.ENGINE_FORWARD}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.ENGINE_FORWARD),
category=MetricCategories.SYSTEM,
)

# ------ POSTPROCESSING ------
timer.start(InferencePhases.POST_PROCESS)
timer.start(InferenceStages.POST_PROCESS)
pipeline_outputs = self.process_engine_outputs(
engine_outputs, **postprocess_kwargs
)
Expand All @@ -300,24 +293,16 @@ def __call__(self, *args, **kwargs) -> BaseModel:
f"Outputs of {self.__class__} must be instances of "
f"{self.output_schema} found output of type {type(pipeline_outputs)}"
)
timer.stop(InferencePhases.POST_PROCESS)
timer.stop(InferencePhases.TOTAL_INFERENCE)

timer.stop(InferenceStages.POST_PROCESS)
self.log(
identifier="pipeline_outputs",
value=pipeline_outputs,
category=MetricCategories.DATA,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.POST_PROCESS}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.POST_PROCESS),
category=MetricCategories.SYSTEM,
)
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{InferencePhases.TOTAL_INFERENCE}_seconds", # noqa E501
value=timer.time_delta(InferencePhases.TOTAL_INFERENCE),
category=MetricCategories.SYSTEM,
)

# ------ INFERENCE FINALIZATION ------
timer.stop(InferenceStages.TOTAL_INFERENCE)
self.log_inference_times(timer)

return pipeline_outputs

Expand Down Expand Up @@ -704,6 +689,15 @@ def engine_type(self) -> str:
"""
return self._engine_type

@property
def benchmark(self) -> bool:
return self._benchmark

@benchmark.setter
def benchmark(self, value: bool):
self._benchmark = value
self.timer_manager.multi = value

def to_config(self) -> "PipelineConfig":
"""
:return: PipelineConfig that can be used to reload this object
Expand Down Expand Up @@ -739,7 +733,7 @@ def log(
self,
identifier: str,
value: Any,
category: str,
category: Union[str, MetricCategories],
):
"""
Pass the logged data to the DeepSparse logger object (if present).
Expand Down Expand Up @@ -791,6 +785,17 @@ def engine_forward(self, engine_inputs: List[numpy.ndarray]) -> List[numpy.ndarr
"""
return self.engine(engine_inputs)

def log_inference_times(self, timer: StagedTimer):
"""
logs stage times in the given timer
"""
for stage, time in timer.times.items():
self.log(
identifier=f"{SystemGroups.PREDICTION_LATENCY}/{stage}_seconds",
value=time,
category=MetricCategories.SYSTEM,
)

def _initialize_engine(self) -> Union[Engine, ORTEngine]:
engine_type = self.engine_type.lower()

Expand Down
18 changes: 0 additions & 18 deletions src/deepsparse/timing/__init__.py

This file was deleted.

26 changes: 0 additions & 26 deletions src/deepsparse/timing/inference_phases.py

This file was deleted.

119 changes: 0 additions & 119 deletions src/deepsparse/timing/timer.py

This file was deleted.

1 change: 1 addition & 0 deletions src/deepsparse/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
from .cli_helpers import *
from .data import *
from .onnx import *
from .timer import *
Loading