feat: log retriever endpoint #2601

Merged · 5 commits · Jul 11, 2024
Changes from 3 commits
3 changes: 2 additions & 1 deletion src/backend/base/langflow/api/__init__.py
@@ -1,4 +1,5 @@
 from langflow.api.router import router
 from langflow.api.health_check_router import health_check_router
+from langflow.api.log_router import log_router

-__all__ = ["router", "health_check_router"]
+__all__ = ["router", "health_check_router", "log_router"]
86 changes: 86 additions & 0 deletions src/backend/base/langflow/api/log_router.py
@@ -0,0 +1,86 @@
import asyncio
import json
from fastapi import APIRouter, Query, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from http import HTTPStatus
from langflow.utils.logger import log_buffer

log_router = APIRouter(tags=["Log"])


async def event_generator(request: Request):
    global log_buffer

    # start from the newest buffered entry, or from the epoch if the buffer is empty
    last_line = log_buffer.get_last_n(1)
    latest_timestamp = next(iter(last_line), 0.0)
    while True:
        if await request.is_disconnected():
            break

        new_logs = log_buffer.get_after_timestamp(timestamp=latest_timestamp, lines=100)
        if new_logs:
            temp_ts = latest_timestamp
            for ts, msg in new_logs.items():
                if ts > latest_timestamp:
                    # frame each entry as an SSE "data:" event so standard
                    # EventSource clients can parse the stream
                    yield f"data: {json.dumps({ts: msg})}\n\n"
                    temp_ts = ts
            # carry the newest timestamp into the next query iteration
            latest_timestamp = temp_ts
else:
yield ": keepalive\n\n"

await asyncio.sleep(1)


@log_router.get("/logs-stream")
async def stream_logs(
request: Request,
):
"""
HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs
it establishes a long-lived connection to the server and receives log messages in real-time
the client should use the head "Accept: text/event-stream"
"""
global log_buffer
    if not log_buffer.enabled():
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)

return StreamingResponse(event_generator(request), media_type="text/event-stream")


@log_router.get("/logs")
async def logs(
lines_before: int = Query(1, ge=1, description="The number of logs before the timestamp or the last log"),
lines_after: int = Query(0, ge=1, description="The number of logs after the timestamp"),
timestamp: float = Query(0, description="The timestamp to start streaming logs from"),
):
global log_buffer
if log_buffer.enabled() is False:
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)

logs = dict()
if lines_after > 0 and timestamp == 0:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Timestamp is required when requesting logs after the timestamp",
)

if lines_after > 0 and timestamp > 0:
logs = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
return JSONResponse(content=logs)

if timestamp == 0:
if lines_before > 0:
logs = log_buffer.get_last_n(lines_before)
else:
if lines_before > 0:
logs = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)

return JSONResponse(content=logs)
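
For orientation, a minimal sketch of how a client might exercise both endpoints. It assumes a Langflow server on localhost:7860 with this router mounted at the application root (as wired in main.py below) and uses the third-party requests package; host, port, and output are illustrative.

import json

import requests  # assumed HTTP client; any other works

BASE = "http://localhost:7860"  # assumed host/port

# Fetch the 10 newest buffered log lines as a {timestamp: message} object.
resp = requests.get(f"{BASE}/logs", params={"lines_before": 10})
resp.raise_for_status()
for ts, line in resp.json().items():
    print(ts, line, end="")

# Stream new logs over SSE; each "data:" event carries a one-entry JSON object.
with requests.get(f"{BASE}/logs-stream", stream=True,
                  headers={"Accept": "text/event-stream"}) as stream:
    for raw in stream.iter_lines():
        if raw and not raw.startswith(b":"):  # skip ": keepalive" comments
            print(json.loads(raw.removeprefix(b"data: ")))
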
3 changes: 2 additions & 1 deletion src/backend/base/langflow/main.py
@@ -17,7 +17,7 @@
from starlette.middleware.base import BaseHTTPMiddleware
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

-from langflow.api import router, health_check_router
+from langflow.api import router, health_check_router, log_router
from langflow.initial_setup.setup import (
create_or_update_starter_projects,
initialize_super_user_if_needed,
@@ -157,6 +157,7 @@ async def flatten_query_string_lists(request: Request, call_next):

 app.include_router(router)
 app.include_router(health_check_router)
+app.include_router(log_router)

FastAPIInstrumentor.instrument_app(app)
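
A rough smoke test for this wiring might look like the sketch below; the create_app factory name is an assumption about langflow.main and may need adjusting.

from fastapi.testclient import TestClient

from langflow.main import create_app  # assumed app factory name

client = TestClient(create_app())


def test_log_endpoints_disabled_by_default():
    # With LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE unset, the buffer is disabled,
    # so both endpoints should answer 501 Not Implemented.
    assert client.get("/logs").status_code == 501
    assert client.get("/logs-stream").status_code == 501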

79 changes: 78 additions & 1 deletion src/backend/base/langflow/utils/logger.py
@@ -1,8 +1,13 @@
import json
import logging
import os
import sys
import threading
from datetime import timedelta
from pathlib import Path
from typing import Optional
from collections import OrderedDict
from itertools import islice
from typing import Dict, Optional

import orjson
from loguru import logger
@@ -12,6 +17,75 @@
VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]


class SizedLogBuffer:
def __init__(self):
"""
a buffer for storing log messages for the log retrieval API
the buffer can be overwritten by an env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
because the logger is initialized before the settings_service are loaded
"""
self.max: int = 0
env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
if env_buffer_size.isdigit():
self.max = int(env_buffer_size)

self.buffer: OrderedDict[float, str] = OrderedDict()

self._lock = threading.Lock()
self.page_size: int = 100
self.sessions: Dict[str, Dict] = {}
self.session_timeout: timedelta = timedelta(minutes=5)

    def write(self, message: str):
        # loguru passes a JSON-serialized record when the sink is added with serialize=True
        record = json.loads(message)
        log_entry = record["text"]
        epoch = record["record"]["time"]["timestamp"]
with self._lock:
if len(self.buffer) >= self.max:
# remove the oldest log entry if the buffer is full
self.buffer.popitem(last=False)
self.buffer[epoch] = log_entry

def __len__(self):
return len(self.buffer)

    def get_after_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to `lines` entries at or after `timestamp`, oldest first."""
        rc = {}
        with self._lock:
            for ts, msg in self.buffer.items():
                if lines == 0:
                    break
                if ts >= timestamp:
                    rc[ts] = msg
                    lines -= 1
        return rc

    def get_before_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to `lines` entries strictly before `timestamp`, newest first."""
        rc = {}
        with self._lock:
            for ts, msg in reversed(self.buffer.items()):
                if lines == 0:
                    break
                if ts < timestamp:
                    rc[ts] = msg
                    lines -= 1
        return rc

    def get_last_n(self, n: int) -> dict[float, str]:
        """Return the newest `n` entries, newest first."""
        with self._lock:
            return dict(islice(reversed(self.buffer.items()), n))

def enabled(self) -> bool:
return self.max > 0

def max_size(self) -> int:
return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()


def serialize_log(record):
subset = {
"timestamp": record["time"].timestamp(),
Expand Down Expand Up @@ -85,6 +159,9 @@ def configure(
except Exception as exc:
logger.error(f"Error setting up log file: {exc}")

if log_buffer.enabled():
logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)

logger.debug(f"Logger set up with log level: {log_level}")

setup_uvicorn_logger()
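
To see the whole pipeline in isolation, a small standalone sketch follows; the buffer size must be set in the environment before the buffer is constructed, and the printed timestamps are illustrative.

import os

os.environ["LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE"] = "100"  # set before constructing the buffer

from loguru import logger

from langflow.utils.logger import SizedLogBuffer

buffer = SizedLogBuffer()  # reads the env var, so enabled() is now True

# serialize=True makes loguru hand write() a JSON document with the formatted
# text plus record metadata, including the epoch timestamp used as the key.
logger.add(sink=buffer.write, format="{time} {level} {message}", serialize=True)

logger.info("hello")
logger.warning("world")

# Two newest entries, newest first, e.g.
# {1720656001.23: '... WARNING world\n', 1720656001.20: '... INFO hello\n'}
print(buffer.get_last_n(2))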