From 45c296317d33573649b7fe25f093be94ac6b99f1 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 14:59:18 -0300 Subject: [PATCH 01/11] feat: add diskcache package version 5.6.3 to poetry.lock and pyproject.toml for improved caching functionality --- src/backend/base/poetry.lock | 13 ++++++++++++- src/backend/base/pyproject.toml | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index e52b0c5b775..8deb46e0bfa 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -1208,6 +1208,17 @@ files = [ graph = ["objgraph (>=1.7.2)"] profile = ["gprof2dot (>=2022.7.29)"] +[[package]] +name = "diskcache" +version = "5.6.3" +description = "Disk Cache -- Disk and file backed persistent cache." +optional = false +python-versions = ">=3" +files = [ + {file = "diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19"}, + {file = "diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc"}, +] + [[package]] name = "distlib" version = "0.3.8" @@ -7654,4 +7665,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "747dad35b9e5b1338a989ea6bfd4ac3465ba34f792639aeabda3c1ca9b40c689" +content-hash = "fe6710d7325bc2cceeaa298d94d6f1157cfe1533c2acbabe3ecdca5594d9e007" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index d98c673e076..a8de78d9e01 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -79,6 +79,7 @@ filelock = "^3.15.4" grandalf = "^0.8.0" crewai = "^0.36.0" spider-client = "^0.0.27" +diskcache = "^5.6.3" [tool.poetry.extras] From d8a3f454dce37f9ba87b8fa025aa66b1aca524e3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 14:59:43 -0300 Subject: [PATCH 02/11] refactor: simplify CacheMiss import in cache service files for better clarity and maintainability --- src/backend/base/langflow/services/cache/service.py | 4 +--- src/backend/base/langflow/services/cache/utils.py | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 3d4131c239c..021c33f9028 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -8,9 +8,7 @@ from loguru import logger from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType -from langflow.services.cache.utils import CacheMiss - -CACHE_MISS = CacheMiss() +from langflow.services.cache.utils import CACHE_MISS class ThreadingInMemoryCache(CacheService, Generic[LockType]): # type: ignore diff --git a/src/backend/base/langflow/services/cache/utils.py b/src/backend/base/langflow/services/cache/utils.py index a89963f5681..c2f3c961124 100644 --- a/src/backend/base/langflow/services/cache/utils.py +++ b/src/backend/base/langflow/services/cache/utils.py @@ -166,3 +166,6 @@ def update_build_status(cache_service, flow_id: str, status: "BuildStatus"): cache_service[flow_id] = cached_flow cached_flow["status"] = status cache_service[flow_id] = cached_flow + + +CACHE_MISS = CacheMiss() From 6b6df7c3274edcc39ea97de59ecd493d538c29a2 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 14:59:57 -0300 Subject: [PATCH 03/11] feat: Add AsyncDiskCache class for disk-based caching --- .../base/langflow/services/cache/disk.py | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/backend/base/langflow/services/cache/disk.py diff --git a/src/backend/base/langflow/services/cache/disk.py b/src/backend/base/langflow/services/cache/disk.py new file mode 100644 index 00000000000..1d83ef6a69a --- /dev/null +++ b/src/backend/base/langflow/services/cache/disk.py @@ -0,0 +1,86 @@ +import asyncio +import pickle +import time +from typing import Generic, Optional + +from diskcache import Cache +from loguru import logger + +from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType +from langflow.services.cache.utils import CACHE_MISS + + +class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]): # type: ignore + def __init__(self, cache_dir, max_size=None, expiration_time=3600): + self.cache = Cache(cache_dir) + self.lock = asyncio.Lock() + self.max_size = max_size + self.expiration_time = expiration_time + + async def get(self, key, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + return await self._get(key) + else: + return await self._get(key) + + async def _get(self, key): + item = await asyncio.to_thread(self.cache.get, key, default=None) + if item: + if time.time() - item["time"] < self.expiration_time: + await asyncio.to_thread(self.cache.touch, key) # Refresh the expiry time + return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] + else: + logger.info(f"Cache item for key '{key}' has expired and will be deleted.") + await self._delete(key) # Log before deleting the expired item + return CACHE_MISS + + async def set(self, key, value, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._set(key, value) + else: + await self._set(key, value) + + async def _set(self, key, value): + if self.max_size and len(self.cache) >= self.max_size: + await asyncio.to_thread(self.cache.cull) + item = {"value": pickle.dumps(value) if not isinstance(value, (str, bytes)) else value, "time": time.time()} + await asyncio.to_thread(self.cache.set, key, item) + + async def delete(self, key, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._delete(key) + else: + await self._delete(key) + + async def _delete(self, key): + await asyncio.to_thread(self.cache.delete, key) + + async def clear(self, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._clear() + else: + await self._clear() + + async def _clear(self): + await asyncio.to_thread(self.cache.clear) + + async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._upsert(key, value) + else: + await self._upsert(key, value) + + async def _upsert(self, key, value): + existing_value = await self.get(key) + if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict): + existing_value.update(value) + value = existing_value + await self.set(key, value) + + def __contains__(self, key): + return asyncio.run(asyncio.to_thread(self.cache.__contains__, key)) From 21932e44aee5bc3b2459c9604af635ba464a15aa Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:00:15 -0300 Subject: [PATCH 04/11] feat: Add disk caching option in CacheServiceFactory with AsyncDiskCache --- src/backend/base/langflow/services/cache/factory.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/backend/base/langflow/services/cache/factory.py b/src/backend/base/langflow/services/cache/factory.py index 5cc6b12afe0..74364dbfc0e 100644 --- a/src/backend/base/langflow/services/cache/factory.py +++ b/src/backend/base/langflow/services/cache/factory.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from langflow.services.cache.disk import AsyncDiskCache from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache from langflow.services.factory import ServiceFactory from langflow.utils.logger import logger @@ -36,3 +37,8 @@ def create(self, settings_service: "SettingsService"): return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire) elif settings_service.settings.cache_type == "async": return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire) + elif settings_service.settings.cache_type == "disk": + return AsyncDiskCache( + cache_dir=settings_service.settings.config_dir, + expiration_time=settings_service.settings.cache_expire, + ) From aacecf269b0f2186f477b2633536a0d6832789c4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:01:05 -0300 Subject: [PATCH 05/11] feat: Restrict cache_type to specific literals: async, redis, memory, disk for enhanced type safety and clarity --- src/backend/base/langflow/services/settings/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 658edd57e1c..88c2a64b49f 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -3,7 +3,7 @@ import os from pathlib import Path from shutil import copy2 -from typing import Any, List, Optional, Tuple, Type +from typing import Any, List, Literal, Optional, Tuple, Type import orjson import yaml @@ -79,7 +79,7 @@ class Settings(BaseSettings): """SQLite pragmas to use when connecting to the database.""" # cache configuration - cache_type: str = "async" + cache_type: Literal["async", "redis", "memory", "disk"] = "async" """The cache type can be 'async' or 'redis'.""" cache_expire: int = 3600 """The cache expire in seconds.""" From 58161bcd1d3e6c981e03465b28a7f6e82c6ae2b9 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:01:19 -0300 Subject: [PATCH 06/11] feat: Change get_requester_result to async await for proper async handling in Vertex class --- src/backend/base/langflow/graph/vertex/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 8284736acf3..0b7941734b4 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -753,7 +753,7 @@ async def build( return if self.frozen and self._built: - return self.get_requester_result(requester) + return await self.get_requester_result(requester) elif self._built and requester is not None: # This means that the vertex has already been built # and we are just getting the result for the requester From 8eb7fe090a0f19df6806aa1e90ce972211ce2f4e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:01:37 -0300 Subject: [PATCH 07/11] fix: Update outputs dictionary in ResultData class to use key-value pairs for better readability and maintainability --- src/backend/base/langflow/graph/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/graph/schema.py b/src/backend/base/langflow/graph/schema.py index eab0040c6d3..fdabcdaaa64 100644 --- a/src/backend/base/langflow/graph/schema.py +++ b/src/backend/base/langflow/graph/schema.py @@ -43,7 +43,7 @@ def validate_model(cls, values): stream_url = StreamURL(location=message["stream_url"]) values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])}) elif "type" in message: - values["outputs"].update({OutputValue(message=message, type=message["type"])}) + values["outputs"].update({key: OutputValue(message=message, type=message["type"])}) return values From f7437389a8095c629fb2e24855606caed5546edd Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:02:12 -0300 Subject: [PATCH 08/11] fix: Improve caching logic in Graph class by ensuring vertex builds properly handle exceptions and cache updates more reliably --- src/backend/base/langflow/graph/graph/base.py | 62 +++++++++++-------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 42de21ac918..2d1bc3a7b8d 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1125,43 +1125,53 @@ async def build_vertex( self.run_manager.add_to_vertices_being_run(vertex_id) try: params = "" - if vertex.frozen: + should_build = False + if not vertex.frozen: + should_build = True + else: # Check the cache for the vertex if get_cache is not None: cached_result = await get_cache(key=vertex.id) else: cached_result = None if isinstance(cached_result, CacheMiss): - await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files - ) - if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) - if cached_result and not isinstance(cached_result, CacheMiss): - cached_vertex = cached_result["result"] - # Now set update the vertex with the cached vertex - vertex._built = cached_vertex._built - vertex.result = cached_vertex.result - vertex.results = cached_vertex.results - vertex.artifacts = cached_vertex.artifacts - vertex._built_object = cached_vertex._built_object - vertex._custom_component = cached_vertex._custom_component - if vertex.result is not None: - vertex.result.used_frozen_result = True + should_build = True else: - await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files - ) - if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) - else: + try: + cached_vertex_dict = cached_result["result"] + # Now set update the vertex with the cached vertex + vertex._built = cached_vertex_dict["_built"] + vertex.artifacts = cached_vertex_dict["artifacts"] + vertex._built_object = cached_vertex_dict["_built_object"] + vertex._built_result = cached_vertex_dict["_built_result"] + vertex._data = cached_vertex_dict["_data"] + vertex.results = cached_vertex_dict["results"] + try: + vertex._finalize_build() + if vertex.result is not None: + vertex.result.used_frozen_result = True + except Exception: + should_build = True + except KeyError: + should_build = True + + if should_build: await vertex.build( user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files ) if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) - - if vertex.result is not None: + vertex_dict = { + "_built": vertex._built, + "results": vertex.results, + "artifacts": vertex.artifacts, + "_built_object": vertex._built_object, + "_built_result": vertex._built_result, + "_data": vertex._data, + } + + await set_cache(key=vertex.id, data=vertex_dict) + + if vertex.results is not None: params = f"{vertex._built_object_repr()}{params}" valid = True result_dict = vertex.result From 2a7c845698cfecbcbac2c5e9c562cfb4447e459c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:08:19 -0300 Subject: [PATCH 09/11] feat: Add teardown method to AsyncDiskCache for clearing cache directory during cleanup process --- src/backend/base/langflow/services/cache/disk.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/backend/base/langflow/services/cache/disk.py b/src/backend/base/langflow/services/cache/disk.py index 1d83ef6a69a..4fd86957f65 100644 --- a/src/backend/base/langflow/services/cache/disk.py +++ b/src/backend/base/langflow/services/cache/disk.py @@ -84,3 +84,7 @@ async def _upsert(self, key, value): def __contains__(self, key): return asyncio.run(asyncio.to_thread(self.cache.__contains__, key)) + + async def teardown(self): + # Clean up the cache directory + self.cache.clear(retry=True) From 331d84615c0fe1f0ebb4d28f34ba3107451cfde1 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 8 Aug 2024 15:29:14 -0300 Subject: [PATCH 10/11] fix: Correct variable name in Graph class to ensure proper handling of vertex results in caching logic --- src/backend/base/langflow/graph/graph/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 2d1bc3a7b8d..05744b68441 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1171,7 +1171,7 @@ async def build_vertex( await set_cache(key=vertex.id, data=vertex_dict) - if vertex.results is not None: + if vertex.result is not None: params = f"{vertex._built_object_repr()}{params}" valid = True result_dict = vertex.result From fc3e7af7f472e33a9f5b3da47963300ed129372d Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 9 Aug 2024 09:17:11 -0300 Subject: [PATCH 11/11] feat: Clear AsyncDiskCache on initialization to align behavior with in-memory cache until frontend handling is implemented --- src/backend/base/langflow/services/cache/disk.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/backend/base/langflow/services/cache/disk.py b/src/backend/base/langflow/services/cache/disk.py index 4fd86957f65..dbbd85f1335 100644 --- a/src/backend/base/langflow/services/cache/disk.py +++ b/src/backend/base/langflow/services/cache/disk.py @@ -13,6 +13,12 @@ class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]): # type: ignore def __init__(self, cache_dir, max_size=None, expiration_time=3600): self.cache = Cache(cache_dir) + # Let's clear the cache for now to maintain a similar + # behavior as the in-memory cache + # Later we should implement endpoints for the frontend to grab + # output logs from the cache + if len(self.cache) > 0: + self.cache.clear() self.lock = asyncio.Lock() self.max_size = max_size self.expiration_time = expiration_time