Skip to content

Commit

Permalink
Shortcut sending data to network if input port is reachable locally
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSarle committed Dec 4, 2024
1 parent a994872 commit af6d207
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 84 deletions.
20 changes: 20 additions & 0 deletions lib/protoflow-zeromq/src/input_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ impl ZmqInputPortState {
Closed => PortState::Closed,
}
}

pub async fn event_sender(&self) -> Option<Sender<ZmqTransportEvent>> {
use ZmqInputPortState::*;
match self {
Open(_, sender) | Connected(.., sender, _) => Some(sender.clone()),
Closed => None,
}
}
}

fn input_topics(id: InputPortID) -> Vec<String> {
Expand All @@ -86,6 +94,18 @@ fn input_topics(id: InputPortID) -> Vec<String> {
]
}

pub async fn input_port_event_sender(
inputs: &RwLock<BTreeMap<InputPortID, RwLock<ZmqInputPortState>>>,
id: InputPortID,
) -> Option<Sender<ZmqTransportEvent>> {
if let Some(input_state) = inputs.read().await.get(&id) {
let input_state = input_state.read().await;
input_state.event_sender().await
} else {
None
}
}

pub fn start_input_worker(
transport: &ZmqTransport,
input_port_id: InputPortID,
Expand Down
2 changes: 1 addition & 1 deletion lib/protoflow-zeromq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ZmqTransport {
inputs,
};

start_pub_socket_worker(psock, pub_queue_recv);
start_pub_socket_worker(&transport, psock, pub_queue_recv);
start_sub_socket_worker(&transport, ssock, sub_queue_recv);

transport
Expand Down
24 changes: 22 additions & 2 deletions lib/protoflow-zeromq/src/output_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{subscribe_topics, unsubscribe_topics, ZmqTransport, ZmqTransportEvent};
use protoflow_core::{
prelude::{fmt, format, vec, Bytes, String, ToString, Vec},
prelude::{fmt, format, vec, BTreeMap, Bytes, String, ToString, Vec},
InputPortID, OutputPortID, PortError, PortState,
};
use tokio::sync::{
Expand All @@ -19,7 +19,7 @@ pub enum ZmqOutputPortRequest {
Send(Bytes),
}

const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(200);
const DEFAULT_MAX_RETRIES: u64 = 10;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -65,6 +65,14 @@ impl ZmqOutputPortState {
Closed => PortState::Closed,
}
}

pub async fn event_sender(&self) -> Option<Sender<ZmqTransportEvent>> {
use ZmqOutputPortState::*;
match self {
Open(.., sender) | Connected(.., sender, _) => Some(sender.clone()),
Closed => None,
}
}
}

fn output_topics(source: OutputPortID, target: InputPortID) -> Vec<String> {
Expand All @@ -75,6 +83,18 @@ fn output_topics(source: OutputPortID, target: InputPortID) -> Vec<String> {
]
}

pub async fn output_port_event_sender(
outputs: &RwLock<BTreeMap<OutputPortID, RwLock<ZmqOutputPortState>>>,
id: OutputPortID,
) -> Option<Sender<ZmqTransportEvent>> {
if let Some(output_state) = outputs.read().await.get(&id) {
let output_state = output_state.read().await;
output_state.event_sender().await
} else {
None
}
}

