Can the ConsumerStream implement Clone? #4267

@sagoez

Hello,

I frequently encounter the need to write code like the following (whether using Fluvio or Kafka):

async fn process_chunk_with_shutdown<'a>(
    &self,
    ...
) -> Result<Unit, Error> {
    match chunk {
        Ok(chunk) => {
            tokio::select! {
                _ = subsys.on_shutdown_requested() => {
                    // Perform shutdown-related actions here.
                },
                _ = self.process_chunk(ctx, *target, chunk.as_ref()) => {
                    ...
                },
            }
            Ok(())
        }
        Err(e) => ...
    }
}

async fn consume_chunks<'a>(
    &self,
    ...
) -> Result<Unit, Error> {
    let mut stream = ...

    stream
        .try_ready_chunks(consumer_batch_size)
        .map(|chunk| {
            async move {
                self.process_chunk_with_shutdown(chunk, ctx, target, subsys)
                    .await
            }
        })
        .buffered(consumer_batch_size)
        .collect::<Vec<_>>()
        .await;

    Ok(())
}

In essence, I'm trying to read from a stream and process its records in chunks. With Fluvio, however, I'm running into a limitation: ConsumerStream isn't cloneable, so I can't wrap it in an Arc and/or pass it by reference to a stream adapter that consumes it (such as try_ready_chunks).

As far as I can tell, the only way to consume records via the Rust client while manually committing offsets is to process them one at a time with next.

Would it be reasonable to implement Clone for ConsumerStream, or perhaps provide an operator that allows batch processing? If I'm missing something here, I’d greatly appreciate clarification. I'd also be happy to contribute if there’s something I can help with!
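To make the batch-operator idea concrete, here is a minimal sketch of the semantics I have in mind: pull up to N records through a mutable borrow, process them, then commit once per batch. Everything in it is hypothetical. MockConsumer, next_batch, consume_in_batches, and commit are stand-ins invented for illustration and are not Fluvio's API, and the tiny block_on executor exists only so the example runs with the standard library alone. Unlike try_ready_chunks, this helper waits until the batch is full or the stream ends.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a consumer stream (NOT Fluvio's API).
/// Records are modelled as bare offsets.
struct MockConsumer {
    records: VecDeque<u64>,
    last_delivered: Option<u64>,
    committed: Option<u64>,
}

impl MockConsumer {
    /// Yield the next record, remembering its offset for a later commit.
    async fn next(&mut self) -> Option<u64> {
        let record = self.records.pop_front();
        if record.is_some() {
            self.last_delivered = record;
        }
        record
    }

    /// Hypothetical manual commit: marks the last delivered offset as committed.
    fn commit(&mut self) {
        self.committed = self.last_delivered;
    }
}

/// Pull up to `max` records through a mutable borrow (no Clone, no Arc).
async fn next_batch(consumer: &mut MockConsumer, max: usize) -> Vec<u64> {
    let mut batch = Vec::with_capacity(max);
    while batch.len() < max {
        match consumer.next().await {
            Some(record) => batch.push(record),
            None => break, // stream ended before the batch filled up
        }
    }
    batch
}

/// Process the stream batch by batch, committing once per batch.
async fn consume_in_batches(consumer: &mut MockConsumer, max: usize) -> Vec<Vec<u64>> {
    let mut batches = Vec::new();
    loop {
        let batch = next_batch(consumer, max).await;
        if batch.is_empty() {
            break;
        }
        // Real per-batch processing would go here.
        consumer.commit(); // the borrow taken by next_batch has already ended
        batches.push(batch);
    }
    batches
}

/// Minimal executor for this demo only; real code would use an async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let mut consumer = MockConsumer {
        records: (0..5).collect(),
        last_delivered: None,
        committed: None,
    };
    let batches = block_on(consume_in_batches(&mut consumer, 2));
    println!("batches = {:?}, committed = {:?}", batches, consumer.committed);
}
```

The point of the sketch is that the consumer is only borrowed while a batch is being collected, so the commit call can still reach it afterwards. Whether ConsumerStream can expose something similar is exactly what I'm asking about.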

Thanks in advance for your time and support!

Labels

no-stale: Opt-out of closing issue due to no activity
