Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable External ArrowColumnWriter Access #4859

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ use std::ops::Range;

/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`]
/// for each leaf column encountered
pub(crate) fn calculate_array_levels(
array: &ArrayRef,
field: &Field,
) -> Result<Vec<LevelInfo>> {
pub fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<LevelInfo>> {
let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
builder.write(array, 0..array.len());
Ok(builder.finish())
Expand Down Expand Up @@ -538,7 +535,7 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking into account
/// any non-primitive parents it may have in the arrow representation
#[derive(Debug, Eq, PartialEq, Clone)]
pub(crate) struct LevelInfo {
pub struct LevelInfo {
/// Array's definition levels
///
/// Present if `max_def_level != 0`
Expand Down
46 changes: 39 additions & 7 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use thrift::protocol::{TCompactOutputProtocol, TSerializable};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow_schema::{
ArrowError, DataType as ArrowDataType, IntervalUnit, Schema, SchemaRef,
};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
Expand All @@ -52,7 +54,7 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use levels::{calculate_array_levels, LevelInfo};

mod byte_array;
mod levels;
pub mod levels;

/// Arrow writer
///
Expand Down Expand Up @@ -299,9 +301,9 @@ impl Read for ArrowColumnChunkReader {

/// A shared [`ArrowColumnChunk`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// This allows it to be owned by lower level page writers whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
pub type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a hack, that I stomached largely because it was an internal implementation detail, is there some way we can keep this hidden as an implementation detail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some additional refactoring to split out the Vec<(SharedColumnChunk, ArrowColumnWriter)> into two separate Vecs. This way I could return ownership of only the ArrowColumnWriters. SharedColumnChunk is private again.


#[derive(Default)]
struct ArrowPageWriter {
Expand Down Expand Up @@ -347,8 +349,8 @@ impl PageWriter for ArrowPageWriter {
}
}

/// Encodes a leaf column to [`ArrowPageWriter`]
enum ArrowColumnWriter {
/// Can be passed to [write_leaves] function to serialize arrays to [SharedColumnChunk]s
pub enum ArrowColumnWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the enum variants private as well, i.e. something like

pub struct ArrowColumnWriter(ArrowColumnWriterImpl);

enum ArrowColumnWriterImpl {
    ...
}

ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
Column(ColumnWriter<'static>),
}
Expand Down Expand Up @@ -398,6 +400,17 @@ impl ArrowRowGroupWriter {
Ok(())
}

pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}

/// Converts this [ArrowRowGroupWriter] into a collection of individual [ArrowColumnWriter]s
/// and associated [SharedColumnChunk]s. This permits the caller greater control over how
/// data is serialized, such as via parallel threads or async tasks.
pub fn into_col_writers(self) -> Vec<(SharedColumnChunk, ArrowColumnWriter)> {
self.writers
}

pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
self.writers
.into_iter()
Expand All @@ -414,6 +427,25 @@ impl ArrowRowGroupWriter {
}
}

/// Represents components of a [ArrowRowGroupWriter] which have been broken apart by passing ownership
/// back out to the caller.
type DeconstructedRowGroupWriterComponents =
(Arc<Schema>, Vec<(SharedColumnChunk, ArrowColumnWriter)>);

impl From<DeconstructedRowGroupWriterComponents> for ArrowRowGroupWriter {
fn from(value: DeconstructedRowGroupWriterComponents) -> Self {
let schema = value.0;
let writers = value.1;
Self {
writers,
schema,
// The caller is responsible for tracking buffered_rows when dissasembling and
// reasembling ArrowRowGroupWriter
buffered_rows: 0,
}
}
}

/// Get an [`ArrowColumnWriter`] along with a reference to its [`SharedColumnChunk`]
fn get_arrow_column_writer(
data_type: &ArrowDataType,
Expand Down Expand Up @@ -479,7 +511,7 @@ fn get_arrow_column_writer(
}

/// Write the leaves of `array` in depth-first order to `writers` with `levels`
fn write_leaves<'a, W>(
pub fn write_leaves<'a, W>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some way we could avoid making this and the accompanying LevelInfo structures public? I'm not really sure we want to make a hard API commitment to not changing them...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. I did some refactoring and created an ArrowColumnWriter::write method that abstracts these low level implementation details for external users.

writers: &mut W,
levels: &mut IntoIter<LevelInfo>,
array: &(dyn Array + 'static),
Expand Down
Loading