diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index b67b2733c46..6b81e2e7890 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -157,12 +157,14 @@ def activate_state_vertices(self, name: str, caller: str): edges_set.update(_vertex.edges) if _vertex.state == VertexStates.INACTIVE: _vertex.set_state("ACTIVE") + + vertices_ids.add(_vertex.id) edges = list(edges_set) predecessor_map, _ = self.build_adjacency_maps(edges) new_predecessor_map.update(predecessor_map) - # vertices_ids.update(new_predecessor_map.keys()) - # vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) + vertices_ids.update(new_predecessor_map.keys()) + vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) self.activated_vertices = list(vertices_ids) self.vertices_to_run.update(vertices_ids) @@ -1459,7 +1461,7 @@ def sort_layer_by_avg_build_time(vertices_ids: List[str]) -> List[str]: def is_vertex_runnable(self, vertex_id: str) -> bool: """Returns whether a vertex is runnable.""" - return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices) + return self.run_manager.is_vertex_runnable(self.get_vertex(vertex_id)) def build_run_map(self): """ @@ -1479,7 +1481,7 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str] This checks the direct predecessors of each successor to identify any that are immediately runnable, expanding the search to ensure progress can be made. """ - return self.run_manager.find_runnable_predecessors_for_successors(vertex_id, self.inactivated_vertices) + return self.run_manager.find_runnable_predecessors_for_successors(self.get_vertex(vertex_id)) def remove_from_predecessors(self, vertex_id: str): self.run_manager.remove_from_predecessors(vertex_id) diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index df3b1598471..fb9e120b653 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -46,33 +46,36 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set): self.vertices_to_run.update(vertices_to_run) self.build_run_map(self.run_predecessors, self.vertices_to_run) - def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) -> bool: + def is_vertex_runnable(self, vertex: "Vertex") -> bool: """Determines if a vertex is runnable.""" return ( - vertex_id not in self.vertices_being_run - and not self.run_predecessors.get(vertex_id) - and vertex_id not in inactivated_vertices + vertex.is_active() and self.are_all_predecessors_fulfilled(vertex.id) and vertex.id in self.vertices_to_run ) - def find_runnable_predecessors_for_successors(self, vertex_id: str, inactivated_vertices: set[str]) -> List[str]: + def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool: + return not any(self.run_predecessors.get(vertex_id, [])) + + def find_runnable_predecessors_for_successors(self, vertex: "Vertex") -> List[str]: """Finds runnable predecessors for the successors of a given vertex.""" runnable_vertices = [] visited = set() + get_vertex = vertex.graph.get_vertex - def find_runnable_predecessors(predecessor_id: str): + def find_runnable_predecessors(predecessor: "Vertex"): + predecessor_id = predecessor.id if predecessor_id in visited: return visited.add(predecessor_id) - if self.is_vertex_runnable(predecessor_id, inactivated_vertices): + if self.is_vertex_runnable(predecessor): runnable_vertices.append(predecessor_id) else: for pred_pred_id in self.run_predecessors.get(predecessor_id, []): - find_runnable_predecessors(pred_pred_id) + find_runnable_predecessors(get_vertex(pred_pred_id)) - for successor_id in self.run_map.get(vertex_id, []): + for successor_id in self.run_map.get(vertex.id, []): for predecessor_id in self.run_predecessors.get(successor_id, []): - find_runnable_predecessors(predecessor_id) + find_runnable_predecessors(get_vertex(predecessor_id)) return runnable_vertices @@ -97,7 +100,6 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool): if is_runnable: self.vertices_to_run.add(vertex_id) else: - self.vertices_to_run.discard(vertex_id) self.vertices_being_run.discard(vertex_id) async def get_next_runnable_vertices( @@ -124,14 +126,10 @@ async def get_next_runnable_vertices( """ async with lock: self.remove_vertex_from_runnables(vertex.id) - direct_successors_ready = [ - v for v in vertex.successors_ids if self.is_vertex_runnable(v, graph.inactivated_vertices) - ] + direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(graph.get_vertex(v))] if not direct_successors_ready: # No direct successors ready, look for runnable predecessors of successors - next_runnable_vertices = self.find_runnable_predecessors_for_successors( - vertex.id, graph.inactivated_vertices - ) + next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex) else: next_runnable_vertices = direct_successors_ready diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index a3ad19dc225..2bfc16d1f1e 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -108,6 +108,9 @@ def set_state(self, state: str): elif self.state == VertexStates.ACTIVE and self.id in self.graph.inactivated_vertices: self.graph.inactivated_vertices.remove(self.id) + def is_active(self): + return self.state == VertexStates.ACTIVE + @property def avg_build_time(self): return sum(self.build_times) / len(self.build_times) if self.build_times else 0 diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 331c1ed0691..f37d484a43e 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -510,11 +510,23 @@ const useFlowStore = create((set, get) => ({ const top_level_vertices = vertexBuildData.top_level_vertices.filter( (vertex) => !vertexBuildData.inactivated_vertices?.includes(vertex), ); - const nextVertices: VertexLayerElementType[] = zip( + let nextVertices: VertexLayerElementType[] = zip( next_vertices_ids, top_level_vertices, ).map(([id, reference]) => ({ id: id!, reference })); + // Now we filter nextVertices to remove any vertices that are in verticesLayers + // because they are already being built + // each layer is a list of vertexlayerelementtypes + let lastLayer = + get().verticesBuild!.verticesLayers[ + get().verticesBuild!.verticesLayers.length - 1 + ]; + nextVertices = nextVertices.filter( + (vertex) => + !lastLayer.some((layer) => layer.id === vertex.id) && + !lastLayer.some((layer) => layer.reference === vertex.reference), + ); const newLayers = [ ...get().verticesBuild!.verticesLayers, nextVertices, diff --git a/tests/unit/test_validate_code.py b/tests/unit/test_validate_code.py index c003ab10d46..a30fd619187 100644 --- a/tests/unit/test_validate_code.py +++ b/tests/unit/test_validate_code.py @@ -7,6 +7,20 @@ from langflow.utils.validate import create_function, execute_function, extract_function_name, validate_code +def test_create_function(): + code = """ +from pathlib import Path + +def my_function(x: str) -> Path: + return Path(x) +""" + + function_name = extract_function_name(code) + function = create_function(code, function_name) + result = function("test") + assert result == Path("test") + + def test_validate_code(): # Test case with a valid import and function code1 = """ @@ -88,17 +102,3 @@ def my_function(x): with mock.patch("requests.get", side_effect=MissingSchema): with pytest.raises(MissingSchema): execute_function(code, "my_function", "invalid_url") - - -def test_create_function(): - code = """ -from pathlib import Path - -def my_function(x: str) -> Path: - return Path(x) -""" - - function_name = extract_function_name(code) - function = create_function(code, function_name) - result = function("test") - assert result == Path("test")