Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/e09_collectors/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ impl EventHandler for Handler {
// There is a method implemented for some models to conveniently collect replies. They
// return a builder that can be turned into a Stream, or here, where we can await a
// single reply
let collector =
msg.author.id.collect_messages(ctx.shard.clone()).timeout(Duration::from_secs(10));
let collector = msg.author.id.collect_messages(&ctx).timeout(Duration::from_secs(10));
if let Some(answer) = collector.await {
if answer.content.to_lowercase() == "ferris" {
let _ = answer.reply(&ctx.http, "That's correct!").await;
Expand All @@ -47,7 +46,7 @@ impl EventHandler for Handler {
// The message model can also be turned into a Collector to collect reactions on it.
let collector = react_msg
.id
.collect_reactions(ctx.shard.clone())
.collect_reactions(&ctx)
.timeout(Duration::from_secs(10))
.author_id(msg.author.id);

Expand All @@ -65,7 +64,7 @@ impl EventHandler for Handler {
let _ = msg.reply(&ctx.http, "Write 5 messages in 10 seconds").await;

// We can create a collector from scratch too using this builder future.
let collector = MessageCollector::new(ctx.shard.clone())
let collector = MessageCollector::new(&ctx)
// Only collect messages by this user.
.author_id(msg.author.id)
.channel_id(msg.channel_id)
Expand Down Expand Up @@ -100,7 +99,7 @@ impl EventHandler for Handler {
// We can also collect arbitrary events using the collect() function. For example, here we
// collect updates to the messages that the user sent above and check for them updating all
// 5 of them.
let mut collector = serenity::collector::collect(&ctx.shard, move |event| match event {
let mut collector = serenity::collector::collect(&ctx, move |event| match event {
// Only collect MessageUpdate events for the 5 MessageIds we're interested in.
Event::MessageUpdate(event)
if collected.iter().any(|msg| event.message.id == msg.id) =>
Expand Down
6 changes: 2 additions & 4 deletions examples/e14_message_components/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl EventHandler for Handler {
// manually in the EventHandler.
let interaction = match m
.id
.collect_component_interactions(ctx.shard.clone())
.collect_component_interactions(&ctx)
.timeout(Duration::from_secs(60 * 3))
.await
{
Expand Down Expand Up @@ -108,9 +108,7 @@ impl EventHandler for Handler {

// Wait for multiple interactions
let mut interaction_stream =
m.id.collect_component_interactions(ctx.shard.clone())
.timeout(Duration::from_secs(60 * 3))
.stream();
m.id.collect_component_interactions(&ctx).timeout(Duration::from_secs(60 * 3)).stream();

while let Some(interaction) = interaction_stream.next().await {
let sound = &interaction.data.custom_id;
Expand Down
4 changes: 2 additions & 2 deletions examples/testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn message(ctx: &Context, msg: Message) -> Result<(), serenity::Error> {
.await?;
let button_press = msg
.id
.collect_component_interactions(ctx.shard.clone())
.collect_component_interactions(ctx)
.timeout(std::time::Duration::from_secs(10))
.await;
match button_press {
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn message(ctx: &Context, msg: Message) -> Result<(), serenity::Error> {
.await?;

let msg_id = msg.id;
let mut message_updates = serenity::collector::collect(&ctx.shard, move |ev| match ev {
let mut message_updates = serenity::collector::collect(ctx, move |ev| match ev {
Event::MessageUpdate(x) if x.message.id == msg_id => Some(()),
_ => None,
});
Expand Down
43 changes: 23 additions & 20 deletions src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::future::pending;
use futures::{Stream, StreamExt as _};
pub use quick_modal::*;

use crate::gateway::{CollectorCallback, ShardMessenger};
use crate::gateway::CollectorCallback;
use crate::gateway::client::Context;
use crate::internal::prelude::*;
use crate::model::prelude::*;

Expand All @@ -18,10 +19,10 @@ use crate::model::prelude::*;
/// # use std::time::Duration;
/// # use futures::StreamExt as _;
/// # use serenity::model::prelude::Event;
/// # use serenity::gateway::ShardMessenger;
/// # use serenity::gateway::client::Context;
/// # use serenity::collector::collect;
/// # async fn example_(shard: &ShardMessenger) {
/// let stream = collect(shard, |event| match event {
/// # async fn example_(ctx: &Context) {
/// let stream = collect(ctx, |event| match event {
/// Event::ReactionRemove(event) => Some(event.reaction.clone()),
/// _ => None,
/// });
Expand All @@ -33,18 +34,20 @@ use crate::model::prelude::*;
/// .await;
/// # }
/// ```
pub fn collect<T, F>(shard: &ShardMessenger, extractor: F) -> impl Stream<Item = T> + use<T, F>
pub fn collect<T, F>(ctx: &Context, extractor: F) -> impl Stream<Item = T> + use<T, F>
where
T: Send + 'static,
F: Fn(&Event) -> Option<T> + Send + Sync + 'static,
{
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();

// Register an event callback in the shard. It's kept alive as long as we return `true`
shard.add_collector(CollectorCallback(Arc::new(move |event| match extractor(event) {
// If this event matches, we send it to the receiver stream
Some(item) => sender.send(item).is_ok(),
None => !sender.is_closed(),
ctx.collectors.write().push(CollectorCallback(Arc::new(move |event| {
match extractor(event) {
// If this event matches, we send it to the receiver stream
Some(item) => sender.send(item).is_ok(),
None => !sender.is_closed(),
}
})));

// Convert the mpsc Receiver into a Stream
Expand All @@ -62,18 +65,18 @@ macro_rules! make_specific_collector {
#[doc = concat!("A [`", stringify!($collector_type), "`] receives [`", stringify!($item_type), "`]'s match the given filters for a set duration.")]
$( #[ $($meta)* ] )*
#[must_use]
pub struct $collector_type {
shard: ShardMessenger,
pub struct $collector_type<'a> {
ctx: &'a Context,
duration: Option<std::time::Duration>,
filter: Option<Box<dyn Fn(&$item_type) -> bool + Send + Sync>>,
$( $filter_name: Option<$filter_type>, )*
}

impl $collector_type {
impl<'a> $collector_type<'a> {
/// Creates a new collector without any filters configured.
pub fn new(shard: ShardMessenger) -> Self {
pub fn new(ctx: &'a Context) -> Self {
Self {
shard,
ctx,
duration: None,
filter: None,
$( $filter_name: None, )*
Expand Down Expand Up @@ -124,7 +127,7 @@ macro_rules! make_specific_collector {
None => pending::<()>().await,
} };

let stream = collect(&self.shard, move |event| match event {
let stream = collect(&self.ctx, move |event| match event {
$extractor if filters_pass($extracted_item) => Some($extracted_item.clone()),
_ => None,
});
Expand All @@ -139,23 +142,23 @@ macro_rules! make_specific_collector {
}
}

impl IntoFuture for $collector_type {
impl<'a> IntoFuture for $collector_type<'a> {
type Output = Option<$item_type>;
type IntoFuture = futures::future::BoxFuture<'static, Self::Output>;
type IntoFuture = futures::future::BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.next())
}
}

pub trait $collector_trait {
fn $method_name(self, shard_messenger: ShardMessenger) -> $collector_type;
fn $method_name(self, ctx: &Context) -> $collector_type<'_>;
}

$(
impl $collector_trait for $filter_type {
fn $method_name(self, shard_messenger: ShardMessenger) -> $collector_type {
$collector_type::new(shard_messenger).$filter_name(self)
fn $method_name(self, ctx: &Context) -> $collector_type<'_> {
$collector_type::new(ctx).$filter_name(self)
}
}
)*
Expand Down
2 changes: 1 addition & 1 deletion src/collector/quick_modal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<'a> CreateQuickModal<'a> {
);
builder.execute(&ctx.http, interaction_id, token).await?;

let collector = ModalInteractionCollector::new(ctx.shard.clone())
let collector = ModalInteractionCollector::new(ctx)
.custom_ids(vec![FixedString::from_str_trunc(&modal_custom_id)]);

let collector = match self.timeout {
Expand Down
Loading