-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
login.py
143 lines (128 loc) · 5.05 KB
/
login.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session
from langflow.api.v1.schemas import Token
from langflow.services.auth.utils import (
authenticate_user,
create_refresh_token,
create_user_longterm_token,
create_user_tokens,
)
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.deps import get_session, get_settings_service, get_variable_service
from langflow.services.settings.service import SettingsService
from langflow.services.variable.service import VariableService
# Router that the authentication endpoints in this module (login, auto-login,
# token refresh, logout) are registered on; all of them carry the "Login" tag.
router = APIRouter(tags=["Login"])
@router.post("/login", response_model=Token)
async def login_to_get_access_token(
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_session),
    settings_service=Depends(get_settings_service),
    variable_service: VariableService = Depends(get_variable_service),
):
    """Authenticate with username/password and issue access and refresh tokens.

    On success the tokens are returned in the response body and also stored in
    the ``refresh_token_lf`` and ``access_token_lf`` cookies, configured from
    the auth settings. The user's variables are then initialized and the
    default folder is created if it does not exist yet.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value;
            500 (with the original error text as detail) when it raises
            anything other than an ``HTTPException``. An ``HTTPException``
            raised by ``authenticate_user`` is propagated unchanged.
    """
    auth_settings = settings_service.auth_settings

    try:
        user = authenticate_user(form_data.username, form_data.password, db)
    except HTTPException:
        # Already an HTTP-level error: let it through untouched.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        ) from exc

    # Guard clause: reject bad credentials before doing any token work.
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    tokens = create_user_tokens(user_id=user.id, db=db, update_last_login=True)

    response.set_cookie(
        key="refresh_token_lf",
        value=tokens["refresh_token"],
        expires=auth_settings.REFRESH_TOKEN_EXPIRE_SECONDS,
        domain=auth_settings.COOKIE_DOMAIN,
        secure=auth_settings.REFRESH_SECURE,
        httponly=auth_settings.REFRESH_HTTPONLY,
        samesite=auth_settings.REFRESH_SAME_SITE,
    )
    response.set_cookie(
        key="access_token_lf",
        value=tokens["access_token"],
        expires=auth_settings.ACCESS_TOKEN_EXPIRE_SECONDS,
        domain=auth_settings.COOKIE_DOMAIN,
        secure=auth_settings.ACCESS_SECURE,
        httponly=auth_settings.ACCESS_HTTPONLY,
        samesite=auth_settings.ACCESS_SAME_SITE,
    )

    # Per-user bootstrap performed on every successful login.
    variable_service.initialize_user_variables(user.id, db)
    create_default_folder_if_it_doesnt_exist(db, user.id)

    return tokens
@router.get("/auto_login")
async def auto_login(
    response: Response, db: Session = Depends(get_session), settings_service=Depends(get_settings_service)
):
    """Issue a long-term token without credentials when auto login is enabled.

    The access token is returned in the body and also stored in the
    ``access_token_lf`` cookie as a session cookie (no expiry).

    Raises:
        HTTPException: 400 when the ``AUTO_LOGIN`` auth setting is falsy.
    """
    auth_settings = settings_service.auth_settings

    # Guard clause: refuse early when the feature is switched off.
    if not auth_settings.AUTO_LOGIN:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "message": "Auto login is disabled. Please enable it in the settings",
                "auto_login": False,
            },
        )

    # The helper also returns the user id, which this endpoint does not use.
    _, tokens = create_user_longterm_token(db)
    response.set_cookie(
        key="access_token_lf",
        value=tokens["access_token"],
        expires=None,  # None makes it a session cookie
        domain=auth_settings.COOKIE_DOMAIN,
        secure=auth_settings.ACCESS_SECURE,
        httponly=auth_settings.ACCESS_HTTPONLY,
        samesite=auth_settings.ACCESS_SAME_SITE,
    )
    return tokens
@router.post("/refresh")
async def refresh_token(
    request: Request,
    response: Response,
    settings_service: "SettingsService" = Depends(get_settings_service),
):
    """Exchange the refresh-token cookie for a new access/refresh token pair.

    Reads ``refresh_token_lf`` from the request cookies, passes it to
    ``create_refresh_token`` and writes the resulting tokens back into the
    ``refresh_token_lf`` and ``access_token_lf`` cookies. The new tokens are
    also returned in the response body.

    Raises:
        HTTPException: 401 when the request carries no (or an empty)
            ``refresh_token_lf`` cookie.
    """
    auth_settings = settings_service.auth_settings
    current_refresh = request.cookies.get("refresh_token_lf")

    # Guard clause: nothing to refresh without the cookie.
    if not current_refresh:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    tokens = create_refresh_token(current_refresh)

    response.set_cookie(
        key="refresh_token_lf",
        value=tokens["refresh_token"],
        expires=auth_settings.REFRESH_TOKEN_EXPIRE_SECONDS,
        domain=auth_settings.COOKIE_DOMAIN,
        secure=auth_settings.REFRESH_SECURE,
        httponly=auth_settings.REFRESH_HTTPONLY,
        samesite=auth_settings.REFRESH_SAME_SITE,
    )
    response.set_cookie(
        key="access_token_lf",
        value=tokens["access_token"],
        expires=auth_settings.ACCESS_TOKEN_EXPIRE_SECONDS,
        domain=auth_settings.COOKIE_DOMAIN,
        secure=auth_settings.ACCESS_SECURE,
        httponly=auth_settings.ACCESS_HTTPONLY,
        samesite=auth_settings.ACCESS_SAME_SITE,
    )
    return tokens
@router.post("/logout")
async def logout(
    response: Response,
    settings_service: "SettingsService" = Depends(get_settings_service),
):
    """Log the user out by expiring both authentication cookies.

    The login and refresh endpoints set ``refresh_token_lf`` and
    ``access_token_lf`` with ``domain=auth_settings.COOKIE_DOMAIN``. A cookie
    is identified by name, domain and path, so the deletion must carry the
    same domain; otherwise, when a cookie domain is configured, the browser
    keeps the original cookies and the user stays logged in.

    Returns:
        A dict with a confirmation message.
    """
    cookie_domain = settings_service.auth_settings.COOKIE_DOMAIN

    # When COOKIE_DOMAIN is None this is the same as omitting the domain.
    for cookie_name in ("refresh_token_lf", "access_token_lf"):
        response.delete_cookie(cookie_name, domain=cookie_domain)

    return {"message": "Logout successful"}