Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions crates/diesel_models/src/dynamic_routing_stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use diesel::{Insertable, Queryable, Selectable};
use diesel::{AsChangeset, Identifiable, Insertable, Queryable, Selectable};

use crate::schema::dynamic_routing_stats;

Expand All @@ -23,8 +23,8 @@ pub struct DynamicRoutingStatsNew {
pub global_success_based_connector: Option<String>,
}

#[derive(Clone, Debug, Eq, PartialEq, Queryable, Selectable, Insertable)]
#[diesel(table_name = dynamic_routing_stats, primary_key(payment_id), check_for_backend(diesel::pg::Pg))]
#[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Selectable, Insertable)]
#[diesel(table_name = dynamic_routing_stats, primary_key(attempt_id, merchant_id), check_for_backend(diesel::pg::Pg))]
pub struct DynamicRoutingStats {
pub payment_id: common_utils::id_type::PaymentId,
pub attempt_id: String,
Expand All @@ -43,3 +43,21 @@ pub struct DynamicRoutingStats {
pub payment_method_type: Option<common_enums::PaymentMethodType>,
pub global_success_based_connector: Option<String>,
}

#[derive(
Clone, Debug, Eq, PartialEq, AsChangeset, router_derive::DebugAsDisplay, serde::Deserialize,
)]
#[diesel(table_name = dynamic_routing_stats)]
pub struct DynamicRoutingStatsUpdate {
pub amount: common_utils::types::MinorUnit,
pub success_based_routing_connector: String,
pub payment_connector: String,
pub currency: Option<common_enums::Currency>,
pub payment_method: Option<common_enums::PaymentMethod>,
pub capture_method: Option<common_enums::CaptureMethod>,
pub authentication_type: Option<common_enums::AuthenticationType>,
pub payment_status: common_enums::AttemptStatus,
pub conclusive_classification: common_enums::SuccessBasedRoutingConclusiveState,
pub payment_method_type: Option<common_enums::PaymentMethodType>,
pub global_success_based_connector: Option<String>,
}
52 changes: 51 additions & 1 deletion crates/diesel_models/src/query/dynamic_routing_stats.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
use error_stack::report;

use super::generics;
use crate::{
dynamic_routing_stats::{DynamicRoutingStats, DynamicRoutingStatsNew},
dynamic_routing_stats::{
DynamicRoutingStats, DynamicRoutingStatsNew, DynamicRoutingStatsUpdate,
},
errors,
schema::dynamic_routing_stats::dsl,
PgPooledConn, StorageResult,
};

Expand All @@ -9,3 +16,46 @@ impl DynamicRoutingStatsNew {
generics::generic_insert(conn, self).await
}
}

impl DynamicRoutingStats {
pub async fn find_optional_by_attempt_id_merchant_id(
conn: &PgPooledConn,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
) -> StorageResult<Option<Self>> {
generics::generic_find_one_optional::<<Self as HasTable>::Table, _, _>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::attempt_id.eq(attempt_id.to_owned())),
)
.await
}

pub async fn update(
conn: &PgPooledConn,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
dynamic_routing_stat: DynamicRoutingStatsUpdate,
) -> StorageResult<Self> {
generics::generic_update_with_results::<
<Self as HasTable>::Table,
DynamicRoutingStatsUpdate,
_,
_,
>(
conn,
dsl::merchant_id
.eq(merchant_id.to_owned())
.and(dsl::attempt_id.eq(attempt_id.to_owned())),
dynamic_routing_stat,
)
.await?
.first()
.cloned()
.ok_or_else(|| {
report!(errors::DatabaseError::NotFound)
.attach_printable("Error while updating dynamic_routing_stats entry")
})
}
}
92 changes: 66 additions & 26 deletions crates/router/src/core/routing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use common_utils::ext_traits::ValueExt;
use common_utils::{ext_traits::Encode, id_type, types::keymanager::KeyManagerState};
use diesel_models::configs;
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
use diesel_models::dynamic_routing_stats::DynamicRoutingStatsNew;
use diesel_models::dynamic_routing_stats::{DynamicRoutingStatsNew, DynamicRoutingStatsUpdate};
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
use diesel_models::routing_algorithm;
use error_stack::ResultExt;
Expand Down Expand Up @@ -821,28 +821,6 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
first_merchant_success_based_connector_label.to_string(),
);

let dynamic_routing_stats = DynamicRoutingStatsNew {
payment_id: payment_attempt.payment_id.to_owned(),
attempt_id: payment_attempt.attempt_id.clone(),
merchant_id: payment_attempt.merchant_id.to_owned(),
profile_id: payment_attempt.profile_id.to_owned(),
amount: payment_attempt.get_total_amount(),
success_based_routing_connector: first_merchant_success_based_connector_label
.to_string(),
payment_connector: payment_connector.to_string(),
payment_method_type: payment_attempt.payment_method_type,
currency: payment_attempt.currency,
payment_method: payment_attempt.payment_method,
capture_method: payment_attempt.capture_method,
authentication_type: payment_attempt.authentication_type,
payment_status: payment_attempt.status,
conclusive_classification: outcome,
created_at: common_utils::date_time::now(),
global_success_based_connector: Some(
first_global_success_based_connector.label.to_string(),
),
};

