Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions examples/e07_shard_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,22 @@ async fn main() {
let mut client =
Client::builder(token, intents).event_handler(Handler).await.expect("Err creating client");

// Here we clone a lock to the Shard Manager, and then move it into a new thread. The thread
// will unlock the manager and print shards' status on a loop.
let manager = client.shard_manager.clone();
// Here we get a HashMap of of the shards' status that we move into a new thread. A separate
// tokio task holds the ownership to each entry, so each one will require acquiring a lock
// before reading.
let runners = client.shard_manager.runner_info();

tokio::spawn(async move {
loop {
sleep(Duration::from_secs(30)).await;

let shard_runners = manager.runners.lock().await;

for (id, runner) in shard_runners.iter() {
println!(
"Shard ID {} is {} with a latency of {:?}",
id, runner.stage, runner.latency,
);
for (id, runner) in &runners {
if let Ok(runner) = runner.lock() {
println!(
"Shard ID {} is {} with a latency of {:?}",
id, runner.stage, runner.latency,
);
}
}
}
});
Expand Down
8 changes: 4 additions & 4 deletions src/gateway/client/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

#[cfg(feature = "cache")]
pub use crate::cache::Cache;
use crate::gateway::{ActivityData, ShardMessenger, ShardRunner};
use crate::gateway::{ActivityData, ShardMessenger};
use crate::http::{CacheHttp, Http};
use crate::model::prelude::*;

Expand Down Expand Up @@ -62,15 +62,15 @@ impl Context {
/// Create a new Context to be passed to an event handler.
pub(crate) fn new(
data: Arc<dyn std::any::Any + Send + Sync>,
runner: &ShardRunner,
shard_messenger: ShardMessenger,
shard_id: ShardId,
http: Arc<Http>,
#[cfg(feature = "cache")] cache: Arc<Cache>,
) -> Context {
Context {
shard: ShardMessenger::new(runner),
shard_id,
data,
shard: shard_messenger,
shard_id,
http,
#[cfg(feature = "cache")]
cache,
Expand Down
73 changes: 18 additions & 55 deletions src/gateway/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,29 @@ use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use futures::channel::mpsc::UnboundedReceiver as Receiver;
use futures::future::BoxFuture;
use futures::StreamExt as _;
#[cfg(feature = "tracing_instrument")]
use tracing::instrument;
use tracing::{debug, warn};

pub use self::context::Context;
pub use self::event_handler::{EventHandler, FullEvent, RawEventHandler};
use super::TransportCompression;
#[cfg(feature = "cache")]
use crate::cache::Cache;
#[cfg(feature = "cache")]
use crate::cache::Settings as CacheSettings;
#[cfg(feature = "framework")]
use crate::framework::Framework;
#[cfg(feature = "voice")]
use crate::gateway::VoiceGatewayManager;
use crate::gateway::{
use super::VoiceGatewayManager;
use super::{
ActivityData,
GatewayError,
PresenceData,
ShardManager,
ShardManagerOptions,
TransportCompression,
DEFAULT_WAIT_BETWEEN_SHARD_START,
};
#[cfg(feature = "cache")]
use crate::cache::Cache;
#[cfg(feature = "cache")]
use crate::cache::Settings as CacheSettings;
#[cfg(feature = "framework")]
use crate::framework::Framework;
use crate::http::Http;
use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
Expand Down Expand Up @@ -316,9 +313,6 @@ impl IntoFuture for ClientBuilder {
}
}

#[cfg(feature = "voice")]
let voice_manager = self.voice_manager;

#[cfg(feature = "cache")]
let cache = Arc::new(Cache::new_with_settings(self.cache_settings));

Expand All @@ -337,33 +331,33 @@ impl IntoFuture for ClientBuilder {

#[cfg(feature = "framework")]
let framework_cell = Arc::new(OnceLock::new());
let (shard_manager, shard_manager_ret_value) = ShardManager::new(ShardManagerOptions {

let shard_manager = ShardManager::new(ShardManagerOptions {
token: self.token,
data: Arc::clone(&data),
event_handler: self.event_handler,
raw_event_handler: self.raw_event_handler,
#[cfg(feature = "framework")]
framework: Arc::clone(&framework_cell),
#[cfg(feature = "voice")]
voice_manager: voice_manager.clone(),
voice_manager: self.voice_manager.clone(),
ws_url: Arc::clone(&ws_url),
compression: self.compression,
shard_total,
max_concurrency,
#[cfg(feature = "cache")]
cache: Arc::clone(&cache),
http: Arc::clone(&http),
intents,
presence: Some(presence),
max_concurrency,
wait_time_between_shard_start: self.wait_time_between_shard_start,
compression: self.compression,
});

let client = Client {
data,
shard_manager,
shard_manager_return_value: shard_manager_ret_value,
#[cfg(feature = "voice")]
voice_manager,
voice_manager: self.voice_manager,
ws_url,
#[cfg(feature = "cache")]
cache,
Expand Down Expand Up @@ -449,43 +443,17 @@ pub struct Client {
/// # use std::time::Duration;
/// #
/// # fn run(client: Client) {
/// // Create a clone of the `Arc` containing the shard manager.
/// let shard_manager = client.shard_manager.clone();
///
/// tokio::spawn(async move {
/// loop {
/// let count = shard_manager.shards_instantiated().await.len();
/// let count = client.shard_manager.shards_instantiated().len();
/// println!("Shard count instantiated: {}", count);
///
/// tokio::time::sleep(Duration::from_millis(5000)).await;
/// }
/// });
/// # }
/// ```
///
/// Shutting down all connections after one minute of operation:
///
/// ```rust,no_run
/// # use serenity::prelude::*;
/// # use std::time::Duration;
/// #
/// # fn run(client: Client) {
/// // Create a clone of the `Arc` containing the shard manager.
/// let shard_manager = client.shard_manager.clone();
///
/// // Create a thread which will sleep for 60 seconds and then have the shard manager
/// // shutdown.
/// tokio::spawn(async move {
/// tokio::time::sleep(Duration::from_secs(60)).await;
///
/// shard_manager.shutdown_all().await;
///
/// println!("Shutdown shard manager!");
/// });
/// # }
/// ```
pub shard_manager: Arc<ShardManager>,
shard_manager_return_value: Receiver<Result<(), GatewayError>>,
pub shard_manager: ShardManager,
/// The voice manager for the client.
///
/// This is an ergonomic structure for interfacing over shards' voice
Expand Down Expand Up @@ -796,12 +764,7 @@ impl Client {

debug!("Initializing shard info: {} - {}/{}", start_shard, init, total_shards);

self.shard_manager.initialize(start_shard, init, total_shards);
if let Some(Err(err)) = self.shard_manager_return_value.next().await {
return Err(Error::Gateway(err));
}

Ok(())
self.shard_manager.run(start_shard, init, total_shards).await.map_err(Error::Gateway)
}
}

Expand Down
21 changes: 6 additions & 15 deletions src/gateway/sharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,9 @@
//!
//! ### [`ShardManager`]
//!
//! The shard manager provides a clean interface for communicating with shard runners either
//! individually or collectively, with functions such as [`ShardManager::shutdown`] and
//! [`ShardManager::restart`] to manage shards in a fine-grained way.
//!
//! For most use cases, the [`ShardManager`] will fit all your low-level sharding needs.
//!
//! ### [`ShardQueuer`]
//!
//! A light wrapper around an mpsc receiver that receives [`ShardQueuerMessage`]s. It should be run
//! in its own thread so it can receive messages to start shards concurrently in a queue.
//! The shard manager provides an interface for managing the starting, restarting, and shutdown of
//! shards by spawning tasks, each containing a [`ShardRunner`], and communicating with each task
//! using message passing.
//!
//! ### [`ShardRunner`]
//!
Expand All @@ -37,7 +30,7 @@

mod shard_manager;
mod shard_messenger;
mod shard_queuer;
mod shard_queue;
mod shard_runner;

use std::fmt;
Expand All @@ -57,11 +50,12 @@ use url::Url;

pub use self::shard_manager::{
ShardManager,
ShardManagerMessage,
ShardManagerOptions,
DEFAULT_WAIT_BETWEEN_SHARD_START,
};
pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::{ShardQueue, ShardQueuer, ShardQueuerMessage};
pub use self::shard_queue::ShardQueue;
pub use self::shard_runner::{ShardRunner, ShardRunnerMessage, ShardRunnerOptions};
use super::{ActivityData, ChunkGuildFilter, GatewayError, PresenceData, WsClient};
use crate::constants::{self, CloseCode};
Expand Down Expand Up @@ -787,9 +781,6 @@ pub enum ShardAction {
pub struct ShardRunnerInfo {
/// The latency between when a heartbeat was sent and when the acknowledgement was received.
pub latency: Option<StdDuration>,
/// The channel used to communicate with the shard runner, telling it what to do with regards
/// to its status.
pub runner_tx: ShardMessenger,
/// The current connection stage of the shard.
pub stage: ConnectionStage,
}
Expand Down
Loading
Loading