Skip to content

Commit 68aac34

Browse files
refactor(dynamic_routing): change insert operation to upsert for dynamic_routing_stats (#7398)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
1 parent a1ecce8 commit 68aac34

File tree

5 files changed

+219
-31
lines changed

5 files changed

+219
-31
lines changed

crates/diesel_models/src/dynamic_routing_stats.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use diesel::{Insertable, Queryable, Selectable};
1+
use diesel::{AsChangeset, Identifiable, Insertable, Queryable, Selectable};
22

33
use crate::schema::dynamic_routing_stats;
44

@@ -23,8 +23,8 @@ pub struct DynamicRoutingStatsNew {
2323
pub global_success_based_connector: Option<String>,
2424
}
2525

26-
#[derive(Clone, Debug, Eq, PartialEq, Queryable, Selectable, Insertable)]
27-
#[diesel(table_name = dynamic_routing_stats, primary_key(payment_id), check_for_backend(diesel::pg::Pg))]
26+
#[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Selectable, Insertable)]
27+
#[diesel(table_name = dynamic_routing_stats, primary_key(attempt_id, merchant_id), check_for_backend(diesel::pg::Pg))]
2828
pub struct DynamicRoutingStats {
2929
pub payment_id: common_utils::id_type::PaymentId,
3030
pub attempt_id: String,
@@ -43,3 +43,21 @@ pub struct DynamicRoutingStats {
4343
pub payment_method_type: Option<common_enums::PaymentMethodType>,
4444
pub global_success_based_connector: Option<String>,
4545
}
46+
47+
#[derive(
48+
Clone, Debug, Eq, PartialEq, AsChangeset, router_derive::DebugAsDisplay, serde::Deserialize,
49+
)]
50+
#[diesel(table_name = dynamic_routing_stats)]
51+
pub struct DynamicRoutingStatsUpdate {
52+
pub amount: common_utils::types::MinorUnit,
53+
pub success_based_routing_connector: String,
54+
pub payment_connector: String,
55+
pub currency: Option<common_enums::Currency>,
56+
pub payment_method: Option<common_enums::PaymentMethod>,
57+
pub capture_method: Option<common_enums::CaptureMethod>,
58+
pub authentication_type: Option<common_enums::AuthenticationType>,
59+
pub payment_status: common_enums::AttemptStatus,
60+
pub conclusive_classification: common_enums::SuccessBasedRoutingConclusiveState,
61+
pub payment_method_type: Option<common_enums::PaymentMethodType>,
62+
pub global_success_based_connector: Option<String>,
63+
}
Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1+
use diesel::{associations::HasTable, BoolExpressionMethods, ExpressionMethods};
2+
use error_stack::report;
3+
14
use super::generics;
25
use crate::{
3-
dynamic_routing_stats::{DynamicRoutingStats, DynamicRoutingStatsNew},
6+
dynamic_routing_stats::{
7+
DynamicRoutingStats, DynamicRoutingStatsNew, DynamicRoutingStatsUpdate,
8+
},
9+
errors,
10+
schema::dynamic_routing_stats::dsl,
411
PgPooledConn, StorageResult,
512
};
613

@@ -9,3 +16,46 @@ impl DynamicRoutingStatsNew {
916
generics::generic_insert(conn, self).await
1017
}
1118
}
19+
20+
impl DynamicRoutingStats {
21+
pub async fn find_optional_by_attempt_id_merchant_id(
22+
conn: &PgPooledConn,
23+
attempt_id: String,
24+
merchant_id: &common_utils::id_type::MerchantId,
25+
) -> StorageResult<Option<Self>> {
26+
generics::generic_find_one_optional::<<Self as HasTable>::Table, _, _>(
27+
conn,
28+
dsl::merchant_id
29+
.eq(merchant_id.to_owned())
30+
.and(dsl::attempt_id.eq(attempt_id.to_owned())),
31+
)
32+
.await
33+
}
34+
35+
pub async fn update(
36+
conn: &PgPooledConn,
37+
attempt_id: String,
38+
merchant_id: &common_utils::id_type::MerchantId,
39+
dynamic_routing_stat: DynamicRoutingStatsUpdate,
40+
) -> StorageResult<Self> {
41+
generics::generic_update_with_results::<
42+
<Self as HasTable>::Table,
43+
DynamicRoutingStatsUpdate,
44+
_,
45+
_,
46+
>(
47+
conn,
48+
dsl::merchant_id
49+
.eq(merchant_id.to_owned())
50+
.and(dsl::attempt_id.eq(attempt_id.to_owned())),
51+
dynamic_routing_stat,
52+
)
53+
.await?
54+
.first()
55+
.cloned()
56+
.ok_or_else(|| {
57+
report!(errors::DatabaseError::NotFound)
58+
.attach_printable("Error while updating dynamic_routing_stats entry")
59+
})
60+
}
61+
}

