diff --git a/rust/arrow/src/json/mod.rs b/rust/arrow/src/json/mod.rs index 85ab6aec01b..6b3df188a47 100644 --- a/rust/arrow/src/json/mod.rs +++ b/rust/arrow/src/json/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Transfer data between the Arrow memory format and JSON line-delimited records. +//! Transfer data between the Arrow memory format and JSON +//! line-delimited records. See the module level documentation for the +//! [`reader`] and [`writer`] for usage examples. pub mod reader; pub mod writer; pub use self::reader::Reader; pub use self::reader::ReaderBuilder; -pub use self::writer::Writer; +pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer}; diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs index fd1a83751b5..bc94bb50624 100644 --- a/rust/arrow/src/json/reader.rs +++ b/rust/arrow/src/json/reader.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! JSON Reader +//! # JSON Reader //! //! This JSON reader allows JSON line-delimited files to be read into the Arrow memory //! model. Records are loaded in batches and are then converted from row-based data to diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs index bdd29572f58..dbb70cf897e 100644 --- a/rust/arrow/src/json/writer.rs +++ b/rust/arrow/src/json/writer.rs @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! JSON Writer +//! # JSON Writer //! -//! This JSON writer allows converting Arrow record batches into array of JSON objects. It also -//! provides a Writer struct to help serialize record batches directly into line-delimited JSON -//! objects as bytes. +//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of +//! JSON objects or JSON formatted byte streams. //! -//! Serialize record batches into array of JSON objects: +//! ## Writing JSON Objects +//! +//! 
To serialize [`RecordBatch`]es into an array of +//! [JSON](https://docs.serde.rs/serde_json/) objects, use +//! [`record_batches_to_json_rows`]: //! //! ``` //! use std::sync::Arc; @@ -42,7 +45,39 @@ //! ); //! ``` //! -//! Serialize record batches into line-delimited JSON bytes: +//! ## Writing JSON formatted byte streams +//! +//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use +//! [`LineDelimitedWriter`]: +//! +//! ``` +//! use std::sync::Arc; +//! +//! use arrow::array::Int32Array; +//! use arrow::datatypes::{DataType, Field, Schema}; +//! use arrow::json; +//! use arrow::record_batch::RecordBatch; +//! +//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); +//! let a = Int32Array::from(vec![1, 2, 3]); +//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); +//! +//! // Write the record batch out as JSON +//! let buf = Vec::new(); +//! let mut writer = json::LineDelimitedWriter::new(buf); +//! writer.write_batches(&vec![batch]).unwrap(); +//! writer.finish().unwrap(); +//! +//! // Get the underlying buffer back +//! let buf = writer.into_inner(); +//! assert_eq!(r#"{"a":1} +//! {"a":2} +//! {"a":3} +//!"#, String::from_utf8(buf).unwrap()) +//! ``` +//! +//! To serialize [`RecordBatch`]es into a well-formed JSON array, use +//! [`ArrayWriter`]: //! //! ``` //! use std::sync::Arc; @@ -56,13 +91,19 @@ //! let a = Int32Array::from(vec![1, 2, 3]); //! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); //! +//! // Write the record batch out as a JSON array //! let buf = Vec::new(); -//! let mut writer = json::Writer::new(buf); +//! let mut writer = json::ArrayWriter::new(buf); //! writer.write_batches(&vec![batch]).unwrap(); +//! writer.finish().unwrap(); +//! +//! // Get the underlying buffer back +//! let buf = writer.into_inner(); +//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap()) //! 
``` -use std::io::{BufWriter, Write}; use std::iter; +use std::{fmt::Debug, io::Write}; use serde_json::map::Map as JsonMap; use serde_json::Value; @@ -108,6 +149,7 @@ fn struct_array_to_jsonmap_array( inner_objs } +/// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]s pub fn array_to_json_array(array: &ArrayRef) -> Vec { match array.data_type() { DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(), @@ -286,6 +328,8 @@ fn set_column_for_json_rows( } } +/// Converts a slice of arrow [`RecordBatch`]es into a `Vec` of Serde JSON +/// [`serde_json::map::Map`]s (objects) pub fn record_batches_to_json_rows( batches: &[RecordBatch], ) -> Vec> { @@ -309,33 +353,158 @@ pub fn record_batches_to_json_rows( rows } -/// A JSON writer -#[derive(Debug)] -pub struct Writer { - writer: BufWriter, +/// This trait defines how to format a sequence of JSON objects to a +/// byte stream. +pub trait JsonFormat: Debug + Default { + #[inline] + /// write any bytes needed at the start of the file to the writer + fn start_stream(&self, _writer: &mut W) -> Result<()> { + Ok(()) + } + + #[inline] + /// write any bytes needed for the start of each row + fn start_row(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> { + Ok(()) + } + + #[inline] + /// write any bytes needed for the end of each row + fn end_row(&self, _writer: &mut W) -> Result<()> { + Ok(()) + } + + /// write any bytes needed at the end of the stream + fn end_stream(&self, _writer: &mut W) -> Result<()> { + Ok(()) + } } -impl Writer { - pub fn new(writer: W) -> Self { - Self::from_buf_writer(BufWriter::new(writer)) +/// Produces JSON output with one record per line. 
For example +/// +/// ```json +/// {"foo":1} +/// {"bar":1} +/// +/// ``` +#[derive(Debug, Default)] +pub struct LineDelimited {} + +impl JsonFormat for LineDelimited { + fn end_row(&self, writer: &mut W) -> Result<()> { + writer.write_all(b"\n")?; + Ok(()) + } +} + +/// Produces JSON output as a single JSON array. For example +/// +/// ```json +/// [{"foo":1},{"bar":1}] +/// ``` +#[derive(Debug, Default)] +pub struct JsonArray {} + +impl JsonFormat for JsonArray { + fn start_stream(&self, writer: &mut W) -> Result<()> { + writer.write_all(b"[")?; + Ok(()) + } + + fn start_row(&self, writer: &mut W, is_first_row: bool) -> Result<()> { + if !is_first_row { + writer.write_all(b",")?; + } + Ok(()) } - pub fn from_buf_writer(writer: BufWriter) -> Self { - Self { writer } + fn end_stream(&self, writer: &mut W) -> Result<()> { + writer.write_all(b"]")?; + Ok(()) } +} + +/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects +pub type LineDelimitedWriter = Writer; +/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays +pub type ArrayWriter = Writer; + +/// A JSON writer which serializes [`RecordBatch`]es to a stream of +/// `u8` encoded JSON objects. See the module level documentation for +/// detailed usage and examples. The specific format of the stream is +/// controlled by the [`JsonFormat`] type parameter. +#[derive(Debug)] +pub struct Writer +where + W: Write, + F: JsonFormat, +{ + /// Underlying writer to use to write bytes + writer: W, + + /// Has the writer output any records yet? + started: bool, + + /// Is the writer finished? 
+ finished: bool, + + /// Determines how the byte stream is formatted + format: F, +} + +impl Writer +where + W: Write, + F: JsonFormat, +{ + /// Construct a new writer + pub fn new(writer: W) -> Self { + Self { + writer, + started: false, + finished: false, + format: F::default(), + } + } + + /// Write a single JSON row to the output writer pub fn write_row(&mut self, row: &Value) -> Result<()> { + let is_first_row = !self.started; + if !self.started { + self.format.start_stream(&mut self.writer)?; + self.started = true; + } + + self.format.start_row(&mut self.writer, is_first_row)?; self.writer.write_all(&serde_json::to_vec(row)?)?; - self.writer.write_all(b"\n")?; + self.format.end_row(&mut self.writer)?; Ok(()) } + /// Convert the [`RecordBatch`]es into JSON rows, and write them to the output pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> { for row in record_batches_to_json_rows(batches) { self.write_row(&Value::Object(row))?; } Ok(()) } + + /// Finishes the output stream. This function must be called after + /// all record batches have been written (e.g. to produce the final `']'` when writing + /// arrays). 
+ pub fn finish(&mut self) -> Result<()> { + if self.started && !self.finished { + self.format.end_stream(&mut self.writer)?; + self.finished = true; + } + Ok(()) + } + + /// Unwraps this `Writer`, returning the underlying writer + pub fn into_inner(self) -> W { + self.writer + } } #[cfg(test)] @@ -343,6 +512,8 @@ mod tests { use std::fs::{read_to_string, File}; use std::sync::Arc; + use serde_json::json; + use crate::buffer::*; use crate::json::reader::*; @@ -364,7 +535,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -423,7 +594,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -465,7 +636,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -523,7 +694,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -597,7 +768,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -621,7 +792,7 @@ mod tests { let mut buf = Vec::new(); { - let mut writer = Writer::new(&mut buf); + let mut writer = LineDelimitedWriter::new(&mut buf); writer.write_batches(&[batch]).unwrap(); } @@ -653,4 +824,37 @@ mod tests { fn write_basic_nulls() { test_write_for_file("test/data/basic_nulls.json"); } + + #[test] + fn json_writer_empty() { + let mut writer = ArrayWriter::new(vec![] as Vec); + writer.finish().unwrap(); + assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), ""); + } + + #[test] + fn 
json_writer_one_row() { + let mut writer = ArrayWriter::new(vec![] as Vec); + let v = json!({ "an": "object" }); + writer.write_row(&v).unwrap(); + writer.finish().unwrap(); + assert_eq!( + String::from_utf8(writer.into_inner()).unwrap(), + r#"[{"an":"object"}]"# + ); + } + + #[test] + fn json_writer_two_rows() { + let mut writer = ArrayWriter::new(vec![] as Vec); + let v = json!({ "an": "object" }); + writer.write_row(&v).unwrap(); + let v = json!({ "another": "object" }); + writer.write_row(&v).unwrap(); + writer.finish().unwrap(); + assert_eq!( + String::from_utf8(writer.into_inner()).unwrap(), + r#"[{"an":"object"},{"another":"object"}]"# + ); + } }