-
Notifications
You must be signed in to change notification settings - Fork 0
/
resource.py
232 lines (165 loc) · 6.47 KB
/
resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import functools
import logging
from sqlite3 import IntegrityError as sqlite3IntegrityError
from flask import current_app, request, abort
from sqlalchemy import or_
from sqlalchemy.exc import IntegrityError
from psycopg2.errors import (
UniqueViolation as pgsqlUniqueViolation,
NotNullViolation as pgsqlNotNullViolation,
)
from ProdManager.plugins import db, lang
from ProdManager.models import EventType
import ProdManager.helpers.notification as NotificationHelper
import ProdManager.helpers.event as EventHelper
from .response import (
NotFoundError, ServerError, ConflictError,
UndeletableRessourceError, DependencyError,
)
from .pagination import PAGINATION_MAX_PER_PAGE
logger = logging.getLogger('gunicorn.error')
def list_resources_from_query(ressource_class, query, orders=None, filters=None, paginate=True, limit=PAGINATION_MAX_PER_PAGE):
if orders is None:
orders = ressource_class.default_order()
if not isinstance(orders, tuple):
orders = (orders,)
if filters is not None:
if not isinstance(filters, tuple):
filters = (filters,)
query = query.filter(*filters)
result = query.order_by(*orders)
if not paginate and limit:
result = result.limit(limit)
# https://flask-sqlalchemy.palletsprojects.com/en/2.x/api/#flask_sqlalchemy.query.Query.paginate
# page, per_page and max_per_page are retreived from the Flask.request object
if paginate:
result = result.paginate(error_out=False, max_per_page=limit)
return result
def list_resources(ressource_class, *args, **kwargs):
return list_resources_from_query(ressource_class, ressource_class.query, *args, **kwargs)
def list_resources_as_choices(ressource_class, order=None):
if order is None:
order = ressource_class.id.asc()
return [
(resource.id, resource.name) for resource in
ressource_class.query.order_by(order)
]
def get_resource(resource_class, ressource_id):
resource = db.session.get(resource_class, ressource_id)
if resource is None:
raise NotFoundError(ressource_id)
return resource
def update_resource(resource_class, ressource_id, attributs):
resource = db.session.get(resource_class, ressource_id)
if resource is None:
raise NotFoundError(ressource_id)
changed = {}
should_notify = False
for attribute, new_value in attributs.items():
old_value = getattr(resource, attribute)
if old_value != new_value:
setattr(resource, attribute, new_value)
changed[attribute] = (old_value, new_value)
if hasattr(resource_class, 'notify_attributs') \
and (attribute in resource_class.notify_attributs):
should_notify = True
try:
db.session.commit()
except IntegrityError as error:
db.session.rollback()
if isinstance(error.orig, pgsqlUniqueViolation) \
or (isinstance(error.orig, sqlite3IntegrityError) and "UNIQUE constraint failed" in str(error)):
raise ConflictError(attributs) from error
raise ServerError(error) from error
except Exception as error:
current_app.logger.error(error)
raise error
if len(changed.keys()) > 0:
EventHelper.create_event(EventType.UPDATE, resource_class, resource, changed)
if should_notify:
NotificationHelper.notify(
NotificationHelper.NotificationType.UPDATE,
resource_class,
resource
)
logger.info("Updated %s", resource)
return resource, changed
def delete_resource(resource_class, ressource_id):
resource = db.session.get(resource_class, ressource_id)
if resource is None:
raise NotFoundError(ressource_id)
if hasattr(resource, "default") and resource.default:
raise UndeletableRessourceError("This is a default resource")
db.session.delete(resource)
try:
db.session.commit()
except IntegrityError as error:
db.session.rollback()
if isinstance(error.orig, pgsqlNotNullViolation) \
or (isinstance(error.orig, sqlite3IntegrityError) and "NOT NULL constraint failed" in str(error)):
raise DependencyError("This resource is a dependency from other resources") from error
raise ServerError(error) from error
except Exception as error:
current_app.logger.error(error)
raise ServerError(error) from error
logger.info("Deleted %s", resource)
def create_resource(resource_class, attributs):
resource = resource_class(**attributs)
db.session.add(resource)
try:
db.session.commit()
except IntegrityError as error:
db.session.rollback()
if isinstance(error.orig, pgsqlUniqueViolation) \
or (isinstance(error.orig, sqlite3IntegrityError) and "UNIQUE constraint failed" in str(error)):
raise ConflictError(attributs) from error
raise ServerError(error) from error
except Exception as error:
current_app.logger.error(error)
raise ServerError(error) from error
EventHelper.create_event(EventType.CREATE, resource_class, resource)
NotificationHelper.notify(
NotificationHelper.NotificationType.CREATE,
resource_class,
resource
)
logger.info("Created %s", resource)
return resource
def resource_filters(filter_fields):
def decorate(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
filters = ()
for filter_name in request.args:
if filter_name not in filter_fields:
continue
filter_field, filter_type, filter_operator = filter_fields[filter_name]
filter_values = request.args.getlist(filter_name, type=filter_type)
filter_values_count = len(filter_values)
if filter_values_count == 0:
continue
if filter_values_count > PAGINATION_MAX_PER_PAGE:
abort(400, dict(
message=lang.get("filter_overflow"),
reasons=dict(filter_name=[f"Filter occurrence count higher than the limit {PAGINATION_MAX_PER_PAGE}"])
))
current_filter = []
for filter_value in filter_values:
match filter_operator:
case 'gt':
current_filter.append(filter_field > filter_value)
case 'ge':
current_filter.append(filter_field >= filter_value)
case 'lt':
current_filter.append(filter_field < filter_value)
case 'le':
current_filter.append(filter_field < filter_value)
case 'ne':
current_filter.append(filter_field != filter_value)
case _:
current_filter.append(filter_field == filter_value)
filters += (or_(*current_filter),)
kwargs['filters'] = filters
return view(**kwargs)
return wrapped_view
return decorate