fixing ThreadingInMemoryCache usage (langflow-ai#2604)
* ThreadingInMemoryCache usage was broken. This commit fixes those issues and adds the missing documentation about the caching options.

* make lint & make unit_tests fixes

* removing unnecessary changes prompted by unclear test results in the last run

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
himan-k and autofix-ci[bot] authored Jul 10, 2024
1 parent d28fe8e commit 7414a01
Showing 9 changed files with 138 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .env.example
@@ -59,8 +59,8 @@ LANGFLOW_OPEN_BROWSER=
# Example: LANGFLOW_REMOVE_API_KEYS=false
LANGFLOW_REMOVE_API_KEYS=

-# Whether to use RedisCache or InMemoryCache
-# Values: memory, redis
+# Whether to use RedisCache or ThreadingInMemoryCache or AsyncInMemoryCache
+# Values: async, memory, redis
# Example: LANGFLOW_CACHE_TYPE=memory
# If you want to use redis then the following environment variables must be set:
# LANGFLOW_REDIS_HOST (default: localhost)
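For reference, `async` selects the new `AsyncInMemoryCache`, `memory` selects `ThreadingInMemoryCache`, and `redis` selects `RedisCache` (a mapping inferred from the comments above, not stated explicitly in the diff). A hypothetical `.env` excerpt using the Redis backend:

```shell
# Cache backend selection (values per the comments above: async, memory, redis)
LANGFLOW_CACHE_TYPE=redis
# Only needed when LANGFLOW_CACHE_TYPE=redis
LANGFLOW_REDIS_HOST=localhost
```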
1 change: 1 addition & 0 deletions .gitignore
@@ -272,3 +272,4 @@ prof/*
src/frontend/temp
*-shm
*-wal
.history
2 changes: 2 additions & 0 deletions .vscode/launch.json
@@ -1,6 +1,7 @@
{
"version": "0.2.0",
"configurations": [

{
"name": "Debug Backend",
"type": "debugpy",
@@ -38,6 +39,7 @@
"--env-file",
"${workspaceFolder}/.env"
],
// "python": "/path/to/your/python_env/python", // Replace with the path to your Python executable
"jinja": true,
"justMyCode": false
},
11 changes: 11 additions & 0 deletions README.md
@@ -42,9 +42,13 @@

# 📝 Content

- [](#)
- [📝 Content](#-content)
- [📦 Get Started](#-get-started)
- [🎨 Create Flows](#-create-flows)
- [Deploy](#deploy)
- [DataStax Langflow](#datastax-langflow)
- [Deploy Langflow on Hugging Face Spaces](#deploy-langflow-on-hugging-face-spaces)
- [Deploy Langflow on Google Cloud Platform](#deploy-langflow-on-google-cloud-platform)
- [Deploy on Railway](#deploy-on-railway)
- [Deploy on Render](#deploy-on-render)
@@ -64,6 +68,13 @@ You can install Langflow with pip:
# Make sure you have >=Python 3.10 installed on your system.
python -m pip install langflow -U
```
Or

If you would like to install from your cloned repo, you can build and install Langflow's frontend and backend with:

```shell
make install_frontend && make build_frontend && make install_backend
```

Then, run Langflow with:

2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/utils.py
@@ -200,7 +200,7 @@ async def get_next_runnable_vertices(
list: A list of IDs of the next runnable vertices.
"""
-async with chat_service._cache_locks[flow_id] as lock:
+async with chat_service._async_cache_locks[flow_id] as lock:
graph.remove_from_predecessors(vertex_id)
direct_successors_ready = [v for v in vertex.successors_ids if graph.is_vertex_runnable(v)]
if not direct_successors_ready:
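A note on the rename: `_async_cache_locks` is a `defaultdict(asyncio.Lock)` on `ChatService` (see the service changes below), so each flow id lazily gets its own lock. A minimal, self-contained sketch of that per-key locking pattern:

```python
import asyncio
from collections import defaultdict

# One asyncio.Lock per flow id, created lazily on first access.
locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def build_step(flow_id: str, counts: dict[str, int]) -> None:
    async with locks[flow_id]:
        # Work on the same flow is serialized; distinct flows don't contend.
        counts[flow_id] = counts.get(flow_id, 0) + 1

async def main() -> None:
    counts: dict[str, int] = {}
    await asyncio.gather(*(build_step("flow-1", counts) for _ in range(5)))
    print(counts)  # {'flow-1': 5}

asyncio.run(main())
```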
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/chat.py
@@ -188,7 +188,7 @@ async def build_vertex(
vertex = graph.get_vertex(vertex_id)

try:
-lock = chat_service._cache_locks[flow_id_str]
+lock = chat_service._async_cache_locks[flow_id_str]
(
result_dict,
params,
2 changes: 1 addition & 1 deletion src/backend/base/langflow/graph/graph/base.py
@@ -951,7 +951,7 @@ async def process(self, fallback_to_env_vars: bool, start_component_id: Optional
self.set_run_id(run_id)
self.set_run_name()
await self.initialize_run()
-lock = chat_service._cache_locks[self.run_id]
+lock = chat_service._async_cache_locks[self.run_id]
while to_process:
current_batch = list(to_process) # Copy current deque items to a list
to_process.clear() # Clear the deque for new items
43 changes: 39 additions & 4 deletions src/backend/base/langflow/interface/types.py
@@ -1,10 +1,10 @@
import asyncio
import json

from typing import TYPE_CHECKING
-from loguru import logger

+from loguru import logger
from langflow.custom.utils import abuild_custom_components, build_custom_components
from langflow.services.cache.base import AsyncBaseCacheService

if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
@@ -63,9 +63,44 @@ async def get_and_cache_all_types_dict(
force_refresh: bool = False,
lock: asyncio.Lock | None = None,
):
-all_types_dict = await cache_service.get(key="all_types_dict", lock=lock)
async def get_from_cache(key):
"""
Retrieves a value from the cache based on the given key.
Args:
key: The key to retrieve the value from the cache.
Returns:
The value associated with the given key in the cache.
Raises:
None.
"""
if isinstance(cache_service, AsyncBaseCacheService):
return await cache_service.get(key=key, lock=lock)
else:
return cache_service.get(key=key, lock=lock)

async def set_in_cache(key, value):
"""
Sets the given key-value pair in the cache.
Parameters:
- key: The key to set in the cache.
- value: The value to associate with the key in the cache.
Returns:
None
"""
if isinstance(cache_service, AsyncBaseCacheService):
await cache_service.set(key=key, value=value, lock=lock)
else:
cache_service.set(key=key, value=value, lock=lock)

+all_types_dict = await get_from_cache("all_types_dict")
if not all_types_dict or force_refresh:
logger.debug("Building langchain types dict")
all_types_dict = await aget_all_types_dict(settings_service.settings.components_path)
-await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock)
+await set_in_cache("all_types_dict", all_types_dict)

return all_types_dict
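The two helpers above exist because the configured cache service may be synchronous or asynchronous, and only the async flavor's `get`/`set` coroutines can be awaited. A standalone sketch of the dispatch pattern (the class names here are stand-ins, not Langflow's real cache classes):

```python
import asyncio

class SyncCache:
    """Stand-in for a synchronous cache such as ThreadingInMemoryCache."""
    def get(self, key, lock=None):
        return f"sync:{key}"

class AsyncCache:
    """Stand-in for AsyncBaseCacheService implementations."""
    async def get(self, key, lock=None):
        return f"async:{key}"

async def get_from_cache(cache, key, lock=None):
    # Await only when the backend is async; call sync backends directly,
    # mirroring the isinstance check on AsyncBaseCacheService above.
    if isinstance(cache, AsyncCache):
        return await cache.get(key=key, lock=lock)
    return cache.get(key=key, lock=lock)

async def main():
    print(await get_from_cache(SyncCache(), "all_types_dict"))
    print(await get_from_cache(AsyncCache(), "all_types_dict"))

asyncio.run(main())
```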
86 changes: 80 additions & 6 deletions src/backend/base/langflow/services/chat/service.py
@@ -1,39 +1,113 @@
import asyncio
from collections import defaultdict
from threading import RLock
from typing import Any, Optional

from langflow.services.base import Service
from langflow.services.cache.base import AsyncBaseCacheService
from langflow.services.deps import get_cache_service


class ChatService(Service):
"""
Service class for managing chat-related operations.
"""

name = "chat_service"

def __init__(self):
-self._cache_locks = defaultdict(asyncio.Lock)
+self._async_cache_locks = defaultdict(asyncio.Lock)
+self._sync_cache_locks = defaultdict(RLock)
self.cache_service = get_cache_service()

def _get_lock(self, key: str):
"""
Retrieves the lock associated with the given key.
Args:
key (str): The key to retrieve the lock for.
Returns:
threading.Lock or asyncio.Lock: The lock associated with the given key.
"""
if isinstance(self.cache_service, AsyncBaseCacheService):
return self._async_cache_locks[key]
else:
return self._sync_cache_locks[key]

async def _perform_cache_operation(
self, operation: str, key: str, data: Any = None, lock: Optional[asyncio.Lock] = None
):
"""
Perform a cache operation based on the given operation type.
Args:
operation (str): The type of cache operation to perform. Possible values are "upsert", "get", or "delete".
key (str): The key associated with the cache operation.
data (Any, optional): The data to be stored in the cache. Only applicable for "upsert" operation. Defaults to None.
lock (Optional[asyncio.Lock], optional): The lock to be used for the cache operation. Defaults to None.
Returns:
Any: The result of the cache operation. Only applicable for "get" operation.
Raises:
None
"""
lock = lock or self._get_lock(key)
if isinstance(self.cache_service, AsyncBaseCacheService):
if operation == "upsert":
await self.cache_service.upsert(str(key), data, lock=lock)
elif operation == "get":
return await self.cache_service.get(key, lock=lock)
elif operation == "delete":
await self.cache_service.delete(key, lock=lock)
else:
if operation == "upsert":
self.cache_service.upsert(str(key), data, lock=lock)
elif operation == "get":
return self.cache_service.get(key, lock=lock)
elif operation == "delete":
self.cache_service.delete(key, lock=lock)

async def set_cache(self, key: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool:
"""
Set the cache for a client.
Args:
key (str): The cache key.
data (Any): The data to be cached.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
Returns:
bool: True if the cache was set successfully, False otherwise.
"""
# client_id is the flow id but that already exists in the cache
# so we need to change it to something else
result_dict = {
"result": data,
"type": type(data),
}
-await self.cache_service.upsert(str(key), result_dict, lock=lock or self._cache_locks[key])
+await self._perform_cache_operation("upsert", key, result_dict, lock)
return key in self.cache_service

async def get_cache(self, key: str, lock: Optional[asyncio.Lock] = None) -> Any:
"""
Get the cache for a client.
Args:
key (str): The cache key.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
Returns:
Any: The cached data.
"""
-return await self.cache_service.get(key, lock=lock or self._cache_locks[key])
+return await self._perform_cache_operation("get", key, lock=lock or self._get_lock(key))

async def clear_cache(self, key: str, lock: Optional[asyncio.Lock] = None):
"""
Clear the cache for a client.
Args:
key (str): The cache key.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
"""
-await self.cache_service.delete(key, lock=lock or self._cache_locks[key])
+await self._perform_cache_operation("delete", key, lock=lock or self._get_lock(key))
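With `_perform_cache_operation` in place, callers of `set_cache`, `get_cache`, and `clear_cache` no longer need to know which backend is configured. A hedged usage sketch (it assumes Langflow's service manager has been initialized so that `get_cache_service()` resolves inside `ChatService()`):

```python
import asyncio

from langflow.services.chat.service import ChatService

async def demo() -> None:
    chat_service = ChatService()  # requires an initialized cache service
    await chat_service.set_cache("flow-123", {"status": "built"})
    cached = await chat_service.get_cache("flow-123")
    print(cached)  # e.g. {'result': {'status': 'built'}, 'type': <class 'dict'>}
    await chat_service.clear_cache("flow-123")

asyncio.run(demo())
```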
