Commit be55e96

revert back to moving buffer around

devinjdangelo committed Jul 21, 2023
1 parent fba4fae

Showing 4 changed files with 34 additions and 118 deletions.
5 changes: 2 additions & 3 deletions datafusion-examples/examples/dataframe-to-s3.rs
@@ -21,14 +21,13 @@ use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;

-//use datafusion::prelude::data;
 use object_store::aws::AmazonS3Builder;
 use std::env;
 use std::sync::Arc;
 use url::Url;

-/// This example demonstrates executing a simple query against an Arrow data source (a directory
-/// with multiple Parquet files) and fetching results
+/// This example demonstrates querying data from AmazonS3 and writing
+/// the result of a query back to AmazonS3
 #[tokio::main]
 async fn main() -> Result<()> {
     // create local execution context
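For readers following along, the reworded example reduces to: build an AmazonS3 store, register it for `s3://` URLs, run a query, and write the result back. A hedged sketch under DataFusion-28-era assumptions; the bucket name, region, and the single-argument `write_csv` are illustrative and version-dependent:

```rust
use std::env;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Illustrative bucket/region; credentials come from the environment.
    let bucket = "my-bucket";
    let s3 = AmazonS3Builder::new()
        .with_bucket_name(bucket)
        .with_region("us-east-1")
        .with_access_key_id(env::var("AWS_ACCESS_KEY_ID").unwrap())
        .with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap())
        .build()?;

    // Route s3://my-bucket/... URLs through the store built above.
    let s3_url = Url::parse(&format!("s3://{bucket}")).unwrap();
    ctx.runtime_env().register_object_store(&s3_url, Arc::new(s3));

    // Query data in the bucket, then write the result back to it.
    let df = ctx
        .read_csv(format!("s3://{bucket}/input.csv"), CsvReadOptions::new())
        .await?;
    df.write_csv(&format!("s3://{bucket}/output/")).await?;
    Ok(())
}
```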
29 changes: 19 additions & 10 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -23,7 +23,6 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::FileMeta;
-use crate::datasource::physical_plan::RecordBatchMultiPartWriter;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
@@ -35,6 +34,7 @@ use arrow::csv;
 use arrow::datatypes::SchemaRef;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
+use tokio::io::AsyncWriteExt;

 use super::FileScanConfig;

@@ -576,17 +576,26 @@ pub async fn plan_to_csv(
         let file = object_store::path::Path::parse(filename)?;

         let mut stream = plan.execute(i, task_ctx.clone())?;
-
         join_set.spawn(async move {
-            let (_, multipart_writer) = storeref.put_multipart(&file).await?;
-            let mut multipart_rb_writer =
-                RecordBatchMultiPartWriter::new(csv::Writer::new, multipart_writer, None);
-            while let Some(next_batch) = stream.next().await {
-                let batch = next_batch?;
-                multipart_rb_writer.write_rb(batch).await?;
+            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
+            let mut buffer = Vec::with_capacity(1024);
+            //only write headers on first iteration
+            let mut write_headers = true;
+            while let Some(batch) = stream.next().await.transpose()? {
+                let mut writer = csv::WriterBuilder::new()
+                    .has_headers(write_headers)
+                    .build(buffer);
+                writer.write(&batch)?;
+                buffer = writer.into_inner();
+                multipart_writer.write_all(&buffer).await?;
+                buffer.clear();
+                //prevent writing headers more than once
+                write_headers = false;
             }
-
-            multipart_rb_writer.shutdown().await
+            multipart_writer
+                .shutdown()
+                .await
+                .map_err(DataFusionError::from)
         });
     }
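For context, the new loop reads naturally as a standalone helper. A minimal sketch (the `stream_to_csv` name and signature are mine, not part of this commit), assuming `datafusion`, `futures`, and `tokio` as dependencies:

```rust
use datafusion::arrow::csv;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// Drain a stream of RecordBatches into any async sink as CSV,
/// reusing a single Vec<u8> across iterations.
async fn stream_to_csv(
    mut stream: SendableRecordBatchStream,
    mut sink: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<()> {
    let mut buffer = Vec::with_capacity(1024);
    // Emit the header row only for the first batch.
    let mut write_headers = true;
    while let Some(batch) = stream.next().await.transpose()? {
        // The csv::Writer takes ownership of the buffer and hands it
        // back via into_inner(), so one allocation is reused throughout.
        let mut writer = csv::WriterBuilder::new()
            .has_headers(write_headers)
            .build(buffer);
        writer.write(&batch)?;
        buffer = writer.into_inner();
        sink.write_all(&buffer).await?;
        buffer.clear();
        write_headers = false;
    }
    sink.shutdown().await.map_err(DataFusionError::from)
}
```

Moving the `Vec` into the writer and taking it back each iteration avoids the shared locked buffer of the previous revision, which appears to be the "moving buffer around" the commit title refers to.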
26 changes: 13 additions & 13 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -22,7 +22,6 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::FileMeta;
-use crate::datasource::physical_plan::RecordBatchMultiPartWriter;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
@@ -44,6 +43,7 @@ use std::any::Any;
 use std::io::BufReader;
 use std::sync::Arc;
 use std::task::Poll;
+use tokio::io::AsyncWriteExt;
 use tokio::task::JoinSet;

 use super::FileScanConfig;
@@ -271,20 +271,20 @@ pub async fn plan_to_json(
         let file = object_store::path::Path::parse(filename)?;

         let mut stream = plan.execute(i, task_ctx.clone())?;
-
         join_set.spawn(async move {
-            let (_, multipart_writer) = storeref.put_multipart(&file).await?;
-            let mut multipart_rb_writer = RecordBatchMultiPartWriter::new(
-                json::LineDelimitedWriter::new,
-                multipart_writer,
-                None,
-            );
-            while let Some(next_batch) = stream.next().await {
-                let batch = next_batch?;
-                multipart_rb_writer.write_rb(batch).await?;
+            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
+            let mut buffer = Vec::with_capacity(1024);
+            while let Some(batch) = stream.next().await.transpose()? {
+                let mut writer = json::LineDelimitedWriter::new(buffer);
+                writer.write(&batch)?;
+                buffer = writer.into_inner();
+                multipart_writer.write_all(&buffer).await?;
+                buffer.clear();
             }
-
-            multipart_rb_writer.shutdown().await
+            multipart_writer
+                .shutdown()
+                .await
+                .map_err(DataFusionError::from)
         });
     }
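Both `plan_to_csv` and `plan_to_json` funnel their bytes into an ObjectStore multipart upload. A self-contained sketch of that API, assuming the object_store 0.6-era `put_multipart` that returns a boxed `AsyncWrite` (later releases reworked this interface), with an in-memory store standing in for S3:

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    let path = Path::parse("out/part-0.csv")?;

    // put_multipart hands back an AsyncWrite; parts are uploaded as
    // bytes are written, and shutdown() completes the upload.
    let (_id, mut writer) = store.put_multipart(&path).await?;
    writer.write_all(b"a,b\n1,2\n").await?;
    writer.shutdown().await?;

    let bytes = store.get(&path).await?.bytes().await?;
    assert_eq!(&bytes[..], &b"a,b\n1,2\n"[..]);
    Ok(())
}
```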
92 changes: 0 additions & 92 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -37,14 +37,12 @@ use arrow::{
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
     record_batch::{RecordBatch, RecordBatchOptions},
 };
-use arrow_array::RecordBatchWriter;
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
 pub(crate) use json::plan_to_json;
 pub use json::{JsonOpener, NdJsonExec};
-use tokio::io::{AsyncWrite, AsyncWriteExt};

 use crate::physical_plan::ExecutionPlan;
 use crate::{
@@ -76,7 +74,6 @@ use std::{
     cmp::min,
     collections::HashMap,
     fmt::{Debug, Formatter, Result as FmtResult},
-    io::Write,
     marker::PhantomData,
     sync::Arc,
     vec,
@@ -479,95 +476,6 @@ where
     Ok(())
 }

-struct SharedBuffer {
-    /// The inner buffer for reading and writing
-    ///
-    /// The lock is used to obtain internal mutability, so no worry about the
-    /// lock contention.
-    buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
-}
-
-impl Write for SharedBuffer {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::write(&mut *buffer, buf)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::flush(&mut *buffer)
-    }
-}
-
-//Coordinates writing a sequence of RecordBatches using a SharedBuffer
-//between a sync Arrow RecordBatchWriter and an async ObjectStore writer
-struct RecordBatchMultiPartWriter<W: RecordBatchWriter> {
-    rb_writer: W,
-    multipart_writer: Box<dyn AsyncWrite + Send + Unpin>,
-    shared_buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
-    //buffer into shared buffer until we reach target_part_size_bytes
-    //then empty shared buffer / flush via multipart put to ObjectStore
-    target_part_size_bytes: usize,
-}
-
-impl<W: RecordBatchWriter> RecordBatchMultiPartWriter<W> {
-    fn new<F>(
-        rb_constructor: F,
-        multipart_writer: Box<dyn AsyncWrite + Send + Unpin>,
-        target_part_size_bytes: Option<usize>,
-    ) -> Self
-    where
-        F: Fn(SharedBuffer) -> W,
-    {
-        let target_part_size = target_part_size_bytes.unwrap_or(10485760);
-        let buffer = Arc::new(futures::lock::Mutex::new(Vec::with_capacity(
-            target_part_size * 2,
-        )));
-        let shared_buffer = SharedBuffer {
-            buffer: buffer.clone(),
-        };
-        let rb_writer = rb_constructor(shared_buffer);
-        Self {
-            rb_writer,
-            multipart_writer,
-            shared_buffer: buffer,
-            target_part_size_bytes: target_part_size,
-        }
-    }
-
-    //sends bytes in buffer to ObjectStore and clears buffer
-    async fn put_part(&mut self, last_part: bool) -> Result<(), DataFusionError> {
-        let mut buffer = self.shared_buffer.lock().await;
-        if last_part || buffer.len() >= self.target_part_size_bytes {
-            self.multipart_writer.write_all(&buffer).await?;
-            buffer.clear();
-        }
-        Ok(())
-    }
-
-    async fn write_rb(&mut self, batch: RecordBatch) -> Result<(), DataFusionError> {
-        self.rb_writer.write(&batch)?;
-        self.put_part(false).await?;
-        Ok(())
-    }
-
-    async fn flush(&mut self, last_part: bool) -> Result<(), DataFusionError> {
-        self.put_part(last_part).await?;
-        self.multipart_writer
-            .flush()
-            .await
-            .map_err(DataFusionError::from)
-    }
-
-    async fn shutdown(&mut self) -> Result<(), DataFusionError> {
-        self.flush(true).await?;
-        self.multipart_writer
-            .shutdown()
-            .await
-            .map_err(DataFusionError::from)
-    }
-}
-
 /// helper formatting array elements with a comma and a space between them
 fn fmt_elements_split_by_commas<E, I, F>(
     iter: I,
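The deleted `RecordBatchMultiPartWriter` existed to bridge a synchronous arrow `RecordBatchWriter` and the async uploader through `SharedBuffer`. A condensed, self-contained sketch of that bridge pattern, substituting `std::sync::Mutex` for `futures::lock::Mutex` to keep it dependency-free:

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};

/// A Vec<u8> behind a shared lock that implements std::io::Write, so a
/// synchronous writer can fill a buffer that other code later drains.
#[derive(Clone, Default)]
struct SharedBuffer {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Uncontended in this usage: the sync writer and the drain step
        // never run concurrently, so the lock is a formality.
        self.buffer.lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.buffer.lock().unwrap().flush()
    }
}

fn main() {
    let shared = SharedBuffer::default();
    let mut writer = shared.clone();
    writer.write_all(b"hello").unwrap();

    // Drain from the "other side", as the multipart uploader did.
    let mut inner = shared.buffer.lock().unwrap();
    assert_eq!(&inner[..], &b"hello"[..]);
    inner.clear();
}
```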
