diff --git a/src/deepsparse/v2/__init__.py b/src/deepsparse/v2/__init__.py new file mode 100644 index 0000000000..4a897be06f --- /dev/null +++ b/src/deepsparse/v2/__init__.py @@ -0,0 +1,21 @@ +# flake8: noqa + +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .pipeline import * +from .operators import * +from .routers import * +from .schedulers import * +from .utils import * diff --git a/src/deepsparse/v2/operators/__init__.py b/src/deepsparse/v2/operators/__init__.py new file mode 100644 index 0000000000..8f7e6a169d --- /dev/null +++ b/src/deepsparse/v2/operators/__init__.py @@ -0,0 +1,17 @@ +# flake8: noqa + +# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
class Operator(ABC):
    """
    Base operator class - can represent any part of an ML pipeline

    Child classes implement ``run`` and may set ``input_schema`` and
    ``output_schema``. ``__call__`` uses ``input_schema`` to check a positional
    input or to build the input from keyword arguments before calling ``run``.
    """

    # expected structured input and output types, to be defined by child classes
    input_schema: Optional[Type[OperatorSchema]] = None
    output_schema: Optional[Type[OperatorSchema]] = None

    @abstractmethod
    def run(self, inp: OperatorSchema, context: Context) -> OperatorSchema:
        """
        :param inp: operator input, as the defined input schema if applicable
        :param context: pipeline context of already run operators
        :return: result of this operator as the defined output schema if applicable
        """
        raise NotImplementedError

    @classmethod
    def has_input_schema(cls) -> bool:
        """
        :return: True if this class has a defined pydantic input schema
        """
        schema = cls.input_schema
        # issubclass raises TypeError for anything that is not a class (such as
        # the default None), so confirm a class is present before checking it
        return isinstance(schema, type) and issubclass(schema, BaseModel)

    @classmethod
    def has_output_schema(cls) -> bool:
        """
        :return: True if this class has a defined pydantic output schema
        """
        schema = cls.output_schema
        # same guard as has_input_schema: the default None is not a class
        return isinstance(schema, type) and issubclass(schema, BaseModel)

    def __call__(
        self,
        *args,
        context: Optional[Context] = None,
        **kwargs,
    ) -> OperatorSchema:
        """
        Parses inputs to this Operator and runs the run() method of this operator

        :param args: an unnamed arg may only be provided
            if it is of the type of the input_schema
        :param context: pipeline context to pass to operator
        :param kwargs: kwargs when not initializing from an instantiated schema
        :return: operator output
        :raises ValueError: if more than one unnamed arg is given, or if the
            single unnamed arg is not an instance of the input_schema
        """
        if len(args) > 1:
            raise ValueError(
                f"Only 1 unnamed arg may be supplied to an Operator, found {len(args)}"
            )

        if len(args) == 1:
            if self.input_schema is not None and isinstance(args[0], self.input_schema):
                inference_input = args[0]
            else:
                raise ValueError(
                    f"1 arg supplied to Operator {self.__class__.__name__} but was not "
                    f"of expected type {self.input_schema}, found {type(args[0])}"
                )
        elif self.has_input_schema():
            # build the structured input from the keyword arguments
            inference_input = self.input_schema(**kwargs)
        else:
            # no pydantic schema declared: hand the raw kwargs dict to run()
            inference_input = kwargs
        return self.run(inference_input, context=context)
