diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 00000000..3a26366d --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1 @@ +edition = "2021" diff --git a/Cargo.lock b/Cargo.lock index 8ca5953b..eeb26f0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "ab_glyph" @@ -1226,6 +1226,7 @@ dependencies = [ "serde_derive", "serde_json", "tracing", + "url", ] [[package]] @@ -1446,9 +1447,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1461,9 +1462,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1471,15 +1472,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1488,15 +1489,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1505,21 +1506,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2483,7 +2484,6 @@ dependencies = [ [[package]] name = "nostrdb" version = "0.3.4" -source = "git+https://github.com/damus-io/nostrdb-rs?rev=9bbafd8a2e904b77a51e7cfca71eb5bb5650e829#9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" dependencies = [ "bindgen", "cc", @@ -2512,11 +2512,13 @@ dependencies = [ "ehttp 0.2.0", "enostr", "env_logger 0.10.2", + "futures", "hex", "image", "indexmap", "log", "nostrdb", + "once_cell", "poll-promise", "puffin", "puffin_egui", @@ -2800,9 +2802,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "opaque-debug" diff --git a/Cargo.toml b/Cargo.toml index 17e99ef2..16ef3c03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,8 +28,10 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence tracing = "0.1.40" #wasm-bindgen = "0.2.83" -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" } +#nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" } #nostrdb = { path = "/Users/jb55/dev/github/damus-io/nostrdb-rs" } +nostrdb = { path = "../nostrdb-rs" } +#nostrdb = { git = "https://github.com/ksedgwic/nostrdb-rs.git", rev = "3c3ac0cba199a2594cdf6785209bfdcc52ec64c4" } #nostrdb = "0.3.4" enostr = { path = "enostr" } serde_json = "1.0.89" @@ -43,7 +45,8 @@ strum_macros = "0.26" bitflags = "2.5.0" uuid = { version = "1.10.0", features = ["v4"] } indexmap = "2.6.0" - +futures = "0.3.31" +once_cell = "1.20.0" [target.'cfg(target_os = "macos")'.dependencies] security-framework = "2.11.0" diff --git a/enostr/Cargo.toml b/enostr/Cargo.toml index 4b742289..04b59580 100644 --- a/enostr/Cargo.toml +++ b/enostr/Cargo.toml @@ -11,7 +11,10 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence serde_json = "1.0.89" nostr = { version = "0.30.0" } -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" } +#nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" } +nostrdb = { path = "/home/user/bonsai/nostrdb-rs" } +#nostrdb = { git = "https://github.com/ksedgwic/nostrdb-rs.git", rev = "3c3ac0cba199a2594cdf6785209bfdcc52ec64c4" } hex = "0.4.3" tracing = "0.1.40" env_logger = "0.11.1" +url = "2.5.2" diff --git a/enostr/src/relay/message.rs b/enostr/src/relay/message.rs index b48e5aea..dea89e8d 100644 --- a/enostr/src/relay/message.rs +++ b/enostr/src/relay/message.rs @@ -90,7 +90,17 @@ impl<'a> RelayMessage<'a> { // Event // Relay response format: ["EVENT", , ] if &msg[0..=7] == "[\"EVENT\"" { - return Ok(Self::event(msg, "fixme")); + let mut start = 9; + while let Some(&b' ') = msg.as_bytes().get(start) { + start += 1; // Move past optional spaces + } + if let Some(comma_index) = msg[start..].find(',') { + let subid_end = start + comma_index; + let subid = &msg[start..subid_end].trim().trim_matches('"'); + return Ok(Self::event(msg, subid)); + } else { + return Ok(Self::event(msg, "fixme")); + } } // EOSE (NIP-15) diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs index f4aab0ce..73785fd9 100644 --- a/enostr/src/relay/pool.rs +++ b/enostr/src/relay/pool.rs @@ -2,13 +2,17 @@ use crate::relay::{Relay, RelayStatus}; use crate::{ClientMessage, Result}; use nostrdb::Filter; +use std::collections::HashMap; +use std::collections::HashSet; use std::time::{Duration, Instant}; +use url::Url; + #[cfg(not(target_arch = "wasm32"))] use ewebsock::{WsEvent, WsMessage}; #[cfg(not(target_arch = "wasm32"))] -use tracing::{debug, error}; +use tracing::{debug, error, info}; #[derive(Debug)] pub struct PoolEvent<'a> { @@ -40,6 +44,7 @@ impl PoolRelay { pub struct RelayPool { pub relays: Vec, + pub subs: HashMap>, pub ping_rate: Duration, } @@ -54,6 +59,7 @@ impl RelayPool { pub fn new() -> RelayPool { RelayPool { relays: vec![], + subs: HashMap::new(), ping_rate: Duration::from_secs(25), } } @@ -83,9 +89,11 @@ impl RelayPool { for relay in &mut self.relays { relay.relay.send(&ClientMessage::close(subid.clone())); } + self.subs.remove(&subid); } pub fn subscribe(&mut self, subid: String, filter: Vec) { + self.subs.insert(subid.clone(), filter.clone()); for relay in &mut self.relays { relay.relay.subscribe(subid.clone(), filter.clone()); } @@ -152,14 +160,76 @@ impl RelayPool { url: String, wakeup: impl Fn() + Send + Sync + Clone + 'static, ) -> Result<()> { + let url = Self::canonicalize_url(&url); + // Check if the URL already exists in the pool. + if self.has(&url) { + return Ok(()); + } let relay = Relay::new(url, wakeup)?; - let pool_relay = PoolRelay::new(relay); + let mut pool_relay = PoolRelay::new(relay); + + // Add all of the existing subscriptions to the new relay + for (subid, filters) in &self.subs { + pool_relay.relay.subscribe(subid.clone(), filters.clone()); + } self.relays.push(pool_relay); Ok(()) } + // Add and remove relays to match the provided list + pub fn set_relays( + &mut self, + urls: &Vec, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + // Canonicalize the new URLs. + let new_urls = urls + .iter() + .map(|u| Self::canonicalize_url(u)) + .collect::>(); + + // Get the old URLs from the existing relays. + let old_urls = self + .relays + .iter() + .map(|pr| pr.relay.url.clone()) + .collect::>(); + + debug!("old relays: {:?}", old_urls); + debug!("new relays: {:?}", new_urls); + + if new_urls.len() == 0 { + info!("bootstrapping, not clearing the relay list ..."); + return Ok(()); + } + + // Remove the relays that are in old_urls but not in new_urls. + let to_remove: HashSet<_> = old_urls.difference(&new_urls).cloned().collect(); + self.relays.retain(|pr| !to_remove.contains(&pr.relay.url)); + + // FIXME - how do we close connections the removed relays? + + // Add the relays that are in new_urls but not in old_urls. + let to_add: HashSet<_> = new_urls.difference(&old_urls).cloned().collect(); + for url in to_add { + if let Err(e) = self.add_url(url.clone(), wakeup.clone()) { + error!("Failed to add relay with URL {}: {:?}", url, e); + } + } + + Ok(()) + } + + // standardize the format (ie, trailing slashes) to avoid dups + fn canonicalize_url(url: &String) -> String { + match Url::parse(&url) { + Ok(parsed_url) => parsed_url.to_string(), + Err(_) => url.clone(), // If parsing fails, return the original URL. + } + } + /// Attempts to receive a pool event from a list of relays. The /// function searches each relay in the list in order, attempting to /// receive a message from each. If a message is received, return it. diff --git a/src/app.rs b/src/app.rs index e04b8add..a299efc2 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,6 +4,7 @@ use crate::{ app_style::user_requested_visuals_change, args::Args, column::Columns, + dispatcher::{self, HandlerTable}, draft::Drafts, error::{Error, FilterError}, filter::{self, FilterState}, @@ -16,6 +17,7 @@ use crate::{ notes_holder::NotesHolderStorage, profile::Profile, subscriptions::{SubKind, Subscriptions}, + task, thread::Thread, timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, ui::{self, DesktopSidePanel}, @@ -46,6 +48,7 @@ pub enum DamusState { /// We derive Deserialize/Serialize so we can persist app state on shutdown. pub struct Damus { + reference: Option>>, state: DamusState, pub note_cache: NoteCache, pub pool: RelayPool, @@ -60,6 +63,7 @@ pub struct Damus { pub img_cache: ImageCache, pub accounts: AccountManager, pub subscriptions: Subscriptions, + pub dispatch: HandlerTable, frame_history: crate::frame_history::FrameHistory, @@ -471,6 +475,11 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { .insert("unknownids".to_string(), SubKind::OneShot); setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns) .expect("home subscription failed"); + + let damusref = damus.reference(); + tokio::spawn(async move { + task::track_user_relays(damusref).await; + }); } DamusState::NewTimelineSub(new_timeline_id) => { @@ -514,7 +523,7 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) { #[cfg(feature = "profiling")] puffin::profile_function!(); - //info!("processing event {}", event); + debug!("processing event {}", event); if let Err(_err) = damus.ndb.process_event(event) { error!("error processing event {}", event); } @@ -663,6 +672,7 @@ impl Damus { let mut config = Config::new(); config.set_ingester_threads(4); + config.set_sub_cb(ndb_sub_updated); let mut accounts = AccountManager::new( // TODO: should pull this from settings @@ -720,10 +730,12 @@ impl Damus { } Self { + reference: None, pool, debug, unknown_ids: UnknownIds::default(), subscriptions: Subscriptions::default(), + dispatch: HandlerTable::default(), since_optimize: parsed_args.since_optimize, threads: NotesHolderStorage::default(), profiles: NotesHolderStorage::default(), @@ -740,6 +752,19 @@ impl Damus { } } + pub fn set_reference(&mut self, reference: Weak>) { + self.reference = Some(reference); + } + + pub fn reference(&self) -> DamusRef { + self.reference + .as_ref() + .expect("weak damus reference") + .upgrade() + .expect("strong damus reference") + .clone() + } + pub fn pool_mut(&mut self) -> &mut RelayPool { &mut self.pool } @@ -803,9 +828,11 @@ impl Damus { let mut config = Config::new(); config.set_ingester_threads(2); Self { + reference: None, debug, unknown_ids: UnknownIds::default(), subscriptions: Subscriptions::default(), + dispatch: HandlerTable::default(), since_optimize: true, threads: NotesHolderStorage::default(), profiles: NotesHolderStorage::default(), @@ -1056,3 +1083,84 @@ impl eframe::App for Damus { render_damus(self, ctx); } } + +use futures::SinkExt; +use std::sync::{Arc, Mutex, Weak}; +use tokio::runtime::Handle; + +pub type DamusRef = Arc>; + +pub fn with_mut_damus(damusref: &DamusRef, mut f: F) -> T +where + F: FnMut(&mut Damus) -> T, +{ + let mut damus = damusref.as_ref().lock().unwrap(); + f(&mut damus) +} + +/// A wrapper so access to Damus can be synchronized +pub struct DamusApp { + damus: DamusRef, +} + +impl DamusApp { + pub fn new(damus: DamusRef) -> Self { + if ONCEDAMUSREF.set(Arc::clone(&damus)).is_err() { + panic!("ONCEDAMUSREF was already initialized."); + } + + let handle = tokio::runtime::Handle::try_current().expect("current tokio runtime"); + if TOKIORUNTIME.set(handle.clone()).is_err() { + panic!("Failed to set Tokio runtime handle"); + } + + let weak_damus = Arc::downgrade(&damus); + damus.lock().unwrap().set_reference(weak_damus); + Self { damus } + } + + pub fn with_mut_damus(&mut self, f: F) -> T + where + F: FnMut(&mut Damus) -> T, + { + with_mut_damus(&self.damus, f) + } +} + +impl eframe::App for DamusApp { + fn save(&mut self, storage: &mut dyn eframe::Storage) { + self.with_mut_damus(|damus| damus.save(storage)) + } + + fn update(&mut self, ctx: &egui::Context, frame: &mut eframe::Frame) { + self.with_mut_damus(|damus| damus.update(ctx, frame)); + } +} + +use once_cell::sync::OnceCell; + +static ONCEDAMUSREF: OnceCell>> = OnceCell::new(); +static TOKIORUNTIME: OnceCell = OnceCell::new(); + +extern "C" fn ndb_sub_updated(_ctx: *mut std::ffi::c_void, subid: u64) { + let damus = ONCEDAMUSREF + .get() + .expect("ONCEDAMUSREF is not initialized.") + .lock() + .unwrap(); + + if let Some(sink) = damus.dispatch.get(&subid) { + let mut sink_clone = sink.clone(); + if let Some(handler) = TOKIORUNTIME.get() { + handler.spawn(async move { + sink_clone + .sender + .send(dispatcher::Event::NdbSubUpdate) + .await + .ok(); + }); + } else { + eprintln!("Tokio runtime handle is not set."); + } + } +} diff --git a/src/bin/notedeck.rs b/src/bin/notedeck.rs index 199272b4..83292233 100644 --- a/src/bin/notedeck.rs +++ b/src/bin/notedeck.rs @@ -1,7 +1,9 @@ #![warn(clippy::all, rust_2018_idioms)] #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] // hide console window on Windows in release use notedeck::app_creation::generate_native_options; -use notedeck::Damus; +use notedeck::{Damus, DamusApp}; + +use std::sync::{Arc, Mutex}; // Entry point for wasm //#[cfg(target_arch = "wasm32")] @@ -17,7 +19,13 @@ async fn main() { let _res = eframe::run_native( "Damus NoteDeck", generate_native_options(), - Box::new(|cc| Ok(Box::new(Damus::new(cc, ".", std::env::args().collect())))), + Box::new(|cc| { + Ok(Box::new(DamusApp::new(Arc::new(Mutex::new(Damus::new( + cc, + ".", + std::env::args().collect(), + )))))) + }), ); } diff --git a/src/dispatcher.rs b/src/dispatcher.rs new file mode 100644 index 00000000..5487aa78 --- /dev/null +++ b/src/dispatcher.rs @@ -0,0 +1,80 @@ +use futures::channel::mpsc; +use std::collections::HashMap; +use std::error::Error; +use std::fmt; + +use uuid::Uuid; + +use nostrdb::Filter; + +use crate::Damus; + +#[allow(dead_code)] // until InternalError is used +#[derive(Debug)] +pub enum DispatcherError { + InternalError(String), + NdbError(nostrdb::Error), +} + +impl From for DispatcherError { + fn from(err: nostrdb::Error) -> Self { + DispatcherError::NdbError(err) + } +} + +impl fmt::Display for DispatcherError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DispatcherError::InternalError(msg) => write!(f, "Internal error: {}", msg), + DispatcherError::NdbError(err) => write!(f, "nostrdb error: {}", err), + } + } +} + +impl Error for DispatcherError {} + +pub type DispatcherResult = Result; + +#[derive(Debug)] +pub enum Event { + NdbSubUpdate, +} + +/// Used by the relay code to dispatch events to a waiting handlers +#[derive(Debug, Clone)] +pub struct EventSink { + pub sender: mpsc::Sender, +} + +/// Maps subscription id to handler for the subscription +pub type HandlerTable = HashMap; + +/// Used by async tasks to receive events +#[allow(dead_code)] // until id is read +#[derive(Debug)] +pub struct EventSource { + pub ndbid: u64, + pub poolid: String, + pub receiver: mpsc::Receiver, +} + +pub fn subscribe( + damus: &mut Damus, + filters: &[Filter], + bufsz: usize, +) -> DispatcherResult { + let (sender, receiver) = mpsc::channel::(bufsz); + let ndbid = damus.ndb.subscribe(&filters)?.id(); + let poolid = Uuid::new_v4().to_string(); + damus.pool.subscribe(poolid.clone(), filters.into()); + damus.dispatch.insert(ndbid, EventSink { sender }); + Ok(EventSource { + ndbid, + poolid, + receiver, + }) +} + +pub fn _unsubscribe(_sub: EventSource) -> DispatcherResult<()> { + unimplemented!() +} diff --git a/src/lib.rs b/src/lib.rs index a44abc60..ef6351f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod app_style; mod args; mod colors; mod column; +mod dispatcher; mod draft; mod filter; mod fonts; @@ -32,6 +33,7 @@ pub mod relay_pool_manager; mod result; mod route; mod subscriptions; +mod task; mod test_data; mod thread; mod time; @@ -47,13 +49,16 @@ mod view_state; mod test_utils; mod linux_key_storage; -pub use app::Damus; +pub use app::{with_mut_damus, Damus, DamusApp, DamusRef}; pub use error::Error; pub use profile::DisplayName; #[cfg(target_os = "android")] use winit::platform::android::EventLoopBuilderExtAndroid; +#[cfg(target_os = "android")] +use std::sync::{Arc, Mutex}; + pub type Result = std::result::Result; //#[cfg(target_os = "android")] @@ -83,7 +88,11 @@ pub async fn android_main(app: AndroidApp) { let _res = eframe::run_native( "Damus NoteDeck", options, - Box::new(move |cc| Ok(Box::new(Damus::new(cc, path, app_args)))), + Box::new(|cc| { + Ok(Box::new(DamusApp::new(Arc::new(Mutex::new(Damus::new( + cc, path, app_args, + )))))) + }), ); } diff --git a/src/notecache.rs b/src/notecache.rs index 51bdc757..593fbaf5 100644 --- a/src/notecache.rs +++ b/src/notecache.rs @@ -2,6 +2,7 @@ use crate::time::time_ago_since; use crate::timecache::TimeCached; use nostrdb::{Note, NoteKey, NoteReply, NoteReplyBuf}; use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; #[derive(Default)] @@ -42,8 +43,9 @@ impl CachedNote { let created_at = note.created_at(); let reltime = TimeCached::new( Duration::from_secs(1), - Box::new(move || time_ago_since(created_at)), + Arc::new(move || time_ago_since(created_at)) as Arc String + Send + Sync>, ); + let reply = NoteReply::new(note.tags()).to_owned(); CachedNote { reltime, reply } } diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 00000000..ba04fd23 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,110 @@ +use futures::stream::StreamExt; +use tracing::{debug, error}; + +use enostr::RelayPool; +use nostrdb::{Filter, Ndb, Transaction}; + +use crate::dispatcher; +use crate::note::NoteRef; +use crate::{with_mut_damus, DamusRef}; + +pub async fn track_user_relays(damusref: DamusRef) { + debug!("track_user_relays starting"); + + let filter = user_relay_filter(&damusref); + + // Do we have a user relay list stored in nostrdb? Start with that ... + with_mut_damus(&damusref, |damus| { + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!("track_user_relays: initial from nostrdb: {:#?}", relays); + set_relays(&mut damus.pool, relays); + }); + + // Subscribe to user relay list updates + let mut src = with_mut_damus(&damusref, |damus| { + dispatcher::subscribe(damus, &[filter.clone()], 10).expect("subscribe") + }); + debug!( + "track_user_relays: ndbid: {}, poolid: {}", + src.ndbid, src.poolid + ); + + // Track user relay list updates + loop { + match src.receiver.next().await { + Some(_ev) => with_mut_damus(&damusref, |damus| { + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!("track_user_relays update: {:#?}", relays); + set_relays(&mut damus.pool, relays); + }), + None => { + debug!("track_user_relays: saw None"); + break; + } + } + } + + // Should only get here if the channel is closed + debug!("track_user_relays finished"); +} + +fn user_relay_filter(damusref: &DamusRef) -> Filter { + with_mut_damus(&damusref, |damus| { + let account = damus + .accounts + .get_selected_account() + .as_ref() + .map(|a| a.pubkey.bytes()) + .expect("selected account"); + + // NIP-65 + Filter::new() + .authors([account]) + .kinds([10002]) + .limit(1) + .build() + }) +} + +// useful for debugging +fn _query_note_json(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .map(|n| n.json().unwrap()) + .collect() +} + +fn query_nip65_relays(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .flat_map(|n| { + n.tags() + .iter() + .filter_map(|ti| ti.get_unchecked(1).variant().str()) + .map(|s| s.to_string()) + }) + .collect() +} + +fn set_relays(pool: &mut RelayPool, relays: Vec) { + let wakeup = move || { + // FIXME - how do we repaint? + }; + if let Err(e) = pool.set_relays(&relays, wakeup) { + error!("{:?}", e) + } +} diff --git a/src/timecache.rs b/src/timecache.rs index ef992219..b6a7b36d 100644 --- a/src/timecache.rs +++ b/src/timecache.rs @@ -1,4 +1,4 @@ -use std::rc::Rc; +use std::sync::Arc; use std::time::{Duration, Instant}; #[derive(Clone)] @@ -6,16 +6,16 @@ pub struct TimeCached { last_update: Instant, expires_in: Duration, value: Option, - refresh: Rc T + 'static>, + refresh: Arc T + Send + Sync + 'static>, // Use Send + Sync } impl TimeCached { - pub fn new(expires_in: Duration, refresh: impl Fn() -> T + 'static) -> Self { + pub fn new(expires_in: Duration, refresh: Arc T + Send + Sync>) -> Self { TimeCached { last_update: Instant::now(), expires_in, value: None, - refresh: Rc::new(refresh), + refresh, } } diff --git a/src/timeline/mod.rs b/src/timeline/mod.rs index 72593375..b3ed9dbe 100644 --- a/src/timeline/mod.rs +++ b/src/timeline/mod.rs @@ -9,9 +9,8 @@ use std::sync::atomic::{AtomicU32, Ordering}; use egui_virtual_list::VirtualList; use nostrdb::{Ndb, Note, Subscription, Transaction}; -use std::cell::RefCell; use std::hash::Hash; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use tracing::{debug, error}; @@ -84,7 +83,7 @@ pub struct TimelineTab { pub notes: Vec, pub selection: i32, pub filter: ViewFilter, - pub list: Rc>, + pub list: Arc>, } impl TimelineTab { @@ -97,7 +96,7 @@ impl TimelineTab { let mut list = VirtualList::new(); list.hide_on_resize(None); list.over_scan(1000.0); - let list = Rc::new(RefCell::new(list)); + let list = Arc::new(Mutex::new(list)); let notes: Vec = Vec::with_capacity(cap); TimelineTab { @@ -120,7 +119,7 @@ impl TimelineTab { // TODO: technically items could have been added inbetween if new_items > 0 { - let mut list = self.list.borrow_mut(); + let mut list = self.list.lock().unwrap(); match merge_kind { // TODO: update egui_virtual_list to support spliced inserts diff --git a/src/ui/timeline.rs b/src/ui/timeline.rs index 1c2d33c1..5b705423 100644 --- a/src/ui/timeline.rs +++ b/src/ui/timeline.rs @@ -248,7 +248,8 @@ impl<'a> TimelineTabView<'a> { self.tab .list .clone() - .borrow_mut() + .lock() + .unwrap() .ui_custom_layout(ui, len, |ui, start_index| { ui.spacing_mut().item_spacing.y = 0.0; ui.spacing_mut().item_spacing.x = 4.0;