Skip to content

Commit

Permalink
WIP: refactor relay support into subman
Browse files Browse the repository at this point in the history
Is currently in notedeck_columns::app
  • Loading branch information
ksedgwic committed Feb 17, 2025
1 parent e0bef72 commit cd41b1c
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 94 deletions.
87 changes: 83 additions & 4 deletions crates/notedeck/src/subman.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::collections::BTreeMap;
use std::fmt;
use std::{cell::RefCell, cmp::Ordering, rc::Rc};
use thiserror::Error;
use tracing::{error, info, trace, warn};
use uuid::Uuid;

use enostr::{Filter, RelayPool};
use enostr::{Filter, PoolRelay, RelayEvent, RelayMessage, RelayPool};
use nostrdb::{self, Ndb, Subscription, SubscriptionStream};

/// The Subscription Manager
Expand Down Expand Up @@ -292,6 +293,84 @@ impl SubMan {

Ok((maybe_tx_eose, SubReceiver { lclsub, rmtsub }))
}

pub fn process_relays<H: LegacyRelayHandler>(
&mut self,
legacy_relay_handler: &mut H,
) -> SubResult<()> {
let wakeup = move || {
// ignore
};
self.pool.keepalive_ping(wakeup);

// NOTE: we don't use the while let loop due to borrow issues
#[allow(clippy::while_let_loop)]
loop {
let ev = if let Some(ev) = self.pool.try_recv() {
ev.into_owned()
} else {
break;
};

match (&ev.event).into() {
RelayEvent::Opened => {
legacy_relay_handler.handle_opened(&ev.relay);
}
// TODO: handle reconnects
RelayEvent::Closed => warn!("{} connection closed", &ev.relay),
RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e),
RelayEvent::Other(msg) => trace!("other event {:?}", &msg),
RelayEvent::Message(msg) => {
self.process_message(legacy_relay_handler, &ev.relay, &msg);
}
}
}
Ok(())
}

pub fn process_message<H: LegacyRelayHandler>(
&mut self,
legacy_relay_handler: &mut H,
relay: &str,
msg: &RelayMessage,
) {
match msg {
RelayMessage::Event(_subid, ev) => {
let relay = if let Some(relay) = self.pool.relays.iter().find(|r| r.url() == relay)
{
relay
} else {
error!("couldn't find relay {} for note processing!?", relay);
return;
};

match relay {
PoolRelay::Websocket(_) => {
//info!("processing event {}", event);
if let Err(err) = self.ndb.process_event(ev) {
error!("error processing event {ev}: {err}");
}
}
PoolRelay::Multicast(_) => {
// multicast events are client events
if let Err(err) = self.ndb.process_client_event(ev) {
error!("error processing multicast event {ev}: {err}");
}
}
}
}
RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg),
RelayMessage::OK(cr) => info!("OK {:?}", cr),
RelayMessage::Eose(sid) => {
legacy_relay_handler.handle_eose(sid, relay);
}
}
}
}

pub trait LegacyRelayHandler {
fn handle_opened(&mut self, relay: &str);
fn handle_eose(&mut self, sid: &str, relay: &str);
}

struct LclSub {
Expand Down Expand Up @@ -359,9 +438,9 @@ impl SubReceiver {
// only remote sub (prefetch only, values not returned)
match rsub.rmteose.next().await {
Some(_) => Err(SubError::StreamEnded),
None => Err(SubError::InternalError(format!(
"trouble reading from rmteose"
))),
None => Err(SubError::InternalError(
"trouble reading from rmteose".to_string(),
)),
}
} else {
// query case
Expand Down
161 changes: 71 additions & 90 deletions crates/notedeck_columns/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use crate::{
};

use notedeck::{
Accounts, AppContext, DataPath, DataPathType, FilterState, ImageCache, SubError, SubMan,
UnknownIds,
subman::LegacyRelayHandler, Accounts, AppContext, DataPath, DataPathType, FilterState,
ImageCache, SubError, SubMan, UnknownIds,
};

use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage};
use enostr::{ClientMessage, Keypair, Pubkey};
use uuid::Uuid;

use egui_extras::{Size, StripBuilder};
Expand All @@ -28,7 +28,7 @@ use nostrdb::{Ndb, Transaction};
use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, warn};

#[derive(Debug, Eq, PartialEq, Clone)]
pub enum DamusState {
Expand Down Expand Up @@ -83,53 +83,78 @@ fn handle_key_events(input: &egui::InputState, columns: &mut Columns) {
}
}

fn try_process_event(
damus: &mut Damus,
app_ctx: &mut AppContext<'_>,
struct RelayHandler<'a> {
app_ctx: &'a mut AppContext<'a>,
subscriptions: &'a mut Subscriptions,
timeline_cache: &'a mut TimelineCache,
since_optimize: bool,
}

