From 5289fb216ba0015b5e80c82e864bea81dbb3dfcf Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Thu, 13 Feb 2025 12:24:54 -0800 Subject: [PATCH 01/11] drive-by clippy fixes --- crates/notedeck_chrome/src/notedeck.rs | 8 ++++---- crates/notedeck_columns/src/accounts/route.rs | 2 +- crates/notedeck_columns/src/timeline/route.rs | 2 +- crates/notedeck_columns/src/ui/add_column.rs | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/notedeck_chrome/src/notedeck.rs b/crates/notedeck_chrome/src/notedeck.rs index 0d049340..55764984 100644 --- a/crates/notedeck_chrome/src/notedeck.rs +++ b/crates/notedeck_chrome/src/notedeck.rs @@ -151,9 +151,9 @@ mod tests { let args: Vec = [ "--testrunner", "--datapath", - &datapath.to_str().unwrap(), + datapath.to_str().unwrap(), "--dbpath", - &dbpath.to_str().unwrap(), + dbpath.to_str().unwrap(), ] .iter() .map(|s| s.to_string()) @@ -224,8 +224,8 @@ mod tests { .unwrap(); assert_eq!(app.timeline_cache.timelines.len(), 2); - assert!(app.timeline_cache.timelines.get(&tl1).is_some()); - assert!(app.timeline_cache.timelines.get(&tl2).is_some()); + assert!(app.timeline_cache.timelines.contains_key(tl1)); + assert!(app.timeline_cache.timelines.contains_key(tl2)); rmrf(tmpdir); } diff --git a/crates/notedeck_columns/src/accounts/route.rs b/crates/notedeck_columns/src/accounts/route.rs index befcfc87..18e6dbb0 100644 --- a/crates/notedeck_columns/src/accounts/route.rs +++ b/crates/notedeck_columns/src/accounts/route.rs @@ -65,7 +65,7 @@ mod tests { let data_str = "accounts:show"; let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = AccountsRoute::parse_from_tokens(&mut parser).unwrap(); let expected = AccountsRoute::Accounts; parsed.serialize_tokens(&mut token_writer); diff --git a/crates/notedeck_columns/src/timeline/route.rs b/crates/notedeck_columns/src/timeline/route.rs index 76bd13c0..a7bd0d3f 100644 --- a/crates/notedeck_columns/src/timeline/route.rs +++ b/crates/notedeck_columns/src/timeline/route.rs @@ -154,7 +154,7 @@ mod tests { let data_str = format!("thread:{}", note_id_hex); let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = TimelineKind::parse(&mut parser, &Pubkey::new(*note_id.bytes())).unwrap(); let expected = TimelineKind::Thread(ThreadSelection::from_root_id( RootNoteIdBuf::new_unsafe(*note_id.bytes()), diff --git a/crates/notedeck_columns/src/ui/add_column.rs b/crates/notedeck_columns/src/ui/add_column.rs index 9783599b..d1e964ac 100644 --- a/crates/notedeck_columns/src/ui/add_column.rs +++ b/crates/notedeck_columns/src/ui/add_column.rs @@ -790,7 +790,7 @@ mod tests { let data_str = "column:algo_selection:last_per_pubkey"; let data = &data_str.split(":").collect::>(); let mut token_writer = TokenWriter::default(); - let mut parser = TokenParser::new(&data); + let mut parser = TokenParser::new(data); let parsed = AddColumnRoute::parse_from_tokens(&mut parser).unwrap(); let expected = AddColumnRoute::Algo(AddAlgoRoute::LastPerPubkey); parsed.serialize_tokens(&mut token_writer); From 28eb0bfb70fb7e7562edddab323ca47ba3618650 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Mon, 9 Dec 2024 15:22:33 -0800 Subject: [PATCH 02/11] introduce SubMan, integrate into Notedeck and AppContext --- Cargo.lock | 3 + Cargo.toml | 1 + crates/enostr/src/relay/pool.rs | 
2 +- crates/notedeck/Cargo.toml | 3 + crates/notedeck/src/app.rs | 18 +- crates/notedeck/src/context.rs | 5 +- crates/notedeck/src/lib.rs | 6 + crates/notedeck/src/subman.rs | 851 +++++++++++++++++++ crates/notedeck/src/util/mod.rs | 4 + crates/notedeck/src/util/test_util.rs | 86 ++ crates/notedeck_columns/src/app.rs | 217 +++-- crates/notedeck_columns/src/decks.rs | 2 +- crates/notedeck_columns/src/nav.rs | 14 +- crates/notedeck_columns/src/ui/add_column.rs | 4 +- 14 files changed, 1080 insertions(+), 136 deletions(-) create mode 100644 crates/notedeck/src/subman.rs create mode 100644 crates/notedeck/src/util/mod.rs create mode 100644 crates/notedeck/src/util/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 6cd25be5..c4e0a974 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2751,8 +2751,10 @@ dependencies = [ "eframe", "egui", "enostr", + "futures", "hex", "image", + "nostr", "nostrdb", "poll-promise", "puffin 0.19.1 (git+https://github.com/jb55/puffin?rev=70ff86d5503815219b01a009afd3669b7903a057)", @@ -2765,6 +2767,7 @@ dependencies = [ "strum_macros", "tempfile", "thiserror 2.0.7", + "tokio", "tracing", "url", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 73da0bf5..231f48b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ egui_virtual_list = "0.5.0" ehttp = "0.2.0" enostr = { path = "crates/enostr" } ewebsock = { version = "0.2.0", features = ["tls"] } +futures = "0.3.31" hex = "0.4.3" image = { version = "0.25", features = ["jpeg", "png", "webp"] } indexmap = "2.6.0" diff --git a/crates/enostr/src/relay/pool.rs b/crates/enostr/src/relay/pool.rs index 284e089a..84b34736 100644 --- a/crates/enostr/src/relay/pool.rs +++ b/crates/enostr/src/relay/pool.rs @@ -340,7 +340,7 @@ impl RelayPool { } // standardize the format (ie, trailing slashes) - fn canonicalize_url(url: String) -> String { + pub fn canonicalize_url(url: String) -> String { match Url::parse(&url) { Ok(parsed_url) => parsed_url.to_string(), Err(_) => url, // If parsing fails, return the original URL. 
diff --git a/crates/notedeck/Cargo.toml b/crates/notedeck/Cargo.toml index 05f6406b..48a0cc3a 100644 --- a/crates/notedeck/Cargo.toml +++ b/crates/notedeck/Cargo.toml @@ -13,6 +13,7 @@ dirs = { workspace = true } enostr = { workspace = true } egui = { workspace = true } eframe = { workspace = true } +futures = { workspace = true } image = { workspace = true } base32 = { workspace = true } poll-promise = { workspace = true } @@ -28,6 +29,8 @@ sha2 = { workspace = true } [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true } +nostr = { workspace = true } [target.'cfg(target_os = "macos")'.dependencies] security-framework = { workspace = true } diff --git a/crates/notedeck/src/app.rs b/crates/notedeck/src/app.rs index dfb4468b..aaf4d4ad 100644 --- a/crates/notedeck/src/app.rs +++ b/crates/notedeck/src/app.rs @@ -1,7 +1,7 @@ use crate::persist::{AppSizeHandler, ZoomHandler}; use crate::{ Accounts, AppContext, Args, DataPath, DataPathType, Directory, FileKeyStorage, ImageCache, - KeyStorageType, NoteCache, RelayDebugView, ThemeHandler, UnknownIds, + KeyStorageType, NoteCache, RelayDebugView, SubMan, ThemeHandler, UnknownIds, }; use egui::ThemePreference; use enostr::RelayPool; @@ -21,7 +21,6 @@ pub struct Notedeck { ndb: Ndb, img_cache: ImageCache, unknown_ids: UnknownIds, - pool: RelayPool, note_cache: NoteCache, accounts: Accounts, path: DataPath, @@ -31,6 +30,7 @@ pub struct Notedeck { zoom: ZoomHandler, app_size: AppSizeHandler, unrecognized_args: BTreeSet, + subman: SubMan, } fn margin_top(narrow: bool) -> f32 { @@ -71,7 +71,7 @@ impl eframe::App for Notedeck { puffin::GlobalProfiler::lock().new_frame(); // handle account updates - self.accounts.update(&mut self.ndb, &mut self.pool, ctx); + self.accounts.update(&mut self.ndb, self.subman.pool(), ctx); main_panel(&ctx.style(), crate::ui::is_narrow(ctx)).show(ctx, |ui| { // render app @@ -85,11 +85,11 @@ impl eframe::App for Notedeck { self.app_size.try_save_app_size(ctx); if self.args.relay_debug { - if self.pool.debug.is_none() { - self.pool.use_debug(); + if self.subman.pool().debug.is_none() { + self.subman.pool().use_debug(); } - if let Some(debug) = &mut self.pool.debug { + if let Some(debug) = &mut self.subman.pool().debug { RelayDebugView::window(ctx, debug); } } @@ -199,11 +199,12 @@ impl Notedeck { error!("error migrating image cache: {e}"); } + let subman = SubMan::new(ndb.clone(), pool); + Self { ndb, img_cache, unknown_ids, - pool, note_cache, accounts, path: path.clone(), @@ -213,6 +214,7 @@ impl Notedeck { zoom, app_size, unrecognized_args, + subman, } } @@ -226,12 +228,12 @@ impl Notedeck { ndb: &mut self.ndb, img_cache: &mut self.img_cache, unknown_ids: &mut self.unknown_ids, - pool: &mut self.pool, note_cache: &mut self.note_cache, accounts: &mut self.accounts, path: &self.path, args: &self.args, theme: &mut self.theme, + subman: &mut self.subman, } } diff --git a/crates/notedeck/src/context.rs b/crates/notedeck/src/context.rs index 5801fbac..0255ca4c 100644 --- a/crates/notedeck/src/context.rs +++ b/crates/notedeck/src/context.rs @@ -1,6 +1,5 @@ -use crate::{Accounts, Args, DataPath, ImageCache, NoteCache, ThemeHandler, UnknownIds}; +use crate::{Accounts, Args, DataPath, ImageCache, NoteCache, SubMan, ThemeHandler, UnknownIds}; -use enostr::RelayPool; use nostrdb::Ndb; // TODO: make this interface more sandboxed @@ -9,10 +8,10 @@ pub struct AppContext<'a> { pub ndb: &'a mut Ndb, pub img_cache: &'a mut ImageCache, pub unknown_ids: &'a mut UnknownIds, - pub pool: &'a mut RelayPool, pub note_cache: 
&'a mut NoteCache, pub accounts: &'a mut Accounts, pub path: &'a DataPath, pub args: &'a Args, pub theme: &'a mut ThemeHandler, + pub subman: &'a mut SubMan, } diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs index 61c31fba..cd592193 100644 --- a/crates/notedeck/src/lib.rs +++ b/crates/notedeck/src/lib.rs @@ -15,6 +15,7 @@ pub mod relayspec; mod result; pub mod storage; mod style; +pub mod subman; pub mod theme; mod time; mod timecache; @@ -23,6 +24,10 @@ pub mod ui; mod unknowns; mod user_account; +/// Various utilities +#[macro_use] +pub mod util; + pub use accounts::{AccountData, Accounts, AccountsAction, AddAccountAction, SwitchAccountAction}; pub use app::{App, Notedeck}; pub use args::Args; @@ -42,6 +47,7 @@ pub use storage::{ DataPath, DataPathType, Directory, FileKeyStorage, KeyStorageResponse, KeyStorageType, }; pub use style::NotedeckTextStyle; +pub use subman::SubMan; pub use theme::ColorTheme; pub use time::time_ago_since; pub use timecache::TimeCached; diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs new file mode 100644 index 00000000..abc10858 --- /dev/null +++ b/crates/notedeck/src/subman.rs @@ -0,0 +1,851 @@ +use futures::{channel::mpsc, FutureExt, StreamExt}; +use std::collections::BTreeMap; +use std::fmt; +use std::{cell::RefCell, cmp::Ordering, rc::Rc}; +use thiserror::Error; +use tracing::{debug, error, info, trace, warn}; +use uuid::Uuid; + +use enostr::{ClientMessage, Filter, PoolRelay, RelayEvent, RelayMessage, RelayPool}; +use nostrdb::{self, Ndb, Subscription, SubscriptionStream}; + +/// The Subscription Manager +/// +/// ```no_run +/// use std::error::Error; +/// +/// use nostrdb::{Config, Ndb}; +/// use enostr::{Filter, RelayPool}; +/// use notedeck::subman::{SubConstraint, SubMan, SubSpecBuilder, SubError}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let mut ndb = Ndb::new("the/db/path/", &Config::new())?; +/// let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); +/// +/// // Define a filter and build the subscription specification +/// let filter = Filter::new().kinds(vec![1, 2, 3]).build(); +/// let spec = SubSpecBuilder::new() +/// .filters(vec![filter]) +/// .constraint(SubConstraint::OnlyLocal) +/// .build(); +/// +/// // Subscribe and obtain a SubReceiver +/// let mut receiver = subman.subscribe(spec)?; +/// +/// // Process incoming note keys +/// loop { +/// match receiver.next().await { +/// Ok(note_keys) => { +/// // Process the note keys +/// println!("Received note keys: {:?}", note_keys); +/// }, +/// Err(SubError::StreamEnded) => { +/// // Not really an error; we should clean up +/// break; +/// }, +/// Err(err) => { +/// // Handle other errors +/// eprintln!("Error: {:?}", err); +/// break; +/// }, +/// } +/// } +/// +/// // Unsubscribe when the subscription is no longer needed +/// subman.unsubscribe_lclid(&receiver.lclid().unwrap())?; +/// +/// Ok(()) +/// } +/// ``` +/// +/// Supported Operational Modes: +/// +/// | mode | Constraints | lcl | rmt | end mechanism | +/// |-----------------+--------------------+-----+-----+---------------------| +/// | normal | | sub | sub | client-closes | +/// | local | OnlyLocal | sub | | client-closes | +/// | normal one-shot | OneShot | sub | sub | EOSE -> StreamEnded | +/// | local one-shot | OneShot+OnlyLocal | qry | | query, StreamEnded | +/// | "prefetch" | OneShot+OnlyRemote | | sub | EOSE -> StreamEnded | + +#[derive(Debug, Error)] +pub enum SubError { + #[error("Stream ended")] + StreamEnded, + + #[error("Internal error: 
{0}")] + InternalError(String), + + #[error("nostrdb error: {0}")] + NdbError(#[from] nostrdb::Error), +} + +pub type SubResult = Result; + +#[derive(Debug, Clone, Copy)] +pub struct LclId(nostrdb::Subscription); + +impl fmt::Display for LclId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0.id()) + } +} + +impl From for LclId { + fn from(subscription: Subscription) -> Self { + LclId(subscription) + } +} + +impl Ord for LclId { + fn cmp(&self, other: &Self) -> Ordering { + self.0.id().cmp(&other.0.id()) + } +} + +impl PartialOrd for LclId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for LclId { + fn eq(&self, other: &Self) -> bool { + self.0.id() == other.0.id() + } +} + +impl Eq for LclId {} + +// nostr remote sub id +pub type RmtId = String; + +#[derive(Debug, Clone)] +pub enum SubConstraint { + OneShot, // terminate subscription after initial query and EOSE + OnlyLocal, // only query the local db, no remote subs + OnlyRemote, // prefetch from remote, nothing returned + OutboxRelays(Vec), // ensure one of these is in the active relay set + AllowedRelays(Vec), // if not empty, only use these relays + BlockedRelays(Vec), // if not empty, don't use these relays +} + +#[derive(Debug, Default, Clone)] +pub struct SubSpecBuilder { + rmtid: Option, + filters: Vec, + constraints: Vec, +} + +impl SubSpecBuilder { + pub fn new() -> Self { + SubSpecBuilder::default() + } + pub fn rmtid(mut self, id: String) -> Self { + self.rmtid = Some(id); + self + } + pub fn filters(mut self, filters: Vec) -> Self { + self.filters.extend(filters); + self + } + pub fn constraint(mut self, constraint: SubConstraint) -> Self { + self.constraints.push(constraint); + self + } + pub fn build(self) -> SubSpec { + let mut outbox_relays = Vec::new(); + let mut allowed_relays = Vec::new(); + let mut blocked_relays = Vec::new(); + let mut is_oneshot = false; + let mut is_onlylocal = false; + let mut is_onlyremote = false; + + for constraint in self.constraints { + match constraint { + SubConstraint::OneShot => is_oneshot = true, + SubConstraint::OnlyLocal => is_onlylocal = true, + SubConstraint::OnlyRemote => is_onlyremote = true, + SubConstraint::OutboxRelays(relays) => outbox_relays.extend(relays), + SubConstraint::AllowedRelays(relays) => allowed_relays.extend(relays), + SubConstraint::BlockedRelays(relays) => blocked_relays.extend(relays), + } + } + + let rmtid = self.rmtid.unwrap_or_else(|| Uuid::new_v4().to_string()); + + SubSpec { + rmtid, + filters: self.filters, + outbox_relays, + allowed_relays, + blocked_relays, + is_oneshot, + is_onlylocal, + is_onlyremote, + } + } +} + +#[derive(Clone)] +pub struct SubSpec { + pub rmtid: String, // unused if is_onlylocal + pub filters: Vec, + pub outbox_relays: Vec, + pub allowed_relays: Vec, + pub blocked_relays: Vec, + pub is_oneshot: bool, + pub is_onlylocal: bool, + pub is_onlyremote: bool, +} + +impl fmt::Debug for SubSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Convert each Filter to its JSON representation. 
+ let filters_json: Vec<_> = self + .filters + .iter() + .map(|filter| filter.json().unwrap()) + .collect(); + f.debug_struct("SubSpec") + .field("rmtid", &self.rmtid) + .field("filters", &filters_json) + .field("outbox_relays", &self.outbox_relays) + .field("allowed_relays", &self.allowed_relays) + .field("blocked_relays", &self.blocked_relays) + .field("is_oneshot", &self.is_oneshot) + .field("is_onlylocal", &self.is_onlylocal) + .field("is_onlyremote", &self.is_onlyremote) + .finish() + } +} + +// State for a local subscription +#[derive(Debug)] +struct LclSubState { + lclid: LclId, +} + +// State of a remote subscription on a specific relay +#[allow(unused)] +#[derive(Default, Debug, Clone, Eq, PartialEq)] +enum RelaySubState { + #[default] + Pending, // before relay open or subscribed + Syncing, // before EOSE + Current, // after EOSE + Error(String), // went wrong + Closed, // closed +} + +// State for a remote subscription +#[derive(Debug)] +struct RmtSubState { + rmtid: RmtId, + relays: BTreeMap, + tx_ended: mpsc::Sender<()>, // send StreamEnded to receiver +} + +impl RmtSubState { + pub fn update_rss(&mut self, relay: &str, newstate: RelaySubState) { + let rss = self.relays.get_mut(relay).expect("couldn't find relay"); + debug!( + "RmtSubState update_rss {} {}: {:?} -> {:?}", + self.rmtid, relay, *rss, newstate + ); + *rss = newstate; + } + + // if this is a one-shot and there are no relays left syncing we are done + pub fn consider_finished(&mut self, is_oneshot: bool) -> bool { + let still_syncing: Vec = self + .relays + .iter() + .filter(|(_k, v)| **v == RelaySubState::Syncing) + .map(|(k, _v)| k.clone()) + .collect(); + + if still_syncing.is_empty() { + if is_oneshot { + debug!( + "handle_eose {}: all relays done syncing, sending one-shot ending", + self.rmtid + ); + self.tx_ended.try_send(()).ok(); + true + } else { + debug!("handle_eose {}: all relays done syncing", self.rmtid); + false + } + } else { + debug!( + "handle_eose {}: still_syncing {:?}", + self.rmtid, still_syncing + ); + false + } + } +} + +// State of a subscription +#[allow(unused)] +#[derive(Debug)] +pub struct SubState { + spec: SubSpec, + lcl: Option, + rmt: Option, +} +pub type SubStateRef = Rc>; + +impl Drop for SubState { + fn drop(&mut self) { + debug!("dropping SubState for {}", self.spec.rmtid); + } +} + +pub struct SubMan { + ndb: Ndb, + pool: RelayPool, + lcl: BTreeMap, + rmt: BTreeMap, +} + +impl SubMan { + pub fn new(ndb: Ndb, pool: RelayPool) -> Self { + SubMan { + ndb, + pool, + lcl: BTreeMap::new(), + rmt: BTreeMap::new(), + } + } + + pub fn ndb(&self) -> Ndb { + self.ndb.clone() + } + + // deprecated, use SubMan directly instead + pub fn pool(&mut self) -> &mut RelayPool { + &mut self.pool + } + + pub fn subscribe(&mut self, spec: SubSpec) -> SubResult { + let (substate, subrcvr) = self.make_subscription(&spec)?; + let state = Rc::new(RefCell::new(substate)); + if let Some(lclid) = subrcvr.lclid() { + self.lcl.insert(lclid, Rc::clone(&state)); + } + if let Some(rmtid) = subrcvr.rmtid() { + self.rmt.insert(rmtid, Rc::clone(&state)); + } + Ok(subrcvr) + } + + pub fn unsubscribe_lclid(&mut self, lclid: &LclId) -> SubResult<()> { + // find the substate and delegate to unsubscribe_substate + let ssref = match self.lcl.get(lclid) { + None => { + return Err(SubError::InternalError(format!( + "substate for {} not found", + lclid + ))) + } + Some(ssref) => ssref.clone(), // clone to drop the borrow on the map + }; + self.unsubscribe_substate(&ssref) + } + + pub fn unsubscribe_rmtid(&mut self, rmtid: 
&RmtId) -> SubResult<()> { + // find the substate and delegate to unsubscribe_substate + let ssref = match self.rmt.get(rmtid) { + None => { + return Err(SubError::InternalError(format!( + "substate for {} not found", + rmtid + ))) + } + Some(ssref) => ssref.clone(), // clone to drop the borrow on the map + }; + self.unsubscribe_substate(&ssref) + } + + fn unsubscribe_substate(&mut self, ssref: &SubStateRef) -> SubResult<()> { + let mut substate = ssref.borrow_mut(); + if let Some(&mut ref mut rmtsubstate) = substate.rmt.as_mut() { + let rmtid = rmtsubstate.rmtid.clone(); + // unsubscribe from all remote relays + for (url, rss) in rmtsubstate.relays.iter() { + match rss { + RelaySubState::Syncing | RelaySubState::Current => { + SubMan::close_relay_sub(&mut self.pool, &rmtid, url); + // not worth marking as closed because we drop below + } + _ => {} + } + } + // send StreamEnded to the receiver + rmtsubstate.tx_ended.try_send(()).ok(); + // remove from the SubMan index + self.rmt.remove(&rmtid).expect("removed from rmt index"); + } + if let Some(lclsubstate) = &substate.lcl { + let lclid = &lclsubstate.lclid; + // remove from the SubMan index + self.lcl.remove(lclid).expect("removed from lcl index"); + } + Ok(()) + } + + pub fn remove_substate_rmtid(&mut self, rmtid: &RmtId) -> SubResult<()> { + // remove from the local sub index if needed + if let Some(ssref) = self.rmt.get(rmtid) { + let substate = ssref.borrow(); + if let Some(lclsubstate) = &substate.lcl { + self.lcl.remove(&lclsubstate.lclid); + } + } + // remove from the remote sub index + match self.rmt.remove(rmtid) { + Some(_) => Ok(()), + None => Err(SubError::InternalError(format!( + "substate for {} not found", + rmtid + ))), + } + } + + fn make_subscription(&mut self, spec: &SubSpec) -> SubResult<(SubState, SubReceiver)> { + // Setup local ndb subscription state + let (maybe_lclstate, lclsub) = if spec.is_onlyremote { + (None, None) + } else { + let subscription = self.ndb.subscribe(&spec.filters)?; + let lclstrm = subscription.stream(&self.ndb).notes_per_await(1); + let lclid = subscription.into(); + ( + Some(LclSubState { lclid }), + Some(LclSub { + ndb: self.ndb.clone(), + lclid, + lclstrm, + }), + ) + }; + + // Setup remote nostr relay subscription state + let (maybe_rmtstate, rmtsub) = if spec.is_onlylocal { + (None, None) + } else { + let (tx_ended, rx_ended) = mpsc::channel(1); + // FIXME - need to choose specific relays! 
+ let relays: BTreeMap = self + .pool + .relays + .iter() + .map(|s| match s.url() { + "multicast" => (s.url().to_string(), RelaySubState::Current), + _ => (s.url().to_string(), RelaySubState::Syncing), + }) + .collect(); + let rmtid = spec.rmtid.clone(); + self.pool + .subscribe(spec.rmtid.clone(), spec.filters.clone()); + ( + Some(RmtSubState { + rmtid: rmtid.clone(), + relays, + tx_ended, + }), + Some(RmtSub { rmtid, rx_ended }), + ) + }; + + Ok(( + SubState { + spec: spec.clone(), + lcl: maybe_lclstate, + rmt: maybe_rmtstate, + }, + SubReceiver { lclsub, rmtsub }, + )) + } + + fn close_relay_sub(pool: &mut RelayPool, sid: &str, url: &str) { + debug!("closing relay sub {} {}", sid, url); + if let Some(relay) = pool.relays.iter_mut().find(|r| r.url() == url) { + let cmd = ClientMessage::close(sid.to_string()); + if let Err(err) = relay.send(&cmd) { + error!("trouble closing relay sub: {} {}: {:?}", sid, url, err); + } + } + } + + pub fn process_relays( + &mut self, + legacy_relay_handler: &mut H, + ) -> SubResult<()> { + let wakeup = move || { + // ignore + }; + self.pool.keepalive_ping(wakeup); + + // NOTE: we don't use the while let loop due to borrow issues + #[allow(clippy::while_let_loop)] + loop { + let ev = if let Some(ev) = self.pool.try_recv() { + ev.into_owned() + } else { + break; + }; + + let relay = RelayPool::canonicalize_url(ev.relay.clone()); + + match (&ev.event).into() { + RelayEvent::Opened => { + debug!("handle_opened {}", relay); + + // handle legacy subscriptions + legacy_relay_handler.handle_opened(&mut self.ndb, &mut self.pool, &relay); + + // send our remote subscriptions for this relay + for ssr in self.rmt.values_mut() { + let mut substate = ssr.borrow_mut(); + let rmtid = substate.spec.rmtid.clone(); + let filters = substate.spec.filters.clone(); + if let Some(rmtsubstate) = &mut substate.rmt { + if let Some(rss) = &rmtsubstate.relays.get(&relay) { + match rss { + RelaySubState::Pending => { + debug!( + "SubMan handle_opened: sending sub {} {}: {:?}", + rmtid, + relay, + filters + .iter() + .map(|f| f.json().unwrap_or_default()) + .collect::>(), + ); + self.pool + .send_to(&ClientMessage::req(rmtid, filters), &relay); + rmtsubstate.update_rss(&relay, RelaySubState::Syncing); + } + _ => { + debug!( + "SubMan handle_opened: {} {} ignored in state {:?}", + rmtid, relay, rss + ); + } + } + } + } + } + } + // TODO: handle reconnects + RelayEvent::Closed => warn!("{} connection closed", &relay), + RelayEvent::Error(e) => error!("{}: {}", &relay, e), + RelayEvent::Other(msg) => trace!("other event {:?}", &msg), + RelayEvent::Message(msg) => { + self.process_message(legacy_relay_handler, &relay, &msg); + } + } + } + Ok(()) + } + + pub fn process_message( + &mut self, + legacy_relay_handler: &mut H, + relay: &str, + msg: &RelayMessage, + ) { + match msg { + RelayMessage::Event(_subid, ev) => { + let relay = if let Some(relay) = self.pool.relays.iter().find(|r| r.url() == relay) + { + relay + } else { + error!("couldn't find relay {} for note processing!?", relay); + return; + }; + + match relay { + PoolRelay::Websocket(_) => { + //info!("processing event {}", event); + if let Err(err) = self.ndb.process_event(ev) { + error!("error processing event {ev}: {err}"); + } + } + PoolRelay::Multicast(_) => { + // multicast events are client events + if let Err(err) = self.ndb.process_client_event(ev) { + error!("error processing multicast event {ev}: {err}"); + } + } + } + } + RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), + RelayMessage::OK(cr) => 
info!("OK {:?}", cr), + RelayMessage::Eose(sid) => { + debug!("SubMan process_message Eose {} {}", sid, relay); + let mut substate_finished = false; + // do we have this sub in the subman remote subscriptions? + if let Some(ss) = self.rmt.get_mut(*sid) { + let is_oneshot = ss.borrow().spec.is_oneshot; + let mut substate = ss.borrow_mut(); + if let Some(rmtsubstate) = &mut substate.rmt { + rmtsubstate.update_rss(relay, RelaySubState::Current); + + if is_oneshot { + SubMan::close_relay_sub(&mut self.pool, sid, relay); + rmtsubstate.update_rss(relay, RelaySubState::Closed); + } + + // any relays left syncing? + substate_finished = rmtsubstate.consider_finished(is_oneshot); + } + } else { + // we didn't find it in the subman, delegate to the legacy code + legacy_relay_handler.handle_eose(&mut self.ndb, &mut self.pool, sid, relay); + } + if substate_finished { + if let Err(err) = self.remove_substate_rmtid(&sid.to_string()) { + error!("trouble removing substate for {}: {:?}", sid, err); + } + } + } + } + } +} + +pub trait LegacyRelayHandler { + fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str); + fn handle_eose(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, id: &str, relay: &str); +} + +struct LclSub { + ndb: Ndb, + lclid: LclId, // ndb id + lclstrm: SubscriptionStream, +} + +#[allow(unused)] +struct RmtSub { + rmtid: RmtId, // remote nostr sub id + rx_ended: mpsc::Receiver<()>, // end-of-stream +} + +pub struct SubReceiver { + lclsub: Option, + rmtsub: Option, +} + +impl Drop for SubReceiver { + fn drop(&mut self) { + debug!("dropping Receiver for {}", self.idstr()); + } +} + +impl SubReceiver { + pub fn idstr(&self) -> String { + let mut idstr = "".to_string(); + if let Some(lsub) = &self.lclsub { + idstr.push_str(&format!("lcl:{}", lsub.lclid)); + } + if let Some(rsub) = &self.rmtsub { + if !idstr.is_empty() { + idstr.push_str(", "); + } + idstr.push_str(&format!("rmt:{}", rsub.rmtid)); + } + if idstr.is_empty() { + "query".to_string() + } else { + idstr + } + } + + pub fn lclid(&self) -> Option { + self.lclsub.as_ref().map(|lsub| lsub.lclid) + } + + pub fn rmtid(&self) -> Option { + self.rmtsub.as_ref().map(|rsub| rsub.rmtid.clone()) + } + + pub async fn next(&mut self) -> SubResult> { + if let (Some(lsub), Some(rsub)) = (&mut self.lclsub, &mut self.rmtsub) { + // local and remote subs + futures::select! 
{ + notes = lsub.lclstrm.next().fuse() => { + match notes { + Some(notes) => Ok(notes), + None => Err(SubError::StreamEnded), + } + }, + _ = rsub.rx_ended.next().fuse() => { + Err(SubError::StreamEnded) + } + } + } else if let Some(lsub) = &mut self.lclsub { + // only local sub + lsub.lclstrm.next().await.ok_or(SubError::StreamEnded) + } else if let Some(rsub) = &mut self.rmtsub { + // only remote sub (prefetch only, values not returned) + match rsub.rx_ended.next().await { + Some(_) => Err(SubError::StreamEnded), + None => Err(SubError::InternalError( + "trouble reading from rx_ended".to_string(), + )), + } + } else { + // query case + Err(SubError::InternalError("unimplmented".to_string())) + } + } + + pub fn poll(&mut self, max_notes: u32) -> Vec { + assert!(self.lclsub.is_some()); // FIXME - local only + let lclsub = self.lclsub.as_mut().unwrap(); + lclsub.ndb.poll_for_notes(lclsub.lclid.0, max_notes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::testdbs_path_async; + use crate::util::test_util::{raw_msg, test_keypair, ManagedNdb}; + use nostrdb::{NoteKey, Transaction}; + + // test basic subscription functionality + #[tokio::test] + async fn test_subman_sub() -> Result<(), Box> { + // setup an ndb and subman to test + let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + + // subscribe to some stuff + let mut receiver = subman.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::OnlyLocal) + .build(), + )?; + let lclid = receiver.lclid().unwrap(); + + // nothing should be available yet + assert_eq!(receiver.poll(1), vec![]); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // receiver should now see the msg + let nks = receiver.next().await?; + assert_eq!(nks.len(), 1); + let txn = Transaction::new(&ndb)?; + let note = ndb.get_note_by_key(&txn, nks[0])?; + assert_eq!(note.pubkey(), keys1.pubkey.bytes()); + assert_eq!(note.kind(), kind); + assert_eq!(note.content(), content); + + // now nothing should be available again + assert_eq!(receiver.poll(1), vec![]); + + subman.unsubscribe_lclid(&lclid)?; + Ok(()) + } + + // ensure that the subscription works when it is waiting before the event + #[tokio::test] + async fn test_subman_sub_with_waiting_thread() -> Result<(), Box> { + // setup an ndb and subman to test + let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + + // subscribe to some stuff + let mut receiver = subman.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::OnlyLocal) + .build(), + )?; + let lclid = receiver.lclid().unwrap(); + + // spawn a task to wait for the next message + let handle = tokio::spawn(async move { + let nks = receiver.next().await.unwrap(); + assert_eq!(nks.len(), 1); // Ensure one message is received + (receiver, nks) // return the receiver as well + }); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // await the spawned task to ensure it completes + let (mut receiver, nks) = handle.await?; + + // validate the received message + let txn = Transaction::new(&ndb)?; + let 
note = ndb.get_note_by_key(&txn, nks[0])?; + assert_eq!(note.pubkey(), keys1.pubkey.bytes()); + assert_eq!(note.kind(), kind); + assert_eq!(note.content(), content); + + // ensure no additional messages are available + assert_eq!(receiver.poll(1), vec![]); + + subman.unsubscribe_lclid(&lclid)?; + Ok(()) + } + + // test subscription poll and next interaction + #[tokio::test] + async fn test_subman_poll_and_next() -> Result<(), Box> { + // setup an ndb and subman to test + let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); + let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + + // subscribe to some stuff + let mut receiver = subman.subscribe( + SubSpecBuilder::new() + .filters(vec![Filter::new().kinds(vec![1]).build()]) + .constraint(SubConstraint::OnlyLocal) + .build(), + )?; + let lclid = receiver.lclid().unwrap(); + + // nothing should be available yet + assert_eq!(receiver.poll(1), vec![]); + + // process a test event that matches the subscription + let keys1 = test_keypair(1); + let kind = 1; + let content = "abc"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + std::thread::sleep(std::time::Duration::from_millis(150)); + + // now poll should consume the note + assert_eq!(receiver.poll(1), vec![NoteKey::new(1)]); + + // nothing more available + assert_eq!(receiver.poll(1), vec![]); + + // process a second event + let content = "def"; + ndb.process_event(&raw_msg("subid", &keys1, kind, content))?; + + // now receiver should now see the second note + assert_eq!(receiver.next().await?, vec![NoteKey::new(2)]); + + subman.unsubscribe_lclid(&lclid)?; + Ok(()) + } +} diff --git a/crates/notedeck/src/util/mod.rs b/crates/notedeck/src/util/mod.rs new file mode 100644 index 00000000..0964f7ae --- /dev/null +++ b/crates/notedeck/src/util/mod.rs @@ -0,0 +1,4 @@ +#[allow(missing_docs)] +#[cfg(test)] +#[macro_use] +pub mod test_util; diff --git a/crates/notedeck/src/util/test_util.rs b/crates/notedeck/src/util/test_util.rs new file mode 100644 index 00000000..aaaa6a78 --- /dev/null +++ b/crates/notedeck/src/util/test_util.rs @@ -0,0 +1,86 @@ +use enostr::{FullKeypair, Pubkey}; +use nostrdb::{Config, Ndb, NoteBuilder}; +use std::fs; +use std::path::Path; + +// FIXME - make nostrdb::test_util::cleanup_db accessible instead +#[allow(dead_code)] +fn cleanup_db(path: &str) { + let p = Path::new(path); + let _ = fs::remove_file(p.join("data.mdb")); + let _ = fs::remove_file(p.join("lock.mdb")); +} + +// managed ndb handle that cleans up test data when dropped +pub struct ManagedNdb { + pub path: String, + pub ndb: Ndb, +} +impl ManagedNdb { + pub fn setup(path: &str) -> (Self, Ndb) { + cleanup_db(path); // ensure a clean slate before starting + let ndb = Ndb::new(path, &Config::new()) + .unwrap_or_else(|err| panic!("Failed to create Ndb at {}: {}", path, err)); + ( + Self { + path: path.to_string(), + ndb: ndb.clone(), + }, + ndb, + ) + } +} +impl Drop for ManagedNdb { + fn drop(&mut self) { + cleanup_db(&self.path); // comment this out to leave the db for inspection + } +} + +// generate a testdbs_path for an async test automatically +#[macro_export] +macro_rules! 
testdbs_path_async { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + core::any::type_name::() + } + let name = type_name_of(f); + + // Find and cut the rest of the path + let test_name = match &name[..name.len() - 3].strip_suffix("::{{closure}}") { + Some(stripped) => match &stripped.rfind(':') { + Some(pos) => &stripped[pos + 1..stripped.len()], + None => &stripped, + }, + None => &name[..name.len() - 3], + }; + + format!("target/testdbs/{}", test_name) + }}; +} + +// generate a deterministic keypair for testing +pub fn test_keypair(input: u64) -> FullKeypair { + use sha2::{Digest, Sha256}; + + let mut hasher = Sha256::new(); + hasher.update(input.to_le_bytes()); + let hash = hasher.finalize(); + + let secret_key = nostr::SecretKey::from_slice(&hash).expect("valid secret key"); + let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); + let pubkey = Pubkey::new(xopk.serialize()); + + FullKeypair::new(pubkey, secret_key) +} + +// generate a basic raw message from scratch +pub fn raw_msg(subid: &str, keys: &FullKeypair, kind: u32, content: &str) -> String { + let note = NoteBuilder::new() + .kind(kind) + .content(content) + .sign(&keys.secret_key.to_secret_bytes()) + .build() + .expect("note"); + format!(r#"["EVENT", "{}", {}]"#, subid, note.json().expect("json")) +} diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index e7d3747c..cdc83bbd 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -13,9 +13,12 @@ use crate::{ Result, }; -use notedeck::{Accounts, AppContext, DataPath, DataPathType, FilterState, ImageCache, UnknownIds}; +use notedeck::{ + subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState, + ImageCache, NoteCache, UnknownIds, +}; -use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage, RelayPool}; +use enostr::{ClientMessage, Keypair, Pubkey, RelayPool}; use uuid::Uuid; use egui_extras::{Size, StripBuilder}; @@ -25,7 +28,7 @@ use nostrdb::{Ndb, Transaction}; use std::collections::{BTreeSet, HashMap}; use std::path::Path; use std::time::Duration; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; #[derive(Debug, Eq, PartialEq, Clone)] pub enum DamusState { @@ -80,59 +83,96 @@ fn handle_key_events(input: &egui::InputState, columns: &mut Columns) { } } -fn try_process_event( - damus: &mut Damus, - app_ctx: &mut AppContext<'_>, - ctx: &egui::Context, -) -> Result<()> { - let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache); - ctx.input(|i| handle_key_events(i, current_columns)); - - let ctx2 = ctx.clone(); - let wakeup = move || { - ctx2.request_repaint(); - }; +struct RelayHandler<'a> { + // From AppContext + unknown_ids: &'a mut UnknownIds, + note_cache: &'a mut NoteCache, + accounts: &'a mut Accounts, - app_ctx.pool.keepalive_ping(wakeup); + // From Damus + subscriptions: &'a mut Subscriptions, + timeline_cache: &'a mut TimelineCache, - // NOTE: we don't use the while let loop due to borrow issues - #[allow(clippy::while_let_loop)] - loop { - let ev = if let Some(ev) = app_ctx.pool.try_recv() { - ev.into_owned() - } else { - break; - }; + since_optimize: bool, +} - match (&ev.event).into() { - RelayEvent::Opened => { - app_ctx - .accounts - .send_initial_filters(app_ctx.pool, &ev.relay); - - timeline::send_initial_timeline_filters( - app_ctx.ndb, - damus.since_optimize, - &mut damus.timeline_cache, - &mut damus.subscriptions, - app_ctx.pool, - 
&ev.relay, - ); - } - // TODO: handle reconnects - RelayEvent::Closed => warn!("{} connection closed", &ev.relay), - RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e), - RelayEvent::Other(msg) => trace!("other event {:?}", &msg), - RelayEvent::Message(msg) => { - process_message(damus, app_ctx, &ev.relay, &msg); - } +impl<'a> RelayHandler<'a> { + fn new( + unknown_ids: &'a mut UnknownIds, + note_cache: &'a mut NoteCache, + accounts: &'a mut Accounts, + subscriptions: &'a mut Subscriptions, + timeline_cache: &'a mut TimelineCache, + since_optimize: bool, + ) -> Self { + RelayHandler { + unknown_ids, + accounts, + note_cache, + subscriptions, + timeline_cache, + since_optimize, } } +} - for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() { - let is_ready = - timeline::is_timeline_ready(app_ctx.ndb, app_ctx.pool, app_ctx.note_cache, timeline); +impl LegacyRelayHandler for RelayHandler<'_> { + /// Handle relay opened + fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str) { + self.accounts.send_initial_filters(pool, relay); + timeline::send_initial_timeline_filters( + ndb, + self.since_optimize, + self.timeline_cache, + self.subscriptions, + pool, + relay, + ); + } + + /// Handle end-of-stored-events + fn handle_eose(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, sid: &str, relay: &str) { + do_handle_eose( + ndb, + pool, + &*self.subscriptions, + self.timeline_cache, + sid, + relay, + self.unknown_ids, + self.note_cache, + ) + .ok(); // we've already logged the error and intend to keep going + } +} +fn try_process_event<'a>( + damus: &'a mut Damus, + app_ctx: &'a mut AppContext<'_>, + ctx: &egui::Context, +) { + let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache); + ctx.input(|i| handle_key_events(i, current_columns)); + + { + let mut relay_handler = RelayHandler::new( + app_ctx.unknown_ids, + app_ctx.note_cache, + app_ctx.accounts, + &mut damus.subscriptions, + &mut damus.timeline_cache, + damus.since_optimize, + ); + app_ctx.subman.process_relays(&mut relay_handler).ok(); + } + + for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() { + let is_ready = timeline::is_timeline_ready( + app_ctx.ndb, + app_ctx.subman.pool(), + app_ctx.note_cache, + timeline, + ); if is_ready { let txn = Transaction::new(app_ctx.ndb).expect("txn"); // only thread timelines are reversed @@ -153,10 +193,8 @@ fn try_process_event( } if app_ctx.unknown_ids.ready_to_send() { - unknown_id_send(app_ctx.unknown_ids, app_ctx.pool); + unknown_id_send(app_ctx.unknown_ids, app_ctx.subman.pool()); } - - Ok(()) } fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut RelayPool) { @@ -191,25 +229,27 @@ fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Con DamusState::Initialized => (), }; - if let Err(err) = try_process_event(damus, app_ctx, ctx) { - error!("error processing event: {}", err); - } + try_process_event(damus, app_ctx, ctx); } -fn handle_eose( +#[allow(clippy::too_many_arguments)] +fn do_handle_eose( + ndb: &mut Ndb, + pool: &mut RelayPool, subscriptions: &Subscriptions, timeline_cache: &mut TimelineCache, - ctx: &mut AppContext<'_>, subid: &str, relay_url: &str, + unknown_ids: &mut UnknownIds, + note_cache: &mut NoteCache, ) -> Result<()> { let sub_kind = if let Some(sub_kind) = subscriptions.subs.get(subid) { sub_kind } else { let n_subids = subscriptions.subs.len(); warn!( - "got unknown eose subid {}, {} tracked subscriptions", - subid, n_subids + "got unknown eose subid {} relay {}, {} 
tracked subscriptions", + subid, relay_url, n_subids ); return Ok(()); }; @@ -219,24 +259,14 @@ fn handle_eose( // eose on timeline? whatevs } SubKind::Initial => { - let txn = Transaction::new(ctx.ndb)?; - unknowns::update_from_columns( - &txn, - ctx.unknown_ids, - timeline_cache, - ctx.ndb, - ctx.note_cache, - ); - // this is possible if this is the first time - if ctx.unknown_ids.ready_to_send() { - unknown_id_send(ctx.unknown_ids, ctx.pool); - } + let txn = Transaction::new(ndb)?; + unknowns::update_from_columns(&txn, unknown_ids, timeline_cache, ndb, note_cache); } // oneshot subs just close when they're done SubKind::OneShot => { let msg = ClientMessage::close(subid.to_string()); - ctx.pool.send_to(&msg, relay_url); + pool.send_to(&msg, relay_url); } SubKind::FetchingContactList(timeline_uid) => { @@ -284,47 +314,6 @@ fn handle_eose( Ok(()) } -fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) { - match msg { - RelayMessage::Event(_subid, ev) => { - let relay = if let Some(relay) = ctx.pool.relays.iter().find(|r| r.url() == relay) { - relay - } else { - error!("couldn't find relay {} for note processing!?", relay); - return; - }; - - match relay { - PoolRelay::Websocket(_) => { - //info!("processing event {}", event); - if let Err(err) = ctx.ndb.process_event(ev) { - error!("error processing event {ev}: {err}"); - } - } - PoolRelay::Multicast(_) => { - // multicast events are client events - if let Err(err) = ctx.ndb.process_client_event(ev) { - error!("error processing multicast event {ev}: {err}"); - } - } - } - } - RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), - RelayMessage::OK(cr) => info!("OK {:?}", cr), - RelayMessage::Eose(sid) => { - if let Err(err) = handle_eose( - &damus.subscriptions, - &mut damus.timeline_cache, - ctx, - sid, - relay, - ) { - error!("error handling eose: {}", err); - } - } - } -} - fn render_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ui: &mut egui::Ui) { if notedeck::ui::is_narrow(ui.ctx()) { render_damus_mobile(damus, app_ctx, ui); @@ -387,7 +376,7 @@ impl Damus { &txn, ctx.ndb, ctx.note_cache, - ctx.pool, + ctx.subman.pool(), &timeline_kind, ) { add_result.process( diff --git a/crates/notedeck_columns/src/decks.rs b/crates/notedeck_columns/src/decks.rs index 903b8f3d..61c4dc3e 100644 --- a/crates/notedeck_columns/src/decks.rs +++ b/crates/notedeck_columns/src/decks.rs @@ -323,7 +323,7 @@ pub fn demo_decks( &txn, ctx.ndb, ctx.note_cache, - ctx.pool, + ctx.subman.pool(), &kind, ) { results.process( diff --git a/crates/notedeck_columns/src/nav.rs b/crates/notedeck_columns/src/nav.rs index 3f496a72..119d29d7 100644 --- a/crates/notedeck_columns/src/nav.rs +++ b/crates/notedeck_columns/src/nav.rs @@ -72,7 +72,7 @@ impl SwitchingAction { let kinds_to_pop = get_active_columns_mut(ctx.accounts, decks_cache).delete_column(index); for kind in &kinds_to_pop { - if let Err(err) = timeline_cache.pop(kind, ctx.ndb, ctx.pool) { + if let Err(err) = timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) { error!("error popping timeline: {err}"); } } @@ -144,7 +144,7 @@ impl RenderNavResponse { let kinds_to_pop = app.columns_mut(ctx.accounts).delete_column(col); for kind in &kinds_to_pop { - if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.pool) { + if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) { error!("error popping timeline: {err}"); } } @@ -154,7 +154,7 @@ impl RenderNavResponse { RenderNavAction::PostAction(post_action) => { let txn = 
Transaction::new(ctx.ndb).expect("txn"); - let _ = post_action.execute(ctx.ndb, &txn, ctx.pool, &mut app.drafts); + let _ = post_action.execute(ctx.ndb, &txn, ctx.subman.pool(), &mut app.drafts); get_active_columns_mut(ctx.accounts, &mut app.decks_cache) .column_mut(col) .router_mut() @@ -170,7 +170,7 @@ impl RenderNavResponse { col, &mut app.timeline_cache, ctx.note_cache, - ctx.pool, + ctx.subman.pool(), &txn, ctx.unknown_ids, ); @@ -187,7 +187,7 @@ impl RenderNavResponse { profile_action.process( &mut app.view_state.pubkey_to_profile_state, ctx.ndb, - ctx.pool, + ctx.subman.pool(), get_active_columns_mut(ctx.accounts, &mut app.decks_cache) .column_mut(col) .router_mut(), @@ -206,7 +206,7 @@ impl RenderNavResponse { .pop(); if let Some(Route::Timeline(kind)) = &r { - if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.pool) { + if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.subman.pool()) { error!("popping timeline had an error: {err} for {:?}", kind); } }; @@ -275,7 +275,7 @@ fn render_nav_body( .map(|f| RenderNavAction::SwitchingAction(SwitchingAction::Accounts(f))) } Route::Relays => { - let manager = RelayPoolManager::new(ctx.pool); + let manager = RelayPoolManager::new(ctx.subman.pool()); RelayView::new(ctx.accounts, manager, &mut app.view_state.id_string_map).ui(ui); None } diff --git a/crates/notedeck_columns/src/ui/add_column.rs b/crates/notedeck_columns/src/ui/add_column.rs index d1e964ac..2669b6ee 100644 --- a/crates/notedeck_columns/src/ui/add_column.rs +++ b/crates/notedeck_columns/src/ui/add_column.rs @@ -634,7 +634,7 @@ pub fn render_add_column_routes( &mut timeline, ctx.ndb, &mut app.subscriptions, - ctx.pool, + ctx.subman.pool(), ctx.note_cache, app.since_optimize, ); @@ -675,7 +675,7 @@ pub fn render_add_column_routes( &mut timeline, ctx.ndb, &mut app.subscriptions, - ctx.pool, + ctx.subman.pool(), ctx.note_cache, app.since_optimize, ); From ebb6290526f8bb629bb3255e5a0e811e9da08912 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Tue, 11 Feb 2025 10:34:29 -0800 Subject: [PATCH 03/11] use subman API to resolve unknown ids --- crates/notedeck/src/lib.rs | 2 +- crates/notedeck/src/unknowns.rs | 98 ++++++++++++++++++++---------- crates/notedeck_columns/src/app.rs | 41 +++++++++---- 3 files changed, 97 insertions(+), 44 deletions(-) diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs index cd592193..dfa18be6 100644 --- a/crates/notedeck/src/lib.rs +++ b/crates/notedeck/src/lib.rs @@ -47,7 +47,7 @@ pub use storage::{ DataPath, DataPathType, Directory, FileKeyStorage, KeyStorageResponse, KeyStorageType, }; pub use style::NotedeckTextStyle; -pub use subman::SubMan; +pub use subman::{SubError, SubMan}; pub use theme::ColorTheme; pub use time::time_ago_since; pub use timecache::TimeCached; diff --git a/crates/notedeck/src/unknowns.rs b/crates/notedeck/src/unknowns.rs index f2510b94..bacf39de 100644 --- a/crates/notedeck/src/unknowns.rs +++ b/crates/notedeck/src/unknowns.rs @@ -1,12 +1,13 @@ use crate::{ note::NoteRef, notecache::{CachedNote, NoteCache}, + subman::{SubConstraint, SubSpec, SubSpecBuilder}, Result, }; use enostr::{Filter, NoteId, Pubkey}; use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::time::{Duration, Instant}; use tracing::error; @@ -118,13 +119,74 @@ impl UnknownIds { &mut self.ids } + pub fn numids(&self) -> usize { + self.ids.len() + } + pub fn clear(&mut self) { self.ids = HashMap::default(); } - pub 
fn filter(&self) -> Option> { - let ids: Vec<&UnknownId> = self.ids.keys().collect(); - get_unknown_ids_filter(&ids) + pub fn generate_resolution_requests(&self) -> Vec { + // 1. resolve as many ids per request as possible + // 2. each request only has one filter (https://github.com/nostr-protocol/nips/pull/1645) + // 3. each request is limited to MAX_CHUNK_IDS + // 4. use relay hints when available + + // Collect the unknown ids by relay + let mut ids_by_relay: BTreeMap, Vec)> = BTreeMap::new(); + for (unknown_id, relay_hints) in self.ids.iter() { + // 1. use default relays (empty RelayUrl) if no hints are available + // 2. query the default relays even when hints are available + for relay in std::iter::once("".to_string()).chain(relay_hints.iter().cloned()) { + match unknown_id { + UnknownId::Pubkey(pk) => { + ids_by_relay + .entry(relay) + .or_insert_with(|| (Vec::new(), Vec::new())) + .0 + .push(*pk); + } + UnknownId::Id(nid) => { + ids_by_relay + .entry(relay) + .or_insert_with(|| (Vec::new(), Vec::new())) + .1 + .push(*nid); + } + } + } + } + + const MAX_CHUNK_IDS: usize = 500; + + let mut subspecs = vec![]; + for (relay, (pubkeys, noteids)) in ids_by_relay { + // make a template SubSpecBuilder w/ the common parts + let mut ssb = SubSpecBuilder::new() + .constraint(SubConstraint::OneShot) + .constraint(SubConstraint::OnlyRemote); + if !relay.is_empty() { + ssb = ssb.constraint(SubConstraint::AllowedRelays(vec![relay])); + } + for chunk in pubkeys.chunks(MAX_CHUNK_IDS) { + let pks: Vec<&[u8; 32]> = chunk.iter().map(|pk| pk.bytes()).collect(); + subspecs.push( + ssb.clone() + .filters(vec![Filter::new().authors(pks).kinds([0]).build()]) + .build(), + ); + } + for chunk in noteids.chunks(MAX_CHUNK_IDS) { + let nids: Vec<&[u8; 32]> = chunk.iter().map(|nid| nid.bytes()).collect(); + subspecs.push( + ssb.clone() + .filters(vec![Filter::new().ids(nids).build()]) + .build(), + ); + } + } + subspecs } /// We've updated some unknown ids, update the last_updated time to now @@ -350,31 +412,3 @@ pub fn get_unknown_note_ids<'a>( Ok(()) } - -fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option> { - if ids.is_empty() { - return None; - } - - let ids = &ids[0..500.min(ids.len())]; - let mut filters: Vec = vec![]; - - let pks: Vec<&[u8; 32]> = ids - .iter() - .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes())) - .collect(); - if !pks.is_empty() { - let pk_filter = Filter::new().authors(pks).kinds([0]).build(); - filters.push(pk_filter); - } - - let note_ids: Vec<&[u8; 32]> = ids - .iter() - .flat_map(|id| id.is_id().map(|id| id.bytes())) - .collect(); - if !note_ids.is_empty() { - filters.push(Filter::new().ids(note_ids).build()); - } - - Some(filters) -} diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index cdc83bbd..de4b95de 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -15,7 +15,7 @@ use crate::{ use notedeck::{ subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState, - ImageCache, NoteCache, UnknownIds, + ImageCache, NoteCache, SubError, SubMan, UnknownIds, }; use enostr::{ClientMessage, Keypair, Pubkey, RelayPool}; @@ -193,20 +193,39 @@ fn try_process_event<'a>( } if app_ctx.unknown_ids.ready_to_send() { - unknown_id_send(app_ctx.unknown_ids, app_ctx.subman.pool()); + unknown_id_send(app_ctx.unknown_ids, app_ctx.subman); } } -fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut RelayPool) { - debug!("unknown_id_send called on: {:?}", &unknown_ids); - let filter = 
unknown_ids.filter().expect("filter"); - info!( - "Getting {} unknown ids from relays", - unknown_ids.ids_iter().len() - ); - let msg = ClientMessage::req("unknownids".to_string(), filter); +fn unknown_id_send(unknown_ids: &mut UnknownIds, subman: &mut SubMan) { + info!("Getting {} unknown ids from relays", &unknown_ids.numids()); + for subspec in unknown_ids.generate_resolution_requests() { + debug!("unknown_ids subscribe: {:?}", subspec); + match subman.subscribe(subspec) { + Err(err) => error!("unknown_id_send subscribe failed: {:?}", err), + Ok(mut rcvr) => { + tokio::spawn(async move { + loop { + match rcvr.next().await { + Err(SubError::StreamEnded) => { + debug!("unknown_id_send: {} complete", rcvr.idstr()); + break; + } + Err(err) => { + error!("unknown_id_send: {}: error: {:?}", rcvr.idstr(), err); + break; + } + Ok(note_keys) => { + debug!("{}: received note keys: {:?}", rcvr.idstr(), note_keys); + // only need the prefetch into ndb, all done + } + } + } + }); + } + } + } unknown_ids.clear(); - pool.send(&msg); } fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Context) { From feea9eed38f2e4f4b0002293071a90770bbb210f Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Wed, 19 Feb 2025 15:25:26 -0800 Subject: [PATCH 04/11] convert account muted support to subman Consider changing the actual update of the Muted struct to polled instead of synchronized. --- crates/notedeck/Cargo.toml | 2 +- crates/notedeck/src/accounts.rs | 135 +++++++++++++++++--------------- crates/notedeck/src/app.rs | 2 +- crates/notedeck/src/muted.rs | 2 +- 4 files changed, 73 insertions(+), 68 deletions(-) diff --git a/crates/notedeck/Cargo.toml b/crates/notedeck/Cargo.toml index 48a0cc3a..869e351c 100644 --- a/crates/notedeck/Cargo.toml +++ b/crates/notedeck/Cargo.toml @@ -26,10 +26,10 @@ thiserror = { workspace = true } puffin = { workspace = true, optional = true } puffin_egui = { workspace = true, optional = true } sha2 = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] } [dev-dependencies] tempfile = { workspace = true } -tokio = { workspace = true } nostr = { workspace = true } [target.'cfg(target_os = "macos")'.dependencies] diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index dae29a80..02a065e3 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -1,8 +1,9 @@ use tracing::{debug, error, info}; +use crate::subman::{RmtId, SubSpecBuilder}; use crate::{ - KeyStorageResponse, KeyStorageType, MuteFun, Muted, RelaySpec, SingleUnkIdAction, UnknownIds, - UserAccount, + KeyStorageResponse, KeyStorageType, MuteFun, Muted, RelaySpec, SingleUnkIdAction, SubError, + SubMan, UnknownIds, UserAccount, }; use enostr::{ClientMessage, FilledKeypair, Keypair, RelayPool}; use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Subscription, Transaction}; @@ -12,7 +13,7 @@ use url::Url; use uuid::Uuid; // TODO: remove this -use std::sync::Arc; +use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] pub struct SwitchAccountAction { @@ -187,9 +188,8 @@ impl AccountRelayData { pub struct AccountMutedData { filter: Filter, - subid: Option, - sub: Option, - muted: Arc, + subrmtid: Option, + muted: Arc>, } impl AccountMutedData { @@ -215,46 +215,63 @@ impl AccountMutedData { AccountMutedData { filter, - subid: None, - sub: None, - muted: Arc::new(muted), + subrmtid: None, + muted: Arc::new(Mutex::new(muted)), } } // make this account the current selected account - pub fn activate(&mut self, ndb: 
&Ndb, pool: &mut RelayPool) { - debug!("activating muted sub {}", self.filter.json().unwrap()); - assert_eq!(self.subid, None, "subid already exists"); - assert_eq!(self.sub, None, "sub already exists"); - - // local subscription - let sub = ndb - .subscribe(&[self.filter.clone()]) - .expect("ndb muted subscription"); - - // remote subscription - let subid = Uuid::new_v4().to_string(); - pool.subscribe(subid.clone(), vec![self.filter.clone()]); - - self.sub = Some(sub); - self.subid = Some(subid); + pub fn activate(&mut self, subman: &mut SubMan) { + assert!(self.subrmtid.is_none(), "subscription already exists"); + let ndb = subman.ndb(); + let subspec = SubSpecBuilder::new() + .filters(vec![self.filter.clone()]) + .build(); + if let Ok(mut rcvr) = subman.subscribe(subspec) { + let idstr = rcvr.idstr(); + self.subrmtid = rcvr.rmtid(); + debug!( + "activating account muted sub {}: {}", + idstr, + self.filter.json().unwrap() + ); + let mutedref = self.muted.clone(); + tokio::spawn(async move { + loop { + match rcvr.next().await { + Err(SubError::StreamEnded) => { + debug!("account muted: sub {} complete", idstr); + break; + } + Err(err) => { + error!("account muted: sub {}: error: {:?}", idstr, err); + break; + } + Ok(nks) => { + debug!("account muted: sub {}: note keys: {:?}", idstr, nks); + let txn = Transaction::new(&ndb).expect("txn"); + let muted = AccountMutedData::harvest_nip51_muted(&ndb, &txn, &nks); + debug!("updated muted {:?}", muted); + *mutedref.lock().unwrap() = muted; + } + } + } + }); + } } // this account is no longer the selected account - pub fn deactivate(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { - debug!("deactivating muted sub {}", self.filter.json().unwrap()); - assert_ne!(self.subid, None, "subid doesn't exist"); - assert_ne!(self.sub, None, "sub doesn't exist"); - - // remote subscription - pool.unsubscribe(self.subid.as_ref().unwrap().clone()); - - // local subscription - ndb.unsubscribe(self.sub.unwrap()) - .expect("ndb muted unsubscribe"); + pub fn deactivate(&mut self, subman: &mut SubMan) { + assert!(self.subrmtid.is_some(), "subscription doesn't exist"); + let rmtid = self.subrmtid.as_ref().unwrap(); + debug!( + "deactivating account muted sub {}: {}", + rmtid, + self.filter.json().unwrap() + ); - self.sub = None; - self.subid = None; + subman.unsubscribe_rmtid(rmtid).ok(); + self.subrmtid = None; } fn harvest_nip51_muted(ndb: &Ndb, txn: &Transaction, nks: &[NoteKey]) -> Muted { @@ -507,7 +524,7 @@ impl Accounts { if let Some(account) = self.accounts.get(index) { let pubkey = account.pubkey.bytes(); if let Some(account_data) = self.account_data.get(pubkey) { - let muted = Arc::clone(&account_data.muted.muted); + let muted = account_data.muted.muted.lock().unwrap().clone(); return Box::new(move |note: &Note, thread: &[u8; 32]| { muted.is_muted(note, thread) }); @@ -526,13 +543,6 @@ impl Accounts { relay_url, ); } - // send the active account's muted subscription - if let Some(muted_subid) = &data.muted.subid { - pool.send_to( - &ClientMessage::req(muted_subid.clone(), vec![data.muted.filter.clone()]), - relay_url, - ); - } } } @@ -588,16 +598,6 @@ impl Accounts { changed = true; } } - if let Some(sub) = data.muted.sub { - let nks = ndb.poll_for_notes(sub, 1); - if !nks.is_empty() { - let txn = Transaction::new(ndb).expect("txn"); - let muted = AccountMutedData::harvest_nip51_muted(ndb, &txn, &nks); - debug!("pubkey {}: updated muted {:?}", hex::encode(pubkey), muted); - data.muted.muted = Arc::new(muted); - changed = true; - } - } } changed } @@ -657,7 
+657,7 @@ impl Accounts { debug!("current relays: {:?}", pool.urls()); } - pub fn update(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, ctx: &egui::Context) { + pub fn update(&mut self, subman: &mut SubMan, ctx: &egui::Context) { // IMPORTANT - This function is called in the UI update loop, // make sure it is fast when idle @@ -676,20 +676,25 @@ impl Accounts { if let Some(data) = self.account_data.get_mut(account.pubkey.bytes()) { if data.relay.sub.is_some() { // this account has relay subs, deactivate them - data.relay.deactivate(ndb, pool); + let mut ndb = subman.ndb().clone(); + let pool = subman.pool(); + data.relay.deactivate(&mut ndb, pool); } - if data.muted.sub.is_some() { + if data.muted.subrmtid.is_some() { // this account has muted subs, deactivate them - data.muted.deactivate(ndb, pool); + data.muted.deactivate(subman); } } } } + let ndb = subman.ndb().clone(); + let pool = subman.pool(); + // Were any accounts added or removed? let (added, removed) = self.delta_accounts(); for pk in added { - self.handle_added_account(ndb, &pk); + self.handle_added_account(&ndb, &pk); need_reconfig = true; } for pk in removed { @@ -698,7 +703,7 @@ impl Accounts { } // Did any accounts receive updates (ie NIP-65 relay lists) - need_reconfig = self.poll_for_updates(ndb) || need_reconfig; + need_reconfig = self.poll_for_updates(&ndb) || need_reconfig; // If needed, update the relay configuration if need_reconfig { @@ -710,11 +715,11 @@ impl Accounts { if let Some(data) = self.get_selected_account_data() { if data.relay.sub.is_none() { // the currently selected account doesn't have relay subs, activate them - data.relay.activate(ndb, pool); + data.relay.activate(&ndb, pool); } - if data.muted.sub.is_none() { + if data.muted.subrmtid.is_none() { // the currently selected account doesn't have muted subs, activate them - data.muted.activate(ndb, pool); + data.muted.activate(subman); } } } diff --git a/crates/notedeck/src/app.rs b/crates/notedeck/src/app.rs index aaf4d4ad..50e3e3b3 100644 --- a/crates/notedeck/src/app.rs +++ b/crates/notedeck/src/app.rs @@ -71,7 +71,7 @@ impl eframe::App for Notedeck { puffin::GlobalProfiler::lock().new_frame(); // handle account updates - self.accounts.update(&mut self.ndb, self.subman.pool(), ctx); + self.accounts.update(&mut self.subman, ctx); main_panel(&ctx.style(), crate::ui::is_narrow(ctx)).show(ctx, |ui| { // render app diff --git a/crates/notedeck/src/muted.rs b/crates/notedeck/src/muted.rs index 3d038033..dd463853 100644 --- a/crates/notedeck/src/muted.rs +++ b/crates/notedeck/src/muted.rs @@ -6,7 +6,7 @@ use std::collections::BTreeSet; // If the note is muted return a reason string, otherwise None pub type MuteFun = dyn Fn(&Note, &[u8; 32]) -> bool; -#[derive(Default)] +#[derive(Clone, Default)] pub struct Muted { // TODO - implement private mutes pub pubkeys: BTreeSet<[u8; 32]>, From cea6b349671a769f9665f8f167dafaff269b02c5 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Fri, 21 Feb 2025 08:45:35 -0800 Subject: [PATCH 05/11] consistently log all subscription opens and closes --- crates/enostr/src/relay/pool.rs | 20 +++++++++++++++++++- crates/notedeck/src/accounts.rs | 22 +++++++++++++++------- crates/notedeck/src/subman.rs | 2 +- crates/notedeck_columns/src/app.rs | 2 ++ 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/crates/enostr/src/relay/pool.rs b/crates/enostr/src/relay/pool.rs index 84b34736..1d9be217 100644 --- a/crates/enostr/src/relay/pool.rs +++ b/crates/enostr/src/relay/pool.rs @@ -96,6 +96,15 @@ impl PoolRelay { } pub fn 
subscribe(&mut self, subid: String, filter: Vec) -> Result<()> { + debug!( + "PoolRelay subscribe: sending sub {} {}: {:?}", + subid, + self.url(), + filter + .iter() + .map(|f| f.json().unwrap_or_default()) + .collect::>(), + ); self.send(&ClientMessage::req(subid, filter)) } @@ -197,6 +206,7 @@ impl RelayPool { if let Some(debug) = &mut self.debug { debug.send_cmd(relay.url().to_owned(), &cmd); } + debug!("RelayPool unsubscribe close {} {}", subid, relay.url()); if let Err(err) = relay.send(&cmd) { error!( "error unsubscribing from {} on {}: {err}", @@ -215,7 +225,15 @@ impl RelayPool { &ClientMessage::req(subid.clone(), filter.clone()), ); } - + debug!( + "RelayPool subscribe: sending sub {} {}: {:?}", + subid, + relay.url(), + filter + .iter() + .map(|f| f.json().unwrap_or_default()) + .collect::>(), + ); if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) { error!("error subscribing to {}: {err}", relay.url()); } diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index 02a065e3..328e8d6a 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -227,14 +227,14 @@ impl AccountMutedData { let subspec = SubSpecBuilder::new() .filters(vec![self.filter.clone()]) .build(); + debug!( + "activating account muted sub {}: {}", + subspec.rmtid, + self.filter.json().unwrap() + ); if let Ok(mut rcvr) = subman.subscribe(subspec) { let idstr = rcvr.idstr(); self.subrmtid = rcvr.rmtid(); - debug!( - "activating account muted sub {}: {}", - idstr, - self.filter.json().unwrap() - ); let mutedref = self.muted.clone(); tokio::spawn(async move { loop { @@ -538,10 +538,18 @@ impl Accounts { for data in self.account_data.values() { // send the active account's relay list subscription if let Some(relay_subid) = &data.relay.subid { - pool.send_to( - &ClientMessage::req(relay_subid.clone(), vec![data.relay.filter.clone()]), + let filters = vec![data.relay.filter.clone()]; + debug!( + "Account send_initial_filters: sending sub {} {}: {:?}", + relay_subid, relay_url, + filters + .iter() + .map(|f| f.json().unwrap_or_default()) + .collect::>(), ); + + pool.send_to(&ClientMessage::req(relay_subid.clone(), filters), relay_url); } } } diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs index abc10858..7727fc43 100644 --- a/crates/notedeck/src/subman.rs +++ b/crates/notedeck/src/subman.rs @@ -472,9 +472,9 @@ impl SubMan { } fn close_relay_sub(pool: &mut RelayPool, sid: &str, url: &str) { - debug!("closing relay sub {} {}", sid, url); if let Some(relay) = pool.relays.iter_mut().find(|r| r.url() == url) { let cmd = ClientMessage::close(sid.to_string()); + debug!("SubMan close_relay_sub close {} {}", sid, url); if let Err(err) = relay.send(&cmd) { error!("trouble closing relay sub: {} {}: {:?}", sid, url, err); } diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index de4b95de..686049f7 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -262,6 +262,7 @@ fn do_handle_eose( unknown_ids: &mut UnknownIds, note_cache: &mut NoteCache, ) -> Result<()> { + debug!("app do_handle_eose {} {}", subid, relay_url); let sub_kind = if let Some(sub_kind) = subscriptions.subs.get(subid) { sub_kind } else { @@ -285,6 +286,7 @@ fn do_handle_eose( // oneshot subs just close when they're done SubKind::OneShot => { let msg = ClientMessage::close(subid.to_string()); + debug!("app do_handle_eose close {} {}", subid, relay_url); pool.send_to(&msg, relay_url); } 
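For context between patches: the hunks above move the account mute subscription onto the SubMan subscribe/receive API. Below is a minimal sketch, not taken from this series, of how a caller might drive that API as it stands at this point (a later patch in the series adds an explicit default-relay argument to subscribe). It is modeled on the subman.rs doc example and the accounts.rs hunks; the SubSpecBuilder import path, the database path, and the tokio runtime setup are assumptions for illustration.

use enostr::{Filter, RelayPool};
use nostrdb::{Config, Ndb};
use notedeck::{SubError, SubMan};
use notedeck::subman::SubSpecBuilder; // assumed path for the builder used in the hunks above

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open nostrdb and pair it with a relay pool inside a SubMan.
    let ndb = Ndb::new("path/to/db", &Config::new())?;
    let mut subman = SubMan::new(ndb.clone(), RelayPool::new());

    // Subscribe to kind-1 notes, locally and on the pool's relays.
    let spec = SubSpecBuilder::new()
        .filters(vec![Filter::new().kinds(vec![1]).build()])
        .build();
    let mut rcvr = subman.subscribe(spec)?;

    // Same receive-loop shape as AccountMutedData::activate above.
    loop {
        match rcvr.next().await {
            Ok(note_keys) => println!("received {} note keys", note_keys.len()),
            Err(SubError::StreamEnded) => break, // subscription finished normally
            Err(err) => {
                eprintln!("subscription error: {err:?}");
                break;
            }
        }
    }
    Ok(())
}

The account code above runs this same loop inside tokio::spawn and writes the harvested result into shared account state instead of printing it.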
From 2ece2f7d7224e398fd62dbc1d9ef79a3b68c08e9 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sat, 22 Feb 2025 11:27:03 -0800 Subject: [PATCH 06/11] add get_selected_account_data_mut actually the old one was mut, now mut and !mut versions available --- crates/notedeck/src/accounts.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index 328e8d6a..523d25e1 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -499,7 +499,15 @@ impl Accounts { } } - pub fn get_selected_account_data(&mut self) -> Option<&mut AccountData> { + pub fn get_selected_account_data(&self) -> Option<&AccountData> { + let account_pubkey = { + let account = self.get_selected_account()?; + *account.pubkey.bytes() + }; + self.account_data.get(&account_pubkey) + } + + pub fn get_selected_account_data_mut(&mut self) -> Option<&mut AccountData> { let account_pubkey = { let account = self.get_selected_account()?; *account.pubkey.bytes() @@ -720,7 +728,7 @@ impl Accounts { } // Do we need to activate account subs? - if let Some(data) = self.get_selected_account_data() { + if let Some(data) = self.get_selected_account_data_mut() { if data.relay.sub.is_none() { // the currently selected account doesn't have relay subs, activate them data.relay.activate(&ndb, pool); From b4bba7aaa8dd1e68a2693f50f8f9d133b363cd03 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sat, 22 Feb 2025 11:29:05 -0800 Subject: [PATCH 07/11] add get_selected_account_{readable,writable}_relays --- crates/notedeck/src/accounts.rs | 42 +++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index 523d25e1..c9ee27a6 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -673,6 +673,48 @@ impl Accounts { debug!("current relays: {:?}", pool.urls()); } + fn get_fallback_relays(&self, check: impl Fn(&RelaySpec) -> bool) -> Vec { + self.bootstrap_relays + .iter() + .filter(|relay| check(relay)) + .cloned() + .collect() + } + + pub fn get_selected_account_readable_relays(&self) -> Vec { + if let Some(data) = self.get_selected_account_data() { + let readable_relays: Vec = data + .relay + .advertised + .iter() + .filter(|relay| relay.is_readable()) + .cloned() + .collect(); + + if !readable_relays.is_empty() { + return readable_relays; + } + } + self.get_fallback_relays(|relay| relay.is_readable()) + } + + pub fn get_selected_account_writable_relays(&self) -> Vec { + if let Some(data) = self.get_selected_account_data() { + let writable_relays: Vec = data + .relay + .advertised + .iter() + .filter(|relay| relay.is_writable()) + .cloned() + .collect(); + + if !writable_relays.is_empty() { + return writable_relays; + } + } + self.get_fallback_relays(|relay| relay.is_writable()) + } + pub fn update(&mut self, subman: &mut SubMan, ctx: &egui::Context) { // IMPORTANT - This function is called in the UI update loop, // make sure it is fast when idle From b8964e73da34ea1f071659065a002d6c03980fb8 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sat, 22 Feb 2025 18:29:05 -0800 Subject: [PATCH 08/11] subscribe to the correct relays Next: close relays when no subscriptions need them --- crates/enostr/src/relay/pool.rs | 39 ++++++++++++++++++ crates/notedeck/src/accounts.rs | 66 +++++++++++++++--------------- crates/notedeck/src/subman.rs | 56 +++++++++++++++++++------ crates/notedeck_columns/src/app.rs | 16 ++++++-- 4 files 
changed, 126 insertions(+), 51 deletions(-) diff --git a/crates/enostr/src/relay/pool.rs b/crates/enostr/src/relay/pool.rs index 1d9be217..5a7c2e3a 100644 --- a/crates/enostr/src/relay/pool.rs +++ b/crates/enostr/src/relay/pool.rs @@ -240,6 +240,45 @@ impl RelayPool { } } + // if the relay is in the pool already send the subscription, otherwise add the + // relay to the pool and we'll send it on open. + pub fn subscribe_relay( + &mut self, + subid: String, + filter: Vec, + relaystr: String, + ) -> bool { + if let Some(&mut ref mut relay) = self.relays.iter_mut().find(|r| r.url() == relaystr) { + if let Some(debug) = &mut self.debug { + debug.send_cmd( + relay.url().to_owned(), + &ClientMessage::req(subid.clone(), filter.clone()), + ); + } + debug!( + "RelayPool subscribe_relay: sending sub {} {}: {:?}", + subid, + relay.url(), + filter + .iter() + .map(|f| f.json().unwrap_or_default()) + .collect::>(), + ); + if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) { + error!("error subscribing to {}: {err}", relay.url()); + } + true + } else { + let wakeup = move || { + // ignore + }; + if let Err(err) = self.add_url(relaystr.clone(), wakeup) { + error!("trouble adding url {}: {:?}", relaystr, err); + } + false + } + } + /// Keep relay connectiongs alive by pinging relays that haven't been /// pinged in awhile. Adjust ping rate with [`ping_rate`]. pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index c9ee27a6..879b4e0c 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -221,7 +221,7 @@ impl AccountMutedData { } // make this account the current selected account - pub fn activate(&mut self, subman: &mut SubMan) { + pub fn activate(&mut self, subman: &mut SubMan, default_relays: &[RelaySpec]) { assert!(self.subrmtid.is_none(), "subscription already exists"); let ndb = subman.ndb(); let subspec = SubSpecBuilder::new() @@ -232,7 +232,7 @@ impl AccountMutedData { subspec.rmtid, self.filter.json().unwrap() ); - if let Ok(mut rcvr) = subman.subscribe(subspec) { + if let Ok(mut rcvr) = subman.subscribe(subspec, default_relays) { let idstr = rcvr.idstr(); self.subrmtid = rcvr.rmtid(); let mutedref = self.muted.clone(); @@ -673,46 +673,43 @@ impl Accounts { debug!("current relays: {:?}", pool.urls()); } - fn get_fallback_relays(&self, check: impl Fn(&RelaySpec) -> bool) -> Vec { - self.bootstrap_relays - .iter() - .filter(|relay| check(relay)) - .cloned() - .collect() - } - - pub fn get_selected_account_readable_relays(&self) -> Vec { - if let Some(data) = self.get_selected_account_data() { - let readable_relays: Vec = data - .relay + fn get_combined_relays( + &self, + data_option: Option<&AccountData>, + filter: impl Fn(&RelaySpec) -> bool, + ) -> Vec { + let mut relays = if let Some(data) = data_option { + data.relay .advertised .iter() - .filter(|relay| relay.is_readable()) + .filter(|&x| filter(x)) .cloned() - .collect(); + .collect() + } else { + Vec::new() + }; - if !readable_relays.is_empty() { - return readable_relays; - } + if relays.is_empty() { + relays.extend(self.bootstrap_relays.iter().filter(|&x| filter(x)).cloned()); } - self.get_fallback_relays(|relay| relay.is_readable()) + + relays + } + + pub fn get_selected_account_readable_relays(&self) -> Vec { + self.get_combined_relays(self.get_selected_account_data(), |relay| { + relay.is_readable() + }) } pub fn get_selected_account_writable_relays(&self) -> Vec { - 
if let Some(data) = self.get_selected_account_data() { - let writable_relays: Vec = data - .relay - .advertised - .iter() - .filter(|relay| relay.is_writable()) - .cloned() - .collect(); + self.get_combined_relays(self.get_selected_account_data(), |relay| { + relay.is_writable() + }) + } - if !writable_relays.is_empty() { - return writable_relays; - } - } - self.get_fallback_relays(|relay| relay.is_writable()) + pub fn get_all_selected_account_relays(&self) -> Vec { + self.get_combined_relays(self.get_selected_account_data(), |_| true) } pub fn update(&mut self, subman: &mut SubMan, ctx: &egui::Context) { @@ -770,6 +767,7 @@ impl Accounts { } // Do we need to activate account subs? + let default_relays = self.get_all_selected_account_relays(); if let Some(data) = self.get_selected_account_data_mut() { if data.relay.sub.is_none() { // the currently selected account doesn't have relay subs, activate them @@ -777,7 +775,7 @@ impl Accounts { } if data.muted.subrmtid.is_none() { // the currently selected account doesn't have muted subs, activate them - data.muted.activate(subman); + data.muted.activate(subman, &default_relays); } } } diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs index 7727fc43..bcefff59 100644 --- a/crates/notedeck/src/subman.rs +++ b/crates/notedeck/src/subman.rs @@ -9,6 +9,8 @@ use uuid::Uuid; use enostr::{ClientMessage, Filter, PoolRelay, RelayEvent, RelayMessage, RelayPool}; use nostrdb::{self, Ndb, Subscription, SubscriptionStream}; +use crate::RelaySpec; + /// The Subscription Manager /// /// ```no_run @@ -330,8 +332,12 @@ impl SubMan { &mut self.pool } - pub fn subscribe(&mut self, spec: SubSpec) -> SubResult { - let (substate, subrcvr) = self.make_subscription(&spec)?; + pub fn subscribe( + &mut self, + spec: SubSpec, + default_relays: &[RelaySpec], + ) -> SubResult { + let (substate, subrcvr) = self.make_subscription(&spec, default_relays)?; let state = Rc::new(RefCell::new(substate)); if let Some(lclid) = subrcvr.lclid() { self.lcl.insert(lclid, Rc::clone(&state)); @@ -415,7 +421,11 @@ impl SubMan { } } - fn make_subscription(&mut self, spec: &SubSpec) -> SubResult<(SubState, SubReceiver)> { + fn make_subscription( + &mut self, + spec: &SubSpec, + default_relays: &[RelaySpec], + ) -> SubResult<(SubState, SubReceiver)> { // Setup local ndb subscription state let (maybe_lclstate, lclsub) = if spec.is_onlyremote { (None, None) @@ -438,23 +448,43 @@ impl SubMan { (None, None) } else { let (tx_ended, rx_ended) = mpsc::channel(1); - // FIXME - need to choose specific relays! 
- let relays: BTreeMap = self - .pool - .relays + + // Determine which relays to use + let relays = if !spec.allowed_relays.is_empty() { + spec.allowed_relays.clone() + } else { + default_relays + .iter() + .filter(|rs| rs.is_readable()) + .map(|rs| rs.url.clone()) + .collect() + }; + + // create the state map, special case multicast and blocked + let states: BTreeMap = relays .iter() - .map(|s| match s.url() { - "multicast" => (s.url().to_string(), RelaySubState::Current), - _ => (s.url().to_string(), RelaySubState::Syncing), + .map(|relay| { + let rss = if spec.blocked_relays.contains(relay) { + RelaySubState::Error("blocked".into()) + } else if self.pool.subscribe_relay( + spec.rmtid.clone(), + spec.filters.clone(), + relay.clone(), + ) { + RelaySubState::Syncing + } else { + RelaySubState::Pending + }; + + (relay.clone(), rss) }) .collect(); + let rmtid = spec.rmtid.clone(); - self.pool - .subscribe(spec.rmtid.clone(), spec.filters.clone()); ( Some(RmtSubState { rmtid: rmtid.clone(), - relays, + relays: states, tx_ended, }), Some(RmtSub { rmtid, rx_ended }), diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index 686049f7..1e7830a9 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -15,7 +15,7 @@ use crate::{ use notedeck::{ subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState, - ImageCache, NoteCache, SubError, SubMan, UnknownIds, + ImageCache, NoteCache, RelaySpec, SubError, SubMan, UnknownIds, }; use enostr::{ClientMessage, Keypair, Pubkey, RelayPool}; @@ -193,15 +193,23 @@ fn try_process_event<'a>( } if app_ctx.unknown_ids.ready_to_send() { - unknown_id_send(app_ctx.unknown_ids, app_ctx.subman); + unknown_id_send( + app_ctx.unknown_ids, + app_ctx.subman, + &app_ctx.accounts.get_all_selected_account_relays(), + ); } } -fn unknown_id_send(unknown_ids: &mut UnknownIds, subman: &mut SubMan) { +fn unknown_id_send( + unknown_ids: &mut UnknownIds, + subman: &mut SubMan, + default_relays: &[RelaySpec], +) { info!("Getting {} unknown ids from relays", &unknown_ids.numids()); for subspec in unknown_ids.generate_resolution_requests() { debug!("unknown_ids subscribe: {:?}", subspec); - match subman.subscribe(subspec) { + match subman.subscribe(subspec, default_relays) { Err(err) => error!("unknown_id_send subscribe failed: {:?}", err), Ok(mut rcvr) => { tokio::spawn(async move { From e399e1ec393c070efdfe06b8e71e023c8dee3e31 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sun, 23 Feb 2025 11:39:08 -0800 Subject: [PATCH 09/11] close unneeded relays --- crates/notedeck/src/subman.rs | 50 ++++++++++++++++++++++++++++-- crates/notedeck_columns/src/app.rs | 6 +++- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs index bcefff59..f2f6fd1c 100644 --- a/crates/notedeck/src/subman.rs +++ b/crates/notedeck/src/subman.rs @@ -1,5 +1,5 @@ use futures::{channel::mpsc, FutureExt, StreamExt}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::{cell::RefCell, cmp::Ordering, rc::Rc}; use thiserror::Error; @@ -24,6 +24,7 @@ use crate::RelaySpec; /// async fn main() -> Result<(), Box> { /// let mut ndb = Ndb::new("the/db/path/", &Config::new())?; /// let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); +/// let default_relays = vec![]; /// /// // Define a filter and build the subscription specification /// let filter = Filter::new().kinds(vec![1, 2, 3]).build(); @@ -33,7 
+34,7 @@ use crate::RelaySpec; /// .build(); /// /// // Subscribe and obtain a SubReceiver -/// let mut receiver = subman.subscribe(spec)?; +/// let mut receiver = subman.subscribe(spec, &default_relays)?; /// /// // Process incoming note keys /// loop { @@ -514,6 +515,7 @@ impl SubMan { pub fn process_relays( &mut self, legacy_relay_handler: &mut H, + default_relays: &[RelaySpec], ) -> SubResult<()> { let wakeup = move || { // ignore @@ -580,6 +582,9 @@ impl SubMan { } } } + + self.close_unneeded_relays(default_relays); + Ok(()) } @@ -646,6 +651,41 @@ impl SubMan { } } } + + fn close_unneeded_relays(&mut self, default_relays: &[RelaySpec]) { + let current_relays: BTreeSet = self.pool.urls(); + let needed_relays: BTreeSet = self.needed_relays(default_relays); + let unneeded_relays: BTreeSet<_> = + current_relays.difference(&needed_relays).cloned().collect(); + if !unneeded_relays.is_empty() { + debug!("closing unneeded relays: {:?}", unneeded_relays); + self.pool.remove_urls(&unneeded_relays); + } + } + + fn needed_relays(&self, default_relays: &[RelaySpec]) -> BTreeSet { + let mut needed: BTreeSet = default_relays.iter().map(|rs| rs.url.clone()).collect(); + // for every remote subscription + for ssr in self.rmt.values() { + // that has remote substate (all will) + if let Some(ref rmtsubstate) = ssr.borrow().rmt { + // for each subscription remote relay + for (relay, state) in &rmtsubstate.relays { + // include any that are in-play + match state { + RelaySubState::Error(_) | RelaySubState::Closed => { + // these are terminal and we don't need this relay + } + _ => { + // relays in all other states are needed + _ = needed.insert(relay.clone()); + } + } + } + } + } + needed + } } pub trait LegacyRelayHandler { @@ -755,6 +795,7 @@ mod tests { // setup an ndb and subman to test let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + let default_relays = vec![]; // subscribe to some stuff let mut receiver = subman.subscribe( @@ -762,6 +803,7 @@ mod tests { .filters(vec![Filter::new().kinds(vec![1]).build()]) .constraint(SubConstraint::OnlyLocal) .build(), + &default_relays, )?; let lclid = receiver.lclid().unwrap(); @@ -796,6 +838,7 @@ mod tests { // setup an ndb and subman to test let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + let default_relays = vec![]; // subscribe to some stuff let mut receiver = subman.subscribe( @@ -803,6 +846,7 @@ mod tests { .filters(vec![Filter::new().kinds(vec![1]).build()]) .constraint(SubConstraint::OnlyLocal) .build(), + &default_relays, )?; let lclid = receiver.lclid().unwrap(); @@ -842,6 +886,7 @@ mod tests { // setup an ndb and subman to test let (_mndb, ndb) = ManagedNdb::setup(&testdbs_path_async!()); let mut subman = SubMan::new(ndb.clone(), RelayPool::new()); + let default_relays = vec![]; // subscribe to some stuff let mut receiver = subman.subscribe( @@ -849,6 +894,7 @@ mod tests { .filters(vec![Filter::new().kinds(vec![1]).build()]) .constraint(SubConstraint::OnlyLocal) .build(), + &default_relays, )?; let lclid = receiver.lclid().unwrap(); diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index 1e7830a9..d8ebc22e 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -155,6 +155,7 @@ fn try_process_event<'a>( ctx.input(|i| handle_key_events(i, current_columns)); { + let default_relays = app_ctx.accounts.get_all_selected_account_relays(); 
let mut relay_handler = RelayHandler::new( app_ctx.unknown_ids, app_ctx.note_cache, @@ -163,7 +164,10 @@ fn try_process_event<'a>( &mut damus.timeline_cache, damus.since_optimize, ); - app_ctx.subman.process_relays(&mut relay_handler).ok(); + app_ctx + .subman + .process_relays(&mut relay_handler, &default_relays) + .ok(); } for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() { From 5b720d53c570e690a14e49c96bb001324534ddde Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sun, 23 Feb 2025 14:08:31 -0800 Subject: [PATCH 10/11] convert account relay list support to subman --- crates/notedeck/src/accounts.rs | 155 ++++++++++++----------------- crates/notedeck_columns/src/app.rs | 5 - 2 files changed, 63 insertions(+), 97 deletions(-) diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index 879b4e0c..79c638fb 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -5,12 +5,11 @@ use crate::{ KeyStorageResponse, KeyStorageType, MuteFun, Muted, RelaySpec, SingleUnkIdAction, SubError, SubMan, UnknownIds, UserAccount, }; -use enostr::{ClientMessage, FilledKeypair, Keypair, RelayPool}; -use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Subscription, Transaction}; +use enostr::{FilledKeypair, Keypair, RelayPool}; +use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Transaction}; use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; use url::Url; -use uuid::Uuid; // TODO: remove this use std::sync::{Arc, Mutex}; @@ -38,10 +37,9 @@ pub enum AccountsAction { pub struct AccountRelayData { filter: Filter, - subid: Option, - sub: Option, - local: BTreeSet, // used locally but not advertised - advertised: BTreeSet, // advertised via NIP-65 + subrmtid: Option, + local: BTreeSet, // used locally but not advertised + advertised: Arc>>, // advertised via NIP-65 } #[derive(Default)] @@ -83,47 +81,64 @@ impl AccountRelayData { AccountRelayData { filter, - subid: None, - sub: None, + subrmtid: None, local: BTreeSet::new(), - advertised: relays.into_iter().collect(), + advertised: Arc::new(Mutex::new(relays.into_iter().collect())), } } // make this account the current selected account - pub fn activate(&mut self, ndb: &Ndb, pool: &mut RelayPool) { + pub fn activate(&mut self, subman: &mut SubMan, default_relays: &[RelaySpec]) { debug!("activating relay sub {}", self.filter.json().unwrap()); - assert_eq!(self.subid, None, "subid already exists"); - assert_eq!(self.sub, None, "sub already exists"); - - // local subscription - let sub = ndb - .subscribe(&[self.filter.clone()]) - .expect("ndb relay list subscription"); - - // remote subscription - let subid = Uuid::new_v4().to_string(); - pool.subscribe(subid.clone(), vec![self.filter.clone()]); - - self.sub = Some(sub); - self.subid = Some(subid); + assert!(self.subrmtid.is_none(), "subscription already exists"); + let ndb = subman.ndb(); + let subspec = SubSpecBuilder::new() + .filters(vec![self.filter.clone()]) + .build(); + debug!( + "activating account relay sub {}: {}", + subspec.rmtid, + self.filter.json().unwrap() + ); + if let Ok(mut rcvr) = subman.subscribe(subspec, default_relays) { + let idstr = rcvr.idstr(); + self.subrmtid = rcvr.rmtid(); + let advertisedref = self.advertised.clone(); + tokio::spawn(async move { + loop { + match rcvr.next().await { + Err(SubError::StreamEnded) => { + debug!("account relays: sub {} complete", idstr); + break; + } + Err(err) => { + error!("account relays: sub {}: error: {:?}", idstr, err); + break; + } + Ok(nks) => { + 
debug!("account relays: sub {}: note keys: {:?}", idstr, nks); + let txn = Transaction::new(&ndb).expect("txn"); + let relays = Self::harvest_nip65_relays(&ndb, &txn, &nks); + debug!("updated relays {:?}", relays); + *advertisedref.lock().unwrap() = relays.into_iter().collect(); + } + } + } + }); + } } // this account is no longer the selected account - pub fn deactivate(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { - debug!("deactivating relay sub {}", self.filter.json().unwrap()); - assert_ne!(self.subid, None, "subid doesn't exist"); - assert_ne!(self.sub, None, "sub doesn't exist"); - - // remote subscription - pool.unsubscribe(self.subid.as_ref().unwrap().clone()); - - // local subscription - ndb.unsubscribe(self.sub.unwrap()) - .expect("ndb relay list unsubscribe"); - - self.sub = None; - self.subid = None; + pub fn deactivate(&mut self, subman: &mut SubMan) { + assert!(self.subrmtid.is_some(), "subscription doesn't exist"); + let rmtid = self.subrmtid.as_ref().unwrap(); + debug!( + "deactivating account relays sub {}: {}", + rmtid, + self.filter.json().unwrap() + ); + subman.unsubscribe_rmtid(rmtid).ok(); + self.subrmtid = None; } // standardize the format (ie, trailing slashes) to avoid dups @@ -173,7 +188,7 @@ impl AccountRelayData { pub fn publish_nip65_relays(&self, seckey: &[u8; 32], pool: &mut RelayPool) { let mut builder = NoteBuilder::new().kind(10002).content(""); - for rs in &self.advertised { + for rs in self.advertised.lock().unwrap().iter() { builder = builder.start_tag().tag_str("r").tag_str(&rs.url); if rs.has_read_marker { builder = builder.tag_str("read"); @@ -542,26 +557,6 @@ impl Accounts { Box::new(|_: &Note, _: &[u8; 32]| false) } - pub fn send_initial_filters(&mut self, pool: &mut RelayPool, relay_url: &str) { - for data in self.account_data.values() { - // send the active account's relay list subscription - if let Some(relay_subid) = &data.relay.subid { - let filters = vec![data.relay.filter.clone()]; - debug!( - "Account send_initial_filters: sending sub {} {}: {:?}", - relay_subid, - relay_url, - filters - .iter() - .map(|f| f.json().unwrap_or_default()) - .collect::>(), - ); - - pool.send_to(&ClientMessage::req(relay_subid.clone(), filters), relay_url); - } - } - } - // Return accounts which have no account_data yet (added) and accounts // which have still data but are no longer in our account list (removed). 
fn delta_accounts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) { @@ -597,27 +592,6 @@ impl Accounts { self.account_data.remove(pubkey); } - fn poll_for_updates(&mut self, ndb: &Ndb) -> bool { - let mut changed = false; - for (pubkey, data) in &mut self.account_data { - if let Some(sub) = data.relay.sub { - let nks = ndb.poll_for_notes(sub, 1); - if !nks.is_empty() { - let txn = Transaction::new(ndb).expect("txn"); - let relays = AccountRelayData::harvest_nip65_relays(ndb, &txn, &nks); - debug!( - "pubkey {}: updated relays {:?}", - hex::encode(pubkey), - relays - ); - data.relay.advertised = relays.into_iter().collect(); - changed = true; - } - } - } - changed - } - fn update_relay_configuration( &mut self, pool: &mut RelayPool, @@ -636,7 +610,7 @@ impl Accounts { if desired_relays.is_empty() { if let Some(data) = self.get_selected_account_data() { desired_relays.extend(data.relay.local.iter().cloned()); - desired_relays.extend(data.relay.advertised.iter().cloned()); + desired_relays.extend(data.relay.advertised.lock().unwrap().iter().cloned()); } } @@ -681,6 +655,8 @@ impl Accounts { let mut relays = if let Some(data) = data_option { data.relay .advertised + .lock() + .unwrap() .iter() .filter(|&x| filter(x)) .cloned() @@ -729,11 +705,9 @@ impl Accounts { if Some(ndx) != self.currently_selected_account { // this account is not currently selected if let Some(data) = self.account_data.get_mut(account.pubkey.bytes()) { - if data.relay.sub.is_some() { + if data.relay.subrmtid.is_some() { // this account has relay subs, deactivate them - let mut ndb = subman.ndb().clone(); - let pool = subman.pool(); - data.relay.deactivate(&mut ndb, pool); + data.relay.deactivate(subman); } if data.muted.subrmtid.is_some() { // this account has muted subs, deactivate them @@ -757,9 +731,6 @@ impl Accounts { need_reconfig = true; } - // Did any accounts receive updates (ie NIP-65 relay lists) - need_reconfig = self.poll_for_updates(&ndb) || need_reconfig; - // If needed, update the relay configuration if need_reconfig { self.update_relay_configuration(pool, wakeup); @@ -769,9 +740,9 @@ impl Accounts { // Do we need to activate account subs? let default_relays = self.get_all_selected_account_relays(); if let Some(data) = self.get_selected_account_data_mut() { - if data.relay.sub.is_none() { + if data.relay.subrmtid.is_none() { // the currently selected account doesn't have relay subs, activate them - data.relay.activate(&ndb, pool); + data.relay.activate(subman, &default_relays); } if data.muted.subrmtid.is_none() { // the currently selected account doesn't have muted subs, activate them @@ -812,7 +783,7 @@ impl Accounts { match self.account_data.get_mut(&key_bytes) { None => error!("no account data found for the provided key."), Some(account_data) => { - let advertised = &mut account_data.relay.advertised; + let advertised = &mut account_data.relay.advertised.lock().unwrap(); if advertised.is_empty() { // If the selected account has no advertised relays, // initialize with the bootstrapping set. 
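Aside on the pattern used in this patch: both the harvested mute list (earlier in the series) and the NIP-65 relay list now live behind an Arc<Mutex<..>>; the spawned receiver task overwrites the value as notes arrive, and synchronous callers take the lock briefly and clone a snapshot. Below is a small self-contained sketch of that shared-state pattern, not code from this series; RelayList is a placeholder standing in for Muted and the advertised relay set, and tokio is assumed with its default runtime and timer features.

use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use std::time::Duration;

// Placeholder for Muted / the BTreeSet of RelaySpec in the real code.
type RelayList = BTreeSet<String>;

#[tokio::main]
async fn main() {
    let advertised: Arc<Mutex<RelayList>> = Arc::new(Mutex::new(BTreeSet::new()));

    // Receiver side: the task owns a clone of the Arc and overwrites the
    // shared value each time a new "harvest" arrives (here, fake relay lists).
    let writer = advertised.clone();
    let harvester = tokio::spawn(async move {
        let harvests = [
            vec!["wss://relay.example.a"],
            vec!["wss://relay.example.a", "wss://relay.example.b"],
        ];
        for relays in harvests {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let harvested: RelayList = relays.iter().map(|s| s.to_string()).collect();
            *writer.lock().unwrap() = harvested; // overwrite, as the patch does
        }
    });
    harvester.await.unwrap();

    // Reader side (e.g. the UI thread): hold the lock only long enough to
    // clone a snapshot, never across await points or UI work.
    let snapshot: RelayList = advertised.lock().unwrap().clone();
    println!("advertised relays: {snapshot:?}");
}

Cloning a snapshot keeps closures like the MuteFun and the relay-selection helpers free of lock guards and lifetimes, at the cost of copying a small set on each read.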
diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index d8ebc22e..2fa10227 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -87,7 +87,6 @@ struct RelayHandler<'a> { // From AppContext unknown_ids: &'a mut UnknownIds, note_cache: &'a mut NoteCache, - accounts: &'a mut Accounts, // From Damus subscriptions: &'a mut Subscriptions, @@ -100,14 +99,12 @@ impl<'a> RelayHandler<'a> { fn new( unknown_ids: &'a mut UnknownIds, note_cache: &'a mut NoteCache, - accounts: &'a mut Accounts, subscriptions: &'a mut Subscriptions, timeline_cache: &'a mut TimelineCache, since_optimize: bool, ) -> Self { RelayHandler { unknown_ids, - accounts, note_cache, subscriptions, timeline_cache, @@ -119,7 +116,6 @@ impl<'a> RelayHandler<'a> { impl LegacyRelayHandler for RelayHandler<'_> { /// Handle relay opened fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str) { - self.accounts.send_initial_filters(pool, relay); timeline::send_initial_timeline_filters( ndb, self.since_optimize, @@ -159,7 +155,6 @@ fn try_process_event<'a>( let mut relay_handler = RelayHandler::new( app_ctx.unknown_ids, app_ctx.note_cache, - app_ctx.accounts, &mut damus.subscriptions, &mut damus.timeline_cache, damus.since_optimize, From 2ecde856369bbbc06c076fc3bbc2470a8d195cdf Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sun, 23 Feb 2025 14:43:27 -0800 Subject: [PATCH 11/11] prune account code; don't manage pool, deprecate local and forced relays - local relays are very doable, but not sure if we want them - forced relays want to be implemented in the pool/subman and need more definition --- crates/notedeck/src/accounts.rs | 86 +++------------------------------ 1 file changed, 7 insertions(+), 79 deletions(-) diff --git a/crates/notedeck/src/accounts.rs b/crates/notedeck/src/accounts.rs index 79c638fb..6e7acc40 100644 --- a/crates/notedeck/src/accounts.rs +++ b/crates/notedeck/src/accounts.rs @@ -38,7 +38,7 @@ pub enum AccountsAction { pub struct AccountRelayData { filter: Filter, subrmtid: Option, - local: BTreeSet, // used locally but not advertised + _local: BTreeSet, // used locally but not advertised advertised: Arc>>, // advertised via NIP-65 } @@ -82,7 +82,7 @@ impl AccountRelayData { AccountRelayData { filter, subrmtid: None, - local: BTreeSet::new(), + _local: BTreeSet::new(), advertised: Arc::new(Mutex::new(relays.into_iter().collect())), } } @@ -343,13 +343,13 @@ pub struct Accounts { accounts: Vec, key_store: KeyStorageType, account_data: BTreeMap<[u8; 32], AccountData>, - forced_relays: BTreeSet, + _forced_relays: BTreeSet, bootstrap_relays: BTreeSet, needs_relay_config: bool, } impl Accounts { - pub fn new(key_store: KeyStorageType, forced_relays: Vec) -> Self { + pub fn new(key_store: KeyStorageType, _forced_relays: Vec) -> Self { let accounts = if let KeyStorageResponse::ReceivedResult(res) = key_store.get_keys() { res.unwrap_or_default() } else { @@ -358,7 +358,7 @@ impl Accounts { let currently_selected_account = get_selected_index(&accounts, &key_store); let account_data = BTreeMap::new(); - let forced_relays: BTreeSet = forced_relays + let _forced_relays: BTreeSet = _forced_relays .into_iter() .map(|u| RelaySpec::new(AccountRelayData::canonicalize_url(&u), false, false)) .collect(); @@ -379,7 +379,7 @@ impl Accounts { accounts, key_store, account_data, - forced_relays, + _forced_relays, bootstrap_relays, needs_relay_config: true, } @@ -592,61 +592,6 @@ impl Accounts { self.account_data.remove(pubkey); } - fn 
update_relay_configuration( - &mut self, - pool: &mut RelayPool, - wakeup: impl Fn() + Send + Sync + Clone + 'static, - ) { - debug!( - "updating relay configuration for currently selected {:?}", - self.currently_selected_account - .map(|i| hex::encode(self.accounts.get(i).unwrap().pubkey.bytes())) - ); - - // If forced relays are set use them only - let mut desired_relays = self.forced_relays.clone(); - - // Compose the desired relay lists from the selected account - if desired_relays.is_empty() { - if let Some(data) = self.get_selected_account_data() { - desired_relays.extend(data.relay.local.iter().cloned()); - desired_relays.extend(data.relay.advertised.lock().unwrap().iter().cloned()); - } - } - - // If no relays are specified at this point use the bootstrap list - if desired_relays.is_empty() { - desired_relays = self.bootstrap_relays.clone(); - } - - debug!("current relays: {:?}", pool.urls()); - debug!("desired relays: {:?}", desired_relays); - - let pool_specs = pool - .urls() - .iter() - .map(|url| RelaySpec::new(url.clone(), false, false)) - .collect(); - let add: BTreeSet = desired_relays.difference(&pool_specs).cloned().collect(); - let mut sub: BTreeSet = - pool_specs.difference(&desired_relays).cloned().collect(); - if !add.is_empty() { - debug!("configuring added relays: {:?}", add); - let _ = pool.add_urls(add.iter().map(|r| r.url.clone()).collect(), wakeup); - } - if !sub.is_empty() { - // certain relays are persistent like the multicast relay, - // although we should probably have a way to explicitly - // disable it - sub.remove(&RelaySpec::new("multicast", false, false)); - - debug!("removing unwanted relays: {:?}", sub); - pool.remove_urls(&sub.iter().map(|r| r.url.clone()).collect()); - } - - debug!("current relays: {:?}", pool.urls()); - } - fn get_combined_relays( &self, data_option: Option<&AccountData>, @@ -688,18 +633,10 @@ impl Accounts { self.get_combined_relays(self.get_selected_account_data(), |_| true) } - pub fn update(&mut self, subman: &mut SubMan, ctx: &egui::Context) { + pub fn update(&mut self, subman: &mut SubMan, _ctx: &egui::Context) { // IMPORTANT - This function is called in the UI update loop, // make sure it is fast when idle - // On the initial update the relays need config even if nothing changes below - let mut need_reconfig = self.needs_relay_config; - - let ctx2 = ctx.clone(); - let wakeup = move || { - ctx2.request_repaint(); - }; - // Do we need to deactivate any existing account subs? for (ndx, account) in self.accounts.iter().enumerate() { if Some(ndx) != self.currently_selected_account { @@ -718,23 +655,14 @@ impl Accounts { } let ndb = subman.ndb().clone(); - let pool = subman.pool(); // Were any accounts added or removed? let (added, removed) = self.delta_accounts(); for pk in added { self.handle_added_account(&ndb, &pk); - need_reconfig = true; } for pk in removed { self.handle_removed_account(&pk); - need_reconfig = true; - } - - // If needed, update the relay configuration - if need_reconfig { - self.update_relay_configuration(pool, wakeup); - self.needs_relay_config = false; } // Do we need to activate account subs?