Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable External ArrowColumnWriter Access #4859

Closed

Conversation

devinjdangelo
Copy link
Contributor

@devinjdangelo devinjdangelo commented Sep 25, 2023

Which issue does this PR close?

Related to: #1718
Enables: apache/datafusion#7655

Rationale for this change

#4850 enabled external access to ArrowRowGroupWriter so downstream users could orchestrate serialization of row groups in parallel on threads/tokio tasks as desired. This PR goes one level deeper to make ArrowColumnWriter and associated structs/functions public, so that downstream users can serialize columns in parallel.

This PR also adds some utility methods to break apart and reconstruct ArrowRowGroupWriter. The idea is to do the following:

  1. Initialize ArrowRowGroupWriter
  2. Break into component ArrowColumnWriters and distribute to threads/tasks
  3. Serialize columns in parallel
  4. Join column writers back to main thread, reconstruct ArrowRowGroupWriter and finalize the row group

The above strategy is implemented in apache/datafusion#7655.

What changes are included in this PR?

ArrowColumnWriter and associated sturcts/functions are marked pub. Additional utility methods implemented for ArrowRowGroupWriter.

Are there any user-facing changes?

Additional structs and functions are public.

@github-actions github-actions bot added the parquet Changes to the parquet crate label Sep 25, 2023
@@ -479,7 +511,7 @@ fn get_arrow_column_writer(
}

/// Write the leaves of `array` in depth-first order to `writers` with `levels`
fn write_leaves<'a, W>(
pub fn write_leaves<'a, W>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some way we could avoid making this and the accompanying LevelInfo structures public? I'm not really sure we want to make a hard API commitment to not changing them...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. I did some refactoring and created an ArrowColumnWriter::write method that abstracts these low level implementation details for external users.

/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
pub type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a hack, that I stomached largely because it was an internal implementation detail, is there some way we can keep this hidden as an implementation detail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some additional refactoring to split out the Vec<(SharedColumnChunk, ArrowColumnWriter)> into two separate Vecs. This way I could return ownership of only the ArrowColumnWriters. SharedColumnChunk is private again.

enum ArrowColumnWriter {
/// Serializes [ArrayRef]s to [ArrowColumnChunk]s which can be concatenated
/// to form a parquet row group
pub enum ArrowColumnWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the enum variants private as well, i.e. something like

pub struct ArrowColumnWriter(ArrowColumnWriterImpl);

enum ArrowColumnWriterImpl {
    ...
}

Comment on lines +425 to +435
/// Takes ownership of all [ArrowColumnWriter]s from this [ArrowRowGroupWriter]
/// Caller must restore ownership with give_col_writers before calling close method.
pub fn take_col_writers(&mut self) -> Vec<ArrowColumnWriter> {
self.writers.drain(..).collect()
}

/// Restores ownership of all [ArrowColumnWriter]s. Caller is responsible for
/// returning the [Vec] in the same order returned by take_col_writers method.
pub fn give_col_writers(&mut self, writers: Vec<ArrowColumnWriter>) {
self.writers = writers;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a massive fan of this API tbh, I'll have a play and see what I can come up with

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Let me know if you come up with something more elegant!

My original attempt was just to provide a mutable reference to the writers, but it is more challenging to handle mutable references safely vs. passing ownership in parallel async tasks (I considered giving https://docs.rs/async-scoped/latest/async_scoped/ a try but decided against it).

I moved away from entirely deconstructing the ArrowRowGroupWriterin order to keep the SharedColumnChunk private.

/// Serializes an [ArrayRef] to a [ArrowColumnChunk] for an in progress row group.
pub fn write(&mut self, array: ArrayRef, field: Arc<Field>) -> Result<()> {
let mut levels = calculate_array_levels(&array, &field)?.into_iter();
let mut writer_iter = std::iter::once(self);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually wrong if array is nested, and therefore comprises multiple leaf columns

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, makes sense. Perhaps we could have something like:

pub struct ArrowColumnWriter(Vec<ArrowColumnWriterImpl>);

enum ArrowColumnWriterImpl{
...
}

which for a non nested column would contain only one ArrowColumnWriterImpl, but could hold multiple in the case of nested columns?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a play with a slightly different API for this, will report back 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe #4871 the alternate API proposal?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants