Skip to content

Commit a761489

Browse files
authored
Merge pull request #701 from tisnik/lcore-741-quota-limiter-implementation-for-postgresql--
LCORE-741: quota limiter implementation for PostgreSQL
2 parents ee9f08f + 3ca6cf9 commit a761489

File tree

6 files changed

+308
-7
lines changed

6 files changed

+308
-7
lines changed

src/quota/cluster_quota_limiter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Simple cluster quota limiter where quota is fixed for the whole cluster."""
2+
3+
from models.config import QuotaHandlersConfiguration
4+
from log import get_logger
5+
from quota.revokable_quota_limiter import RevokableQuotaLimiter
6+
7+
logger = get_logger(__name__)
8+
9+
10+
class ClusterQuotaLimiter(RevokableQuotaLimiter):
11+
"""Simple cluster quota limiter where quota is fixed for the whole cluster."""
12+
13+
def __init__(
14+
self,
15+
configuration: QuotaHandlersConfiguration,
16+
initial_quota: int = 0,
17+
increase_by: int = 0,
18+
) -> None:
19+
"""Initialize quota limiter storage."""
20+
subject = "c" # cluster
21+
super().__init__(configuration, initial_quota, increase_by, subject)
22+
23+
# initialize connection to DB
24+
# and initialize tables too
25+
self.connect()
26+
27+
def __str__(self) -> str:
28+
"""Return textual representation of limiter instance."""
29+
name = type(self).__name__
30+
return f"{name}: initial quota: {self.initial_quota} increase by: {self.increase_by}"

src/quota/quota_limiter.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
"""Abstract class that is parent for all quota limiter implementations."""
22

3-
import logging
43
from abc import ABC, abstractmethod
54

5+
from typing import Optional
66

7-
logger = logging.getLogger(__name__)
7+
import psycopg2
8+
9+
from log import get_logger
10+
from models.config import SQLiteDatabaseConfiguration, PostgreSQLDatabaseConfiguration
11+
from quota.connect_pg import connect_pg
12+
from quota.connect_sqlite import connect_sqlite
13+
14+
15+
logger = get_logger(__name__)
816

917

1018
class QuotaLimiter(ABC):
@@ -34,7 +42,11 @@ def consume_tokens(
3442

3543
@abstractmethod
3644
def __init__(self) -> None:
37-
"""Initialize connection config."""
45+
"""Initialize connection configuration(s)."""
46+
self.sqlite_connection_config: Optional[SQLiteDatabaseConfiguration] = None
47+
self.postgres_connection_config: Optional[PostgreSQLDatabaseConfiguration] = (
48+
None
49+
)
3850

3951
@abstractmethod
4052
def _initialize_tables(self) -> None:
@@ -43,7 +55,31 @@ def _initialize_tables(self) -> None:
4355
# pylint: disable=W0201
4456
def connect(self) -> None:
4557
"""Initialize connection to database."""
58+
logger.info("Initializing connection to quota limiter database")
59+
if self.postgres_connection_config is not None:
60+
self.connection = connect_pg(self.postgres_connection_config)
61+
if self.sqlite_connection_config is not None:
62+
self.connection = connect_sqlite(self.sqlite_connection_config)
63+
64+
try:
65+
self._initialize_tables()
66+
except Exception as e:
67+
self.connection.close()
68+
logger.exception("Error initializing Postgres database:\n%s", e)
69+
raise
70+
71+
self.connection.autocommit = True
4672

4773
def connected(self) -> bool:
4874
"""Check if connection to cache is alive."""
49-
return True
75+
if self.connection is None:
76+
logger.warning("Not connected, need to reconnect later")
77+
return False
78+
try:
79+
with self.connection.cursor() as cursor:
80+
cursor.execute("SELECT 1")
81+
logger.info("Connection to storage is ok")
82+
return True
83+
except psycopg2.OperationalError as e:
84+
logger.error("Disconnected from storage: %s", e)
85+
return False

src/quota/quota_limiter_factory.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
"""Quota limiter factory class."""
22

3-
import logging
4-
3+
from log import get_logger
4+
import constants
55
from models.config import QuotaHandlersConfiguration
66

7+
from quota.user_quota_limiter import UserQuotaLimiter
8+
from quota.cluster_quota_limiter import ClusterQuotaLimiter
79
from quota.quota_limiter import QuotaLimiter
810

9-
logger = logging.getLogger(__name__)
11+
logger = get_logger(__name__)
1012

1113

1214
# pylint: disable=too-few-public-methods
@@ -24,9 +26,41 @@ def quota_limiters(config: QuotaHandlersConfiguration) -> list[QuotaLimiter]:
2426
"""
2527
limiters: list[QuotaLimiter] = []
2628

