-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathposgres_manager.py
151 lines (110 loc) · 4.16 KB
/
posgres_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from psycopg2 import pool
from abc import ABC, abstractmethod
# Connection Section
class AccessDenied(Exception):
def __init__(self, msg='You are not allowed'):
super().__init__(msg)
class Singleton(type):
_instance = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instance:
cls._instance[cls] = super().__call__(*args, **kwargs)
return cls._instance.get(cls)
class PostgresConnect(metaclass=Singleton):
@staticmethod
def connect(**kwargs):
return pool.SimpleConnectionPool(1, 10, **kwargs)
class ConnectProxyDescriptor:
def __set_name__(self, owner, name):
self.name = name
def __get__(self, instance, owner):
return instance.__dict__.get(self.name)
def __set__(self, instance, value):
if self.name == 'db_port':
if not isinstance(value, int):
raise ValueError('db_port must be int')
else:
value = str(value)
instance.__dict__[self.name] = value
def __delete__(self, instance):
raise AccessDenied()
class ConnectProxy:
db_name = ConnectProxyDescriptor()
db_host = ConnectProxyDescriptor()
db_password = ConnectProxyDescriptor()
db_port = ConnectProxyDescriptor()
db_user = ConnectProxyDescriptor()
def __init__(self, db_name, db_host, db_user, db_password, db_port : int):
self.database = str(db_name)
self.host = str(db_host)
self.password = str(db_password)
self.user = str(db_user)
self.port = db_port
def connect(self):
return PostgresConnect().connect(**self.__dict__)
# Operation section
class AbstractFetch(ABC):
def __init__(self, cursor, _size=2):
self._cursor = cursor
self._size = _size
@abstractmethod
def fetch(self):
pass
class FetchAll(AbstractFetch):
def fetch(self):
return self._cursor.fetchall()
class FetchOne(AbstractFetch):
def fetch(self):
return self._cursor.fetchone()
class FetchMnay(AbstractFetch):
def fetch(self):
return self._cursor.fetchone(self._size)
class AbstractOperation(ABC):
def __init__(self, connection_pool, fetch=FetchAll, _fetch_size=2):
self._connection_pool = connection_pool
self._connect = self._connection_pool.getconn()
self._fetch = fetch
self._fetch_size = _fetch_size
@abstractmethod
def execute(self, operations):
pass
class QueryOperation(AbstractOperation):
def execute(self, operations):
try:
with self._connect:
with self._connect.cursor() as cursor:
cursor.execute(operations['query'], operations.get('params'))
return self._fetch(cursor, self._fetch_size).fetch()
finally:
BackConnectionToPool.back_to_pool(self._connection_pool, self._connect)
class TransactionOperation(AbstractOperation):
def execute(self, operations: list):
fetch_list = []
try:
with self._connect as connection:
with self._connect.cursor() as cursor:
for _operation in operations:
cursor.execute(_operation['query'], _operation['params'])
if 'RETURNING' in _operation['query']:
fetch_list.append(self._fetch(cursor, self._fetch_size).fetch())
connection.commit()
return fetch_list
except Exception as e:
print('Transaction Error', e)
self._connect.rollback()
raise
finally:
BackConnectionToPool.back_to_pool(self._connection_pool, self._connect)
class BackConnectionToPool:
@staticmethod
def back_to_pool(connection_pool, connection):
connection_pool.putconn(connection)
class Client:
def __init__(self, **kwargs):
self._connection_pool = ConnectProxy(**kwargs).connect()
def execute(self, operation_type, operation):
operation_class = {
'query': QueryOperation,
'transaction': TransactionOperation
}
return operation_class[operation_type](self._connection_pool).execute(operation)