Skip to content

Commit e508373

Browse files
Add send_transactions_to_address method to workers_cache (#4522)
* Add sned_transactions_to_address method to workers_cache * Update tpu-client-next/src/workers_cache.rs Co-authored-by: Illia Bobyr <[email protected]> --------- Co-authored-by: Illia Bobyr <[email protected]>
1 parent 0f56076 commit e508373

File tree

1 file changed

+55
-3
lines changed

1 file changed

+55
-3
lines changed

tpu-client-next/src/workers_cache.rs

+55-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ impl WorkerInfo {
4444
Ok(())
4545
}
4646

47+
async fn send_transactions(
48+
&self,
49+
txs_batch: TransactionBatch,
50+
) -> Result<(), WorkersCacheError> {
51+
self.sender
52+
.send(txs_batch)
53+
.await
54+
.map_err(|_| WorkersCacheError::ReceiverDropped)?;
55+
Ok(())
56+
}
57+
4758
/// Closes the worker by dropping the sender and awaiting the worker's
4859
/// statistics.
4960
async fn shutdown(self) -> Result<(), WorkersCacheError> {
@@ -118,9 +129,7 @@ impl WorkersCache {
118129
None
119130
}
120131

121-
/// Sends a batch of transactions to the worker for a given peer. If the
122-
/// worker for the peer is disconnected or fails, it is removed from the
123-
/// cache.
132+
/// Try sending a batch of transactions to the worker for a given peer.
124133
pub(crate) fn try_send_transactions_to_address(
125134
&mut self,
126135
peer: &SocketAddr,
@@ -149,6 +158,49 @@ impl WorkersCache {
149158
send_res
150159
}
151160

161+
/// Sends a batch of transactions to the worker for a given peer.
162+
///
163+
/// If the worker for the peer is disconnected or fails, it
164+
/// is removed from the cache.
165+
#[allow(
166+
dead_code,
167+
reason = "This method will be used in the upcoming changes to implement optional backpressure on the sender."
168+
)]
169+
pub(crate) async fn send_transactions_to_address(
170+
&mut self,
171+
peer: &SocketAddr,
172+
txs_batch: TransactionBatch,
173+
) -> Result<(), WorkersCacheError> {
174+
let Self {
175+
workers, cancel, ..
176+
} = self;
177+
178+
let body = async move {
179+
let current_worker = workers.get(peer).expect(
180+
"Failed to fetch worker for peer {peer}.\n\
181+
Peer existence must be checked before this call using `contains` method.",
182+
);
183+
let send_res = current_worker.send_transactions(txs_batch).await;
184+
if let Err(WorkersCacheError::ReceiverDropped) = send_res {
185+
// Remove the worker from the cache, if the peer has disconnected.
186+
if let Some(current_worker) = workers.pop(peer) {
187+
// To avoid obscuring the error from send, ignore a possible
188+
// `TaskJoinFailure`.
189+
let close_result = current_worker.shutdown().await;
190+
if let Err(error) = close_result {
191+
error!("Error while closing worker: {error}.");
192+
}
193+
}
194+
}
195+
196+
send_res
197+
};
198+
cancel
199+
.run_until_cancelled(body)
200+
.await
201+
.unwrap_or(Err(WorkersCacheError::ShutdownError))
202+
}
203+
152204
/// Closes and removes all workers in the cache. This is typically done when
153205
/// shutting down the system.
154206
pub(crate) async fn shutdown(&mut self) {

0 commit comments

Comments
 (0)