diff --git a/Cargo.lock b/Cargo.lock index e8b0ecbbce2..b8fc7360647 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1017,7 +1017,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -1435,8 +1435,10 @@ dependencies = [ "http", "js-sys", "lru", + "once_cell", "platform-wallet", "rs-dapi-client", + "rs-dash-event-bus", "rs-sdk-trusted-context-provider", "rustls-pemfile", "sanitize-filename", @@ -1876,16 +1878,19 @@ dependencies = [ "regex", "reopen", "rocksdb 0.23.0", + "rs-dash-event-bus", "rust_decimal", "rust_decimal_macros", "serde", "serde_json", + "sha2", "simple-signer", "strategy-tests", "tempfile", "tenderdash-abci", "thiserror 1.0.69", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-subscriber", @@ -2105,7 +2110,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.0", ] [[package]] @@ -3208,7 +3213,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3472,7 +3477,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if 1.0.3", - "windows-targets 0.48.5", + "windows-targets 0.53.3", ] [[package]] @@ -3649,9 +3654,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.16.2" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034" +checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" dependencies = [ "base64 0.22.1", "http-body-util", @@ -3662,16 +3667,16 @@ dependencies = [ "metrics", "metrics-util", "quanta", - "thiserror 1.0.69", + "thiserror 2.0.16", "tokio", "tracing", ] [[package]] name = "metrics-util" -version = "0.19.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +checksum = "fe8db7a05415d0f919ffb905afa37784f71901c9a773188876984b4f769ab986" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -5066,6 +5071,19 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "rs-dash-event-bus" +version = "2.1.0-dev.7" +dependencies = [ + "dapi-grpc", + "futures", + "metrics", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + [[package]] name = "rs-sdk-ffi" version = "2.1.0-dev.7" @@ -5200,7 +5218,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -5213,7 +5231,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.52.0", + "windows-sys 0.61.0", ] [[package]] @@ -6011,7 +6029,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.2", - "windows-sys 0.52.0", + "windows-sys 0.61.0", ] [[package]] @@ -6311,6 +6329,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -6335,6 +6354,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] @@ -7241,7 +7261,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0041d4db196..49592cb46ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ members = [ "packages/rs-sdk-ffi", "packages/wasm-drive-verify", "packages/dash-platform-balance-checker", + "packages/rs-dash-event-bus", "packages/rs-platform-wallet", "packages/wasm-sdk", ] diff --git a/Dockerfile b/Dockerfile index 5560419149b..e6e836e9944 100644 --- a/Dockerfile +++ b/Dockerfile @@ -373,6 +373,7 @@ COPY --parents \ packages/rs-platform-versioning \ packages/rs-platform-value-convertible \ packages/rs-drive-abci \ + packages/rs-dash-event-bus \ packages/dashpay-contract \ packages/withdrawals-contract \ packages/masternode-reward-shares-contract \ @@ -451,6 +452,7 @@ COPY --parents \ .cargo \ packages/dapi-grpc \ packages/rs-dapi-grpc-macros \ + packages/rs-dash-event-bus \ packages/rs-dpp \ packages/rs-drive \ packages/rs-platform-value \ @@ -553,6 +555,7 @@ COPY --parents \ Cargo.toml \ rust-toolchain.toml \ .cargo \ + packages/rs-dash-event-bus \ packages/rs-dpp \ packages/rs-platform-value \ packages/rs-platform-serialization \ diff --git a/packages/dapi-grpc/protos/platform/v0/platform.proto b/packages/dapi-grpc/protos/platform/v0/platform.proto index 46be29d86bc..09406c8a303 100644 --- a/packages/dapi-grpc/protos/platform/v0/platform.proto +++ b/packages/dapi-grpc/protos/platform/v0/platform.proto @@ -6,6 +6,94 @@ package org.dash.platform.dapi.v0; import "google/protobuf/timestamp.proto"; +// Platform events streaming (v0) +message PlatformEventsCommand { + message PlatformEventsCommandV0 { + oneof command { + AddSubscriptionV0 add = 1; + RemoveSubscriptionV0 remove = 2; + PingV0 ping = 3; + } + } + oneof version { PlatformEventsCommandV0 v0 = 1; } +} + +message PlatformEventsResponse { + message PlatformEventsResponseV0 { + oneof response { + PlatformEventMessageV0 event = 1; + AckV0 ack = 2; + PlatformErrorV0 error = 3; + } + } + oneof version { PlatformEventsResponseV0 v0 = 1; } +} + +message AddSubscriptionV0 { + string client_subscription_id = 1; + PlatformFilterV0 filter = 2; +} + +message RemoveSubscriptionV0 { + string client_subscription_id = 1; +} + +message PingV0 { uint64 nonce = 1; } + +message AckV0 { + string client_subscription_id = 1; + string op = 2; // "add" | "remove" +} + +message PlatformErrorV0 { + string client_subscription_id = 1; + uint32 code = 2; + string message = 3; +} + +message PlatformEventMessageV0 { + string client_subscription_id = 1; + PlatformEventV0 event = 2; +} + +// Initial placeholder filter and event to be refined during integration +// Filter for StateTransitionResult events +message StateTransitionResultFilter { + // When set, only match StateTransitionResult events for this tx hash. + optional bytes tx_hash = 1; +} + +message PlatformFilterV0 { + oneof kind { + bool all = 1; // subscribe to all platform events + bool block_committed = 2; // subscribe to BlockCommitted events only + StateTransitionResultFilter state_transition_result = 3; // subscribe to StateTransitionResult events (optionally filtered by tx_hash) + } +} + +message PlatformEventV0 { + message BlockMetadata { + uint64 height = 1 [ jstype = JS_STRING ]; + uint64 time_ms = 2 [ jstype = JS_STRING ]; + bytes block_id_hash = 3; + } + + message BlockCommitted { + BlockMetadata meta = 1; + uint32 tx_count = 2; + } + + message StateTransitionFinalized { + BlockMetadata meta = 1; + bytes tx_hash = 2; + } + + oneof event { + BlockCommitted block_committed = 1; + StateTransitionFinalized state_transition_finalized = 2; + } +} + service Platform { rpc broadcastStateTransition(BroadcastStateTransitionRequest) returns (BroadcastStateTransitionResponse); @@ -102,6 +190,10 @@ service Platform { rpc getGroupActions(GetGroupActionsRequest) returns (GetGroupActionsResponse); rpc getGroupActionSigners(GetGroupActionSignersRequest) returns (GetGroupActionSignersResponse); + + // Bi-directional stream for multiplexed platform events subscriptions + rpc subscribePlatformEvents(stream PlatformEventsCommand) + returns (stream PlatformEventsResponse); } // Proof message includes cryptographic proofs for validating responses diff --git a/packages/rs-dash-event-bus/Cargo.toml b/packages/rs-dash-event-bus/Cargo.toml new file mode 100644 index 00000000000..52c070ad552 --- /dev/null +++ b/packages/rs-dash-event-bus/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "rs-dash-event-bus" +version = "2.1.0-dev.7" +edition = "2024" +license = "MIT" +description = "Shared event bus and Platform events multiplexer for Dash Platform (rs-dapi, rs-drive-abci, rs-sdk)" + +[lib] +name = "dash_event_bus" +path = "src/lib.rs" + +[features] +default = [] +metrics = ["dep:metrics"] + +[dependencies] +tokio = { version = "1", features = ["rt", "macros", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["sync"] } +tokio-util = { version = "0.7", features = ["rt"] } +tracing = "0.1" +futures = "0.3" + +# Internal workspace crates +dapi-grpc = { path = "../dapi-grpc" } + +# Optional metrics +metrics = { version = "0.24.2", optional = true } + +[dev-dependencies] +tokio = { version = "1", features = [ + "rt-multi-thread", + "macros", + "sync", + "time", +] } diff --git a/packages/rs-dash-event-bus/src/event_bus.rs b/packages/rs-dash-event-bus/src/event_bus.rs new file mode 100644 index 00000000000..dbb7398cd19 --- /dev/null +++ b/packages/rs-dash-event-bus/src/event_bus.rs @@ -0,0 +1,526 @@ +//! Generic, clonable in-process event bus with pluggable filtering. + +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{Mutex, RwLock, mpsc}; + +const DEFAULT_SUBSCRIPTION_CAPACITY: usize = 256; + +/// Filter trait for event matching on a specific event type. +pub trait Filter: Send + Sync { + /// Return true if the event matches the filter. + fn matches(&self, event: &E) -> bool; +} + +/// Internal subscription structure. +/// +/// Note: no Clone impl, so that dropping the sender closes the channel. +struct Subscription { + filter: F, + sender: mpsc::Sender, +} + +/// Generic, clonable in-process event bus with pluggable filtering. +pub struct EventBus { + subs: Arc>>>, + counter: Arc, + tasks: Arc>>, // tasks spawned for this subscription, cancelled on drop + channel_capacity: usize, +} + +impl Clone for EventBus { + fn clone(&self) -> Self { + Self { + subs: Arc::clone(&self.subs), + counter: Arc::clone(&self.counter), + tasks: Arc::clone(&self.tasks), + channel_capacity: self.channel_capacity, + } + } +} + +impl Default for EventBus +where + E: Clone + Send + 'static, + F: Filter + Send + Sync + Debug + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +impl EventBus { + /// Remove a subscription by id and update metrics. + pub async fn remove_subscription(&self, id: u64) { + let mut subs = self.subs.write().await; + if subs.remove(&id).is_some() { + metrics_unsubscribe_inc(); + metrics_active_gauge_set(subs.len()); + tracing::debug!("event_bus: removed subscription id={}", id); + } else { + tracing::debug!("event_bus: subscription id={} not found, not removed", id); + } + } +} + +impl EventBus +where + E: Clone + Send + 'static, + F: Filter + Debug + Send + Sync + 'static, +{ + /// Create a new, empty event bus. + pub fn new() -> Self { + Self::with_capacity(DEFAULT_SUBSCRIPTION_CAPACITY) + } + + /// Create a new event bus with a custom per-subscription channel capacity. + pub fn with_capacity(capacity: usize) -> Self { + metrics_register_once(); + Self { + subs: Arc::new(RwLock::new(BTreeMap::new())), + counter: Arc::new(AtomicU64::new(0)), + tasks: Arc::new(Mutex::new(tokio::task::JoinSet::new())), + channel_capacity: capacity.max(1), + } + } + + /// Add a new subscription using the provided filter. + pub async fn add_subscription(&self, filter: F) -> SubscriptionHandle { + tracing::trace!(?filter, "event_bus: adding subscription"); + + let id = self.counter.fetch_add(1, Ordering::SeqCst); + let (tx, rx) = mpsc::channel::(self.channel_capacity); + + let sub = Subscription { filter, sender: tx }; + + { + let mut subs = self.subs.write().await; + subs.insert(id, sub); + metrics_active_gauge_set(subs.len()); + metrics_subscribe_inc(); + } + tracing::debug!(sub_id = id, "event_bus: added subscription"); + + SubscriptionHandle { + id, + rx: Arc::new(Mutex::new(rx)), + drop: true, + event_bus: self.clone(), + } + } + + /// Publish an event to all subscribers whose filters match, using + /// the current Tokio runtime if available, otherwise log a warning. + /// + /// This is a best-effort, fire-and-forget variant of `notify`. + pub fn notify_sync(&self, event: E) { + let bus = self.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + bus.notify(event).await; + }); + } else { + tracing::warn!("event_bus.notify_sync: no current tokio runtime"); + } + } + + /// Publish an event to all subscribers whose filters match. + pub async fn notify(&self, event: E) { + metrics_events_published_inc(); + + let mut targets = Vec::new(); + { + let subs_guard = self.subs.read().await; + for (id, sub) in subs_guard.iter() { + if sub.filter.matches(&event) { + targets.push((*id, sub.sender.clone())); + } + } + } + + if targets.is_empty() { + return; + } + + let mut dead = Vec::new(); + + for (id, sender) in targets.into_iter() { + let payload = event.clone(); + + match sender.try_send(payload) { + Ok(()) => { + metrics_events_delivered_inc(); + tracing::trace!(subscription_id = id, "event_bus: event delivered"); + } + Err(TrySendError::Full(_value)) => { + metrics_events_dropped_inc(); + tracing::warn!( + subscription_id = id, + "event_bus: subscriber queue full, removing laggy subscriber to protect others" + ); + // Drop the event for this subscriber and remove subscription + dead.push(id); + } + Err(TrySendError::Closed(_value)) => { + metrics_events_dropped_inc(); + dead.push(id); + } + } + } + + for id in dead { + tracing::debug!( + subscription_id = id, + "event_bus: removing dead subscription" + ); + self.remove_subscription(id).await; + } + } + + /// Get the current number of active subscriptions. + pub async fn subscription_count(&self) -> usize { + self.subs.read().await.len() + } + + /// Copy all event messages from an unbounded mpsc receiver into the event bus. + pub async fn copy_from_unbounded_mpsc(&self, mut rx: mpsc::UnboundedReceiver) { + let bus = self.clone(); + let mut tasks = self.tasks.lock().await; + tasks.spawn(async move { + while let Some(event) = rx.recv().await { + bus.notify(event).await; + } + }); + } +} + +/// RAII subscription handle; dropping the last clone removes the subscription. +pub struct SubscriptionHandle +where + E: Send + 'static, + F: Send + Sync + 'static, +{ + id: u64, + rx: Arc>>, + event_bus: EventBus, + drop: bool, // true only for primary handles +} + +impl Clone for SubscriptionHandle +where + E: Send + 'static, + F: Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + id: self.id, + rx: Arc::clone(&self.rx), + event_bus: self.event_bus.clone(), + drop: self.drop, + } + } +} + +impl SubscriptionHandle +where + E: Send + 'static, + F: Send + Sync + 'static, +{ + /// Get the unique ID of this subscription. + pub fn id(&self) -> u64 { + self.id + } + + /// Receive the next event for this subscription. + pub async fn recv(&self) -> Option { + let mut rx = self.rx.lock().await; + rx.recv().await + } + + /// Disable automatic unsubscription when the last handle is dropped. + /// + /// By default, dropping the final [`SubscriptionHandle`] removes the + /// subscription from the [`EventBus`]. Calling this method keeps the + /// subscription registered so that the caller can explicitly remove it + /// via [`EventBus::remove_subscription`]. + pub fn no_unsubscribe_on_drop(mut self) -> Self { + self.drop = false; + self + } +} + +impl Drop for SubscriptionHandle +where + E: Send + 'static, + F: Send + Sync + 'static, +{ + fn drop(&mut self) { + if self.drop { + // Remove only when the last clone of this handle is dropped + if Arc::strong_count(&self.rx) == 1 { + let bus = self.event_bus.clone(); + let id = self.id; + + // Prefer removing via Tokio if a runtime is available + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + bus.remove_subscription(id).await; + }); + } else { + // Fallback: best-effort synchronous removal using try_write() + if let Ok(mut subs) = bus.subs.try_write() + && subs.remove(&id).is_some() { + metrics_unsubscribe_inc(); + metrics_active_gauge_set(subs.len()); + } + } + } + } + } +} + +// ---- Metrics helpers (gated) ---- + +#[cfg(feature = "metrics")] +mod met { + use metrics::{counter, describe_counter, describe_gauge, gauge}; + use std::sync::Once; + + pub const ACTIVE_SUBSCRIPTIONS: &str = "event_bus_active_subscriptions"; + pub const SUBSCRIBE_TOTAL: &str = "event_bus_subscribe_total"; + pub const UNSUBSCRIBE_TOTAL: &str = "event_bus_unsubscribe_total"; + pub const EVENTS_PUBLISHED_TOTAL: &str = "event_bus_events_published_total"; + pub const EVENTS_DELIVERED_TOTAL: &str = "event_bus_events_delivered_total"; + pub const EVENTS_DROPPED_TOTAL: &str = "event_bus_events_dropped_total"; + + pub fn register_metrics_once() { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + describe_gauge!( + ACTIVE_SUBSCRIPTIONS, + "Current number of active event bus subscriptions" + ); + describe_counter!( + SUBSCRIBE_TOTAL, + "Total subscriptions created on the event bus" + ); + describe_counter!( + UNSUBSCRIBE_TOTAL, + "Total subscriptions removed from the event bus" + ); + describe_counter!( + EVENTS_PUBLISHED_TOTAL, + "Total events published to the event bus" + ); + describe_counter!( + EVENTS_DELIVERED_TOTAL, + "Total events delivered to subscribers" + ); + describe_counter!( + EVENTS_DROPPED_TOTAL, + "Total events dropped due to dead subscribers" + ); + }); + } + + pub fn active_gauge_set(n: usize) { + gauge!(ACTIVE_SUBSCRIPTIONS).set(n as f64); + } + pub fn subscribe_inc() { + counter!(SUBSCRIBE_TOTAL).increment(1); + } + pub fn unsubscribe_inc() { + counter!(UNSUBSCRIBE_TOTAL).increment(1); + } + pub fn events_published_inc() { + counter!(EVENTS_PUBLISHED_TOTAL).increment(1); + } + pub fn events_delivered_inc() { + counter!(EVENTS_DELIVERED_TOTAL).increment(1); + } + pub fn events_dropped_inc() { + counter!(EVENTS_DROPPED_TOTAL).increment(1); + } +} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_register_once() { + met::register_metrics_once() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_register_once() {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_active_gauge_set(n: usize) { + met::active_gauge_set(n) +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_active_gauge_set(_n: usize) {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_subscribe_inc() { + met::subscribe_inc() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_subscribe_inc() {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_unsubscribe_inc() { + met::unsubscribe_inc() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_unsubscribe_inc() {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_events_published_inc() { + met::events_published_inc() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_events_published_inc() {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_events_delivered_inc() { + met::events_delivered_inc() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_events_delivered_inc() {} + +#[cfg(feature = "metrics")] +#[inline] +fn metrics_events_dropped_inc() { + met::events_dropped_inc() +} +#[cfg(not(feature = "metrics"))] +#[inline] +fn metrics_events_dropped_inc() {} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{Duration, timeout}; + + #[derive(Clone, Debug, PartialEq)] + enum Evt { + Num(u32), + } + + #[derive(Clone, Debug)] + struct EvenOnly; + + impl Filter for EvenOnly { + fn matches(&self, e: &Evt) -> bool { + matches!(e, Evt::Num(n) if n % 2 == 0) + } + } + + #[tokio::test] + async fn basic_subscribe_and_notify() { + let bus: EventBus = EventBus::new(); + let sub = bus.add_subscription(EvenOnly).await; + + bus.notify(Evt::Num(1)).await; // filtered out + bus.notify(Evt::Num(2)).await; // delivered + + let got = timeout(Duration::from_millis(200), sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(got, Evt::Num(2)); + } + + #[tokio::test] + async fn drop_removes_subscription() { + let bus: EventBus = EventBus::new(); + let sub = bus.add_subscription(EvenOnly).await; + assert_eq!(bus.subscription_count().await, 1); + drop(sub); + + for _ in 0..10 { + if bus.subscription_count().await == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert_eq!(bus.subscription_count().await, 0); + } + + #[tokio::test] + async fn multiple_events_delivered() { + let bus: EventBus = EventBus::new(); + let sub = bus.add_subscription(EvenOnly).await; + + bus.notify(Evt::Num(2)).await; + bus.notify(Evt::Num(12)).await; + + let a = timeout(Duration::from_millis(200), sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(a, Evt::Num(2)); + let b = timeout(Duration::from_millis(200), sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(b, Evt::Num(12)); + } + + #[tokio::test] + async fn no_unsubscribe_on_drop_allows_manual_cleanup() { + let bus: EventBus = EventBus::new(); + let handle = bus + .add_subscription(EvenOnly) + .await + .no_unsubscribe_on_drop(); + let id = handle.id(); + + drop(handle); + // Automatic removal should not happen + assert_eq!(bus.subscription_count().await, 1); + + bus.remove_subscription(id).await; + assert_eq!(bus.subscription_count().await, 0); + } + + #[tokio::test] + async fn unsubscribe() { + let bus: EventBus = EventBus::new(); + let sub = bus.add_subscription(EvenOnly).await; + + bus.notify(Evt::Num(2)).await; + bus.notify(Evt::Num(12)).await; + + bus.remove_subscription(sub.id()).await; + + bus.notify(Evt::Num(3)).await; // not delivered as we already unsubscribed + + let a = timeout(Duration::from_millis(200), sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(a, Evt::Num(2)); + let b = timeout(Duration::from_millis(200), sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(b, Evt::Num(12)); + + let c = timeout(Duration::from_millis(200), sub.recv()).await; + assert!(c.unwrap().is_none(), "only two events should be received",); + } +} diff --git a/packages/rs-dash-event-bus/src/event_mux.rs b/packages/rs-dash-event-bus/src/event_mux.rs new file mode 100644 index 00000000000..b891eb9824c --- /dev/null +++ b/packages/rs-dash-event-bus/src/event_mux.rs @@ -0,0 +1,1049 @@ +//! EventMux: a generic multiplexer between multiple Platform event subscribers +//! and producers. Subscribers send `PlatformEventsCommand` and receive +//! `PlatformEventsResponse`. Producers receive commands and generate responses. +//! +//! Features: +//! - Multiple subscribers and producers +//! - Round-robin dispatch of commands to producers +//! - Register per-subscriber filters on Add, remove on Remove +//! - Fan-out responses to all subscribers whose filters match + +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use dapi_grpc::platform::v0::PlatformEventsCommand; +use dapi_grpc::platform::v0::platform_events_command::Version as CmdVersion; +use dapi_grpc::platform::v0::platform_events_command::platform_events_command_v0::Command as Cmd; +use dapi_grpc::platform::v0::platform_events_response::platform_events_response_v0::Response as Resp; +use dapi_grpc::tonic::Status; +use futures::SinkExt; +use tokio::join; +use tokio::sync::{Mutex, mpsc}; +use tokio_util::sync::PollSender; + +use crate::event_bus::{EventBus, Filter as EventFilter, SubscriptionHandle}; +use dapi_grpc::platform::v0::PlatformEventsResponse; +use dapi_grpc::platform::v0::PlatformFilterV0; + +pub type EventsCommandResult = Result; +pub type EventsResponseResult = Result; + +const COMMAND_CHANNEL_CAPACITY: usize = 128; +const RESPONSE_CHANNEL_CAPACITY: usize = 512; + +pub type CommandSender = mpsc::Sender; +pub type CommandReceiver = mpsc::Receiver; + +pub type ResponseSender = mpsc::Sender; +pub type ResponseReceiver = mpsc::Receiver; + +/// EventMux: manages subscribers and producers, routes commands and responses. +pub struct EventMux { + bus: EventBus, + producers: Arc>>>, + rr_counter: Arc, + tasks: Arc>>, + subscriptions: Arc>>, + next_subscriber_id: Arc, +} + +impl Default for EventMux { + fn default() -> Self { + Self::new() + } +} + +impl EventMux { + async fn handle_subscriber_disconnect(&self, subscriber_id: u64) { + tracing::debug!(subscriber_id, "event_mux: handling subscriber disconnect"); + self.remove_subscriber(subscriber_id).await; + } + /// Create a new, empty EventMux without producers or subscribers. + pub fn new() -> Self { + Self { + bus: EventBus::new(), + producers: Arc::new(Mutex::new(Vec::new())), + rr_counter: Arc::new(AtomicUsize::new(0)), + tasks: Arc::new(Mutex::new(tokio::task::JoinSet::new())), + subscriptions: Arc::new(std::sync::Mutex::new(BTreeMap::new())), + next_subscriber_id: Arc::new(AtomicUsize::new(1)), + } + } + + /// Register a new producer. Returns an `EventProducer` comprised of: + /// - `cmd_rx`: producer receives commands from the mux + /// - `resp_tx`: producer sends generated responses into the mux + pub async fn add_producer(&self) -> EventProducer { + let (cmd_tx, cmd_rx) = mpsc::channel::(COMMAND_CHANNEL_CAPACITY); + let (resp_tx, resp_rx) = mpsc::channel::(RESPONSE_CHANNEL_CAPACITY); + + // Store command sender so mux can forward commands via round-robin + { + let mut prods = self.producers.lock().await; + prods.push(Some(cmd_tx)); + } + + // Route producer responses into the event bus + let bus = self.bus.clone(); + let mux = self.clone(); + let producer_index = { + let prods = self.producers.lock().await; + prods.len().saturating_sub(1) + }; + { + let mut tasks = self.tasks.lock().await; + tasks.spawn(async move { + let mut rx = resp_rx; + while let Some(resp) = rx.recv().await { + match resp { + Ok(response) => { + bus.notify(response).await; + } + Err(e) => { + tracing::error!(error = %e, "event_mux: producer response error"); + } + } + } + + // producer disconnected + tracing::warn!(index = producer_index, "event_mux: producer disconnected"); + mux.on_producer_disconnected(producer_index).await; + }); + } + + EventProducer { cmd_rx, resp_tx } + } + + /// Register a new subscriber. + /// + /// Subscriber is automatically cleaned up when channels are closed. + pub async fn add_subscriber(&self) -> EventSubscriber { + let (sub_cmd_tx, sub_cmd_rx) = + mpsc::channel::(COMMAND_CHANNEL_CAPACITY); + let (sub_resp_tx, sub_resp_rx) = + mpsc::channel::(RESPONSE_CHANNEL_CAPACITY); + + let mux = self.clone(); + let subscriber_id = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed) as u64; + + { + let mut tasks = self.tasks.lock().await; + tasks.spawn(async move { + mux.run_subscriber_loop(subscriber_id, sub_cmd_rx, sub_resp_tx) + .await; + }); + } + + EventSubscriber { + cmd_tx: sub_cmd_tx, + resp_rx: sub_resp_rx, + } + } + + async fn run_subscriber_loop( + self, + subscriber_id: u64, + mut sub_cmd_rx: CommandReceiver, + sub_resp_tx: ResponseSender, + ) { + tracing::debug!(subscriber_id, "event_mux: starting subscriber loop"); + + loop { + let cmd = match sub_cmd_rx.recv().await { + Some(Ok(c)) => c, + Some(Err(e)) => { + tracing::warn!(subscriber_id, error=%e, "event_mux: subscriber command error"); + continue; + } + None => { + tracing::debug!( + subscriber_id, + "event_mux: subscriber command channel closed" + ); + break; + } + }; + + if let Some(CmdVersion::V0(v0)) = &cmd.version { + match &v0.command { + Some(Cmd::Add(add)) => { + let id = add.client_subscription_id.clone(); + tracing::debug!(subscriber_id, subscription_id = %id, "event_mux: adding subscription"); + + // If a subscription with this id already exists for this subscriber, + // remove it first to avoid duplicate fan-out and leaked handles. + if let Some((prev_sub_id, prev_handle_id, prev_assigned)) = { + let subs = self.subscriptions.lock().unwrap(); + subs.get(&SubscriptionKey { + subscriber_id, + id: id.clone(), + }) + .map(|info| { + (info.subscriber_id, info.handle.id(), info.assigned_producer) + }) + } + && prev_sub_id == subscriber_id { + tracing::warn!( + subscriber_id, + subscription_id = %id, + "event_mux: duplicate Add detected, removing previous subscription first" + ); + // Remove previous bus subscription + self.bus.remove_subscription(prev_handle_id).await; + // Notify previously assigned producer about removal + if let Some(prev_idx) = prev_assigned + && let Some(tx) = self.get_producer_tx(prev_idx).await { + let remove_cmd = PlatformEventsCommand { + version: Some(CmdVersion::V0( + dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0 { + command: Some(Cmd::Remove( + dapi_grpc::platform::v0::RemoveSubscriptionV0 { + client_subscription_id: id.clone(), + }, + )), + }, + )), + }; + if tx.send(Ok(remove_cmd)).await.is_err() { + tracing::debug!( + subscription_id = %id, + "event_mux: failed to send duplicate Remove to producer" + ); + } + } + // Drop previous mapping entry (it will be replaced below) + let _ = { + self.subscriptions.lock().unwrap().remove(&SubscriptionKey { + subscriber_id, + id: id.clone(), + }) + }; + } + + // Create subscription filtered by client_subscription_id and forward events + let handle = self + .bus + .add_subscription(IdFilter { id: id.clone() }) + .await + .no_unsubscribe_on_drop(); + + { + let mut subs = self.subscriptions.lock().unwrap(); + subs.insert( + SubscriptionKey { + subscriber_id, + id: id.clone(), + }, + SubscriptionInfo { + subscriber_id, + filter: add.filter.clone(), + assigned_producer: None, + handle: handle.clone(), + }, + ); + } + + // Assign producer for this subscription + if let Some((_idx, prod_tx)) = self + .assign_producer_for_subscription(subscriber_id, &id) + .await + { + if prod_tx.send(Ok(cmd)).await.is_err() { + tracing::debug!(subscription_id = %id, "event_mux: failed to send Add to producer - channel closed"); + } + } else { + // TODO: handle no producers available, possibly spawned jobs didn't start yet + tracing::warn!(subscription_id = %id, "event_mux: no producers available for Add"); + } + + // Start fan-out task for this subscription + let tx = sub_resp_tx.clone(); + let mux = self.clone(); + let sub_id = subscriber_id; + let mut tasks = self.tasks.lock().await; + tasks.spawn(async move { + let h = handle; + loop { + match h.recv().await { + Some(resp) => { + if tx.send(Ok(resp)).await.is_err() { + tracing::debug!(subscription_id = %id, "event_mux: failed to send response - subscriber channel closed"); + mux.handle_subscriber_disconnect(sub_id).await; + break; + } + } + None => { + tracing::debug!(subscription_id = %id, "event_mux: subscription ended"); + mux.handle_subscriber_disconnect(sub_id).await; + break; + } + } + } + }); + } + Some(Cmd::Remove(rem)) => { + let id = rem.client_subscription_id.clone(); + tracing::debug!(subscriber_id, subscription_id = %id, "event_mux: removing subscription"); + + // Remove subscription from bus and registry, and get assigned producer + let removed = { + self.subscriptions.lock().unwrap().remove(&SubscriptionKey { + subscriber_id, + id: id.clone(), + }) + }; + let assigned = if let Some(info) = removed { + self.bus.remove_subscription(info.handle.id()).await; + info.assigned_producer + } else { + None + }; + + if let Some(idx) = assigned + && let Some(tx) = self.get_producer_tx(idx).await + && tx.send(Ok(cmd)).await.is_err() { + tracing::debug!(subscription_id = %id, "event_mux: failed to send Remove to producer - channel closed"); + self.handle_subscriber_disconnect(subscriber_id).await; + } + } + _ => {} + } + } + } + + // subscriber disconnected: use the centralized cleanup method + tracing::debug!(subscriber_id, "event_mux: subscriber disconnected"); + self.handle_subscriber_disconnect(subscriber_id).await; + } + + /// Remove a subscriber and clean up all associated resources + pub async fn remove_subscriber(&self, subscriber_id: u64) { + tracing::debug!(subscriber_id, "event_mux: removing subscriber"); + + // Get all subscription IDs for this subscriber by iterating through subscriptions + let keys: Vec = { + let subs = self.subscriptions.lock().unwrap(); + subs.iter() + .filter_map(|(key, info)| { + if info.subscriber_id == subscriber_id { + Some(key.clone()) + } else { + None + } + }) + .collect() + }; + + tracing::debug!( + subscriber_id, + subscription_count = keys.len(), + "event_mux: found subscriptions for subscriber" + ); + + // Remove each subscription from the bus and notify producers + for key in keys { + let id = key.id.clone(); + let removed = { self.subscriptions.lock().unwrap().remove(&key) }; + let assigned = if let Some(info) = removed { + self.bus.remove_subscription(info.handle.id()).await; + tracing::debug!(subscription_id = %id, "event_mux: removed subscription from bus"); + info.assigned_producer + } else { + None + }; + + // Send remove command to assigned producer + if let Some(idx) = assigned + && let Some(tx) = self.get_producer_tx(idx).await { + let cmd = PlatformEventsCommand { + version: Some(CmdVersion::V0( + dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0 { + command: Some(Cmd::Remove( + dapi_grpc::platform::v0::RemoveSubscriptionV0 { + client_subscription_id: id.clone(), + }, + )), + }, + )), + }; + if tx.send(Ok(cmd)).await.is_err() { + tracing::debug!(subscription_id = %id, "event_mux: failed to send Remove to producer - channel closed"); + } else { + tracing::debug!(subscription_id = %id, "event_mux: sent Remove command to producer"); + } + } + } + + tracing::debug!(subscriber_id, "event_mux: subscriber removed"); + } + + async fn assign_producer_for_subscription( + &self, + subscriber_id: u64, + subscription_id: &str, + ) -> Option<(usize, CommandSender)> { + let prods_guard = self.producers.lock().await; + if prods_guard.is_empty() { + return None; + } + // Prefer existing assignment + { + let subs = self.subscriptions.lock().unwrap(); + if let Some(info) = subs.get(&SubscriptionKey { + subscriber_id, + id: subscription_id.to_string(), + }) + && let Some(idx) = info.assigned_producer + && let Some(Some(tx)) = prods_guard.get(idx) { + return Some((idx, tx.clone())); + } + } + // Use round-robin assignment for new subscriptions + let idx = self.rr_counter.fetch_add(1, Ordering::Relaxed) % prods_guard.len(); + let mut chosen_idx = idx; + + // Find first alive producer starting from round-robin position + let chosen = loop { + if let Some(Some(tx)) = prods_guard.get(chosen_idx) { + break Some((chosen_idx, tx.clone())); + } + chosen_idx = (chosen_idx + 1) % prods_guard.len(); + if chosen_idx == idx { + break None; // Cycled through all producers + } + }; + + drop(prods_guard); + if let Some((idx, tx)) = chosen { + if let Some(info) = self + .subscriptions + .lock() + .unwrap() + .get_mut(&SubscriptionKey { + subscriber_id, + id: subscription_id.to_string(), + }) + { + info.assigned_producer = Some(idx); + } + Some((idx, tx)) + } else { + None + } + } + + async fn get_producer_tx(&self, idx: usize) -> Option { + let prods = self.producers.lock().await; + prods.get(idx).and_then(|o| o.as_ref().cloned()) + } + + async fn on_producer_disconnected(&self, index: usize) { + // mark slot None + { + let mut prods = self.producers.lock().await; + if index < prods.len() { + prods[index] = None; + } + } + // collect affected subscribers + let affected_subscribers: BTreeSet = { + let subs = self.subscriptions.lock().unwrap(); + subs.iter() + .filter_map(|(_id, info)| { + if info.assigned_producer == Some(index) { + Some(info.subscriber_id) + } else { + None + } + }) + .collect() + }; + + // Remove all affected subscribers using the centralized method + for sub_id in affected_subscribers { + tracing::warn!( + subscriber_id = sub_id, + producer_index = index, + "event_mux: closing subscriber due to producer disconnect" + ); + self.remove_subscriber(sub_id).await; + } + // Note: reconnection of the actual producer transport is delegated to the caller. + } +} + +// Hashing moved to murmur3::murmur3_32 for deterministic producer selection. + +impl Clone for EventMux { + fn clone(&self) -> Self { + Self { + bus: self.bus.clone(), + producers: self.producers.clone(), + rr_counter: self.rr_counter.clone(), + tasks: self.tasks.clone(), + subscriptions: self.subscriptions.clone(), + next_subscriber_id: self.next_subscriber_id.clone(), + } + } +} + +impl EventMux { + /// Convenience API: subscribe directly with a filter and receive a subscription handle. + /// This method creates an internal subscription keyed by a generated client_subscription_id, + /// assigns a producer, sends the Add command upstream, and returns the id with an event bus handle. + pub async fn subscribe( + &self, + filter: PlatformFilterV0, + ) -> Result<(String, SubscriptionHandle), Status> { + let subscriber_id = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed) as u64; + let id = format!("sub-{}", subscriber_id); + + // Create bus subscription and register mapping + let handle = self.bus.add_subscription(IdFilter { id: id.clone() }).await; + { + let mut subs = self.subscriptions.lock().unwrap(); + subs.insert( + SubscriptionKey { + subscriber_id, + id: id.clone(), + }, + SubscriptionInfo { + subscriber_id, + filter: Some(filter.clone()), + assigned_producer: None, + handle: handle.clone(), + }, + ); + } + + // Assign producer and send Add + if let Some((_idx, tx)) = self + .assign_producer_for_subscription(subscriber_id, &id) + .await + { + let cmd = PlatformEventsCommand { + version: Some(CmdVersion::V0( + dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0 { + command: Some(Cmd::Add(dapi_grpc::platform::v0::AddSubscriptionV0 { + client_subscription_id: id.clone(), + filter: Some(filter.clone()), + })), + }, + )), + }; + if tx.send(Ok(cmd)).await.is_err() { + tracing::debug!( + subscription_id = %id, + "event_mux: failed to send Add to assigned producer" + ); + } + + Ok((id, handle)) + } else { + tracing::warn!(subscription_id = %id, "event_mux: no producers available for Add"); + Err(Status::unavailable("no producers available")) + } + } +} + +/// Handle used by application code to implement a concrete producer. +/// - `cmd_rx`: read commands from the mux +/// - `resp_tx`: send generated responses into the mux +pub struct EventProducer { + pub cmd_rx: CommandReceiver, + pub resp_tx: ResponseSender, +} + +impl EventProducer { + /// Forward all messages from cmd_rx to self.cmd_tx and form resp_rx to self.resp_tx + pub async fn forward(self, mut cmd_tx: C, resp_rx: R) + where + C: futures::Sink + Unpin + Send + 'static, + R: futures::Stream + Unpin + Send + 'static, + // R: AsyncRead + Unpin + ?Sized, + // W: AsyncWrite + Unpin + ?Sized, + { + use futures::stream::StreamExt; + + let mut cmd_rx = self.cmd_rx; + + let resp_tx = self.resp_tx; + // let workers = JoinSet::new(); + let cmd_worker = tokio::spawn(async move { + while let Some(cmd) = cmd_rx.recv().await { + if cmd_tx.send(cmd).await.is_err() { + tracing::warn!("event_mux: failed to forward command to producer"); + break; + } + } + tracing::error!("event_mux: command channel closed, stopping producer forwarder"); + }); + + let resp_worker = tokio::spawn(async move { + let mut rx = resp_rx; + while let Some(resp) = rx.next().await { + if resp_tx.send(resp).await.is_err() { + tracing::warn!("event_mux: failed to forward response to mux"); + break; + } + } + tracing::error!( + "event_mux: response channel closed, stopping producer response forwarder" + ); + }); + + let _ = join!(cmd_worker, resp_worker); + } +} +/// Handle used by application code to implement a concrete subscriber. +/// Subscriber is automatically cleaned up when channels are closed. +pub struct EventSubscriber { + pub cmd_tx: CommandSender, + pub resp_rx: ResponseReceiver, +} + +impl EventSubscriber { + /// Forward all messages from cmd_rx to self.cmd_tx and from self.resp_rx to resp_tx + pub async fn forward(self, cmd_rx: C, mut resp_tx: R) + where + C: futures::Stream + Unpin + Send + 'static, + R: futures::Sink + Unpin + Send + 'static, + { + use futures::stream::StreamExt; + + let cmd_tx = self.cmd_tx; + let mut resp_rx = self.resp_rx; + + let cmd_worker = tokio::spawn(async move { + let mut rx = cmd_rx; + while let Some(cmd) = rx.next().await { + if cmd_tx.send(cmd).await.is_err() { + tracing::warn!("event_mux: failed to forward command from subscriber"); + break; + } + } + tracing::error!( + "event_mux: subscriber command channel closed, stopping command forwarder" + ); + }); + + let resp_worker = tokio::spawn(async move { + while let Some(resp) = resp_rx.recv().await { + if resp_tx.send(resp).await.is_err() { + tracing::warn!("event_mux: failed to forward response to subscriber"); + break; + } + } + tracing::error!( + "event_mux: subscriber response channel closed, stopping response forwarder" + ); + }); + + let _ = join!(cmd_worker, resp_worker); + } +} // ---- Filters ---- + +#[derive(Clone, Debug)] +pub struct IdFilter { + id: String, +} + +impl EventFilter for IdFilter { + fn matches(&self, event: &PlatformEventsResponse) -> bool { + if let Some(dapi_grpc::platform::v0::platform_events_response::Version::V0(v0)) = + &event.version + { + match &v0.response { + Some(Resp::Event(ev)) => ev.client_subscription_id == self.id, + Some(Resp::Ack(ack)) => ack.client_subscription_id == self.id, + Some(Resp::Error(err)) => err.client_subscription_id == self.id, + None => false, + } + } else { + false + } + } +} + +struct SubscriptionInfo { + subscriber_id: u64, + #[allow(dead_code)] + filter: Option, + assigned_producer: Option, + handle: SubscriptionHandle, +} + +#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Debug)] +struct SubscriptionKey { + subscriber_id: u64, + id: String, +} + +/// Public alias for platform events subscription handle used by SDK and DAPI. +pub type PlatformEventsSubscriptionHandle = SubscriptionHandle; + +/// Create a bounded Sink from an mpsc Sender that maps errors to tonic::Status +pub fn sender_sink( + sender: mpsc::Sender, +) -> impl futures::Sink { + Box::pin( + PollSender::new(sender) + .sink_map_err(|_| Status::internal("Failed to send command to PlatformEventsMux")), + ) +} + +/// Create a bounded Sink that accepts `Result` and forwards `Ok(T)` through the sender +/// while propagating errors. +pub fn result_sender_sink( + sender: mpsc::Sender, +) -> impl futures::Sink, Error = Status> { + Box::pin( + PollSender::new(sender) + .sink_map_err(|_| Status::internal("Failed to send command to PlatformEventsMux")) + .with(|value| async move { value }), + ) +} + +#[cfg(test)] +mod tests { + use super::sender_sink; + use super::*; + use dapi_grpc::platform::v0::platform_event_v0 as pe; + use dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0; + use dapi_grpc::platform::v0::platform_events_response::PlatformEventsResponseV0; + use dapi_grpc::platform::v0::{PlatformEventMessageV0, PlatformEventV0, PlatformFilterV0}; + use std::collections::HashMap; + use tokio::time::{Duration, timeout}; + + fn make_add_cmd(id: &str) -> PlatformEventsCommand { + PlatformEventsCommand { + version: Some(CmdVersion::V0(PlatformEventsCommandV0 { + command: Some(Cmd::Add(dapi_grpc::platform::v0::AddSubscriptionV0 { + client_subscription_id: id.to_string(), + filter: Some(PlatformFilterV0::default()), + })), + })), + } + } + + fn make_event_resp(id: &str) -> PlatformEventsResponse { + let meta = pe::BlockMetadata { + height: 1, + time_ms: 0, + block_id_hash: vec![], + }; + let evt = PlatformEventV0 { + event: Some(pe::Event::BlockCommitted(pe::BlockCommitted { + meta: Some(meta), + tx_count: 0, + })), + }; + + PlatformEventsResponse { + version: Some( + dapi_grpc::platform::v0::platform_events_response::Version::V0( + PlatformEventsResponseV0 { + response: Some(Resp::Event(PlatformEventMessageV0 { + client_subscription_id: id.to_string(), + event: Some(evt), + })), + }, + ), + ), + } + } + + #[tokio::test] + async fn should_deliver_events_once_per_subscriber_with_shared_id() { + let mux = EventMux::new(); + + // Single producer captures Add/Remove commands and accepts responses + let EventProducer { + mut cmd_rx, + resp_tx, + } = mux.add_producer().await; + + // Two subscribers share the same client_subscription_id + let EventSubscriber { + cmd_tx: sub1_cmd_tx, + resp_rx: mut resp_rx1, + } = mux.add_subscriber().await; + let EventSubscriber { + cmd_tx: sub2_cmd_tx, + resp_rx: mut resp_rx2, + } = mux.add_subscriber().await; + + let sub_id = "dup-sub"; + + sub1_cmd_tx + .send(Ok(make_add_cmd(sub_id))) + .await + .expect("send add for subscriber 1"); + sub2_cmd_tx + .send(Ok(make_add_cmd(sub_id))) + .await + .expect("send add for subscriber 2"); + + // Ensure producer receives both Add commands + for _ in 0..2 { + let got = timeout(Duration::from_secs(1), cmd_rx.recv()) + .await + .expect("timeout waiting for Add") + .expect("producer channel closed") + .expect("Add command error"); + match got.version.and_then(|v| match v { + CmdVersion::V0(v0) => v0.command, + }) { + Some(Cmd::Add(a)) => assert_eq!(a.client_subscription_id, sub_id), + other => panic!("expected Add command, got {:?}", other), + } + } + + // Emit a single event targeting the shared subscription id + resp_tx + .send(Ok(make_event_resp(sub_id))) + .await + .expect("failed to send event into mux"); + + let extract_id = |resp: PlatformEventsResponse| -> String { + match resp.version.and_then(|v| match v { + dapi_grpc::platform::v0::platform_events_response::Version::V0(v0) => { + v0.response.and_then(|r| match r { + Resp::Event(m) => Some(m.client_subscription_id), + _ => None, + }) + } + }) { + Some(id) => id, + None => panic!("unexpected response variant"), + } + }; + + let ev1 = timeout(Duration::from_secs(1), resp_rx1.recv()) + .await + .expect("timeout waiting for subscriber1 event") + .expect("subscriber1 channel closed") + .expect("subscriber1 event error"); + let ev2 = timeout(Duration::from_secs(1), resp_rx2.recv()) + .await + .expect("timeout waiting for subscriber2 event") + .expect("subscriber2 channel closed") + .expect("subscriber2 event error"); + + assert_eq!(extract_id(ev1), sub_id); + assert_eq!(extract_id(ev2), sub_id); + + // Ensure no duplicate deliveries per subscriber + assert!( + timeout(Duration::from_millis(100), resp_rx1.recv()) + .await + .is_err() + ); + assert!( + timeout(Duration::from_millis(100), resp_rx2.recv()) + .await + .is_err() + ); + + // Drop subscribers to trigger Remove for both + drop(sub1_cmd_tx); + drop(resp_rx1); + drop(sub2_cmd_tx); + drop(resp_rx2); + + for _ in 0..2 { + let got = timeout(Duration::from_secs(1), cmd_rx.recv()) + .await + .expect("timeout waiting for Remove") + .expect("producer channel closed") + .expect("Remove command error"); + match got.version.and_then(|v| match v { + CmdVersion::V0(v0) => v0.command, + }) { + Some(Cmd::Remove(r)) => assert_eq!(r.client_subscription_id, sub_id), + other => panic!("expected Remove command, got {:?}", other), + } + } + } + + #[tokio::test] + async fn mux_chain_three_layers_delivers_once_per_subscriber() { + use tokio_stream::wrappers::ReceiverStream; + + // Build three muxes + let mux1 = EventMux::new(); + let mux2 = EventMux::new(); + let mux3 = EventMux::new(); + + // Bridge: Mux1 -> Producer1a -> Subscriber2a -> Mux2 + // and Mux1 -> Producer1b -> Subscriber2b -> Mux2 + let prod1a = mux1.add_producer().await; + let sub2a = mux2.add_subscriber().await; + // Use a sink that accepts EventsCommandResult directly (no extra Result nesting) + let sub2a_cmd_sink = sender_sink(sub2a.cmd_tx.clone()); + let sub2a_resp_stream = ReceiverStream::new(sub2a.resp_rx); + tokio::spawn(async move { prod1a.forward(sub2a_cmd_sink, sub2a_resp_stream).await }); + + let prod1b = mux1.add_producer().await; + let sub2b = mux2.add_subscriber().await; + let sub2b_cmd_sink = sender_sink(sub2b.cmd_tx.clone()); + let sub2b_resp_stream = ReceiverStream::new(sub2b.resp_rx); + tokio::spawn(async move { prod1b.forward(sub2b_cmd_sink, sub2b_resp_stream).await }); + + // Bridge: Mux2 -> Producer2 -> Subscriber3 -> Mux3 + let prod2 = mux2.add_producer().await; + let sub3 = mux3.add_subscriber().await; + let sub3_cmd_sink = sender_sink(sub3.cmd_tx.clone()); + let sub3_resp_stream = ReceiverStream::new(sub3.resp_rx); + tokio::spawn(async move { prod2.forward(sub3_cmd_sink, sub3_resp_stream).await }); + + // Deepest producers where we will capture commands and inject events + let p3a = mux3.add_producer().await; + let p3b = mux3.add_producer().await; + let mut p3a_cmd_rx = p3a.cmd_rx; + let p3a_resp_tx = p3a.resp_tx; + let mut p3b_cmd_rx = p3b.cmd_rx; + let p3b_resp_tx = p3b.resp_tx; + + // Three top-level subscribers on Mux1 + let mut sub1a = mux1.add_subscriber().await; + let mut sub1b = mux1.add_subscriber().await; + let mut sub1c = mux1.add_subscriber().await; + let id_a = "s1a"; + let id_b = "s1b"; + let id_c = "s1c"; + + // Send Add commands downstream from each subscriber + sub1a + .cmd_tx + .send(Ok(make_add_cmd(id_a))) + .await + .expect("send add a"); + sub1b + .cmd_tx + .send(Ok(make_add_cmd(id_b))) + .await + .expect("send add b"); + sub1c + .cmd_tx + .send(Ok(make_add_cmd(id_c))) + .await + .expect("send add c"); + + // Ensure deepest producers receive each Add exactly once and not on both + let mut assigned: HashMap = HashMap::new(); + for _ in 0..3 { + let (which, got_opt) = timeout(Duration::from_secs(2), async { + tokio::select! { + c = p3a_cmd_rx.recv() => (0usize, c), + c = p3b_cmd_rx.recv() => (1usize, c), + } + }) + .await + .expect("timeout waiting for downstream add"); + + let got = got_opt + .expect("p3 cmd channel closed") + .expect("downstream add error"); + + match got.version.and_then(|v| match v { + CmdVersion::V0(v0) => v0.command, + }) { + Some(Cmd::Add(a)) => { + let id = a.client_subscription_id; + if let Some(prev) = assigned.insert(id.clone(), which) { + panic!( + "subscription {} was dispatched to two producers: {} and {}", + id, prev, which + ); + } + } + _ => panic!("expected Add at deepest producer"), + } + } + assert!( + assigned.contains_key(id_a) + && assigned.contains_key(id_b) + && assigned.contains_key(id_c) + ); + + // Emit one event per subscription id via the assigned deepest producer + match assigned.get(id_a) { + Some(0) => p3a_resp_tx + .send(Ok(make_event_resp(id_a))) + .await + .expect("emit event a"), + Some(1) => p3b_resp_tx + .send(Ok(make_event_resp(id_a))) + .await + .expect("emit event a"), + _ => panic!("missing assignment for id_a"), + } + match assigned.get(id_b) { + Some(0) => p3a_resp_tx + .send(Ok(make_event_resp(id_b))) + .await + .expect("emit event b"), + Some(1) => p3b_resp_tx + .send(Ok(make_event_resp(id_b))) + .await + .expect("emit event b"), + _ => panic!("missing assignment for id_b"), + } + match assigned.get(id_c) { + Some(0) => p3a_resp_tx + .send(Ok(make_event_resp(id_c))) + .await + .expect("emit event c"), + Some(1) => p3b_resp_tx + .send(Ok(make_event_resp(id_c))) + .await + .expect("emit event c"), + _ => panic!("missing assignment for id_c"), + } + + // Receive each exactly once at the top-level subscribers + let a_first = timeout(Duration::from_secs(2), sub1a.resp_rx.recv()) + .await + .expect("timeout waiting for a event") + .expect("a subscriber closed") + .expect("a event error"); + let b_first = timeout(Duration::from_secs(2), sub1b.resp_rx.recv()) + .await + .expect("timeout waiting for b event") + .expect("b subscriber closed") + .expect("b event error"); + let c_first = timeout(Duration::from_secs(2), sub1c.resp_rx.recv()) + .await + .expect("timeout waiting for c event") + .expect("c subscriber closed") + .expect("c event error"); + + let get_id = |resp: PlatformEventsResponse| -> String { + match resp.version.and_then(|v| match v { + dapi_grpc::platform::v0::platform_events_response::Version::V0(v0) => { + v0.response.and_then(|r| match r { + Resp::Event(m) => Some(m.client_subscription_id), + _ => None, + }) + } + }) { + Some(id) => id, + None => panic!("unexpected response variant"), + } + }; + + assert_eq!(get_id(a_first.clone()), id_a); + assert_eq!(get_id(b_first.clone()), id_b); + assert_eq!(get_id(c_first.clone()), id_c); + + // Ensure no duplicates by timing out on the next recv + let a_dup = timeout(Duration::from_millis(200), sub1a.resp_rx.recv()).await; + assert!(a_dup.is_err(), "unexpected duplicate for subscriber a"); + let b_dup = timeout(Duration::from_millis(200), sub1b.resp_rx.recv()).await; + assert!(b_dup.is_err(), "unexpected duplicate for subscriber b"); + let c_dup = timeout(Duration::from_millis(200), sub1c.resp_rx.recv()).await; + assert!(c_dup.is_err(), "unexpected duplicate for subscriber c"); + } +} diff --git a/packages/rs-dash-event-bus/src/grpc_producer.rs b/packages/rs-dash-event-bus/src/grpc_producer.rs new file mode 100644 index 00000000000..43259b38327 --- /dev/null +++ b/packages/rs-dash-event-bus/src/grpc_producer.rs @@ -0,0 +1,52 @@ +use dapi_grpc::platform::v0::PlatformEventsCommand; +use dapi_grpc::platform::v0::platform_client::PlatformClient; +use dapi_grpc::tonic::Status; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio_stream::wrappers::ReceiverStream; + +use crate::event_mux::{EventMux, result_sender_sink}; + +const UPSTREAM_COMMAND_BUFFER: usize = 128; + +/// A reusable gRPC producer that bridges a Platform gRPC client with an [`EventMux`]. +/// +/// Creates bi-directional channels, subscribes upstream using the provided client, +/// and forwards commands/responses between the upstream stream and the mux. +pub struct GrpcPlatformEventsProducer; + +impl GrpcPlatformEventsProducer { + /// Connect the provided `client` to the `mux` and forward messages until completion. + /// + /// The `ready` receiver is used to signal when the producer has started. + pub async fn run( + mux: EventMux, + mut client: PlatformClient, + ready: oneshot::Sender<()>, + ) -> Result<(), Status> + where + // C: DapiRequestExecutor, + C: dapi_grpc::tonic::client::GrpcService, + C::Error: Into, + C::ResponseBody: dapi_grpc::tonic::codegen::Body + + Send + + 'static, + ::Error: + Into + Send, + { + let (cmd_tx, cmd_rx) = mpsc::channel::(UPSTREAM_COMMAND_BUFFER); + tracing::debug!("connecting gRPC producer to upstream"); + let resp_stream = client + .subscribe_platform_events(ReceiverStream::new(cmd_rx)) + .await?; + let cmd_sink = result_sender_sink(cmd_tx); + let resp_rx = resp_stream.into_inner(); + + tracing::debug!("registering gRPC producer with mux"); + let producer = mux.add_producer().await; + tracing::debug!("gRPC producer connected to mux and ready, starting forward loop"); + ready.send(()).ok(); + producer.forward(cmd_sink, resp_rx).await; + Ok(()) + } +} diff --git a/packages/rs-dash-event-bus/src/lib.rs b/packages/rs-dash-event-bus/src/lib.rs new file mode 100644 index 00000000000..372205a4781 --- /dev/null +++ b/packages/rs-dash-event-bus/src/lib.rs @@ -0,0 +1,17 @@ +//! rs-dash-event-bus: shared event bus and Platform events multiplexer +//! +//! - `event_bus`: generic in-process pub/sub with pluggable filtering +//! - `platform_mux`: upstream bi-di gRPC multiplexer for Platform events + +pub mod event_bus; +pub mod event_mux; +pub mod grpc_producer; +pub mod local_bus_producer; + +pub use event_bus::{EventBus, Filter, SubscriptionHandle}; +pub use event_mux::{ + EventMux, EventProducer, EventSubscriber, PlatformEventsSubscriptionHandle, result_sender_sink, + sender_sink, +}; +pub use grpc_producer::GrpcPlatformEventsProducer; +pub use local_bus_producer::run_local_platform_events_producer; diff --git a/packages/rs-dash-event-bus/src/local_bus_producer.rs b/packages/rs-dash-event-bus/src/local_bus_producer.rs new file mode 100644 index 00000000000..3ea358942ab --- /dev/null +++ b/packages/rs-dash-event-bus/src/local_bus_producer.rs @@ -0,0 +1,181 @@ +use crate::event_bus::{EventBus, SubscriptionHandle}; +use crate::event_mux::EventMux; +use dapi_grpc::platform::v0::platform_events_command::Version as CmdVersion; +use dapi_grpc::platform::v0::platform_events_command::platform_events_command_v0::Command as Cmd; +use dapi_grpc::platform::v0::platform_events_response::platform_events_response_v0::Response as Resp; +// already imported below +use dapi_grpc::platform::v0::platform_events_response::{ + PlatformEventsResponseV0, Version as RespVersion, +}; +// keep single RespVersion import +use dapi_grpc::platform::v0::{ + PlatformEventMessageV0, PlatformEventV0, PlatformEventsResponse, PlatformFilterV0, +}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::task::JoinHandle; + +/// Runs a local producer that bridges EventMux commands to a local EventBus of Platform events. +/// +/// - `mux`: the shared EventMux instance to attach as a producer +/// - `event_bus`: local bus emitting `PlatformEventV0` events +/// - `make_adapter`: function to convert incoming `PlatformFilterV0` into a bus filter type `F` +pub async fn run_local_platform_events_producer( + mux: EventMux, + event_bus: EventBus, + make_adapter: Arc F + Send + Sync>, +) where + F: crate::event_bus::Filter + Send + Sync + Debug + 'static, +{ + let producer = mux.add_producer().await; + let mut cmd_rx = producer.cmd_rx; + let resp_tx = producer.resp_tx; + + let mut subs: HashMap, JoinHandle<_>)> = + HashMap::new(); + + while let Some(cmd_res) = cmd_rx.recv().await { + match cmd_res { + Ok(cmd) => { + let v0 = match cmd.version { + Some(CmdVersion::V0(v0)) => v0, + None => { + let err = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Error( + dapi_grpc::platform::v0::PlatformErrorV0 { + client_subscription_id: "".to_string(), + code: 400, + message: "missing version".to_string(), + }, + )), + })), + }; + if resp_tx.send(Ok(err)).await.is_err() { + tracing::warn!("local producer failed to send missing version error"); + } + continue; + } + }; + match v0.command { + Some(Cmd::Add(add)) => { + let id = add.client_subscription_id; + let adapter = (make_adapter)(add.filter.unwrap_or_default()); + let handle = event_bus.add_subscription(adapter).await; + + // Start forwarding events for this subscription + let id_for = id.clone(); + let handle_clone = handle.clone(); + let resp_tx_clone = resp_tx.clone(); + let worker = tokio::spawn(async move { + forward_local_events(handle_clone, &id_for, resp_tx_clone).await; + }); + + subs.insert(id.clone(), (handle, worker)); + + // Ack + let ack = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Ack(dapi_grpc::platform::v0::AckV0 { + client_subscription_id: id, + op: "add".to_string(), + })), + })), + }; + if resp_tx.send(Ok(ack)).await.is_err() { + tracing::warn!("local producer failed to send add ack"); + } + } + Some(Cmd::Remove(rem)) => { + let id = rem.client_subscription_id; + if let Some((subscription, worker)) = subs.remove(&id) { + let ack = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Ack(dapi_grpc::platform::v0::AckV0 { + client_subscription_id: id, + op: "remove".to_string(), + })), + })), + }; + if resp_tx.send(Ok(ack)).await.is_err() { + tracing::warn!("local producer failed to send remove ack"); + } + + // TODO: add subscription close method + drop(subscription); + worker.abort(); + } + } + Some(Cmd::Ping(p)) => { + let ack = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Ack(dapi_grpc::platform::v0::AckV0 { + client_subscription_id: p.nonce.to_string(), + op: "ping".to_string(), + })), + })), + }; + if resp_tx.send(Ok(ack)).await.is_err() { + tracing::warn!("local producer failed to send ping ack"); + } + } + None => { + let err = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Error( + dapi_grpc::platform::v0::PlatformErrorV0 { + client_subscription_id: "".to_string(), + code: 400, + message: "missing command".to_string(), + }, + )), + })), + }; + if resp_tx.send(Ok(err)).await.is_err() { + tracing::warn!("local producer failed to send missing command error"); + } + } + } + } + Err(e) => { + tracing::warn!("local producer received error command: {}", e); + let err = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Error(dapi_grpc::platform::v0::PlatformErrorV0 { + client_subscription_id: "".to_string(), + code: 500, + message: format!("{}", e), + })), + })), + }; + if resp_tx.send(Ok(err)).await.is_err() { + tracing::warn!("local producer failed to send upstream error"); + } + } + } + } +} + +async fn forward_local_events( + subscription: SubscriptionHandle, + client_subscription_id: &str, + forward_tx: crate::event_mux::ResponseSender, +) where + F: crate::event_bus::Filter + Send + Sync + 'static, +{ + while let Some(evt) = subscription.recv().await { + let resp = PlatformEventsResponse { + version: Some(RespVersion::V0(PlatformEventsResponseV0 { + response: Some(Resp::Event(PlatformEventMessageV0 { + client_subscription_id: client_subscription_id.to_string(), + event: Some(evt), + })), + })), + }; + if forward_tx.send(Ok(resp)).await.is_err() { + tracing::warn!("client disconnected, stopping local event forwarding"); + break; + } + } +} diff --git a/packages/rs-drive-abci/Cargo.toml b/packages/rs-drive-abci/Cargo.toml index 12b2f01974d..a8b9e3b0bb0 100644 --- a/packages/rs-drive-abci/Cargo.toml +++ b/packages/rs-drive-abci/Cargo.toml @@ -62,7 +62,7 @@ reopen = { version = "1.0.3" } delegate = { version = "0.13" } regex = { version = "1.8.1" } metrics = { version = "0.24" } -metrics-exporter-prometheus = { version = "0.16", default-features = false, features = [ +metrics-exporter-prometheus = { version = "0.17", default-features = false, features = [ "http-listener", ] } url = { version = "2.3.1" } @@ -73,10 +73,13 @@ tokio = { version = "1.40", features = [ "time", ] } tokio-util = { version = "0.7" } +tokio-stream = { version = "0.1" } derive_more = { version = "1.0", features = ["from", "deref", "deref_mut"] } async-trait = "0.1.77" console-subscriber = { version = "0.4", optional = true } bls-signatures = { git = "https://github.com/dashpay/bls-signatures", rev="0842b17583888e8f46c252a4ee84cdfd58e0546f", optional = true } +rs-dash-event-bus = { path = "../rs-dash-event-bus" } +sha2 = { version = "0.10" } [dev-dependencies] bs58 = { version = "0.5.0" } diff --git a/packages/rs-drive-abci/src/abci/app/consensus.rs b/packages/rs-drive-abci/src/abci/app/consensus.rs index d2145d1e4b0..85909a0287d 100644 --- a/packages/rs-drive-abci/src/abci/app/consensus.rs +++ b/packages/rs-drive-abci/src/abci/app/consensus.rs @@ -1,11 +1,16 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, EventBusApplication, PlatformApplication, TransactionalApplication, +}; use crate::abci::handler; use crate::abci::handler::error::error_into_exception; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::BlockExecutionContext; use crate::platform_types::platform::Platform; +use crate::query::PlatformFilterAdapter; use crate::rpc::core::CoreRPCLike; +use dapi_grpc::platform::v0::PlatformEventV0; +use dash_event_bus::event_bus::EventBus; use dpp::version::PlatformVersion; use drive::grovedb::Transaction; use std::fmt::Debug; @@ -23,15 +28,21 @@ pub struct ConsensusAbciApplication<'a, C> { transaction: RwLock>>, /// The current block execution context block_execution_context: RwLock>, + /// In-process Platform event bus used to publish events at finalize_block + event_bus: EventBus, } impl<'a, C> ConsensusAbciApplication<'a, C> { /// Create new ABCI app - pub fn new(platform: &'a Platform) -> Self { + pub fn new( + platform: &'a Platform, + event_bus: EventBus, + ) -> Self { Self { platform, transaction: Default::default(), block_execution_context: Default::default(), + event_bus, } } } @@ -48,6 +59,12 @@ impl BlockExecutionApplication for ConsensusAbciApplication<'_, C> { } } +impl EventBusApplication for ConsensusAbciApplication<'_, C> { + fn event_bus(&self) -> &EventBus { + &self.event_bus + } +} + impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> { /// create and store a new transaction fn start_transaction(&self) { diff --git a/packages/rs-drive-abci/src/abci/app/full.rs b/packages/rs-drive-abci/src/abci/app/full.rs index 542bce32668..cd7a8424c6e 100644 --- a/packages/rs-drive-abci/src/abci/app/full.rs +++ b/packages/rs-drive-abci/src/abci/app/full.rs @@ -1,11 +1,16 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, EventBusApplication, PlatformApplication, TransactionalApplication, +}; use crate::abci::handler; use crate::abci::handler::error::error_into_exception; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::BlockExecutionContext; use crate::platform_types::platform::Platform; +use crate::query::PlatformFilterAdapter; use crate::rpc::core::CoreRPCLike; +use dapi_grpc::platform::v0::PlatformEventV0; +use dash_event_bus::event_bus::EventBus; use dpp::version::PlatformVersion; use drive::grovedb::Transaction; use std::fmt::Debug; @@ -23,6 +28,8 @@ pub struct FullAbciApplication<'a, C> { pub transaction: RwLock>>, /// The current block execution context pub block_execution_context: RwLock>, + /// In-process Platform event bus used to publish events at finalize_block + pub event_bus: EventBus, } impl<'a, C> FullAbciApplication<'a, C> { @@ -32,6 +39,7 @@ impl<'a, C> FullAbciApplication<'a, C> { platform, transaction: Default::default(), block_execution_context: Default::default(), + event_bus: EventBus::new(), } } } @@ -48,6 +56,12 @@ impl BlockExecutionApplication for FullAbciApplication<'_, C> { } } +impl EventBusApplication for FullAbciApplication<'_, C> { + fn event_bus(&self) -> &EventBus { + &self.event_bus + } +} + impl<'a, C> TransactionalApplication<'a> for FullAbciApplication<'a, C> { /// create and store a new transaction fn start_transaction(&self) { diff --git a/packages/rs-drive-abci/src/abci/app/mod.rs b/packages/rs-drive-abci/src/abci/app/mod.rs index d86290b566b..2c44f92bcd2 100644 --- a/packages/rs-drive-abci/src/abci/app/mod.rs +++ b/packages/rs-drive-abci/src/abci/app/mod.rs @@ -10,12 +10,22 @@ pub mod execution_result; mod full; use crate::execution::types::block_execution_context::BlockExecutionContext; +use crate::query::PlatformFilterAdapter; use crate::rpc::core::DefaultCoreRPC; pub use check_tx::CheckTxAbciApplication; pub use consensus::ConsensusAbciApplication; +use dash_event_bus::event_bus::EventBus; use dpp::version::PlatformVersion; pub use full::FullAbciApplication; +/// Provides access to the in-process Platform event bus +pub trait EventBusApplication { + /// Returns the Platform `EventBus` used for publishing Platform events + fn event_bus( + &self, + ) -> &EventBus; +} + /// Platform-based ABCI application pub trait PlatformApplication { /// Returns Platform diff --git a/packages/rs-drive-abci/src/abci/handler/finalize_block.rs b/packages/rs-drive-abci/src/abci/handler/finalize_block.rs index 852f85cc6b8..396f8680a9b 100644 --- a/packages/rs-drive-abci/src/abci/handler/finalize_block.rs +++ b/packages/rs-drive-abci/src/abci/handler/finalize_block.rs @@ -1,11 +1,16 @@ -use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication}; +use crate::abci::app::{ + BlockExecutionApplication, EventBusApplication, PlatformApplication, TransactionalApplication, +}; use crate::error::execution::ExecutionError; use crate::error::Error; use crate::execution::types::block_execution_context::v0::BlockExecutionContextV0Getters; use crate::platform_types::cleaned_abci_messages::finalized_block_cleaned_request::v0::FinalizeBlockCleanedRequest; use crate::platform_types::platform_state::v0::PlatformStateV0Methods; +use crate::query::PlatformFilterAdapter; use crate::rpc::core::CoreRPCLike; +use dapi_grpc::platform::v0::{platform_event_v0, PlatformEventV0}; use dpp::dashcore::Network; +use sha2::{Digest, Sha256}; use std::sync::atomic::Ordering; use tenderdash_abci::proto::abci as proto; @@ -14,7 +19,10 @@ pub fn finalize_block<'a, A, C>( request: proto::RequestFinalizeBlock, ) -> Result where - A: PlatformApplication + TransactionalApplication<'a> + BlockExecutionApplication, + A: PlatformApplication + + TransactionalApplication<'a> + + BlockExecutionApplication + + EventBusApplication, C: CoreRPCLike, { let _timer = crate::metrics::abci_request_duration("finalize_block"); @@ -46,7 +54,7 @@ where let block_height = request_finalize_block.height; let block_finalization_outcome = app.platform().finalize_block_proposal( - request_finalize_block, + request_finalize_block.clone(), block_execution_context, transaction, platform_version, @@ -96,5 +104,76 @@ where .committed_block_height_guard .store(block_height, Ordering::Relaxed); + let bus = app.event_bus().clone(); + + publish_block_committed_event(&bus, &request_finalize_block)?; + publish_state_transition_result_events(&bus, &request_finalize_block)?; + Ok(proto::ResponseFinalizeBlock { retain_height: 0 }) } + +fn publish_block_committed_event( + event_bus: &dash_event_bus::event_bus::EventBus, + request_finalize_block: &FinalizeBlockCleanedRequest, +) -> Result<(), Error> { + // Publish BlockCommitted platform event to the global event bus (best-effort) + let header_time = request_finalize_block.block.header.time; + let seconds = header_time.seconds as i128; + let nanos = header_time.nanos as i128; + let time_ms = (seconds * 1000) + (nanos / 1_000_000); + + let meta = platform_event_v0::BlockMetadata { + height: request_finalize_block.height, + time_ms: time_ms as u64, + block_id_hash: request_finalize_block.block_id.hash.to_vec(), + }; + + // Number of txs in this block + let tx_count = request_finalize_block.block.data.txs.len() as u32; + + let block_committed = platform_event_v0::BlockCommitted { + meta: Some(meta), + tx_count, + }; + + let event = PlatformEventV0 { + event: Some(platform_event_v0::Event::BlockCommitted(block_committed)), + }; + + event_bus.notify_sync(event); + + Ok(()) +} + +fn publish_state_transition_result_events( + event_bus: &dash_event_bus::event_bus::EventBus, + request_finalize_block: &FinalizeBlockCleanedRequest, +) -> Result<(), Error> { + // Prepare BlockMetadata once + let header_time = request_finalize_block.block.header.time; + let seconds = header_time.seconds as i128; + let nanos = header_time.nanos as i128; + let time_ms = (seconds * 1000) + (nanos / 1_000_000); + + let meta = platform_event_v0::BlockMetadata { + height: request_finalize_block.height, + time_ms: time_ms as u64, + block_id_hash: request_finalize_block.block_id.hash.to_vec(), + }; + + // For each tx in the block, compute hash and emit a StateTransitionResult + for tx in &request_finalize_block.block.data.txs { + let tx_hash = Sha256::digest(tx); + let event = PlatformEventV0 { + event: Some(platform_event_v0::Event::StateTransitionFinalized( + platform_event_v0::StateTransitionFinalized { + meta: Some(meta.clone()), + tx_hash: tx_hash.to_vec(), + }, + )), + }; + event_bus.notify_sync(event); + } + + Ok(()) +} diff --git a/packages/rs-drive-abci/src/query/mod.rs b/packages/rs-drive-abci/src/query/mod.rs index 0e161b1ae19..d298ff069cf 100644 --- a/packages/rs-drive-abci/src/query/mod.rs +++ b/packages/rs-drive-abci/src/query/mod.rs @@ -15,6 +15,7 @@ use crate::error::query::QueryError; use dpp::validation::ValidationResult; +pub use service::PlatformFilterAdapter; pub use service::QueryService; /// A query validation result diff --git a/packages/rs-drive-abci/src/query/service.rs b/packages/rs-drive-abci/src/query/service.rs index f5c7dacc4b8..3efe5fd3dbf 100644 --- a/packages/rs-drive-abci/src/query/service.rs +++ b/packages/rs-drive-abci/src/query/service.rs @@ -47,20 +47,33 @@ use dapi_grpc::platform::v0::{ GetTokenPreProgrammedDistributionsResponse, GetTokenStatusesRequest, GetTokenStatusesResponse, GetTokenTotalSupplyRequest, GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest, GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse, + PlatformEventV0 as PlatformEvent, PlatformEventsCommand, PlatformEventsResponse, WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse, }; +use dapi_grpc::tonic::Streaming; use dapi_grpc::tonic::{Code, Request, Response, Status}; +use dash_event_bus::event_bus::{EventBus, Filter as EventBusFilter}; +use dash_event_bus::{sender_sink, EventMux}; use dpp::version::PlatformVersion; use std::fmt::Debug; use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::time::Duration; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tracing::Instrument; +const PLATFORM_EVENTS_STREAM_BUFFER: usize = 128; + /// Service to handle platform queries pub struct QueryService { platform: Arc>, + _event_bus: EventBus, + /// Multiplexer for Platform events + platform_events_mux: EventMux, + /// background worker tasks + workers: Arc>>, } type QueryMethod = fn( @@ -72,8 +85,30 @@ type QueryMethod = fn( impl QueryService { /// Creates new QueryService - pub fn new(platform: Arc>) -> Self { - Self { platform } + pub fn new( + platform: Arc>, + event_bus: EventBus, + ) -> Self { + let mux = EventMux::new(); + let mut workers = tokio::task::JoinSet::new(); + + // Start local mux producer to bridge internal event_bus + { + let bus = event_bus.clone(); + let worker_mux = mux.clone(); + workers.spawn(async move { + use std::sync::Arc; + let mk = Arc::new(|f| PlatformFilterAdapter::new(f)); + dash_event_bus::run_local_platform_events_producer(worker_mux, bus, mk).await; + }); + } + + Self { + platform, + _event_bus: event_bus, + platform_events_mux: mux, + workers: Arc::new(Mutex::new(workers)), + } } async fn handle_blocking_query( @@ -252,6 +287,47 @@ fn respond_with_unimplemented(name: &str) -> Result, Status> { Err(Status::unimplemented("the endpoint is not supported")) } +/// Adapter implementing EventBus filter semantics based on incoming gRPC `PlatformFilterV0`. +#[derive(Clone, Debug)] +pub struct PlatformFilterAdapter { + inner: dapi_grpc::platform::v0::PlatformFilterV0, +} + +impl PlatformFilterAdapter { + /// Create a new adapter wrapping the provided gRPC `PlatformFilterV0`. + pub fn new(inner: dapi_grpc::platform::v0::PlatformFilterV0) -> Self { + Self { inner } + } +} + +impl EventBusFilter for PlatformFilterAdapter { + fn matches(&self, event: &PlatformEvent) -> bool { + use dapi_grpc::platform::v0::platform_event_v0::Event as Evt; + use dapi_grpc::platform::v0::platform_filter_v0::Kind; + match self.inner.kind.as_ref() { + None => false, + Some(Kind::All(all)) => *all, + Some(Kind::BlockCommitted(b)) => { + if !*b { + return false; + } + matches!(event.event, Some(Evt::BlockCommitted(_))) + } + Some(Kind::StateTransitionResult(filter)) => { + // If tx_hash is provided, match only that hash; otherwise match any STR + if let Some(Evt::StateTransitionFinalized(ref r)) = event.event { + match &filter.tx_hash { + Some(h) => r.tx_hash == *h, + None => true, + } + } else { + false + } + } + } + } +} + #[async_trait] impl PlatformService for QueryService { async fn broadcast_state_transition( @@ -802,8 +878,32 @@ impl PlatformService for QueryService { ) .await } -} + type subscribePlatformEventsStream = ReceiverStream>; + + /// Uses EventMux: forward inbound commands to mux subscriber and return its response stream + async fn subscribe_platform_events( + &self, + request: Request>, + ) -> Result, Status> { + // TODO: two issues are to be resolved: + // 1) restart of client with the same subscription id shows that old subscription is not removed + // 2) connection drops after some time + // return Err(Status::unimplemented("the endpoint is not supported yet")); + let inbound = request.into_inner(); + let (downstream_tx, rx) = + mpsc::channel::>(PLATFORM_EVENTS_STREAM_BUFFER); + let subscriber = self.platform_events_mux.add_subscriber().await; + + let mut workers = self.workers.lock().unwrap(); + workers.spawn(async move { + let resp_sink = sender_sink(downstream_tx); + subscriber.forward(inbound, resp_sink).await; + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} #[async_trait] impl DriveInternal for QueryService { async fn get_proofs( diff --git a/packages/rs-drive-abci/src/server.rs b/packages/rs-drive-abci/src/server.rs index 3baf33f5c2a..32b0b76acf1 100644 --- a/packages/rs-drive-abci/src/server.rs +++ b/packages/rs-drive-abci/src/server.rs @@ -20,7 +20,13 @@ pub fn start( config: PlatformConfig, cancel: CancellationToken, ) { - let query_service = Arc::new(QueryService::new(Arc::clone(&platform))); + // Create a shared EventBus for platform events (filters adapted from gRPC filters) + let event_bus = dash_event_bus::event_bus::EventBus::< + dapi_grpc::platform::v0::PlatformEventV0, + crate::query::PlatformFilterAdapter, + >::new(); + + let query_service = Arc::new(QueryService::new(Arc::clone(&platform), event_bus.clone())); let drive_internal = Arc::clone(&query_service); @@ -70,7 +76,7 @@ pub fn start( // Start blocking ABCI socket-server that process consensus requests sequentially - let app = ConsensusAbciApplication::new(platform.as_ref()); + let app = ConsensusAbciApplication::new(platform.as_ref(), event_bus.clone()); let server = tenderdash_abci::ServerBuilder::new(app, &config.abci.consensus_bind_address) .with_cancel_token(cancel.clone()) diff --git a/packages/rs-drive-abci/tests/strategy_tests/main.rs b/packages/rs-drive-abci/tests/strategy_tests/main.rs index 16173c723fe..5967e60a7fd 100644 --- a/packages/rs-drive-abci/tests/strategy_tests/main.rs +++ b/packages/rs-drive-abci/tests/strategy_tests/main.rs @@ -198,6 +198,54 @@ mod tests { ); } + // Verify the in-process EventBus subscription delivers a published event + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn event_bus_subscribe_all_and_receive() { + use dapi_grpc::platform::v0::platform_event_v0; + use dapi_grpc::platform::v0::platform_filter_v0::Kind as FilterKind; + use dapi_grpc::platform::v0::{PlatformEventV0, PlatformFilterV0}; + use drive_abci::abci::app::FullAbciApplication; + use drive_abci::query::PlatformFilterAdapter; + + let config = PlatformConfig::default(); + let mut platform = TestPlatformBuilder::new() + .with_config(config.clone()) + .build_with_mock_rpc(); + + // Create ABCI app and subscribe to all events + let abci_application = FullAbciApplication::new(&platform.platform); + let filter = PlatformFilterV0 { + kind: Some(FilterKind::All(true)), + }; + let handle = abci_application + .event_bus + .add_subscription(PlatformFilterAdapter::new(filter)) + .await; + + // Publish a simple BlockCommitted event + let meta = platform_event_v0::BlockMetadata { + height: 1, + time_ms: 123, + block_id_hash: vec![0u8; 32], + }; + let evt = PlatformEventV0 { + event: Some(platform_event_v0::Event::BlockCommitted( + platform_event_v0::BlockCommitted { + meta: Some(meta), + tx_count: 0, + }, + )), + }; + abci_application.event_bus.notify_sync(evt.clone()); + + // Await delivery + let received = tokio::time::timeout(std::time::Duration::from_secs(1), handle.recv()) + .await + .expect("timed out waiting for event"); + + assert_eq!(received, Some(evt)); + } + #[test] fn run_chain_stop_and_restart() { let strategy = NetworkStrategy { diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index 23a5b5236f4..a1e2ccc219d 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -13,10 +13,11 @@ dpp = { path = "../rs-dpp", default-features = false, features = [ ] } dapi-grpc = { path = "../dapi-grpc", default-features = false } rs-dapi-client = { path = "../rs-dapi-client", default-features = false } +rs-dash-event-bus = { path = "../rs-dash-event-bus", optional = true } drive = { path = "../rs-drive", default-features = false, features = [ "verify", ] } -platform-wallet = { path = "../rs-platform-wallet", optional = true} +platform-wallet = { path = "../rs-platform-wallet", optional = true } drive-proof-verifier = { path = "../rs-drive-proof-verifier", default-features = false } dash-context-provider = { path = "../rs-context-provider", default-features = false } @@ -34,6 +35,7 @@ serde = { version = "1.0.219", default-features = false, features = [ serde_json = { version = "1.0", features = ["preserve_order"], optional = true } tracing = { version = "0.1.41" } hex = { version = "0.4.3" } +once_cell = { version = "1.19", optional = true } dotenvy = { version = "0.15.7", optional = true } envy = { version = "0.4.2", optional = true } futures = { version = "0.3.30" } @@ -42,6 +44,7 @@ lru = { version = "0.12.5", optional = true } bip37-bloom-filter = { git = "https://github.com/dashpay/rs-bip37-bloom-filter", branch = "develop" } zeroize = { version = "1.8", features = ["derive"] } + [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1.40", features = ["macros", "time", "rt-multi-thread"] } @@ -51,8 +54,8 @@ js-sys = "0.3" [dev-dependencies] rs-dapi-client = { path = "../rs-dapi-client" } drive-proof-verifier = { path = "../rs-drive-proof-verifier" } +tokio = { version = "1.40", features = ["macros", "rt-multi-thread", "signal"] } rs-sdk-trusted-context-provider = { path = "../rs-sdk-trusted-context-provider" } -tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } base64 = { version = "0.22.1" } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } dpp = { path = "../rs-dpp", default-features = false, features = [ @@ -67,10 +70,22 @@ test-case = { version = "3.3.1" } assert_matches = "1.5.0" [features] -# TODO: remove mocks from default features -default = ["mocks", "offline-testing", "dapi-grpc/client", "token_reward_explanations"] -spv-client = ["core_spv", "core_key_wallet_manager", "core_key_wallet", "core_bincode", "core_key_wallet_bincode"] +# TODO: remove mocks from default features +default = [ + "mocks", + "offline-testing", + "dapi-grpc/client", + "token_reward_explanations", + "subscriptions", +] +spv-client = [ + "core_spv", + "core_key_wallet_manager", + "core_key_wallet", + "core_bincode", + "core_key_wallet_bincode", +] mocks = [ "dep:serde", "dep:serde_json", @@ -87,6 +102,11 @@ mocks = [ "zeroize/serde", ] +subscriptions = [ + "dep:rs-dash-event-bus", + "dep:once_cell", +] + # Run integration tests using test vectors from `tests/vectors/` instead of connecting to live Dash Platform. # # This feature is enabled by default to allow testing without connecting to the Dash Platform as diff --git a/packages/rs-sdk/examples/platform_events.rs b/packages/rs-sdk/examples/platform_events.rs new file mode 100644 index 00000000000..5bd43ba0555 --- /dev/null +++ b/packages/rs-sdk/examples/platform_events.rs @@ -0,0 +1,220 @@ +fn main() { + #[cfg(feature = "subscriptions")] + subscribe::main(); + #[cfg(not(feature = "subscriptions"))] + { + println!("Enable the 'subscriptions' feature to run this example."); + } +} + +#[cfg(feature = "subscriptions")] +mod subscribe { + + use dapi_grpc::platform::v0::platform_filter_v0::Kind as FilterKind; + use dapi_grpc::platform::v0::PlatformFilterV0; + use dapi_grpc::platform::v0::{ + platform_events_response::platform_events_response_v0::Response as Resp, + PlatformEventsResponse, + }; + use dash_event_bus::SubscriptionHandle; + use dash_sdk::platform::fetch_current_no_parameters::FetchCurrent; + use dash_sdk::platform::types::epoch::Epoch; + use dash_sdk::{Sdk, SdkBuilder}; + use rs_dapi_client::{Address, AddressList}; + use serde::Deserialize; + use std::str::FromStr; + use zeroize::Zeroizing; + + #[derive(Debug, Deserialize)] + pub struct Config { + // Aligned with rs-sdk/tests/fetch/config.rs + #[serde(default)] + pub platform_host: String, + #[serde(default)] + pub platform_port: u16, + #[serde(default)] + pub platform_ssl: bool, + + #[serde(default)] + pub core_host: Option, + #[serde(default)] + pub core_port: u16, + #[serde(default)] + pub core_user: String, + #[serde(default)] + pub core_password: Zeroizing, + + #[serde(default)] + pub platform_ca_cert_path: Option, + + // Optional hex-encoded tx hash to filter STR events + #[serde(default)] + pub state_transition_tx_hash_hex: Option, + } + + impl Config { + const CONFIG_PREFIX: &'static str = "DASH_SDK_"; + fn load() -> Self { + let path: String = env!("CARGO_MANIFEST_DIR").to_owned() + "/tests/.env"; + let _ = dotenvy::from_path(&path); + envy::prefixed(Self::CONFIG_PREFIX) + .from_env() + .expect("configuration error: missing DASH_SDK_* vars; see rs-sdk/tests/.env") + } + } + + #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + pub(super) async fn main() { + tracing_subscriber::fmt::init(); + + let config = Config::load(); + let sdk = setup_sdk(&config); + // sanity check - fetch current epoch to see if connection works + let epoch = Epoch::fetch_current(&sdk).await.expect("fetch epoch"); + tracing::info!("Current epoch: {:?}", epoch); + + // Subscribe to BlockCommitted only + let filter_block = PlatformFilterV0 { + kind: Some(FilterKind::BlockCommitted(true)), + }; + let (block_id, block_handle) = sdk + .subscribe_platform_events(filter_block) + .await + .expect("subscribe block_committed"); + + // Subscribe to StateTransitionFinalized; optionally filter by tx hash if provided + let tx_hash_bytes = config + .state_transition_tx_hash_hex + .as_deref() + .and_then(|s| hex::decode(s).ok()); + let filter_str = PlatformFilterV0 { + kind: Some(FilterKind::StateTransitionResult( + dapi_grpc::platform::v0::StateTransitionResultFilter { + tx_hash: tx_hash_bytes, + }, + )), + }; + let (str_id, str_handle) = sdk + .subscribe_platform_events(filter_str) + .await + .expect("subscribe state_transition_result"); + + // Subscribe to All events as a separate stream (demonstration) + let filter_all = PlatformFilterV0 { + kind: Some(FilterKind::All(true)), + }; + let (all_id, all_handle) = sdk + .subscribe_platform_events(filter_all) + .await + .expect("subscribe all"); + + println!( + "Subscribed: BlockCommitted id={}, STR id={}, All id={}", + block_id, str_id, all_id + ); + println!("Waiting for events... (Ctrl+C to exit)"); + + let block_worker = tokio::spawn(worker(block_handle)); + let str_worker = tokio::spawn(worker(str_handle)); + let all_worker = tokio::spawn(worker(all_handle)); + + // Handle Ctrl+C to remove subscriptions and exit + let abort_block = block_worker.abort_handle(); + let abort_str = str_worker.abort_handle(); + let abort_all = all_worker.abort_handle(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + println!("Ctrl+C received, stopping..."); + abort_block.abort(); + abort_str.abort(); + abort_all.abort(); + }); + + // Wait for workers to finish + let _ = tokio::join!(block_worker, str_worker, all_worker); + } + + async fn worker(handle: SubscriptionHandle) + where + F: Send + Sync + 'static, + { + while let Some(resp) = handle.recv().await { + // Parse and print + if let Some(dapi_grpc::platform::v0::platform_events_response::Version::V0(v0)) = + resp.version + { + match v0.response { + Some(Resp::Event(ev)) => { + let sub_id = ev.client_subscription_id; + use dapi_grpc::platform::v0::platform_event_v0::Event as E; + if let Some(event_v0) = ev.event { + if let Some(event) = event_v0.event { + match event { + E::BlockCommitted(bc) => { + if let Some(meta) = bc.meta { + println!( + "{} BlockCommitted: height={} time_ms={} tx_count={} block_id_hash=0x{}", + sub_id, + meta.height, + meta.time_ms, + bc.tx_count, + hex::encode(meta.block_id_hash) + ); + } + } + E::StateTransitionFinalized(r) => { + if let Some(meta) = r.meta { + println!( + "{} StateTransitionFinalized: height={} tx_hash=0x{} block_id_hash=0x{}", + sub_id, + meta.height, + hex::encode(r.tx_hash), + hex::encode(meta.block_id_hash) + ); + } + } + } + } + } + } + Some(Resp::Ack(ack)) => { + println!("Ack: {} op={}", ack.client_subscription_id, ack.op); + } + Some(Resp::Error(err)) => { + eprintln!( + "Error: {} code={} msg={}", + err.client_subscription_id, err.code, err.message + ); + } + None => {} + } + } + } + } + + fn setup_sdk(config: &Config) -> Sdk { + let scheme = if config.platform_ssl { "https" } else { "http" }; + let host = &config.platform_host; + let address = Address::from_str(&format!("{}://{}:{}", scheme, host, config.platform_port)) + .expect("parse uri"); + tracing::debug!("Using DAPI address: {}", address.uri()); + let core_host = config.core_host.as_deref().unwrap_or(host); + + #[allow(unused_mut)] + let mut builder = SdkBuilder::new(AddressList::from_iter([address])).with_core( + core_host, + config.core_port, + &config.core_user, + &config.core_password, + ); + + #[cfg(not(target_arch = "wasm32"))] + if let Some(cert) = &config.platform_ca_cert_path { + builder = builder + .with_ca_certificate_file(cert) + .expect("load CA cert"); + } + + builder.build().expect("cannot build sdk") + } +} diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index cb1b79dd7e6..40d19532097 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -38,6 +38,9 @@ pub enum Error { /// DAPI client error, for example, connection error #[error("Dapi client error: {0}")] DapiClientError(rs_dapi_client::DapiClientError), + /// Subscription error + #[error("Subscription error: {0}")] + SubscriptionError(String), #[cfg(feature = "mocks")] /// DAPI mocks error #[error("Dapi mocks error: {0}")] diff --git a/packages/rs-sdk/src/platform.rs b/packages/rs-sdk/src/platform.rs index e5631646ea6..b05daa2bc91 100644 --- a/packages/rs-sdk/src/platform.rs +++ b/packages/rs-sdk/src/platform.rs @@ -18,6 +18,8 @@ pub mod types; pub mod documents; pub mod dpns_usernames; +#[cfg(feature = "subscriptions")] +pub mod events; pub mod group_actions; pub mod tokens; diff --git a/packages/rs-sdk/src/platform/events.rs b/packages/rs-sdk/src/platform/events.rs new file mode 100644 index 00000000000..c748bfae747 --- /dev/null +++ b/packages/rs-sdk/src/platform/events.rs @@ -0,0 +1,82 @@ +use dapi_grpc::platform::v0::platform_client::PlatformClient; +use dapi_grpc::platform::v0::PlatformFilterV0; +use dash_event_bus::GrpcPlatformEventsProducer; +use dash_event_bus::{EventMux, PlatformEventsSubscriptionHandle}; +use rs_dapi_client::transport::{create_channel, PlatformGrpcClient}; +use rs_dapi_client::{RequestSettings, Uri}; +use std::time::Duration; +use tokio::time::timeout; + +impl crate::Sdk { + pub(crate) async fn get_event_mux(&self) -> Result { + use once_cell::sync::OnceCell; + static MUX: OnceCell = OnceCell::new(); + + if let Some(mux) = MUX.get() { + return Ok(mux.clone()); + } + + let mux = EventMux::new(); + + // Build a gRPC client to a live address + let address = self + .address_list() + .get_live_address() + .ok_or_else(|| crate::Error::SubscriptionError("no live DAPI address".to_string()))?; + let uri: Uri = address.uri().clone(); + + tracing::debug!(address = ?uri, "creating gRPC client for platform events"); + let settings = self + .dapi_client_settings + .override_by(RequestSettings { + connect_timeout: Some(Duration::from_secs(5)), + timeout: Some(Duration::from_secs(3600)), + ..Default::default() + }) + .finalize(); + let channel = create_channel(uri, Some(&settings)) + .map_err(|e| crate::Error::SubscriptionError(format!("channel: {e}")))?; + let client: PlatformGrpcClient = PlatformClient::new(channel); + + // Spawn the producer bridge + let worker_mux = mux.clone(); + tracing::debug!("spawning platform events producer task"); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + self.spawn(async move { + let inner_mux = worker_mux.clone(); + tracing::debug!("starting platform events producer task GrpcPlatformEventsProducer"); + if let Err(e) = GrpcPlatformEventsProducer::run(inner_mux, client, ready_tx).await { + tracing::error!("platform events producer terminated: {}", e); + } + }) + .await; + // wait until the producer is ready, with a timeout + if timeout(Duration::from_secs(5), ready_rx).await.is_err() { + tracing::error!("timed out waiting for platform events producer to be ready"); + return Err(crate::Error::SubscriptionError( + "timeout waiting for platform events producer to be ready".to_string(), + )); + } + + let _ = MUX.set(mux.clone()); + + Ok(mux) + } + + /// Subscribe to Platform events and receive a raw EventBus handle. The + /// upstream subscription is removed automatically (RAII) when the last + /// clone of the handle is dropped. + pub async fn subscribe_platform_events( + &self, + filter: PlatformFilterV0, + ) -> Result<(String, PlatformEventsSubscriptionHandle), crate::Error> { + // Initialize global mux with a single upstream producer on first use + let mux = self.get_event_mux().await?; + + let (id, handle) = mux + .subscribe(filter) + .await + .map_err(|e| crate::Error::SubscriptionError(format!("subscribe: {}", e)))?; + Ok((id, handle)) + } +} diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index ed0e13374f8..fc80c390613 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -43,8 +43,11 @@ use std::sync::atomic::Ordering; use std::sync::{atomic, Arc}; #[cfg(not(target_arch = "wasm32"))] use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::Mutex; #[cfg(feature = "mocks")] -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::MutexGuard; +#[cfg(feature = "subscriptions")] +use tokio::task::JoinSet; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use zeroize::Zeroizing; @@ -140,6 +143,10 @@ pub struct Sdk { #[cfg(feature = "mocks")] dump_dir: Option, + + #[cfg(feature = "subscriptions")] + /// Set of worker tasks spawned by the SDK + workers: Arc>>, } impl Clone for Sdk { fn clone(&self) -> Self { @@ -154,6 +161,8 @@ impl Clone for Sdk { metadata_height_tolerance: self.metadata_height_tolerance, metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, dapi_client_settings: self.dapi_client_settings, + #[cfg(feature = "subscriptions")] + workers: Arc::clone(&self.workers), #[cfg(feature = "mocks")] dump_dir: self.dump_dir.clone(), } @@ -594,6 +603,26 @@ impl Sdk { SdkInstance::Mock { address_list, .. } => address_list, } } + + /// Spawn a new worker task that will be managed by the Sdk. + #[cfg(feature = "subscriptions")] + pub(crate) async fn spawn( + &self, + task: impl std::future::Future + Send + 'static, + ) -> tokio::sync::oneshot::Receiver<()> { + let (done_tx, done_rx) = tokio::sync::oneshot::channel(); + let mut workers = self + .workers + .try_lock() + .expect("workers lock is poisoned or in use"); + workers.spawn(async move { + task.await; + let _ = done_tx.send(()); + }); + tokio::task::yield_now().await; + + done_rx + } } /// If received metadata time differs from local time by more than `tolerance`, the remote node is considered stale. @@ -1076,6 +1105,8 @@ impl SdkBuilder { metadata_last_seen_height: Arc::new(atomic::AtomicU64::new(0)), metadata_height_tolerance: self.metadata_height_tolerance, metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, + #[cfg(feature = "subscriptions")] + workers: Default::default(), #[cfg(feature = "mocks")] dump_dir: self.dump_dir, }; @@ -1144,6 +1175,8 @@ impl SdkBuilder { metadata_last_seen_height: Arc::new(atomic::AtomicU64::new(0)), metadata_height_tolerance: self.metadata_height_tolerance, metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, + #[cfg(feature = "subscriptions")] + workers: Default::default(), }; let mut guard = mock_sdk.try_lock().expect("mock sdk is in use by another thread and cannot be reconfigured"); guard.set_sdk(sdk.clone()); @@ -1157,6 +1190,10 @@ impl SdkBuilder { None => return Err(Error::Config("Mock mode is not available. Please enable `mocks` feature or provide address list.".to_string())), }; + // let sdk_clone = sdk.clone(); + // start subscribing to events + // crate::sync::block_on(async move { sdk_clone.get_event_mux().await })??; + Ok(sdk) } } diff --git a/packages/rs-sdk/tests/fetch/mod.rs b/packages/rs-sdk/tests/fetch/mod.rs index bb16b2a04fa..6ddfe751f71 100644 --- a/packages/rs-sdk/tests/fetch/mod.rs +++ b/packages/rs-sdk/tests/fetch/mod.rs @@ -24,6 +24,8 @@ mod identity; mod identity_contract_nonce; mod mock_fetch; mod mock_fetch_many; +#[cfg(feature = "subscriptions")] +mod platform_events; mod prefunded_specialized_balance; mod protocol_version_vote_count; mod protocol_version_votes; diff --git a/packages/rs-sdk/tests/fetch/platform_events.rs b/packages/rs-sdk/tests/fetch/platform_events.rs new file mode 100644 index 00000000000..8d4d253fdc2 --- /dev/null +++ b/packages/rs-sdk/tests/fetch/platform_events.rs @@ -0,0 +1,110 @@ +use super::{common::setup_logs, config::Config}; +use dapi_grpc::platform::v0::platform_client::PlatformClient; +use dapi_grpc::platform::v0::platform_events_command::platform_events_command_v0::Command as Cmd; +use dapi_grpc::platform::v0::platform_events_command::Version as CmdVersion; +use dapi_grpc::platform::v0::platform_events_response::platform_events_response_v0::Response as Resp; +use dapi_grpc::platform::v0::platform_events_response::Version as RespVersion; +use dapi_grpc::platform::v0::{AddSubscriptionV0, PingV0, PlatformEventsCommand, PlatformFilterV0}; +use dash_event_bus::{EventMux, GrpcPlatformEventsProducer}; +use rs_dapi_client::transport::create_channel; +use rs_dapi_client::{RequestSettings, Uri}; +use tokio::time::{timeout, Duration}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[cfg(all(feature = "network-testing", not(feature = "offline-testing")))] +async fn test_platform_events_ping() { + setup_logs(); + + // Build gRPC client from test config + let cfg = Config::new(); + let address = cfg + .address_list() + .get_live_address() + .expect("at least one platform address configured") + .clone(); + let uri: Uri = address.uri().clone(); + let settings = RequestSettings { + timeout: Some(Duration::from_secs(30)), + ..Default::default() + } + .finalize(); + let channel = create_channel(uri, Some(&settings)).expect("create channel"); + let client = PlatformClient::new(channel); + + // Wire EventMux with a gRPC producer + let mux = EventMux::new(); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + let mux_worker = mux.clone(); + tokio::spawn(async move { + let _ = GrpcPlatformEventsProducer::run(mux_worker, client, ready_tx).await; + }); + // Wait until producer is ready + timeout(Duration::from_secs(5), ready_rx) + .await + .expect("producer ready timeout") + .expect("producer start"); + + // Create a raw subscriber on the mux to send commands and receive responses + let sub = mux.add_subscriber().await; + let cmd_tx = sub.cmd_tx; + let mut resp_rx = sub.resp_rx; + + // Choose a numeric ID for our subscription and ping + let id_num: u64 = 4242; + let id_str = id_num.to_string(); + + // Send Add with our chosen client_subscription_id + let add_cmd = PlatformEventsCommand { + version: Some(CmdVersion::V0( + dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0 { + command: Some(Cmd::Add(AddSubscriptionV0 { + client_subscription_id: id_str.clone(), + filter: Some(PlatformFilterV0::default()), + })), + }, + )), + }; + cmd_tx.send(Ok(add_cmd)).expect("send add"); + + // Expect Add ack + let add_ack = timeout(Duration::from_secs(3), resp_rx.recv()) + .await + .expect("timeout waiting add ack") + .expect("subscriber closed") + .expect("ack error"); + match add_ack.version.and_then(|v| match v { + RespVersion::V0(v0) => v0.response, + }) { + Some(Resp::Ack(a)) => { + assert_eq!(a.client_subscription_id, id_str); + assert_eq!(a.op, "add"); + } + other => panic!("expected add ack, got: {:?}", other.map(|_| ())), + } + + // Send Ping with matching nonce so that ack routes to our subscription + let ping_cmd = PlatformEventsCommand { + version: Some(CmdVersion::V0( + dapi_grpc::platform::v0::platform_events_command::PlatformEventsCommandV0 { + command: Some(Cmd::Ping(PingV0 { nonce: id_num })), + }, + )), + }; + cmd_tx.send(Ok(ping_cmd)).expect("send ping"); + + // Expect Ping ack routed through Mux to our subscriber + let ping_ack = timeout(Duration::from_secs(3), resp_rx.recv()) + .await + .expect("timeout waiting ping ack") + .expect("subscriber closed") + .expect("ack error"); + match ping_ack.version.and_then(|v| match v { + RespVersion::V0(v0) => v0.response, + }) { + Some(Resp::Ack(a)) => { + assert_eq!(a.client_subscription_id, id_str); + assert_eq!(a.op, "ping"); + } + other => panic!("expected ping ack, got: {:?}", other.map(|_| ())), + } +} diff --git a/packages/wasm-sdk/src/error.rs b/packages/wasm-sdk/src/error.rs index a2b2e264462..b46a6220e3e 100644 --- a/packages/wasm-sdk/src/error.rs +++ b/packages/wasm-sdk/src/error.rs @@ -169,6 +169,12 @@ impl From for WasmSdkError { Cancelled(msg) => Self::new(WasmSdkErrorKind::Cancelled, msg, None, retriable), StaleNode(e) => Self::new(WasmSdkErrorKind::StaleNode, e.to_string(), None, retriable), StateTransitionBroadcastError(e) => WasmSdkError::from(e), + SubscriptionError(msg) => Self::new( + WasmSdkErrorKind::DapiClientError, + format!("Subscription error: {}", msg), + None, + retriable, + ), } } }