29+
# storage (Postgres) configuration
30+
if config.sqlite is None and config.postgres is None:
31+
logger.warning("Storage configuration for quota limiters not specified")
32+
return limiters
33+
2734
limiters_config = config.limiters
2835
if limiters_config is None:
2936
logger.warning("Quota limiters are not specified in configuration")
3037
return limiters
3138

39+
# fill-in list of initialized quota limiters
40+
for limiter_config in config.limiters:
41+
limiter_type = limiter_config.type
42+
limiter_name = limiter_config.name
43+
initial_quota = limiter_config.initial_quota
44+
increase_by = limiter_config.quota_increase
45+
limiter = QuotaLimiterFactory.create_limiter(
46+
config, limiter_type, initial_quota, increase_by
47+
)
48+
limiters.append(limiter)
49+
logger.info("Set up quota limiter '%s'", limiter_name)
3250
return limiters
51+
52+
@staticmethod
53+
def create_limiter(
54+
configuration: QuotaHandlersConfiguration,
55+
limiter_type: str,
56+
initial_quota: int,
57+
increase_by: int,
58+
) -> QuotaLimiter:
59+
"""Create selected quota limiter."""
60+
match limiter_type:
61+
case constants.USER_QUOTA_LIMITER:
62+
return UserQuotaLimiter(configuration, initial_quota, increase_by)
63+
case constants.CLUSTER_QUOTA_LIMITER:
64+
return ClusterQuotaLimiter(configuration, initial_quota, increase_by)
65+
case _:
66+
raise ValueError(f"Invalid limiter type: {limiter_type}.")
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""Simple quota limiter where quota can be revoked."""
2+
3+
from datetime import datetime
4+
5+
from models.config import QuotaHandlersConfiguration
6+
from log import get_logger
7+
from utils.connection_decorator import connection
8+
from quota.quota_exceed_error import QuotaExceedError
9+
from quota.quota_limiter import QuotaLimiter
10+
from quota.sql import (
11+
CREATE_QUOTA_TABLE,
12+
UPDATE_AVAILABLE_QUOTA_PG,
13+
SELECT_QUOTA_PG,
14+
SET_AVAILABLE_QUOTA_PG,
15+
INIT_QUOTA_PG,
16+
)
17+
18+
logger = get_logger(__name__)
19+
20+
21+
class RevokableQuotaLimiter(QuotaLimiter):
22+
"""Simple quota limiter where quota can be revoked."""
23+
24+
def __init__(
25+
self,
26+
configuration: QuotaHandlersConfiguration,
27+
initial_quota: int,
28+
increase_by: int,
29+
subject_type: str,
30+
) -> None:
31+
"""Initialize quota limiter."""
32+
self.subject_type = subject_type
33+
self.initial_quota = initial_quota
34+
self.increase_by = increase_by
35+
self.sqlite_connection_config = configuration.sqlite
36+
self.postgres_connection_config = configuration.postgres
37+
38+
@connection
39+
def available_quota(self, subject_id: str = "") -> int:
40+
"""Retrieve available quota for given subject."""
41+
if self.subject_type == "c":
42+
subject_id = ""
43+
with self.connection.cursor() as cursor:
44+
cursor.execute(
45+
SELECT_QUOTA_PG,
46+
(subject_id, self.subject_type),
47+
)
48+
value = cursor.fetchone()
49+
if value is None:
50+
self._init_quota(subject_id)
51+
return self.initial_quota
52+
return value[0]
53+
54+
@connection
55+
def revoke_quota(self, subject_id: str = "") -> None:
56+
"""Revoke quota for given subject."""
57+
if self.subject_type == "c":
58+
subject_id = ""
59+
# timestamp to be used
60+
revoked_at = datetime.now()
61+
62+
with self.connection.cursor() as cursor:
63+
cursor.execute(
64+
SET_AVAILABLE_QUOTA_PG,
65+
(self.initial_quota, revoked_at, subject_id, self.subject_type),
66+
)
67+
self.connection.commit()
68+
69+
@connection
70+
def increase_quota(self, subject_id: str = "") -> None:
71+
"""Increase quota for given subject."""
72+
if self.subject_type == "c":
73+
subject_id = ""
74+
# timestamp to be used
75+
updated_at = datetime.now()
76+
77+
with self.connection.cursor() as cursor:
78+
cursor.execute(
79+
UPDATE_AVAILABLE_QUOTA_PG,
80+
(self.increase_by, updated_at, subject_id, self.subject_type),
81+
)
82+
self.connection.commit()
83+
84+
def ensure_available_quota(self, subject_id: str = "") -> None:
85+
"""Ensure that there's avaiable quota left."""
86+
if self.subject_type == "c":
87+
subject_id = ""
88+
available = self.available_quota(subject_id)
89+
logger.info("Available quota for subject %s is %d", subject_id, available)
90+
# check if ID still have available tokens to be consumed
91+
if available <= 0:
92+
e = QuotaExceedError(subject_id, self.subject_type, available)
93+
logger.exception("Quota exceed: %s", e)
94+
raise e
95+
96+
@connection
97+
def consume_tokens(
98+
self,
99+
input_tokens: int = 0,
100+
output_tokens: int = 0,
101+
subject_id: str = "",
102+
) -> None:
103+
"""Consume tokens by given subject."""
104+
if self.subject_type == "c":
105+
subject_id = ""
106+
logger.info(
107+
"Consuming %d input and %d output tokens for subject %s",
108+
input_tokens,
109+
output_tokens,
110+
subject_id,
111+
)
112+
to_be_consumed = input_tokens + output_tokens
113+
114+
with self.connection.cursor() as cursor:
115+
# timestamp to be used
116+
updated_at = datetime.now()
117+
118+
cursor.execute(
119+
UPDATE_AVAILABLE_QUOTA_PG,
120+
(-to_be_consumed, updated_at, subject_id, self.subject_type),
121+
)
122+
self.connection.commit()
123+
124+
def _initialize_tables(self) -> None:
125+
"""Initialize tables used by quota limiter."""
126+
logger.info("Initializing tables for quota limiter")
127+
cursor = self.connection.cursor()
128+
cursor.execute(CREATE_QUOTA_TABLE)
129+
cursor.close()
130+
self.connection.commit()
131+
132+
def _init_quota(self, subject_id: str = "") -> None:
133+
"""Initialize quota for given ID."""
134+
# timestamp to be used
135+
revoked_at = datetime.now()
136+
137+
with self.connection.cursor() as cursor:
138+
cursor.execute(
139+
INIT_QUOTA_PG,
140+
(
141+
subject_id,
142+
self.subject_type,
143+
self.initial_quota,
144+
self.initial_quota,
145+
revoked_at,
146+
),
147+
)
148+
self.connection.commit()

