From cd41b1cf602eb388070b95ab4915bc6b2ccceb4d Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Sat, 15 Feb 2025 12:05:51 -0800 Subject: [PATCH] WIP: refactor relay support into subman Is currently in notedeck_columns::app --- crates/notedeck/src/subman.rs | 87 +++++++++++++++- crates/notedeck_columns/src/app.rs | 161 +++++++++++++---------------- 2 files changed, 154 insertions(+), 94 deletions(-) diff --git a/crates/notedeck/src/subman.rs b/crates/notedeck/src/subman.rs index cd4347a3..864c4c13 100644 --- a/crates/notedeck/src/subman.rs +++ b/crates/notedeck/src/subman.rs @@ -3,9 +3,10 @@ use std::collections::BTreeMap; use std::fmt; use std::{cell::RefCell, cmp::Ordering, rc::Rc}; use thiserror::Error; +use tracing::{error, info, trace, warn}; use uuid::Uuid; -use enostr::{Filter, RelayPool}; +use enostr::{Filter, PoolRelay, RelayEvent, RelayMessage, RelayPool}; use nostrdb::{self, Ndb, Subscription, SubscriptionStream}; /// The Subscription Manager @@ -292,6 +293,84 @@ impl SubMan { Ok((maybe_tx_eose, SubReceiver { lclsub, rmtsub })) } + + pub fn process_relays( + &mut self, + legacy_relay_handler: &mut H, + ) -> SubResult<()> { + let wakeup = move || { + // ignore + }; + self.pool.keepalive_ping(wakeup); + + // NOTE: we don't use the while let loop due to borrow issues + #[allow(clippy::while_let_loop)] + loop { + let ev = if let Some(ev) = self.pool.try_recv() { + ev.into_owned() + } else { + break; + }; + + match (&ev.event).into() { + RelayEvent::Opened => { + legacy_relay_handler.handle_opened(&ev.relay); + } + // TODO: handle reconnects + RelayEvent::Closed => warn!("{} connection closed", &ev.relay), + RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e), + RelayEvent::Other(msg) => trace!("other event {:?}", &msg), + RelayEvent::Message(msg) => { + self.process_message(legacy_relay_handler, &ev.relay, &msg); + } + } + } + Ok(()) + } + + pub fn process_message( + &mut self, + legacy_relay_handler: &mut H, + relay: &str, + msg: &RelayMessage, + ) { + match msg { + RelayMessage::Event(_subid, ev) => { + let relay = if let Some(relay) = self.pool.relays.iter().find(|r| r.url() == relay) + { + relay + } else { + error!("couldn't find relay {} for note processing!?", relay); + return; + }; + + match relay { + PoolRelay::Websocket(_) => { + //info!("processing event {}", event); + if let Err(err) = self.ndb.process_event(ev) { + error!("error processing event {ev}: {err}"); + } + } + PoolRelay::Multicast(_) => { + // multicast events are client events + if let Err(err) = self.ndb.process_client_event(ev) { + error!("error processing multicast event {ev}: {err}"); + } + } + } + } + RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), + RelayMessage::OK(cr) => info!("OK {:?}", cr), + RelayMessage::Eose(sid) => { + legacy_relay_handler.handle_eose(sid, relay); + } + } + } +} + +pub trait LegacyRelayHandler { + fn handle_opened(&mut self, relay: &str); + fn handle_eose(&mut self, sid: &str, relay: &str); } struct LclSub { @@ -359,9 +438,9 @@ impl SubReceiver { // only remote sub (prefetch only, values not returned) match rsub.rmteose.next().await { Some(_) => Err(SubError::StreamEnded), - None => Err(SubError::InternalError(format!( - "trouble reading from rmteose" - ))), + None => Err(SubError::InternalError( + "trouble reading from rmteose".to_string(), + )), } } else { // query case diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs index 3c28f99e..0ea7e94b 100644 --- a/crates/notedeck_columns/src/app.rs +++ b/crates/notedeck_columns/src/app.rs @@ -14,11 +14,11 @@ use crate::{ }; use notedeck::{ - Accounts, AppContext, DataPath, DataPathType, FilterState, ImageCache, SubError, SubMan, - UnknownIds, + subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState, + ImageCache, SubError, SubMan, UnknownIds, }; -use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage}; +use enostr::{ClientMessage, Keypair, Pubkey}; use uuid::Uuid; use egui_extras::{Size, StripBuilder}; @@ -28,7 +28,7 @@ use nostrdb::{Ndb, Transaction}; use std::collections::{BTreeSet, HashMap}; use std::path::Path; use std::time::Duration; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; #[derive(Debug, Eq, PartialEq, Clone)] pub enum DamusState { @@ -83,53 +83,78 @@ fn handle_key_events(input: &egui::InputState, columns: &mut Columns) { } } -fn try_process_event( - damus: &mut Damus, - app_ctx: &mut AppContext<'_>, +struct RelayHandler<'a> { + app_ctx: &'a mut AppContext<'a>, + subscriptions: &'a mut Subscriptions, + timeline_cache: &'a mut TimelineCache, + since_optimize: bool, +} + +impl<'a> RelayHandler<'a> { + fn new( + app_ctx: &'a mut AppContext<'a>, + subscriptions: &'a mut Subscriptions, + timeline_cache: &'a mut TimelineCache, + since_optimize: bool, + ) -> Self { + RelayHandler { + app_ctx, + subscriptions, + timeline_cache, + since_optimize, + } + } +} + +impl<'a> LegacyRelayHandler for RelayHandler<'a> { + /// Handle relay opened + fn handle_opened(&mut self, relay: &str) { + self.app_ctx + .accounts + .send_initial_filters(self.app_ctx.subman.pool(), relay); + timeline::send_initial_timeline_filters( + self.app_ctx.ndb, + self.since_optimize, + self.timeline_cache, + self.subscriptions, + self.app_ctx.subman.pool(), + relay, + ); + } + + /// Handle end-of-stored-events + fn handle_eose(&mut self, sid: &str, relay: &str) { + if let Err(_) = handle_eose( + &*self.subscriptions, + self.timeline_cache, + self.app_ctx, + sid, + relay, + ) { + // already logged ... + } + } +} + +fn try_process_event<'a>( + damus: &'a mut Damus, + app_ctx: &'a mut AppContext<'a>, ctx: &egui::Context, ) -> Result<()> { let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache); ctx.input(|i| handle_key_events(i, current_columns)); - let ctx2 = ctx.clone(); - let wakeup = move || { - ctx2.request_repaint(); - }; - - app_ctx.subman.pool().keepalive_ping(wakeup); - - // NOTE: we don't use the while let loop due to borrow issues - #[allow(clippy::while_let_loop)] - loop { - let ev = if let Some(ev) = app_ctx.subman.pool().try_recv() { - ev.into_owned() - } else { - break; - }; - - match (&ev.event).into() { - RelayEvent::Opened => { - app_ctx - .accounts - .send_initial_filters(app_ctx.subman.pool(), &ev.relay); - - timeline::send_initial_timeline_filters( - app_ctx.ndb, - damus.since_optimize, - &mut damus.timeline_cache, - &mut damus.subscriptions, - app_ctx.subman.pool(), - &ev.relay, - ); - } - // TODO: handle reconnects - RelayEvent::Closed => warn!("{} connection closed", &ev.relay), - RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e), - RelayEvent::Other(msg) => trace!("other event {:?}", &msg), - RelayEvent::Message(msg) => { - process_message(damus, app_ctx, &ev.relay, &msg); - } - } + { + let mut relay_handler = RelayHandler::new( + app_ctx, + &mut damus.subscriptions, + &mut damus.timeline_cache, + damus.since_optimize, + ); + relay_handler + .app_ctx + .subman + .process_relays(&mut relay_handler); } for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() { @@ -139,7 +164,6 @@ fn try_process_event( app_ctx.note_cache, timeline, ); - if is_ready { let txn = Transaction::new(app_ctx.ndb).expect("txn"); // only thread timelines are reversed @@ -162,7 +186,6 @@ fn try_process_event( if app_ctx.unknown_ids.ready_to_send() { unknown_id_send(app_ctx.unknown_ids, app_ctx.subman); } - Ok(()) } @@ -310,48 +333,6 @@ fn handle_eose( Ok(()) } -fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) { - match msg { - RelayMessage::Event(_subid, ev) => { - let relay = - if let Some(relay) = ctx.subman.pool().relays.iter().find(|r| r.url() == relay) { - relay - } else { - error!("couldn't find relay {} for note processing!?", relay); - return; - }; - - match relay { - PoolRelay::Websocket(_) => { - //info!("processing event {}", event); - if let Err(err) = ctx.ndb.process_event(ev) { - error!("error processing event {ev}: {err}"); - } - } - PoolRelay::Multicast(_) => { - // multicast events are client events - if let Err(err) = ctx.ndb.process_client_event(ev) { - error!("error processing multicast event {ev}: {err}"); - } - } - } - } - RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), - RelayMessage::OK(cr) => info!("OK {:?}", cr), - RelayMessage::Eose(sid) => { - if let Err(err) = handle_eose( - &damus.subscriptions, - &mut damus.timeline_cache, - ctx, - sid, - relay, - ) { - error!("error handling eose: {}", err); - } - } - } -} - fn render_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ui: &mut egui::Ui) { if notedeck::ui::is_narrow(ui.ctx()) { render_damus_mobile(damus, app_ctx, ui);