Skip to content
This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/deepsparse/v2/operators/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

from pydantic import BaseModel

from deepsparse.v2.utils import InferenceState
from deepsparse.v2.operators.registry import OperatorRegistry
from deepsparse.v2.utils import InferenceState


__all__ = ["Operator"]
Expand Down
11 changes: 3 additions & 8 deletions src/deepsparse/v2/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@


import copy
from concurrent.futures import Future
from typing import Any, Dict, List, Union

from deepsparse.v2.operators import EngineOperator, Operator
Expand Down Expand Up @@ -116,9 +115,9 @@ def _run_sub_graphs(
)

# Execute all sub graphs until all graphs have been completed.
while True:
while any(not x.completed for x in sub_graphs):
for sub_graph in sub_graphs:
if isinstance(sub_graph.output, Future) and sub_graph.output.done():
if not sub_graph.completed:
# get the result for the completed operator; resolve its output
operator_output = sub_graph.output.result()
operator_output = sub_graph.parse_output(operator_output)
Expand All @@ -136,17 +135,13 @@ def _run_sub_graphs(
# update the output value
if next_step in sub_graph.end:
sub_graph.output = operator_output
sub_graph.completed = True
else:
sub_graph.output = self._run_next(
inp=operator_output,
inference_state=sub_graph.inf,
next_step=next_step,
)
break

# keep running until all sub graphs have completed.
if not any(isinstance(x.output, Future) for x in sub_graphs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what was the core issue? We were waiting idly for all the subgraphs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, I was using the `done()` method of `concurrent.futures.Future` incorrectly — polling it in a busy loop instead of tracking completion explicitly, which is what the new `completed` flag on `SubGraph` addresses.

break

return [x.output for x in sub_graphs]

Expand Down
3 changes: 1 addition & 2 deletions src/deepsparse/v2/text_generation/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ def __init__(
if continuous_batch_sizes:
if internal_kv_cache:
_LOGGER.warn(
"internal kv_cache is currently not supported with continuous ",
"batching",
"internal kv_cache is not supported with continuous_batching "
)
else:
continuous_batching_scheduler = self._get_continuous_batching_scheduler(
Expand Down
1 change: 1 addition & 0 deletions src/deepsparse/v2/utils/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SubGraph:
inf: InferenceState
end: List[str]
output: Any = None
completed: bool = False

def parse_output(self, operator_output: Any):
if isinstance(operator_output, tuple):
Expand Down