src/quota/sql.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,26 @@
4343
WHERE subject=?
4444
AND revoked_at < datetime('now', ?);
4545
"""
46+
47+
INIT_QUOTA_PG = """
48+
INSERT INTO quota_limits (id, subject, quota_limit, available, revoked_at)
49+
VALUES (%s, %s, %s, %s, %s)
50+
"""
51+
52+
SELECT_QUOTA_PG = """
53+
SELECT available
54+
FROM quota_limits
55+
WHERE id=%s and subject=%s LIMIT 1
56+
"""
57+
58+
SET_AVAILABLE_QUOTA_PG = """
59+
UPDATE quota_limits
60+
SET available=%s, revoked_at=%s
61+
WHERE id=%s and subject=%s
62+
"""
63+
64+
UPDATE_AVAILABLE_QUOTA_PG = """
65+
UPDATE quota_limits
66+
SET available=available+%s, updated_at=%s
67+
WHERE id=%s and subject=%s
68+
"""

src/quota/user_quota_limiter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Simple user quota limiter where each user have fixed quota."""
2+
3+
from models.config import QuotaHandlersConfiguration
4+
from log import get_logger
5+
from quota.revokable_quota_limiter import RevokableQuotaLimiter
6+
7+
logger = get_logger(__name__)
8+
9+
10+
class UserQuotaLimiter(RevokableQuotaLimiter):
11+
"""Simple user quota limiter where each user have fixed quota."""
12+
13+
def __init__(
14+
self,
15+
configuration: QuotaHandlersConfiguration,
16+
initial_quota: int = 0,
17+
increase_by: int = 0,
18+
) -> None:
19+
"""Initialize quota limiter storage."""
20+
subject = "u" # user
21+
super().__init__(configuration, initial_quota, increase_by, subject)
22+
23+
# initialize connection to DB
24+
# and initialize tables too
25+
self.connect()
26+
27+
def __str__(self) -> str:
28+
"""Return textual representation of limiter instance."""
29+
name = type(self).__name__
30+
return f"{name}: initial quota: {self.initial_quota} increase by: {self.increase_by}"

0 commit comments

Comments
 (0)