[Pipeline Refactor] async #1380
Changes from 38 commits
```diff
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
 import copy
 from concurrent.futures import Future
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union
 
 from deepsparse.v2.operators import EngineOperator, Operator
 from deepsparse.v2.routers import Router
```
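Background, not code from this PR: the import hunk pulls in `asyncio` alongside the existing `concurrent.futures.Future` because, further down, operator outputs may be either kind of future. As a reference point, a minimal sketch of how the two types interoperate in standard Python (using `asyncio.wrap_future`, which this PR itself does not use):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_work() -> int:
    # stand-in for a synchronous engine call
    return 42


async def main() -> None:
    with ThreadPoolExecutor() as pool:
        thread_future = pool.submit(blocking_work)  # concurrent.futures.Future
        # wrap_future exposes it as an awaitable asyncio.Future
        result = await asyncio.wrap_future(thread_future)
    print(result)  # 42


asyncio.run(main())
```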
```diff
@@ -69,10 +69,7 @@ def __init__(
         self._scheduler_group = SchedulerGroup(self.schedulers)
 
     def _run_next(
-        self,
-        inp: Any,
-        inference_state: InferenceState,
-        next_step: str,
+        self, inp: Any, inference_state: InferenceState, next_step: str, **kwargs
     ):
         if (
             isinstance(self.ops[next_step], EngineOperator)
```
```diff
@@ -89,10 +86,14 @@ def _run_next(
             inp=inp,
             pipeline_state=self.pipeline_state,
             inference_state=inference_state,
+            **kwargs,
         )
 
-    def _run_sub_graphs(
-        self, sub_graph_inputs: List[Any], sub_graphs: List[SubGraph]
+    async def _run_sub_graphs(
+        self,
+        sub_graph_inputs: List[Any],
+        sub_graphs: List[SubGraph],
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ) -> List[Any]:
         """
         Run a list of sub_graphs asynchronously. Polls to identify the sub graph that is
```
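The new `loop` parameter threads an event-loop handle down through `_run_next` and into the scheduler via `**kwargs`. How the scheduler consumes it is not shown in this hunk; as a hedged illustration of the usual pattern (an assumption, not this PR's scheduler code), an explicit running loop is what lets blocking work come back as an awaitable `asyncio.Future`:

```python
import asyncio


def blocking_op(x: int) -> int:
    # stand-in for a blocking engine forward pass
    return x * 2


async def submit(x: int) -> int:
    loop = asyncio.get_running_loop()
    # run_in_executor returns an asyncio.Future bound to `loop`
    return await loop.run_in_executor(None, blocking_op, x)


print(asyncio.run(submit(21)))  # 42
```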
```diff
@@ -112,14 +113,16 @@ def _run_sub_graphs(
         """
         for i in range(len(sub_graphs)):
             sub_graphs[i].output = self._run_next(
-                sub_graph_inputs[i], sub_graphs[i].inf, sub_graphs[i].step
+                sub_graph_inputs[i], sub_graphs[i].inf, sub_graphs[i].step, loop=loop
             )
 
         # Execute all sub graphs until all graphs have been completed.
         while True:
             for sub_graph in sub_graphs:
-                if isinstance(sub_graph.output, Future) and sub_graph.output.done():
+                if isinstance(sub_graph.output, (asyncio.Future, Future)):
                     # get the result for the completed operator; resolve its output
+                    if isinstance(sub_graph.output, asyncio.Future):
+                        await sub_graph.output
                     operator_output = sub_graph.output.result()
                     operator_output = sub_graph.parse_output(operator_output)
```
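A minimal standalone sketch of the resolution rule in this hunk (a simplification, not deepsparse code): an `asyncio.Future` is awaited cooperatively before calling `.result()`, while a `concurrent.futures.Future` blocks the calling thread inside `.result()` until the pool finishes:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def resolve_output(output):
    # mirrors the isinstance checks above: only asyncio futures are awaited
    if isinstance(output, asyncio.Future):
        await output  # cooperative: yields to the event loop until done
    # for a concurrent.futures.Future this blocks the thread until completion
    return output.result()


async def main() -> None:
    loop = asyncio.get_running_loop()
    pool_future = ThreadPoolExecutor().submit(lambda: "pool result")
    aio_future = loop.run_in_executor(None, lambda: "loop result")
    print(await resolve_output(pool_future))
    print(await resolve_output(aio_future))


asyncio.run(main())
```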
```diff
@@ -141,23 +144,87 @@ def _run_sub_graphs(
                         inp=operator_output,
                         inference_state=sub_graph.inf,
                         next_step=next_step,
+                        loop=loop,
                     )
                     break
 
             # keep running until all sub graphs have completed.
-            if not any(isinstance(x.output, Future) for x in sub_graphs):
+            if not any(
+                isinstance(x.output, (asyncio.Future, Future)) for x in sub_graphs
+            ):
                 break
 
         return [x.output for x in sub_graphs]
 
-    def _apply_split(self, inp: Any, inference_state: InferenceState):
+    async def run_async(self, *args, inference_state: InferenceState, **kwargs):
         """
-        Split inputs using the pipeline's expand_inputs function. Inputs are split
-        into a batch size of one when a SPLIT_ROUTE node is found in a given pipeline's
-        provided router. The split batches are run asynchronously and then joined when
-        a JOIN_ROUTE node is found, using the pipeline's condense_inputs function.
+        Run through the operators using the provided router and scheduler.
+        The input to a given operator is the output of the previous operator.
 
+        :param inference_state: inference_state for the pipeline.
+        :param pipeline_state: pipeline_state for the pipeline. The values in the state
+            are created during pipeline creation and are read-only during inference.
         """
+        loop = asyncio.get_running_loop()
+
+        next_step = self.router.START_ROUTE
+        operator_output = None
```
**Contributor:** I'm not sure if there's any reason for an operator to have an output of `None`.

**Contributor:** I guess you could also consider adding another local variable to avoid overloading the use of `operator_output`.
```diff
+
+        while next_step != self.router.END_ROUTE:
+            # Either a dictionary key or valid index
```
**Contributor:** I think this comment refers to `next_step`.
```diff
+            if next_step == self.router.SPLIT_ROUTE:
```
**Contributor:** This code flow is a bit funky; what do you think about making this an `if`/`elif`/`else` chain? I think that would look like this:

```python
async def run_async(self, *args, inference_state: InferenceState, **kwargs):
    """
    Run through the operators using the provided router and scheduler.
    The input to a given operator is the output of the previous operator.

    :param inference_state: inference_state for the pipeline.
    :param pipeline_state: pipeline_state for the pipeline. The values in the state
        are created during pipeline creation and are read-only during inference.
    """
    loop = asyncio.get_running_loop()

    next_step = self.router.START_ROUTE
    operator_output = None

    while next_step != self.router.END_ROUTE:
        # Either a dictionary key or valid index
        if next_step == self.router.START_ROUTE:
            outputs = run_func(
                *args,
                func=self._scheduler_group.submit,
                operator=self.ops[next_step],
                inference_state=inference_state,
                pipeline_state=self.pipeline_state,
                loop=loop,
                **kwargs,
            )
            await outputs
            operator_output = outputs.result()
        elif next_step == self.router.SPLIT_ROUTE:
            if operator_output is None:
                raise ValueError(
                    f"{self.router.SPLIT_ROUTE} should appear after "
                    f"{self.ROUTER.START_ROUTE}"
                )
            operator_output = await self._apply_split(
                operator_output, inference_state, loop=loop
            )
        else:
            outputs = self._run_next(
                inp=operator_output,
                next_step=next_step,
                inference_state=inference_state,
                loop=loop,
            )
            await outputs
            operator_output = outputs.result()

        if next_step == self.router.SPLIT_ROUTE:
            next_step = self.router.route[self.router.JOIN_ROUTE]
            continue

        if isinstance(operator_output, tuple):
            state_update = operator_output[-1]
            operator_output = operator_output[0]

        next_step = self.router.next(next_step, self.ops, operator_output)
        if state_update:
            inference_state.update_state(state_update)
    return operator_output
```

Maybe a little easier to reason about, but maybe not.
```diff
+                if operator_output is None:
+                    raise ValueError(
+                        f"{self.router.SPLIT_ROUTE} should appear after "
+                        f"{self.ROUTER.START_ROUTE}"
+                    )
+
+                operator_output = await self._apply_split(
+                    operator_output, inference_state, loop=loop
+                )
+                next_step = self.router.route[self.router.JOIN_ROUTE]
+                if next_step == self.router.END_ROUTE:
+                    return operator_output
+
+            if next_step == self.router.START_ROUTE:
+                outputs = run_func(
+                    *args,
+                    func=self._scheduler_group.submit,
+                    operator=self.ops[next_step],
+                    inference_state=inference_state,
+                    pipeline_state=self.pipeline_state,
+                    loop=loop,
+                    **kwargs,
+                )
+                await outputs
+                operator_output = outputs.result()
+
+            else:
+                outputs = self._run_next(
+                    inp=operator_output,
+                    next_step=next_step,
+                    inference_state=inference_state,
+                    loop=loop,
+                )
+                await outputs
+                operator_output = outputs.result()
+
+            if isinstance(operator_output, tuple):
+                state_update = operator_output[-1]
+                operator_output = operator_output[0]
+
+            next_step = self.router.next(next_step, self.ops, operator_output)
+            if state_update:
+                inference_state.update_state(state_update)
+        return operator_output
+
+    async def _apply_split(
+        self,
+        inp: Any,
+        inference_state: InferenceState,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+    ):
         batches, orig_batch_size = self.expand_inputs(inp, 1)
 
         # Create a list of SplitRoutes, per batch size 1
```
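Since `run_async` calls `asyncio.get_running_loop()`, it has to be awaited from inside a running event loop. A hypothetical caller sketch, where `pipeline`, `prompt`, and `state` are placeholders rather than API guaranteed by this PR:

```python
import asyncio

# placeholders: construct a real v2 Pipeline and InferenceState here
# pipeline = ...
# state = ...


async def infer_once(pipeline, prompt, state):
    # awaited inside a running loop, so get_running_loop() succeeds
    return await pipeline.run_async(prompt, inference_state=state)


# from synchronous code, asyncio.run provides the loop:
# result = asyncio.run(infer_once(pipeline, prompt, state))
```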
```diff
@@ -173,8 +240,8 @@ def _apply_split(self, inp: Any, inference_state: InferenceState):
             for i in range(len(batches))
         ]
 
-        outputs = self._run_sub_graphs(
-            sub_graph_inputs=batches, sub_graphs=split_graphs
+        outputs = await self._run_sub_graphs(
+            sub_graph_inputs=batches, sub_graphs=split_graphs, loop=loop
         )
         return self.condense_inputs(outputs)
```
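A rough sketch of the split/join contract assumed here (illustrative signatures, not the real `expand_inputs`/`condense_inputs`): a batch is fanned out into batch-size-1 pieces, each piece runs through its own sub-graph, and the per-piece outputs are folded back together:

```python
from typing import Any, List, Tuple


def expand_inputs(batch: List[Any], size: int = 1) -> Tuple[List[List[Any]], int]:
    # fan the batch out into chunks of `size` (here: batch size 1)
    chunks = [batch[i : i + size] for i in range(0, len(batch), size)]
    return chunks, len(batch)


def condense_inputs(outputs: List[List[Any]]) -> List[Any]:
    # fold per-chunk outputs back into a single batch
    return [item for chunk in outputs for item in chunk]


chunks, orig_batch_size = expand_inputs(["a", "b", "c"])
assert condense_inputs(chunks) == ["a", "b", "c"] and orig_batch_size == 3
```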
```diff
@@ -205,7 +272,9 @@ def run(
                     f"{self.ROUTER.START_ROUTE}"
                 )
 
-                operator_output = self._apply_split(operator_output, inference_state)
+                operator_output = asyncio.run(
+                    self._apply_split(operator_output, inference_state)
+                )
                 next_step = self.router.route[self.router.JOIN_ROUTE]
                 if next_step == self.router.END_ROUTE:
                     return operator_output
```
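Worth noting for the synchronous `run` path: `asyncio.run` creates, runs, and closes a fresh event loop on every call, and raises `RuntimeError` if a loop is already running in the thread. A tiny standalone illustration:

```python
import asyncio


async def work() -> str:
    return "done"


# each call spins up and tears down its own event loop
print(asyncio.run(work()))
print(asyncio.run(work()))
```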
```diff
@@ -237,8 +306,10 @@ def run(
                 end=[self.router.SPLIT_ROUTE, self.router.END_ROUTE],
             )
 
-            operator_output = self._run_sub_graphs(
-                sub_graph_inputs=[operator_output], sub_graphs=[graph]
+            operator_output = asyncio.run(
+                self._run_sub_graphs(
+                    sub_graph_inputs=[operator_output], sub_graphs=[graph]
+                )
             )[0]
 
             inference_state = graph.inf
```