core_metrics::DYNAMIC_SUCCESS_BASED_ROUTING.add(
1,
router_env::metric_attributes!(
Expand Down Expand Up @@ -915,12 +893,74 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
);
logger::debug!("successfully pushed success_based_routing metrics");

state
let duplicate_stats = state
.store
.insert_dynamic_routing_stat_entry(dynamic_routing_stats)
.find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
payment_attempt.attempt_id.clone(),
&payment_attempt.merchant_id.to_owned(),
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unable to push dynamic routing stats to db")?;
.attach_printable("Failed to fetch dynamic_routing_stats entry")?;

if duplicate_stats.is_some() {
let dynamic_routing_update = DynamicRoutingStatsUpdate {
amount: payment_attempt.get_total_amount(),
success_based_routing_connector: first_merchant_success_based_connector_label
.to_string(),
payment_connector: payment_connector.to_string(),
payment_method_type: payment_attempt.payment_method_type,
currency: payment_attempt.currency,
payment_method: payment_attempt.payment_method,
capture_method: payment_attempt.capture_method,
authentication_type: payment_attempt.authentication_type,
payment_status: payment_attempt.status,
conclusive_classification: outcome,
global_success_based_connector: Some(
first_global_success_based_connector.label.to_string(),
),
};

state
.store
.update_dynamic_routing_stats(
payment_attempt.attempt_id.clone(),
&payment_attempt.merchant_id.to_owned(),
dynamic_routing_update,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unable to update dynamic routing stats to db")?;
} else {
let dynamic_routing_stats = DynamicRoutingStatsNew {
payment_id: payment_attempt.payment_id.to_owned(),
attempt_id: payment_attempt.attempt_id.clone(),
merchant_id: payment_attempt.merchant_id.to_owned(),
profile_id: payment_attempt.profile_id.to_owned(),
amount: payment_attempt.get_total_amount(),
success_based_routing_connector: first_merchant_success_based_connector_label
.to_string(),
payment_connector: payment_connector.to_string(),
payment_method_type: payment_attempt.payment_method_type,
currency: payment_attempt.currency,
payment_method: payment_attempt.payment_method,
capture_method: payment_attempt.capture_method,
authentication_type: payment_attempt.authentication_type,
payment_status: payment_attempt.status,
conclusive_classification: outcome,
created_at: common_utils::date_time::now(),
global_success_based_connector: Some(
first_global_success_based_connector.label.to_string(),
),
};

state
.store
.insert_dynamic_routing_stat_entry(dynamic_routing_stats)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Unable to push dynamic routing stats to db")?;
};

client
.update_success_rate(
Expand Down
78 changes: 78 additions & 0 deletions crates/router/src/db/dynamic_routing_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ pub trait DynamicRoutingStatsInterface {
&self,
dynamic_routing_stat_new: storage::DynamicRoutingStatsNew,
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError>;

async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError>;

async fn update_dynamic_routing_stats(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
data: storage::DynamicRoutingStatsUpdate,
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError>;
}

#[async_trait::async_trait]
Expand All @@ -31,6 +44,33 @@ impl DynamicRoutingStatsInterface for Store {
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}

async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::DynamicRoutingStats::find_optional_by_attempt_id_merchant_id(
&conn,
attempt_id,
merchant_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}

async fn update_dynamic_routing_stats(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
data: storage::DynamicRoutingStatsUpdate,
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::DynamicRoutingStats::update(&conn, attempt_id, merchant_id, data)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
}

#[async_trait::async_trait]
Expand All @@ -42,6 +82,23 @@ impl DynamicRoutingStatsInterface for MockDb {
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}

async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
&self,
_attempt_id: String,
_merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}

async fn update_dynamic_routing_stats(
&self,
_attempt_id: String,
_merchant_id: &common_utils::id_type::MerchantId,
_data: storage::DynamicRoutingStatsUpdate,
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}

#[async_trait::async_trait]
Expand All @@ -55,4 +112,25 @@ impl DynamicRoutingStatsInterface for KafkaStore {
.insert_dynamic_routing_stat_entry(dynamic_routing_stat)
.await
}

async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
self.diesel_store
.find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(attempt_id, merchant_id)
.await
}

async fn update_dynamic_routing_stats(
&self,
attempt_id: String,
merchant_id: &common_utils::id_type::MerchantId,
data: storage::DynamicRoutingStatsUpdate,
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
self.diesel_store
.update_dynamic_routing_stats(attempt_id, merchant_id, data)
.await
}
}
4 changes: 3 additions & 1 deletion crates/router/src/types/storage/dynamic_routing_stats.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub use diesel_models::dynamic_routing_stats::{DynamicRoutingStats, DynamicRoutingStatsNew};
pub use diesel_models::dynamic_routing_stats::{
DynamicRoutingStats, DynamicRoutingStatsNew, DynamicRoutingStatsUpdate,
};
Loading