-
Notifications
You must be signed in to change notification settings - Fork 958
/
context.py
111 lines (85 loc) · 3.24 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import asyncio
import uuid
from contextvars import ContextVar
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from lazify import LazyProxy
from chainlit.session import ClientType, HTTPSession, WebsocketSession
if TYPE_CHECKING:
from chainlit.emitter import BaseChainlitEmitter
from chainlit.step import Step
from chainlit.user import PersistedUser, User
CL_RUN_NAMES = ["on_chat_start", "on_message", "on_audio_end"]
class ChainlitContextException(Exception):
def __init__(self, msg="Chainlit context not found", *args, **kwargs):
super().__init__(msg, *args, **kwargs)
class ChainlitContext:
loop: asyncio.AbstractEventLoop
emitter: "BaseChainlitEmitter"
session: Union["HTTPSession", "WebsocketSession"]
@property
def current_step(self):
if previous_steps := local_steps.get():
return previous_steps[-1]
@property
def current_run(self):
if previous_steps := local_steps.get():
return next(
(step for step in previous_steps if step.name in CL_RUN_NAMES), None
)
def __init__(
self,
session: Union["HTTPSession", "WebsocketSession"],
emitter: Optional["BaseChainlitEmitter"] = None,
):
from chainlit.emitter import BaseChainlitEmitter, ChainlitEmitter
self.loop = asyncio.get_running_loop()
self.session = session
if emitter:
self.emitter = emitter
elif isinstance(self.session, HTTPSession):
self.emitter = BaseChainlitEmitter(self.session)
elif isinstance(self.session, WebsocketSession):
self.emitter = ChainlitEmitter(self.session)
context_var: ContextVar[ChainlitContext] = ContextVar("chainlit")
local_steps: ContextVar[Optional[List["Step"]]] = ContextVar("local_steps")
local_steps.set(None)
def init_ws_context(session_or_sid: Union[WebsocketSession, str]) -> ChainlitContext:
if not isinstance(session_or_sid, WebsocketSession):
session = WebsocketSession.require(session_or_sid)
else:
session = session_or_sid
context = ChainlitContext(session)
context_var.set(context)
return context
def init_http_context(
thread_id: Optional[str] = None,
user: Optional[Union["User", "PersistedUser"]] = None,
auth_token: Optional[str] = None,
user_env: Optional[Dict[str, str]] = None,
client_type: ClientType = "webapp",
) -> ChainlitContext:
from chainlit.data import get_data_layer
session_id = str(uuid.uuid4())
thread_id = thread_id or str(uuid.uuid4())
session = HTTPSession(
id=session_id,
thread_id=thread_id,
token=auth_token,
user=user,
client_type=client_type,
user_env=user_env,
)
context = ChainlitContext(session)
context_var.set(context)
if data_layer := get_data_layer():
if user_id := getattr(user, "id", None):
asyncio.create_task(
data_layer.update_thread(thread_id=thread_id, user_id=user_id)
)
return context
def get_context() -> ChainlitContext:
try:
return context_var.get()
except LookupError as e:
raise ChainlitContextException from e
context: ChainlitContext = LazyProxy(get_context, enable_cache=False)