Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add vertices_being_run set to RunnableVerticesManager #2589

Merged
merged 7 commits into from
Jul 8, 2024
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def retrieve_vertices_order(
# We need to get the id of each vertex
# and return the same structure but only with the ids
components_count = len(graph.vertices)
vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run)
vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run)))
await chat_service.set_cache(str(flow_id), graph)
background_tasks.add_task(
telemetry_service.log_package_playground,
Expand Down
24 changes: 13 additions & 11 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ def activate_state_vertices(self, name: str, caller: str):
vertices_ids = set()
new_predecessor_map = {}
for vertex_id in self._is_state_vertices:
if vertex_id == caller:
continue
caller_vertex = self.get_vertex(caller)
vertex = self.get_vertex(vertex_id)
if vertex_id == caller or vertex.display_name == caller_vertex.display_name:
continue
if (
isinstance(vertex._raw_params["name"], str)
and name in vertex._raw_params["name"]
Expand All @@ -152,15 +153,17 @@ def activate_state_vertices(self, name: str, caller: str):
# and run self.build_adjacency_maps(edges) to get the new predecessor map
# that is not complete but we can use to update the run_predecessors
edges_set = set()
for vertex in [vertex] + successors:
edges_set.update(vertex.edges)
vertices_ids.add(vertex.id)
if vertex.state == VertexStates.INACTIVE:
vertex.set_state("ACTIVE")
for _vertex in [vertex] + successors:
edges_set.update(_vertex.edges)
if _vertex.state == VertexStates.INACTIVE:
_vertex.set_state("ACTIVE")
edges = list(edges_set)
predecessor_map, _ = self.build_adjacency_maps(edges)
new_predecessor_map.update(predecessor_map)

# vertices_ids.update(new_predecessor_map.keys())
# vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)

self.activated_vertices = list(vertices_ids)
self.vertices_to_run.update(vertices_ids)
self.run_manager.update_run_state(
Expand Down Expand Up @@ -861,6 +864,7 @@ async def build_vertex(
ValueError: If no result is found for the vertex.
"""
vertex = self.get_vertex(vertex_id)
self.run_manager.add_to_vertices_being_run(vertex_id)
try:
params = ""
if vertex.frozen:
Expand Down Expand Up @@ -1494,8 +1498,6 @@ def build_adjacency_maps(self, edges: List[ContractEdge]) -> Tuple[Dict[str, Lis
predecessor_map: dict[str, list[str]] = defaultdict(list)
successor_map: dict[str, list[str]] = defaultdict(list)
for edge in edges:
if edge.source_id not in predecessor_map[edge.target_id]:
predecessor_map[edge.target_id].append(edge.source_id)
if edge.target_id not in successor_map[edge.source_id]:
successor_map[edge.source_id].append(edge.target_id)
predecessor_map[edge.target_id].append(edge.source_id)
successor_map[edge.source_id].append(edge.target_id)
return predecessor_map, successor_map
12 changes: 10 additions & 2 deletions src/backend/base/langflow/graph/graph/runnable_vertices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self):
self.run_map = defaultdict(list) # Tracks successors of each vertex
self.run_predecessors = defaultdict(set) # Tracks predecessors for each vertex
self.vertices_to_run = set() # Set of vertices that are ready to run
self.vertices_being_run = set() # Set of vertices that are currently running

def to_dict(self) -> dict:
return {
Expand Down Expand Up @@ -49,7 +50,7 @@ def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) ->
"""Determines if a vertex is runnable."""

return (
vertex_id in self.vertices_to_run
vertex_id not in self.vertices_being_run
and not self.run_predecessors.get(vertex_id)
and vertex_id not in inactivated_vertices
)
Expand Down Expand Up @@ -88,6 +89,7 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool):
self.vertices_to_run.add(vertex_id)
else:
self.vertices_to_run.discard(vertex_id)
self.vertices_being_run.discard(vertex_id)

async def get_next_runnable_vertices(
self,
Expand Down Expand Up @@ -125,7 +127,10 @@ async def get_next_runnable_vertices(
next_runnable_vertices = direct_successors_ready

for v_id in set(next_runnable_vertices): # Use set to avoid duplicates
self.remove_vertex_from_runnables(v_id)
if vertex.id == v_id:
next_runnable_vertices.remove(v_id)
else:
self.add_to_vertices_being_run(v_id)
if cache:
await set_cache_coro(data=graph, lock=lock) # type: ignore
return next_runnable_vertices
Expand All @@ -134,6 +139,9 @@ def remove_vertex_from_runnables(self, v_id):
self.update_vertex_run_state(v_id, is_runnable=False)
self.remove_from_predecessors(v_id)

def add_to_vertices_being_run(self, v_id):
self.vertices_being_run.add(v_id)

@staticmethod
def get_top_level_vertices(graph, vertices_ids):
"""
Expand Down