Skip to content

Commit

Permalink
fix: Improve vertex filtering and update is_vertex_runnable logic (la…
Browse files Browse the repository at this point in the history
…ngflow-ai#2612)

* feat: improve is_vertex_runnable method in RunnableVerticesManager

This commit improves the `is_vertex_runnable` method in the `RunnableVerticesManager` class. It adds an additional parameter `activated_vertices` to the method signature and updates the logic to check if a vertex is runnable based on the presence of activated vertices. This enhancement improves the accuracy of determining whether a vertex is runnable or not.

* fix: add predecessors to vertices_to_run

* style: fix lint issues

* feat: optimize vertex filtering in useFlowStore

This commit optimizes the vertex filtering logic in the `useFlowStore` function in `flowStore.ts`. It introduces a more efficient way to filter out vertices that are already being built, resulting in improved performance and accuracy.

* refactor: add is_active method to Vertex class

This commit adds the `is_active` method to the `Vertex` class in the `base.py` file. The `is_active` method checks if the state of the vertex is set to `ACTIVE` and returns a boolean value accordingly. This enhancement improves the readability and maintainability of the codebase.

* refactor: improve is_vertex_runnable method in RunnableVerticesManager

* refactor: improve find_runnable_predecessors_for_successors method in Graph class

* refactor: move test_create_function

(cherry picked from commit d28fe8e)
  • Loading branch information
ogabrielluiz authored and nicoloboschi committed Jul 11, 2024
1 parent 6f6fe23 commit 946f961
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 36 deletions.
10 changes: 6 additions & 4 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,14 @@ def activate_state_vertices(self, name: str, caller: str):
edges_set.update(_vertex.edges)
if _vertex.state == VertexStates.INACTIVE:
_vertex.set_state("ACTIVE")

vertices_ids.add(_vertex.id)
edges = list(edges_set)
predecessor_map, _ = self.build_adjacency_maps(edges)
new_predecessor_map.update(predecessor_map)

# vertices_ids.update(new_predecessor_map.keys())
# vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)
vertices_ids.update(new_predecessor_map.keys())
vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)

self.activated_vertices = list(vertices_ids)
self.vertices_to_run.update(vertices_ids)
Expand Down Expand Up @@ -1459,7 +1461,7 @@ def sort_layer_by_avg_build_time(vertices_ids: List[str]) -> List[str]:

def is_vertex_runnable(self, vertex_id: str) -> bool:
"""Returns whether a vertex is runnable."""
return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices)
return self.run_manager.is_vertex_runnable(self.get_vertex(vertex_id))

def build_run_map(self):
"""
Expand All @@ -1479,7 +1481,7 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
This checks the direct predecessors of each successor to identify any that are
immediately runnable, expanding the search to ensure progress can be made.
"""
return self.run_manager.find_runnable_predecessors_for_successors(vertex_id, self.inactivated_vertices)
return self.run_manager.find_runnable_predecessors_for_successors(self.get_vertex(vertex_id))

def remove_from_predecessors(self, vertex_id: str):
self.run_manager.remove_from_predecessors(vertex_id)
Expand Down
32 changes: 15 additions & 17 deletions src/backend/base/langflow/graph/graph/runnable_vertices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,36 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set):
self.vertices_to_run.update(vertices_to_run)
self.build_run_map(self.run_predecessors, self.vertices_to_run)

def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) -> bool:
def is_vertex_runnable(self, vertex: "Vertex") -> bool:
"""Determines if a vertex is runnable."""

return (
vertex_id not in self.vertices_being_run
and not self.run_predecessors.get(vertex_id)
and vertex_id not in inactivated_vertices
vertex.is_active() and self.are_all_predecessors_fulfilled(vertex.id) and vertex.id in self.vertices_to_run
)

def find_runnable_predecessors_for_successors(self, vertex_id: str, inactivated_vertices: set[str]) -> List[str]:
def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool:
return not any(self.run_predecessors.get(vertex_id, []))

def find_runnable_predecessors_for_successors(self, vertex: "Vertex") -> List[str]:
"""Finds runnable predecessors for the successors of a given vertex."""
runnable_vertices = []
visited = set()
get_vertex = vertex.graph.get_vertex

