feat: log retriever endpoint (#2601)
* log retriever endpoint

* disabled by default

* realtime log stream via http/2 SSE

* read and write lock

* unit test
zzzming authored Jul 11, 2024
1 parent 7bd1050 commit 81849d5
Showing 5 changed files with 291 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/backend/base/langflow/api/__init__.py
@@ -1,4 +1,5 @@
 from langflow.api.router import router
 from langflow.api.health_check_router import health_check_router
+from langflow.api.log_router import log_router
 
-__all__ = ["router", "health_check_router"]
+__all__ = ["router", "health_check_router", "log_router"]
86 changes: 86 additions & 0 deletions src/backend/base/langflow/api/log_router.py
@@ -0,0 +1,86 @@
import asyncio
import json
from fastapi import APIRouter, Query, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from http import HTTPStatus
from langflow.utils.logger import log_buffer

log_router = APIRouter(tags=["Log"])


async def event_generator(request: Request):
    global log_buffer

    # seed the cursor with the newest buffered timestamp (0.0 if the buffer is empty)
    last_line = log_buffer.get_last_n(1)
    latest_timestamp = next(iter(last_line), 0.0)
    while True:
        if await request.is_disconnected():
            break

        new_logs = log_buffer.get_after_timestamp(timestamp=latest_timestamp, lines=100)
        sent_any = False
        for ts, msg in new_logs.items():
            if ts > latest_timestamp:
                # frame each record as an SSE "data:" field terminated by a blank line
                yield f"data: {json.dumps({ts: msg})}\n\n"
                latest_timestamp = ts  # advance the cursor for the next query iteration
                sent_any = True
        if not sent_any:
            # SSE comment line; keeps intermediaries from closing an idle connection
            yield ": keepalive\n\n"

        await asyncio.sleep(1)


@log_router.get("/logs-stream")
async def stream_logs(
request: Request,
):
"""
HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs
it establishes a long-lived connection to the server and receives log messages in real-time
the client should use the head "Accept: text/event-stream"
"""
global log_buffer
    if not log_buffer.enabled():
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)

return StreamingResponse(event_generator(request), media_type="text/event-stream")
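
For reference, a minimal client sketch for consuming this stream (assumptions: a local Langflow instance on the default port 7860 with the log router mounted at the root, and the httpx library; adjust the base URL for your deployment):

import json
import httpx

# long-lived SSE connection; timeout=None stops httpx from closing it after its default timeout
with httpx.stream(
    "GET",
    "http://localhost:7860/logs-stream",
    headers={"Accept": "text/event-stream"},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line or line.startswith(":"):
            continue  # skip blank separators and ": keepalive" comments
        print(json.loads(line.removeprefix("data: ")))  # {"<epoch>": "<log line>"}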


@log_router.get("/logs")
async def logs(
    lines_before: int = Query(1, ge=1, description="The number of log lines before the timestamp (or the most recent lines if no timestamp is given)"),
    lines_after: int = Query(0, ge=0, description="The number of log lines at or after the timestamp"),
    timestamp: float = Query(0, description="The epoch timestamp to anchor the query at"),
):
global log_buffer
    if not log_buffer.enabled():
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)

    logs: dict[float, str] = {}
if lines_after > 0 and timestamp == 0:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Timestamp is required when requesting logs after the timestamp",
)

if lines_after > 0 and timestamp > 0:
logs = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
return JSONResponse(content=logs)

if timestamp == 0:
if lines_before > 0:
logs = log_buffer.get_last_n(lines_before)
else:
if lines_before > 0:
logs = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)

return JSONResponse(content=logs)
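
The three retrieval modes of GET /logs can be exercised as follows (again a sketch assuming a local instance on port 7860; the timestamps are hypothetical epoch values):

import httpx

base = "http://localhost:7860"

# the last 10 log lines (no timestamp needed)
print(httpx.get(f"{base}/logs", params={"lines_before": 10}).json())

# up to 10 lines at or after a given epoch timestamp
print(httpx.get(f"{base}/logs", params={"lines_after": 10, "timestamp": 1720700000.0}).json())

# up to 10 lines strictly before a given epoch timestamp
print(httpx.get(f"{base}/logs", params={"lines_before": 10, "timestamp": 1720700000.0}).json())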
3 changes: 2 additions & 1 deletion src/backend/base/langflow/main.py
@@ -18,7 +18,7 @@
 from starlette.middleware.base import BaseHTTPMiddleware
 from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
 
-from langflow.api import router, health_check_router
+from langflow.api import router, health_check_router, log_router
 from langflow.initial_setup.setup import (
     create_or_update_starter_projects,
     initialize_super_user_if_needed,
@@ -158,6 +158,7 @@ async def flatten_query_string_lists(request: Request, call_next):

 app.include_router(router)
 app.include_router(health_check_router)
+app.include_router(log_router)
 
 @app.exception_handler(Exception)
 async def exception_handler(request: Request, exc: Exception):
99 changes: 99 additions & 0 deletions src/backend/base/langflow/utils/logger.py
@@ -1,7 +1,11 @@
+import json
 import logging
 import os
 import sys
 from pathlib import Path
+from collections import OrderedDict
+from itertools import islice
+from threading import Lock, Semaphore
 from typing import Optional
 
 import orjson
@@ -12,6 +16,98 @@
VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]


class SizedLogBuffer:
def __init__(
self,
max_readers: int = 20, # max number of concurrent readers for the buffer
):
"""
a buffer for storing log messages for the log retrieval API
the buffer can be overwritten by an env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
because the logger is initialized before the settings_service are loaded
"""
self.max: int = 0
env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
if env_buffer_size.isdigit():
self.max = int(env_buffer_size)

self.buffer: OrderedDict[float, str] = OrderedDict()

self._max_readers = max_readers
self._wlock = Lock()
self._rsemaphore = Semaphore(max_readers)

def write(self, message: str):
record = json.loads(message)
log_entry = record["text"]
epoch = record["record"]["time"]["timestamp"]

        # busy-wait until every reader slot is free (no readers hold the semaphore)
while self._rsemaphore._value != self._max_readers:
continue

with self._wlock:
if len(self.buffer) >= self.max:
# remove the oldest log entry if the buffer is full
self.buffer.popitem(last=False)
self.buffer[epoch] = log_entry

def __len__(self):
return len(self.buffer)

def get_after_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
rc = dict()

        # busy-wait until any in-flight write releases the lock
while self._wlock.locked():
continue
self._rsemaphore.acquire()
for ts, msg in self.buffer.items():
if lines == 0:
break
if ts >= timestamp and lines > 0:
rc[ts] = msg
lines -= 1
self._rsemaphore.release()

return rc

def get_before_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
rc = dict()
        # busy-wait until any in-flight write releases the lock
while self._wlock.locked():
continue
self._rsemaphore.acquire()
for ts, msg in reversed(self.buffer.items()):
if lines == 0:
break
if ts < timestamp and lines > 0:
rc[ts] = msg
lines -= 1
self._rsemaphore.release()

return rc

def get_last_n(self, last_idx: int) -> dict[float, str]:
        # busy-wait until any in-flight write releases the lock
while self._wlock.locked():
continue
self._rsemaphore.acquire()
rc = dict(islice(reversed(self.buffer.items()), last_idx))
self._rsemaphore.release()
return rc

def enabled(self) -> bool:
return self.max > 0

def max_size(self) -> int:
return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()
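
A minimal sketch of driving SizedLogBuffer directly, using the same serialized record shape the unit tests in tests/unit/test_logger.py construct by hand; assigning .max directly stands in for LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE, which must be set before this module is imported:

import json
from langflow.utils.logger import SizedLogBuffer

buf = SizedLogBuffer()
buf.max = 3  # stand-in for LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE=3
for i in range(5):
    buf.write(json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}))

print(len(buf))           # 3; the two oldest entries were evicted
print(buf.get_last_n(2))  # {1625097604: 'Log 4', 1625097603: 'Log 3'}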


def serialize_log(record):
subset = {
"timestamp": record["time"].timestamp(),
@@ -85,6 +181,9 @@ def configure(
     except Exception as exc:
         logger.error(f"Error setting up log file: {exc}")
 
+    if log_buffer.enabled():
+        logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)
+
     logger.debug(f"Logger set up with log level: {log_level}")
 
     setup_uvicorn_logger()
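
As a sanity check of this wiring, a sketch feeding the buffer through loguru itself: with serialize=True, loguru hands the sink a JSON payload carrying the "text" and record.time.timestamp fields that SizedLogBuffer.write() parses (enabling the buffer by assigning .max is again a stand-in for the env variable):

from loguru import logger
from langflow.utils.logger import log_buffer

log_buffer.max = 100  # pretend retrieval was enabled via LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)
logger.info("hello from the log retriever")
print(log_buffer.get_last_n(1))  # {<epoch>: '<formatted log line>'}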
102 changes: 102 additions & 0 deletions tests/unit/test_logger.py
@@ -0,0 +1,102 @@
import pytest
import os
import json
from collections import OrderedDict
from unittest.mock import patch
from langflow.utils.logger import SizedLogBuffer


@pytest.fixture
def sized_log_buffer():
return SizedLogBuffer()


def test_init_default():
buffer = SizedLogBuffer()
assert buffer.max == 0
assert buffer._max_readers == 20
assert isinstance(buffer.buffer, OrderedDict)


def test_init_with_env_variable():
with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}):
buffer = SizedLogBuffer()
assert buffer.max == 100


def test_write(sized_log_buffer):
message = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600}}})
sized_log_buffer.max = 1 # Set max size to 1 for testing
sized_log_buffer.write(message)
assert len(sized_log_buffer.buffer) == 1
assert 1625097600 in sized_log_buffer.buffer
assert sized_log_buffer.buffer[1625097600] == "Test log"


def test_write_overflow(sized_log_buffer):
sized_log_buffer.max = 2
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
for message in messages:
sized_log_buffer.write(message)

assert len(sized_log_buffer.buffer) == 2
assert 1625097601 in sized_log_buffer.buffer
assert 1625097602 in sized_log_buffer.buffer


def test_len(sized_log_buffer):
sized_log_buffer.max = 3
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
for message in messages:
sized_log_buffer.write(message)

assert len(sized_log_buffer) == 3


def test_get_after_timestamp(sized_log_buffer):
sized_log_buffer.max = 5
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
for message in messages:
sized_log_buffer.write(message)

result = sized_log_buffer.get_after_timestamp(1625097602, lines=2)
assert len(result) == 2
assert 1625097603 in result
assert 1625097602 in result


def test_get_before_timestamp(sized_log_buffer):
sized_log_buffer.max = 5
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
for message in messages:
sized_log_buffer.write(message)

result = sized_log_buffer.get_before_timestamp(1625097603, lines=2)
assert len(result) == 2
assert 1625097601 in result
assert 1625097602 in result


def test_get_last_n(sized_log_buffer):
sized_log_buffer.max = 5
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
for message in messages:
sized_log_buffer.write(message)

result = sized_log_buffer.get_last_n(3)
assert len(result) == 3
assert 1625097602 in result
assert 1625097603 in result
assert 1625097604 in result


def test_enabled(sized_log_buffer):
assert not sized_log_buffer.enabled()
sized_log_buffer.max = 1
assert sized_log_buffer.enabled()


def test_max_size(sized_log_buffer):
assert sized_log_buffer.max_size() == 0
sized_log_buffer.max = 100
assert sized_log_buffer.max_size() == 100
