Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ enum ResponseProcessingError {
#[error("python exception: {0}")]
PythonException(String),

#[error("python generator exit: {0}")]
PyGeneratorExit(String),

#[error("deserialize error: {0}")]
DeserializeError(String),

Expand Down Expand Up @@ -225,6 +228,9 @@ where
let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
msg
}
ResponseProcessingError::PyGeneratorExit(_) => {
"Stream ended before generation completed".to_string()
}
ResponseProcessingError::PythonException(e) => {
let msg = format!("a python exception was caught while processing the async generator: {}", e);
msg
Expand Down Expand Up @@ -276,8 +282,16 @@ where
{
let item = item.map_err(|e| {
println!();
Python::with_gil(|py| e.display(py));
ResponseProcessingError::PythonException(e.to_string())
let mut is_py_generator_exit = false;
Python::with_gil(|py| {
e.display(py);
is_py_generator_exit = e.is_instance_of::<pyo3::exceptions::PyGeneratorExit>(py);
});
if is_py_generator_exit {
ResponseProcessingError::PyGeneratorExit(e.to_string())
} else {
ResponseProcessingError::PythonException(e.to_string())
}
})?;
let response = tokio::task::spawn_blocking(move || {
Python::with_gil(|py| depythonize::<Resp>(&item.into_bound(py)))
Expand Down
11 changes: 10 additions & 1 deletion lib/runtime/src/pipeline/network/ingress/push_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

use super::*;
use crate::protocols::maybe_error::MaybeError;
use prometheus::{Histogram, IntCounter, IntCounterVec, IntGauge};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl WorkHandlerMetrics {
impl<T: Data, U: Data> PushWorkHandler for Ingress<SingleIn<T>, ManyOut<U>>
where
T: Data + for<'de> Deserialize<'de> + std::fmt::Debug,
U: Data + Serialize + std::fmt::Debug,
U: Data + Serialize + MaybeError + std::fmt::Debug,
{
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
// Call the Ingress-specific add_metrics implementation
Expand Down Expand Up @@ -220,6 +221,14 @@ where
let mut send_complete_final = true;
while let Some(resp) = stream.next().await {
tracing::trace!("Sending response: {:?}", resp);
if let Some(err) = resp.err() {
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
if format!("{:?}", err) == STREAM_ERR_MSG {
tracing::warn!(STREAM_ERR_MSG);
send_complete_final = false;
break;
}
}
let resp_wrapper = NetworkStreamWrapper {
data: Some(resp),
complete_final: false,
Expand Down
Loading