class Pipeline(BaseModel):
    """
    Pipeline accepts a series of operators, schedulers, and a router which define
    an end to end ML transformation.

    Calling a pipeline runs these transformations
    """

    stages: List[Operator] = Field(
        required=True,
        description="In-order list of operators that make up this pipeline",
    )
    router: Router = Field(
        # default_factory (not "default_factor") is the keyword pydantic reads;
        # with the misspelling the documented default was never applied
        default_factory=Router,
        description="Router object to determine order and run the stages. "
        "Defaults to the base Router object",
    )
    schedulers: List[OperatorScheduler] = Field(
        default_factory=lambda: [OperatorScheduler()],
        description="List of schedulers to run operators in order of priority",
    )

    _scheduler_group: SchedulerGroup = PrivateAttr()

    class Config:
        # Operator, Router and OperatorScheduler are plain classes, not
        # pydantic models
        arbitrary_types_allowed = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # fail at construction time if the router rejects the stages
        self.validate()

        # SchedulerGroup handles running all schedulers in order of priority
        self._scheduler_group = SchedulerGroup(self.schedulers)

    def __call__(self, *args, return_context: bool = False, **kwargs):
        """
        :param args: at most one in-line argument, the pipeline input schema.
            May not be combined with kwargs
        :param return_context: if True, returns a tuple of the pipeline output
            and entire context. Default False
        :param kwargs: pipeline input given as keyword arguments instead of a
            single in-line argument
        :return: output of the pipeline stages ran with the router for the given input
        :raises ValueError: if more than one in-line argument is given, or if
            both an in-line argument and kwargs are given
        """
        if len(args) > 1:
            raise ValueError(
                "Only 1 in-line argument may be supplied to Pipeline which "
                f"must be a Schema, found: {len(args)}"
            )
        if args and kwargs:
            raise ValueError(
                "Pipeline can only run either a single in-line argument schema or a "
                f"series of kwargs, found {len(args)} args and {len(kwargs)} kwargs"
            )

        # check args itself rather than args[0]: indexing an empty tuple raised
        # IndexError whenever the input was supplied purely as kwargs
        pipeline_input = args[0] if args else kwargs
        pipeline_output, context = self.router.run(
            inp=pipeline_input,
            operators=self.stages,
            scheduler=self._scheduler_group,
        )

        if return_context:
            return pipeline_output, context

        return pipeline_output

    def validate(self):
        """
        Asks the router whether it can run this pipeline's stages

        NOTE(review): pydantic v1's BaseModel also exposes a ``validate``
        classmethod which this instance method shadows - confirm this is intended

        :raises ValueError: if the router returns False or an error string
        """
        router_validation = self.router.validate(self.stages)

        if router_validation is False:
            # default error message
            stage_types = [type(stage) for stage in self.stages]
            raise ValueError(
                f"Invalid Router: {type(self.router)} for stages: {stage_types}"
            )
        elif isinstance(router_validation, str):
            raise ValueError(f"Invalid Router for stages: {router_validation}")
class Router:
    """
    Routers must implement a run method which runs a series of operators
    for a pipeline for a given input. Base Router runs operators linearly
    in a series
    """

    @staticmethod
    def run(
        inp: OperatorSchema,
        operators: List[Operator],
        scheduler: OperatorScheduler,
    ) -> Tuple[OperatorSchema, Context]:
        """
        :param inp: input to the first operator of the series
        :param operators: list of operators to run
        :param scheduler: scheduler to submit operators to
        :return: tuple of the final output of the operators and the context
            recording every operator run with its input and output
        :raises ValueError: if no operators are given
        """
        if not operators:
            # without this guard the loop below never binds operator_output and
            # the return statement fails with an UnboundLocalError
            raise ValueError("No operators found")

        context = Context()

        # run operators linearly
        operator_input = inp
        for operator in operators:
            output_future = scheduler.submit(
                operator=operator, operator_input=operator_input, context=context
            )

            # wait for future to resolve
            operator_output = output_future.result()

            # update context
            context.update(
                operator=operator,
                input=operator_input,
                output=operator_output,
            )

            # previous output becomes next input
            operator_input = operator_output

        return operator_output, context

    @staticmethod
    def validate(operators: List[Operator]) -> Union[bool, str]:
        """
        :param operators: operators that this Router could potentially run over
        :return: True if this Router can run this series of operators. Base Router
            runs any series of operators that is non empty and whose input and output
            schemas align. If not valid, either False or an error string will be
            returned
        """
        if not operators:
            return "No operators found"

        # compare every operator with the one that directly follows it
        for idx, (current, following) in enumerate(zip(operators, operators[1:])):
            current_output_schema = current.output_schema
            next_input_schema = following.input_schema

            if current_output_schema is None or next_input_schema is None:
                # if no input/output schema defined, assume operator can run
                # without schema
                continue

            if current_output_schema != next_input_schema:
                return (
                    f"Operator at idx {idx}: {type(current)} has invalid "
                    f"output schema {current_output_schema} for next operator "
                    f"{type(following)} which requires {next_input_schema}"
                )

        # every adjacent pair is compatible; return True explicitly as the
        # docstring promises, rather than falling off the end with None
        return True
