Skip to content

Conversation

@jorgecarleitao
Copy link
Member

@jorgecarleitao jorgecarleitao commented Sep 20, 2020

This is a proposal to change how we programmatically iterate over record batches in arrow and datafusion.

Instead of

pub fn collect(
    it: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
) -> Result<Vec<RecordBatch>> {
    let mut reader = it.lock().unwrap();
    let mut results: Vec<RecordBatch> = vec![];
    loop {
        match reader.next_batch() {
            Ok(Some(batch)) => {
                results.push(batch);
            }
            Ok(None) => {
                // end of result set
                return Ok(results);
            }
            Err(e) => return Err(ExecutionError::from(e)),
        }
    }
}

use

pub fn collect(
    it: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
) -> Result<Vec<RecordBatch>> {
    it.lock()
        .unwrap()
        .into_iter()
        .collect::<ArrowResult<Vec<_>>>()
        .map_err(|e| ExecutionError::from(e))
}

I.e. via the Iterator trait.

This allow us to write more expressive code, as well as offer a well documented and popular API to our users (Iterator).

Finally, this change also opens the possibility to implement future::Stream, the async version of Iterator.

@github-actions
Copy link

@jorgecarleitao jorgecarleitao changed the title ARROW-10046: [Rust] [DataFusion] Made *Iterator implement Iterator ARROW-10046: [Rust] [DataFusion] Made RecordBatchReader implement Iterator Sep 20, 2020
@jorgecarleitao jorgecarleitao marked this pull request as ready for review September 22, 2020 15:20
@jorgecarleitao
Copy link
Member Author

FYI, I am trying to summarize how DataFusion iterates over data, and I came up with this summary.

This is a small conceptual change, but of fundamental importance:

We can now build the iterator API on top of next_batch, as well
as a futures::Stream trait.
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this will be a breaking change for any code that currently uses RecordBatchReader I think it is a significant improvement and I vote (not that I am sure what my vote counts for :) ) that it is merged in.

If the API breakage is a concern (by removing RecordBatchReader::next_batch I have a suggestion, inline, that could potentially mitigate that)

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice. Thanks @jorgecarleitao

@andygrove andygrove closed this in 0597f48 Oct 2, 2020
andygrove pushed a commit that referenced this pull request Oct 3, 2020
This PR is built on top of #8225 and Replaces `Arc<Mutex<dyn ...>>` by `Box<dyn ...>`.

In the TopK example, I had to move some functions away from the `impl`. This is because `self` cannot be borrowed as mutable and immutable at the same time, and, during iteration, it was being borrowed as mutable (to update the BTree) and as immutable (to access the `input`). There is probably a better way of achieving this e.g. via interior mutability.

Closes #8307 from jorgecarleitao/box_iterator

Authored-by: Jorge C. Leitao <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
@jorgecarleitao jorgecarleitao deleted the reader_iterator branch December 14, 2020 07:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants