Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions rust/arrow/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! Transfer data between the Arrow memory format and JSON line-delimited records.
//! Transfer data between the Arrow memory format and JSON
//! line-delimited records. See the module level documentation for the
//! [`reader`] and [`writer`] for usage examples.

pub mod reader;
pub mod writer;

pub use self::reader::Reader;
pub use self::reader::ReaderBuilder;
pub use self::writer::Writer;
pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer};
2 changes: 1 addition & 1 deletion rust/arrow/src/json/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! JSON Reader
//! # JSON Reader
//!
//! This JSON reader allows JSON line-delimited files to be read into the Arrow memory
//! model. Records are loaded in batches and are then converted from row-based data to
Expand Down
252 changes: 228 additions & 24 deletions rust/arrow/src/json/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.

//! JSON Writer
//! # JSON Writer
//!
//! This JSON writer allows converting Arrow record batches into array of JSON objects. It also
//! provides a Writer struct to help serialize record batches directly into line-delimited JSON
//! objects as bytes.
//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
//! JSON objects or JSON formatted byte streams.
//!
//! Serialize record batches into array of JSON objects:
//! ## Writing JSON Objects
//!
//! To serialize [`RecordBatch`]es into array of
//! [JSON](https://docs.serde.rs/serde_json/) objects, use
//! [`record_batches_to_json_rows`]:
//!
//! ```
//! use std::sync::Arc;
Expand All @@ -42,7 +45,39 @@
//! );
//! ```
//!
//! Serialize record batches into line-delimited JSON bytes:
//! ## Writing JSON formatted byte streams
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc examples are probably the best way to see how to use this structure and what the changes looked like

//!
//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
//! [`LineDelimitedWriter`]:
//!
//! ```
//! use std::sync::Arc;
//!
//! use arrow::array::Int32Array;
//! use arrow::datatypes::{DataType, Field, Schema};
//! use arrow::json;
//! use arrow::record_batch::RecordBatch;
//!
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! // Write the record batch out as JSON
//! let buf = Vec::new();
//! let mut writer = json::LineDelimitedWriter::new(buf);
//! writer.write_batches(&vec![batch]).unwrap();
//! writer.finish().unwrap();
//!
//! // Get the underlying buffer back,
//! let buf = writer.into_inner();
//! assert_eq!(r#"{"a":1}
//! {"a":2}
//! {"a":3}
//!"#, String::from_utf8(buf).unwrap())
//! ```
//!
//! To serialize [`RecordBatch`]es into a well formed JSON array, use
//! [`ArrayWriter`]:
//!
//! ```
//! use std::sync::Arc;
Expand All @@ -56,13 +91,19 @@
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! // Write the record batch out as a JSON array
//! let buf = Vec::new();
//! let mut writer = json::Writer::new(buf);
//! let mut writer = json::ArrayWriter::new(buf);
//! writer.write_batches(&vec![batch]).unwrap();
//! writer.finish().unwrap();
//!
//! // Get the underlying buffer back,
//! let buf = writer.into_inner();
//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
//! ```

use std::io::{BufWriter, Write};
use std::iter;
use std::{fmt::Debug, io::Write};

use serde_json::map::Map as JsonMap;
use serde_json::Value;
Expand Down Expand Up @@ -108,6 +149,7 @@ fn struct_array_to_jsonmap_array(
inner_objs
}

/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> {
match array.data_type() {
DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(),
Expand Down Expand Up @@ -286,6 +328,8 @@ fn set_column_for_json_rows(
}
}

/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`serde_json::map::JsonMap`]s (objects)
pub fn record_batches_to_json_rows(
batches: &[RecordBatch],
) -> Vec<JsonMap<String, Value>> {
Expand All @@ -309,40 +353,167 @@ pub fn record_batches_to_json_rows(
rows
}

/// A JSON writer
#[derive(Debug)]
pub struct Writer<W: Write> {
writer: BufWriter<W>,
/// This trait defines how to format a sequence of JSON objects to a
/// byte stream.
pub trait JsonFormat: Debug + Default {
#[inline]
/// write any bytes needed at the start of the file to the writer
fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}

#[inline]
/// write any bytes needed for the start of each row
fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
Ok(())
}

#[inline]
/// write any bytes needed for the end of each row
fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}

/// write any bytes needed for the start of each row
fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}
}