class OperatorScheduler:
    """
    OperatorSchedulers should implement a `submit` function that asynchronously
    runs an operator and its input and returns a Future. Priority of operators
    to run and resources they are run on are deferred to specific OperatorScheduler
    implementations

    Base OperatorScheduler behaves as a simple queue deferring to ThreadPoolExecutor

    :param max_workers: maximum number of threads to execute at once
    """

    def __init__(self, max_workers: int = 1):
        self._threadpool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(
        self,
        operator: Operator,
        operator_input: OperatorSchema,
        context: Context,
    ) -> Future:
        """
        :param operator: operator to run
        :param operator_input: input schema to the operator
        :param context: context of already run operators
        :return: future referencing the asynchronously run output of the operator
        """
        if isinstance(operator_input, dict):
            # dict inputs are expanded into keyword arguments for the operator
            return self._threadpool.submit(operator, context=context, **operator_input)
        # anything else is passed through as the single positional argument
        return self._threadpool.submit(operator, operator_input, context=context)

    def can_process(self, operator: Operator, operator_input: OperatorSchema) -> bool:
        """
        :param operator: operator to check
        :param operator_input: operator_input to check
        :return: True if this scheduler can process the given operator and input.
            Base OperatorScheduler always returns True
        """
        return True


class SchedulerGroup(OperatorScheduler):
    """
    Wrapper for a series of schedulers. Runs submitted operators on the first
    scheduler that can process a given input

    :param schedulers: list of schedulers to pass operators to
    """

    def __init__(self, schedulers: List[OperatorScheduler]):
        # NOTE: OperatorScheduler.__init__ is not called here, so a group has no
        # _threadpool attribute; submit() below hands work to the wrapped schedulers
        self.schedulers = schedulers

    def submit(
        self,
        operator: Operator,
        operator_input: OperatorSchema,
        context: Context,
    ) -> Future:
        """
        :param operator: operator to run
        :param operator_input: input schema to the operator
        :param context: context of already run operators
        :return: future referencing the asynchronously run output of the operator
        :raises ValueError: if none of the wrapped schedulers can process the
            given operator and input
        """
        for scheduler in self.schedulers:
            if scheduler.can_process(operator, operator_input):
                return scheduler.submit(operator, operator_input, context)

        # falling through used to return None, which only surfaced later as an
        # AttributeError when the caller asked the "future" for its result
        raise ValueError(
            f"No scheduler in {self.__class__.__name__} can process operator "
            f"{type(operator)} with input of type {type(operator_input)}"
        )

    def can_process(self, operator: Operator, operator_input: OperatorSchema) -> bool:
        """
        :param operator: operator to check
        :param operator_input: operator_input to check
        :return: True if at least one of the wrapped schedulers can process the
            given operator and input, False otherwise
        """
        return any(
            scheduler.can_process(operator, operator_input)
            for scheduler in self.schedulers
        )
class StageInfo(NamedTuple):
    # one executed stage: the operator that ran, what it received and what
    # it produced
    operator: Callable
    input: OperatorSchema
    output: OperatorSchema


class Context:
    """
    Running record of a pipeline execution: every operator that has been run,
    together with its input and its output, in the order they were recorded
    """

    def __init__(self):
        self.stages_executed: List[StageInfo] = []

    def update(self, operator: Callable, input: OperatorSchema, output: OperatorSchema):
        """
        Record one executed stage at the end of ``stages_executed``

        :param operator: operator that was run
        :param input: input the operator was run with
        :param output: output the operator produced
        """
        record = StageInfo(operator, input, output)
        self.stages_executed.append(record)
# Schema carrying a single integer; used as both input and output of the
# dummy operators below
class IntSchema(BaseModel):
    value: int


class AddOneOperator(Operator):
    """
    Dummy operator returning its input value incremented by one
    """

    input_schema = IntSchema
    output_schema = IntSchema

    def run(self, inp: IntSchema, context: Context) -> OperatorSchema:
        incremented = inp.value + 1
        return IntSchema(value=incremented)


class AddTwoOperator(Operator):
    """
    Dummy operator returning its input value incremented by two
    """

    input_schema = IntSchema
    output_schema = IntSchema

    def run(self, inp: IntSchema, context: Context) -> OperatorSchema:
        incremented = inp.value + 2
        return IntSchema(value=incremented)


# chaining +1 and +2 through the base router and scheduler adds three overall
AddThreePipeline = Pipeline(
    stages=[AddOneOperator(), AddTwoOperator()],
    router=Router(),
    schedulers=[OperatorScheduler()],
)
def test_run_simple_pipeline():
    """
    Running the two stage dummy pipeline on 5 should give 5 + 1 + 2 = 8
    """
    result = AddThreePipeline(IntSchema(value=5))

    assert result.value == 8