This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
Changes from all commits (39 commits, all authored by dsikka):
6c75b65 (Oct 17, 2023) initial functionality and working example with image classification
75de103 (Oct 17, 2023) remove testing image
aa5d885 (Oct 31, 2023) rebase fixes
8cc63ee (Oct 17, 2023) initial functionality and working example with image classification
ab2b711 (Oct 20, 2023) text gen
00cb85e (Oct 24, 2023) updates func
5cf4b3f (Oct 25, 2023) prompt inference, initial functionality
1b951dc (Oct 25, 2023) remove image; update state docstring
809cfc1 (Oct 25, 2023) Fix typo
6336d8e (Oct 25, 2023) add todo for split/join
3f2193d (Nov 1, 2023) remove context, clean-up args, remove prefill_preprocess_operaator
216ceea (Nov 1, 2023) fix docstrings
02b74d4 (Oct 17, 2023) initial functionality and working example with image classification
37f090c (Oct 24, 2023) updates func
7bd25da (Oct 25, 2023) prompt inference, initial functionality
98bc123 (Oct 26, 2023) finish generation operators and update routes
ef8277b (Oct 26, 2023) further breakdown operators
664abdd (Oct 26, 2023) add operators
754ce2c (Oct 27, 2023) fix can_operate condition
ed7cd58 (Oct 30, 2023) update can_operate to not rely on the inference_state
5d56421 (Nov 1, 2023) rebase + update
5086e1f (Nov 2, 2023) fix condition
44156e6 (Nov 2, 2023) async initial functionality
740eb67 (Nov 2, 2023) fix capacity settting again
c991c30 (Nov 3, 2023) Merge branch 'v2' into features/v2/generation
473a691 (Nov 3, 2023) Merge branch 'features/v2/generation' into features/v2/async
8ed2a64 (Nov 3, 2023) Merge branch 'v2' into features/v2/async
c2666dd (Nov 9, 2023) add blocking
59f69d3 (Nov 9, 2023) more testing
6276a77 (Nov 10, 2023) Merge branch 'v2' into features/v2/async
235178b (Nov 10, 2023) update to use split/join
fe15318 (Nov 10, 2023) fix
1645aa8 (Nov 10, 2023) rebase fix
a7a003c (Nov 10, 2023) remove index
bafdd24 (Nov 20, 2023) change event loop
b981371 (Nov 21, 2023) Merge branch 'v2' into features/v2/async
9a15bf6 (Nov 21, 2023) rebase fix
8f7fbd6 (Nov 27, 2023) update async run to use new operator scheduling properly
0d82e49 (Dec 5, 2023) Merge branch 'v2' into features/v2/async
107 changes: 88 additions & 19 deletions src/deepsparse/v2/pipeline.py
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio
import copy
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

from deepsparse.v2.operators import EngineOperator, Operator
from deepsparse.v2.routers import Router
@@ -68,10 +68,7 @@ def __init__(
self._scheduler_group = SchedulerGroup(self.schedulers)

def _run_next(
self,
inp: Any,
inference_state: InferenceState,
next_step: str,
self, inp: Any, inference_state: InferenceState, next_step: str, **kwargs
):
if (
isinstance(self.ops[next_step], EngineOperator)
@@ -88,10 +85,14 @@ def _run_next(
inp=inp,
pipeline_state=self.pipeline_state,
inference_state=inference_state,
**kwargs,
)

def _run_sub_graphs(
self, sub_graph_inputs: List[Any], sub_graphs: List[SubGraph]
async def _run_sub_graphs(
self,
sub_graph_inputs: List[Any],
sub_graphs: List[SubGraph],
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> List[Any]:
"""
Run a list of sub_graphs asynchronously. Polls to identify the sub graph that is
@@ -111,14 +112,16 @@ def _run_sub_graphs(
"""
for i in range(len(sub_graphs)):
sub_graphs[i].output = self._run_next(
sub_graph_inputs[i], sub_graphs[i].inf, sub_graphs[i].step
sub_graph_inputs[i], sub_graphs[i].inf, sub_graphs[i].step, loop=loop
)

# Execute all sub graphs until all graphs have been completed.
while any(not x.completed for x in sub_graphs):
for sub_graph in sub_graphs:
if not sub_graph.completed:
# get the result for the completed operator; resolve its output
if isinstance(sub_graph.output, asyncio.Future):
await sub_graph.output
operator_output = sub_graph.output.result()
operator_output = sub_graph.parse_output(operator_output)

@@ -141,18 +144,80 @@
inp=operator_output,
inference_state=sub_graph.inf,
next_step=next_step,
loop=loop,
)

return [x.output for x in sub_graphs]
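To make the dual Future handling above concrete, here is a minimal standalone sketch (toy names, not pipeline code) mirroring the isinstance(sub_graph.output, asyncio.Future) branch: an output scheduled through the event loop arrives as an asyncio.Future and must be awaited first, while one submitted directly to the thread pool is a concurrent.futures.Future; both expose .result() once complete.

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    def blocking_op(x):
        return x * 2

    async def resolve(output):
        # Only asyncio futures are awaited; both future types
        # expose .result() once the work has finished.
        if isinstance(output, asyncio.Future):
            await output
        return output.result()

    async def demo():
        loop = asyncio.get_running_loop()
        pool = ThreadPoolExecutor(max_workers=2)
        async_fut = loop.run_in_executor(pool, blocking_op, 21)  # asyncio.Future
        sync_fut = pool.submit(blocking_op, 21)  # concurrent.futures.Future
        print(await resolve(async_fut), await resolve(sync_fut))  # 42 42

    asyncio.run(demo())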

def _apply_split(self, inp: Any, inference_state: InferenceState):
async def run_async(self, *args, inference_state: InferenceState, **kwargs):
"""
Split inputs using the pipeline's expand_inputs function. Inputs are split
into a batch size of one when a SPLIT_ROUTE node is found in a given pipeline's
provided router. The split batches are run asynchronously and then joined when
a JOIN_ROUTE node is found, using the pipeline's condense_inputs function.
Run through the operators using the provided router and scheduler.
The input to a given operator is the output of the previous operator.

:param inference_state: inference_state for the pipeline.
:param pipeline_state: pipeline_state for the pipeline. The values in the state
are created during pipeline creation and are read-only during inference.
"""
loop = asyncio.get_running_loop()

next_step = self.router.START_ROUTE
operator_output = None
Reviewer comment (Contributor): I'm not sure if there's any reason for an operator to have an output of None, but if so, you might consider using another sentinel value here and in the check below 🤷🏻
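A tiny illustration of the sentinel idea (hypothetical names, not part of the PR): a unique object() distinguishes "no operator has run yet" from an operator that legitimately returned None.

    _UNSET = object()  # sentinel: cannot collide with any real operator output

    def check_started(operator_output):
        if operator_output is _UNSET:
            raise ValueError("SPLIT_ROUTE should appear after START_ROUTE")
        return operator_output

    check_started(None)    # accepted: None is a real (if unusual) output
    # check_started(_UNSET) would raise: no operator has produced output yet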

Reviewer comment (Contributor): I guess you could also consider adding another local variable to avoid overloading the use of operator_output, something like:

Suggested change:

    operator_output = None
    processed_start_route = False


while next_step != self.router.END_ROUTE:
# Either a dictionary key or valid index
Reviewer comment (Contributor): I think this comment refers to next_step maybe? If so I'd suggest moving the comment to be where next_step is first declared


if next_step == self.router.SPLIT_ROUTE:
Reviewer comment (Contributor): This code flow is a bit funky, what do you think about making this an elif in the same block as if next_step == self.router.START_ROUTE: and adding another predicate like this one when deciding next_step?

I think that would look like this:

    async def run_async(self, *args, inference_state: InferenceState, **kwargs):
        """
        Run through the operators using the provided router and scheduler.
        The input to a given operator is the output of the previous operator.

        :param inference_state: inference_state for the pipeline.
        :param pipeline_state: pipeline_state for the pipeline. The values in the state
            are created during pipeline creation and are read-only during inference.
        """
        loop = asyncio.get_running_loop()

        next_step = self.router.START_ROUTE
        operator_output = None

        while next_step != self.router.END_ROUTE:
            # Either a dictionary key or valid index

            if next_step == self.router.START_ROUTE:
                outputs = run_func(
                    *args,
                    func=self._scheduler_group.submit,
                    operator=self.ops[next_step],
                    inference_state=inference_state,
                    pipeline_state=self.pipeline_state,
                    loop=loop,
                    **kwargs,
                )
                await outputs
                operator_output = outputs.result()
            elif next_step == self.router.SPLIT_ROUTE:
                if operator_output is None:
                    raise ValueError(
                        f"{self.router.SPLIT_ROUTE} should appear after "
                        f"{self.router.START_ROUTE}"
                    )

                operator_output = await self._apply_split(
                    operator_output, inference_state, loop=loop
                )
            else:
                outputs = self._run_next(
                    inp=operator_output,
                    next_step=next_step,
                    inference_state=inference_state,
                    loop=loop,
                )
                await outputs
                operator_output = outputs.result()

            if next_step == self.router.SPLIT_ROUTE:
                next_step = self.router.route[self.router.JOIN_ROUTE]
                continue

            state_update = None
            if isinstance(operator_output, tuple):
                state_update = operator_output[-1]
                operator_output = operator_output[0]
            next_step = self.router.next(next_step, self.ops, operator_output)
            if state_update:
                inference_state.update_state(state_update)
        return operator_output

Maybe a little easier to reason about, but maybe not.

if operator_output is None:
raise ValueError(
f"{self.router.SPLIT_ROUTE} should appear after "
f"{self.ROUTER.START_ROUTE}"
)

operator_output = await self._apply_split(
operator_output, inference_state, loop=loop
)
next_step = self.router.route[self.router.JOIN_ROUTE]
if next_step == self.router.END_ROUTE:
return operator_output

if next_step == self.router.START_ROUTE:
outputs = run_func(
*args,
func=self._scheduler_group.submit,
operator=self.ops[next_step],
inference_state=inference_state,
pipeline_state=self.pipeline_state,
loop=loop,
**kwargs,
)
await outputs
operator_output = outputs.result()

else:
outputs = self._run_next(
inp=operator_output,
next_step=next_step,
inference_state=inference_state,
loop=loop,
)
await outputs
operator_output = outputs.result()

state_update = None
if isinstance(operator_output, tuple):
state_update = operator_output[-1]
operator_output = operator_output[0]

next_step = self.router.next(next_step, self.ops, operator_output)
if state_update:
inference_state.update_state(state_update)
return operator_output
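As a usage sketch (the pipeline and InferenceState construction below are illustrative assumptions, not code from the PR): because run_async calls asyncio.get_running_loop(), it must be awaited inside a running event loop, for example one started with asyncio.run at the top level.

    import asyncio

    async def main():
        # `pipeline`, `some_input`, and InferenceState() construction are
        # assumed here; the point is only that run_async is awaited from
        # within a running loop.
        state = InferenceState()
        return await pipeline.run_async(some_input, inference_state=state)

    result = asyncio.run(main())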

async def _apply_split(
self,
inp: Any,
inference_state: InferenceState,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
batches, orig_batch_size = self.expand_inputs(inp, 1)

# Create a list of SplitRoutes, per batch size 1
@@ -168,8 +233,8 @@ def _apply_split(self, inp: Any, inference_state: InferenceState):
for i in range(len(batches))
]

outputs = self._run_sub_graphs(
sub_graph_inputs=batches, sub_graphs=split_graphs
outputs = await self._run_sub_graphs(
sub_graph_inputs=batches, sub_graphs=split_graphs, loop=loop
)
return self.condense_inputs(outputs)
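The expand/condense contract can be illustrated with a self-contained toy (the function bodies are illustrative, not the pipeline's actual implementations): split a batched input into size-1 pieces, process them concurrently, then merge the results back into one batch.

    import asyncio

    def expand_inputs(inp, batch_size):
        # Split into batches of one, returning the original batch size
        return [[x] for x in inp], len(inp)

    def condense_inputs(outputs):
        # Join per-batch outputs back into a single flat batch
        return [y for batch in outputs for y in batch]

    async def run_one(batch):
        await asyncio.sleep(0)  # stand-in for running a sub graph
        return [x * 2 for x in batch]

    async def demo():
        batches, _ = expand_inputs([1, 2, 3], 1)
        outputs = await asyncio.gather(*(run_one(b) for b in batches))
        return condense_inputs(outputs)

    print(asyncio.run(demo()))  # [2, 4, 6]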

@@ -200,7 +265,9 @@ def run(
f"{self.ROUTER.START_ROUTE}"
)

operator_output = self._apply_split(operator_output, inference_state)
operator_output = asyncio.run(
self._apply_split(operator_output, inference_state)
)
next_step = self.router.route[self.router.JOIN_ROUTE]
if next_step == self.router.END_ROUTE:
return operator_output
Expand Down Expand Up @@ -232,8 +299,10 @@ def run(
end=[self.router.SPLIT_ROUTE, self.router.END_ROUTE],
)

operator_output = self._run_sub_graphs(
sub_graph_inputs=[operator_output], sub_graphs=[graph]
operator_output = asyncio.run(
self._run_sub_graphs(
sub_graph_inputs=[operator_output], sub_graphs=[graph]
)
)[0]

inference_state = graph.inf
24 changes: 18 additions & 6 deletions src/deepsparse/v2/schedulers/scheduler.py
@@ -13,8 +13,9 @@
# limitations under the License.


import asyncio
import functools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable
from typing import Callable, Optional

from deepsparse.v2.operators import Operator

@@ -37,6 +38,21 @@ class OperatorScheduler:
def __init__(self, max_workers: int = 1):
self._threadpool = ThreadPoolExecutor(max_workers=max_workers)

def async_run(
self,
*args,
operator: Operator,
loop: Optional[asyncio.AbstractEventLoop],
**kwargs,
) -> asyncio.Future:
"""Use an asyncio event loop to run the operator"""

return loop.run_in_executor(
self._threadpool, functools.partial(operator, *args, **kwargs)
)

def submit(
self,
*args,
@@ -47,11 +63,7 @@ def submit(
:param operator: operator to run
:return: future referencing the asynchronously run output of the operator
"""
return self._threadpool.submit(
operator,
*args,
**kwargs,
)
return self._threadpool.submit(operator, *args, **kwargs)
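The difference between the two scheduling paths can be shown with a minimal standalone sketch (toy operator, not the deepsparse classes): submit returns a concurrent.futures.Future directly, while the async_run pattern wraps the same thread pool call in an awaitable asyncio.Future via run_in_executor; functools.partial bundles the arguments because run_in_executor forwards only positional ones.

    import asyncio
    import functools
    from concurrent.futures import ThreadPoolExecutor

    def operator(x, scale=1):
        return x * scale

    pool = ThreadPoolExecutor(max_workers=1)

    # submit path: plain concurrent.futures.Future, resolved with .result()
    print(pool.submit(operator, 7, scale=6).result())  # 42

    # async_run path: awaitable asyncio.Future tied to a running event loop
    async def demo():
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            pool, functools.partial(operator, 7, scale=6)
        )

    print(asyncio.run(demo()))  # 42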

def can_process(
self,
7 changes: 6 additions & 1 deletion src/deepsparse/v2/schedulers/scheduler_group.py
@@ -14,7 +14,7 @@


from concurrent.futures import Future
from typing import List
from typing import Any, List

from deepsparse.v2.operators import Operator
from deepsparse.v2.schedulers.scheduler import OperatorScheduler
@@ -38,6 +38,7 @@ def submit(
self,
*args,
operator: Operator,
loop: Any = None,
**kwargs,
) -> Future:
"""
@@ -50,6 +51,10 @@
operator=operator,
**kwargs,
):
if loop:
return scheduler.async_run(
*args, operator=operator, loop=loop, **kwargs
)
return scheduler.submit(
*args,
operator=operator,