Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ temp_cache = ["cache", "mini-moka", "typesize?/mini_moka"]

typesize = ["dep:typesize", "small-fixed-array/typesize", "bool_to_bitflags/typesize"]

# Enables compile-time heavy instrument macros from tracing
tracing_instrument = ["tracing/attributes"]

# Removed feature (https://github.com/serenity-rs/serenity/pull/2246)
absolute_ratelimits = []

Expand Down
5 changes: 2 additions & 3 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use dashmap::DashMap;
#[cfg(feature = "temp_cache")]
use mini_moka::sync::Cache as MokaCache;
use parking_lot::RwLock;
use tracing::instrument;

pub use self::cache_update::CacheUpdate;
pub use self::settings::Settings;
Expand Down Expand Up @@ -234,7 +233,7 @@ impl Cache {
///
/// let cache = Cache::new_with_settings(settings);
/// ```
#[instrument]
#[cfg_attr(feature = "tracing_instrument", instrument)]
pub fn new_with_settings(settings: Settings) -> Self {
#[cfg(feature = "temp_cache")]
fn temp_cache<K, V>(ttl: Duration) -> MokaCache<K, V, BuildHasher>
Expand Down Expand Up @@ -673,7 +672,7 @@ impl Cache {
/// Refer to the [`CacheUpdate` examples].
///
/// [`CacheUpdate` examples]: CacheUpdate#examples
#[instrument(skip(self, e))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self, e)))]
pub fn update<E: CacheUpdate>(&self, e: &mut E) -> Option<E::Output> {
e.update(self)
}
Expand Down
3 changes: 2 additions & 1 deletion src/client/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub struct Context {
pub cache: Arc<Cache>,
}

// Used by the #[instrument] macro on client::dispatch::handle_event
// Used by the #[cfg_attr(feature = "tracing_instrument", instrument)] macro on
// client::dispatch::handle_event
impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context")
Expand Down
30 changes: 13 additions & 17 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures::channel::mpsc::UnboundedReceiver as Receiver;
use futures::future::BoxFuture;
use futures::StreamExt as _;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument};
use tracing::debug;
use typemap_rev::{TypeMap, TypeMapKey};

pub use self::context::Context;
Expand Down Expand Up @@ -304,7 +304,7 @@ impl IntoFuture for ClientBuilder {

type IntoFuture = BoxFuture<'static, Result<Client>>;

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
fn into_future(self) -> Self::IntoFuture {
let data = Arc::new(RwLock::new(self.data));
#[cfg(feature = "framework")]
Expand Down Expand Up @@ -631,8 +631,12 @@ impl Client {
/// # }
/// ```
///
/// # Errors
///
/// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to an error.
///
/// [gateway docs]: crate::gateway#sharding
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn start(&mut self) -> Result<()> {
self.start_connection(0, 0, NonZeroU16::MIN).await
}
Expand Down Expand Up @@ -673,7 +677,7 @@ impl Client {
/// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to an error.
///
/// [gateway docs]: crate::gateway#sharding
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn start_autosharded(&mut self) -> Result<()> {
let (end, total) = {
let res = self.http.get_bot_gateway().await?;
Expand Down Expand Up @@ -737,7 +741,7 @@ impl Client {
/// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to an error.
///
/// [gateway docs]: crate::gateway#sharding
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn start_shard(&mut self, shard: u16, shards: u16) -> Result<()> {
self.start_connection(shard, shard, check_shard_total(shards)).await
}
Expand Down Expand Up @@ -778,7 +782,7 @@ impl Client {
/// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to an error.
///
/// [Gateway docs]: crate::gateway#sharding
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn start_shards(&mut self, total_shards: u16) -> Result<()> {
self.start_connection(0, total_shards - 1, check_shard_total(total_shards)).await
}
Expand Down Expand Up @@ -819,12 +823,12 @@ impl Client {
/// Returns a [`ClientError::Shutdown`] when all shards have shutdown due to an error.
///
/// [Gateway docs]: crate::gateway#sharding
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn start_shard_range(&mut self, range: Range<u16>, total_shards: u16) -> Result<()> {
self.start_connection(range.start, range.end, check_shard_total(total_shards)).await
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
async fn start_connection(
&mut self,
start_shard: u16,
Expand All @@ -842,15 +846,7 @@ impl Client {

debug!("Initializing shard info: {} - {}/{}", start_shard, init, total_shards);

if let Err(why) = self.shard_manager.initialize(start_shard, init, total_shards) {
error!("Failed to boot a shard: {:?}", why);
info!("Shutting down all shards");

self.shard_manager.shutdown_all().await;

return Err(Error::Client(ClientError::ShardBootFailure));
}

self.shard_manager.initialize(start_shard, init, total_shards);
if let Some(Err(err)) = self.shard_manager_return_value.next().await {
return Err(Error::Gateway(err));
}
Expand Down
3 changes: 1 addition & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::io::Error as IoError;
use reqwest::{header::InvalidHeaderValue, Error as ReqwestError};
#[cfg(feature = "gateway")]
use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
use tracing::instrument;

#[cfg(feature = "client")]
use crate::client::ClientError;
Expand Down Expand Up @@ -171,7 +170,7 @@ impl fmt::Display for Error {
}

impl StdError for Error {
#[instrument]
#[cfg_attr(feature = "tracing_instrument", instrument)]
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Format(inner) => Some(inner),
Expand Down
3 changes: 1 addition & 2 deletions src/framework/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use structures::buckets::{Bucket, RateLimitAction};
pub use structures::*;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::instrument;
use uwl::Stream;

use self::buckets::{RateLimitInfo, RevertBucket};
Expand Down Expand Up @@ -602,7 +601,7 @@ impl StandardFramework {

#[async_trait]
impl Framework for StandardFramework {
#[instrument(skip(self, event))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self, event)))]
async fn dispatch(&self, mut ctx: Context, event: FullEvent) {
let FullEvent::Message {
new_message: msg,
Expand Down
23 changes: 8 additions & 15 deletions src/gateway/bridge/shard_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::mpsc::{self, UnboundedReceiver as Receiver, UnboundedSende
use futures::{SinkExt, StreamExt};
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout;
use tracing::{info, instrument, warn};
use tracing::{info, warn};
use typemap_rev::TypeMap;

#[cfg(feature = "voice")]
Expand Down Expand Up @@ -170,21 +170,14 @@ impl ShardManager {
///
/// This will communicate shard boots with the [`ShardQueuer`] so that they are properly
/// queued.
#[instrument(skip(self))]
pub fn initialize(
&self,
shard_index: u16,
shard_init: u16,
shard_total: NonZeroU16,
) -> Result<()> {
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub fn initialize(&self, shard_index: u16, shard_init: u16, shard_total: NonZeroU16) {
let shard_to = shard_index + shard_init;

self.set_shard_total(shard_total);
for shard_id in shard_index..shard_to {
self.boot(ShardId(shard_id));
}

Ok(())
}

/// Restarts a shard runner.
Expand All @@ -207,7 +200,7 @@ impl ShardManager {
/// ```
///
/// [`ShardRunner`]: super::ShardRunner
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn restart(&self, shard_id: ShardId) {
info!("Restarting shard {shard_id}");
self.shutdown(shard_id, 4000).await;
Expand All @@ -218,7 +211,7 @@ impl ShardManager {
/// valid [`ShardRunner`].
///
/// [`ShardRunner`]: super::ShardRunner
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn shards_instantiated(&self) -> Vec<ShardId> {
self.runners.lock().await.keys().copied().collect()
}
Expand All @@ -231,7 +224,7 @@ impl ShardManager {
/// **Note**: If the receiving end of an mpsc channel - theoretically owned by the shard runner
/// - no longer exists, then the shard runner will not know it should shut down. This _should
/// never happen_. It may already be stopped.
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn shutdown(&self, shard_id: ShardId, code: u16) {
const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(5);

Expand Down Expand Up @@ -273,7 +266,7 @@ impl ShardManager {
///
/// If you only need to shutdown a select number of shards, prefer looping over the
/// [`Self::shutdown`] method.
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn shutdown_all(&self) {
let keys = {
let runners = self.runners.lock().await;
Expand Down Expand Up @@ -305,7 +298,7 @@ impl ShardManager {
drop(self.shard_queuer.unbounded_send(msg));
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
fn boot(&self, shard_id: ShardId) {
info!("Telling shard queuer to start shard {shard_id}");

Expand Down
14 changes: 7 additions & 7 deletions src/gateway/bridge/shard_queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::channel::mpsc::UnboundedReceiver as Receiver;
use futures::StreamExt;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, timeout, Duration, Instant};
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, warn};
use typemap_rev::TypeMap;

#[cfg(feature = "voice")]
Expand Down Expand Up @@ -100,7 +100,7 @@ impl ShardQueuer {
/// over.
///
/// **Note**: This should be run in its own thread due to the blocking nature of the loop.
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn run(&mut self) {
// The duration to timeout from reads over the Rx channel. This can be done in a loop, and
// if the read times out then a shard can be started if one is presently waiting in the
Expand Down Expand Up @@ -135,7 +135,7 @@ impl ShardQueuer {
}
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
async fn check_last_start(&mut self) {
let Some(instant) = self.last_start else { return };

Expand All @@ -152,7 +152,7 @@ impl ShardQueuer {
sleep(to_sleep).await;
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
async fn checked_start(&mut self, shard_id: ShardId) {
debug!("[Shard Queuer] Checked start for shard {shard_id}");

Expand All @@ -167,7 +167,7 @@ impl ShardQueuer {
self.last_start = Some(Instant::now());
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
async fn start(&mut self, shard_id: ShardId) -> Result<()> {
let mut shard = Shard::new(
Arc::clone(&self.ws_url),
Expand Down Expand Up @@ -212,7 +212,7 @@ impl ShardQueuer {
Ok(())
}

#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
async fn shutdown_runners(&mut self) {
let keys = {
let runners = self.runners.lock().await;
Expand All @@ -236,7 +236,7 @@ impl ShardQueuer {
/// **Note**: If the receiving end of an mpsc channel - theoretically owned by the shard runner
/// - no longer exists, then the shard runner will not know it should shut down. This _should
/// never happen_. It may already be stopped.
#[instrument(skip(self))]
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn shutdown(&mut self, shard_id: ShardId, code: u16) {
info!("Shutting down shard {}", shard_id);

Expand Down
Loading