impl<'a> RelayHandler<'a> {
fn new(
app_ctx: &'a mut AppContext<'a>,
subscriptions: &'a mut Subscriptions,
timeline_cache: &'a mut TimelineCache,
since_optimize: bool,
) -> Self {
RelayHandler {
app_ctx,
subscriptions,
timeline_cache,
since_optimize,
}
}
}

impl<'a> LegacyRelayHandler for RelayHandler<'a> {
/// Handle relay opened
fn handle_opened(&mut self, relay: &str) {
self.app_ctx
.accounts
.send_initial_filters(self.app_ctx.subman.pool(), relay);
timeline::send_initial_timeline_filters(
self.app_ctx.ndb,
self.since_optimize,
self.timeline_cache,
self.subscriptions,
self.app_ctx.subman.pool(),
relay,
);
}

/// Handle end-of-stored-events
fn handle_eose(&mut self, sid: &str, relay: &str) {
if let Err(_) = handle_eose(
&*self.subscriptions,
self.timeline_cache,
self.app_ctx,
sid,
relay,
) {
// already logged ...
}
}
}

fn try_process_event<'a>(
damus: &'a mut Damus,
app_ctx: &'a mut AppContext<'a>,
ctx: &egui::Context,
) -> Result<()> {
let current_columns = get_active_columns_mut(app_ctx.accounts, &mut damus.decks_cache);
ctx.input(|i| handle_key_events(i, current_columns));

let ctx2 = ctx.clone();
let wakeup = move || {
ctx2.request_repaint();
};

app_ctx.subman.pool().keepalive_ping(wakeup);

// NOTE: we don't use the while let loop due to borrow issues
#[allow(clippy::while_let_loop)]
loop {
let ev = if let Some(ev) = app_ctx.subman.pool().try_recv() {
ev.into_owned()
} else {
break;
};

match (&ev.event).into() {
RelayEvent::Opened => {
app_ctx
.accounts
.send_initial_filters(app_ctx.subman.pool(), &ev.relay);

timeline::send_initial_timeline_filters(
app_ctx.ndb,
damus.since_optimize,
&mut damus.timeline_cache,
&mut damus.subscriptions,
app_ctx.subman.pool(),
&ev.relay,
);
}
// TODO: handle reconnects
RelayEvent::Closed => warn!("{} connection closed", &ev.relay),
RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e),
RelayEvent::Other(msg) => trace!("other event {:?}", &msg),
RelayEvent::Message(msg) => {
process_message(damus, app_ctx, &ev.relay, &msg);
}
}
{
let mut relay_handler = RelayHandler::new(
app_ctx,
&mut damus.subscriptions,
&mut damus.timeline_cache,
damus.since_optimize,
);
relay_handler
.app_ctx
.subman
.process_relays(&mut relay_handler);
}

for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() {
Expand All @@ -139,7 +164,6 @@ fn try_process_event(
app_ctx.note_cache,
timeline,
);

if is_ready {
let txn = Transaction::new(app_ctx.ndb).expect("txn");
// only thread timelines are reversed
Expand All @@ -162,7 +186,6 @@ fn try_process_event(
if app_ctx.unknown_ids.ready_to_send() {
unknown_id_send(app_ctx.unknown_ids, app_ctx.subman);
}

Ok(())
}

Expand Down Expand Up @@ -310,48 +333,6 @@ fn handle_eose(
Ok(())
}

fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) {
match msg {
RelayMessage::Event(_subid, ev) => {
let relay =
if let Some(relay) = ctx.subman.pool().relays.iter().find(|r| r.url() == relay) {
relay
} else {
error!("couldn't find relay {} for note processing!?", relay);
return;
};

match relay {
PoolRelay::Websocket(_) => {
//info!("processing event {}", event);
if let Err(err) = ctx.ndb.process_event(ev) {
error!("error processing event {ev}: {err}");
}
}
PoolRelay::Multicast(_) => {
// multicast events are client events
if let Err(err) = ctx.ndb.process_client_event(ev) {
error!("error processing multicast event {ev}: {err}");
}
}
}
}
RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg),
RelayMessage::OK(cr) => info!("OK {:?}", cr),
RelayMessage::Eose(sid) => {
if let Err(err) = handle_eose(
&damus.subscriptions,
&mut damus.timeline_cache,
ctx,
sid,
relay,
) {
error!("error handling eose: {}", err);
}
}
}
}

fn render_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ui: &mut egui::Ui) {
if notedeck::ui::is_narrow(ui.ctx()) {
render_damus_mobile(damus, app_ctx, ui);
Expand Down

0 comments on commit cd41b1c

Please sign in to comment.