crates/router/src/core/routing/helpers.rs

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use common_utils::ext_traits::ValueExt;
1414
use common_utils::{ext_traits::Encode, id_type, types::keymanager::KeyManagerState};
1515
use diesel_models::configs;
1616
#[cfg(all(feature = "v1", feature = "dynamic_routing"))]
17-
use diesel_models::dynamic_routing_stats::DynamicRoutingStatsNew;
17+
use diesel_models::dynamic_routing_stats::{DynamicRoutingStatsNew, DynamicRoutingStatsUpdate};
1818
#[cfg(all(feature = "dynamic_routing", feature = "v1"))]
1919
use diesel_models::routing_algorithm;
2020
use error_stack::ResultExt;
@@ -821,28 +821,6 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
821821
first_merchant_success_based_connector_label.to_string(),
822822
);
823823

824-
let dynamic_routing_stats = DynamicRoutingStatsNew {
825-
payment_id: payment_attempt.payment_id.to_owned(),
826-
attempt_id: payment_attempt.attempt_id.clone(),
827-
merchant_id: payment_attempt.merchant_id.to_owned(),
828-
profile_id: payment_attempt.profile_id.to_owned(),
829-
amount: payment_attempt.get_total_amount(),
830-
success_based_routing_connector: first_merchant_success_based_connector_label
831-
.to_string(),
832-
payment_connector: payment_connector.to_string(),
833-
payment_method_type: payment_attempt.payment_method_type,
834-
currency: payment_attempt.currency,
835-
payment_method: payment_attempt.payment_method,
836-
capture_method: payment_attempt.capture_method,
837-
authentication_type: payment_attempt.authentication_type,
838-
payment_status: payment_attempt.status,
839-
conclusive_classification: outcome,
840-
created_at: common_utils::date_time::now(),
841-
global_success_based_connector: Some(
842-
first_global_success_based_connector.label.to_string(),
843-
),
844-
};
845-
846824
core_metrics::DYNAMIC_SUCCESS_BASED_ROUTING.add(
847825
1,
848826
router_env::metric_attributes!(
@@ -915,12 +893,74 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
915893
);
916894
logger::debug!("successfully pushed success_based_routing metrics");
917895

918-
state
896+
let duplicate_stats = state
919897
.store
920-
.insert_dynamic_routing_stat_entry(dynamic_routing_stats)
898+
.find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
899+
payment_attempt.attempt_id.clone(),
900+
&payment_attempt.merchant_id.to_owned(),
901+
)
921902
.await
922903
.change_context(errors::ApiErrorResponse::InternalServerError)
923-
.attach_printable("Unable to push dynamic routing stats to db")?;
904+
.attach_printable("Failed to fetch dynamic_routing_stats entry")?;
905+
906+
if duplicate_stats.is_some() {
907+
let dynamic_routing_update = DynamicRoutingStatsUpdate {
908+
amount: payment_attempt.get_total_amount(),
909+
success_based_routing_connector: first_merchant_success_based_connector_label
910+
.to_string(),
911+
payment_connector: payment_connector.to_string(),
912+
payment_method_type: payment_attempt.payment_method_type,
913+
currency: payment_attempt.currency,
914+
payment_method: payment_attempt.payment_method,
915+
capture_method: payment_attempt.capture_method,
916+
authentication_type: payment_attempt.authentication_type,
917+
payment_status: payment_attempt.status,
918+
conclusive_classification: outcome,
919+
global_success_based_connector: Some(
920+
first_global_success_based_connector.label.to_string(),
921+
),
922+
};
923+
924+
state
925+
.store
926+
.update_dynamic_routing_stats(
927+
payment_attempt.attempt_id.clone(),
928+
&payment_attempt.merchant_id.to_owned(),
929+
dynamic_routing_update,
930+
)
931+
.await
932+
.change_context(errors::ApiErrorResponse::InternalServerError)
933+
.attach_printable("Unable to update dynamic routing stats to db")?;
934+
} else {
935+
let dynamic_routing_stats = DynamicRoutingStatsNew {
936+
payment_id: payment_attempt.payment_id.to_owned(),
937+
attempt_id: payment_attempt.attempt_id.clone(),
938+
merchant_id: payment_attempt.merchant_id.to_owned(),
939+
profile_id: payment_attempt.profile_id.to_owned(),
940+
amount: payment_attempt.get_total_amount(),
941+
success_based_routing_connector: first_merchant_success_based_connector_label
942+
.to_string(),
943+
payment_connector: payment_connector.to_string(),
944+
payment_method_type: payment_attempt.payment_method_type,
945+
currency: payment_attempt.currency,
946+
payment_method: payment_attempt.payment_method,
947+
capture_method: payment_attempt.capture_method,
948+
authentication_type: payment_attempt.authentication_type,
949+
payment_status: payment_attempt.status,
950+
conclusive_classification: outcome,
951+
created_at: common_utils::date_time::now(),
952+
global_success_based_connector: Some(
953+
first_global_success_based_connector.label.to_string(),
954+
),
955+
};
956+
957+
state
958+
.store
959+
.insert_dynamic_routing_stat_entry(dynamic_routing_stats)
960+
.await
961+
.change_context(errors::ApiErrorResponse::InternalServerError)
962+
.attach_printable("Unable to push dynamic routing stats to db")?;
963+
};
924964

