Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3e252aa
feat: Add Rich fork dependency with pickle support for ConsoleThreadL…
pkusnail Sep 25, 2025
edac489
feat: Add Rich pickle serialization support for ConsoleThreadLocals o…
pkusnail Sep 25, 2025
298b135
feat: Integrate Rich pickle support into CacheServiceFactory
pkusnail Sep 25, 2025
2add672
feat: Enable module-level Rich pickle setup for cache services
pkusnail Sep 25, 2025
ce6ec2a
feat: Add Rich pickle status monitoring to health check endpoint
pkusnail Sep 25, 2025
f0d301e
refactor: Apply ruff code style fixes to Rich pickle functions
pkusnail Sep 25, 2025
7614f42
feat: Add cache normalization for serialization-safe DTOs
pkusnail Sep 25, 2025
f478ba7
feat: Add comprehensive cache normalization documentation and tests
pkusnail Sep 25, 2025
609a59f
refactor: Apply ruff code style fixes to Rich pickle functions
pkusnail Sep 25, 2025
834603d
feat: Add Redis cache sanitization with dill recursion protection
pkusnail Sep 25, 2025
b385a75
feat: Add Rich pickle status monitoring to health check endpoint
pkusnail Sep 25, 2025
fe2ee30
feat: Enable module-level Rich pickle setup for cache services
pkusnail Sep 25, 2025
d0b8e7a
feat: Integrate Rich pickle support into CacheServiceFactory
pkusnail Sep 25, 2025
92d393e
Remove documentation files from repository tracking
pkusnail Sep 25, 2025
f96263c
fix: Resolve ruff code style violations in cache validation script
pkusnail Sep 25, 2025
3e10014
[autofix.ci] apply automated fixes
autofix-ci[bot] Sep 25, 2025
4c6a558
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Sep 25, 2025
e615059
refactor: Apply ruff code style fixes to Rich pickle functions
pkusnail Sep 25, 2025
67bce7e
fix
pkusnail Sep 25, 2025
c36f8c6
Address review feedback: async logger shim, pickle sanitization (cycl…
pkusnail Sep 26, 2025
c384ddd
[autofix.ci] apply automated fixes
autofix-ci[bot] Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions scripts/run_local_cache_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Local cache normalization validation script.

This script validates cache normalization functionality by testing
the normalizer module and simulating ChatService cache operations.
"""

import asyncio
import importlib.util
import pickle
import sys
import types
from pathlib import Path

# Adjust sys.path for src-layout imports
ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT / "src" / "lfx" / "src"))
sys.path.insert(0, str(ROOT / "src" / "backend" / "base"))


def _load_normalizer():
    """Dynamically import the normalizer module straight from its source file.

    Loading by file path avoids importing the full ``lfx`` package, which
    would drag in heavy optional dependencies.

    Returns:
        Module: The freshly executed normalizer module object.

    Raises:
        ImportError: If a loadable spec cannot be created for the file.
    """
    normalizer_path = ROOT / "src" / "lfx" / "src" / "lfx" / "serialization" / "normalizer.py"
    spec = importlib.util.spec_from_file_location("_normalizer_local", normalizer_path)
    if spec is None or spec.loader is None:
        msg = f"Cannot load normalizer from {normalizer_path}"
        raise ImportError(msg)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Load the normalizer directly from its file so the heavy lfx package
# (numpy/pandas imports) is never triggered.
_normalizer = _load_normalizer()
normalize_for_cache = _normalizer.normalize_for_cache  # type: ignore[attr-defined]

# Preload modules to avoid heavy lfx.serialization imports (numpy, pandas).
# Registering an empty package plus the dynamically loaded normalizer under the
# canonical names makes later `import lfx.serialization.normalizer` statements
# resolve to these lightweight stand-ins.
_serialization_pkg = types.ModuleType("lfx.serialization")
sys.modules["lfx.serialization"] = _serialization_pkg
sys.modules["lfx.serialization.normalizer"] = _normalizer

# Provide a minimal dill shim for imports in cache.service.
# Extra dumps() arguments (protocol, recurse, ...) are accepted and ignored;
# plain pickle is sufficient for this local check.
_dill = types.ModuleType("dill")
_dill.dumps = lambda obj, *_args, **_kwargs: pickle.dumps(obj)
_dill.loads = lambda b: pickle.loads(b)  # noqa: S301
sys.modules["dill"] = _dill


def check_normalizer():
    """Exercise cache normalization on dynamic objects and a vertex snapshot.

    Builds a runtime-defined class, a nested function, and a simulated vertex
    snapshot, then verifies ``normalize_for_cache`` produces
    serialization-safe stand-ins for each.

    Raises:
        ValueError: If any normalized output does not match expectations.
    """
    runtime_cls = type("Dynamic", (), {"x": 1})

    def dyn_func():
        return 42

    expected_value = 123
    out = normalize_for_cache({"cls": runtime_cls, "func": dyn_func, "value": expected_value})

    # Plain values must round-trip untouched.
    if out["value"] != expected_value:
        msg = f"Expected value {expected_value}, got {out['value']}"
        raise ValueError(msg)
    # Dynamic classes and callables are replaced by path descriptors.
    if "__class_path__" not in out["cls"]:
        msg = "Missing __class_path__ in normalized class"
        raise ValueError(msg)
    if "__callable_path__" not in out["func"]:
        msg = "Missing __callable_path__ in normalized function"
        raise ValueError(msg)

    # A vertex-shaped snapshot should be flagged and its built_object
    # swapped for the "unbuilt" placeholder.
    snapshot = {
        "built": True,
        "results": {"x": 1},
        "artifacts": {},
        "built_object": dyn_func,
        "built_result": {"y": 2},
        "full_data": {"id": "v1"},
    }
    normalized_snapshot = normalize_for_cache(snapshot)

    if normalized_snapshot["__cache_vertex__"] is not True:
        msg = "Expected __cache_vertex__ to be True"
        raise ValueError(msg)
    if normalized_snapshot["built_object"] != {"__cache_placeholder__": "unbuilt"}:
        msg = f"Expected built_object placeholder, got {normalized_snapshot['built_object']}"
        raise ValueError(msg)


async def check_chatservice():
    """Simulate ChatService.set_cache and validate the stored envelope.

    The real ChatService cannot be imported here (optional dependencies are
    missing from this environment), so this reproduces its set_cache
    normalization step directly via ``normalize_for_cache`` and checks the
    resulting cache envelope.

    Raises:
        ValueError: If the simulated envelope or its payload is malformed.
    """
    dynamic_cls = type("C", (), {})
    payload = {
        "built": True,
        "results": {"ok": 1},
        "built_object": dynamic_cls,
        "artifacts": {},
        "built_result": {"foo": "bar"},
        "full_data": {"id": "v"},
    }
    envelope = {
        "result": normalize_for_cache(payload),
        "type": "normalized",
        "__envelope_version__": 1,
    }

    if envelope["type"] != "normalized":
        msg = f"Expected envelope type 'normalized', got {envelope['type']}"
        raise ValueError(msg)

    stored = envelope["result"]
    if stored["__cache_vertex__"] is not True:
        msg = "Expected __cache_vertex__ to be True in result"
        raise ValueError(msg)
    if stored["built_object"] != {"__cache_placeholder__": "unbuilt"}:
        msg = f"Expected built_object placeholder in result, got {stored['built_object']}"
        raise ValueError(msg)


def main() -> None:
    """Run all local cache validation tests.

    Executes the synchronous normalizer checks first, then the async
    ChatService simulation, and prints a success marker once both pass.
    Any failure surfaces as an uncaught ValueError with a descriptive
    message.
    """
    check_normalizer()
    asyncio.run(check_chatservice())
    print("LOCAL CACHE CHECKS: OK")


if __name__ == "__main__":
    main()
45 changes: 44 additions & 1 deletion src/backend/base/langflow/api/health_check_router.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,40 @@
import uuid

from fastapi import APIRouter, HTTPException, status
from lfx.log.logger import logger

# Try to import from lfx, fallback to async-compatible wrapper if unavailable
try:
from lfx.log.logger import logger
except ImportError:
import logging
from typing import Any

class _AsyncLogger:
"""Async-compatible wrapper over standard logging.Logger.

Provides awaitable methods used in this module (e.g., aexception, ainfo)
to avoid attribute errors when lfx logger is not installed.
"""

def __init__(self, base: logging.Logger) -> None:
self._base = base

# Pass-through for unknown attributes (sync logging API)
def __getattr__(self, name: str) -> Any: # pragma: no cover - thin shim
return getattr(self._base, name)

async def aexception(self, msg: str, *args: Any, **kwargs: Any) -> None:
self._base.exception(msg, *args, **kwargs)

async def ainfo(self, msg: str, *args: Any, **kwargs: Any) -> None:
self._base.info(msg, *args, **kwargs)

logger = _AsyncLogger(logging.getLogger(__name__))
from pydantic import BaseModel
from sqlmodel import select

from langflow.api.utils import DbSession
from langflow.services.cache.utils import is_rich_pickle_enabled, validate_rich_pickle_support
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import get_chat_service

Expand All @@ -16,6 +45,7 @@ class HealthResponse(BaseModel):
status: str = "nok"
chat: str = "error check the server logs"
db: str = "error check the server logs"
rich_pickle: str = "not_checked"
"""
Do not send exceptions and detailed error messages to the client because it might contain credentials and other
sensitive server information.
Expand Down Expand Up @@ -59,6 +89,19 @@ async def health_check(
except Exception: # noqa: BLE001
await logger.aexception("Error checking chat service")

# Check Rich pickle support status
try:
if is_rich_pickle_enabled():
if validate_rich_pickle_support():
response.rich_pickle = "ok"
else:
response.rich_pickle = "enabled_but_validation_failed"
else:
response.rich_pickle = "not_enabled"
except Exception: # noqa: BLE001
await logger.aexception("Error checking Rich pickle support")
response.rich_pickle = "error check the server logs"

if response.has_error():
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=response.model_dump())
response.status = "ok"
Expand Down
5 changes: 5 additions & 0 deletions src/backend/base/langflow/services/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.cache.utils import is_rich_pickle_enabled, setup_rich_pickle_support

from . import factory, service

# Setup Rich pickle support on module import
_rich_pickle_enabled = setup_rich_pickle_support()

__all__ = [
"AsyncInMemoryCache",
"CacheService",
"RedisCache",
"ThreadingInMemoryCache",
"factory",
"is_rich_pickle_enabled",
"service",
]
37 changes: 35 additions & 2 deletions src/backend/base/langflow/services/cache/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@

from typing import TYPE_CHECKING

from lfx.log.logger import logger
# Try to import logger, fallback to standard logging if lfx not available
try:
from lfx.log.logger import logger
except ImportError:
import logging

logger = logging.getLogger(__name__)

from typing_extensions import override

from langflow.services.cache.disk import AsyncDiskCache
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.cache.utils import setup_rich_pickle_support, validate_rich_pickle_support
from langflow.services.factory import ServiceFactory

if TYPE_CHECKING:
Expand All @@ -16,22 +24,47 @@
class CacheServiceFactory(ServiceFactory):
def __init__(self) -> None:
super().__init__(CacheService)
# Setup Rich pickle support when factory is initialized
self._rich_pickle_enabled = setup_rich_pickle_support()
if self._rich_pickle_enabled:
logger.debug("Rich pickle support enabled for cache serialization")
# Optionally validate the support
if validate_rich_pickle_support():
logger.debug("Rich pickle support validation successful")
else:
logger.warning("Rich pickle support validation failed")
else:
logger.info("Rich pickle support could not be enabled")

@override
def create(self, settings_service: SettingsService):
# Here you would have logic to create and configure a CacheService
# based on the settings_service

# Debug: Log the cache type being used
cache_type = settings_service.settings.cache_type
logger.info("Cache factory creating cache with type: %s", cache_type)

if settings_service.settings.cache_type == "redis":
logger.debug("Creating Redis cache")
return RedisCache(
cache: RedisCache = RedisCache(
host=settings_service.settings.redis_host,
port=settings_service.settings.redis_port,
db=settings_service.settings.redis_db,
url=settings_service.settings.redis_url,
expiration_time=settings_service.settings.redis_cache_expire,
)

# Log Rich pickle status for Redis caches
if self._rich_pickle_enabled:
logger.info("Redis cache created with Rich object serialization support")
else:
logger.warning(
"Redis cache created without Rich object serialization - may cause issues with console objects"
)

return cache

if settings_service.settings.cache_type == "memory":
return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
if settings_service.settings.cache_type == "async":
Expand Down
Loading
Loading