2
2
import os
3
3
import random
4
4
from datetime import datetime , time
5
+ import asyncio
5
6
7
+ import asyncpg
6
8
import isodate
7
- import psycopg2
8
9
9
10
create_dummy_data = int (os .getenv ("DUMMY_DATA" ))
10
11
delete_dummy_data = int (os .getenv ("CLEAR_DATA" ))
18
19
else datetime .combine (datetime .now ().today (), time .min )
19
20
)
20
21
21
- connection_url = "dbname={db} user={user} password={password} host='database' port='5432'" .format (
22
- db = os .getenv ("POSTGRES_DB" ),
23
- user = os .getenv ("POSTGRES_USER" ),
24
- password = os .getenv ("POSTGRES_PASSWORD" ),
25
- )
22
+ pgpool = None
23
+
24
+ async def get_pool ():
25
+ """
26
+ Retrieves or creates a connection pool to the PostgreSQL database.
27
+
28
+ Returns:
29
+ asyncpg.pool.Pool: The connection pool object.
30
+ """
31
+ global pgpool
32
+ if not pgpool :
33
+ pgpool = await asyncpg .create_pool (
34
+ dsn = f"postgresql://{ os .getenv ("POSTGRES_USER" )} :{ os .getenv ("POSTGRES_PASSWORD" )} @database:5432/{ os .getenv ("POSTGRES_DB" )} "
35
+ )
36
+ return pgpool
37
+
26
38
27
39
28
- def generate_things (cur ):
40
+ async def generate_things (conn ):
29
41
"""
30
42
Generate a list of things and insert them into the database.
31
43
@@ -44,12 +56,12 @@ def generate_things(cur):
44
56
45
57
insert_sql = """
46
58
INSERT INTO sensorthings."Thing" (description, name, properties)
47
- VALUES (%s, %s, %s )
59
+ VALUES ($1, $2, $3 )
48
60
"""
49
- cur .executemany (insert_sql , things )
61
+ await conn .executemany (insert_sql , things )
50
62
51
63
52
- def generate_locations (cur ):
64
+ async def generate_locations (conn ):
53
65
"""
54
66
Generate locations and insert them into the database.
55
67
@@ -69,12 +81,12 @@ def generate_locations(cur):
69
81
70
82
insert_sql = """
71
83
INSERT INTO sensorthings."Location" (description, name, location, "encodingType")
72
- VALUES (%s, %s, %s, %s )
84
+ VALUES ($1, $2, $3, $4 )
73
85
"""
74
- cur .executemany (insert_sql , locations )
86
+ await conn .executemany (insert_sql , locations )
75
87
76
88
77
- def generate_things_locations (cur ):
89
+ async def generate_things_locations (conn ):
78
90
"""
79
91
Generate and insert things locations into the database.
80
92
@@ -92,12 +104,12 @@ def generate_things_locations(cur):
92
104
93
105
insert_sql = """
94
106
INSERT INTO sensorthings."Thing_Location" (thing_id, location_id)
95
- VALUES (%s, %s )
107
+ VALUES ($1, $2 )
96
108
"""
97
- cur .executemany (insert_sql , things_locations )
109
+ await conn .executemany (insert_sql , things_locations )
98
110
99
111
100
- def generate_historicallocations (cur ):
112
+ async def generate_historicallocations (conn ):
101
113
"""
102
114
Generate historical locations for things and insert them into the database.
103
115
@@ -116,12 +128,12 @@ def generate_historicallocations(cur):
116
128
117
129
insert_sql = """
118
130
INSERT INTO sensorthings."HistoricalLocation" (time, thing_id)
119
- VALUES (%s, %s )
131
+ VALUES ($1, $2 )
120
132
"""
121
- cur .executemany (insert_sql , historicallocations )
133
+ await conn .executemany (insert_sql , historicallocations )
122
134
123
135
124
- def generate_locations_historicallocations (cur ):
136
+ async def generate_locations_historicallocations (conn ):
125
137
"""
126
138
Generate a list of tuples representing the relationship between location and historical location.
127
139
@@ -141,12 +153,12 @@ def generate_locations_historicallocations(cur):
141
153
142
154
insert_sql = """
143
155
INSERT INTO sensorthings."Location_HistoricalLocation" (location_id, historicallocation_id)
144
- VALUES (%s, %s )
156
+ VALUES ($1, $2 )
145
157
"""
146
- cur .executemany (insert_sql , locations_historicallocations )
158
+ await conn .executemany (insert_sql , locations_historicallocations )
147
159
148
160
149
- def generate_observedProperties (cur ):
161
+ async def generate_observedProperties (conn ):
150
162
"""
151
163
Generate observed properties and insert them into the database.
152
164
@@ -165,12 +177,12 @@ def generate_observedProperties(cur):
165
177
166
178
insert_sql = """
167
179
INSERT INTO sensorthings."ObservedProperty" (name, definition, description)
168
- VALUES (%s, %s, %s )
180
+ VALUES ($1, $2, $3 )
169
181
"""
170
- cur .executemany (insert_sql , observedProperties )
182
+ await conn .executemany (insert_sql , observedProperties )
171
183
172
184
173
- def generate_sensors (cur ):
185
+ async def generate_sensors (conn ):
174
186
"""
175
187
Generate a list of sensors with random descriptions, names, encoding types, and metadata.
176
188
@@ -190,12 +202,12 @@ def generate_sensors(cur):
190
202
191
203
insert_sql = """
192
204
INSERT INTO sensorthings."Sensor" (description, name, "encodingType", metadata)
193
- VALUES (%s, %s, %s, %s )
205
+ VALUES ($1, $2, $3, $4 )
194
206
"""
195
- cur .executemany (insert_sql , sensors )
207
+ await conn .executemany (insert_sql , sensors )
196
208
197
209
198
- def generate_datastreams (cur ):
210
+ async def generate_datastreams (conn ):
199
211
"""
200
212
Generate datastreams and insert them into the database.
201
213
@@ -237,12 +249,12 @@ def generate_datastreams(cur):
237
249
238
250
insert_sql = """
239
251
INSERT INTO sensorthings."Datastream" ("unitOfMeasurement", description, name, "observationType", thing_id, sensor_id, observedproperty_id)
240
- VALUES (%s, %s, %s, %s, %s, %s, %s )
252
+ VALUES ($1, $2, $3, $4, $5, $6, $7 )
241
253
"""
242
- cur .executemany (insert_sql , datastreams )
254
+ await conn .executemany (insert_sql , datastreams )
243
255
244
256
245
- def generate_featuresofinterest (cur ):
257
+ async def generate_featuresofinterest (conn ):
246
258
"""
247
259
Generate features of interest and insert them into the database.
248
260
@@ -262,12 +274,12 @@ def generate_featuresofinterest(cur):
262
274
263
275
insert_sql = """
264
276
INSERT INTO sensorthings."FeaturesOfInterest" (description, name, "encodingType", feature)
265
- VALUES (%s, %s, %s, %s )
277
+ VALUES ($1, $2, $3, $4 )
266
278
"""
267
- cur .executemany (insert_sql , featuresofinterest )
279
+ await conn .executemany (insert_sql , featuresofinterest )
268
280
269
281
270
- def generate_observations (cur ):
282
+ async def generate_observations (conn ):
271
283
"""
272
284
Generates observations and inserts them into the database.
273
285
@@ -298,12 +310,12 @@ def generate_observations(cur):
298
310
299
311
insert_sql = """
300
312
INSERT INTO sensorthings."Observation" ("phenomenonTime", "resultInteger", "resultType", datastream_id, featuresofinterest_id)
301
- VALUES (%s, %s, %s, %s, %s )
313
+ VALUES ($1, $2, $3, $4, $5 )
302
314
"""
303
- cur .executemany (insert_sql , observations )
315
+ await conn .executemany (insert_sql , observations )
304
316
305
317
306
- def create_data ():
318
+ async def create_data ():
307
319
"""
308
320
Generates dummy data and inserts it into the database.
309
321
@@ -314,29 +326,25 @@ def create_data():
314
326
315
327
After the creation is complete, the database connection is closed.
316
328
"""
317
- conn = psycopg2 .connect (connection_url )
318
- cur = conn .cursor ()
319
- try :
320
- generate_things (cur )
321
- generate_locations (cur )
322
- generate_things_locations (cur )
323
- generate_historicallocations (cur )
324
- generate_locations_historicallocations (cur )
325
- generate_observedProperties (cur )
326
- generate_sensors (cur )
327
- generate_datastreams (cur )
328
- generate_featuresofinterest (cur )
329
- generate_observations (cur )
330
- conn .commit ()
331
- except Exception as e :
332
- conn .rollback ()
333
- print (f"An error occurred: { e } " )
334
- finally :
335
- cur .close ()
336
- conn .close ()
337
-
338
-
339
- def delete_data ():
329
+ pool = await get_pool ()
330
+ async with pool .acquire () as conn :
331
+ async with conn .transaction ():
332
+ try :
333
+ await generate_things (conn )
334
+ await generate_locations (conn )
335
+ await generate_things_locations (conn )
336
+ await generate_historicallocations (conn )
337
+ await generate_locations_historicallocations (conn )
338
+ await generate_observedProperties (conn )
339
+ await generate_sensors (conn )
340
+ await generate_datastreams (conn )
341
+ await generate_featuresofinterest (conn )
342
+ await generate_observations (conn )
343
+ except Exception as e :
344
+ print (f"An error occured: { e } " )
345
+
346
+
347
+ async def delete_data ():
340
348
"""
341
349
Deletes all data from the sensorthings tables in the database.
342
350
@@ -358,30 +366,28 @@ def delete_data():
358
366
359
367
After the deletion is complete, the database connection is closed.
360
368
"""
361
- conn = psycopg2 .connect (connection_url )
362
- cur = conn .cursor ()
363
- try :
364
- cur .execute ('DELETE FROM sensorthings."Thing"' )
365
- cur .execute ('DELETE FROM sensorthings."Location"' )
366
- cur .execute ('DELETE FROM sensorthings."Thing_Location"' )
367
- cur .execute ('DELETE FROM sensorthings."HistoricalLocation"' )
368
- cur .execute ('DELETE FROM sensorthings."Location_HistoricalLocation"' )
369
- cur .execute ('DELETE FROM sensorthings."ObservedProperty"' )
370
- cur .execute ('DELETE FROM sensorthings."Sensor"' )
371
- cur .execute ('DELETE FROM sensorthings."Datastream"' )
372
- cur .execute ('DELETE FROM sensorthings."FeaturesOfInterest"' )
373
- cur .execute ('DELETE FROM sensorthings."Observation"' )
374
- conn .commit ()
375
- except Exception as e :
376
- conn .rollback ()
377
- print (f"An error occurred: { e } " )
378
- finally :
379
- cur .close ()
380
- conn .close ()
369
+ pool = await get_pool ()
370
+ async with pool .acquire () as conn :
371
+ async with conn .transaction ():
372
+ try :
373
+ await conn .execute ('DELETE FROM sensorthings."Thing"' )
374
+ await conn .execute ('DELETE FROM sensorthings."Location"' )
375
+ await conn .execute ('DELETE FROM sensorthings."Thing_Location"' )
376
+ await conn .execute ('DELETE FROM sensorthings."HistoricalLocation"' )
377
+ await conn .execute ('DELETE FROM sensorthings."Location_HistoricalLocation"' )
378
+ await conn .execute ('DELETE FROM sensorthings."ObservedProperty"' )
379
+ await conn .execute ('DELETE FROM sensorthings."Sensor"' )
380
+ await conn .execute ('DELETE FROM sensorthings."Datastream"' )
381
+ await conn .execute ('DELETE FROM sensorthings."FeaturesOfInterest"' )
382
+ await conn .execute ('DELETE FROM sensorthings."Observation"' )
383
+ except Exception as e :
384
+ print (f"An error occurred: { e } " )
385
+ finally :
386
+ await conn .close ()
381
387
382
388
383
389
if __name__ == "__main__" :
384
390
if delete_dummy_data :
385
- delete_data ()
391
+ asyncio . run ( delete_data () )
386
392
if create_dummy_data :
387
- create_data ()
393
+ asyncio . run ( create_data () )
0 commit comments