From 64a77e76bd7357932cc5317a5df67b8148516f56 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Thu, 10 Oct 2024 16:58:30 +0200 Subject: [PATCH] Add ruff rules TRY3xx --- src/backend/base/langflow/api/v1/api_key.py | 8 +- src/backend/base/langflow/api/v1/chat.py | 157 +++++++++------- src/backend/base/langflow/api/v1/endpoints.py | 175 ++++++++++-------- src/backend/base/langflow/api/v1/files.py | 76 ++++---- src/backend/base/langflow/api/v1/flows.py | 45 +++-- src/backend/base/langflow/api/v1/folders.py | 44 +++-- src/backend/base/langflow/api/v1/monitor.py | 29 +-- src/backend/base/langflow/api/v1/variable.py | 19 +- .../base/langflow/base/models/model.py | 6 +- .../components/Notion/update_page_property.py | 6 +- .../components/assemblyai/AssemblyAILeMUR.py | 7 +- .../assemblyai/AssemblyAIListTranscripts.py | 5 +- .../assemblyai/AssemblyAIStartTranscript.py | 14 +- .../components/deactivated/MergeData.py | 46 ++--- .../components/deactivated/SubFlow.py | 21 ++- .../langflow/components/helpers/CSVtoData.py | 51 +++-- .../langflow/components/helpers/JSONtoData.py | 68 +++---- .../langflow/components/helpers/MergeData.py | 46 ++--- .../components/helpers/MessageToData.py | 16 +- .../components/prototypes/JSONCleaner.py | 12 +- .../langflow/components/prototypes/SubFlow.py | 21 ++- .../components/toolkits/ComposioAPI.py | 3 +- .../langflow/components/tools/Calculator.py | 37 ++-- .../base/langflow/components/tools/SerpAPI.py | 5 +- .../components/tools/TavilyAISearch.py | 6 +- .../langflow/components/tools/YfinanceTool.py | 4 +- .../components/vectorstores/Elasticsearch.py | 7 +- .../components/vectorstores/OpenSearch.py | 13 +- .../directory_reader/directory_reader.py | 2 +- src/backend/base/langflow/custom/utils.py | 140 +++++++------- src/backend/base/langflow/graph/graph/base.py | 37 ++-- .../base/langflow/graph/vertex/base.py | 3 +- .../langflow/interface/initialize/loading.py | 10 +- src/backend/base/langflow/load/utils.py | 45 ++--- src/backend/base/langflow/memory.py | 15 +- src/backend/base/langflow/schema/dotdict.py | 3 +- src/backend/base/langflow/schema/image.py | 2 +- src/backend/base/langflow/schema/message.py | 2 +- .../base/langflow/services/cache/service.py | 2 +- .../database/models/transactions/crud.py | 2 +- .../database/models/vertex_builds/crud.py | 2 +- .../base/langflow/services/storage/s3.py | 7 +- .../langflow/services/task/backends/anyio.py | 9 +- src/backend/base/langflow/services/utils.py | 5 +- src/backend/base/langflow/utils/version.py | 6 +- src/backend/base/langflow/worker.py | 2 +- src/backend/base/pyproject.toml | 2 +- 47 files changed, 677 insertions(+), 566 deletions(-) diff --git a/src/backend/base/langflow/api/v1/api_key.py b/src/backend/base/langflow/api/v1/api_key.py index 297250ef3a4d..c990a273c8a9 100644 --- a/src/backend/base/langflow/api/v1/api_key.py +++ b/src/backend/base/langflow/api/v1/api_key.py @@ -54,9 +54,9 @@ def delete_api_key_route( ): try: delete_api_key(db, api_key_id) - return {"detail": "API Key deleted"} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) from e + return {"detail": "API Key deleted"} @router.post("/store") @@ -88,10 +88,11 @@ def save_store_api_key( domain=auth_settings.COOKIE_DOMAIN, ) - return {"detail": "API Key saved"} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) from e + return {"detail": "API Key saved"} + @router.delete("/store") def delete_store_api_key( @@ -101,6 +102,7 @@ def delete_store_api_key( try: current_user.store_api_key = None db.commit() - return {"detail": "API Key deleted"} except Exception as e: raise HTTPException(status_code=400, detail=str(e)) from e + + return {"detail": "API Key deleted"} diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index e1cd6450c726..23ab0b0a5557 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -189,7 +189,6 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]: playgroundSuccess=True, ), ) - return first_layer, vertices_to_run, graph except Exception as exc: background_tasks.add_task( telemetry_service.log_package_playground, @@ -205,6 +204,8 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]: logger.exception("Error checking build status") raise HTTPException(status_code=500, detail=str(exc)) from exc + return first_layer, vertices_to_run, graph + async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse: flow_id_str = str(flow_id) @@ -302,7 +303,6 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage componentErrorMessage=error_message, ), ) - return build_response except Exception as exc: background_tasks.add_task( telemetry_service.log_package_component, @@ -317,6 +317,8 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage message = parse_exception(exc) raise HTTPException(status_code=500, detail=message) from exc + return build_response + async def build_vertices( vertex_id: str, graph: Graph, @@ -588,7 +590,6 @@ async def build_vertex( componentErrorMessage=error_message, ), ) - return build_response except Exception as exc: background_tasks.add_task( telemetry_service.log_package_component, @@ -603,6 +604,90 @@ async def build_vertex( message = parse_exception(exc) raise HTTPException(status_code=500, detail=message) from exc + return build_response + + +async def _stream_vertex(flow_id: str, vertex_id: str, chat_service: ChatService): + graph = None + try: + try: + cache = await chat_service.get_cache(flow_id) + except Exception as exc: # noqa: BLE001 + logger.exception("Error building Component") + yield str(StreamData(event="error", data={"error": str(exc)})) + return + + if not cache: + # If there's no cache + msg = f"No cache found for {flow_id}." + logger.error(msg) + yield str(StreamData(event="error", data={"error": msg})) + return + else: + graph = cache.get("result") + + try: + vertex: InterfaceVertex = graph.get_vertex(vertex_id) + except Exception as exc: # noqa: BLE001 + logger.exception("Error building Component") + yield str(StreamData(event="error", data={"error": str(exc)})) + return + + if not hasattr(vertex, "stream"): + msg = f"Vertex {vertex_id} does not support streaming" + logger.error(msg) + yield str(StreamData(event="error", data={"error": msg})) + return + + if isinstance(vertex._built_result, str) and vertex._built_result: + stream_data = StreamData( + event="message", + data={"message": f"Streaming vertex {vertex_id}"}, + ) + yield str(stream_data) + stream_data = StreamData( + event="message", + data={"chunk": vertex._built_result}, + ) + yield str(stream_data) + + elif not vertex.frozen or not vertex._built: + logger.debug(f"Streaming vertex {vertex_id}") + stream_data = StreamData( + event="message", + data={"message": f"Streaming vertex {vertex_id}"}, + ) + yield str(stream_data) + try: + async for chunk in vertex.stream(): + stream_data = StreamData( + event="message", + data={"chunk": chunk}, + ) + yield str(stream_data) + except Exception as exc: # noqa: BLE001 + logger.exception("Error building Component") + exc_message = parse_exception(exc) + if exc_message == "The message must be an iterator or an async iterator.": + exc_message = "This stream has already been closed." + yield str(StreamData(event="error", data={"error": exc_message})) + elif vertex.result is not None: + stream_data = StreamData( + event="message", + data={"chunk": vertex._built_result}, + ) + yield str(stream_data) + else: + msg = f"No result found for vertex {vertex_id}" + logger.error(msg) + yield str(StreamData(event="error", data={"error": msg})) + return + finally: + logger.debug("Closing stream") + if graph: + await chat_service.set_cache(flow_id, graph) + yield str(StreamData(event="close", data={"message": "Stream closed"})) + @router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse) async def build_vertex_stream( @@ -638,70 +723,6 @@ async def build_vertex_stream( HTTPException: If an error occurs while building the vertex. """ try: - flow_id_str = str(flow_id) - - async def stream_vertex(): - graph = None - try: - cache = await chat_service.get_cache(flow_id_str) - if not cache: - # If there's no cache - msg = f"No cache found for {flow_id_str}." - raise ValueError(msg) - else: - graph = cache.get("result") - - vertex: InterfaceVertex = graph.get_vertex(vertex_id) - if not hasattr(vertex, "stream"): - msg = f"Vertex {vertex_id} does not support streaming" - raise ValueError(msg) - if isinstance(vertex._built_result, str) and vertex._built_result: - stream_data = StreamData( - event="message", - data={"message": f"Streaming vertex {vertex_id}"}, - ) - yield str(stream_data) - stream_data = StreamData( - event="message", - data={"chunk": vertex._built_result}, - ) - yield str(stream_data) - - elif not vertex.frozen or not vertex._built: - logger.debug(f"Streaming vertex {vertex_id}") - stream_data = StreamData( - event="message", - data={"message": f"Streaming vertex {vertex_id}"}, - ) - yield str(stream_data) - async for chunk in vertex.stream(): - stream_data = StreamData( - event="message", - data={"chunk": chunk}, - ) - yield str(stream_data) - elif vertex.result is not None: - stream_data = StreamData( - event="message", - data={"chunk": vertex._built_result}, - ) - yield str(stream_data) - else: - msg = f"No result found for vertex {vertex_id}" - raise ValueError(msg) - - except Exception as exc: # noqa: BLE001 - logger.exception("Error building Component") - exc_message = parse_exception(exc) - if exc_message == "The message must be an iterator or an async iterator.": - exc_message = "This stream has already been closed." - yield str(StreamData(event="error", data={"error": exc_message})) - finally: - logger.debug("Closing stream") - if graph: - await chat_service.set_cache(flow_id_str, graph) - yield str(StreamData(event="close", data={"message": "Stream closed"})) - - return StreamingResponse(stream_vertex(), media_type="text/event-stream") + return StreamingResponse(_stream_vertex(str(flow_id), vertex_id, chat_service), media_type="text/event-stream") except Exception as exc: raise HTTPException(status_code=500, detail="Error building Component") from exc diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 7d3d32114be7..346de7539368 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -260,7 +260,6 @@ async def simplified_run_flow( telemetry_service.log_package_run, RunPayload(runIsWebhook=False, runSeconds=int(end_time - start_time), runSuccess=True, runErrorMessage=""), ) - return result except ValueError as exc: background_tasks.add_task( @@ -295,6 +294,8 @@ async def simplified_run_flow( ) raise APIException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, exception=exc, flow=flow) from exc + return result + @router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003 async def webhook_run_flow( @@ -320,59 +321,60 @@ async def webhook_run_flow( Raises: HTTPException: If the flow is not found or if there is an error processing the request. """ + start_time = time.perf_counter() + logger.debug("Received webhook request") + error_msg = "" try: - start_time = time.perf_counter() - logger.debug("Received webhook request") - data = await request.body() + try: + data = await request.body() + except Exception as exc: + error_msg = str(exc) + logger.exception(exc) + raise HTTPException(status_code=500, detail=error_msg) from exc + if not data: - logger.error("Request body is empty") - msg = "Request body is empty. You should provide a JSON payload containing the flow ID." - raise ValueError( - msg, + error_msg = "Request body is empty. You should provide a JSON payload containing the flow ID." + logger.error(error_msg) + raise HTTPException(status_code=400, detail=error_msg) + + try: + # get all webhook components in the flow + webhook_components = get_all_webhook_components_in_flow(flow.data) + tweaks = {} + + for component in webhook_components: + tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data} + input_request = SimplifiedAPIRequest( + input_value="", + input_type="chat", + output_type="chat", + tweaks=tweaks, + session_id=None, ) - # get all webhook components in the flow - webhook_components = get_all_webhook_components_in_flow(flow.data) - tweaks = {} - - for component in webhook_components: - tweaks[component["id"]] = {"data": data.decode() if isinstance(data, bytes) else data} - input_request = SimplifiedAPIRequest( - input_value="", - input_type="chat", - output_type="chat", - tweaks=tweaks, - session_id=None, - ) - - logger.debug("Starting background task") - background_tasks.add_task( - simple_run_flow_task, - flow=flow, - input_request=input_request, - api_key_user=user, - ) - background_tasks.add_task( - telemetry_service.log_package_run, - RunPayload( - runIsWebhook=True, runSeconds=int(time.perf_counter() - start_time), runSuccess=True, runErrorMessage="" - ), - ) - return {"message": "Task started in the background", "status": "in progress"} - except Exception as exc: + logger.debug("Starting background task") + background_tasks.add_task( + simple_run_flow_task, + flow=flow, + input_request=input_request, + api_key_user=user, + ) + except Exception as exc: + logger.exception(exc) + error_msg = str(exc) + raise HTTPException(status_code=500, detail=error_msg) from exc + finally: background_tasks.add_task( telemetry_service.log_package_run, RunPayload( runIsWebhook=True, runSeconds=int(time.perf_counter() - start_time), - runSuccess=False, - runErrorMessage=str(exc), + runSuccess=error_msg == "", + runErrorMessage=error_msg, ), ) - if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc): - raise HTTPException(status_code=400, detail=str(exc)) from exc - logger.exception(exc) - raise HTTPException(status_code=500, detail=str(exc)) from exc + + return {"message": "Task started in the background", "status": "in progress"} @router.post("/run/advanced/{flow_id}", response_model=RunResponse, response_model_exclude_none=True) @@ -434,35 +436,59 @@ async def experimental_run_flow( This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements. """ # noqa: E501 - try: - flow_id_str = str(flow_id) - if outputs is None: - outputs = [] - if inputs is None: - inputs = [InputValueRequest(components=[], input_value="")] - - if session_id: + flow_id_str = str(flow_id) + if outputs is None: + outputs = [] + if inputs is None: + inputs = [InputValueRequest(components=[], input_value="")] + + if session_id: + try: session_data = await session_service.load_session(session_id, flow_id=flow_id_str) - graph, _artifacts = session_data or (None, None) - if graph is None: - msg = f"Session {session_id} not found" - raise ValueError(msg) - else: + except Exception as exc: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + graph, _artifacts = session_data or (None, None) + if graph is None: + msg = f"Session {session_id} not found" + logger.exception(msg) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg) + else: + try: # Get the flow that matches the flow_id and belongs to the user # flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first() flow = session.exec( select(Flow).where(Flow.id == flow_id_str).where(Flow.user_id == api_key_user.id) ).first() - if flow is None: - msg = f"Flow {flow_id_str} not found" - raise ValueError(msg) + except sa.exc.StatementError as exc: + # StatementError('(builtins.ValueError) badly formed hexadecimal UUID string') + if "badly formed hexadecimal UUID string" in str(exc): + logger.exception(f"Flow ID {flow_id_str} is not a valid UUID") + # This means the Flow ID is not a valid UUID which means it can't find the flow + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + logger.exception(exc) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + except Exception as exc: + logger.exception(exc) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + + if flow is None: + msg = f"Flow {flow_id_str} not found" + logger.error(msg) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg) - if flow.data is None: - msg = f"Flow {flow_id_str} has no data" - raise ValueError(msg) + if flow.data is None: + msg = f"Flow {flow_id_str} has no data" + logger.error(msg) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=msg) + try: graph_data = flow.data graph_data = process_tweaks(graph_data, tweaks or {}) graph = Graph.from_payload(graph_data, flow_id=flow_id_str) + except Exception as exc: + logger.exception(exc) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + + try: task_result, session_id = await run_graph_internal( graph=graph, flow_id=flow_id_str, @@ -471,27 +497,12 @@ async def experimental_run_flow( outputs=outputs, stream=stream, ) - - return RunResponse(outputs=task_result, session_id=session_id) - except sa.exc.StatementError as exc: - # StatementError('(builtins.ValueError) badly formed hexadecimal UUID string') - if "badly formed hexadecimal UUID string" in str(exc): - logger.exception(f"Flow ID {flow_id_str} is not a valid UUID") - # This means the Flow ID is not a valid UUID which means it can't find the flow - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc - except ValueError as exc: - if f"Flow {flow_id_str} not found" in str(exc): - logger.exception(f"Flow {flow_id_str} not found") - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc - if f"Session {session_id} not found" in str(exc): - logger.exception(f"Session {session_id} not found") - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc - logger.exception(exc) - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc except Exception as exc: logger.exception(exc) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + return RunResponse(outputs=task_result, session_id=session_id) + @router.post( "/predict/{flow_id}", @@ -646,13 +657,13 @@ async def custom_component_update( field_value=code_request.field_value, field_name=code_request.field, ) - component_node["template"] = updated_build_config - - return component_node except Exception as exc: logger.exception(exc) raise HTTPException(status_code=400, detail=str(exc)) from exc + component_node["template"] = updated_build_config + return component_node + @router.get("/config", response_model=ConfigResponse) def get_config(): diff --git a/src/backend/base/langflow/api/v1/files.py b/src/backend/base/langflow/api/v1/files.py index 3d297cfec452..59196982ac2a 100644 --- a/src/backend/base/langflow/api/v1/files.py +++ b/src/backend/base/langflow/api/v1/files.py @@ -47,16 +47,24 @@ async def upload_file( ): try: max_file_size_upload = get_storage_service().settings_service.settings.max_file_size_upload - if file.size > max_file_size_upload * 1024 * 1024: - raise HTTPException( - status_code=413, detail=f"File size is larger than the maximum file size {max_file_size_upload}MB." - ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if file.size > max_file_size_upload * 1024 * 1024: + raise HTTPException( + status_code=413, detail=f"File size is larger than the maximum file size {max_file_size_upload}MB." + ) + try: flow_id_str = str(flow_id) flow = session.get(Flow, flow_id_str) - if flow.user_id != current_user.id: - raise HTTPException(status_code=403, detail="You don't have access to this flow") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if flow.user_id != current_user.id: + raise HTTPException(status_code=403, detail="You don't have access to this flow") + try: file_content = await file.read() timestamp = datetime.now(tz=timezone.utc).astimezone().strftime("%Y-%m-%d_%H-%M-%S") file_name = file.filename or hashlib.sha256(file_content).hexdigest() @@ -72,18 +80,20 @@ async def upload_file( async def download_file( file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)] ): - try: - flow_id_str = str(flow_id) - extension = file_name.split(".")[-1] - - if not extension: - raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") + flow_id_str = str(flow_id) + extension = file_name.split(".")[-1] + if not extension: + raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") + try: content_type = build_content_type_from_extension(extension) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e - if not content_type: - raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") + if not content_type: + raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") + try: file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name) headers = { "Content-Disposition": f"attachment; filename={file_name} filename*=UTF-8''{file_name}", @@ -99,20 +109,22 @@ async def download_file( async def download_image( file_name: str, flow_id: UUID, storage_service: Annotated[StorageService, Depends(get_storage_service)] ): - try: - extension = file_name.split(".")[-1] - flow_id_str = str(flow_id) - - if not extension: - raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") + extension = file_name.split(".")[-1] + flow_id_str = str(flow_id) + if not extension: + raise HTTPException(status_code=500, detail=f"Extension not found for file {file_name}") + try: content_type = build_content_type_from_extension(extension) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e - if not content_type: - raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") - if not content_type.startswith("image"): - raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image") + if not content_type: + raise HTTPException(status_code=500, detail=f"Content type not found for extension {extension}") + if not content_type.startswith("image"): + raise HTTPException(status_code=500, detail=f"Content type {content_type} is not an image") + try: file_content = await storage_service.get_file(flow_id=flow_id_str, file_name=file_name) return StreamingResponse(BytesIO(file_content), media_type=content_type) except Exception as e: @@ -150,14 +162,14 @@ async def list_profile_pictures(storage_service: Annotated[StorageService, Depen people = await storage_service.list_files(flow_id=people_path) # type: ignore[arg-type] space = await storage_service.list_files(flow_id=space_path) # type: ignore[arg-type] - files = [f"People/{i}" for i in people] - files += [f"Space/{i}" for i in space] - - return {"files": files} - except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + files = [f"People/{i}" for i in people] + files += [f"Space/{i}" for i in space] + + return {"files": files} + @router.get("/list/{flow_id}") async def list_files( @@ -167,10 +179,11 @@ async def list_files( try: flow_id_str = str(flow_id) files = await storage_service.list_files(flow_id=flow_id_str) - return {"files": files} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return {"files": files} + @router.delete("/delete/{flow_id}/{file_name}") async def delete_file( @@ -181,6 +194,7 @@ async def delete_file( try: flow_id_str = str(flow_id) await storage_service.delete_file(flow_id=flow_id_str, file_name=file_name) - return {"message": f"File {file_name} deleted successfully"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + + return {"message": f"File {file_name} deleted successfully"} diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 071b1641f87f..301a49b5d929 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -100,7 +100,6 @@ def create_flow( session.add(db_flow) session.commit() session.refresh(db_flow) - return db_flow except Exception as e: # If it is a validation error, return the error message if hasattr(e, "errors"): @@ -120,6 +119,8 @@ def create_flow( raise raise HTTPException(status_code=500, detail=str(e)) from e + return db_flow + @router.get("/", response_model=list[FlowRead], status_code=200) def read_flows( @@ -181,13 +182,11 @@ def read_flows( return [jsonable_encoder(flow) for flow in flows] -@router.get("/{flow_id}", response_model=FlowRead, status_code=200) -def read_flow( - *, - session: Session = Depends(get_session), +def _read_flow( + session: Session, flow_id: UUID, - current_user: User = Depends(get_current_active_user), - settings_service: SettingsService = Depends(get_settings_service), + current_user: User, + settings_service: SettingsService, ): """Read a flow.""" auth_settings = settings_service.auth_settings @@ -198,7 +197,19 @@ def read_flow( stmt = stmt.where( (Flow.user_id == current_user.id) | (Flow.user_id == None) # noqa: E711 ) - if user_flow := session.exec(stmt).first(): + return session.exec(stmt).first() + + +@router.get("/{flow_id}", response_model=FlowRead, status_code=200) +def read_flow( + *, + session: Session = Depends(get_session), + flow_id: UUID, + current_user: User = Depends(get_current_active_user), + settings_service: SettingsService = Depends(get_settings_service), +): + """Read a flow.""" + if user_flow := _read_flow(session, flow_id, current_user, settings_service): return user_flow raise HTTPException(status_code=404, detail="Flow not found") @@ -214,14 +225,19 @@ def update_flow( ): """Update a flow.""" try: - db_flow = read_flow( + db_flow = _read_flow( session=session, flow_id=flow_id, current_user=current_user, settings_service=settings_service, ) - if not db_flow: - raise HTTPException(status_code=404, detail="Flow not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if not db_flow: + raise HTTPException(status_code=404, detail="Flow not found") + + try: flow_data = flow.model_dump(exclude_unset=True) if settings_service.settings.remove_api_keys: flow_data = remove_api_keys(flow_data) @@ -238,7 +254,6 @@ def update_flow( session.add(db_flow) session.commit() session.refresh(db_flow) - return db_flow except Exception as e: # If it is a validation error, return the error message if hasattr(e, "errors"): @@ -254,10 +269,10 @@ def update_flow( raise HTTPException( status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" ) from e - if isinstance(e, HTTPException): - raise raise HTTPException(status_code=500, detail=str(e)) from e + return db_flow + @router.delete("/{flow_id}", status_code=200) async def delete_flow( @@ -268,7 +283,7 @@ async def delete_flow( settings_service=Depends(get_settings_service), ): """Delete a flow.""" - flow = read_flow( + flow = _read_flow( session=session, flow_id=flow_id, current_user=current_user, diff --git a/src/backend/base/langflow/api/v1/folders.py b/src/backend/base/langflow/api/v1/folders.py index 4fb9ae2d40aa..773652fa594b 100644 --- a/src/backend/base/langflow/api/v1/folders.py +++ b/src/backend/base/langflow/api/v1/folders.py @@ -72,10 +72,11 @@ def create_folder( session.exec(update_statement_flows) session.commit() - return new_folder except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return new_folder + @router.get("/", response_model=list[FolderRead], status_code=200) def read_folders( @@ -103,16 +104,17 @@ def read_folder( ): try: folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() - if not folder: - raise HTTPException(status_code=404, detail="Folder not found") - flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] - folder.flows = flows_from_current_user_in_folder - return folder except Exception as e: if "No result found" in str(e): raise HTTPException(status_code=404, detail="Folder not found") from e raise HTTPException(status_code=500, detail=str(e)) from e + if not folder: + raise HTTPException(status_code=404, detail="Folder not found") + flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] + folder.flows = flows_from_current_user_in_folder + return folder + @router.patch("/{folder_id}", response_model=FolderRead, status_code=200) def update_folder( @@ -126,8 +128,13 @@ def update_folder( existing_folder = session.exec( select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id) ).first() - if not existing_folder: - raise HTTPException(status_code=404, detail="Folder not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if not existing_folder: + raise HTTPException(status_code=404, detail="Folder not found") + + try: if folder.name and folder.name != existing_folder.name: existing_folder.name = folder.name session.add(existing_folder) @@ -164,11 +171,11 @@ def update_folder( session.exec(update_statement_components) session.commit() - return existing_folder - except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return existing_folder + @router.delete("/{folder_id}", status_code=204) async def delete_folder( @@ -184,11 +191,15 @@ async def delete_folder( await cascade_delete_flow(session, flow) folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() - if not folder: - raise HTTPException(status_code=404, detail="Folder not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if not folder: + raise HTTPException(status_code=404, detail="Folder not found") + + try: session.delete(folder) session.commit() - return Response(status_code=status.HTTP_204_NO_CONTENT) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -203,12 +214,17 @@ async def download_file( ): """Download all flows from folder.""" try: - return session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() + folder = session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)).first() except Exception as e: if "No result found" in str(e): raise HTTPException(status_code=404, detail="Folder not found") from e raise HTTPException(status_code=500, detail=str(e)) from e + if not folder: + raise HTTPException(status_code=404, detail="Folder not found") + + return folder + @router.post("/upload/", response_model=list[FlowRead], status_code=201) async def upload_file( diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index d284042d1f1a..3fd2dff31545 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -94,19 +94,23 @@ async def update_message( ): try: db_message = session.get(MessageTable, message_id) - if not db_message: - raise HTTPException(status_code=404, detail="Message not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e + + if not db_message: + raise HTTPException(status_code=404, detail="Message not found") + + try: message_dict = message.model_dump(exclude_unset=True, exclude_none=True) db_message.sqlmodel_update(message_dict) session.add(db_message) session.commit() session.refresh(db_message) - return db_message - except HTTPException: - raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return db_message + @router.patch("/messages/session/{old_session_id}", response_model=list[MessageResponse]) async def update_session_id( @@ -119,10 +123,13 @@ async def update_session_id( # Get all messages with the old session ID stmt = select(MessageTable).where(MessageTable.session_id == old_session_id) messages = session.exec(stmt).all() + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) from e - if not messages: - raise HTTPException(status_code=404, detail="No messages found with the given session ID") + if not messages: + raise HTTPException(status_code=404, detail="No messages found with the given session ID") + try: # Update all messages with the new session ID for message in messages: message.session_id = new_session_id @@ -134,12 +141,11 @@ async def update_session_id( for message in messages: session.refresh(message) message_responses.append(MessageResponse.model_validate(message, from_attributes=True)) - return message_responses - except HTTPException: - raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return message_responses + @router.delete("/messages/session/{session_id}", status_code=204) async def delete_messages_session( @@ -153,10 +159,11 @@ async def delete_messages_session( .execution_options(synchronize_session="fetch") ) session.commit() - return {"message": "Messages deleted successfully"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e + return {"message": "Messages deleted successfully"} + @router.get("/transactions", response_model=list[TransactionReadResponse]) async def get_transactions( diff --git a/src/backend/base/langflow/api/v1/variable.py b/src/backend/base/langflow/api/v1/variable.py index d18d95c10036..f2bb87fc7a01 100644 --- a/src/backend/base/langflow/api/v1/variable.py +++ b/src/backend/base/langflow/api/v1/variable.py @@ -25,19 +25,18 @@ def create_variable( variable_service: DatabaseVariableService = Depends(get_variable_service), ): """Create a new variable.""" - try: - if not variable.name and not variable.value: - raise HTTPException(status_code=400, detail="Variable name and value cannot be empty") - - if not variable.name: - raise HTTPException(status_code=400, detail="Variable name cannot be empty") + if not variable.name and not variable.value: + raise HTTPException(status_code=400, detail="Variable name and value cannot be empty") - if not variable.value: - raise HTTPException(status_code=400, detail="Variable value cannot be empty") + if not variable.name: + raise HTTPException(status_code=400, detail="Variable name cannot be empty") - if variable.name in variable_service.list_variables(user_id=current_user.id, session=session): - raise HTTPException(status_code=400, detail="Variable name already exists") + if not variable.value: + raise HTTPException(status_code=400, detail="Variable value cannot be empty") + if variable.name in variable_service.list_variables(user_id=current_user.id, session=session): + raise HTTPException(status_code=400, detail="Variable name already exists") + try: return variable_service.create_variable( user_id=current_user.id, name=variable.name, diff --git a/src/backend/base/langflow/base/models/model.py b/src/backend/base/langflow/base/models/model.py index b6c7042d8b32..d229948755f2 100644 --- a/src/backend/base/langflow/base/models/model.py +++ b/src/backend/base/langflow/base/models/model.py @@ -82,12 +82,13 @@ def get_result(self, runnable: LLM, stream: bool, input_value: str): message = runnable.invoke(input_value) result = message.content if hasattr(message, "content") else message self.status = result - return result except Exception as e: if message := self._get_exception_message(e): raise ValueError(message) from e raise + return result + def build_status_message(self, message: AIMessage): """ Builds a status message from an AIMessage object. @@ -194,12 +195,13 @@ def get_chat_result( self.status = result else: self.status = result - return result except Exception as e: if message := self._get_exception_message(e): raise ValueError(message) from e raise + return result + @abstractmethod def build_model(self) -> LanguageModel: # type: ignore[type-var] """ diff --git a/src/backend/base/langflow/components/Notion/update_page_property.py b/src/backend/base/langflow/components/Notion/update_page_property.py index 0df3cb0ac9e0..512b75431849 100644 --- a/src/backend/base/langflow/components/Notion/update_page_property.py +++ b/src/backend/base/langflow/components/Notion/update_page_property.py @@ -90,9 +90,6 @@ def _update_notion_page(self, page_id: str, properties: str | dict[str, Any]) -> response = requests.patch(url, headers=headers, json=data) response.raise_for_status() updated_page = response.json() - - logger.info(f"Successfully updated Notion page. Response: {json.dumps(updated_page)}") - return updated_page except requests.exceptions.HTTPError as e: error_message = f"HTTP Error occurred: {e}" if e.response is not None: @@ -109,5 +106,8 @@ def _update_notion_page(self, page_id: str, properties: str | dict[str, Any]) -> logger.exception(error_message) return error_message + logger.info(f"Successfully updated Notion page. Response: {json.dumps(updated_page)}") + return updated_page + def __call__(self, *args, **kwargs): return self._update_notion_page(*args, **kwargs) diff --git a/src/backend/base/langflow/components/assemblyai/AssemblyAILeMUR.py b/src/backend/base/langflow/components/assemblyai/AssemblyAILeMUR.py index af6bf18f83a6..9c7597370d27 100644 --- a/src/backend/base/langflow/components/assemblyai/AssemblyAILeMUR.py +++ b/src/backend/base/langflow/components/assemblyai/AssemblyAILeMUR.py @@ -132,15 +132,16 @@ def run_lemur(self) -> Data: # Perform LeMUR action try: response = self.perform_lemur_action(transcript_group, self.endpoint) - result = Data(data=response) - self.status = result - return result except Exception as e: # noqa: BLE001 logger.opt(exception=True).debug("Error running LeMUR") error = f"An Error happened: {e}" self.status = error return Data(data={"error": error}) + result = Data(data=response) + self.status = result + return result + def perform_lemur_action(self, transcript_group: aai.TranscriptGroup, endpoint: str) -> dict: print("Endpoint:", endpoint, type(endpoint)) if endpoint == "task": diff --git a/src/backend/base/langflow/components/assemblyai/AssemblyAIListTranscripts.py b/src/backend/base/langflow/components/assemblyai/AssemblyAIListTranscripts.py index bcbcc0a7ccd0..de96112bb0f1 100644 --- a/src/backend/base/langflow/components/assemblyai/AssemblyAIListTranscripts.py +++ b/src/backend/base/langflow/components/assemblyai/AssemblyAIListTranscripts.py @@ -84,10 +84,11 @@ def convert_page_to_data_list(page): page = transcriber.list_transcripts(params) transcripts = convert_page_to_data_list(page) - self.status = transcripts - return transcripts except Exception as e: # noqa: BLE001 logger.opt(exception=True).debug("Error listing transcripts") error_data = Data(data={"error": f"An error occurred: {e}"}) self.status = [error_data] return [error_data] + + self.status = transcripts + return transcripts diff --git a/src/backend/base/langflow/components/assemblyai/AssemblyAIStartTranscript.py b/src/backend/base/langflow/components/assemblyai/AssemblyAIStartTranscript.py index 1fdd3b5c8916..de83a59e3d76 100644 --- a/src/backend/base/langflow/components/assemblyai/AssemblyAIStartTranscript.py +++ b/src/backend/base/langflow/components/assemblyai/AssemblyAIStartTranscript.py @@ -173,14 +173,14 @@ def create_transcription_job(self) -> Data: try: transcript = aai.Transcriber().submit(audio, config=config) - - if transcript.error: - self.status = transcript.error - return Data(data={"error": transcript.error}) - result = Data(data={"transcript_id": transcript.id}) - self.status = result - return result except Exception as e: # noqa: BLE001 logger.opt(exception=True).debug("Error submitting transcription job") self.status = f"An error occurred: {e}" return Data(data={"error": f"An error occurred: {e}"}) + + if transcript.error: + self.status = transcript.error + return Data(data={"error": transcript.error}) + result = Data(data={"transcript_id": transcript.id}) + self.status = result + return result diff --git a/src/backend/base/langflow/components/deactivated/MergeData.py b/src/backend/base/langflow/components/deactivated/MergeData.py index a6db8394bc26..5d430ee46949 100644 --- a/src/backend/base/langflow/components/deactivated/MergeData.py +++ b/src/backend/base/langflow/components/deactivated/MergeData.py @@ -46,27 +46,27 @@ def merge_data(self) -> list[Data]: """ logger.info("Initiating the data merging process.") - try: - data_inputs: list[Data] = self.data_inputs - logger.debug(f"Received {len(data_inputs)} data input(s) for merging.") - - if not data_inputs: - logger.warning("No data inputs provided. Returning an empty list.") - return [] - - # Collect all unique keys from all Data objects - all_keys: set[str] = set() - for idx, data_input in enumerate(data_inputs): - if not isinstance(data_input, Data): - error_message = f"Data input at index {idx} is not of type Data." - logger.error(error_message) - type_error_message = ( - "All items in data_inputs must be of type Data. " f"Item at index {idx} is {type(data_input)}" - ) - raise TypeError(type_error_message) - all_keys.update(data_input.data.keys()) - logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.") + data_inputs: list[Data] = self.data_inputs + logger.debug(f"Received {len(data_inputs)} data input(s) for merging.") + + if not data_inputs: + logger.warning("No data inputs provided. Returning an empty list.") + return [] + + # Collect all unique keys from all Data objects + all_keys: set[str] = set() + for idx, data_input in enumerate(data_inputs): + if not isinstance(data_input, Data): + error_message = f"Data input at index {idx} is not of type Data." + logger.error(error_message) + type_error_message = ( + f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}" + ) + raise TypeError(type_error_message) + all_keys.update(data_input.data.keys()) + logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.") + try: # Create new list of Data objects with missing keys filled with empty strings merged_data_list = [] for idx, data_input in enumerate(data_inputs): @@ -86,9 +86,9 @@ def merge_data(self) -> list[Data]: merged_data_list.append(merged_data) logger.debug(f"Merged Data object created for input at index {idx}.") - logger.info("Data merging process completed successfully.") - return merged_data_list - except Exception: logger.exception("An error occurred during the data merging process.") raise + + logger.info("Data merging process completed successfully.") + return merged_data_list diff --git a/src/backend/base/langflow/components/deactivated/SubFlow.py b/src/backend/base/langflow/components/deactivated/SubFlow.py index 98de25e10009..68312a87f63b 100644 --- a/src/backend/base/langflow/components/deactivated/SubFlow.py +++ b/src/backend/base/langflow/components/deactivated/SubFlow.py @@ -46,16 +46,21 @@ def update_build_config(self, build_config: dotdict, field_value: Any, field_nam if field_value is not None and field_name == "flow_name": try: flow_data = self.get_flow(field_value) - if not flow_data: - msg = f"Flow {field_value} not found." - raise ValueError(msg) - graph = Graph.from_payload(flow_data.data["data"]) - # Get all inputs from the graph - inputs = get_flow_inputs(graph) - # Add inputs to the build config - build_config = self.add_inputs_to_build_config(inputs, build_config) except Exception: # noqa: BLE001 logger.exception(f"Error getting flow {field_value}") + else: + if not flow_data: + msg = f"Flow {field_value} not found." + logger.error(msg) + else: + try: + graph = Graph.from_payload(flow_data.data["data"]) + # Get all inputs from the graph + inputs = get_flow_inputs(graph) + # Add inputs to the build config + build_config = self.add_inputs_to_build_config(inputs, build_config) + except Exception: # noqa: BLE001 + logger.exception(f"Error building graph for flow {field_value}") return build_config diff --git a/src/backend/base/langflow/components/helpers/CSVtoData.py b/src/backend/base/langflow/components/helpers/CSVtoData.py index 61555f938961..076e08fbb50d 100644 --- a/src/backend/base/langflow/components/helpers/CSVtoData.py +++ b/src/backend/base/langflow/components/helpers/CSVtoData.py @@ -38,46 +38,42 @@ class CSVToDataComponent(Component): ] def load_csv_to_data(self) -> list[Data]: - try: - if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1: - msg = "Please provide exactly one of: CSV file, file path, or CSV string." - raise ValueError(msg) - - csv_data = None + if sum(bool(field) for field in [self.csv_file, self.csv_path, self.csv_string]) != 1: + msg = "Please provide exactly one of: CSV file, file path, or CSV string." + raise ValueError(msg) + csv_data = None + try: if self.csv_file: resolved_path = self.resolve_path(self.csv_file) file_path = Path(resolved_path) if file_path.suffix.lower() != ".csv": - msg = "The provided file must be a CSV file." - raise ValueError(msg) - with file_path.open(newline="", encoding="utf-8") as csvfile: - csv_data = csvfile.read() + self.status = "The provided file must be a CSV file." + else: + with file_path.open(newline="", encoding="utf-8") as csvfile: + csv_data = csvfile.read() elif self.csv_path: file_path = Path(self.csv_path) if file_path.suffix.lower() != ".csv": - msg = "The provided file must be a CSV file." - raise ValueError(msg) - with file_path.open(newline="", encoding="utf-8") as csvfile: - csv_data = csvfile.read() + self.status = "The provided file must be a CSV file." + else: + with file_path.open(newline="", encoding="utf-8") as csvfile: + csv_data = csvfile.read() - elif self.csv_string: + else: csv_data = self.csv_string - if not csv_data: - msg = "No CSV data provided." - raise ValueError(msg) + if csv_data: + csv_reader = csv.DictReader(io.StringIO(csv_data)) + result = [Data(data=row) for row in csv_reader] - csv_reader = csv.DictReader(io.StringIO(csv_data)) - result = [Data(data=row) for row in csv_reader] + if not result: + self.status = "The CSV data is empty." + return [] - if not result: - self.status = "The CSV data is empty." - return [] - - self.status = result - return result + self.status = result + return result except csv.Error as e: error_message = f"CSV parsing error: {e}" @@ -88,3 +84,6 @@ def load_csv_to_data(self) -> list[Data]: error_message = f"An error occurred: {e}" self.status = error_message raise ValueError(error_message) from e + + # An error occurred + raise ValueError(self.status) diff --git a/src/backend/base/langflow/components/helpers/JSONtoData.py b/src/backend/base/langflow/components/helpers/JSONtoData.py index 7be9d3a8eb0e..0543ab7a5866 100644 --- a/src/backend/base/langflow/components/helpers/JSONtoData.py +++ b/src/backend/base/langflow/components/helpers/JSONtoData.py @@ -41,53 +41,50 @@ class JSONToDataComponent(Component): ] def convert_json_to_data(self) -> Data | list[Data]: - try: - if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1: - msg = "Please provide exactly one of: JSON file, file path, or JSON string." - raise ValueError(msg) + if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1: + msg = "Please provide exactly one of: JSON file, file path, or JSON string." + self.status = msg + raise ValueError(msg) - json_data = None + json_data = None + try: if self.json_file: resolved_path = self.resolve_path(self.json_file) file_path = Path(resolved_path) if file_path.suffix.lower() != ".json": - msg = "The provided file must be a JSON file." - raise ValueError(msg) - with file_path.open(encoding="utf-8") as jsonfile: - json_data = jsonfile.read() + self.status = "The provided file must be a JSON file." + else: + with file_path.open(encoding="utf-8") as jsonfile: + json_data = jsonfile.read() elif self.json_path: file_path = Path(self.json_path) if file_path.suffix.lower() != ".json": - msg = "The provided file must be a JSON file." - raise ValueError(msg) - with file_path.open(encoding="utf-8") as jsonfile: - json_data = jsonfile.read() - - elif self.json_string: - json_data = self.json_string + self.status = "The provided file must be a JSON file." + else: + with file_path.open(encoding="utf-8") as jsonfile: + json_data = jsonfile.read() - if not json_data: - msg = "No JSON data provided." - raise ValueError(msg) - - # Try to parse the JSON string - try: - parsed_data = json.loads(json_data) - except json.JSONDecodeError: - # If JSON parsing fails, try to repair the JSON string - repaired_json_string = repair_json(json_data) - parsed_data = json.loads(repaired_json_string) - - # Check if the parsed data is a list - if isinstance(parsed_data, list): - result = [Data(data=item) for item in parsed_data] else: - result = Data(data=parsed_data) + json_data = self.json_string - self.status = result - return result + if json_data: + # Try to parse the JSON string + try: + parsed_data = json.loads(json_data) + except json.JSONDecodeError: + # If JSON parsing fails, try to repair the JSON string + repaired_json_string = repair_json(json_data) + parsed_data = json.loads(repaired_json_string) + + # Check if the parsed data is a list + if isinstance(parsed_data, list): + result = [Data(data=item) for item in parsed_data] + else: + result = Data(data=parsed_data) + self.status = result + return result except (json.JSONDecodeError, SyntaxError, ValueError) as e: error_message = f"Invalid JSON or Python literal: {e}" @@ -98,3 +95,6 @@ def convert_json_to_data(self) -> Data | list[Data]: error_message = f"An error occurred: {e}" self.status = error_message raise ValueError(error_message) from e + + # An error occurred + raise ValueError(self.status) diff --git a/src/backend/base/langflow/components/helpers/MergeData.py b/src/backend/base/langflow/components/helpers/MergeData.py index ddc778954db2..e2b58ce5b5b5 100644 --- a/src/backend/base/langflow/components/helpers/MergeData.py +++ b/src/backend/base/langflow/components/helpers/MergeData.py @@ -46,27 +46,27 @@ def merge_data(self) -> list[Data]: """ logger.info("Initiating the data merging process.") - try: - data_inputs: list[Data] = self.data_inputs - logger.debug(f"Received {len(data_inputs)} data input(s) for merging.") - - if not data_inputs: - logger.warning("No data inputs provided. Returning an empty list.") - return [] - - # Collect all unique keys from all Data objects - all_keys: set[str] = set() - for idx, data_input in enumerate(data_inputs): - if not isinstance(data_input, Data): - error_message = f"Data input at index {idx} is not of type Data." - logger.error(error_message) - type_error_message = ( - "All items in data_inputs must be of type Data. " f"Item at index {idx} is {type(data_input)}" - ) - raise TypeError(type_error_message) - all_keys.update(data_input.data.keys()) - logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.") + data_inputs: list[Data] = self.data_inputs + logger.debug(f"Received {len(data_inputs)} data input(s) for merging.") + + if not data_inputs: + logger.warning("No data inputs provided. Returning an empty list.") + return [] + + # Collect all unique keys from all Data objects + all_keys: set[str] = set() + for idx, data_input in enumerate(data_inputs): + if not isinstance(data_input, Data): + error_message = f"Data input at index {idx} is not of type Data." + logger.error(error_message) + type_error_message = ( + f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}" + ) + raise TypeError(type_error_message) + all_keys.update(data_input.data.keys()) + logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.") + try: # Create new list of Data objects with missing keys filled with empty strings merged_data_list = [] for idx, data_input in enumerate(data_inputs): @@ -86,9 +86,9 @@ def merge_data(self) -> list[Data]: merged_data_list.append(merged_data) logger.debug("Merged Data object created for input at index: " + str(idx)) - logger.info("Data merging process completed successfully.") - return merged_data_list - except Exception: logger.exception("An error occurred during the data merging process.") raise + + logger.info("Data merging process completed successfully.") + return merged_data_list diff --git a/src/backend/base/langflow/components/helpers/MessageToData.py b/src/backend/base/langflow/components/helpers/MessageToData.py index 74ee499645a7..44ad8ece286d 100644 --- a/src/backend/base/langflow/components/helpers/MessageToData.py +++ b/src/backend/base/langflow/components/helpers/MessageToData.py @@ -26,18 +26,14 @@ class MessageToDataComponent(Component): ] def convert_message_to_data(self) -> Data: - try: - if not isinstance(self.message, Message): - msg = "Input must be a Message object" - raise TypeError(msg) - + if isinstance(self.message, Message): # Convert Message to Data data = Data(data=self.message.data) self.status = "Successfully converted Message to Data" return data - except Exception as e: # noqa: BLE001 - error_message = f"Error converting Message to Data: {e}" - logger.opt(exception=True).debug(error_message) - self.status = error_message - return Data(data={"error": error_message}) + + msg = "Error converting Message to Data: Input must be a Message object" + logger.opt(exception=True).debug(msg) + self.status = msg + return Data(data={"error": msg}) diff --git a/src/backend/base/langflow/components/prototypes/JSONCleaner.py b/src/backend/base/langflow/components/prototypes/JSONCleaner.py index 968179ac8a45..b1b7dcb85d46 100644 --- a/src/backend/base/langflow/components/prototypes/JSONCleaner.py +++ b/src/backend/base/langflow/components/prototypes/JSONCleaner.py @@ -57,12 +57,12 @@ def clean_json(self) -> Message: normalize_unicode = self.normalize_unicode validate_json = self.validate_json + start = json_str.find("{") + end = json_str.rfind("}") + if start == -1 or end == -1: + msg = "Invalid JSON string: Missing '{' or '}'" + raise ValueError(msg) try: - start = json_str.find("{") - end = json_str.rfind("}") - if start == -1 or end == -1: - msg = "Invalid JSON string: Missing '{' or '}'" - raise ValueError(msg) json_str = json_str[start : end + 1] if remove_control_chars: @@ -93,7 +93,7 @@ def _validate_json(self, s: str) -> str: """Validate the JSON string.""" try: json.loads(s) - return s except json.JSONDecodeError as e: msg = f"Invalid JSON string: {e}" raise ValueError(msg) from e + return s diff --git a/src/backend/base/langflow/components/prototypes/SubFlow.py b/src/backend/base/langflow/components/prototypes/SubFlow.py index b18ad083a49b..1cf7123afa7a 100644 --- a/src/backend/base/langflow/components/prototypes/SubFlow.py +++ b/src/backend/base/langflow/components/prototypes/SubFlow.py @@ -38,16 +38,21 @@ def update_build_config(self, build_config: dotdict, field_value: Any, field_nam if field_value is not None and field_name == "flow_name": try: flow_data = self.get_flow(field_value) - if not flow_data: - msg = f"Flow {field_value} not found." - raise ValueError(msg) - graph = Graph.from_payload(flow_data.data["data"]) - # Get all inputs from the graph - inputs = get_flow_inputs(graph) - # Add inputs to the build config - build_config = self.add_inputs_to_build_config(inputs, build_config) except Exception: # noqa: BLE001 logger.exception(f"Error getting flow {field_value}") + else: + if not flow_data: + msg = f"Flow {field_value} not found." + logger.error(msg) + else: + try: + graph = Graph.from_payload(flow_data.data["data"]) + # Get all inputs from the graph + inputs = get_flow_inputs(graph) + # Add inputs to the build config + build_config = self.add_inputs_to_build_config(inputs, build_config) + except Exception: # noqa: BLE001 + logger.exception(f"Error building graph for flow {field_value}") return build_config diff --git a/src/backend/base/langflow/components/toolkits/ComposioAPI.py b/src/backend/base/langflow/components/toolkits/ComposioAPI.py index 33286be3fa3b..ca357ee6b9d1 100644 --- a/src/backend/base/langflow/components/toolkits/ComposioAPI.py +++ b/src/backend/base/langflow/components/toolkits/ComposioAPI.py @@ -64,11 +64,12 @@ def _check_for_authorization(self, app: str) -> str: entity = toolset.client.get_entity(id=self.entity_id) try: entity.get_connection(app=app) - return f"{app} CONNECTED" except Exception: # noqa: BLE001 logger.opt(exception=True).debug("Authorization error") return self._handle_authorization_failure(toolset, entity, app) + return f"{app} CONNECTED" + def _handle_authorization_failure(self, toolset: ComposioToolSet, entity: Any, app: str) -> str: """ Handles the authorization failure by attempting to process API key auth or initiate default connection. diff --git a/src/backend/base/langflow/components/tools/Calculator.py b/src/backend/base/langflow/components/tools/Calculator.py index 8295b25479df..a5bed1172518 100644 --- a/src/backend/base/langflow/components/tools/Calculator.py +++ b/src/backend/base/langflow/components/tools/Calculator.py @@ -39,29 +39,28 @@ def build_tool(self) -> Tool: args_schema=self.CalculatorToolSchema, ) + def _eval_expr(self, node): + # Define the allowed operators + operators = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.Pow: operator.pow, + } + if isinstance(node, ast.Num): + return node.n + if isinstance(node, ast.BinOp): + return operators[type(node.op)](self._eval_expr(node.left), self._eval_expr(node.right)) + if isinstance(node, ast.UnaryOp): + return operators[type(node.op)](self._eval_expr(node.operand)) + raise TypeError(node) + def _evaluate_expression(self, expression: str) -> list[Data]: try: - # Define the allowed operators - operators = { - ast.Add: operator.add, - ast.Sub: operator.sub, - ast.Mult: operator.mul, - ast.Div: operator.truediv, - ast.Pow: operator.pow, - } - - def eval_expr(node): - if isinstance(node, ast.Num): - return node.n - if isinstance(node, ast.BinOp): - return operators[type(node.op)](eval_expr(node.left), eval_expr(node.right)) - if isinstance(node, ast.UnaryOp): - return operators[type(node.op)](eval_expr(node.operand)) - raise TypeError(node) - # Parse the expression and evaluate it tree = ast.parse(expression, mode="eval") - result = eval_expr(tree.body) + result = self._eval_expr(tree.body) # Format the result to a reasonable number of decimal places formatted_result = f"{result:.6f}".rstrip("0").rstrip(".") diff --git a/src/backend/base/langflow/components/tools/SerpAPI.py b/src/backend/base/langflow/components/tools/SerpAPI.py index e24d0d021e5d..0a16bc110e76 100644 --- a/src/backend/base/langflow/components/tools/SerpAPI.py +++ b/src/backend/base/langflow/components/tools/SerpAPI.py @@ -86,9 +86,10 @@ def run_model(self) -> list[Data]: data_list = [Data(data=result, text=result.get("snippet", "")) for result in results] - self.status = data_list - return data_list except Exception as e: # noqa: BLE001 logger.opt(exception=True).debug("Error running SerpAPI") self.status = f"Error: {e}" return [Data(data={"error": str(e)}, text=str(e))] + + self.status = data_list + return data_list diff --git a/src/backend/base/langflow/components/tools/TavilyAISearch.py b/src/backend/base/langflow/components/tools/TavilyAISearch.py index 9f690760fc4a..3c60ce723e45 100644 --- a/src/backend/base/langflow/components/tools/TavilyAISearch.py +++ b/src/backend/base/langflow/components/tools/TavilyAISearch.py @@ -148,9 +148,6 @@ def _tavily_search( if include_images and search_results.get("images"): data_results.append(Data(data={"images": search_results["images"]})) - self.status: Any = data_results - return data_results - except httpx.HTTPStatusError as e: error_message = f"HTTP error: {e.response.status_code} - {e.response.text}" self.status = error_message @@ -160,3 +157,6 @@ def _tavily_search( error_message = f"Unexpected error: {e}" self.status = error_message return [Data(data={"error": error_message})] + + self.status: Any = data_results + return data_results diff --git a/src/backend/base/langflow/components/tools/YfinanceTool.py b/src/backend/base/langflow/components/tools/YfinanceTool.py index f7b652b7fa88..fd322fedb9dd 100644 --- a/src/backend/base/langflow/components/tools/YfinanceTool.py +++ b/src/backend/base/langflow/components/tools/YfinanceTool.py @@ -94,10 +94,10 @@ def _yahoo_finance_tool( else: data_list = [Data(data={"result": result})] - return data_list - except Exception as e: # noqa: BLE001 error_message = f"Error retrieving data: {e}" logger.opt(exception=True).debug(error_message) self.status = error_message return [Data(data={"error": error_message})] + + return data_list diff --git a/src/backend/base/langflow/components/vectorstores/Elasticsearch.py b/src/backend/base/langflow/components/vectorstores/Elasticsearch.py index dd2f070527ea..27edf3f7bf1b 100644 --- a/src/backend/base/langflow/components/vectorstores/Elasticsearch.py +++ b/src/backend/base/langflow/components/vectorstores/Elasticsearch.py @@ -189,14 +189,15 @@ def search(self, query: str | None = None) -> list[dict[str, Any]]: if query: search_type = self.search_type.lower() + if search_type not in ["similarity", "mmr"]: + msg = f"Invalid search type: {self.search_type}" + logger.error(msg) + raise ValueError(msg) try: if search_type == "similarity": results = vector_store.similarity_search_with_score(query, **search_kwargs) elif search_type == "mmr": results = vector_store.max_marginal_relevance_search(query, **search_kwargs) - else: - msg = f"Invalid search type: {self.search_type}" - raise ValueError(msg) except Exception as e: msg = ( "Error occurred while querying the Elasticsearch VectorStore," diff --git a/src/backend/base/langflow/components/vectorstores/OpenSearch.py b/src/backend/base/langflow/components/vectorstores/OpenSearch.py index ebff84e196a6..12004a1befd6 100644 --- a/src/backend/base/langflow/components/vectorstores/OpenSearch.py +++ b/src/backend/base/langflow/components/vectorstores/OpenSearch.py @@ -229,15 +229,15 @@ def search(self, query: str | None = None) -> list[dict[str, Any]]: results = vector_store.max_marginal_relevance_search(query, **search_kwargs) return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results] - error_message = f"Invalid search type:: {self.search_type}" - logger.exception(error_message) - raise ValueError(error_message) - except Exception as e: error_message = f"Error during search: {e}" logger.exception(error_message) raise RuntimeError(error_message) from e + error_message = f"Error during search. Invalid search type: {self.search_type}" + logger.error(error_message) + raise ValueError(error_message) + def search_documents(self) -> list[Data]: """ Search for documents in the vector store based on the search input. @@ -253,9 +253,10 @@ def search_documents(self) -> list[Data]: ) for result in results ] - self.status = retrieved_data - return retrieved_data except Exception as e: error_message = f"Error during document search: {e}" logger.exception(error_message) raise RuntimeError(error_message) from e + + self.status = retrieved_data + return retrieved_data diff --git a/src/backend/base/langflow/custom/directory_reader/directory_reader.py b/src/backend/base/langflow/custom/directory_reader/directory_reader.py index 65d973ec5da4..4b04a8957634 100644 --- a/src/backend/base/langflow/custom/directory_reader/directory_reader.py +++ b/src/backend/base/langflow/custom/directory_reader/directory_reader.py @@ -91,9 +91,9 @@ def validate_code(self, file_content): """ try: ast.parse(file_content) - return True except SyntaxError: return False + return True def validate_build(self, file_content): """ diff --git a/src/backend/base/langflow/custom/utils.py b/src/backend/base/langflow/custom/utils.py index 4338483b4bdd..42b018d7603b 100644 --- a/src/backend/base/langflow/custom/utils.py +++ b/src/backend/base/langflow/custom/utils.py @@ -260,33 +260,38 @@ def run_build_inputs( def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None): - try: - if custom_component._code is None: - msg = "Code is None" - raise ValueError(msg) - if isinstance(custom_component._code, str): + if custom_component._code is None: + error = "Code is None" + elif not isinstance(custom_component._code, str): + error = "Invalid code type" + else: + try: custom_class = eval_custom_component_code(custom_component._code) - else: - msg = "Invalid code type" - raise TypeError(msg) - except Exception as exc: - logger.exception("Error while evaluating custom component code") - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid type convertion. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) from exc + except Exception as exc: + logger.exception("Error while evaluating custom component code") + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid type conversion. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) from exc - try: - return custom_class(_user_id=user_id, _code=custom_component._code) - except Exception as exc: - logger.exception("Error while instantiating custom component") - if hasattr(exc, "detail") and "traceback" in exc.detail: - logger.error(exc.detail["traceback"]) + try: + return custom_class(_user_id=user_id, _code=custom_component._code) + except Exception as exc: + logger.exception("Error while instantiating custom component") + if hasattr(exc, "detail") and "traceback" in exc.detail: + logger.error(exc.detail["traceback"]) + + raise - raise + msg = f"Invalid type conversion: {error}. Please check your code and try again." + logger.error(msg) + raise HTTPException( + status_code=400, + detail={"error": msg}, + ) def run_build_config( @@ -295,46 +300,49 @@ def run_build_config( ) -> tuple[dict, CustomComponent]: """Build the field configuration for a custom component""" - try: - if custom_component._code is None: - msg = "Code is None" - raise ValueError(msg) - if isinstance(custom_component._code, str): + if custom_component._code is None: + error = "Code is None" + elif not isinstance(custom_component._code, str): + error = "Invalid code type" + else: + try: custom_class = eval_custom_component_code(custom_component._code) - else: - msg = "Invalid code type" - raise TypeError(msg) - except Exception as exc: - logger.exception("Error while evaluating custom component code") - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid type convertion. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) from exc + except Exception as exc: + logger.exception("Error while evaluating custom component code") + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid type conversion. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) from exc - try: - custom_instance = custom_class(_user_id=user_id) - build_config: dict = custom_instance.build_config() - - for field_name, field in build_config.copy().items(): - # Allow user to build Input as well - # as a dict with the same keys as Input - field_dict = get_field_dict(field) - # Let's check if "rangeSpec" is a RangeSpec object - if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): - field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() - build_config[field_name] = field_dict + try: + custom_instance = custom_class(_user_id=user_id) + build_config: dict = custom_instance.build_config() + + for field_name, field in build_config.copy().items(): + # Allow user to build Input as well + # as a dict with the same keys as Input + field_dict = get_field_dict(field) + # Let's check if "rangeSpec" is a RangeSpec object + if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): + field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() + build_config[field_name] = field_dict + except Exception as exc: + logger.exception("Error while building field config") + if hasattr(exc, "detail") and "traceback" in exc.detail: + logger.error(exc.detail["traceback"]) + raise return build_config, custom_instance - except Exception as exc: - logger.exception("Error while building field config") - if hasattr(exc, "detail") and "traceback" in exc.detail: - logger.error(exc.detail["traceback"]) - - raise + msg = f"Invalid type conversion: {error}. Please check your code and try again." + logger.error(msg) + raise HTTPException( + status_code=400, + detail={"error": msg}, + ) def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code): @@ -385,14 +393,14 @@ def build_custom_component_template( user_id: str | UUID | None = None, ) -> tuple[dict[str, Any], CustomComponent | Component]: """Build a custom component template""" + if not hasattr(custom_component, "template_config"): + raise HTTPException( + status_code=400, + detail={ + "error": ("Error building Component. Please check if you are importing Component correctly."), + }, + ) try: - if not hasattr(custom_component, "template_config"): - raise HTTPException( - status_code=400, - detail={ - "error": ("Please check if you are importing Component correctly."), - }, - ) if "inputs" in custom_component.template_config: return build_custom_component_template_from_inputs(custom_component, user_id=user_id) frontend_node = CustomComponentFrontendNode(**custom_component.template_config) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index f5e87cc68a1a..9c128d04df96 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -752,11 +752,12 @@ def run( try: # Attempt to get the running event loop; if none, an exception is raised loop = asyncio.get_running_loop() - if loop.is_closed(): - msg = "The running event loop is closed." - raise RuntimeError(msg) except RuntimeError: - # If there's no running event loop or it's closed, use asyncio.run + # If there's no running event loop, use asyncio.run + return asyncio.run(coro) + + # If the event loop is closed, use asyncio.run + if loop.is_closed(): return asyncio.run(coro) # If there's an existing, open event loop, use it to run the async function @@ -1031,7 +1032,6 @@ def from_payload( edges = payload["edges"] graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id) graph.add_nodes_and_edges(vertices, edges) - return graph except KeyError as exc: logger.exception(exc) if "nodes" not in payload and "edges" not in payload: @@ -1041,6 +1041,8 @@ def from_payload( msg = f"Error while creating graph from payload: {exc}" raise ValueError(msg) from exc + else: + return graph def __eq__(self, other: object) -> bool: if not isinstance(other, Graph): @@ -1400,23 +1402,24 @@ async def build_vertex( await set_cache(key=vertex.id, data=vertex_dict) - if vertex.result is not None: - params = f"{vertex._built_object_repr()}{params}" - valid = True - result_dict = vertex.result - artifacts = vertex.artifacts - else: - msg = f"No result found for vertex {vertex_id}" - raise ValueError(msg) - - return VertexBuildResult( - result_dict=result_dict, params=params, valid=valid, artifacts=artifacts, vertex=vertex - ) except Exception as exc: if not isinstance(exc, ComponentBuildException): logger.exception("Error building Component") raise + if vertex.result is not None: + params = f"{vertex._built_object_repr()}{params}" + valid = True + result_dict = vertex.result + artifacts = vertex.artifacts + else: + msg = f"Error building Component: no result found for vertex {vertex_id}" + raise ValueError(msg) + + return VertexBuildResult( + result_dict=result_dict, params=params, valid=valid, artifacts=artifacts, vertex=vertex + ) + def get_vertex_edges( self, vertex_id: str, diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 4720d067e54e..24e57b36b5e5 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -870,9 +870,10 @@ def __eq__(self, __o: object) -> bool: # self._data is a dict and we need to compare them # to check if they are equal data_are_equal = self.data == __o.data - return ids_are_equal and data_are_equal except AttributeError: return False + else: + return ids_are_equal and data_are_equal def __hash__(self) -> int: return id(self) diff --git a/src/backend/base/langflow/interface/initialize/loading.py b/src/backend/base/langflow/interface/initialize/loading.py index 81f8a1a3d6d7..6b4dc0d975ff 100644 --- a/src/backend/base/langflow/interface/initialize/loading.py +++ b/src/backend/base/langflow/interface/initialize/loading.py @@ -123,12 +123,12 @@ def update_params_with_load_from_db_fields( raise logger.debug(str(e)) if fallback_to_env_vars and key is None: - var = os.getenv(params[field]) - if var is None: + key = os.getenv(params[field]) + if key is None: msg = f"Environment variable {params[field]} is not set." - raise ValueError(msg) - key = var - logger.info(f"Using environment variable {params[field]} for {field}") + logger.error(msg) + else: + logger.info(f"Using environment variable {params[field]} for {field}") if key is None: logger.warning(f"Could not get value for {field}. Setting it to None.") diff --git a/src/backend/base/langflow/load/utils.py b/src/backend/base/langflow/load/utils.py index f72ffcd42f6b..885f1ed5cf09 100644 --- a/src/backend/base/langflow/load/utils.py +++ b/src/backend/base/langflow/load/utils.py @@ -22,7 +22,7 @@ def upload(file_path: str, host: str, flow_id: str): dict: A dictionary containing the file path. Raises: - Exception: If an error occurs during the upload process. + UploadError: If an error occurs during the upload process. """ try: url = f"{host}/api/v1/upload/{flow_id}" @@ -33,9 +33,9 @@ def upload(file_path: str, host: str, flow_id: str): except Exception as e: msg = f"Error uploading file: {e}" raise UploadError(msg) from e - else: - msg = f"Error uploading file: {response.status_code}" - raise UploadError(msg) + + msg = f"Error uploading file: {response.status_code}" + raise UploadError(msg) def upload_file(file_path: str, host: str, flow_id: str, components: list[str], tweaks: dict | None = None): @@ -54,26 +54,27 @@ def upload_file(file_path: str, host: str, flow_id: str, components: list[str], dict: A dictionary containing the file path and any tweaks that were applied. Raises: - Exception: If an error occurs during the upload process. + UploadError: If an error occurs during the upload process. """ - if not tweaks: - tweaks = {} try: response = upload(file_path, host, flow_id) - if response["file_path"]: - for component in components: - if isinstance(component, str): - tweaks[component] = {"path": response["file_path"]} - else: - msg = f"Component ID or name must be a string. Got {type(component)}" - raise TypeError(msg) - return tweaks except Exception as e: msg = f"Error uploading file: {e}" raise UploadError(msg) from e - else: - msg = "Error uploading file" - raise UploadError(msg) + + if not tweaks: + tweaks = {} + if response["file_path"]: + for component in components: + if isinstance(component, str): + tweaks[component] = {"path": response["file_path"]} + else: + msg = f"Error uploading file: component ID or name must be a string. Got {type(component)}" + raise UploadError(msg) + return tweaks + + msg = "Error uploading file" + raise UploadError(msg) def get_flow(url: str, flow_id: str): @@ -88,7 +89,7 @@ def get_flow(url: str, flow_id: str): dict: A dictionary containing the details of the flow. Raises: - Exception: If an error occurs during the retrieval process. + UploadError: If an error occurs during the retrieval process. """ try: flow_url = f"{url}/api/v1/flows/{flow_id}" @@ -99,6 +100,6 @@ def get_flow(url: str, flow_id: str): except Exception as e: msg = f"Error retrieving flow: {e}" raise UploadError(msg) from e - else: - msg = f"Error retrieving flow: {response.status_code}" - raise UploadError(msg) + + msg = f"Error retrieving flow: {response.status_code}" + raise UploadError(msg) diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index 8ba39aef909e..0c9b88b6ca8d 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -57,15 +57,16 @@ def add_messages(messages: Message | list[Message], flow_id: str | None = None): """ Add a message to the monitor service. """ - try: - if not isinstance(messages, list): - messages = [messages] - if not all(isinstance(message, Message) for message in messages): - types = ", ".join([str(type(message)) for message in messages]) - msg = f"The messages must be instances of Message. Found: {types}" - raise ValueError(msg) + if not isinstance(messages, list): + messages = [messages] + + if not all(isinstance(message, Message) for message in messages): + types = ", ".join([str(type(message)) for message in messages]) + msg = f"The messages must be instances of Message. Found: {types}" + raise ValueError(msg) + try: messages_models = [MessageTable.from_message(msg, flow_id=flow_id) for msg in messages] with session_scope() as session: messages_models = add_messagetables(messages_models, session) diff --git a/src/backend/base/langflow/schema/dotdict.py b/src/backend/base/langflow/schema/dotdict.py index d30defb17f3d..a3560a706def 100644 --- a/src/backend/base/langflow/schema/dotdict.py +++ b/src/backend/base/langflow/schema/dotdict.py @@ -28,10 +28,11 @@ def __getattr__(self, attr): if isinstance(value, dict) and not isinstance(value, dotdict): value = dotdict(value) self[attr] = value # Update self to nest dotdict for future accesses - return value except KeyError as e: msg = f"'dotdict' object has no attribute '{attr}'" raise AttributeError(msg) from e + else: + return value def __setattr__(self, key, value): """ diff --git a/src/backend/base/langflow/schema/image.py b/src/backend/base/langflow/schema/image.py index ecdabf01e951..ce63f64536ee 100644 --- a/src/backend/base/langflow/schema/image.py +++ b/src/backend/base/langflow/schema/image.py @@ -12,9 +12,9 @@ def is_image_file(file_path): try: with PILImage.open(file_path) as img: img.verify() # Verify that it is, in fact, an image - return True except (OSError, SyntaxError): return False + return True async def get_file_paths(files: list[str]): diff --git a/src/backend/base/langflow/schema/message.py b/src/backend/base/langflow/schema/message.py index accf6c98cd81..f69a4529e437 100644 --- a/src/backend/base/langflow/schema/message.py +++ b/src/backend/base/langflow/schema/message.py @@ -34,10 +34,10 @@ def _timestamp_to_str(timestamp: datetime | str) -> str: # Just check if the string is a valid datetime try: datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") # noqa: DTZ007 - return timestamp except ValueError as e: msg = f"Invalid timestamp: {timestamp}" raise ValueError(msg) from e + return timestamp return timestamp.strftime("%Y-%m-%d %H:%M:%S") diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 2f786b188ebd..db7dcf9846ea 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -235,10 +235,10 @@ def is_connected(self): try: self._client.ping() - return True except redis.exceptions.ConnectionError: logger.exception("RedisCache could not connect to the Redis server") return False + return True async def get(self, key, lock=None): """ diff --git a/src/backend/base/langflow/services/database/models/transactions/crud.py b/src/backend/base/langflow/services/database/models/transactions/crud.py index f2b086d3320f..a4dfe946b6bc 100644 --- a/src/backend/base/langflow/services/database/models/transactions/crud.py +++ b/src/backend/base/langflow/services/database/models/transactions/crud.py @@ -23,7 +23,7 @@ def log_transaction(db: Session, transaction: TransactionBase) -> TransactionTab db.add(table) try: db.commit() - return table except IntegrityError: db.rollback() raise + return table diff --git a/src/backend/base/langflow/services/database/models/vertex_builds/crud.py b/src/backend/base/langflow/services/database/models/vertex_builds/crud.py index 2ffd9928a633..a4cf5c2bf1be 100644 --- a/src/backend/base/langflow/services/database/models/vertex_builds/crud.py +++ b/src/backend/base/langflow/services/database/models/vertex_builds/crud.py @@ -23,10 +23,10 @@ def log_vertex_build(db: Session, vertex_build: VertexBuildBase) -> VertexBuildT db.add(table) try: db.commit() - return table except IntegrityError: db.rollback() raise + return table def delete_vertex_builds_by_flow_id(db: Session, flow_id: UUID) -> None: diff --git a/src/backend/base/langflow/services/storage/s3.py b/src/backend/base/langflow/services/storage/s3.py index 9f1412a883aa..def6bff14d97 100644 --- a/src/backend/base/langflow/services/storage/s3.py +++ b/src/backend/base/langflow/services/storage/s3.py @@ -61,13 +61,14 @@ async def list_files(self, folder: str): """ try: response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=folder) - files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]] - logger.info(f"{len(files)} files listed in folder {folder}.") - return files except ClientError: logger.exception(f"Error listing files in folder {folder}") raise + files = [item["Key"] for item in response.get("Contents", []) if "/" not in item["Key"][len(folder) :]] + logger.info(f"{len(files)} files listed in folder {folder}.") + return files + async def delete_file(self, folder: str, file_name: str): """ Delete a file from the S3 bucket. diff --git a/src/backend/base/langflow/services/task/backends/anyio.py b/src/backend/base/langflow/services/task/backends/anyio.py index b7dbf823dfda..2ad4143ac9aa 100644 --- a/src/backend/base/langflow/services/task/backends/anyio.py +++ b/src/backend/base/langflow/services/task/backends/anyio.py @@ -68,13 +68,14 @@ async def launch_task( try: task_result = AnyIOTaskResult(tg) tg.start_soon(task_result.run, task_func, *args, **kwargs) - task_id = str(id(task_result)) - self.tasks[task_id] = task_result - logger.info(f"Task {task_id} started.") - return task_id, task_result except Exception: # noqa: BLE001 logger.exception("An error occurred while launching the task") return None, None + task_id = str(id(task_result)) + self.tasks[task_id] = task_result + logger.info(f"Task {task_id} started.") + return task_id, task_result + def get_task(self, task_id: str) -> Any: return self.tasks.get(task_id) diff --git a/src/backend/base/langflow/services/utils.py b/src/backend/base/langflow/services/utils.py index cf7c32c93978..7586cc8a4e40 100644 --- a/src/backend/base/langflow/services/utils.py +++ b/src/backend/base/langflow/services/utils.py @@ -164,10 +164,7 @@ def initialize_services(fix_migration: bool = False, socketio_server=None): # Test cache connection get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory()) # Setup the superuser - try: - initialize_database(fix_migration=fix_migration) - except Exception: - raise + initialize_database(fix_migration=fix_migration) setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), next(get_session())) try: get_db_service().migrate_flows_if_auto_login() diff --git a/src/backend/base/langflow/utils/version.py b/src/backend/base/langflow/utils/version.py index b25c59863375..8f1e21f461cb 100644 --- a/src/backend/base/langflow/utils/version.py +++ b/src/backend/base/langflow/utils/version.py @@ -36,14 +36,14 @@ def _get_version_info(): __version__ = metadata.version(pkg_name) prerelease_version = __version__ version = _compute_non_prerelease_version(prerelease_version) - + except (ImportError, metadata.PackageNotFoundError): + pass + else: return { "version": prerelease_version, "main_version": version, "package": display_name, } - except (ImportError, metadata.PackageNotFoundError): - pass if __version__ is None: msg = f"Package not found from options {package_options}" diff --git a/src/backend/base/langflow/worker.py b/src/backend/base/langflow/worker.py index 7a9472c37f2c..34e5d7a5bca9 100644 --- a/src/backend/base/langflow/worker.py +++ b/src/backend/base/langflow/worker.py @@ -24,9 +24,9 @@ def build_vertex(self, vertex: Vertex) -> Vertex: try: vertex.task_id = self.request.id async_to_sync(vertex.build)() - return vertex except SoftTimeLimitExceeded as e: raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e + return vertex @celery_app.task(acks_late=True) diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 33a761cd6cc6..858b77b8ad81 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -52,6 +52,7 @@ ignore = [ "RUF012", # Pydantic models are currently not well detected. See https://github.com/astral-sh/ruff/issues/13630 "TD002", # Missing author in TODO "TD003", # Missing issue link in TODO + "TRY301", # A bit too harsh (Abstract `raise` to an inner function) # Rules that are TODOs "ANN", @@ -63,7 +64,6 @@ ignore = [ "S", "SLF", "T201", - "TRY3", ] [tool.ruff.lint.per-file-ignores]