Add ruff rules TRY3xx
cbornet committed Oct 10, 2024
1 parent de055f2 commit 64a77e7
Showing 47 changed files with 677 additions and 566 deletions.
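The TRY3xx group is ruff's tryceratops-derived rule set: TRY300 flags a return statement inside a try block, TRY301 flags a raise inside a try that one of its own handlers would catch, and TRY302 flags handlers that do nothing but re-raise. Most hunks in this commit are TRY300 fixes: each affected return moves from the end of the try body to just after the exception handler, which keeps behavior identical because the handlers shown here always raise. In the hunks below, a relocated return therefore appears twice, once at its old position inside the try and once at its new position after the handler. A minimal sketch of the before/after shape, using hypothetical names rather than code from this diff (assumes FastAPI is installed):

from fastapi import HTTPException


def delete_item_before(db: dict, item_id: str) -> dict:
    # Old shape: ruff TRY300 flags the return inside the try block.
    try:
        del db[item_id]
        return {"detail": "deleted"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


def delete_item_after(db: dict, item_id: str) -> dict:
    # New shape: the return sits after the handler; behavior is unchanged
    # because the except branch always raises.
    try:
        del db[item_id]
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"detail": "deleted"}

The same move is applied to delete_api_key_route, save_store_api_key, and delete_store_api_key in the file below.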
8 changes: 5 additions & 3 deletions src/backend/base/langflow/api/v1/api_key.py
@@ -54,9 +54,9 @@ def delete_api_key_route(
):
try:
delete_api_key(db, api_key_id)
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"detail": "API Key deleted"}


@router.post("/store")
@@ -88,10 +88,11 @@ def save_store_api_key(
domain=auth_settings.COOKIE_DOMAIN,
)

return {"detail": "API Key saved"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e

return {"detail": "API Key saved"}


@router.delete("/store")
def delete_store_api_key(
@@ -101,6 +102,7 @@ def delete_store_api_key(
try:
current_user.store_api_key = None
db.commit()
return {"detail": "API Key deleted"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e

return {"detail": "API Key deleted"}
157 changes: 89 additions & 68 deletions src/backend/base/langflow/api/v1/chat.py
@@ -189,7 +189,6 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
playgroundSuccess=True,
),
)
return first_layer, vertices_to_run, graph
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_playground,
@@ -205,6 +204,8 @@ async def build_graph_and_get_order() -> tuple[list[str], list[str], Graph]:
logger.exception("Error checking build status")
raise HTTPException(status_code=500, detail=str(exc)) from exc

return first_layer, vertices_to_run, graph
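The build_graph_and_get_order hunks above keep all error handling inside the except block: telemetry is queued as a background task, the known "stream or streaming set to True" error maps to a 400, anything else is logged and re-raised as a 500, and the successful return now sits after the whole try/except. A rough sketch of that shape with hypothetical names (the real endpoint closes over the request's BackgroundTasks and the telemetry service; standard logging stands in for the project's logger):

import logging

from fastapi import BackgroundTasks, HTTPException

logger = logging.getLogger(__name__)


async def run_build(build, background_tasks: BackgroundTasks, record_metric):
    try:
        result = await build()
    except Exception as exc:
        # Queue telemetry instead of awaiting it so the error path stays fast.
        background_tasks.add_task(record_metric, success=False, error=str(exc))
        if "stream or streaming set to True" in str(exc):
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        logger.exception("Error checking build status")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return result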

async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManager) -> VertexBuildResponse:
flow_id_str = str(flow_id)

@@ -302,7 +303,6 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
@@ -317,6 +317,8 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc

return build_response

async def build_vertices(
vertex_id: str,
graph: Graph,
@@ -588,7 +590,6 @@ async def build_vertex(
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
@@ -603,6 +604,90 @@ async def build_vertex(
message = parse_exception(exc)
raise HTTPException(status_code=500, detail=message) from exc

return build_response
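The _build_vertex and build_vertex hunks above apply the same TRY300 move to their return build_response statements. The function that follows is the other substantial change in this file: the inner stream_vertex generator, previously defined inside build_vertex_stream, is lifted to a module-level _stream_vertex helper, and failures that used to be raised inside a try (the TRY301 pattern) now yield an error event and return. A minimal sketch of that shape, with hypothetical names and a simplified event format:

from collections.abc import AsyncIterator


async def stream_result(get_cache, flow_id: str) -> AsyncIterator[str]:
    try:
        cache = await get_cache(flow_id)
    except Exception as exc:
        # Report the failure to the client and stop, instead of raising into
        # a broad handler further down the generator.
        yield f"event=error data={exc}"
        return
    if not cache:
        yield f"event=error data=No cache found for {flow_id}."
        return
    yield f"event=message data={cache['result']}"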


async def _stream_vertex(flow_id: str, vertex_id: str, chat_service: ChatService):
graph = None
try:
try:
cache = await chat_service.get_cache(flow_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return

if not cache:
# If there's no cache
msg = f"No cache found for {flow_id}."
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
else:
graph = cache.get("result")

try:
vertex: InterfaceVertex = graph.get_vertex(vertex_id)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
yield str(StreamData(event="error", data={"error": str(exc)}))
return

if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return

if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)

elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
try:
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
logger.error(msg)
yield str(StreamData(event="error", data={"error": msg}))
return
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))


@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
@@ -638,70 +723,6 @@ async def build_vertex_stream(
HTTPException: If an error occurs while building the vertex.
"""
try:
flow_id_str = str(flow_id)

async def stream_vertex():
graph = None
try:
cache = await chat_service.get_cache(flow_id_str)
if not cache:
# If there's no cache
msg = f"No cache found for {flow_id_str}."
raise ValueError(msg)
else:
graph = cache.get("result")

vertex: InterfaceVertex = graph.get_vertex(vertex_id)
if not hasattr(vertex, "stream"):
msg = f"Vertex {vertex_id} does not support streaming"
raise ValueError(msg)
if isinstance(vertex._built_result, str) and vertex._built_result:
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)

elif not vertex.frozen or not vertex._built:
logger.debug(f"Streaming vertex {vertex_id}")
stream_data = StreamData(
event="message",
data={"message": f"Streaming vertex {vertex_id}"},
)
yield str(stream_data)
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
elif vertex.result is not None:
stream_data = StreamData(
event="message",
data={"chunk": vertex._built_result},
)
yield str(stream_data)
else:
msg = f"No result found for vertex {vertex_id}"
raise ValueError(msg)

except Exception as exc: # noqa: BLE001
logger.exception("Error building Component")
exc_message = parse_exception(exc)
if exc_message == "The message must be an iterator or an async iterator.":
exc_message = "This stream has already been closed."
yield str(StreamData(event="error", data={"error": exc_message}))
finally:
logger.debug("Closing stream")
if graph:
await chat_service.set_cache(flow_id_str, graph)
yield str(StreamData(event="close", data={"message": "Stream closed"}))

return StreamingResponse(stream_vertex(), media_type="text/event-stream")
return StreamingResponse(_stream_vertex(str(flow_id), vertex_id, chat_service), media_type="text/event-stream")
except Exception as exc:
raise HTTPException(status_code=500, detail="Error building Component") from exc
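After the refactor, build_vertex_stream only wraps the module-level generator in a StreamingResponse with the text/event-stream media type. A hedged sketch of consuming the endpoint from a client (the URL prefix, port, and IDs are assumptions rather than values from this diff; requires httpx):

import asyncio

import httpx


async def consume(flow_id: str, vertex_id: str) -> None:
    url = f"http://localhost:7860/api/v1/build/{flow_id}/{vertex_id}/stream"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line:
                    print(line)


# asyncio.run(consume("your-flow-id", "your-vertex-id"))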