Skip to content

Commit f7126a3

Browse files
committed
agent migrations with functions/triggers
1 parent a7ec4b9 commit f7126a3

File tree

17 files changed

+1796
-111
lines changed

17 files changed

+1796
-111
lines changed

simplexmq.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,13 @@ library
163163
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
164164
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
165165
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
166+
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250815_service_certs
166167
else
167168
exposed-modules:
168169
Simplex.Messaging.Agent.Store.SQLite
169170
Simplex.Messaging.Agent.Store.SQLite.Common
170171
Simplex.Messaging.Agent.Store.SQLite.DB
172+
Simplex.Messaging.Agent.Store.SQLite.Functions
171173
Simplex.Messaging.Agent.Store.SQLite.Migrations
172174
Simplex.Messaging.Agent.Store.SQLite.Migrations.App
173175
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial
@@ -217,6 +219,7 @@ library
217219
Simplex.Messaging.Agent.Store.Postgres.Common
218220
Simplex.Messaging.Agent.Store.Postgres.DB
219221
Simplex.Messaging.Agent.Store.Postgres.Migrations
222+
Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
220223
Simplex.Messaging.Agent.Store.Postgres.Util
221224
if !flag(client_library)
222225
exposed-modules:

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
20562056
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
20572057
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
20582058
|]
2059-
( (host server, port server, rcvId, rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
2059+
( (host server, port server, rcvId, BI rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
20602060
:. (sndId, queueMode, status, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
20612061
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
20622062
:. ntfCredsFields
@@ -2242,13 +2242,13 @@ rcvQueueQuery =
22422242

22432243
toRcvQueue ::
22442244
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
2245-
:. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, ServiceAssoc)
2245+
:. (QueueStatus, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, BoolInt)
22462246
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
22472247
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
22482248
RcvQueue
22492249
toRcvQueue
22502250
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
2251-
:. (status, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, rcvServiceAssoc)
2251+
:. (status, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, BI rcvServiceAssoc)
22522252
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
22532253
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
22542254
) =

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
88
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
99
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
1010
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
11+
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250815_service_certs
1112
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
1213

1314
schemaMigrations :: [(String, Text, Maybe Text)]
1415
schemaMigrations =
1516
[ ("20241210_initial", m20241210_initial, Nothing),
1617
("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies),
1718
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
18-
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete)
19+
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
20+
("20250815_service_certs", m20250815_service_certs, Just down_m20250815_service_certs)
1921
]
2022

2123
-- | The list of migrations in ascending order by date
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250815_service_certs where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
8+
import Text.RawString.QQ (r)
9+
10+
m20250815_service_certs :: Text
11+
m20250815_service_certs =
12+
createXorHashFuncs
13+
<> T.pack
14+
[r|
15+
CREATE TABLE client_services(
16+
user_id BIGINT NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
17+
host TEXT NOT NULL,
18+
port TEXT NOT NULL,
19+
service_cert BYTEA NOT NULL,
20+
service_cert_hash BYTEA NOT NULL,
21+
service_priv_key BYTEA NOT NULL,
22+
service_id BYTEA,
23+
service_queue_count BIGINT NOT NULL DEFAULT 0,
24+
service_queue_ids_hash BYTEA NOT NULL DEFAULT '\x00000000000000000000000000000000',
25+
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
26+
);
27+
28+
CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_id, host, port);
29+
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
30+
31+
ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc SMALLINT NOT NULL DEFAULT 0;
32+
33+
CREATE FUNCTION update_aggregates(p_user_id BIGINT, p_host TEXT, p_port TEXT, p_change BIGINT, p_rcv_id BYTEA) RETURNS VOID
34+
LANGUAGE plpgsql
35+
AS $$
36+
BEGIN
37+
UPDATE client_services
38+
SET service_queue_count = service_queue_count + p_change,
39+
service_queue_ids_hash = xor_combine(service_queue_ids_hash, public.digest(p_rcv_id, 'md5'))
40+
WHERE user_id = p_user_id AND host = p_host AND port = p_port;
41+
END;
42+
$$;
43+
44+
CREATE FUNCTION on_rcv_queue_insert() RETURNS TRIGGER
45+
LANGUAGE plpgsql
46+
AS $$
47+
BEGIN
48+
IF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
49+
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
50+
END IF;
51+
RETURN NEW;
52+
END;
53+
$$;
54+
55+
CREATE FUNCTION on_rcv_queue_delete() RETURNS TRIGGER
56+
LANGUAGE plpgsql
57+
AS $$
58+
BEGIN
59+
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
60+
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
61+
END IF;
62+
RETURN OLD;
63+
END;
64+
$$;
65+
66+
CREATE FUNCTION on_rcv_queue_update() RETURNS TRIGGER
67+
LANGUAGE plpgsql
68+
AS $$
69+
BEGIN
70+
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
71+
IF NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0) THEN
72+
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
73+
END IF;
74+
ELSIF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
75+
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
76+
END IF;
77+
RETURN NEW;
78+
END;
79+
$$;
80+
81+
CREATE TRIGGER tr_rcv_queue_insert
82+
AFTER INSERT ON rcv_queues
83+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_insert();
84+
85+
CREATE TRIGGER tr_rcv_queue_delete
86+
AFTER DELETE ON rcv_queues
87+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_delete();
88+
89+
CREATE TRIGGER tr_rcv_queue_update
90+
AFTER UPDATE ON rcv_queues
91+
FOR EACH ROW EXECUTE PROCEDURE on_rcv_queue_update();
92+
|]
93+
94+
down_m20250815_service_certs :: Text
95+
down_m20250815_service_certs =
96+
T.pack
97+
[r|
98+
DROP TRIGGER tr_rcv_queue_insert ON rcv_queues;
99+
DROP TRIGGER tr_rcv_queue_delete ON rcv_queues;
100+
DROP TRIGGER tr_rcv_queue_update ON rcv_queues;
101+
102+
DROP FUNCTION on_rcv_queue_insert;
103+
DROP FUNCTION on_rcv_queue_delete;
104+
DROP FUNCTION on_rcv_queue_update;
105+
106+
DROP FUNCTION update_aggregates;
107+
108+
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;
109+
110+
DROP INDEX idx_server_certs_host_port;
111+
DROP INDEX idx_server_certs_user_id_host_port;
112+
DROP TABLE client_services;
113+
|]
114+
<> dropXorHashFuncs
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Simplex.Messaging.Agent.Store.Postgres.Migrations.Util where
4+
5+
import Data.Text (Text)
6+
import qualified Data.Text as T
7+
import Text.RawString.QQ (r)
8+
9+
createXorHashFuncs :: Text
10+
createXorHashFuncs =
11+
T.pack
12+
[r|
13+
CREATE OR REPLACE FUNCTION xor_combine(state BYTEA, value BYTEA) RETURNS BYTEA
14+
LANGUAGE plpgsql IMMUTABLE STRICT
15+
AS $$
16+
DECLARE
17+
result BYTEA := state;
18+
i INTEGER;
19+
len INTEGER := octet_length(value);
20+
BEGIN
21+
IF octet_length(state) != len THEN
22+
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
23+
END IF;
24+
FOR i IN 0..len-1 LOOP
25+
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
26+
END LOOP;
27+
RETURN result;
28+
END;
29+
$$;
30+
31+
CREATE OR REPLACE AGGREGATE xor_aggregate(BYTEA) (
32+
SFUNC = xor_combine,
33+
STYPE = BYTEA,
34+
INITCOND = '\x00000000000000000000000000000000' -- 16 bytes
35+
);
36+
|]
37+
38+
dropXorHashFuncs :: Text
39+
dropXorHashFuncs =
40+
T.pack
41+
[r|
42+
DROP AGGREGATE xor_aggregate(BYTEA);
43+
DROP FUNCTION xor_combine;
44+
|]

0 commit comments

Comments
 (0)