Skip to content

Commit

Permalink
feat: add vertices_being_run set to RunnableVerticesManager (langflow…
Browse files Browse the repository at this point in the history
…-ai#2589)

* refactor(base.py): remove unnecessary conditional statements for adding edges to predecessor and successor maps in Graph class

* refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class

* refactor(graph/base.py): remove unnecessary line that adds vertex_id to vertices_ids set
refactor(graph/base.py): fix indentation for predecessor_map and successor_map dictionaries to improve code readability

* feat: Add vertices_being_run set to RunnableVerticesManager

This commit adds a new set called `vertices_being_run` to the `RunnableVerticesManager` class. This set keeps track of vertices that are currently running. The purpose of this set is to prevent a vertex from being considered runnable if it is already being run.

Co-authored-by: Gabriel Luiz Freitas Almeida <[email protected]>

* chore: Remove unnecessary vertex from next_runnable_vertices in RunnableVerticesManager

* feat: Update vertices_to_run logic in retrieve_vertices_order function

This commit updates the logic for retrieving the vertices to run in the `retrieve_vertices_order` function in `chat.py`. The previous implementation used the `list` function to convert the `vertices_to_run` set to a list and then concatenated it with the result of the `get_top_level_vertices` function. The updated logic uses the `union` method to combine the two sets directly. This change improves the efficiency and readability of the code.

* refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class
  • Loading branch information
ogabrielluiz committed Jul 9, 2024
1 parent 6ff2234 commit 6ea295f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def retrieve_vertices_order(
# We need to get the id of each vertex
# and return the same structure but only with the ids
components_count = len(graph.vertices)
vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run)
vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run)))
await chat_service.set_cache(str(flow_id), graph)
background_tasks.add_task(
telemetry_service.log_package_playground,
Expand Down
24 changes: 13 additions & 11 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ def activate_state_vertices(self, name: str, caller: str):
vertices_ids = set()
new_predecessor_map = {}
for vertex_id in self._is_state_vertices:
if vertex_id == caller:
continue
caller_vertex = self.get_vertex(caller)
vertex = self.get_vertex(vertex_id)
if vertex_id == caller or vertex.display_name == caller_vertex.display_name:
continue
if (
isinstance(vertex._raw_params["name"], str)
and name in vertex._raw_params["name"]
Expand All @@ -152,15 +153,17 @@ def activate_state_vertices(self, name: str, caller: str):
# and run self.build_adjacency_maps(edges) to get the new predecessor map
# that is not complete but we can use to update the run_predecessors
edges_set = set()
for vertex in [vertex] + successors:
edges_set.update(vertex.edges)
vertices_ids.add(vertex.id)
if vertex.state == VertexStates.INACTIVE:
vertex.set_state("ACTIVE")
for _vertex in [vertex] + successors:
edges_set.update(_vertex.edges)
if _vertex.state == VertexStates.INACTIVE:
_vertex.set_state("ACTIVE")
edges = list(edges_set)
predecessor_map, _ = self.build_adjacency_maps(edges)
new_predecessor_map.update(predecessor_map)

# vertices_ids.update(new_predecessor_map.keys())
# vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)

self.activated_vertices = list(vertices_ids)
self.vertices_to_run.update(vertices_ids)
self.run_manager.update_run_state(
Expand Down Expand Up @@ -861,6 +864,7 @@ async def build_vertex(
ValueError: If no result is found for the vertex.
"""
vertex = self.get_vertex(vertex_id)
self.run_manager.add_to_vertices_being_run(vertex_id)
try:
params = ""
if vertex.frozen:
Expand Down Expand Up @@ -1494,8 +1498,6 @@ def build_adjacency_maps(self, edges: List[ContractEdge]) -> Tuple[Dict[str, Lis
predecessor_map: dict[str, list[str]] = defaultdict(list)
successor_map: dict[str, list[str]] = defaultdict(list)
for edge in edges:
if edge.source_id not in predecessor_map[edge.target_id]:
predecessor_map[edge.target_id].append(edge.source_id)
if edge.target_id not in successor_map[edge.source_id]:
successor_map[edge.source_id].append(edge.target_id)
predecessor_map[edge.target_id].append(edge.source_id)
successor_map[edge.source_id].append(edge.target_id)
return predecessor_map, successor_map
12 changes: 10 additions & 2 deletions src/backend/base/langflow/graph/graph/runnable_vertices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self):
self.run_map = defaultdict(list) # Tracks successors of each vertex
self.run_predecessors = defaultdict(set) # Tracks predecessors for each vertex
self.vertices_to_run = set() # Set of vertices that are ready to run
self.vertices_being_run = set() # Set of vertices that are currently running

def to_dict(self) -> dict:
return {
Expand Down Expand Up @@ -49,7 +50,7 @@ def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) ->
"""Determines if a vertex is runnable."""

return (
vertex_id in self.vertices_to_run
vertex_id not in self.vertices_being_run
and not self.run_predecessors.get(vertex_id)
and vertex_id not in inactivated_vertices
)
Expand Down Expand Up @@ -88,6 +89,7 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool):
self.vertices_to_run.add(vertex_id)
else:
self.vertices_to_run.discard(vertex_id)
self.vertices_being_run.discard(vertex_id)

async def get_next_runnable_vertices(
self,
Expand Down Expand Up @@ -125,7 +127,10 @@ async def get_next_runnable_vertices(
next_runnable_vertices = direct_successors_ready

for v_id in set(next_runnable_vertices): # Use set to avoid duplicates
self.remove_vertex_from_runnables(v_id)
if vertex.id == v_id:
next_runnable_vertices.remove(v_id)
else:
self.add_to_vertices_being_run(v_id)
if cache:
await set_cache_coro(data=graph, lock=lock) # type: ignore
return next_runnable_vertices
Expand All @@ -134,6 +139,9 @@ def remove_vertex_from_runnables(self, v_id):
self.update_vertex_run_state(v_id, is_runnable=False)
self.remove_from_predecessors(v_id)

def add_to_vertices_being_run(self, v_id):
self.vertices_being_run.add(v_id)

@staticmethod
def get_top_level_vertices(graph, vertices_ids):
"""
Expand Down

0 comments on commit 6ea295f

Please sign in to comment.