diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index 933b0bcd7..92009d148 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -8,7 +8,7 @@ use crate::error::processor::{ }; use crate::models::order::ReadObj as DbOrder; use crate::payment_sync::SYNC_NOTIFS_NOTIFY; -use crate::timeout_lock::{MutexTimeoutExt, RwLockTimeoutExt}; +use crate::timeout_lock::RwLockTimeoutExt; use crate::utils::remove_allocation_ids_from_payment; use actix_web::web::Data; use bigdecimal::{BigDecimal, Zero}; @@ -17,11 +17,16 @@ use futures::{FutureExt, TryFutureExt}; use metrics::counter; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::ops::Deref; +use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{Mutex, RwLock}; +use tokio::task::JoinHandle; +use tokio::time::error::Elapsed; use ya_client_model::payment::allocation::Deposit; use ya_client_model::payment::{ Account, ActivityPayment, AgreementPayment, DriverDetails, Network, Payment, @@ -306,8 +311,135 @@ impl DriverRegistry { const DB_LOCK_TIMEOUT: Duration = Duration::from_secs(30); const REGISTRY_LOCK_TIMEOUT: Duration = Duration::from_secs(30); +struct TimedMutex { + mutex: Mutex, + sender: Option>, + counter_task: Option>, +} + +use tokio::sync::MutexGuard; + +enum TimedMutexTaskMessage { + Start(String), + Finish, +} + +struct TimedMutexGuard<'a> { + mutex_guard: MutexGuard<'a, DbExecutor>, + sender: &'a Option>, +} + +impl Drop for TimedMutexGuard<'_> { + fn drop(&mut self) { + if let Some(sender) = &self.sender { + if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) { + log::error!("Cannot send fininsh to counter task {e}"); + } + } + } +} + +impl<'a> Deref for TimedMutexGuard<'a> { + type Target = MutexGuard<'a, DbExecutor>; + + fn deref(&self) -> &Self::Target { + &self.mutex_guard + } +} + +impl TimedMutex { + fn new(db: DbExecutor) -> Self { + let (sender, mut receiver) = + tokio::sync::mpsc::unbounded_channel::(); + + let counter_task = tokio::spawn(async move { + log::info!("[TimedMutex] Counter thread started"); + loop { + // wait for start or close without timeout + let task_name = match receiver.recv().await { + None => break, + Some(TimedMutexTaskMessage::Start(x)) => x, + Some(TimedMutexTaskMessage::Finish) => { + panic!("[TimedMutex] Unexpected finish") + } + }; + + log::info!("[TimedMutex] task {task_name} started..."); + let mut counter = 0; + loop { + match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await { + Err(_) => { + log::error!("[TimedMutex] Long running task: {task_name}!"); + counter += 1; + // five minutes + if counter > 30 { + exit(41); + } + } + Ok(None) => panic!("[TimedMutex] Unexpected mpsc close."), + Ok(Some(TimedMutexTaskMessage::Finish)) => break, + Ok(Some(TimedMutexTaskMessage::Start(_))) => { + panic!("[TimedMutex] Unexpected start") + } + } + } + + log::info!("[TimedMutex] Timed task {task_name} finished."); + } + log::info!("[TimedMutex] Counter thread finished"); + }); + + Self { + mutex: Mutex::new(db), + sender: Some(sender), + counter_task: Some(counter_task), + } + } + + async fn timeout_lock( + &self, + duration: Duration, + name: &str, + ) -> Result, Elapsed> { + let result = tokio::time::timeout(duration, self.mutex.lock()) + .await + .map_err(|e| { + log::info!("Failed to lock mutex in scenario {0}", name); + e + })?; + + if self.counter_task.as_ref().unwrap().is_finished() { + log::error!("counter task is dead! {name}"); + panic!() + } + + if let Some(sender) = &self.sender { + if let Err(e) = sender.send(TimedMutexTaskMessage::Start(name.into())) { + log::error!("Cannot send start to counter task {name}: {e}"); + } + } + + Ok(TimedMutexGuard { + mutex_guard: result, + sender: &self.sender, + }) + } +} + +impl Drop for TimedMutex { + fn drop(&mut self) { + self.sender.take().unwrap(); + let handle = self.counter_task.take().unwrap(); + tokio::spawn(async move { + if let Err(e) = handle.await { + log::error!("Cannot join counter thread {e}"); + } + }); + } +} + pub struct PaymentProcessor { - db_executor: Arc>, + db_executor: Arc, registry: RwLock, in_shutdown: AtomicBool, } @@ -325,7 +457,7 @@ enum PaymentSendToGsbError { impl PaymentProcessor { pub fn new(db_executor: DbExecutor) -> Self { Self { - db_executor: Arc::new(Mutex::new(db_executor)), + db_executor: Arc::new(TimedMutex::new(db_executor)), registry: Default::default(), in_shutdown: AtomicBool::new(false), } @@ -429,7 +561,10 @@ impl PaymentProcessor { let mut payment: Payment; { - let db_executor = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?; + let db_executor = self + .db_executor + .timeout_lock(DB_LOCK_TIMEOUT, "notify payment 1") + .await?; let orders = db_executor .as_dao::() @@ -512,7 +647,9 @@ impl PaymentProcessor { tokio::task::spawn_local( async move { - let db_executor = db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?; + let db_executor = db_executor + .timeout_lock(DB_LOCK_TIMEOUT, "notify payment 2") + .await?; let payment_dao: PaymentDao = db_executor.as_dao(); let sync_dao: SyncNotifsDao = db_executor.as_dao(); @@ -597,7 +734,7 @@ impl PaymentProcessor { let allocation_status = self .db_executor - .timeout_lock(DB_LOCK_TIMEOUT) + .timeout_lock(DB_LOCK_TIMEOUT, "schedule_payment 1") .await? .as_dao::() .get(msg.allocation_id.clone(), msg.payer_id) @@ -626,7 +763,7 @@ impl PaymentProcessor { .await??; self.db_executor - .timeout_lock(DB_LOCK_TIMEOUT) + .timeout_lock(DB_LOCK_TIMEOUT, "schedule_payment 2") .await? .as_dao::() .create(msg, order_id, driver) @@ -703,7 +840,10 @@ impl PaymentProcessor { } { - let db_executor = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?; + let db_executor = self + .db_executor + .timeout_lock(DB_LOCK_TIMEOUT, "verify payment 1") + .await?; // Verify agreement payments let agreement_dao: AgreementDao = db_executor.as_dao(); @@ -856,7 +996,10 @@ impl PaymentProcessor { } let (active_allocations, past_allocations) = { - let db = self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await?; + let db = self + .db_executor + .timeout_lock(DB_LOCK_TIMEOUT, "validate_allocation 1") + .await?; let dao = db.as_dao::(); let active = dao @@ -893,7 +1036,11 @@ impl PaymentProcessor { /// For `false` each allocation timestamp is respected. pub async fn release_allocations(&self, force: bool) { // keep this lock alive for the entirety of this function for now - let db_executor = match self.db_executor.timeout_lock(DB_LOCK_TIMEOUT).await { + let db_executor = match self + .db_executor + .timeout_lock(DB_LOCK_TIMEOUT, "release_allocations") + .await + { Ok(db) => db, Err(_) => { log::error!("Timed out waiting for db lock");