This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
21 changes: 21 additions & 0 deletions src/deepsparse/v2/__init__.py
@@ -0,0 +1,21 @@
# flake8: noqa

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .pipeline import *
from .operators import *
from .routers import *
from .schedulers import *
from .utils import *
17 changes: 17 additions & 0 deletions src/deepsparse/v2/operators/__init__.py
@@ -0,0 +1,17 @@
# flake8: noqa

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .operator import *
90 changes: 90 additions & 0 deletions src/deepsparse/v2/operators/operator.py
@@ -0,0 +1,90 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional, Type

from pydantic import BaseModel

from deepsparse.v2.utils import Context, OperatorSchema


__all__ = ["Operator"]


class Operator(ABC):
Contributor

Are we also going to define an abstract class for EngineOperators (mentioned in your design doc some time ago)?

"""
Base operator class - can represent any part of an ML pipeline
"""

# expected structured input and output types, to be defined by child classes
input_schema: Optional[Type[OperatorSchema]] = None
output_schema: Optional[Type[OperatorSchema]] = None

@abstractmethod
def run(self, inp: OperatorSchema, context: Context) -> OperatorSchema:
Contributor

If the input_schema isn't mandatory, the typing on this shouldn't always be OperatorSchema?

"""
:param inp: operator input, as the defined input schema if applicable
:param context: pipeline context of already run operators
:return: result of this operator as the defined output schema if applicable
"""
raise NotImplementedError

@classmethod
def has_input_schema(cls) -> bool:
"""
:return: True if this class has a defined pydantic input schema
"""
return cls.input_schema is not None and issubclass(cls.input_schema, BaseModel)

@classmethod
def has_output_schema(cls) -> bool:
"""
:return: True if this class has a defined pydantic output schema
"""
return cls.output_schema is not None and issubclass(cls.output_schema, BaseModel)

def __call__(
self,
*args,
context: Optional[Context] = None,
**kwargs,
) -> OperatorSchema:
"""
Parses inputs to this Operator and runs the run() method of this operator

:param args: a single unnamed arg may be provided only if it is
an instance of the input_schema
:param context: pipeline context to pass to operator
:param kwargs: kwargs when not initializing from an instantiated schema
:return: operator output
"""
if len(args) > 1:
Member

let's see if we can push this to kwargs only for inputs to simplify understanding

raise ValueError(
f"Only 1 unnamed arg may be supplied to an Operator, found {len(args)}"
)

if len(args) == 1:
if self.input_schema is not None and isinstance(args[0], self.input_schema):
inference_input = args[0]
else:
raise ValueError(
f"1 arg supplied to Operator {self.__class__.__name__} but was not "
f"of expected type {self.input_schema}, found {type(args[0])}"
)
elif self.has_input_schema():
inference_input = self.input_schema(**kwargs)
else:
inference_input = kwargs
return self.run(inference_input, context=context)
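For reference, a minimal sketch of defining and calling an Operator subclass against this interface. It assumes pydantic BaseModel subclasses satisfy OperatorSchema (defined in deepsparse.v2.utils, which is not part of this diff); TokenizeOperator and its schemas are illustrative names only.

```python
from typing import List, Optional

from pydantic import BaseModel

from deepsparse.v2.operators import Operator
from deepsparse.v2.utils import Context


class TokenizeInput(BaseModel):
    text: str


class TokenizeOutput(BaseModel):
    tokens: List[str]


class TokenizeOperator(Operator):
    # declared schemas let __call__ build the input from kwargs and let a
    # Router check that adjacent operators' schemas line up
    input_schema = TokenizeInput
    output_schema = TokenizeOutput

    def run(self, inp: TokenizeInput, context: Optional[Context]) -> TokenizeOutput:
        return TokenizeOutput(tokens=inp.text.split())


op = TokenizeOperator()
# either raw kwargs or an already-instantiated input schema may be passed
print(op(text="sparse models run fast").tokens)
print(op(TokenizeInput(text="sparse models run fast")).tokens)
```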
102 changes: 102 additions & 0 deletions src/deepsparse/v2/pipeline.py
@@ -0,0 +1,102 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import List

from pydantic import BaseModel, Field, PrivateAttr

from deepsparse.v2.operators import Operator
from deepsparse.v2.routers import Router
from deepsparse.v2.schedulers import OperatorScheduler, SchedulerGroup


__all__ = ["Pipeline"]


class Pipeline(BaseModel):
Member

@markurtz to send over some UX examples for constructors

Contributor

Why is this inheriting from the BaseModel?

Contributor

Why not? Why are you worried about it Dipika?

"""
Pipeline accepts a series of operators, schedulers, and a router which define
Contributor

Are we removing BasePipeline? Seems like we'd still want the concept of a task for each pipeline and registry?

an end-to-end ML transformation.

Calling a pipeline runs these transformations
"""

stages: List[Operator] = Field(
required=True,
description="In-order list of operators that make up this pipeline",
)
router: Router = Field(
default_factory=Router,
description="Router object to determine order and run the stages. "
"Defaults to the base Router object",
)
schedulers: List[OperatorScheduler] = Field(
default_factory=lambda: [OperatorScheduler()],
description="List of schedulers to run operators in order of priority",
)

_scheduler_group: SchedulerGroup = PrivateAttr()

class Config:
arbitrary_types_allowed = True

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.validate()

# SchedulerGroup handles running all schedulers in order of priority
self._scheduler_group = SchedulerGroup(self.schedulers)

def __call__(self, *args, return_context: bool = False, **kwargs):
Member

Need to have three cases here:

  • sync
  • async
  • generator

"""
:param return_context: if True, returns a tuple of the pipeline output
and the entire context. Default False
:return: output of the pipeline stages ran with the router for the given input
"""
if len(args) > 1:
raise ValueError(
"Only 1 in-line argument may be supplied to Pipeline which "
f"must be a Schema, found: {len(args)}"
)
if args and kwargs:
raise ValueError(
"Pipeline can only run either a single in-line argument schema or a "
f"series of kwargs, found {len(args)} args and {len(kwargs)} kwargs"
)

pipeline_input = args[0] if args else kwargs
pipeline_output, context = self.router.run(
inp=pipeline_input,
operators=self.stages,
scheduler=self._scheduler_group,
)

if return_context:
return pipeline_output, context

return pipeline_output

def validate(self):
router_validation = self.router.validate(self.stages)

if router_validation is False:
# default error message
stage_types = [type(stage) for stage in self.stages]
raise ValueError(
f"Invalid Router: {type(self.router)} for stages: {stage_types}"
)
elif isinstance(router_validation, str):
raise ValueError(f"Invalid Router for stages: {router_validation}")
17 changes: 17 additions & 0 deletions src/deepsparse/v2/routers/__init__.py
@@ -0,0 +1,17 @@
# flake8: noqa

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .router import *
95 changes: 95 additions & 0 deletions src/deepsparse/v2/routers/router.py
@@ -0,0 +1,95 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import List, Tuple, Union

from deepsparse.v2.operators import Operator
from deepsparse.v2.schedulers import OperatorScheduler
from deepsparse.v2.utils import Context, OperatorSchema


__all__ = ["Router"]


class Router:
"""
Routers must implement a run method which runs a series of operators
for a pipeline on a given input. The base Router runs operators
linearly, in sequence
"""

@staticmethod
def run(
inp: OperatorSchema,
operators: List[Operator],
scheduler: OperatorScheduler,
) -> Tuple[OperatorSchema, Context]:
"""
:param inp: input to the first operator of the series
:param operators: list of operators to run
:param scheduler: scheduler to submit operators to
:return: final output of the operators
"""
context = Context()

# run operators linearly
operator_input = inp
for operator in operators:
output_future = scheduler.submit(
operator=operator, operator_input=operator_input, context=context
)

# wait for future to resolve
operator_output = output_future.result()

# update context
context.update(
operator=operator,
input=operator_input,
output=operator_output,
)

# previous output becomes next input
operator_input = operator_output

return operator_output, context

@staticmethod
def validate(operators: List[Operator]) -> Union[bool, str]:
"""
:param operators: operators that this Router could potentially run over
:return: True if this Router can run this series of operators. The base Router
runs any non-empty series of operators whose input and output
schemas align. If not valid, either False or an error string will be
returned
"""
if len(operators) < 1:
return "No operators found"

for idx in range(len(operators) - 1):
current_output_schema = operators[idx].output_schema
next_input_schema = operators[idx + 1].input_schema

if current_output_schema is None or next_input_schema is None:
# if no input/output schema defined, assume operator can run
# without schema
continue

if current_output_schema != next_input_schema:
return (
f"Operator at idx {idx}: {type(operators[idx])} has invalid "
f"output schema {current_output_schema} for next operator "
f"{type(operators[idx + 1])} which requires {next_input_schema}"
)

# all adjacent input/output schemas align (or are undeclared) - valid
return True
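For reference, a small sketch of the validate() contract described above (a bool-or-error-string return), using illustrative stand-in operators and schemas that are not part of this PR.

```python
from pydantic import BaseModel

from deepsparse.v2.operators import Operator
from deepsparse.v2.routers import Router


class SchemaA(BaseModel):
    value: int


class SchemaB(BaseModel):
    value: int


class ProducesA(Operator):
    output_schema = SchemaA

    def run(self, inp, context):
        return SchemaA(value=1)


class ExpectsB(Operator):
    input_schema = SchemaB
    output_schema = SchemaB

    def run(self, inp, context):
        return inp


print(Router.validate([]))
# -> "No operators found"

print(Router.validate([ProducesA(), ExpectsB()]))
# -> error string: ProducesA's output schema does not match ExpectsB's input schema

# when adjacent schemas align, or either side is undeclared, validation passes
# and Pipeline.validate() accepts the stage ordering
```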
18 changes: 18 additions & 0 deletions src/deepsparse/v2/schedulers/__init__.py
@@ -0,0 +1,18 @@
# flake8: noqa

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .scheduler import *
from .scheduler_group import *