From ebb20fc81eacc1079171cd34e42f3536272eae86 Mon Sep 17 00:00:00 2001 From: Lorrens Pantelis <100197010+LorrensP-2158466@users.noreply.github.com> Date: Sun, 21 Jul 2024 14:31:41 +0200 Subject: [PATCH] refactor: rewrite mega type to an enum containing both cases (#11539) --- .../file_format/write/orchestration.rs | 62 ++++++++++++++----- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index f788865b070f7..1d32063ee9f3f 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -42,6 +42,37 @@ use tokio::task::JoinSet; type WriterType = Box; type SerializerType = Arc; +/// Result of calling [`serialize_rb_stream_to_object_store`] +pub(crate) enum SerializedRecordBatchResult { + Success { + /// the writer + writer: WriterType, + + /// the number of rows successfully written + row_count: usize, + }, + Failure { + /// As explained in [`serialize_rb_stream_to_object_store`]: + /// - If an IO error occurred that involved the ObjectStore writer, then the writer will not be returned to the caller + /// - Otherwise, the writer is returned to the caller + writer: Option, + + /// the actual error that occurred + err: DataFusionError, + }, +} + +impl SerializedRecordBatchResult { + /// Create the success variant + pub fn success(writer: WriterType, row_count: usize) -> Self { + Self::Success { writer, row_count } + } + + pub fn failure(writer: Option, err: DataFusionError) -> Self { + Self::Failure { writer, err } + } +} + /// Serializes a single data stream in parallel and writes to an ObjectStore concurrently. /// Data order is preserved. 
/// @@ -55,7 +86,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store( mut data_rx: Receiver, serializer: Arc, mut writer: WriterType, -) -> std::result::Result<(WriterType, u64), (Option, DataFusionError)> { +) -> SerializedRecordBatchResult { let (tx, mut rx) = mpsc::channel::>>(100); let serialize_task = SpawnedTask::spawn(async move { @@ -86,43 +117,43 @@ pub(crate) async fn serialize_rb_stream_to_object_store( match writer.write_all(&bytes).await { Ok(_) => (), Err(e) => { - return Err(( + return SerializedRecordBatchResult::failure( None, DataFusionError::Execution(format!( "Error writing to object store: {e}" )), - )) + ) } }; row_count += cnt; } Ok(Err(e)) => { // Return the writer along with the error - return Err((Some(writer), e)); + return SerializedRecordBatchResult::failure(Some(writer), e); } Err(e) => { // Handle task panic or cancellation - return Err(( + return SerializedRecordBatchResult::failure( Some(writer), DataFusionError::Execution(format!( "Serialization task panicked or was cancelled: {e}" )), - )); + ); } } } match serialize_task.join().await { Ok(Ok(_)) => (), - Ok(Err(e)) => return Err((Some(writer), e)), + Ok(Err(e)) => return SerializedRecordBatchResult::failure(Some(writer), e), Err(_) => { - return Err(( + return SerializedRecordBatchResult::failure( Some(writer), internal_datafusion_err!("Unknown error writing to object store"), - )) + ) } } - Ok((writer, row_count as u64)) + SerializedRecordBatchResult::success(writer, row_count) } type FileWriteBundle = (Receiver, SerializerType, WriterType); @@ -153,14 +184,17 @@ pub(crate) async fn stateless_serialize_and_write_files( while let Some(result) = join_set.join_next().await { match result { Ok(res) => match res { - Ok((writer, cnt)) => { + SerializedRecordBatchResult::Success { + writer, + row_count: cnt, + } => { finished_writers.push(writer); row_count += cnt; } - Err((writer, e)) => { + SerializedRecordBatchResult::Failure { writer, err } => { 
finished_writers.extend(writer); any_errors = true; - triggering_error = Some(e); + triggering_error = Some(err); } }, Err(e) => { @@ -193,7 +227,7 @@ pub(crate) async fn stateless_serialize_and_write_files( } } - tx.send(row_count).map_err(|_| { + tx.send(row_count as u64).map_err(|_| { internal_datafusion_err!( "Error encountered while sending row count back to file sink!" )