Skip to content

Commit

Permalink
Merge branch 'main' into fix-do-not-mark-self.stop-caller
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielluiz authored Aug 14, 2024
2 parents 6c3e199 + e5ee0ba commit 79e216c
Show file tree
Hide file tree
Showing 58 changed files with 826 additions and 856 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ def get_output(self, name: str) -> Any:
return self._outputs[name]
raise ValueError(f"Output {name} not found in {self.__class__.__name__}")

def set_on_output(self, name: str, **kwargs):
output = self.get_output(name)
for key, value in kwargs.items():
if not hasattr(output, key):
raise ValueError(f"Output {name} does not have a method {key}")
setattr(output, key, value)

def set_output_value(self, name: str, value: Any):
if name in self._outputs:
self._outputs[name].value = value
Expand Down
23 changes: 11 additions & 12 deletions src/backend/base/langflow/graph/edge/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, source: "Vertex", target: "Vertex", edge: EdgeData):
self.target_param: str | None = None
self._target_handle: TargetHandleDict | str | None = None
self._data = edge.copy()
self.is_cycle = False
if data := edge.get("data", {}):
self._source_handle = data.get("sourceHandle", {})
self._target_handle = cast(TargetHandleDict, data.get("targetHandle", {}))
Expand Down Expand Up @@ -159,10 +160,11 @@ def _legacy_validate_edge(self, source, target) -> None:
raise ValueError(f"Edge between {source.vertex_type} and {target.vertex_type} " f"has no matched type")

def __repr__(self) -> str:
return (
f"Edge(source={self.source_id}, target={self.target_id}, target_param={self.target_param}"
f", matched_type={self.matched_type})"
)
if (hasattr(self, "source_handle") and self.source_handle) and (
hasattr(self, "target_handle") and self.target_handle
):
return f"{self.source_id} -[{self.source_handle.name}->{self.target_handle.field_name}]-> {self.target_id}"
return f"{self.source_id} -[{self.target_param}]-> {self.target_id}"

def __hash__(self) -> int:
return hash(self.__repr__())
Expand All @@ -176,12 +178,16 @@ def __eq__(self, __o: object) -> bool:
and self.target_param == __o.target_param
)

def __str__(self) -> str:
return self.__repr__()


class ContractEdge(Edge):
class CycleEdge(Edge):
def __init__(self, source: "Vertex", target: "Vertex", raw_edge: EdgeData):
super().__init__(source, target, raw_edge)
self.is_fulfilled = False # Whether the contract has been fulfilled.
self.result: Any = None
self.is_cycle = True

