From 7bde9e22b6936633e7aa6c87be20b2cdc4d1a21e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 18:18:51 -0300 Subject: [PATCH 1/8] feat: improve is_vertex_runnable method in RunnableVerticesManager This commit improves the `is_vertex_runnable` method in the `RunnableVerticesManager` class. It adds an additional parameter `activated_vertices` to the method signature and updates the logic to check if a vertex is runnable based on the presence of activated vertices. This enhancement improves the accuracy of determining whether a vertex is runnable or not. --- .../graph/graph/runnable_vertices_manager.py | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index df3b1598471..88dda02d58d 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -46,16 +46,25 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set): self.vertices_to_run.update(vertices_to_run) self.build_run_map(self.run_predecessors, self.vertices_to_run) - def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) -> bool: + def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: set[str]) -> bool: """Determines if a vertex is runnable.""" + if not activated_vertices: + # check vertices_to_run + should_run = vertex_id in self.vertices_to_run + else: + # check run_predecessors + should_run = vertex_id in activated_vertices return ( vertex_id not in self.vertices_being_run and not self.run_predecessors.get(vertex_id) and vertex_id not in inactivated_vertices + and should_run ) - def find_runnable_predecessors_for_successors(self, vertex_id: str, inactivated_vertices: set[str]) -> List[str]: + def find_runnable_predecessors_for_successors( + self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: set[str] + ) -> List[str]: """Finds runnable predecessors for the successors of a given vertex.""" runnable_vertices = [] visited = set() @@ -64,7 +73,7 @@ def find_runnable_predecessors(predecessor_id: str): if predecessor_id in visited: return visited.add(predecessor_id) - if self.is_vertex_runnable(predecessor_id, inactivated_vertices): + if self.is_vertex_runnable(predecessor_id, inactivated_vertices, activated_vertices): runnable_vertices.append(predecessor_id) else: for pred_pred_id in self.run_predecessors.get(predecessor_id, []): @@ -125,12 +134,14 @@ async def get_next_runnable_vertices( async with lock: self.remove_vertex_from_runnables(vertex.id) direct_successors_ready = [ - v for v in vertex.successors_ids if self.is_vertex_runnable(v, graph.inactivated_vertices) + v + for v in vertex.successors_ids + if self.is_vertex_runnable(v, graph.inactivated_vertices, graph.activated_vertices) ] if not direct_successors_ready: # No direct successors ready, look for runnable predecessors of successors next_runnable_vertices = self.find_runnable_predecessors_for_successors( - vertex.id, graph.inactivated_vertices + vertex.id, graph.inactivated_vertices, graph.activated_vertices ) else: next_runnable_vertices = direct_successors_ready From ff4a5d2236e02b1c5d9d9c9775cee23ccc453621 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 18:32:50 -0300 Subject: [PATCH 2/8] fix: add predecessors to vertices_to_run --- src/backend/base/langflow/graph/graph/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index b67b2733c46..86ba5d329c7 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -157,12 +157,14 @@ def activate_state_vertices(self, name: str, caller: str): edges_set.update(_vertex.edges) if _vertex.state == VertexStates.INACTIVE: _vertex.set_state("ACTIVE") + + vertices_ids.add(_vertex.id) edges = list(edges_set) predecessor_map, _ = self.build_adjacency_maps(edges) new_predecessor_map.update(predecessor_map) - # vertices_ids.update(new_predecessor_map.keys()) - # vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) + vertices_ids.update(new_predecessor_map.keys()) + vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) self.activated_vertices = list(vertices_ids) self.vertices_to_run.update(vertices_ids) From 4e75ac7e8764ae41c2e48bfc45370229400f496e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 19:28:35 -0300 Subject: [PATCH 3/8] style: fix lint issues --- src/backend/base/langflow/graph/graph/base.py | 6 ++++-- .../base/langflow/graph/graph/runnable_vertices_manager.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 86ba5d329c7..90cee088036 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1461,7 +1461,7 @@ def sort_layer_by_avg_build_time(vertices_ids: List[str]) -> List[str]: def is_vertex_runnable(self, vertex_id: str) -> bool: """Returns whether a vertex is runnable.""" - return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices) + return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices, self.activated_vertices) def build_run_map(self): """ @@ -1481,7 +1481,9 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str] This checks the direct predecessors of each successor to identify any that are immediately runnable, expanding the search to ensure progress can be made. """ - return self.run_manager.find_runnable_predecessors_for_successors(vertex_id, self.inactivated_vertices) + return self.run_manager.find_runnable_predecessors_for_successors( + vertex_id, self.inactivated_vertices, self.activated_vertices + ) def remove_from_predecessors(self, vertex_id: str): self.run_manager.remove_from_predecessors(vertex_id) diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index 88dda02d58d..85e8dce791c 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -46,7 +46,7 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set): self.vertices_to_run.update(vertices_to_run) self.build_run_map(self.run_predecessors, self.vertices_to_run) - def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: set[str]) -> bool: + def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: list[str]) -> bool: """Determines if a vertex is runnable.""" if not activated_vertices: From c6df10936572bde8bb5356ad162a4eb20287131b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 21:24:34 -0300 Subject: [PATCH 4/8] feat: optimize vertex filtering in useFlowStore This commit optimizes the vertex filtering logic in the `useFlowStore` function in `flowStore.ts`. It introduces a more efficient way to filter out vertices that are already being built, resulting in improved performance and accuracy. --- src/frontend/src/stores/flowStore.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 331c1ed0691..f37d484a43e 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -510,11 +510,23 @@ const useFlowStore = create((set, get) => ({ const top_level_vertices = vertexBuildData.top_level_vertices.filter( (vertex) => !vertexBuildData.inactivated_vertices?.includes(vertex), ); - const nextVertices: VertexLayerElementType[] = zip( + let nextVertices: VertexLayerElementType[] = zip( next_vertices_ids, top_level_vertices, ).map(([id, reference]) => ({ id: id!, reference })); + // Now we filter nextVertices to remove any vertices that are in verticesLayers + // because they are already being built + // each layer is a list of vertexlayerelementtypes + let lastLayer = + get().verticesBuild!.verticesLayers[ + get().verticesBuild!.verticesLayers.length - 1 + ]; + nextVertices = nextVertices.filter( + (vertex) => + !lastLayer.some((layer) => layer.id === vertex.id) && + !lastLayer.some((layer) => layer.reference === vertex.reference), + ); const newLayers = [ ...get().verticesBuild!.verticesLayers, nextVertices, From 01c814b643de217a8d57698631a8b062d3e0a63d Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 21:55:05 -0300 Subject: [PATCH 5/8] refactor: add is_active method to Vertex class This commit adds the `is_active` method to the `Vertex` class in the `base.py` file. The `is_active` method checks if the state of the vertex is set to `ACTIVE` and returns a boolean value accordingly. This enhancement improves the readability and maintainability of the codebase. --- src/backend/base/langflow/graph/vertex/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index a3ad19dc225..2bfc16d1f1e 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -108,6 +108,9 @@ def set_state(self, state: str): elif self.state == VertexStates.ACTIVE and self.id in self.graph.inactivated_vertices: self.graph.inactivated_vertices.remove(self.id) + def is_active(self): + return self.state == VertexStates.ACTIVE + @property def avg_build_time(self): return sum(self.build_times) / len(self.build_times) if self.build_times else 0 From 5edb6c977f2f355b7d1ef5fc73451cd9deec4321 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 21:55:59 -0300 Subject: [PATCH 6/8] refactor: improve is_vertex_runnable method in RunnableVerticesManager --- src/backend/base/langflow/graph/graph/base.py | 2 +- .../graph/graph/runnable_vertices_manager.py | 43 +++++++------------ 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 90cee088036..2bc34ab6dbc 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1461,7 +1461,7 @@ def sort_layer_by_avg_build_time(vertices_ids: List[str]) -> List[str]: def is_vertex_runnable(self, vertex_id: str) -> bool: """Returns whether a vertex is runnable.""" - return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices, self.activated_vertices) + return self.run_manager.is_vertex_runnable(self.get_vertex(vertex_id)) def build_run_map(self): """ diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index 85e8dce791c..fb9e120b653 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -46,42 +46,36 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set): self.vertices_to_run.update(vertices_to_run) self.build_run_map(self.run_predecessors, self.vertices_to_run) - def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: list[str]) -> bool: + def is_vertex_runnable(self, vertex: "Vertex") -> bool: """Determines if a vertex is runnable.""" - if not activated_vertices: - # check vertices_to_run - should_run = vertex_id in self.vertices_to_run - else: - # check run_predecessors - should_run = vertex_id in activated_vertices return ( - vertex_id not in self.vertices_being_run - and not self.run_predecessors.get(vertex_id) - and vertex_id not in inactivated_vertices - and should_run + vertex.is_active() and self.are_all_predecessors_fulfilled(vertex.id) and vertex.id in self.vertices_to_run ) - def find_runnable_predecessors_for_successors( - self, vertex_id: str, inactivated_vertices: set[str], activated_vertices: set[str] - ) -> List[str]: + def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool: + return not any(self.run_predecessors.get(vertex_id, [])) + + def find_runnable_predecessors_for_successors(self, vertex: "Vertex") -> List[str]: """Finds runnable predecessors for the successors of a given vertex.""" runnable_vertices = [] visited = set() + get_vertex = vertex.graph.get_vertex - def find_runnable_predecessors(predecessor_id: str): + def find_runnable_predecessors(predecessor: "Vertex"): + predecessor_id = predecessor.id if predecessor_id in visited: return visited.add(predecessor_id) - if self.is_vertex_runnable(predecessor_id, inactivated_vertices, activated_vertices): + if self.is_vertex_runnable(predecessor): runnable_vertices.append(predecessor_id) else: for pred_pred_id in self.run_predecessors.get(predecessor_id, []): - find_runnable_predecessors(pred_pred_id) + find_runnable_predecessors(get_vertex(pred_pred_id)) - for successor_id in self.run_map.get(vertex_id, []): + for successor_id in self.run_map.get(vertex.id, []): for predecessor_id in self.run_predecessors.get(successor_id, []): - find_runnable_predecessors(predecessor_id) + find_runnable_predecessors(get_vertex(predecessor_id)) return runnable_vertices @@ -106,7 +100,6 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool): if is_runnable: self.vertices_to_run.add(vertex_id) else: - self.vertices_to_run.discard(vertex_id) self.vertices_being_run.discard(vertex_id) async def get_next_runnable_vertices( @@ -133,16 +126,10 @@ async def get_next_runnable_vertices( """ async with lock: self.remove_vertex_from_runnables(vertex.id) - direct_successors_ready = [ - v - for v in vertex.successors_ids - if self.is_vertex_runnable(v, graph.inactivated_vertices, graph.activated_vertices) - ] + direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(graph.get_vertex(v))] if not direct_successors_ready: # No direct successors ready, look for runnable predecessors of successors - next_runnable_vertices = self.find_runnable_predecessors_for_successors( - vertex.id, graph.inactivated_vertices, graph.activated_vertices - ) + next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex) else: next_runnable_vertices = direct_successors_ready From 7391e32f1eaa9f40b4036abb84eba07b00cc0ea0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 22:40:49 -0300 Subject: [PATCH 7/8] refactor: improve find_runnable_predecessors_for_successors method in Graph class --- src/backend/base/langflow/graph/graph/base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 2bc34ab6dbc..6b81e2e7890 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1481,9 +1481,7 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str] This checks the direct predecessors of each successor to identify any that are immediately runnable, expanding the search to ensure progress can be made. """ - return self.run_manager.find_runnable_predecessors_for_successors( - vertex_id, self.inactivated_vertices, self.activated_vertices - ) + return self.run_manager.find_runnable_predecessors_for_successors(self.get_vertex(vertex_id)) def remove_from_predecessors(self, vertex_id: str): self.run_manager.remove_from_predecessors(vertex_id) From fd359d78a3665fddd292941cc464aa4196f99c92 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Jul 2024 22:53:37 -0300 Subject: [PATCH 8/8] refactor: move test_create_function --- tests/unit/test_validate_code.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/unit/test_validate_code.py b/tests/unit/test_validate_code.py index c003ab10d46..a30fd619187 100644 --- a/tests/unit/test_validate_code.py +++ b/tests/unit/test_validate_code.py @@ -7,6 +7,20 @@ from langflow.utils.validate import create_function, execute_function, extract_function_name, validate_code +def test_create_function(): + code = """ +from pathlib import Path + +def my_function(x: str) -> Path: + return Path(x) +""" + + function_name = extract_function_name(code) + function = create_function(code, function_name) + result = function("test") + assert result == Path("test") + + def test_validate_code(): # Test case with a valid import and function code1 = """ @@ -88,17 +102,3 @@ def my_function(x): with mock.patch("requests.get", side_effect=MissingSchema): with pytest.raises(MissingSchema): execute_function(code, "my_function", "invalid_url") - - -def test_create_function(): - code = """ -from pathlib import Path - -def my_function(x: str) -> Path: - return Path(x) -""" - - function_name = extract_function_name(code) - function = create_function(code, function_name) - result = function("test") - assert result == Path("test")