Owned consumer stream #27

Open
carlocorradini opened this issue May 22, 2024 · 5 comments

Comments

@carlocorradini
Contributor

Motivation

Because the message type carries a lifetime, it is currently not possible to return impl Stream<Item = ?> from a function that uses the consumer stream.

The following does not compile. The compiler reports "cannot return value referencing temporary value" (returns a value referencing data owned by the current function):

async fn my_stream(&self, options: SeaConsumerOptions) -> impl Stream<Item = String> {
    self
        .streamer // SeaStreamer
        .create_consumer(&[StreamKey::new("my_key").unwrap()], options).unwrap()
        .stream()
        .filter_map(|message| {
            message.ok().and_then(|message| {
                message
                    .message()
                    .as_str()
                    .ok()
                    .map(|message| message.to_string())
            })
        })
}

Proposed Solutions

I would like something like the two functions that redis provides for pubsub streams: one that takes self, consumes the value, and returns a plain Stream without a lifetime, and another that takes &mut self and returns Stream + '_.
It could be called into_stream or something similar.
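
As a rough illustration of the two-method shape requested above, here is a minimal sketch. Everything in it is an assumption made for the example: `DemoConsumer`, `BorrowedStream`, `OwnedStream` and `collect_ready` are hypothetical names, not SeaStreamer or redis APIs, and the `Stream` trait is a local stand-in that mirrors the one from the futures crate so the snippet has no dependencies.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream`, declared only to avoid a dependency.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical consumer that already holds its payloads in memory.
pub struct DemoConsumer {
    queue: VecDeque<String>,
}

/// Borrows the consumer, so it cannot outlive it.
pub struct BorrowedStream<'a> {
    con: &'a mut DemoConsumer,
}

/// Owns the consumer, so it carries no lifetime parameter.
pub struct OwnedStream {
    con: DemoConsumer,
}

impl DemoConsumer {
    pub fn new(items: &[&str]) -> Self {
        DemoConsumer {
            queue: items.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// Borrowing variant: the result is tied to `&mut self`.
    pub fn stream(&mut self) -> impl Stream<Item = String> + '_ {
        BorrowedStream { con: self }
    }

    /// Consuming variant: the result is free of borrowed lifetimes.
    pub fn into_stream(self) -> impl Stream<Item = String> {
        OwnedStream { con: self }
    }
}

impl<'a> Stream for BorrowedStream<'a> {
    type Item = String;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<String>> {
        Poll::Ready(self.con.queue.pop_front())
    }
}

impl Stream for OwnedStream {
    type Item = String;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<String>> {
        Poll::Ready(self.con.queue.pop_front())
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Demo helper (not an executor): polls until the stream ends or is pending.
pub fn collect_ready<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        out.push(item);
    }
    out
}

/// The function shape the issue asks for: no lifetime in the return type.
pub fn my_stream() -> impl Stream<Item = String> {
    DemoConsumer::new(&["a", "b"]).into_stream()
}
```

In this sketch `my_stream` compiles only because `into_stream` moves the consumer into the returned value; swapping it for `stream()` would reproduce the "referencing temporary value" error.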

@tyt2y3
Member

tyt2y3 commented May 22, 2024

Thank you for your suggestion. It is definitely doable with a custom struct that owns the consumer and implements Stream, similar to how it is done in flume.

@carlocorradini
Contributor Author

Do you have any idea or implementation in mind? 🤗🤯

@tyt2y3
Member

tyt2y3 commented May 23, 2024

I just realized it's very difficult because of the self-referencing lifetime: the stored future borrows from the consumer held in the same struct.

The following does not compile:

#[derive(Debug)]
pub struct ConsumerStream<'a, C: Consumer + 'static> {
    con: C,
    fut: Option<C::NextFuture<'a>>,
}


/// Common interface of consumers, to be implemented by all backends.
pub trait Consumer: Sized + Send + Sync {
    ...
    fn into_stream<'a>(self) -> ConsumerStream<'a, Self> {
        ConsumerStream {
            con: self,
            fut: None,
        }
    }
}


impl<'a, C: Consumer> Stream for ConsumerStream<'a, C>
where
    Self: 'a,
{
    type Item = StreamResult<C::Message<'a>, C::Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll::{Pending, Ready};
        if self.fut.is_none() {
            self.fut = Some(self.con.next());
        }
        match &mut self.fut {
            Some(fut) => match std::pin::Pin::new(fut).poll(cx) {
                Ready(res) => {
                    self.fut = None;
                    Ready(Some(res))
                }
                Pending => Pending,
            },
            None => unreachable!(),
        }
    }
}
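
One way to sidestep the self-reference, sketched here under stated assumptions, is to let the in-flight future own the consumer and hand it back together with an owned item, which is the same idea the futures crate uses for `stream::unfold`. The `OwnedNext` trait, `OwnedStream`, `DemoConsumer`, `Recv` and `collect_ready` are hypothetical names for illustration, the `Stream` trait is a local stand-in for the futures one, and the approach only applies when each message can be converted into an owned value.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream`, to avoid a dependency.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical trait: the future takes the consumer by value and returns it
/// with an owned item, so the stream never stores a borrow of itself.
pub trait OwnedNext: Sized {
    type Item;
    type Fut: Future<Output = (Self, Option<Self::Item>)>;
    fn next_owned(self) -> Self::Fut;
    fn into_stream(self) -> OwnedStream<Self> {
        OwnedStream { state: State::Idle(self) }
    }
}

enum State<C: OwnedNext> {
    Idle(C),
    Waiting(Pin<Box<C::Fut>>),
    Done,
}

pub struct OwnedStream<C: OwnedNext> {
    state: State<C>,
}

impl<C: OwnedNext + Unpin> Stream for OwnedStream<C> {
    type Item = C::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<C::Item>> {
        let this = self.get_mut();
        loop {
            let prev = std::mem::replace(&mut this.state, State::Done);
            match prev {
                State::Idle(con) => this.state = State::Waiting(Box::pin(con.next_owned())),
                State::Waiting(mut fut) => {
                    let polled = fut.as_mut().poll(cx);
                    match polled {
                        Poll::Ready((con, Some(item))) => {
                            this.state = State::Idle(con);
                            return Poll::Ready(Some(item));
                        }
                        Poll::Ready((_, None)) => return Poll::Ready(None),
                        Poll::Pending => {
                            this.state = State::Waiting(fut);
                            return Poll::Pending;
                        }
                    }
                }
                State::Done => return Poll::Ready(None),
            }
        }
    }
}

/// Hypothetical in-memory consumer and its hand-written future.
pub struct DemoConsumer(pub VecDeque<String>);
pub struct Recv(Option<DemoConsumer>);

impl Future for Recv {
    type Output = (DemoConsumer, Option<String>);
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut con = self.0.take().expect("polled after completion");
        let item = con.0.pop_front();
        Poll::Ready((con, item))
    }
}

impl OwnedNext for DemoConsumer {
    type Item = String;
    type Fut = Recv;
    fn next_owned(self) -> Recv {
        Recv(Some(self))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Demo helper (not an executor): polls until the stream ends or is pending.
pub fn collect_ready<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(stream);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

The trade-off is that the item type can no longer borrow from the consumer, so a message type like `C::Message<'a>` would first have to be copied into an owned form.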

@carlocorradini
Contributor Author

I am aware that this is not simple or straightforward, but I believe it is necessary.
Many libraries support this, and many common patterns require a function to return a stream (for example, async-graphql subscriptions).
We can try, and I'm willing to help (though I'm not as experienced as you are) 🤗🥳🤯

@tyt2y3
Member

tyt2y3 commented Jun 2, 2024

I have a design that works with the Redis backend. Sadly, we have to implement it per backend and leverage flume's into_stream.

I am not sure if this will be easily doable for the Kafka backend.
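
The per-backend design itself is not shown in this thread. As a loose, synchronous analogy of the channel-based idea, the sketch below moves a consumer into a worker, copies each borrowed message into an owned `String`, and forwards it through a channel whose receiving end has no lifetime. It assumes a hypothetical `DemoConsumer` and uses `std::thread` and `std::sync::mpsc` as stand-ins for an async task and a flume channel; in the async case, flume's `Receiver::into_stream` would play the role that `into_iter` plays here.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical consumer whose messages borrow from its own buffer.
pub struct DemoConsumer {
    buffer: Vec<String>,
    pos: usize,
}

impl DemoConsumer {
    pub fn new(items: &[&str]) -> Self {
        DemoConsumer {
            buffer: items.iter().map(|s| s.to_string()).collect(),
            pos: 0,
        }
    }

    /// The returned `&str` is tied to `&mut self`, like a borrowed message.
    pub fn next_message(&mut self) -> Option<&str> {
        let item = self.buffer.get(self.pos)?;
        self.pos += 1;
        Some(item.as_str())
    }

    /// Moves the consumer into a worker and returns an owned receiver.
    pub fn into_receiver(mut self) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            while let Some(msg) = self.next_message() {
                // Copy the payload so it no longer borrows from the consumer.
                if tx.send(msg.to_owned()).is_err() {
                    break; // the receiver was dropped
                }
            }
        });
        rx
    }
}

/// No lifetime in the return type: the receiver owns everything it yields.
pub fn my_messages() -> impl Iterator<Item = String> {
    DemoConsumer::new(&["a", "b"]).into_receiver().into_iter()
}
```

The cost of this shape is one copy per message plus a worker per consumer, in exchange for a return type that can cross function boundaries freely.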