async def honor(self, source: "Vertex", target: "Vertex") -> None:
"""
Expand Down Expand Up @@ -220,10 +226,3 @@ async def get_result_from_source(self, source: "Vertex", target: "Vertex"):
if target.params.get("message") == "":
return self.result
return self.result

def __repr__(self) -> str:
if (hasattr(self, "source_handle") and self.source_handle) and (
hasattr(self, "target_handle") and self.target_handle
):
return f"{self.source_id} -[{self.source_handle.name}->{self.target_handle.field_name}]-> {self.target_id}"
return f"{self.source_id} -[{self.target_param}]-> {self.target_id}"
100 changes: 62 additions & 38 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
from loguru import logger

from langflow.exceptions.component import ComponentBuildException
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge
from langflow.graph.edge.schema import EdgeData
from langflow.graph.graph.constants import Finish, lazy_load_vertex_dict
from langflow.graph.graph.runnable_vertices_manager import RunnableVerticesManager
from langflow.graph.graph.schema import GraphData, GraphDump, VertexBuildResult
from langflow.graph.graph.schema import GraphData, GraphDump, StartConfigDict, VertexBuildResult
from langflow.graph.graph.state_manager import GraphStateManager
from langflow.graph.graph.state_model import create_state_model_from_graph
from langflow.graph.graph.utils import find_start_component_id, process_flow, sort_up_to_vertex
from langflow.graph.graph.utils import find_start_component_id, process_flow, should_continue, sort_up_to_vertex
from langflow.graph.schema import InterfaceComponentTypes, RunOutputs
from langflow.graph.vertex.base import Vertex, VertexStates
from langflow.graph.vertex.schema import NodeData
Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(
self.vertices_to_run: set[str] = set()
self.stop_vertex: Optional[str] = None
self.inactive_vertices: set = set()
self.edges: List[ContractEdge] = []
self.edges: List[CycleEdge] = []
self.vertices: List[Vertex] = []
self.run_manager = RunnableVerticesManager()
self.state_manager = GraphStateManager()
Expand All @@ -100,14 +100,16 @@ def __init__(
self._first_layer: List[str] = []
self._lock = asyncio.Lock()
self.raw_graph_data: GraphData = {"nodes": [], "edges": []}
self._is_cyclic: Optional[bool] = None
self._cycles: Optional[List[tuple[str, str]]] = None
try:
self.tracing_service: "TracingService" | None = get_tracing_service()
except Exception as exc:
logger.error(f"Error getting tracing service: {exc}")
self.tracing_service = None
if start is not None and end is not None:
self._set_start_and_end(start, end)
self.prepare()
self.prepare(start_component_id=start._id)
if (start is not None and end is None) or (start is None and end is not None):
raise ValueError("You must provide both input and output components")

Expand Down Expand Up @@ -247,7 +249,7 @@ def add_component_edge(self, source_id: str, output_input_tuple: Tuple[str, str]
}
self._add_edge(edge_data)

async def async_start(self, inputs: Optional[List[dict]] = None):
async def async_start(self, inputs: Optional[List[dict]] = None, max_iterations: Optional[int] = None):
if not self._prepared:
raise ValueError("Graph not prepared. Call prepare() first.")
# The idea is for this to return a generator that yields the result of
Expand All @@ -256,17 +258,40 @@ async def async_start(self, inputs: Optional[List[dict]] = None):
for key, value in _input.items():
vertex = self.get_vertex(key)
vertex.set_input_value(key, value)
while True:
# I want to keep a counter of how many tyimes result.vertex.id
# has been yielded
yielded_counts: dict[str, int] = defaultdict(int)

while should_continue(yielded_counts, max_iterations):
result = await self.astep()
yield result
if hasattr(result, "vertex"):
yielded_counts[result.vertex.id] += 1
if isinstance(result, Finish):
return

def start(self, inputs: Optional[List[dict]] = None) -> Generator:
raise ValueError("Max iterations reached")

def __apply_config(self, config: StartConfigDict):
for vertex in self.vertices:
if vertex._custom_component is None:
continue
for output in vertex._custom_component.outputs:
for key, value in config["output"].items():
setattr(output, key, value)

def start(
self,
inputs: Optional[List[dict]] = None,
max_iterations: Optional[int] = None,
config: Optional[StartConfigDict] = None,
) -> Generator:
if config is not None:
self.__apply_config(config)
#! Change this ASAP
nest_asyncio.apply()
loop = asyncio.get_event_loop()
async_gen = self.async_start(inputs)
async_gen = self.async_start(inputs, max_iterations)
async_gen_task = asyncio.ensure_future(async_gen.__anext__())

while True:
Expand Down Expand Up @@ -721,7 +746,7 @@ def metadata(self):
"flow_name": self.flow_name,
}

def build_graph_maps(self, edges: Optional[List[ContractEdge]] = None, vertices: Optional[List["Vertex"]] = None):
def build_graph_maps(self, edges: Optional[List[CycleEdge]] = None, vertices: Optional[List["Vertex"]] = None):
"""
Builds the adjacency maps for the graph.
"""
Expand Down Expand Up @@ -1252,7 +1277,7 @@ def get_vertex_edges(
vertex_id: str,
is_target: Optional[bool] = None,
is_source: Optional[bool] = None,
) -> List[ContractEdge]:
) -> List[CycleEdge]:
"""Returns a list of edges for a given vertex."""
# The idea here is to return the edges that have the vertex_id as source or target
# or both
Expand Down Expand Up @@ -1422,33 +1447,25 @@ def get_predecessors(self, vertex):
"""Returns the predecessors of a vertex."""
return [self.get_vertex(source_id) for source_id in self.predecessor_map.get(vertex.id, [])]

def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True):
# Recursively get the successors of the current vertex
# successors = vertex.successors
# if not successors:
# return []
# successors_result = []
# for successor in successors:
# # Just return a list of successors
# if recursive:
# next_successors = self.get_all_successors(successor)
# successors_result.extend(next_successors)
# successors_result.append(successor)
# return successors_result
# The above is the version without the flat parameter
# The below is the version with the flat parameter
# the flat parameter will define if each layer of successors
# becomes one list or if the result is a list of lists
# if flat is True, the result will be a list of vertices
# if flat is False, the result will be a list of lists of vertices
# each list will represent a layer of successors
def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True, visited=None):
if visited is None:
visited = set()

# Prevent revisiting vertices to avoid infinite loops in cyclic graphs
if vertex in visited:
return []

visited.add(vertex)

successors = vertex.successors
if not successors:
return []

successors_result = []

for successor in successors:
if recursive:
next_successors = self.get_all_successors(successor)
next_successors = self.get_all_successors(successor, recursive=recursive, flat=flat, visited=visited)
if flat:
successors_result.extend(next_successors)
else:
Expand All @@ -1457,6 +1474,10 @@ def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True):
successors_result.append(successor)
else:
successors_result.append([successor])

if not flat and successors_result:
return [successors] + successors_result

return successors_result

def get_successors(self, vertex: "Vertex") -> List["Vertex"]:
Expand All @@ -1483,29 +1504,29 @@ def get_vertex_neighbors(self, vertex: "Vertex") -> Dict["Vertex", int]:
neighbors[neighbor] += 1
return neighbors

def _build_edges(self) -> List[ContractEdge]:
def _build_edges(self) -> List[CycleEdge]:
"""Builds the edges of the graph."""
# Edge takes two vertices as arguments, so we need to build the vertices first
# and then build the edges
# if we can't find a vertex, we raise an error

edges: set[ContractEdge] = set()
edges: set[CycleEdge] = set()
for edge in self._edges:
new_edge = self.build_edge(edge)
edges.add(new_edge)
if self.vertices and not edges:
warnings.warn("Graph has vertices but no edges")
return list(edges)

def build_edge(self, edge: EdgeData) -> ContractEdge:
def build_edge(self, edge: EdgeData) -> CycleEdge:
source = self.get_vertex(edge["source"])
target = self.get_vertex(edge["target"])

if source is None:
raise ValueError(f"Source vertex {edge['source']} not found")
if target is None:
raise ValueError(f"Target vertex {edge['target']} not found")
new_edge = ContractEdge(source, target, edge)
new_edge = CycleEdge(source, target, edge)
return new_edge

def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type["Vertex"]:
Expand Down Expand Up @@ -1883,13 +1904,16 @@ def get_top_level_vertices(self, vertices_ids):
top_level_vertices.append(vertex_id)
return top_level_vertices

def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
def build_in_degree(self, edges: List[CycleEdge]) -> Dict[str, int]:
in_degree: Dict[str, int] = defaultdict(int)
for edge in edges:
in_degree[edge.target_id] += 1
for vertex in self.vertices:
if vertex.id not in in_degree:
in_degree[vertex.id] = 0
return in_degree

def build_adjacency_maps(self, edges: List[ContractEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
def build_adjacency_maps(self, edges: List[CycleEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
"""Returns the adjacency maps for the graph."""
predecessor_map: dict[str, list[str]] = defaultdict(list)
successor_map: dict[str, list[str]] = defaultdict(list)
Expand Down
8 changes: 8 additions & 0 deletions src/backend/base/langflow/graph/graph/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ class VertexBuildResult(NamedTuple):
valid: bool
artifacts: dict
vertex: "Vertex"


class OutputConfigDict(TypedDict):
cache: bool


class StartConfigDict(TypedDict):
output: OutputConfigDict
4 changes: 4 additions & 0 deletions src/backend/base/langflow/graph/graph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,12 @@ def get_updated_edges(base_flow, g_nodes, g_edges, group_node_id):
def get_successors(graph: Dict[str, Dict[str, List[str]]], vertex_id: str) -> List[str]:
successors_result = []
stack = [vertex_id]
visited = set()
while stack:
current_id = stack.pop()
if current_id in visited:
continue
visited.add(current_id)
successors_result.append(current_id)
stack.extend(graph[current_id]["successors"])
return successors_result
Expand Down
10 changes: 5 additions & 5 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

if TYPE_CHECKING:
from langflow.custom import Component
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge
from langflow.graph.graph.base import Graph


Expand Down Expand Up @@ -162,15 +162,15 @@ def set_artifacts(self) -> None:
pass

@property
def edges(self) -> List["ContractEdge"]:
def edges(self) -> List["CycleEdge"]:
return self.graph.get_vertex_edges(self.id)

@property
def outgoing_edges(self) -> List["ContractEdge"]:
def outgoing_edges(self) -> List["CycleEdge"]:
return [edge for edge in self.edges if edge.source_id == self.id]

@property
def incoming_edges(self) -> List["ContractEdge"]:
def incoming_edges(self) -> List["CycleEdge"]:
return [edge for edge in self.edges if edge.target_id == self.id]

@property
Expand Down Expand Up @@ -798,7 +798,7 @@ async def get_requester_result(self, requester: Optional["Vertex"]):
else await requester_edge.get_result_from_source(source=self, target=requester)
)

def add_edge(self, edge: "ContractEdge") -> None:
def add_edge(self, edge: "CycleEdge") -> None:
if edge not in self.edges:
self.edges.append(edge)

Expand Down
7 changes: 3 additions & 4 deletions src/backend/base/langflow/graph/vertex/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
from loguru import logger

from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, log_transaction, serialize_field
from langflow.graph.utils import UnbuiltObject, log_transaction, log_vertex_build, serialize_field
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.schema import NodeData
from langflow.inputs.inputs import InputTypes
from langflow.schema import Data
from langflow.schema.artifact import ArtifactType
from langflow.schema.message import Message
from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.graph.utils import log_vertex_build
from langflow.template.field.base import UNDEFINED, Output
from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse
from langflow.utils.util import unescape_string

if TYPE_CHECKING:
from langflow.graph.edge.base import ContractEdge
from langflow.graph.edge.base import CycleEdge


class CustomComponentVertex(Vertex):
Expand Down Expand Up @@ -70,7 +69,7 @@ def _update_built_object_and_artifacts(self, result):
for key, value in self._built_object.items():
self.add_result(key, value)

def get_edge_with_target(self, target_id: str) -> Generator["ContractEdge", None, None]:
def get_edge_with_target(self, target_id: str) -> Generator["CycleEdge", None, None]:
"""
Get the edge with the target id.
Expand Down
Loading

0 comments on commit 79e216c

Please sign in to comment.