impl<W: Write> Writer<W> {
pub fn new(writer: W) -> Self {
Self::from_buf_writer(BufWriter::new(writer))
/// Produces JSON output with one record per line. For example
///
/// ```json
/// {"foo":1}
/// {"bar":1}
///
/// ```
#[derive(Debug, Default)]
pub struct LineDelimited {}

impl JsonFormat for LineDelimited {
fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
writer.write_all(b"\n")?;
Ok(())
}
}

/// Produces JSON output as a single JSON array. For example
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default)]
pub struct JsonArray {}

impl JsonFormat for JsonArray {
fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
writer.write_all(b"[")?;
Ok(())
}

fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
if !is_first_row {
writer.write_all(b",")?;
}
Ok(())
}

pub fn from_buf_writer(writer: BufWriter<W>) -> Self {
Self { writer }
fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
writer.write_all(b"]")?;
Ok(())
}
}

/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects
pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;

/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays
pub type ArrayWriter<W> = Writer<W, JsonArray>;

/// A JSON writer which serializes [`RecordBatch`]es to a stream of
/// `u8` encoded JSON objects. See the module level documentation for
/// detailed usage and examples. The specific format of the stream is
/// controlled by the [`JsonFormat`] type parameter.
#[derive(Debug)]
pub struct Writer<W, F>
where
W: Write,
F: JsonFormat,
{
/// Underlying writer to use to write bytes
writer: W,

/// Has the writer output any records yet?
started: bool,

/// Is the writer finished?
finished: bool,

/// Determines how the byte stream is formatted
format: F,
}

impl<W, F> Writer<W, F>
where
W: Write,
F: JsonFormat,
{
/// Construct a new writer
pub fn new(writer: W) -> Self {
Self {
writer,
started: false,
finished: false,
format: F::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the default format? I can't tell from the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't a default format for Writer (I couldn't figure out how to make one).

This line makes an instance of the formatter. Though come to think of it, none of the formatters actually have state now 🤔 I could move some of the state into JsonArray maybe to make that clearer

}
}

/// Write a single JSON row to the output writer
pub fn write_row(&mut self, row: &Value) -> Result<()> {
let is_first_row = !self.started;
if !self.started {
self.format.start_stream(&mut self.writer)?;
self.started = true;
}

self.format.start_row(&mut self.writer, is_first_row)?;
self.writer.write_all(&serde_json::to_vec(row)?)?;
self.writer.write_all(b"\n")?;
self.format.end_row(&mut self.writer)?;
Ok(())
}

/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
for row in record_batches_to_json_rows(batches) {
self.write_row(&Value::Object(row))?;
}
Ok(())
}

/// Finishes the output stream. This function must be called after
/// all record batches have been produced. (e.g. producing the final `']'` if writing
/// arrays.
pub fn finish(&mut self) -> Result<()> {
if self.started && !self.finished {
self.format.end_stream(&mut self.writer)?;
self.finished = true;
}
Ok(())
}

/// Unwraps this `Writer<W>`, returning the underlying writer
pub fn into_inner(self) -> W {
self.writer
}
}

#[cfg(test)]
mod tests {
use std::fs::{read_to_string, File};
use std::sync::Arc;

use serde_json::json;

use crate::buffer::*;
use crate::json::reader::*;

Expand All @@ -364,7 +535,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand Down Expand Up @@ -423,7 +594,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand Down Expand Up @@ -465,7 +636,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand Down Expand Up @@ -523,7 +694,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand Down Expand Up @@ -597,7 +768,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand All @@ -621,7 +792,7 @@ mod tests {

let mut buf = Vec::new();
{
let mut writer = Writer::new(&mut buf);
let mut writer = LineDelimitedWriter::new(&mut buf);
writer.write_batches(&[batch]).unwrap();
}

Expand Down Expand Up @@ -653,4 +824,37 @@ mod tests {
fn write_basic_nulls() {
test_write_for_file("test/data/basic_nulls.json");
}

#[test]
fn json_writer_empty() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
writer.finish().unwrap();
assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), "");
}

#[test]
fn json_writer_one_row() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
let v = json!({ "an": "object" });
writer.write_row(&v).unwrap();
writer.finish().unwrap();
assert_eq!(
String::from_utf8(writer.into_inner()).unwrap(),
r#"[{"an":"object"}]"#
);
}

#[test]
fn json_writer_two_rows() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
let v = json!({ "an": "object" });
writer.write_row(&v).unwrap();
let v = json!({ "another": "object" });
writer.write_row(&v).unwrap();
writer.finish().unwrap();
assert_eq!(
String::from_utf8(writer.into_inner()).unwrap(),
r#"[{"an":"object"},{"another":"object"}]"#
);
}
}