Skip to content

Commit

Permalink
refactor: rewrite mega type to an enum containing both cases (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
LorrensP-2158466 authored and wiedld committed Jul 31, 2024
1 parent 282468a commit ebb20fc
Showing 1 changed file with 48 additions and 14 deletions.
62 changes: 48 additions & 14 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Result of calling [`serialize_rb_stream_to_object_store`]
pub(crate) enum SerializedRecordBatchResult {
Success {
/// the writer
writer: WriterType,

/// the number of rows successfully written
row_count: usize,
},
Failure {
/// As explained in [`serialize_rb_stream_to_object_store`]:
/// - If an IO error occured that involved the ObjectStore writer, then the writer will not be returned to the caller
/// - Otherwise, the writer is returned to the caller
writer: Option<WriterType>,

/// the actual error that occured
err: DataFusionError,
},
}

impl SerializedRecordBatchResult {
/// Create the success variant
pub fn success(writer: WriterType, row_count: usize) -> Self {
Self::Success { writer, row_count }
}

pub fn failure(writer: Option<WriterType>, err: DataFusionError) -> Self {
Self::Failure { writer, err }
}
}

/// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
/// Data order is preserved.
///
Expand All @@ -55,7 +86,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
) -> SerializedRecordBatchResult {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -86,43 +117,43 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
match writer.write_all(&bytes).await {
Ok(_) => (),
Err(e) => {
return Err((
return SerializedRecordBatchResult::failure(
None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
))
)
}
};
row_count += cnt;
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((Some(writer), e));
return SerializedRecordBatchResult::failure(Some(writer), e);
}
Err(e) => {
// Handle task panic or cancellation
return Err((
return SerializedRecordBatchResult::failure(
Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
));
);
}
}
}

match serialize_task.join().await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err((Some(writer), e)),
Ok(Err(e)) => return SerializedRecordBatchResult::failure(Some(writer), e),
Err(_) => {
return Err((
return SerializedRecordBatchResult::failure(
Some(writer),
internal_datafusion_err!("Unknown error writing to object store"),
))
)
}
}
Ok((writer, row_count as u64))
SerializedRecordBatchResult::success(writer, row_count)
}

type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
Expand Down Expand Up @@ -153,14 +184,17 @@ pub(crate) async fn stateless_serialize_and_write_files(
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => match res {
Ok((writer, cnt)) => {
SerializedRecordBatchResult::Success {
writer,
row_count: cnt,
} => {
finished_writers.push(writer);
row_count += cnt;
}
Err((writer, e)) => {
SerializedRecordBatchResult::Failure { writer, err } => {
finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
triggering_error = Some(err);
}
},
Err(e) => {
Expand Down Expand Up @@ -193,7 +227,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
}
}

tx.send(row_count).map_err(|_| {
tx.send(row_count as u64).map_err(|_| {
internal_datafusion_err!(
"Error encountered while sending row count back to file sink!"
)
Expand Down

0 comments on commit ebb20fc

Please sign in to comment.