pub fn start_output_worker(
transport: &ZmqTransport,
output_port_id: OutputPortID,
Expand Down
124 changes: 43 additions & 81 deletions lib/protoflow-zeromq/src/socket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// This is free and unencumbered software released into the public domain.

use crate::{ZmqInputPortState, ZmqOutputPortState, ZmqTransport, ZmqTransportEvent};
use crate::{
input_port_event_sender, output_port_event_sender, ZmqInputPortState, ZmqOutputPortState,
ZmqTransport, ZmqTransportEvent,
};
use protoflow_core::{
prelude::{BTreeMap, String, Vec},
InputPortID, OutputPortID, PortError,
Expand All @@ -18,18 +21,45 @@ pub enum ZmqSubscriptionRequest {
}

#[cfg(feature = "tracing")]
use tracing::{error, trace, trace_span};
use tracing::{debug, error, trace, trace_span, warn};

pub fn start_pub_socket_worker(psock: zeromq::PubSocket, pub_queue: Receiver<ZmqTransportEvent>) {
pub fn start_pub_socket_worker(
transport: &ZmqTransport,
psock: zeromq::PubSocket,
pub_queue: Receiver<ZmqTransportEvent>,
) {
#[cfg(feature = "tracing")]
let span = trace_span!("ZmqTransport::pub_socket");
let outputs = transport.outputs.clone();
let inputs = transport.inputs.clone();
let mut psock = psock;
let mut pub_queue = pub_queue;
tokio::task::spawn(async move {
while let Some(event) = pub_queue.recv().await {
#[cfg(feature = "tracing")]
span.in_scope(|| trace!(?event, "sending event to socket"));

use ZmqTransportEvent::*;
let shortcut_sender = match event {
Connect(_, id) | Message(_, id, _, _) | CloseOutput(_, id) => {
input_port_event_sender(&inputs, id).await
}
AckConnection(id, _) | AckMessage(id, ..) => {
output_port_event_sender(&outputs, id).await
}
CloseInput(..) => None,
};

if let Some(sender) = shortcut_sender {
#[cfg(feature = "tracing")]
span.in_scope(|| debug!("attempting to shortcut send directly to target port"));
if sender.send(event.clone()).await.is_ok() {
continue;
}
#[cfg(feature = "tracing")]
span.in_scope(|| warn!("failed to send message with shortcut, sending to socket"));
}

if let Err(err) = psock.send(event.into()).await {
#[cfg(feature = "tracing")]
span.in_scope(|| error!(?err, "failed to send message"));
Expand Down Expand Up @@ -126,89 +156,21 @@ async fn handle_zmq_msg(
use ZmqTransportEvent::*;
match event {
// input ports
Connect(_, input_port_id) => {
let sender = {
let inputs = inputs.read().await;
let Some(input) = inputs.get(&input_port_id) else {
return Err(PortError::Invalid(input_port_id.into()));
};
let input = input.read().await;

use ZmqInputPortState::*;
match &*input {
Closed => return Err(PortError::Invalid(input_port_id.into())),
Open(.., sender) | Connected(.., sender, _) => sender.clone(),
}
};

sender.send(event).await.map_err(|_| PortError::Closed)
}
Message(_, input_port_id, _, _) => {
let sender = {
let inputs = inputs.read().await;
let Some(input) = inputs.get(&input_port_id) else {
return Err(PortError::Invalid(input_port_id.into()));
};

let input = input.read().await;
let ZmqInputPortState::Connected(_, _, _, sender, _) = &*input else {
return Err(PortError::Invalid(input_port_id.into()));
};

sender.clone()
};

sender.send(event).await.map_err(|_| PortError::Closed)
}
CloseOutput(_, input_port_id) => {
let sender = {
let inputs = inputs.read().await;
let Some(input) = inputs.get(&input_port_id) else {
return Err(PortError::Invalid(input_port_id.into()));
};
let input = input.read().await;

use ZmqInputPortState::*;
match &*input {
Closed => return Err(PortError::Invalid(input_port_id.into())),
Open(.., sender) | Connected(.., sender, _) => sender.clone(),
}
};
Connect(_, input_port_id)
| Message(_, input_port_id, _, _)
| CloseOutput(_, input_port_id) => {
let sender = input_port_event_sender(inputs, input_port_id)
.await
.ok_or_else(|| PortError::Invalid(input_port_id.into()))?;

sender.send(event).await.map_err(|_| PortError::Closed)
}

// output ports
AckConnection(output_port_id, _) => {
let sender = {
let outputs = outputs.read().await;
let Some(output) = outputs.get(&output_port_id) else {
return Err(PortError::Invalid(output_port_id.into()));
};
let output = output.read().await;

let ZmqOutputPortState::Open(.., sender) = &*output else {
return Err(PortError::Invalid(output_port_id.into()));
};

sender.clone()
};

sender.send(event).await.map_err(|_| PortError::Closed)
}
AckMessage(output_port_id, _, _) => {
let sender = {
let outputs = outputs.read().await;
let Some(output) = outputs.get(&output_port_id) else {
return Err(PortError::Invalid(output_port_id.into()));
};
let output = output.read().await;
let ZmqOutputPortState::Connected(_, sender, _) = &*output else {
return Err(PortError::Invalid(output_port_id.into()));
};

sender.clone()
};
AckConnection(output_port_id, _) | AckMessage(output_port_id, _, _) => {
let sender = output_port_event_sender(outputs, output_port_id)
.await
.ok_or_else(|| PortError::Invalid(output_port_id.into()))?;

sender.send(event).await.map_err(|_| PortError::Closed)
}
Expand Down

0 comments on commit af6d207

Please sign in to comment.