106 changes: 105 additions & 1 deletion rust/parquet/src/file/writer.rs
@@ -18,27 +18,32 @@
//! Contains file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.

use std::fs::File;
use std::{
io::{Seek, SeekFrom, Write},
rc::Rc,
};

use arrow::array;
use arrow::datatypes::Schema;
use byteorder::{ByteOrder, LittleEndian};
use parquet_format as parquet;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};

use crate::basic::PageType;
use crate::basic::{PageType, Repetition, Type};
use crate::column::{
page::{CompressedPage, Page, PageWriteSpec, PageWriter},
writer::{get_column_writer, ColumnWriter},
};
use crate::errors::{ParquetError, Result};
use crate::file::properties::WriterProperties;
use crate::file::{
metadata::*, properties::WriterPropertiesPtr, reader::TryClone,
statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
};
use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
use crate::util::io::{FileSink, Position};
use arrow::record_batch::RecordBatch;

// ----------------------------------------------------------------------
// APIs for file & row group writers
@@ -521,6 +526,75 @@ impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
}
}

struct ArrowWriter {
Review comment (Member):
Would it be more appropriate to put this in src/arrow/writer?

writer: SerializedFileWriter<File>,
rows: i64,
}

impl ArrowWriter {
pub fn new(file: File, _arrow_schema: &Schema) -> Self {
//TODO convert Arrow schema to Parquet schema
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(
types::Type::primitive_type_builder("a", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
),
Rc::new(
types::Type::primitive_type_builder("b", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
),
])
.build()
.unwrap(),
);
let props = Rc::new(WriterProperties::builder().build());
let file_writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();

Self {
writer: file_writer,
rows: 0,
}
}

pub fn write(&mut self, batch: &RecordBatch) {
let mut row_group_writer = self.writer.next_row_group().unwrap();
for i in 0..batch.schema().fields().len() {
let col_writer = row_group_writer.next_column().unwrap();
if let Some(mut writer) = col_writer {
match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
let array = batch
.column(i)
.as_any()
.downcast_ref::<array::Int32Array>()
.unwrap();
self.rows += typed
.write_batch(array.value_slice(0, array.len()), None, None)
.unwrap() as i64;
}
//TODO add other types
@wesm (Member), Mar 31, 2020:
A couple of initial things to keep in mind:

  • Writes of both nullable (OPTIONAL) and non-nullable (REQUIRED) fields.
  • You can optimize the special case where a nullable field's data has no nulls.
  • A good amount of code is required to convert the Arrow physical form of the various logical types to the Parquet equivalents; see https://github.com/apache/arrow/blob/master/cpp/src/parquet/column_writer.cc for details.
  • It is worth thinking up front about how dictionary-encoded data is handled on both the Arrow write and Arrow read paths. In parquet-cpp we initially discarded Arrow DictionaryArrays on write (casting e.g. Dictionary to dense String), and through real-world need I was forced to revisit this (quite painfully) to let Arrow dictionaries survive round trips to Parquet format, and also to achieve better performance and memory use in both reads and writes. You can certainly do a dictionary-to-dense conversion like we did, but you may someday find yourselves doing the same painful refactor that I did to make dictionary write and read not only more efficient but also dictionary-order preserving.
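
As a rough illustration of the first two bullets above (not part of this PR), a nullable Int32 column could be written by deriving definition levels from the Arrow validity bitmap, with a fast path when the array contains no nulls. The sketch below assumes the existing write_batch API and a flat (non-nested) schema so repetition levels can be omitted; the helper name is hypothetical.

// Sketch only: write an Arrow Int32Array into an OPTIONAL Parquet column.
// Definition level 1 marks a present value, 0 marks a null; repetition levels
// are omitted because the schema is assumed to be flat.
use crate::column::writer::ColumnWriterImpl;
use crate::data_type::Int32Type;
use crate::errors::Result;
use arrow::array::{Array, Int32Array};

fn write_optional_int32(
    array: &Int32Array,
    typed: &mut ColumnWriterImpl<Int32Type>,
) -> Result<usize> {
    if array.null_count() == 0 {
        // Fast path: no nulls, so every slot is defined (level 1) and the
        // values can be copied straight through.
        let def_levels = vec![1i16; array.len()];
        let values: Vec<i32> = (0..array.len()).map(|i| array.value(i)).collect();
        typed.write_batch(&values, Some(&def_levels), None)
    } else {
        // General path: keep only the non-null values and derive definition
        // levels from the validity bitmap.
        let mut values = Vec::with_capacity(array.len() - array.null_count());
        let mut def_levels = Vec::with_capacity(array.len());
        for i in 0..array.len() {
            if array.is_null(i) {
                def_levels.push(0);
            } else {
                def_levels.push(1);
                values.push(array.value(i));
            }
        }
        typed.write_batch(&values, Some(&def_levels), None)
    }
}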

_ => {
unimplemented!();
}
}
row_group_writer.close_column(writer).unwrap();
}
}
self.writer.close_row_group(row_group_writer).unwrap();
}

pub fn close(&mut self) {
self.writer.close().unwrap();
}
}
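
As a rough sketch (not part of this PR) of the "convert Arrow schema to Parquet schema" TODO in ArrowWriter::new above: a minimal mapping could cover a handful of primitive types and flat schemas only. The function name is hypothetical, it relies on the imports already at the top of this file, and a full implementation would also need to map logical types (Utf8, dates, timestamps, decimals) and nested types.

// Sketch only: map a flat Arrow schema with a few primitive types to a
// Parquet group type. Anything else is rejected with an error.
fn arrow_to_parquet_schema(arrow_schema: &Schema) -> Result<TypePtr> {
    let mut fields: Vec<TypePtr> = Vec::new();
    for field in arrow_schema.fields() {
        // Nullable Arrow fields become OPTIONAL Parquet fields.
        let repetition = if field.is_nullable() {
            Repetition::OPTIONAL
        } else {
            Repetition::REQUIRED
        };
        let physical_type = match field.data_type() {
            arrow::datatypes::DataType::Boolean => Type::BOOLEAN,
            arrow::datatypes::DataType::Int32 => Type::INT32,
            arrow::datatypes::DataType::Int64 => Type::INT64,
            arrow::datatypes::DataType::Float32 => Type::FLOAT,
            arrow::datatypes::DataType::Float64 => Type::DOUBLE,
            other => return Err(general_err!("unsupported Arrow type {:?}", other)),
        };
        fields.push(Rc::new(
            types::Type::primitive_type_builder(field.name(), physical_type)
                .with_repetition(repetition)
                .build()?,
        ));
    }
    Ok(Rc::new(
        types::Type::group_type_builder("schema")
            .with_fields(&mut fields)
            .build()?,
    ))
}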

#[cfg(test)]
mod tests {
use super::*;
@@ -538,6 +612,36 @@ mod tests {
use crate::record::RowAccessor;
use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file};

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[test]
fn arrow_writer() {
// define schema
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);

// create some data
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

// build a record batch
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(a), Arc::new(b)],
)
.unwrap();

let file = File::create("test.parquet").unwrap();
let mut writer = ArrowWriter::new(file, &schema);
writer.write(&batch);
writer.close();
}

#[test]
fn test_file_writer_error_after_close() {
let file = get_temp_file("test_file_writer_error_after_close", &[]);