Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve vertex filtering and update is_vertex_runnable logic #2612

Merged
merged 9 commits into from
Jul 10, 2024
10 changes: 6 additions & 4 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,14 @@ def activate_state_vertices(self, name: str, caller: str):
edges_set.update(_vertex.edges)
if _vertex.state == VertexStates.INACTIVE:
_vertex.set_state("ACTIVE")

vertices_ids.add(_vertex.id)
edges = list(edges_set)
predecessor_map, _ = self.build_adjacency_maps(edges)
new_predecessor_map.update(predecessor_map)

# vertices_ids.update(new_predecessor_map.keys())
# vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)
vertices_ids.update(new_predecessor_map.keys())
vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)

self.activated_vertices = list(vertices_ids)
self.vertices_to_run.update(vertices_ids)
Expand Down Expand Up @@ -1459,7 +1461,7 @@ def sort_layer_by_avg_build_time(vertices_ids: List[str]) -> List[str]:

def is_vertex_runnable(self, vertex_id: str) -> bool:
"""Returns whether a vertex is runnable."""
return self.run_manager.is_vertex_runnable(vertex_id, self.inactivated_vertices)
return self.run_manager.is_vertex_runnable(self.get_vertex(vertex_id))

def build_run_map(self):
"""
Expand All @@ -1479,7 +1481,7 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
This checks the direct predecessors of each successor to identify any that are
immediately runnable, expanding the search to ensure progress can be made.
"""
return self.run_manager.find_runnable_predecessors_for_successors(vertex_id, self.inactivated_vertices)
return self.run_manager.find_runnable_predecessors_for_successors(self.get_vertex(vertex_id))

def remove_from_predecessors(self, vertex_id: str):
self.run_manager.remove_from_predecessors(vertex_id)
Expand Down
32 changes: 15 additions & 17 deletions src/backend/base/langflow/graph/graph/runnable_vertices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,36 @@ def update_run_state(self, run_predecessors: dict, vertices_to_run: set):
self.vertices_to_run.update(vertices_to_run)
self.build_run_map(self.run_predecessors, self.vertices_to_run)

def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) -> bool:
def is_vertex_runnable(self, vertex: "Vertex") -> bool:
"""Determines if a vertex is runnable."""

return (
vertex_id not in self.vertices_being_run
and not self.run_predecessors.get(vertex_id)
and vertex_id not in inactivated_vertices
vertex.is_active() and self.are_all_predecessors_fulfilled(vertex.id) and vertex.id in self.vertices_to_run
)

def find_runnable_predecessors_for_successors(self, vertex_id: str, inactivated_vertices: set[str]) -> List[str]:
def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool:
return not any(self.run_predecessors.get(vertex_id, []))

def find_runnable_predecessors_for_successors(self, vertex: "Vertex") -> List[str]:
"""Finds runnable predecessors for the successors of a given vertex."""
runnable_vertices = []
visited = set()
get_vertex = vertex.graph.get_vertex

def find_runnable_predecessors(predecessor_id: str):
def find_runnable_predecessors(predecessor: "Vertex"):
predecessor_id = predecessor.id
if predecessor_id in visited:
return
visited.add(predecessor_id)
if self.is_vertex_runnable(predecessor_id, inactivated_vertices):
if self.is_vertex_runnable(predecessor):
runnable_vertices.append(predecessor_id)
else:
for pred_pred_id in self.run_predecessors.get(predecessor_id, []):
find_runnable_predecessors(pred_pred_id)
find_runnable_predecessors(get_vertex(pred_pred_id))

for successor_id in self.run_map.get(vertex_id, []):
for successor_id in self.run_map.get(vertex.id, []):
for predecessor_id in self.run_predecessors.get(successor_id, []):
find_runnable_predecessors(predecessor_id)
find_runnable_predecessors(get_vertex(predecessor_id))

return runnable_vertices

Expand All @@ -97,7 +100,6 @@ def update_vertex_run_state(self, vertex_id: str, is_runnable: bool):
if is_runnable:
self.vertices_to_run.add(vertex_id)
else:
self.vertices_to_run.discard(vertex_id)
self.vertices_being_run.discard(vertex_id)

async def get_next_runnable_vertices(
Expand All @@ -124,14 +126,10 @@ async def get_next_runnable_vertices(
"""
async with lock:
self.remove_vertex_from_runnables(vertex.id)
direct_successors_ready = [
v for v in vertex.successors_ids if self.is_vertex_runnable(v, graph.inactivated_vertices)
]
direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(graph.get_vertex(v))]
if not direct_successors_ready:
# No direct successors ready, look for runnable predecessors of successors
next_runnable_vertices = self.find_runnable_predecessors_for_successors(
vertex.id, graph.inactivated_vertices
)
next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex)
else:
next_runnable_vertices = direct_successors_ready

Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def set_state(self, state: str):
elif self.state == VertexStates.ACTIVE and self.id in self.graph.inactivated_vertices:
self.graph.inactivated_vertices.remove(self.id)

def is_active(self):
return self.state == VertexStates.ACTIVE

@property
def avg_build_time(self):
return sum(self.build_times) / len(self.build_times) if self.build_times else 0
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/stores/flowStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,23 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
const top_level_vertices = vertexBuildData.top_level_vertices.filter(
(vertex) => !vertexBuildData.inactivated_vertices?.includes(vertex),
);
const nextVertices: VertexLayerElementType[] = zip(
let nextVertices: VertexLayerElementType[] = zip(
next_vertices_ids,
top_level_vertices,
).map(([id, reference]) => ({ id: id!, reference }));

// Now we filter nextVertices to remove any vertices that are in verticesLayers
// because they are already being built
// each layer is a list of vertexlayerelementtypes
let lastLayer =
get().verticesBuild!.verticesLayers[
get().verticesBuild!.verticesLayers.length - 1
];
nextVertices = nextVertices.filter(
(vertex) =>
!lastLayer.some((layer) => layer.id === vertex.id) &&
!lastLayer.some((layer) => layer.reference === vertex.reference),
);
const newLayers = [
...get().verticesBuild!.verticesLayers,
nextVertices,
Expand Down
28 changes: 14 additions & 14 deletions tests/unit/test_validate_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@
from langflow.utils.validate import create_function, execute_function, extract_function_name, validate_code


def test_create_function():
code = """
from pathlib import Path

def my_function(x: str) -> Path:
return Path(x)
"""

function_name = extract_function_name(code)
function = create_function(code, function_name)
result = function("test")
assert result == Path("test")


def test_validate_code():
# Test case with a valid import and function
code1 = """
Expand Down Expand Up @@ -88,17 +102,3 @@ def my_function(x):
with mock.patch("requests.get", side_effect=MissingSchema):
with pytest.raises(MissingSchema):
execute_function(code, "my_function", "invalid_url")


def test_create_function():
code = """
from pathlib import Path

def my_function(x: str) -> Path:
return Path(x)
"""

function_name = extract_function_name(code)
function = create_function(code, function_name)
result = function("test")
assert result == Path("test")
Loading