def find_runnable_predecessors(predecessor_id: str):
def find_runnable_predecessors(predecessor: "Vertex"):
predecessor_id = predecessor.id
if predecessor_id in visited:
return
visited.add(predecessor_id)
if self.is_vertex_runnable(predecessor_id, inactivated_vertices):
if self.is_vertex_runnable(predecessor):
runnable_vertices.append(predecessor_id)
else:
for pred_pred_id in self.run_predecessors.get(predecessor_id, []):
find_runnable_predecessors(pred_pred_id)
find_runnable_predecessors(get_vertex(pred_pred_id))

for successor_id in self.run_map.get(vertex_id, []):
for successor_id in self.run_map.get(vertex.id, []):
for predecessor_id in self.run_predecessors.get(successor_id, []):
find_runnable_predecessors(predecessor_id)
find_runnable_predecessors(get_vertex(predecessor_id))

return runnable_vertices

Expand All @@ -97,7 +100,6 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool):
if is_runnable:
self.vertices_to_run.add(vertex_id)
else:
self.vertices_to_run.discard(vertex_id)
self.vertices_being_run.discard(vertex_id)

async def get_next_runnable_vertices(
Expand All @@ -124,14 +126,10 @@ async def get_next_runnable_vertices(
"""
async with lock:
self.remove_vertex_from_runnables(vertex.id)
direct_successors_ready = [
v for v in vertex.successors_ids if self.is_vertex_runnable(v, graph.inactivated_vertices)
]
direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(graph.get_vertex(v))]
if not direct_successors_ready:
# No direct successors ready, look for runnable predecessors of successors
next_runnable_vertices = self.find_runnable_predecessors_for_successors(
vertex.id, graph.inactivated_vertices
)
next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex)
else:
next_runnable_vertices = direct_successors_ready

Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def set_state(self, state: str):
elif self.state == VertexStates.ACTIVE and self.id in self.graph.inactivated_vertices:
self.graph.inactivated_vertices.remove(self.id)

def is_active(self):
return self.state == VertexStates.ACTIVE

@property
def avg_build_time(self):
return sum(self.build_times) / len(self.build_times) if self.build_times else 0
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/stores/flowStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,23 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
const top_level_vertices = vertexBuildData.top_level_vertices.filter(
(vertex) => !vertexBuildData.inactivated_vertices?.includes(vertex),
);
const nextVertices: VertexLayerElementType[] = zip(
let nextVertices: VertexLayerElementType[] = zip(
next_vertices_ids,
top_level_vertices,
).map(([id, reference]) => ({ id: id!, reference }));

// Now we filter nextVertices to remove any vertices that are in verticesLayers
// because they are already being built
// each layer is a list of vertexlayerelementtypes
let lastLayer =
get().verticesBuild!.verticesLayers[
get().verticesBuild!.verticesLayers.length - 1
];
nextVertices = nextVertices.filter(
(vertex) =>
!lastLayer.some((layer) => layer.id === vertex.id) &&
!lastLayer.some((layer) => layer.reference === vertex.reference),
);
const newLayers = [
...get().verticesBuild!.verticesLayers,
nextVertices,
Expand Down
28 changes: 14 additions & 14 deletions tests/unit/test_validate_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@
from langflow.utils.validate import create_function, execute_function, extract_function_name, validate_code


def test_create_function():
code = """
from pathlib import Path
def my_function(x: str) -> Path:
return Path(x)
"""

function_name = extract_function_name(code)
function = create_function(code, function_name)
result = function("test")
assert result == Path("test")


def test_validate_code():
# Test case with a valid import and function
code1 = """
Expand Down Expand Up @@ -88,17 +102,3 @@ def my_function(x):
with mock.patch("requests.get", side_effect=MissingSchema):
with pytest.raises(MissingSchema):
execute_function(code, "my_function", "invalid_url")


def test_create_function():
code = """
from pathlib import Path
def my_function(x: str) -> Path:
return Path(x)
"""

function_name = extract_function_name(code)
function = create_function(code, function_name)
result = function("test")
assert result == Path("test")

0 comments on commit 946f961

Please sign in to comment.