925965
client
926966
.update_success_rate(

crates/router/src/db/dynamic_routing_stats.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,19 @@ pub trait DynamicRoutingStatsInterface {
1616
&self,
1717
dynamic_routing_stat_new: storage::DynamicRoutingStatsNew,
1818
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError>;
19+
20+
async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
21+
&self,
22+
attempt_id: String,
23+
merchant_id: &common_utils::id_type::MerchantId,
24+
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError>;
25+
26+
async fn update_dynamic_routing_stats(
27+
&self,
28+
attempt_id: String,
29+
merchant_id: &common_utils::id_type::MerchantId,
30+
data: storage::DynamicRoutingStatsUpdate,
31+
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError>;
1932
}
2033

2134
#[async_trait::async_trait]
@@ -31,6 +44,33 @@ impl DynamicRoutingStatsInterface for Store {
3144
.await
3245
.map_err(|error| report!(errors::StorageError::from(error)))
3346
}
47+
48+
async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
49+
&self,
50+
attempt_id: String,
51+
merchant_id: &common_utils::id_type::MerchantId,
52+
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
53+
let conn = connection::pg_connection_write(self).await?;
54+
storage::DynamicRoutingStats::find_optional_by_attempt_id_merchant_id(
55+
&conn,
56+
attempt_id,
57+
merchant_id,
58+
)
59+
.await
60+
.map_err(|error| report!(errors::StorageError::from(error)))
61+
}
62+
63+
async fn update_dynamic_routing_stats(
64+
&self,
65+
attempt_id: String,
66+
merchant_id: &common_utils::id_type::MerchantId,
67+
data: storage::DynamicRoutingStatsUpdate,
68+
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
69+
let conn = connection::pg_connection_write(self).await?;
70+
storage::DynamicRoutingStats::update(&conn, attempt_id, merchant_id, data)
71+
.await
72+
.map_err(|error| report!(errors::StorageError::from(error)))
73+
}
3474
}
3575

3676
#[async_trait::async_trait]
@@ -42,6 +82,23 @@ impl DynamicRoutingStatsInterface for MockDb {
4282
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
4383
Err(errors::StorageError::MockDbError)?
4484
}
85+
86+
async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
87+
&self,
88+
_attempt_id: String,
89+
_merchant_id: &common_utils::id_type::MerchantId,
90+
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
91+
Err(errors::StorageError::MockDbError)?
92+
}
93+
94+
async fn update_dynamic_routing_stats(
95+
&self,
96+
_attempt_id: String,
97+
_merchant_id: &common_utils::id_type::MerchantId,
98+
_data: storage::DynamicRoutingStatsUpdate,
99+
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
100+
Err(errors::StorageError::MockDbError)?
101+
}
45102
}
46103

47104
#[async_trait::async_trait]
@@ -55,4 +112,25 @@ impl DynamicRoutingStatsInterface for KafkaStore {
55112
.insert_dynamic_routing_stat_entry(dynamic_routing_stat)
56113
.await
57114
}
115+
116+
async fn find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(
117+
&self,
118+
attempt_id: String,
119+
merchant_id: &common_utils::id_type::MerchantId,
120+
) -> CustomResult<Option<storage::DynamicRoutingStats>, errors::StorageError> {
121+
self.diesel_store
122+
.find_dynamic_routing_stats_optional_by_attempt_id_merchant_id(attempt_id, merchant_id)
123+
.await
124+
}
125+
126+
async fn update_dynamic_routing_stats(
127+
&self,
128+
attempt_id: String,
129+
merchant_id: &common_utils::id_type::MerchantId,
130+
data: storage::DynamicRoutingStatsUpdate,
131+
) -> CustomResult<storage::DynamicRoutingStats, errors::StorageError> {
132+
self.diesel_store
133+
.update_dynamic_routing_stats(attempt_id, merchant_id, data)
134+
.await
135+
}
58136
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
pub use diesel_models::dynamic_routing_stats::{DynamicRoutingStats, DynamicRoutingStatsNew};
1+
pub use diesel_models::dynamic_routing_stats::{
2+
DynamicRoutingStats, DynamicRoutingStatsNew, DynamicRoutingStatsUpdate,
3+
};

0 commit comments

Comments
 (0)