Skip to content

Commit

Permalink
Split file and in memory db (#1616)
Browse files Browse the repository at this point in the history
* Split file and in memory db

* Fix in memory db connection problem

* DbExecutor max pool size is set only for in memory database

* Fix: default pool size was still set to 1

Co-authored-by: Przemysław Rekucki <[email protected]>
  • Loading branch information
nieznanysprawiciel and prekucki authored Oct 19, 2021
1 parent 9907f09 commit 1047620
Show file tree
Hide file tree
Showing 20 changed files with 220 additions and 113 deletions.
2 changes: 2 additions & 0 deletions core/market/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ pub(crate) mod migrations {
}

pub(crate) use ya_persistence::executor::Error as DbError;
pub(crate) use ya_persistence::executor::{AsMixedDao, DbMixedExecutor};

pub(crate) type DbResult<T> = Result<T, DbError>;
28 changes: 21 additions & 7 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use diesel::prelude::*;

use ya_client::model::market::Reason;
use ya_client::model::NodeId;
use ya_persistence::executor::{do_with_transaction, AsDao, ConnType, PoolType};
use ya_persistence::executor::{do_with_transaction, readonly_transaction, ConnType, PoolType};

use crate::config::DbConfig;
use crate::db::dao::agreement_events::create_event;
Expand All @@ -17,7 +17,7 @@ use crate::db::schema::market_agreement::dsl as agreement;
use crate::db::schema::market_agreement::dsl::market_agreement;
use crate::db::schema::market_agreement_event::dsl as event;
use crate::db::schema::market_agreement_event::dsl::market_agreement_event;
use crate::db::{DbError, DbResult};
use crate::db::{AsMixedDao, DbError, DbResult};

#[derive(thiserror::Error, Debug)]
pub enum SaveAgreementError {
Expand All @@ -31,11 +31,15 @@ pub enum SaveAgreementError {

pub struct AgreementDao<'c> {
pool: &'c PoolType,
ram_pool: &'c PoolType,
}

impl<'a> AsDao<'a> for AgreementDao<'a> {
fn as_dao(pool: &'a PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for AgreementDao<'a> {
fn as_dao(disk_pool: &'a PoolType, ram_pool: &'a PoolType) -> Self {
Self {
pool: disk_pool,
ram_pool,
}
}
}

Expand Down Expand Up @@ -130,12 +134,18 @@ impl<'c> AgreementDao<'c> {

pub async fn save(&self, agreement: Agreement) -> Result<Agreement, SaveAgreementError> {
// Agreement is always created for last Provider Proposal.
// TODO: Accessing two databases can cause race conditions in some edge cases.
let proposal_id = agreement.offer_proposal_id.clone();
do_with_transaction(self.pool, move |conn| {
readonly_transaction(self.ram_pool, move |conn| {
if has_counter_proposal(conn, &proposal_id)? {
return Err(SaveAgreementError::ProposalCountered(proposal_id.clone()));
}
Ok(())
})
.await?;

let proposal_id = agreement.offer_proposal_id.clone();
let agreement = do_with_transaction(self.pool, move |conn| {
if let Some(agreement) = find_agreement_for_proposal(conn, &proposal_id)? {
return Err(SaveAgreementError::Exists(
agreement.id,
Expand All @@ -146,8 +156,12 @@ impl<'c> AgreementDao<'c> {
diesel::insert_into(market_agreement)
.values(&agreement)
.execute(conn)?;
Ok(agreement)
})
.await?;

update_proposal_state(conn, &proposal_id, ProposalState::Accepted)?;
do_with_transaction(self.ram_pool, move |conn| {
update_proposal_state(conn, &agreement.offer_proposal_id, ProposalState::Accepted)?;
Ok(agreement)
})
.await
Expand Down
10 changes: 5 additions & 5 deletions core/market/src/db/dao/agreement_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};

use ya_client::model::market::Reason;
use ya_client::model::NodeId;
use ya_persistence::executor::PoolType;
use ya_persistence::executor::{readonly_transaction, ConnType};
use ya_persistence::executor::{AsDao, PoolType};

use crate::db::dao::AgreementDaoError;
use crate::db::model::{Agreement, AgreementEvent, AgreementId, NewAgreementEvent};
Expand All @@ -13,15 +13,15 @@ use crate::db::schema::market_agreement::dsl as agreement;
use crate::db::schema::market_agreement::dsl::market_agreement;
use crate::db::schema::market_agreement_event::dsl as event;
use crate::db::schema::market_agreement_event::dsl::market_agreement_event;
use crate::db::DbResult;
use crate::db::{AsMixedDao, DbResult};

pub struct AgreementEventsDao<'c> {
pool: &'c PoolType,
}

impl<'c> AsDao<'c> for AgreementEventsDao<'c> {
fn as_dao(pool: &'c PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for AgreementEventsDao<'a> {
fn as_dao(disk_pool: &'a PoolType, _ram_pool: &'a PoolType) -> Self {
Self { pool: disk_pool }
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/market/src/db/dao/cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::config::DbConfig;
use crate::db::dao::{AgreementDao, DemandDao, NegotiationEventsDao, OfferDao, ProposalDao};
use futures::join;
use tokio::time;
use ya_persistence::executor::DbExecutor;

pub async fn clean(db: DbExecutor, cfg: &DbConfig) {
use crate::config::DbConfig;
use crate::db::dao::{AgreementDao, DemandDao, NegotiationEventsDao, OfferDao, ProposalDao};
use crate::db::DbMixedExecutor;

pub async fn clean(db: DbMixedExecutor, cfg: &DbConfig) {
let demand_db = db.clone();
let events_db = db.clone();
let offer_db = db.clone();
Expand All @@ -27,7 +28,7 @@ pub async fn clean(db: DbExecutor, cfg: &DbConfig) {
}
}

pub async fn clean_forever(db: DbExecutor, cfg: DbConfig) {
pub async fn clean_forever(db: DbMixedExecutor, cfg: DbConfig) {
let mut interval = time::interval(cfg.cleanup_interval);
loop {
interval.tick().await;
Expand Down
10 changes: 5 additions & 5 deletions core/market/src/db/dao/demand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};

use ya_client::model::NodeId;
use ya_persistence::executor::ConnType;
use ya_persistence::executor::{do_with_transaction, readonly_transaction, AsDao, PoolType};
use ya_persistence::executor::{do_with_transaction, readonly_transaction, PoolType};

use crate::db::model::{Demand, SubscriptionId};
use crate::db::schema::market_demand::dsl;
use crate::db::{DbError, DbResult};
use crate::db::{AsMixedDao, DbError, DbResult};

#[allow(unused)]
pub struct DemandDao<'c> {
pool: &'c PoolType,
}

impl<'c> AsDao<'c> for DemandDao<'c> {
fn as_dao(pool: &'c PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for DemandDao<'a> {
fn as_dao(_disk_pool: &'a PoolType, ram_pool: &'a PoolType) -> Self {
Self { pool: ram_pool }
}
}

Expand Down
10 changes: 5 additions & 5 deletions core/market/src/db/dao/negotiation_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use thiserror::Error;

use ya_client::model::market::Reason;
use ya_persistence::executor::ConnType;
use ya_persistence::executor::{do_with_transaction, AsDao, PoolType};
use ya_persistence::executor::{do_with_transaction, PoolType};

use crate::config::DbConfig;
use crate::db::dao::demand::{demand_status, DemandState};
use crate::db::dao::offer::{query_state, OfferState};
use crate::db::dao::sql_functions::datetime;
use crate::db::model::{Agreement, EventType, MarketEvent, Owner, Proposal, SubscriptionId};
use crate::db::schema::market_negotiation_event::dsl;
use crate::db::{DbError, DbResult};
use crate::db::{AsMixedDao, DbError, DbResult};

#[derive(Error, Debug)]
pub enum TakeEventsError {
Expand All @@ -29,9 +29,9 @@ pub struct NegotiationEventsDao<'c> {
pool: &'c PoolType,
}

impl<'c> AsDao<'c> for NegotiationEventsDao<'c> {
fn as_dao(pool: &'c PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for NegotiationEventsDao<'a> {
fn as_dao(_disk_pool: &'a PoolType, ram_pool: &'a PoolType) -> Self {
Self { pool: ram_pool }
}
}

Expand Down
12 changes: 5 additions & 7 deletions core/market/src/db/dao/offer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@ use diesel::expression::dsl::now as sql_now;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};

use ya_client::model::NodeId;
use ya_persistence::executor::{
do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType,
};
use ya_persistence::executor::{do_with_transaction, readonly_transaction, ConnType, PoolType};

use crate::db::model::SubscriptionId;
use crate::db::model::{Offer, OfferUnsubscribed};
use crate::db::schema::market_offer::dsl as offer;
use crate::db::schema::market_offer::dsl::market_offer;
use crate::db::schema::market_offer_unsubscribed::dsl as unsubscribed;
use crate::db::schema::market_offer_unsubscribed::dsl::market_offer_unsubscribed;
use crate::db::{DbError, DbResult};
use crate::db::{AsMixedDao, DbError, DbResult};

pub struct OfferDao<'c> {
pool: &'c PoolType,
}

impl<'c> AsDao<'c> for OfferDao<'c> {
fn as_dao(pool: &'c PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for OfferDao<'a> {
fn as_dao(_disk_pool: &'a PoolType, ram_pool: &'a PoolType) -> Self {
Self { pool: ram_pool }
}
}

Expand Down
12 changes: 5 additions & 7 deletions core/market/src/db/dao/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ use diesel::expression::dsl::now as sql_now;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use serde::{Deserialize, Serialize};

use ya_persistence::executor::{
do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType,
};
use ya_persistence::executor::{do_with_transaction, readonly_transaction, ConnType, PoolType};

use crate::db::model::{DbProposal, Negotiation, Proposal, ProposalId, ProposalState};
use crate::db::schema::market_negotiation::dsl as dsl_negotiation;
use crate::db::schema::market_proposal::dsl;
use crate::db::{DbError, DbResult};
use crate::db::{AsMixedDao, DbError, DbResult};

#[derive(thiserror::Error, Debug)]
pub enum SaveProposalError {
Expand All @@ -34,9 +32,9 @@ pub struct ProposalDao<'c> {
pool: &'c PoolType,
}

impl<'c> AsDao<'c> for ProposalDao<'c> {
fn as_dao(pool: &'c PoolType) -> Self {
Self { pool }
impl<'a> AsMixedDao<'a> for ProposalDao<'a> {
fn as_dao(_disk_pool: &'a PoolType, ram_pool: &'a PoolType) -> Self {
Self { pool: ram_pool }
}
}

Expand Down
13 changes: 8 additions & 5 deletions core/market/src/db/model/negotiation_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use ya_client::model::market::event::{ProviderEvent, RequestorEvent};
use ya_client::model::market::{Agreement as ClientAgreement, Proposal as ClientProposal, Reason};
use ya_client::model::ErrorMessage;
use ya_diesel_utils::DbTextField;
use ya_persistence::executor::DbExecutor;

use super::SubscriptionId;
use crate::db::dao::{AgreementDao, ProposalDao};
use crate::db::model::agreement_events::DbReason;
use crate::db::model::{Agreement, AgreementId, Owner, Proposal, ProposalId};
use crate::db::schema::market_negotiation_event;
use crate::db::DbMixedExecutor;

#[derive(Error, Debug)]
pub enum EventError {
Expand Down Expand Up @@ -112,7 +112,7 @@ impl MarketEvent {

pub async fn into_client_requestor_event(
self,
db: &DbExecutor,
db: &DbMixedExecutor,
) -> Result<RequestorEvent, EventError> {
let event_date = DateTime::<Utc>::from_utc(self.timestamp, Utc);
match self.event_type {
Expand All @@ -136,7 +136,7 @@ impl MarketEvent {
}
}

async fn into_client_proposal(self, db: DbExecutor) -> Result<ClientProposal, EventError> {
async fn into_client_proposal(self, db: DbMixedExecutor) -> Result<ClientProposal, EventError> {
let prop = db
.as_dao::<ProposalDao>()
.get_proposal(&self.artifact_id)
Expand All @@ -147,7 +147,10 @@ impl MarketEvent {
Ok(prop.into_client()?)
}

async fn into_client_agreement(self, db: DbExecutor) -> Result<ClientAgreement, EventError> {
async fn into_client_agreement(
self,
db: DbMixedExecutor,
) -> Result<ClientAgreement, EventError> {
let agreement = db
.as_dao::<AgreementDao>()
.select(&self.artifact_id, None, Utc::now().naive_utc())
Expand All @@ -160,7 +163,7 @@ impl MarketEvent {

pub async fn into_client_provider_event(
self,
db: &DbExecutor,
db: &DbMixedExecutor,
) -> Result<ProviderEvent, EventError> {
let event_date = DateTime::<Utc>::from_utc(self.timestamp, Utc);
match self.event_type {
Expand Down
21 changes: 14 additions & 7 deletions core/market/src/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use ya_client::model::market::{
Reason,
};
use ya_core_model::market::{local, BUS_ID};
use ya_persistence::executor::DbExecutor;
use ya_service_api_interfaces::{Provider, Service};
use ya_service_api_web::middleware::Identity;

use crate::db::DbMixedExecutor;
use ya_service_api_web::scope::ExtendableScope;

pub mod agreement;
Expand Down Expand Up @@ -58,19 +58,21 @@ pub enum MarketInitError {
Migration(#[from] anyhow::Error),
#[error("Failed to initialize config. Error: {0}.")]
Config(#[from] structopt::clap::Error),
#[error("Failed to initialize in memory market database. Error: {0}.")]
InMemory(anyhow::Error),
}

/// Structure connecting all market objects.
pub struct MarketService {
pub db: DbExecutor,
pub db: DbMixedExecutor,
pub matcher: Matcher,
pub provider_engine: ProviderBroker,
pub requestor_engine: RequestorBroker,
}

impl MarketService {
pub fn new(
db: &DbExecutor,
db: &DbMixedExecutor,
identity_api: Arc<dyn IdentityApi>,
config: Arc<Config>,
) -> Result<Self, MarketInitError> {
Expand All @@ -81,7 +83,10 @@ impl MarketService {
counter!("market.demands.unsubscribed", 0);
counter!("market.demands.expired", 0);

db.apply_migration(crate::db::migrations::run_with_output)?;
db.ram_db
.apply_migration(crate::db::migrations::run_with_output)?;
db.disk_db
.apply_migration(crate::db::migrations::run_with_output)?;

let store = SubscriptionStore::new(db.clone(), config.clone());
let (matcher, listeners) = Matcher::new(store.clone(), identity_api, config.clone())?;
Expand Down Expand Up @@ -132,12 +137,14 @@ impl MarketService {
Ok(())
}

pub async fn gsb<Context: Provider<Self, DbExecutor>>(ctx: &Context) -> anyhow::Result<()> {
pub async fn gsb<Context: Provider<Self, DbMixedExecutor>>(
ctx: &Context,
) -> anyhow::Result<()> {
let market = MARKET.get_or_init_market(&ctx.component())?;
Ok(market.bind_gsb(BUS_ID, local::BUS_ID).await?)
}

pub fn rest<Context: Provider<Self, DbExecutor>>(ctx: &Context) -> actix_web::Scope {
pub fn rest<Context: Provider<Self, DbMixedExecutor>>(ctx: &Context) -> actix_web::Scope {
match MARKET.get_or_init_market(&ctx.component()) {
Ok(market) => MarketService::bind_rest(market),
Err(e) => {
Expand Down Expand Up @@ -298,7 +305,7 @@ impl StaticMarket {

pub fn get_or_init_market(
&self,
db: &DbExecutor,
db: &DbMixedExecutor,
) -> Result<Arc<MarketService>, MarketInitError> {
let mut guarded_market = self.locked_market.lock().unwrap();
if let Some(market) = &*guarded_market {
Expand Down
Loading

0 comments on commit 1047620

Please sign in to comment.