Skip to content

Commit

Permalink
asyncify/de-wrap pusher service getters; merge Data into Service
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Sep 6, 2024
1 parent e443a48 commit 0ba53b0
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 104 deletions.
4 changes: 2 additions & 2 deletions src/api/client/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub(crate) async fn get_pushers_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");

Ok(get_pushers::v3::Response {
pushers: services.pusher.get_pushers(sender_user)?,
pushers: services.pusher.get_pushers(sender_user).await,
})
}

Expand All @@ -393,7 +393,7 @@ pub(crate) async fn set_pushers_route(
) -> Result<set_pusher::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");

services.pusher.set_pusher(sender_user, &body.action)?;
services.pusher.set_pusher(sender_user, &body.action);

Ok(set_pusher::v3::Response::default())
}
Expand Down
78 changes: 0 additions & 78 deletions src/service/pusher/data.rs

This file was deleted.

62 changes: 49 additions & 13 deletions src/service/pusher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod data;

use std::{fmt::Debug, mem, sync::Arc};

use bytes::BytesMut;
use conduit::{debug_error, err, trace, utils::string_from_bytes, warn, Err, PduEvent, Result};
use database::{Deserialized, Ignore, Interfix, Map};
use futures::{Stream, StreamExt};
use ipaddress::IPAddress;
use ruma::{
api::{
Expand All @@ -22,12 +22,11 @@ use ruma::{
uint, RoomId, UInt, UserId,
};

use self::data::Data;
use crate::{client, globals, rooms, users, Dep};

pub struct Service {
services: Services,
db: Data,
services: Services,
}

struct Services {
Expand All @@ -38,37 +37,74 @@ struct Services {
users: Dep<users::Service>,
}

struct Data {
senderkey_pusher: Arc<Map>,
}

impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
senderkey_pusher: args.db["senderkey_pusher"].clone(),
},
services: Services {
globals: args.depend::<globals::Service>("globals"),
client: args.depend::<client::Service>("client"),
state_accessor: args.depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
users: args.depend::<users::Service>("users"),
},
db: Data::new(args.db),
}))
}

fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}

impl Service {
pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) -> Result<()> {
self.db.set_pusher(sender, pusher)
pub fn set_pusher(&self, sender: &UserId, pusher: &set_pusher::v3::PusherAction) {
match pusher {
set_pusher::v3::PusherAction::Post(data) => {
let mut key = sender.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(data.pusher.ids.pushkey.as_bytes());
self.db
.senderkey_pusher
.insert(&key, &serde_json::to_vec(pusher).expect("Pusher is valid JSON value"));
},
set_pusher::v3::PusherAction::Delete(ids) => {
let mut key = sender.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(ids.pushkey.as_bytes());
self.db.senderkey_pusher.remove(&key);
},
}
}

pub fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Result<Option<Pusher>> {
self.db.get_pusher(sender, pushkey)
pub async fn get_pusher(&self, sender: &UserId, pushkey: &str) -> Option<Pusher> {
let senderkey = (sender, pushkey);
self.db
.senderkey_pusher
.qry(&senderkey)
.await
.deserialized_json()
}

pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> { self.db.get_pushers(sender) }
pub async fn get_pushers(&self, sender: &UserId) -> Vec<Pusher> {
let prefix = (sender, Interfix);
self.db
.senderkey_pusher
.stream_prefix(&prefix)
.map(|(_, val): (Ignore, &[u8])| serde_json::from_slice(val).expect("Invalid Pusher in db."))
.collect()
.await
}

#[must_use]
pub fn get_pushkeys(&self, sender: &UserId) -> Box<dyn Iterator<Item = Result<String>> + '_> {
self.db.get_pushkeys(sender)
pub fn get_pushkeys<'a>(&'a self, sender: &'a UserId) -> impl Stream<Item = &str> + 'a {
let prefix = (sender, Interfix);
self.db
.senderkey_pusher
.keys_prefix(&prefix)
.map(|(_, pushkey): (Ignore, &str)| pushkey)
}

#[tracing::instrument(skip(self, dest, request))]
Expand Down
18 changes: 13 additions & 5 deletions src/service/rooms/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use conduit::{
utils::{MutexMap, MutexMapGuard},
validated, warn, Error, Result, Server,
};
use futures::{future, StreamExt};
use itertools::Itertools;
use ruma::{
api::{client::error::ErrorKind, federation},
Expand Down Expand Up @@ -422,11 +423,18 @@ impl Service {
highlights.push(user.clone());
}

for push_key in self.services.pusher.get_pushkeys(user) {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key?)?;
}
self.services
.pusher
.get_pushkeys(user)
.for_each(|push_key| {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key.to_owned())
.expect("TODO: replace with future");

future::ready(())
})
.await;
}

self.db
Expand Down
7 changes: 1 addition & 6 deletions src/service/sending/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,7 @@ impl Service {
}
}

let Some(pusher) = self
.services
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| (dest.clone(), e))?
else {
let Some(pusher) = self.services.pusher.get_pusher(userid, pushkey).await else {
continue;
};

Expand Down

0 comments on commit 0ba53b0

Please sign in to comment.