From d7ebb404271cf54723e75d7611aeb6d43169f793 Mon Sep 17 00:00:00 2001 From: Dima Pristupa Date: Fri, 20 Dec 2024 22:57:47 +0200 Subject: [PATCH] `DatasetOwnershipService`: moved to the `kamu-datasets` area & implemented via `DatasetEntryServiceImpl` (#1004) * DatasetOwnershipService: use odf namespace * DatasetEntryServiceImpl: impl DatasetOwnershipService * DatasetOwnershipService: move to kamu-datasets scope * CHANGELOG.md: update --- CHANGELOG.md | 1 + Cargo.lock | 4 +- .../dataset_mut/dataset_mut_utils.rs | 2 +- .../flows_mut/account_flow_configs_mut.rs | 2 +- .../queries/accounts/account_flow_configs.rs | 2 +- .../tests/test_gql_account_flow_configs.rs | 12 +- .../tests/tests/test_gql_dataset_flow_runs.rs | 10 +- src/adapter/graphql/tests/utils/auth_utils.rs | 18 +- .../http/tests/harness/client_side_harness.rs | 8 +- src/app/cli/src/app.rs | 3 - src/domain/core/src/services/mod.rs | 2 - .../src/services/dataset_entry_service.rs | 2 + .../src/services/dataset_ownership_service.rs | 17 +- .../datasets/domain/src/services/mod.rs | 2 + .../src/dataset_entry_service_impl.rs | 67 ++- .../tests/tests/test_dataset_entry_service.rs | 4 +- src/domain/flow-system/services/Cargo.toml | 3 +- .../services/src/flow/flow_agent_impl.rs | 2 +- .../src/flow/flow_query_service_impl.rs | 2 +- .../src/flow/flow_scheduling_helper.rs | 51 ++- .../tests/tests/test_flow_agent_impl.rs | 243 +++++----- .../tests/tests/utils/flow_harness_shared.rs | 140 +++--- .../services/tests/tests/utils/task_driver.rs | 19 +- src/infra/core/Cargo.toml | 8 +- .../dataset_ownership_service_inmem.rs | 300 ------------- src/infra/core/src/services/mod.rs | 2 - .../core/src/testing/base_repo_harness.rs | 7 +- ...ate_dataset_from_snapshot_use_case_impl.rs | 13 +- .../use_cases/create_dataset_use_case_impl.rs | 13 +- src/infra/core/tests/tests/mod.rs | 1 - .../test_dataset_ownership_service_inmem.rs | 273 ------------ .../tests/tests/test_entries_streamer.rs | 414 ------------------ 32 files changed, 392 insertions(+), 1255 deletions(-) rename src/domain/{core => datasets/domain}/src/services/dataset_ownership_service.rs (69%) delete mode 100644 src/infra/core/src/services/dataset_ownership_service_inmem.rs delete mode 100644 src/infra/core/tests/tests/test_dataset_ownership_service_inmem.rs delete mode 100644 src/utils/database-common/tests/tests/test_entries_streamer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 467c1e7c85..6481f3e57a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Recommendation: for ease of reading, use the following order: - OSO: added resource storage for access speed - E2E: Using the correct account in multi-tenant mode - And also the possibility of set it up + - `DatasetOwnershipService`: moved to the `kamu-datasets` crate area & implemented via `DatasetEntryServiceImpl` ## [0.213.1] - 2024-12-18 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 267a57d3a5..f60685f67d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5263,7 +5263,6 @@ dependencies = [ "curl", "curl-sys", "dashmap", - "database-common", "datafusion", "datafusion-ethers", "datafusion-functions-json", @@ -5284,8 +5283,6 @@ dependencies = [ "itertools 0.13.0", "kamu", "kamu-accounts", - "kamu-accounts-inmem", - "kamu-accounts-services", "kamu-core", "kamu-data-utils", "kamu-datasets", @@ -6358,6 +6355,7 @@ dependencies = [ "kamu-accounts-inmem", "kamu-accounts-services", "kamu-core", + "kamu-datasets", "kamu-datasets-inmem", "kamu-datasets-services", "kamu-flow-system", diff --git 
a/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut_utils.rs b/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut_utils.rs index b156d92dbe..4046ac95f2 100644 --- a/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut_utils.rs +++ b/src/adapter/graphql/src/mutations/dataset_mut/dataset_mut_utils.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use kamu_core::DatasetOwnershipService; +use kamu_datasets::DatasetOwnershipService; use opendatafabric as odf; use crate::prelude::*; diff --git a/src/adapter/graphql/src/mutations/flows_mut/account_flow_configs_mut.rs b/src/adapter/graphql/src/mutations/flows_mut/account_flow_configs_mut.rs index 32769e6d02..32fa4b8c08 100644 --- a/src/adapter/graphql/src/mutations/flows_mut/account_flow_configs_mut.rs +++ b/src/adapter/graphql/src/mutations/flows_mut/account_flow_configs_mut.rs @@ -10,7 +10,7 @@ use chrono::Utc; use fs::FlowConfigurationService; use kamu_accounts::Account; -use kamu_core::DatasetOwnershipService; +use kamu_datasets::DatasetOwnershipService; use kamu_flow_system as fs; use opendatafabric::DatasetID; diff --git a/src/adapter/graphql/src/queries/accounts/account_flow_configs.rs b/src/adapter/graphql/src/queries/accounts/account_flow_configs.rs index d9543d2df6..04632fcbd6 100644 --- a/src/adapter/graphql/src/queries/accounts/account_flow_configs.rs +++ b/src/adapter/graphql/src/queries/accounts/account_flow_configs.rs @@ -9,7 +9,7 @@ use futures::StreamExt; use kamu_accounts::Account as AccountEntity; -use kamu_core::DatasetOwnershipService; +use kamu_datasets::DatasetOwnershipService; use kamu_flow_system::FlowConfigurationService; use crate::prelude::*; diff --git a/src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs b/src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs index f5562d189b..ff68a05960 100644 --- a/src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs @@ -17,9 +17,6 @@ use indoc::indoc; use kamu::testing::{MetadataFactory, MockDatasetActionAuthorizer, MockDatasetChangesService}; use kamu::{ CreateDatasetFromSnapshotUseCaseImpl, - DatasetOwnershipServiceInMemory, - DatasetOwnershipServiceInMemoryStateInitializer, - DatasetRegistryRepoBridge, DatasetRepositoryLocalFs, DatasetRepositoryWriter, MetadataQueryServiceImpl, @@ -28,8 +25,8 @@ use kamu_accounts::{JwtAuthenticationConfig, DEFAULT_ACCOUNT_NAME, DEFAULT_ACCOU use kamu_accounts_inmem::InMemoryAccessTokenRepository; use kamu_accounts_services::{AccessTokenServiceImpl, AuthenticationServiceImpl}; use kamu_core::*; -use kamu_datasets_inmem::InMemoryDatasetDependencyRepository; -use kamu_datasets_services::DependencyGraphServiceImpl; +use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEntryRepository}; +use kamu_datasets_services::{DatasetEntryServiceImpl, DependencyGraphServiceImpl}; use kamu_flow_system::FlowAgentConfig; use kamu_flow_system_inmem::{InMemoryFlowConfigurationEventStore, InMemoryFlowEventStore}; use kamu_task_system_inmem::InMemoryTaskEventStore; @@ -643,7 +640,6 @@ impl FlowConfigHarness { .add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir)) .bind::() .bind::() - .add::() .add::() .add::() .add_value(dataset_changes_mock) @@ -665,8 +661,8 @@ impl FlowConfigHarness { )) .add::() .add::() - .add::() - .add::() + .add::() + .add::() .add::(); NoOpDatabasePlugin::init_database_components(&mut b); diff 
--git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs index b593f0bc6b..510a160567 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs @@ -18,8 +18,6 @@ use indoc::indoc; use kamu::testing::{MetadataFactory, MockDatasetChangesService}; use kamu::{ CreateDatasetFromSnapshotUseCaseImpl, - DatasetOwnershipServiceInMemory, - DatasetRegistryRepoBridge, DatasetRepositoryLocalFs, DatasetRepositoryWriter, MetadataQueryServiceImpl, @@ -47,8 +45,8 @@ use kamu_core::{ TenancyConfig, MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE, }; -use kamu_datasets_inmem::InMemoryDatasetDependencyRepository; -use kamu_datasets_services::DependencyGraphServiceImpl; +use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEntryRepository}; +use kamu_datasets_services::{DatasetEntryServiceImpl, DependencyGraphServiceImpl}; use kamu_flow_system::{ Flow, FlowAgentConfig, @@ -3111,7 +3109,6 @@ impl FlowRunsHarness { .add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir)) .bind::() .bind::() - .add::() .add::() .add::() .add_value(dataset_changes_mock) @@ -3132,7 +3129,8 @@ impl FlowRunsHarness { .add::() .add::() .add_value(JwtAuthenticationConfig::default()) - .add::() + .add::() + .add::() .add::(); NoOpDatabasePlugin::init_database_components(&mut b); diff --git a/src/adapter/graphql/tests/utils/auth_utils.rs b/src/adapter/graphql/tests/utils/auth_utils.rs index 84e5b53c68..ed3ae25fee 100644 --- a/src/adapter/graphql/tests/utils/auth_utils.rs +++ b/src/adapter/graphql/tests/utils/auth_utils.rs @@ -17,12 +17,6 @@ use kamu_adapter_graphql::ANONYMOUS_ACCESS_FORBIDDEN_MESSAGE; pub async fn authentication_catalogs( base_catalog: &dill::Catalog, ) -> (dill::Catalog, dill::Catalog) { - let catalog_anonymous = dill::CatalogBuilder::new_chained(base_catalog) - .add_value(CurrentAccountSubject::anonymous( - AnonymousAccountReason::NoAuthenticationProvided, - )) - .build(); - let current_account_subject = CurrentAccountSubject::new_test(); let mut predefined_accounts_config = PredefinedAccountsConfig::new(); @@ -36,12 +30,20 @@ pub async fn authentication_catalogs( panic!() } - let catalog_authorized = dill::CatalogBuilder::new_chained(base_catalog) + let base_auth_catalog = dill::CatalogBuilder::new_chained(base_catalog) .add::() .add::() .add::() + .add_value(predefined_accounts_config.clone()) + .build(); + + let catalog_anonymous = dill::CatalogBuilder::new_chained(&base_auth_catalog) + .add_value(CurrentAccountSubject::anonymous( + AnonymousAccountReason::NoAuthenticationProvided, + )) + .build(); + let catalog_authorized = dill::CatalogBuilder::new_chained(&base_auth_catalog) .add_value(current_account_subject) - .add_value(predefined_accounts_config) .build(); init_on_startup::run_startup_jobs(&catalog_authorized) diff --git a/src/adapter/http/tests/harness/client_side_harness.rs b/src/adapter/http/tests/harness/client_side_harness.rs index 442cee8a40..2719ffa35c 100644 --- a/src/adapter/http/tests/harness/client_side_harness.rs +++ b/src/adapter/http/tests/harness/client_side_harness.rs @@ -52,7 +52,7 @@ pub(crate) struct ClientSideHarness { catalog: dill::Catalog, pull_dataset_use_case: Arc, push_dataset_use_case: Arc, - access_token_resover: Arc, + access_token_resolver: Arc, options: ClientSideHarnessOptions, } @@ -162,7 +162,7 @@ impl ClientSideHarness { let pull_dataset_use_case = catalog.get_one::().unwrap(); 
let push_dataset_use_case = catalog.get_one::().unwrap(); - let access_token_resover = catalog + let access_token_resolver = catalog .get_one::() .unwrap(); @@ -171,7 +171,7 @@ impl ClientSideHarness { catalog, pull_dataset_use_case, push_dataset_use_case, - access_token_resover, + access_token_resolver, options, } } @@ -341,7 +341,7 @@ impl ClientSideHarness { let mut ws_url = url.odf_to_transport_protocol().unwrap(); ws_url.ensure_trailing_slash(); let maybe_access_token = self - .access_token_resover + .access_token_resolver .resolve_odf_dataset_access_token(&ws_url); ws_url = ws_url.join(method).unwrap(); diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index 77d13a6539..9555dae1f4 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs @@ -535,9 +535,6 @@ pub fn configure_server_catalog(base_catalog: &Catalog) -> CatalogBuilder { b.add::(); - b.add::(); - b.add::(); - kamu_task_system_services::register_dependencies(&mut b); b.add_value(kamu_flow_system_inmem::domain::FlowAgentConfig::new( diff --git a/src/domain/core/src/services/mod.rs b/src/domain/core/src/services/mod.rs index 289e2c27c8..734cd21372 100644 --- a/src/domain/core/src/services/mod.rs +++ b/src/domain/core/src/services/mod.rs @@ -23,7 +23,6 @@ pub use transform::*; pub use watermark::*; pub mod dataset_changes_service; -pub mod dataset_ownership_service; pub mod dataset_registry; pub mod dependency_graph_service; pub mod engine_provisioner; @@ -43,7 +42,6 @@ pub mod sync_service; pub mod verification_service; pub use dataset_changes_service::*; -pub use dataset_ownership_service::*; pub use dataset_registry::*; pub use dependency_graph_service::*; pub use engine_provisioner::*; diff --git a/src/domain/datasets/domain/src/services/dataset_entry_service.rs b/src/domain/datasets/domain/src/services/dataset_entry_service.rs index 0da7d15874..a142981c87 100644 --- a/src/domain/datasets/domain/src/services/dataset_entry_service.rs +++ b/src/domain/datasets/domain/src/services/dataset_entry_service.rs @@ -22,6 +22,8 @@ pub trait DatasetEntryService: Sync + Send { // TODO: Private Datasets: extract to DatasetEntryRegistry? fn all_entries(&self) -> DatasetEntryStream; + fn entries_owned_by(&self, owner_id: &odf::AccountID) -> DatasetEntryStream; + async fn list_all_entries( &self, pagination: PaginationOpts, diff --git a/src/domain/core/src/services/dataset_ownership_service.rs b/src/domain/datasets/domain/src/services/dataset_ownership_service.rs similarity index 69% rename from src/domain/core/src/services/dataset_ownership_service.rs rename to src/domain/datasets/domain/src/services/dataset_ownership_service.rs index 384b698c27..fe68361f74 100644 --- a/src/domain/core/src/services/dataset_ownership_service.rs +++ b/src/domain/datasets/domain/src/services/dataset_ownership_service.rs @@ -8,27 +8,26 @@ // by the Apache License, Version 2.0. 
use internal_error::InternalError; -use opendatafabric::{AccountID, DatasetID}; +use opendatafabric as odf; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// TODO: Private Datasets: replace with DatasetEntry-related service #[async_trait::async_trait] pub trait DatasetOwnershipService: Sync + Send { - async fn get_dataset_owners( + async fn get_dataset_owner( &self, - dataset_id: &DatasetID, - ) -> Result, InternalError>; + dataset_id: &odf::DatasetID, + ) -> Result; async fn get_owned_datasets( &self, - account_id: &AccountID, - ) -> Result, InternalError>; + account_id: &odf::AccountID, + ) -> Result, InternalError>; async fn is_dataset_owned_by( &self, - dataset_id: &DatasetID, - account_id: &AccountID, + dataset_id: &odf::DatasetID, + account_id: &odf::AccountID, ) -> Result; } diff --git a/src/domain/datasets/domain/src/services/mod.rs b/src/domain/datasets/domain/src/services/mod.rs index a0b3ec926a..004333bdb6 100644 --- a/src/domain/datasets/domain/src/services/mod.rs +++ b/src/domain/datasets/domain/src/services/mod.rs @@ -10,7 +10,9 @@ mod dataset_entry_service; mod dataset_env_var_service; mod dataset_key_value_service; +mod dataset_ownership_service; pub use dataset_entry_service::*; pub use dataset_env_var_service::*; pub use dataset_key_value_service::*; +pub use dataset_ownership_service::*; diff --git a/src/domain/datasets/services/src/dataset_entry_service_impl.rs b/src/domain/datasets/services/src/dataset_entry_service_impl.rs index b1cbe6e4e6..a59b45c551 100644 --- a/src/domain/datasets/services/src/dataset_entry_service_impl.rs +++ b/src/domain/datasets/services/src/dataset_entry_service_impl.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use database_common::{EntityPageListing, EntityPageStreamer, PaginationOpts}; use dill::{component, interface, meta, Catalog}; -use internal_error::{InternalError, ResultIntoInternal}; +use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal}; use kamu_accounts::{AccountRepository, CurrentAccountSubject}; use kamu_core::{ DatasetHandleStream, @@ -68,6 +68,7 @@ struct AccountsCache { #[component(pub)] #[interface(dyn DatasetEntryService)] #[interface(dyn DatasetRegistry)] +#[interface(dyn DatasetOwnershipService)] #[interface(dyn MessageConsumer)] #[interface(dyn MessageConsumerT)] #[meta(MessageConsumerMeta { @@ -336,6 +337,19 @@ impl DatasetEntryService for DatasetEntryServiceImpl { ) } + fn entries_owned_by(&self, owner_id: &odf::AccountID) -> DatasetEntryStream { + let owner_id = owner_id.clone(); + + EntityPageStreamer::default().into_stream( + || async { Ok(Arc::new(owner_id)) }, + move |owner_id, pagination| async move { + self.list_entries_owned_by(&owner_id, pagination) + .await + .int_err() + }, + ) + } + async fn list_all_entries( &self, pagination: PaginationOpts, @@ -511,6 +525,57 @@ impl DatasetRegistry for DatasetEntryServiceImpl { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[async_trait::async_trait] +impl DatasetOwnershipService for DatasetEntryServiceImpl { + async fn get_dataset_owner( + &self, + dataset_id: &odf::DatasetID, + ) -> Result { + let dataset_entry = self + .dataset_entry_repo + .get_dataset_entry(dataset_id) + .await + .int_err()?; + + Ok(dataset_entry.owner_id) + } + + async fn get_owned_datasets( + &self, + account_id: &odf::AccountID, + ) -> Result, InternalError> { + use futures::TryStreamExt; + + let owned_dataset_ids = self + 
.entries_owned_by(account_id) + .try_collect::>() + .await? + .into_iter() + .map(|dataset_entry| dataset_entry.id) + .collect::>(); + + Ok(owned_dataset_ids) + } + + async fn is_dataset_owned_by( + &self, + dataset_id: &odf::DatasetID, + account_id: &odf::AccountID, + ) -> Result { + let get_res = self.dataset_entry_repo.get_dataset_entry(dataset_id).await; + + match get_res { + Ok(dataset_entry) => Ok(dataset_entry.owner_id == *account_id), + Err(err) => match err { + GetDatasetEntryError::NotFound(_) => Ok(false), + unexpected_error => Err(unexpected_error.int_err()), + }, + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + impl MessageConsumer for DatasetEntryServiceImpl {} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/datasets/services/tests/tests/test_dataset_entry_service.rs b/src/domain/datasets/services/tests/tests/test_dataset_entry_service.rs index 8e751cb3f3..365996b221 100644 --- a/src/domain/datasets/services/tests/tests/test_dataset_entry_service.rs +++ b/src/domain/datasets/services/tests/tests/test_dataset_entry_service.rs @@ -10,7 +10,7 @@ use std::sync::{Arc, RwLock}; use chrono::{DateTime, TimeZone, Utc}; -use dill::{Catalog, CatalogBuilder, Component}; +use dill::{CatalogBuilder, Component}; use init_on_startup::InitOnStartup; use kamu::{DatasetRepositoryWriter, MockDatasetRepositoryWriter}; use kamu_accounts::{Account, AccountRepository, CurrentAccountSubject}; @@ -195,7 +195,6 @@ async fn test_indexes_datasets_correctly() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct DatasetEntryServiceHarness { - _catalog: Catalog, outbox: Arc, dataset_entry_indexer: Arc, account_repo: Arc, @@ -252,7 +251,6 @@ impl DatasetEntryServiceHarness { outbox: catalog.get_one().unwrap(), dataset_entry_indexer: catalog.get_one().unwrap(), account_repo: catalog.get_one().unwrap(), - _catalog: catalog, } } diff --git a/src/domain/flow-system/services/Cargo.toml b/src/domain/flow-system/services/Cargo.toml index 8384e7a30b..dc6a779eb4 100644 --- a/src/domain/flow-system/services/Cargo.toml +++ b/src/domain/flow-system/services/Cargo.toml @@ -26,11 +26,12 @@ database-common = { workspace = true } database-common-macros = { workspace = true } init-on-startup = { workspace = true } internal-error = { workspace = true } -messaging-outbox = { workspace = true } kamu-accounts = { workspace = true } kamu-core = { workspace = true } +kamu-datasets = { workspace = true } kamu-flow-system = { workspace = true } kamu-task-system = { workspace = true } +messaging-outbox = { workspace = true } observability = { workspace = true, default-features = false } opendatafabric = { workspace = true } time-source = { workspace = true } diff --git a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs index 42df2ecfb8..454d00c96c 100644 --- a/src/domain/flow-system/services/src/flow/flow_agent_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_agent_impl.rs @@ -567,7 +567,7 @@ impl MessageConsumerT for FlowAgentImpl { let finish_time = self.agent_config.round_time(message.event_time)?; // In case of success: - // - execute followup method + // - execute follow-up method if let Some(flow_result) = flow.try_result_as_ref() && !flow_result.is_empty() { diff --git 
a/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs b/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs index 7e25ce58cd..53e820c121 100644 --- a/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs +++ b/src/domain/flow-system/services/src/flow/flow_query_service_impl.rs @@ -15,7 +15,7 @@ use database_common::PaginationOpts; use dill::{component, interface, Catalog}; use futures::TryStreamExt; use internal_error::ResultIntoInternal; -use kamu_core::DatasetOwnershipService; +use kamu_datasets::DatasetOwnershipService; use kamu_flow_system::*; use opendatafabric::{AccountID, DatasetID}; diff --git a/src/domain/flow-system/services/src/flow/flow_scheduling_helper.rs b/src/domain/flow-system/services/src/flow/flow_scheduling_helper.rs index 001113c7b4..f7b7713b69 100644 --- a/src/domain/flow-system/services/src/flow/flow_scheduling_helper.rs +++ b/src/domain/flow-system/services/src/flow/flow_scheduling_helper.rs @@ -12,7 +12,8 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use dill::component; use internal_error::InternalError; -use kamu_core::{DatasetChangesService, DatasetOwnershipService, DependencyGraphService}; +use kamu_core::{DatasetChangesService, DependencyGraphService}; +use kamu_datasets::DatasetOwnershipService; use kamu_flow_system::*; use messaging_outbox::{Outbox, OutboxExt}; use time_source::SystemTimeSource; @@ -221,35 +222,33 @@ impl FlowSchedulingHelper { } DownstreamDependencyTriggerType::TriggerOwnHardCompaction => { - let dataset_owner_account_ids = self + let owner_account_id = self .dataset_ownership_service - .get_dataset_owners(&fk_dataset.dataset_id) + .get_dataset_owner(&fk_dataset.dataset_id) .await?; for dependent_dataset_id in dependent_dataset_ids { - for owner_account_id in &dataset_owner_account_ids { - if self - .dataset_ownership_service - .is_dataset_owned_by(&dependent_dataset_id, owner_account_id) - .await? 
- { - plans.push(DownstreamDependencyFlowPlan { - flow_key: FlowKeyDataset::new( - dependent_dataset_id.clone(), - DatasetFlowType::HardCompaction, - ) - .into(), - flow_trigger_context: FlowTriggerContext::Unconditional, - // Currently we trigger Hard compaction recursively only in keep - // metadata only mode - maybe_config_snapshot: Some(FlowConfigurationSnapshot::Compaction( - CompactionRule::MetadataOnly(CompactionRuleMetadataOnly { - recursive: true, - }), - )), - }); - break; - } + let owned = self + .dataset_ownership_service + .is_dataset_owned_by(&dependent_dataset_id, &owner_account_id) + .await?; + + if owned { + plans.push(DownstreamDependencyFlowPlan { + flow_key: FlowKeyDataset::new( + dependent_dataset_id, + DatasetFlowType::HardCompaction, + ) + .into(), + flow_trigger_context: FlowTriggerContext::Unconditional, + // Currently we trigger Hard compaction recursively only in keep + // metadata only mode + maybe_config_snapshot: Some(FlowConfigurationSnapshot::Compaction( + CompactionRule::MetadataOnly(CompactionRuleMetadataOnly { + recursive: true, + }), + )), + }); } } } diff --git a/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs b/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs index 544defc291..c5cdfff7cb 100644 --- a/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs +++ b/src/domain/flow-system/services/tests/tests/test_flow_agent_impl.rs @@ -12,6 +12,7 @@ use std::str::FromStr; use chrono::{Duration, DurationRound, Utc}; use futures::TryStreamExt; use kamu::testing::MockDatasetChangesService; +use kamu_accounts::{AccountConfig, CurrentAccountSubject}; use kamu_core::*; use kamu_flow_system::*; use kamu_task_system::*; @@ -106,7 +107,6 @@ async fn test_read_initial_config_and_queue_without_waiting() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -143,7 +143,8 @@ async fn test_read_initial_config_and_queue_without_waiting() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -262,7 +263,6 @@ async fn test_read_initial_config_shouldnt_queue_in_recovery_case() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -283,7 +283,8 @@ async fn test_read_initial_config_shouldnt_queue_in_recovery_case() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -374,7 +375,6 @@ async fn test_cron_config() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -402,7 +402,8 @@ async fn test_cron_config() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -544,7 +545,6 @@ async fn test_manual_trigger() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -613,7 +613,8 @@ async fn test_manual_trigger() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -755,7 +756,6 @@ async fn test_ingest_trigger_with_ingest_config() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -824,7 +824,8 @@ async fn test_ingest_trigger_with_ingest_config() { Flow ID = 0 Finished Success "# - ) + 
), + format!("{}", test_flow_listener.as_ref()) ); } @@ -933,7 +934,6 @@ async fn test_manual_trigger_compaction() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -971,7 +971,8 @@ async fn test_manual_trigger_compaction() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -1071,7 +1072,6 @@ async fn test_manual_trigger_reset() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -1089,7 +1089,8 @@ async fn test_manual_trigger_reset() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -1256,14 +1257,12 @@ async fn test_reset_trigger_keep_metadata_compaction_for_derivatives() { harness.advance_time(Duration::milliseconds(300)).await; }; - // tokio::join!(trigger0_handle, task0_handle, main_handle) tokio::join!(trigger0_handle, task0_handle, task1_handle, task2_handle, main_handle) } => Ok(()) } .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -1325,7 +1324,8 @@ async fn test_reset_trigger_keep_metadata_compaction_for_derivatives() { Flow ID = 1 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -1406,7 +1406,6 @@ async fn test_manual_trigger_compaction_with_config() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -1424,7 +1423,8 @@ async fn test_manual_trigger_compaction_with_config() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -1526,7 +1526,7 @@ async fn test_full_hard_compaction_trigger_keep_metadata_compaction_for_derivati }); let task0_handle = task0_driver.run(); - // Task 1: "foo_bar" start running at 110ms, finish at 180sms + // Task 1: "foo_baz" start running at 110ms, finish at 180ms let task1_driver = harness.task_driver(TaskDriverArgs { task_id: TaskID::new(1), task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "1")]), @@ -1554,7 +1554,7 @@ async fn test_full_hard_compaction_trigger_keep_metadata_compaction_for_derivati }); let task1_handle = task1_driver.run(); - // Task 2: "foo_bar_baz" start running at 200ms, finish at 240ms + // Task 2: "foo_bar" start running at 200ms, finish at 240ms let task2_driver = harness.task_driver(TaskDriverArgs { task_id: TaskID::new(2), task_metadata: TaskMetadata::from(vec![(METADATA_TASK_FLOW_ID, "2")]), @@ -1593,7 +1593,6 @@ async fn test_full_hard_compaction_trigger_keep_metadata_compaction_for_derivati .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -1655,7 +1654,8 @@ async fn test_full_hard_compaction_trigger_keep_metadata_compaction_for_derivati Flow ID = 1 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()), ); } @@ -1821,7 +1821,6 @@ async fn test_manual_trigger_keep_metadata_only_with_recursive_compaction() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -1885,7 +1884,8 @@ async fn test_manual_trigger_keep_metadata_only_with_recursive_compaction() { Flow ID = 2 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -1995,7 +1995,6 @@ async fn test_manual_trigger_keep_metadata_only_without_recursive_compaction() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", 
test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -2013,7 +2012,8 @@ async fn test_manual_trigger_keep_metadata_only_without_recursive_compaction() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -2021,40 +2021,50 @@ async fn test_manual_trigger_keep_metadata_only_without_recursive_compaction() { #[test_log::test(tokio::test)] async fn test_manual_trigger_keep_metadata_only_compaction_multiple_accounts() { - let wasya_account_name = AccountName::new_unchecked("wasya"); - let petya_account_name = AccountName::new_unchecked("petya"); + let wasya = AccountConfig::from_name(AccountName::new_unchecked("wasya")); + let petya = AccountConfig::from_name(AccountName::new_unchecked("petya")); + + let subject_wasya = + CurrentAccountSubject::logged(wasya.get_id(), wasya.account_name.clone(), false); + let subject_petya = + CurrentAccountSubject::logged(petya.get_id(), petya.account_name.clone(), false); let harness = FlowHarness::with_overrides(FlowHarnessOverrides { tenancy_config: TenancyConfig::MultiTenant, - custom_account_names: vec![wasya_account_name.clone(), petya_account_name.clone()], + predefined_accounts: vec![wasya, petya], ..Default::default() }) .await; let foo_create_result = harness - .create_root_dataset(DatasetAlias { - dataset_name: DatasetName::new_unchecked("foo"), - account_name: Some(wasya_account_name.clone()), - }) + .create_root_dataset_using_subject( + DatasetAlias { + dataset_name: DatasetName::new_unchecked("foo"), + account_name: Some(subject_wasya.account_name().clone()), + }, + subject_wasya.clone(), + ) .await; let foo_id = foo_create_result.dataset_handle.id; let foo_bar_id = harness - .create_derived_dataset( + .create_derived_dataset_using_subject( DatasetAlias { dataset_name: DatasetName::new_unchecked("foo.bar"), - account_name: Some(wasya_account_name.clone()), + account_name: Some(subject_wasya.account_name().clone()), }, vec![foo_id.clone()], + subject_wasya, ) .await; let foo_baz_id = harness - .create_derived_dataset( + .create_derived_dataset_using_subject( DatasetAlias { dataset_name: DatasetName::new_unchecked("foo.baz"), - account_name: Some(petya_account_name.clone()), + account_name: Some(subject_petya.account_name().clone()), }, vec![foo_id.clone()], + subject_petya, ) .await; @@ -2148,7 +2158,6 @@ async fn test_manual_trigger_keep_metadata_only_compaction_multiple_accounts() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -2186,7 +2195,8 @@ async fn test_manual_trigger_keep_metadata_only_compaction_multiple_accounts() { Flow ID = 1 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -2324,7 +2334,6 @@ async fn test_dataset_flow_configuration_paused_resumed_modified() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -2414,7 +2423,8 @@ async fn test_dataset_flow_configuration_paused_resumed_modified() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -2555,7 +2565,6 @@ async fn test_respect_last_success_time_when_schedule_resumes() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -2645,7 +2654,8 @@ async fn test_respect_last_success_time_when_schedule_resumes() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -2763,7 +2773,6 @@ async fn 
test_dataset_deleted() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -2831,7 +2840,8 @@ async fn test_dataset_deleted() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -2964,7 +2974,6 @@ async fn test_task_completions_trigger_next_loop_on_success() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -3046,7 +3055,8 @@ async fn test_task_completions_trigger_next_loop_on_success() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -3201,7 +3211,6 @@ async fn test_derived_dataset_triggered_initially_and_after_input_change() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -3303,7 +3312,8 @@ async fn test_derived_dataset_triggered_initially_and_after_input_change() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -3401,7 +3411,6 @@ async fn test_throttling_manual_triggers() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -3429,7 +3438,8 @@ async fn test_throttling_manual_triggers() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -3677,7 +3687,6 @@ async fn test_throttling_derived_dataset_with_2_parents() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -3901,7 +3910,8 @@ async fn test_throttling_derived_dataset_with_2_parents() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -4101,7 +4111,6 @@ async fn test_batching_condition_records_reached() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -4235,7 +4244,8 @@ async fn test_batching_condition_records_reached() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -4407,7 +4417,6 @@ async fn test_batching_condition_timeout() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -4509,7 +4518,8 @@ async fn test_batching_condition_timeout() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -4681,7 +4691,6 @@ async fn test_batching_condition_watermark() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -4783,7 +4792,8 @@ async fn test_batching_condition_watermark() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -5080,7 +5090,6 @@ async fn test_batching_condition_with_2_inputs() { .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -5335,7 +5344,8 @@ async fn test_batching_condition_with_2_inputs() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -5343,42 +5353,51 @@ async fn test_batching_condition_with_2_inputs() { #[test_log::test(tokio::test)] async fn test_list_all_flow_initiators() { - let foo_account_name = AccountName::new_unchecked("foo"); - let bar_account_name = AccountName::new_unchecked("bar"); + let foo = AccountConfig::from_name(AccountName::new_unchecked("foo")); + let bar = 
AccountConfig::from_name(AccountName::new_unchecked("bar")); + + let subject_foo = CurrentAccountSubject::logged(foo.get_id(), foo.account_name.clone(), false); + let subject_bar = CurrentAccountSubject::logged(bar.get_id(), bar.account_name.clone(), false); let harness = FlowHarness::with_overrides(FlowHarnessOverrides { - custom_account_names: vec![foo_account_name.clone(), bar_account_name.clone()], + predefined_accounts: vec![foo, bar], tenancy_config: TenancyConfig::MultiTenant, ..Default::default() }) .await; let foo_create_result = harness - .create_root_dataset(DatasetAlias { - dataset_name: DatasetName::new_unchecked("foo"), - account_name: Some(foo_account_name.clone()), - }) + .create_root_dataset_using_subject( + DatasetAlias { + dataset_name: DatasetName::new_unchecked("foo"), + account_name: Some(subject_foo.account_name().clone()), + }, + subject_foo.clone(), + ) .await; let foo_id = foo_create_result.dataset_handle.id; let foo_account_id = harness .auth_svc - .find_account_id_by_name(&foo_account_name) + .find_account_id_by_name(subject_foo.account_name()) .await .unwrap() .unwrap(); let bar_account_id = harness .auth_svc - .find_account_id_by_name(&bar_account_name) + .find_account_id_by_name(subject_bar.account_name()) .await .unwrap() .unwrap(); let bar_create_result = harness - .create_root_dataset(DatasetAlias { - dataset_name: DatasetName::new_unchecked("bar"), - account_name: Some(bar_account_name.clone()), - }) + .create_root_dataset_using_subject( + DatasetAlias { + dataset_name: DatasetName::new_unchecked("bar"), + account_name: Some(subject_bar.account_name().clone()), + }, + subject_bar, + ) .await; let bar_id = bar_create_result.dataset_handle.id; @@ -5474,7 +5493,7 @@ async fn test_list_all_flow_initiators() { .await .unwrap(); - assert_eq!(foo_dataset_initiators_list, [foo_account_id.clone()]); + pretty_assertions::assert_eq!([foo_account_id.clone()], *foo_dataset_initiators_list); let bar_dataset_initiators_list: Vec<_> = harness .flow_query_service @@ -5486,59 +5505,69 @@ async fn test_list_all_flow_initiators() { .await .unwrap(); - assert_eq!(bar_dataset_initiators_list, [bar_account_id.clone()]); + pretty_assertions::assert_eq!([bar_account_id.clone()], *bar_dataset_initiators_list); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[test_log::test(tokio::test)] async fn test_list_all_datasets_with_flow() { - let foo_account_name = AccountName::new_unchecked("foo"); - let bar_account_name = AccountName::new_unchecked("bar"); + let foo = AccountConfig::from_name(AccountName::new_unchecked("foo")); + let bar = AccountConfig::from_name(AccountName::new_unchecked("bar")); + + let subject_foo = CurrentAccountSubject::logged(foo.get_id(), foo.account_name.clone(), false); + let subject_bar = CurrentAccountSubject::logged(bar.get_id(), bar.account_name.clone(), false); let harness = FlowHarness::with_overrides(FlowHarnessOverrides { - custom_account_names: vec![foo_account_name.clone(), bar_account_name.clone()], + predefined_accounts: vec![foo, bar], tenancy_config: TenancyConfig::MultiTenant, ..Default::default() }) .await; let foo_create_result = harness - .create_root_dataset(DatasetAlias { - dataset_name: DatasetName::new_unchecked("foo"), - account_name: Some(foo_account_name.clone()), - }) + .create_root_dataset_using_subject( + DatasetAlias { + dataset_name: DatasetName::new_unchecked("foo"), + account_name: Some(subject_foo.account_name().clone()), + }, + subject_foo.clone(), + ) 
.await; let foo_id = foo_create_result.dataset_handle.id; let _foo_bar_id = harness - .create_derived_dataset( + .create_derived_dataset_using_subject( DatasetAlias { dataset_name: DatasetName::new_unchecked("foo.bar"), - account_name: Some(foo_account_name.clone()), + account_name: Some(subject_foo.account_name().clone()), }, vec![foo_id.clone()], + subject_foo.clone(), ) .await; let foo_account_id = harness .auth_svc - .find_account_id_by_name(&foo_account_name) + .find_account_id_by_name(subject_foo.account_name()) .await .unwrap() .unwrap(); let bar_account_id = harness .auth_svc - .find_account_id_by_name(&bar_account_name) + .find_account_id_by_name(subject_bar.account_name()) .await .unwrap() .unwrap(); let bar_create_result = harness - .create_root_dataset(DatasetAlias { - dataset_name: DatasetName::new_unchecked("bar"), - account_name: Some(bar_account_name.clone()), - }) + .create_root_dataset_using_subject( + DatasetAlias { + dataset_name: DatasetName::new_unchecked("bar"), + account_name: Some(subject_bar.account_name().clone()), + }, + subject_bar, + ) .await; let bar_id = bar_create_result.dataset_handle.id; @@ -5634,7 +5663,7 @@ async fn test_list_all_datasets_with_flow() { .await .unwrap(); - assert_eq!(foo_dataset_initiators_list, [foo_account_id.clone()]); + pretty_assertions::assert_eq!([foo_account_id.clone()], *foo_dataset_initiators_list); let bar_dataset_initiators_list: Vec<_> = harness .flow_query_service @@ -5646,7 +5675,7 @@ async fn test_list_all_datasets_with_flow() { .await .unwrap(); - assert_eq!(bar_dataset_initiators_list, [bar_account_id.clone()]); + pretty_assertions::assert_eq!([bar_account_id.clone()], *bar_dataset_initiators_list); let all_datasets_with_flow: Vec<_> = harness .flow_query_service @@ -5658,7 +5687,7 @@ async fn test_list_all_datasets_with_flow() { .await .unwrap(); - assert_eq!(all_datasets_with_flow, [foo_id]); + pretty_assertions::assert_eq!([foo_id], *all_datasets_with_flow); let all_datasets_with_flow: Vec<_> = harness .flow_query_service @@ -5670,7 +5699,7 @@ async fn test_list_all_datasets_with_flow() { .await .unwrap(); - assert_eq!(all_datasets_with_flow, [bar_id]); + pretty_assertions::assert_eq!([bar_id], *all_datasets_with_flow); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -5739,7 +5768,6 @@ async fn test_abort_flow_before_scheduling_tasks() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -5765,7 +5793,8 @@ async fn test_abort_flow_before_scheduling_tasks() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -5835,7 +5864,6 @@ async fn test_abort_flow_after_scheduling_still_waiting_for_executor() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -5866,7 +5894,8 @@ async fn test_abort_flow_after_scheduling_still_waiting_for_executor() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -5936,7 +5965,6 @@ async fn test_abort_flow_after_task_running_has_started() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -5956,7 +5984,8 @@ async 
fn test_abort_flow_after_task_running_has_started() { Flow ID = 0 Finished Aborted "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -6040,7 +6069,6 @@ async fn test_abort_flow_after_task_finishes() { test_flow_listener.define_dataset_display_name(foo_id.clone(), "foo".to_string()); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -6077,7 +6105,8 @@ async fn test_abort_flow_after_task_finishes() { Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } @@ -6177,7 +6206,7 @@ async fn test_respect_last_success_time_when_activate_configuration() { // Main simulation script let main_handle = async { - // Initially both "foo" isscheduled without waiting. + // Initially both "foo" and "bar" are scheduled without waiting. // "foo": // - flow 0: task 0 starts at 10ms, finishes at 20ms // - next flow 2 queued for 120ms (20ms initiated + 100ms period) @@ -6217,7 +6246,6 @@ .unwrap(); pretty_assertions::assert_eq!( - format!("{}", test_flow_listener.as_ref()), indoc::indoc!( r#" #0: +0ms: @@ -6285,7 +6313,8 @@ Flow ID = 0 Finished Success "# - ) + ), + format!("{}", test_flow_listener.as_ref()) ); } diff --git a/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs b/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs index 589f37e178..45b3c40b87 100644 --- a/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs +++ b/src/domain/flow-system/services/tests/tests/utils/flow_harness_shared.rs @@ -29,8 +29,8 @@ use kamu_accounts_services::{ PredefinedAccountsRegistrator, }; use kamu_core::*; -use kamu_datasets_inmem::InMemoryDatasetDependencyRepository; -use kamu_datasets_services::DependencyGraphServiceImpl; +use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEntryRepository}; +use kamu_datasets_services::{DatasetEntryServiceImpl, DependencyGraphServiceImpl}; use kamu_flow_system::*; use kamu_flow_system_inmem::*; use kamu_flow_system_services::*; @@ -61,7 +61,11 @@ pub(crate) const SCHEDULING_MANDATORY_THROTTLING_PERIOD_MS: i64 = SCHEDULING_ALI pub(crate) struct FlowHarness { _tmp_dir: tempfile::TempDir, - pub catalog: dill::Catalog, + catalog_without_subject: Catalog, + delete_dataset_use_case: Arc, + create_dataset_from_snapshot_use_case: Arc, + + pub catalog: Catalog, pub flow_configuration_service: Arc, pub flow_configuration_event_store: Arc, pub flow_agent: Arc, @@ -76,7 +80,7 @@ pub(crate) struct FlowHarnessOverrides { pub awaiting_step: Option, pub mandatory_throttling_period: Option, pub mock_dataset_changes: Option, - pub custom_account_names: Vec, + pub predefined_accounts: Vec, pub tenancy_config: TenancyConfig, } @@ -91,19 +95,17 @@ impl FlowHarness { std::fs::create_dir(&datasets_dir).unwrap(); let accounts_catalog = { - let predefined_accounts_config = if overrides.custom_account_names.is_empty() { + let predefined_accounts_config = if overrides.predefined_accounts.is_empty() { PredefinedAccountsConfig::single_tenant() } else { let mut predefined_accounts_config = PredefinedAccountsConfig::new(); - for account_name in overrides.custom_account_names { - predefined_accounts_config - .predefined - .push(AccountConfig::from_name(account_name)); + for account in overrides.predefined_accounts { + predefined_accounts_config.predefined.push(account); } 
predefined_accounts_config }; - let mut b = dill::CatalogBuilder::new(); + let mut b = CatalogBuilder::new(); b.add_value(predefined_accounts_config) .add::() .add::() @@ -134,8 +136,8 @@ impl FlowHarness { let mock_dataset_changes = overrides.mock_dataset_changes.unwrap_or_default(); - let catalog = { - let mut b = dill::CatalogBuilder::new_chained(&accounts_catalog); + let catalog_without_subject = { + let mut b = CatalogBuilder::new_chained(&accounts_catalog); b.add_builder( messaging_outbox::OutboxImmediateImpl::builder() @@ -155,10 +157,8 @@ impl FlowHarness { .add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir)) .bind::() .bind::() - .add::() .add_value(mock_dataset_changes) .bind::() - .add_value(CurrentAccountSubject::new_test()) .add::() .add::() .add::() @@ -168,11 +168,11 @@ impl FlowHarness { .add::() .add::() .add::() - .add::() + .add::() + .add::() .add::() .add::() - .add::() - .add::(); + .add::(); kamu_flow_system_services::register_dependencies(&mut b); @@ -200,35 +200,39 @@ impl FlowHarness { b.build() }; - let flow_agent = catalog.get_one::().unwrap(); - let flow_query_service = catalog.get_one::().unwrap(); - let flow_configuration_service = catalog.get_one::().unwrap(); - let flow_configuration_event_store = catalog - .get_one::() - .unwrap(); - let flow_event_store = catalog.get_one::().unwrap(); - let auth_svc = catalog.get_one::().unwrap(); + let catalog = CatalogBuilder::new_chained(&catalog_without_subject) + .add_value(CurrentAccountSubject::new_test()) + .build(); Self { _tmp_dir: tmp_dir, - catalog, - flow_agent, - flow_query_service, - flow_configuration_service, - flow_configuration_event_store, - flow_event_store, + flow_configuration_service: catalog.get_one().unwrap(), + flow_configuration_event_store: catalog.get_one().unwrap(), + flow_agent: catalog.get_one().unwrap(), + flow_query_service: catalog.get_one().unwrap(), + flow_event_store: catalog.get_one().unwrap(), + auth_svc: catalog.get_one().unwrap(), fake_system_time_source, - auth_svc, + delete_dataset_use_case: catalog.get_one().unwrap(), + create_dataset_from_snapshot_use_case: catalog.get_one().unwrap(), + catalog, + catalog_without_subject, } } - pub async fn create_root_dataset(&self, dataset_alias: DatasetAlias) -> CreateDatasetResult { - let create_dataset_from_snapshot = self - .catalog + pub async fn create_root_dataset_using_subject( + &self, + dataset_alias: DatasetAlias, + subject: CurrentAccountSubject, + ) -> CreateDatasetResult { + let subject_catalog = CatalogBuilder::new_chained(&self.catalog_without_subject) + .add_value(subject) + .build(); + let create_dataset_from_snapshot_use_case = subject_catalog .get_one::() .unwrap(); - create_dataset_from_snapshot + create_dataset_from_snapshot_use_case .execute( MetadataFactory::dataset_snapshot() .name(dataset_alias) @@ -241,17 +245,34 @@ impl FlowHarness { .unwrap() } - pub async fn create_derived_dataset( + pub async fn create_root_dataset(&self, dataset_alias: DatasetAlias) -> CreateDatasetResult { + self.create_dataset_from_snapshot_use_case + .execute( + MetadataFactory::dataset_snapshot() + .name(dataset_alias) + .kind(DatasetKind::Root) + .push_event(MetadataFactory::set_polling_source().build()) + .build(), + Default::default(), + ) + .await + .unwrap() + } + + pub async fn create_derived_dataset_using_subject( &self, dataset_alias: DatasetAlias, input_ids: Vec, + subject: CurrentAccountSubject, ) -> DatasetID { - let create_dataset_from_snapshot = self - .catalog + let subject_catalog = 
CatalogBuilder::new_chained(&self.catalog_without_subject) + .add_value(subject) + .build(); + let create_dataset_from_snapshot_use_case = subject_catalog .get_one::() .unwrap(); - let create_result = create_dataset_from_snapshot + let create_result = create_dataset_from_snapshot_use_case .execute( MetadataFactory::dataset_snapshot() .name(dataset_alias) @@ -270,24 +291,39 @@ impl FlowHarness { create_result.dataset_handle.id } - pub async fn eager_initialization(&self) { - use init_on_startup::InitOnStartup; - let dataset_ownership_initializer = self - .catalog - .get_one::() - .unwrap(); - dataset_ownership_initializer - .run_initialization() + pub async fn create_derived_dataset( + &self, + dataset_alias: DatasetAlias, + input_ids: Vec, + ) -> DatasetID { + let create_result = self + .create_dataset_from_snapshot_use_case + .execute( + MetadataFactory::dataset_snapshot() + .name(dataset_alias) + .kind(DatasetKind::Derivative) + .push_event( + MetadataFactory::set_transform() + .inputs_from_refs(input_ids) + .build(), + ) + .build(), + Default::default(), + ) .await .unwrap(); + create_result.dataset_handle.id + } + + pub async fn eager_initialization(&self) { + use init_on_startup::InitOnStartup; + self.flow_agent.run_initialization().await.unwrap(); } pub async fn delete_dataset(&self, dataset_id: &DatasetID) { - // Do the actual deletion - let delete_dataset = self.catalog.get_one::().unwrap(); - delete_dataset + self.delete_dataset_use_case .execute_via_ref(&(dataset_id.as_local_ref())) .await .unwrap(); @@ -457,3 +493,5 @@ impl FlowHarness { } } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs index fbcd290acd..061cc3f671 100644 --- a/src/domain/flow-system/services/tests/tests/utils/task_driver.rs +++ b/src/domain/flow-system/services/tests/tests/utils/task_driver.rs @@ -25,6 +25,7 @@ pub(crate) struct TaskDriver { args: TaskDriverArgs, } +#[derive(Debug)] pub(crate) struct TaskDriverArgs { pub(crate) task_id: TaskID, pub(crate) task_metadata: TaskMetadata, @@ -53,14 +54,14 @@ impl TaskDriver { let start_time = self.time_source.now(); self.time_source.sleep(self.args.run_since_start).await; - while !(self.task_exists().await) { + while !self.task_exists().await { yield_now().await; } self.ensure_task_matches_logical_plan().await; - // Note: we can omit transaction, since this is a test-only abstraction - // with assumed immediate delivery + // Note: We can omit transaction, since this is a test-only abstraction + // with assumed immediate delivery self.outbox .post_message( MESSAGE_PRODUCER_KAMU_TASK_AGENT, @@ -76,8 +77,8 @@ impl TaskDriver { if let Some((finish_in, with_outcome)) = self.args.finish_in_with { self.time_source.sleep(finish_in).await; - // Note: we can omit transaction, since this is a test-only abstraction - // with assummed immediate delivery + // Note: We can omit transaction, since this is a test-only abstraction + // with assumed immediate delivery self.outbox .post_message( MESSAGE_PRODUCER_KAMU_TASK_AGENT, @@ -105,11 +106,15 @@ impl TaskDriver { .await .expect("Task does not exist yet"); - assert_eq!(self.args.expected_logical_plan, task.logical_plan); + pretty_assertions::assert_eq!(self.args.expected_logical_plan, task.logical_plan); + match &task.logical_plan { LogicalPlan::UpdateDataset(ud) => { assert!(self.args.dataset_id.is_some()); - 
assert_eq!(&ud.dataset_id, self.args.dataset_id.as_ref().unwrap()); + pretty_assertions::assert_eq!( + self.args.dataset_id.as_ref().unwrap(), + &ud.dataset_id, + ); } LogicalPlan::Probe(_) => assert!(self.args.dataset_id.is_none()), LogicalPlan::HardCompactDataset(_) | LogicalPlan::ResetDataset(_) => (), diff --git a/src/infra/core/Cargo.toml b/src/infra/core/Cargo.toml index 3f37b84e13..afe5b9a844 100644 --- a/src/infra/core/Cargo.toml +++ b/src/infra/core/Cargo.toml @@ -28,7 +28,10 @@ ingest-evm = ["dep:alloy", "dep:datafusion-ethers"] ingest-ftp = ["dep:curl", "dep:curl-sys"] ingest-mqtt = ["dep:rumqttc"] query-extensions-json = ["dep:datafusion-functions-json"] -testing = ["dep:mockall", "kamu-data-utils/testing"] +testing = [ + "dep:mockall", + "kamu-data-utils/testing", +] [dependencies] @@ -145,10 +148,7 @@ libc = "0.2" # For getting uid:gid [dev-dependencies] -database-common = { workspace = true } kamu = { workspace = true, features = ["testing"] } -kamu-accounts-inmem = { workspace = true } -kamu-accounts-services = { workspace = true } kamu-data-utils = { workspace = true, features = ["testing"] } kamu-datasets-services = { workspace = true } kamu-datasets-inmem = { workspace = true } diff --git a/src/infra/core/src/services/dataset_ownership_service_inmem.rs b/src/infra/core/src/services/dataset_ownership_service_inmem.rs deleted file mode 100644 index be6a805c6e..0000000000 --- a/src/infra/core/src/services/dataset_ownership_service_inmem.rs +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright Kamu Data, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. 
-
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
-use dill::*;
-use init_on_startup::{InitOnStartup, InitOnStartupMeta};
-use internal_error::InternalError;
-use kamu_accounts::{AuthenticationService, CurrentAccountSubject};
-use kamu_core::*;
-use messaging_outbox::{
-    MessageConsumer,
-    MessageConsumerMeta,
-    MessageConsumerT,
-    MessageDeliveryMechanism,
-};
-use opendatafabric::{AccountID, AccountName, DatasetID};
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-pub struct DatasetOwnershipServiceInMemory {
-    state: Arc<tokio::sync::RwLock<State>>,
-}
-
-#[derive(Default)]
-struct State {
-    dataset_ids_by_account_id: HashMap<AccountID, HashSet<DatasetID>>,
-    account_ids_by_dataset_id: HashMap<DatasetID, Vec<AccountID>>,
-    initially_scanned: bool,
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[component(pub)]
-#[interface(dyn DatasetOwnershipService)]
-#[interface(dyn MessageConsumer)]
-#[interface(dyn MessageConsumerT<DatasetLifecycleMessage>)]
-#[meta(MessageConsumerMeta {
-    consumer_name: MESSAGE_CONSUMER_KAMU_CORE_DATASET_OWNERSHIP_SERVICE,
-    feeding_producers: &[MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE],
-    delivery: MessageDeliveryMechanism::Immediate,
-})]
-#[scope(Singleton)]
-impl DatasetOwnershipServiceInMemory {
-    pub fn new() -> Self {
-        Self {
-            state: Default::default(),
-        }
-    }
-
-    fn insert_dataset_record(
-        &self,
-        state: &mut State,
-        dataset_id: &DatasetID,
-        owner_account_id: &AccountID,
-    ) {
-        state
-            .account_ids_by_dataset_id
-            .insert(dataset_id.clone(), vec![owner_account_id.clone()]);
-
-        state
-            .dataset_ids_by_account_id
-            .entry(owner_account_id.clone())
-            .and_modify(|e| {
-                e.insert(dataset_id.clone());
-            })
-            .or_insert_with(|| {
-                let mut dataset_ids = HashSet::new();
-                dataset_ids.insert(dataset_id.clone());
-                dataset_ids
-            });
-    }
-
-    async fn check_has_initialized(&self) -> Result<(), InternalError> {
-        let has_initially_scanned = self.state.read().await.initially_scanned;
-
-        if has_initially_scanned {
-            Ok(())
-        } else {
-            InternalError::bail("The service was not previously initialized!")
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[async_trait::async_trait]
-impl DatasetOwnershipService for DatasetOwnershipServiceInMemory {
-    #[tracing::instrument(level = "debug", skip_all, fields(%dataset_id))]
-    async fn get_dataset_owners(
-        &self,
-        dataset_id: &DatasetID,
-    ) -> Result<Vec<AccountID>, InternalError> {
-        self.check_has_initialized().await?;
-
-        let guard = self.state.read().await;
-        let maybe_account_ids = guard.account_ids_by_dataset_id.get(dataset_id);
-        if let Some(account_ids) = maybe_account_ids {
-            Ok(account_ids.clone())
-        } else {
-            Ok(vec![])
-        }
-    }
-
-    #[tracing::instrument(level = "debug", skip_all, fields(%account_id))]
-    async fn get_owned_datasets(
-        &self,
-        account_id: &AccountID,
-    ) -> Result<Vec<DatasetID>, InternalError> {
-        self.check_has_initialized().await?;
-
-        let guard = self.state.read().await;
-        let maybe_dataset_ids = guard.dataset_ids_by_account_id.get(account_id);
-        if let Some(dataset_ids) = maybe_dataset_ids {
-            Ok(dataset_ids.iter().cloned().collect::<Vec<_>>())
-        } else {
-            Ok(vec![])
-        }
-    }
-
-    #[tracing::instrument(level = "debug", skip_all, fields(%dataset_id, %account_id))]
-    async fn is_dataset_owned_by(
-        &self,
-        dataset_id: &DatasetID,
-        account_id: &AccountID,
-    ) -> Result<bool, InternalError> {
-        self.check_has_initialized().await?;
-
-        let guard = self.state.read().await;
-
-        let maybe_account_ids = guard.account_ids_by_dataset_id.get(dataset_id);
-        if let Some(account_ids) = maybe_account_ids {
-            Ok(account_ids.contains(account_id))
-        } else {
-            Ok(false)
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-impl MessageConsumer for DatasetOwnershipServiceInMemory {}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[async_trait::async_trait]
-impl MessageConsumerT<DatasetLifecycleMessage> for DatasetOwnershipServiceInMemory {
-    #[tracing::instrument(
-        level = "debug",
-        skip_all,
-        name = "DatasetOwnershipServiceInMemory[DatasetLifecycleMessage]"
-    )]
-    async fn consume_message(
-        &self,
-        _: &Catalog,
-        message: &DatasetLifecycleMessage,
-    ) -> Result<(), InternalError> {
-        tracing::debug!(received_message = ?message, "Received dataset lifecycle message");
-
-        match message {
-            DatasetLifecycleMessage::Created(message) => {
-                let mut guard = self.state.write().await;
-                self.insert_dataset_record(
-                    &mut guard,
-                    &message.dataset_id,
-                    &message.owner_account_id,
-                );
-            }
-            DatasetLifecycleMessage::Deleted(message) => {
-                let account_ids = self.get_dataset_owners(&message.dataset_id).await?;
-                if !account_ids.is_empty() {
-                    let mut guard = self.state.write().await;
-                    for account_id in account_ids {
-                        if let Some(dataset_ids) =
-                            guard.dataset_ids_by_account_id.get_mut(&account_id)
-                        {
-                            dataset_ids.remove(&message.dataset_id);
-                        }
-                    }
-                    guard.account_ids_by_dataset_id.remove(&message.dataset_id);
-                }
-            }
-            DatasetLifecycleMessage::DependenciesUpdated(_)
-            | DatasetLifecycleMessage::Renamed(_) => {
-                // No action required
-            }
-        }
-
-        Ok(())
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Initializer
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-pub struct DatasetOwnershipServiceInMemoryStateInitializer {
-    current_account_subject: Arc<CurrentAccountSubject>,
-    dataset_registry: Arc<dyn DatasetRegistry>,
-    authentication_service: Arc<dyn AuthenticationService>,
-    dataset_ownership_service: Arc<DatasetOwnershipServiceInMemory>,
-}
-
-#[component(pub)]
-#[interface(dyn InitOnStartup)]
-#[meta(InitOnStartupMeta {
-    job_name: JOB_KAMU_CORE_DATASET_OWNERSHIP_INITIALIZER,
-    depends_on: &[kamu_accounts::JOB_KAMU_ACCOUNTS_PREDEFINED_ACCOUNTS_REGISTRATOR],
-    requires_transaction: true
-})]
-impl DatasetOwnershipServiceInMemoryStateInitializer {
-    pub fn new(
-        current_account_subject: Arc<CurrentAccountSubject>,
-        dataset_registry: Arc<dyn DatasetRegistry>,
-        authentication_service: Arc<dyn AuthenticationService>,
-        dataset_ownership_service: Arc<DatasetOwnershipServiceInMemory>,
-    ) -> Self {
-        Self {
-            current_account_subject,
-            dataset_registry,
-            authentication_service,
-            dataset_ownership_service,
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[async_trait::async_trait]
-impl InitOnStartup for DatasetOwnershipServiceInMemoryStateInitializer {
-    #[tracing::instrument(
-        level = "debug",
-        skip_all,
-        name = "DatasetOwnershipServiceInMemoryStateInitializer::run_initialization"
-    )]
-    async fn run_initialization(&self) -> Result<(), InternalError> {
-        let mut guard = self.dataset_ownership_service.state.write().await;
-        if guard.initially_scanned {
-            tracing::warn!("The service has already initialized");
-
-            return Ok(());
-        }
-
-        use futures::StreamExt;
-
-        tracing::debug!("Initializing dataset ownership data started");
-
-        let mut account_ids_by_name: HashMap<AccountName, AccountID> = HashMap::new();
-
-        let mut datasets_stream = self.dataset_registry.all_dataset_handles();
-        while let Some(Ok(dataset_handle)) = datasets_stream.next().await {
-            let account_name = match dataset_handle.alias.account_name {
-                Some(account_name) => account_name,
-                None => match self.current_account_subject.as_ref() {
-                    CurrentAccountSubject::Anonymous(_) => {
-                        panic!("Initializing dataset ownership without authorization")
-                    }
-                    CurrentAccountSubject::Logged(l) => l.account_name.clone(),
-                },
-            };
-
-            let maybe_account_id = if let Some(account_id) = account_ids_by_name.get(&account_name)
-            {
-                Some(account_id.clone())
-            } else {
-                let maybe_account_id = self
-                    .authentication_service
-                    .find_account_id_by_name(&account_name)
-                    .await?;
-                if let Some(account_id) = maybe_account_id {
-                    account_ids_by_name.insert(account_name.clone(), account_id.clone());
-                    Some(account_id)
-                } else {
-                    None
-                }
-            };
-
-            if let Some(account_id) = maybe_account_id {
-                self.dataset_ownership_service.insert_dataset_record(
-                    &mut guard,
-                    &dataset_handle.id,
-                    &account_id,
-                );
-            }
-        }
-
-        guard.initially_scanned = true;
-
-        Ok(())
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/core/src/services/mod.rs b/src/infra/core/src/services/mod.rs
index e98a3f0f0f..0466492179 100644
--- a/src/infra/core/src/services/mod.rs
+++ b/src/infra/core/src/services/mod.rs
@@ -25,7 +25,6 @@ pub use transform::*;
 pub use watermark::*;
 
 mod dataset_changes_service_impl;
-mod dataset_ownership_service_inmem;
 mod dataset_registry_repo_bridge;
 mod metadata_query_service_impl;
 mod provenance_service_impl;
@@ -36,7 +35,6 @@ mod query_service_impl;
 mod verification_service_impl;
 
 pub use dataset_changes_service_impl::*;
-pub use dataset_ownership_service_inmem::*;
 pub use dataset_registry_repo_bridge::*;
 pub use metadata_query_service_impl::*;
 pub use provenance_service_impl::*;
diff --git a/src/infra/core/src/testing/base_repo_harness.rs b/src/infra/core/src/testing/base_repo_harness.rs
index 0b80f35057..0320b42fae 100644
--- a/src/infra/core/src/testing/base_repo_harness.rs
+++ b/src/infra/core/src/testing/base_repo_harness.rs
@@ -61,14 +61,11 @@ impl BaseRepoHarness {
             .add::<DatasetRegistryRepoBridge>()
             .build();
 
-        let dataset_registry = catalog.get_one().unwrap();
-        let dataset_repo_writer = catalog.get_one().unwrap();
-
         Self {
             temp_dir,
+            dataset_registry: catalog.get_one().unwrap(),
+            dataset_repo_writer: catalog.get_one().unwrap(),
             catalog,
-            dataset_registry,
-            dataset_repo_writer,
         }
     }
 
diff --git a/src/infra/core/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs b/src/infra/core/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs
index 9708eed327..8aefc0b4c3 100644
--- a/src/infra/core/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs
+++ b/src/infra/core/src/use_cases/create_dataset_from_snapshot_use_case_impl.rs
@@ -57,6 +57,12 @@ impl CreateDatasetFromSnapshotUseCase for CreateDatasetFromSnapshotUseCaseImpl {
         snapshot: DatasetSnapshot,
         options: CreateDatasetUseCaseOptions,
     ) -> Result<CreateDatasetResult, CreateDatasetFromSnapshotError> {
+        let logged_account_id = match self.current_account_subject.as_ref() {
+            CurrentAccountSubject::Anonymous(_) => {
+                panic!("Anonymous account cannot create dataset");
+            }
+            CurrentAccountSubject::Logged(l) => l.account_id.clone(),
+        };
         let dataset_name = snapshot.name.dataset_name.clone();
         let CreateDatasetFromSnapshotResult {
             create_dataset_result,
@@ -71,12 +77,7 @@
             MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE,
             DatasetLifecycleMessage::created(
                 create_dataset_result.dataset_handle.id.clone(),
-                match self.current_account_subject.as_ref() {
-                    CurrentAccountSubject::Anonymous(_) => {
-                        panic!("Anonymous account cannot create dataset");
-                    }
-                    CurrentAccountSubject::Logged(l) => l.account_id.clone(),
-                },
+                logged_account_id,
                 options.dataset_visibility,
                 dataset_name,
             ),
diff --git a/src/infra/core/src/use_cases/create_dataset_use_case_impl.rs b/src/infra/core/src/use_cases/create_dataset_use_case_impl.rs
index 691086a43f..084cc7280e 100644
--- a/src/infra/core/src/use_cases/create_dataset_use_case_impl.rs
+++ b/src/infra/core/src/use_cases/create_dataset_use_case_impl.rs
@@ -57,6 +57,12 @@ impl CreateDatasetUseCase for CreateDatasetUseCaseImpl {
         seed_block: MetadataBlockTyped<Seed>,
         options: CreateDatasetUseCaseOptions,
     ) -> Result<CreateDatasetResult, CreateDatasetError> {
+        let logged_account_id = match self.current_account_subject.as_ref() {
+            CurrentAccountSubject::Anonymous(_) => {
+                panic!("Anonymous account cannot create dataset");
+            }
+            CurrentAccountSubject::Logged(l) => l.account_id.clone(),
+        };
         let create_result = self
             .dataset_repo_writer
             .create_dataset(dataset_alias, seed_block)
@@ -67,12 +73,7 @@
             MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE,
             DatasetLifecycleMessage::created(
                 create_result.dataset_handle.id.clone(),
-                match self.current_account_subject.as_ref() {
-                    CurrentAccountSubject::Anonymous(_) => {
-                        panic!("Anonymous account cannot create dataset");
-                    }
-                    CurrentAccountSubject::Logged(l) => l.account_id.clone(),
-                },
+                logged_account_id,
                 options.dataset_visibility,
                 dataset_alias.dataset_name.clone(),
             ),
diff --git a/src/infra/core/tests/tests/mod.rs b/src/infra/core/tests/tests/mod.rs
index 387f00765c..0388190cea 100644
--- a/src/infra/core/tests/tests/mod.rs
+++ b/src/infra/core/tests/tests/mod.rs
@@ -12,7 +12,6 @@ mod ingest;
 mod repos;
 mod test_compaction_services_impl;
 mod test_dataset_changes_service_impl;
-mod test_dataset_ownership_service_inmem;
 mod test_datasets_filtering;
 mod test_metadata_chain_comparator;
 mod test_pull_request_planner_impl;
diff --git a/src/infra/core/tests/tests/test_dataset_ownership_service_inmem.rs b/src/infra/core/tests/tests/test_dataset_ownership_service_inmem.rs
deleted file mode 100644
index 8bfc2c0ea1..0000000000
--- a/src/infra/core/tests/tests/test_dataset_ownership_service_inmem.rs
+++ /dev/null
@@ -1,273 +0,0 @@
-// Copyright Kamu Data, Inc. and contributors. All rights reserved.
-//
-// Use of this software is governed by the Business Source License
-// included in the LICENSE file.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use database_common::{DatabaseTransactionRunner, NoOpDatabasePlugin};
-use kamu::testing::BaseRepoHarness;
-use kamu::{DatasetOwnershipServiceInMemory, DatasetOwnershipServiceInMemoryStateInitializer};
-use kamu_accounts::{
-    AccountConfig,
-    AuthenticationService,
-    JwtAuthenticationConfig,
-    PredefinedAccountsConfig,
-    DEFAULT_ACCOUNT_ID,
-};
-use kamu_accounts_inmem::{InMemoryAccessTokenRepository, InMemoryAccountRepository};
-use kamu_accounts_services::{
-    AccessTokenServiceImpl,
-    AuthenticationServiceImpl,
-    LoginPasswordAuthProvider,
-    PredefinedAccountsRegistrator,
-};
-use kamu_core::{DatasetOwnershipService, TenancyConfig};
-use opendatafabric::{AccountID, AccountName, DatasetAlias, DatasetID, DatasetName};
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[test_log::test(tokio::test)]
-async fn test_multi_tenant_dataset_owners() {
-    let mut harness = DatasetOwnershipHarness::new(TenancyConfig::MultiTenant).await;
-
-    harness.create_multi_tenant_datasets().await;
-    harness.eager_initialization().await;
-
-    for (account_id, mut dataset_ids) in harness.account_datasets {
-        let mut owner_datasets = harness
-            .dataset_ownership_service
-            .get_owned_datasets(&account_id)
-            .await
-            .unwrap();
-        owner_datasets.sort();
-        dataset_ids.sort();
-        assert_eq!(owner_datasets, dataset_ids);
-
-        for dataset_id in dataset_ids {
-            let is_owner = harness
-                .dataset_ownership_service
-                .is_dataset_owned_by(&dataset_id, &account_id)
-                .await
-                .unwrap();
-            assert!(is_owner);
-
-            let is_invalid_owner = harness
-                .dataset_ownership_service
-                .is_dataset_owned_by(&dataset_id, &DEFAULT_ACCOUNT_ID)
-                .await
-                .unwrap();
-            assert!(!is_invalid_owner);
-
-            let dataset_owners = harness
-                .dataset_ownership_service
-                .get_dataset_owners(&dataset_id)
-                .await
-                .unwrap();
-
-            assert_eq!(dataset_owners, [account_id.clone()]);
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[oop::extend(BaseRepoHarness, base_repo_harness)]
-struct DatasetOwnershipHarness {
-    base_repo_harness: BaseRepoHarness,
-    catalog: dill::Catalog,
-    dataset_ownership_service: Arc<dyn DatasetOwnershipService>,
-    auth_svc: Arc<dyn AuthenticationService>,
-    account_datasets: HashMap<AccountID, Vec<DatasetID>>,
-}
-
-impl DatasetOwnershipHarness {
-    async fn new(tenancy_config: TenancyConfig) -> Self {
-        let base_repo_harness = BaseRepoHarness::new(tenancy_config);
-
-        let predefined_accounts = [
-            AccountName::new_unchecked("alice"),
-            AccountName::new_unchecked("bob"),
-            AccountName::new_unchecked("eve"),
-        ];
-        let mut predefined_accounts_config = PredefinedAccountsConfig::new();
-        for account_name in predefined_accounts {
-            predefined_accounts_config
-                .predefined
-                .push(AccountConfig::from_name(account_name));
-        }
-
-        let base_catalog = {
-            let mut b = dill::CatalogBuilder::new_chained(base_repo_harness.catalog());
-
-            b.add::<DatasetOwnershipServiceInMemory>()
-                .add::<DatabaseTransactionRunner>()
-                .add_value(predefined_accounts_config.clone())
-                .add_value(JwtAuthenticationConfig::default())
-                .add::<InMemoryAccountRepository>()
-                .add::<InMemoryAccessTokenRepository>()
-                .add::<AccessTokenServiceImpl>()
-                .add::<AuthenticationServiceImpl>()
-                .add::<LoginPasswordAuthProvider>()
-                .add::<PredefinedAccountsRegistrator>();
-
-            NoOpDatabasePlugin::init_database_components(&mut b);
-
-            b.build()
-        };
-
-        init_on_startup::run_startup_jobs(&base_catalog)
-            .await
-            .unwrap();
-
-        // Attach ownership initializer in separate catalog,
-        // so that the startup job is not run before creating datasets
-        let catalog = {
-            let mut b = dill::CatalogBuilder::new_chained(&base_catalog);
-            b.add::<DatasetOwnershipServiceInMemoryStateInitializer>();
-            b.build()
-        };
-
-        let dataset_ownership_service = catalog.get_one::<dyn DatasetOwnershipService>().unwrap();
-        let auth_svc = catalog.get_one::<dyn AuthenticationService>().unwrap();
-
-        Self {
-            base_repo_harness,
-            catalog,
-            dataset_ownership_service,
-            auth_svc,
-            account_datasets: HashMap::new(),
-        }
-    }
-
-    async fn eager_initialization(&self) {
-        use init_on_startup::InitOnStartup;
-        let initializer = self
-            .catalog
-            .get_one::<DatasetOwnershipServiceInMemoryStateInitializer>()
-            .unwrap();
-        initializer.run_initialization().await.unwrap();
-    }
-
-    async fn create_multi_tenant_datasets(&mut self) {
-        let alice = AccountName::new_unchecked("alice");
-        let bob = AccountName::new_unchecked("bob");
-        let eve: AccountName = AccountName::new_unchecked("eve");
-
-        let mut dataset_accounts: HashMap<&'static str, AccountName> = HashMap::new();
-        dataset_accounts.insert("foo", alice.clone());
-        dataset_accounts.insert("bar", alice.clone());
-        dataset_accounts.insert("baz", bob.clone());
-        dataset_accounts.insert("foo-bar", alice);
-        dataset_accounts.insert("foo-baz", bob);
-        dataset_accounts.insert("foo-bar-foo-baz", eve);
-
-        self.create_datasets(|dataset_name| dataset_accounts.get(dataset_name).cloned())
-            .await;
-    }
-
-    async fn create_datasets(&mut self, account_getter: impl Fn(&str) -> Option<AccountName>) {
-        self.create_root_dataset(account_getter("foo"), "foo").await;
-        self.create_root_dataset(account_getter("bar"), "bar").await;
-        self.create_root_dataset(account_getter("baz"), "baz").await;
-
-        self.create_derived_dataset(
-            account_getter("foo-bar"),
-            "foo-bar",
-            vec![
-                DatasetAlias::new(account_getter("foo"), DatasetName::new_unchecked("foo")),
-                DatasetAlias::new(account_getter("bar"), DatasetName::new_unchecked("bar")),
-            ],
-        )
-        .await;
-
-        self.create_derived_dataset(
-            account_getter("foo-baz"),
-            "foo-baz",
-            vec![
-                DatasetAlias::new(account_getter("foo"), DatasetName::new_unchecked("foo")),
-                DatasetAlias::new(account_getter("baz"), DatasetName::new_unchecked("baz")),
-            ],
-        )
-        .await;
-
-        self.create_derived_dataset(
-            account_getter("foo-bar-foo-baz"),
-            "foo-bar-foo-baz",
-            vec![
-                DatasetAlias::new(
-                    account_getter("foo-bar"),
-                    DatasetName::new_unchecked("foo-bar"),
-                ),
-                DatasetAlias::new(
-                    account_getter("foo-baz"),
-                    DatasetName::new_unchecked("foo-baz"),
-                ),
-            ],
-        )
-        .await;
-    }
-
-    async fn create_root_dataset(&mut self, account_name: Option<AccountName>, dataset_name: &str) {
-        let account_id = self
-            .auth_svc
-            .find_account_id_by_name(account_name.as_ref().unwrap())
-            .await
-            .unwrap()
-            .unwrap();
-
-        let created_dataset = self
-            ._super()
-            .create_root_dataset(&DatasetAlias::new(
-                account_name,
-                DatasetName::new_unchecked(dataset_name),
-            ))
-            .await;
-
-        self.account_datasets
-            .entry(account_id.clone())
-            .and_modify(|e| {
-                e.push(created_dataset.dataset_handle.id.clone());
-            })
-            .or_insert_with(|| vec![created_dataset.dataset_handle.id.clone()]);
-    }
-
-    async fn create_derived_dataset(
-        &mut self,
-        account_name: Option<AccountName>,
-        dataset_name: &str,
-        input_aliases: Vec<DatasetAlias>,
-    ) {
-        let account_id = self
-            .auth_svc
-            .find_account_id_by_name(account_name.as_ref().unwrap())
-            .await
-            .unwrap()
-            .unwrap();
-
-        let created_dataset = self
-            ._super()
-            .create_derived_dataset(
-                &DatasetAlias::new(account_name, DatasetName::new_unchecked(dataset_name)),
-                input_aliases
-                    .iter()
-                    .map(DatasetAlias::as_local_ref)
-                    .collect(),
-            )
-            .await;
-
-        self.account_datasets
-            .entry(account_id.clone())
-            .and_modify(|e| {
-                e.push(created_dataset.dataset_handle.id.clone());
-            })
-            .or_insert_with(|| vec![created_dataset.dataset_handle.id.clone()]);
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/utils/database-common/tests/tests/test_entries_streamer.rs b/src/utils/database-common/tests/tests/test_entries_streamer.rs
deleted file mode 100644
index b80e6294f5..0000000000
--- a/src/utils/database-common/tests/tests/test_entries_streamer.rs
+++ /dev/null
@@ -1,414 +0,0 @@
-// Copyright Kamu Data, Inc. and contributors. All rights reserved.
-//
-// Use of this software is governed by the Business Source License
-// included in the LICENSE file.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0.
-
-use std::sync::Arc;
-
-use database_common::{EntityPageListing, EntityPageStreamer, PaginationOpts};
-use futures::TryStreamExt;
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-struct TestPaginationOpts {
-    total_entity_count: usize,
-    expected_entities_call_count: usize,
-    start_offset: usize,
-    page_limit: usize,
-    expected_entities: Vec<TestEntity>,
-}
-
-macro_rules! test_pagination {
-    ($test_pagination_opts: expr) => {
-        let TestPaginationOpts {
-            total_entity_count,
-            expected_entities_call_count,
-            start_offset,
-            page_limit,
-            expected_entities,
-        } = $test_pagination_opts;
-
-        let entity_source = entity_source(total_entity_count, expected_entities_call_count);
-        let streamer = EntityPageStreamer::new(start_offset, page_limit);
-
-        let stream = streamer.into_stream(
-            || async {
-                let arguments = entity_source.init_arguments().await;
-                Ok(arguments)
-            },
-            |_, pagination| {
-                let entity_source = entity_source.clone();
-                async move {
-                    let listing = entity_source.entities(pagination).await;
-                    Ok(listing)
-                }
-            },
-        );
-
-        let actual_entries = stream.try_collect::<Vec<_>>().await.unwrap();
-
-        pretty_assertions::assert_eq!(expected_entities, actual_entries);
-    };
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_pagination_less_than_a_page() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 3,
-        start_offset: 0,
-        page_limit: 5,
-        expected_entities_call_count: 1,
-        expected_entities: vec![
-            TestEntity { id: 0 },
-            TestEntity { id: 1 },
-            TestEntity { id: 2 },
-        ],
-    });
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_pagination_fits_on_one_page() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 5,
-        start_offset: 0,
-        page_limit: 5,
-        expected_entities_call_count: 1,
-        expected_entities: vec![
-            TestEntity { id: 0 },
-            TestEntity { id: 1 },
-            TestEntity { id: 2 },
-            TestEntity { id: 3 },
-            TestEntity { id: 4 },
-        ],
-    });
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_pagination_more_than_a_page() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 7,
-        start_offset: 0,
-        page_limit: 5,
-        expected_entities_call_count: 2,
-        expected_entities: vec![
-            TestEntity { id: 0 },
-            TestEntity { id: 1 },
-            TestEntity { id: 2 },
-            TestEntity { id: 3 },
-            TestEntity { id: 4 },
-            TestEntity { id: 5 },
-            TestEntity { id: 6 },
-        ],
-    });
-}
-
-#[tokio::test]
-async fn test_pagination_fits_on_few_pages() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 10,
-        start_offset: 0,
-        page_limit: 5,
-        expected_entities_call_count: 2,
-        expected_entities: vec![
-            TestEntity { id: 0 },
-            TestEntity { id: 1 },
-            TestEntity { id: 2 },
-            TestEntity { id: 3 },
-            TestEntity { id: 4 },
-            TestEntity { id: 5 },
-            TestEntity { id: 6 },
-            TestEntity { id: 7 },
-            TestEntity { id: 8 },
-            TestEntity { id: 9 },
-        ],
-    });
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_pagination_start_offset_in_the_page_middle() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 10,
-        start_offset: 5,
-        page_limit: 10,
-        expected_entities_call_count: 1,
-        expected_entities: vec![
-            TestEntity { id: 5 },
-            TestEntity { id: 6 },
-            TestEntity { id: 7 },
-            TestEntity { id: 8 },
-            TestEntity { id: 9 },
-        ],
-    });
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_pagination_start_offset_is_greater_than_the_total_entity_count() {
-    test_pagination!(TestPaginationOpts {
-        total_entity_count: 10,
-        start_offset: 11,
-        page_limit: 10,
-        expected_entities_call_count: 1,
-        expected_entities: vec![],
-    });
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_paged_page_processing_of_input_data_by_ref() {
-    fn assert_page(page: &[&TestEntity], pagination: &PaginationOpts) {
-        match pagination.offset {
-            0 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        &TestEntity { id: 0 },
-                        &TestEntity { id: 1 },
-                        &TestEntity { id: 2 },
-                    ],
-                    page
-                );
-            }
-            3 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        &TestEntity { id: 3 },
-                        &TestEntity { id: 4 },
-                        &TestEntity { id: 5 },
-                    ],
-                    page
-                );
-            }
-            6 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        &TestEntity { id: 6 },
-                        &TestEntity { id: 7 },
-                        &TestEntity { id: 8 },
-                    ],
-                    page
-                );
-            }
-            9 => {
-                pretty_assertions::assert_eq!(vec![&TestEntity { id: 9 },], page);
-            }
-            _ => {
-                unreachable!()
-            }
-        }
-    }
-
-    let input_data = vec![
-        TestEntity { id: 0 },
-        TestEntity { id: 1 },
-        TestEntity { id: 2 },
-        TestEntity { id: 3 },
-        TestEntity { id: 4 },
-        TestEntity { id: 5 },
-        TestEntity { id: 6 },
-        TestEntity { id: 7 },
-        TestEntity { id: 8 },
-        TestEntity { id: 9 },
-    ];
-
-    struct CollectionArgs<'a> {
-        pub input_data: &'a Vec<TestEntity>,
-    }
-
-    let streamer = EntityPageStreamer::new(0, 3);
-
-    let stream = streamer.into_stream(
-        || async {
-            Ok(Arc::new(CollectionArgs {
-                input_data: &input_data,
-            }))
-        },
-        |input, pagination| {
-            let input_len = input.input_data.len();
-
-            let input_page = input
-                .input_data
-                .iter()
-                .skip(pagination.offset)
-                .take(pagination.safe_limit(input_len))
-                .collect::<Vec<_>>();
-
-            assert_page(&input_page, &pagination);
-
-            async move {
-                Ok(EntityPageListing {
-                    list: input_page,
-                    total_count: input_len,
-                })
-            }
-        },
-    );
-
-    stream.try_collect::<Vec<_>>().await.unwrap();
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[tokio::test]
-async fn test_paged_page_processing_of_input_data_by_value() {
-    #[derive(Debug, Clone, PartialEq)]
-    struct ClonableTestEntity {
-        id: usize,
-    }
-
-    fn assert_page(page: &[ClonableTestEntity], pagination: &PaginationOpts) {
-        match pagination.offset {
-            0 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        ClonableTestEntity { id: 0 },
-                        ClonableTestEntity { id: 1 },
-                        ClonableTestEntity { id: 2 },
-                    ],
-                    page
-                );
-            }
-            3 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        ClonableTestEntity { id: 3 },
-                        ClonableTestEntity { id: 4 },
-                        ClonableTestEntity { id: 5 },
-                    ],
-                    page
-                );
-            }
-            6 => {
-                pretty_assertions::assert_eq!(
-                    vec![
-                        ClonableTestEntity { id: 6 },
-                        ClonableTestEntity { id: 7 },
-                        ClonableTestEntity { id: 8 },
-                    ],
-                    page
-                );
-            }
-            9 => {
-                pretty_assertions::assert_eq!(vec![ClonableTestEntity { id: 9 },], page);
-            }
-            _ => {
-                unreachable!()
-            }
-        }
-    }
-
-    let input_data = vec![
-        ClonableTestEntity { id: 0 },
-        ClonableTestEntity { id: 1 },
-        ClonableTestEntity { id: 2 },
-        ClonableTestEntity { id: 3 },
-        ClonableTestEntity { id: 4 },
-        ClonableTestEntity { id: 5 },
-        ClonableTestEntity { id: 6 },
-        ClonableTestEntity { id: 7 },
-        ClonableTestEntity { id: 8 },
-        ClonableTestEntity { id: 9 },
-    ];
-
-    let streamer = EntityPageStreamer::new(0, 3);
-
-    let stream = streamer.into_stream(
-        || async { Ok(Arc::new(input_data)) },
-        |input, pagination| {
-            let input_page = input
                .iter()
-                .skip(pagination.offset)
-                .take(pagination.safe_limit(input.len()))
-                .cloned()
-                .collect::<Vec<_>>();
-
-            assert_page(&input_page, &pagination);
-
-            async move {
-                Ok(EntityPageListing {
-                    list: input_page,
-                    total_count: input.len(),
-                })
-            }
-        },
-    );
-
-    stream.try_collect::<Vec<_>>().await.unwrap();
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Helpers
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-fn entity_source(
-    total_entities_count: usize,
-    expected_entities_call_count: usize,
-) -> Arc<MockEntitySource> {
-    let mut entity_source = MockEntitySource::new();
-
-    entity_source
-        .expect_init_arguments()
-        .times(1)
-        .returning(|| NoArgs);
-
-    entity_source
-        .expect_entities()
-        .times(expected_entities_call_count)
-        .returning(move |pagination| {
-            let result = (0..)
-                .skip(pagination.offset)
-                .take(pagination.safe_limit(total_entities_count))
-                .map(|id| TestEntity { id })
-                .collect::<Vec<_>>();
-
-            EntityPageListing {
-                list: result,
-                total_count: total_entities_count,
-            }
-        });
-
-    Arc::new(entity_source)
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[derive(Clone)]
-struct NoArgs;
-
-#[derive(Debug, PartialEq)]
-struct TestEntity {
-    id: usize,
-}
-
-#[async_trait::async_trait]
-trait EntitySource {
-    async fn init_arguments(&self) -> NoArgs;
-
-    async fn entities(&self, pagination: PaginationOpts) -> EntityPageListing<TestEntity>;
-}
-
-mockall::mock! {
-    pub EntitySource {}
-
-    #[async_trait::async_trait]
-    impl EntitySource for EntitySource {
-        async fn init_arguments(&self) -> NoArgs;
-
-        async fn entities(&self, pagination: PaginationOpts) -> EntityPageListing<TestEntity>;
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
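
The deleted streamer tests above are the only worked examples of the `EntityPageStreamer` API removed by this patch, so a condensed version of the usage pattern they exercised is worth keeping in view while reviewing the deletion. This is a minimal, self-contained sketch, not part of the patch itself; it assumes the `database-common` types behave exactly as the deleted tests showed (page-fetch closures returning `EntityPageListing`, flattened back into a stream of entities) and assumes a `tokio` runtime is available:

use std::sync::Arc;

use database_common::{EntityPageListing, EntityPageStreamer};
use futures::TryStreamExt;

#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: usize,
}

#[tokio::main]
async fn main() {
    let input: Vec<Row> = (0..7).map(|id| Row { id }).collect();

    // Pages of 3, starting at offset 0, as in the deleted tests.
    let streamer = EntityPageStreamer::new(0, 3);

    let stream = streamer.into_stream(
        // One-time initialization of the state shared by all page fetches.
        || async { Ok(Arc::new(input)) },
        // Per-page fetch: slice out one page and report the total count.
        |input, pagination| {
            let page = input
                .iter()
                .skip(pagination.offset)
                .take(pagination.safe_limit(input.len()))
                .cloned()
                .collect::<Vec<_>>();
            async move {
                Ok(EntityPageListing {
                    list: page,
                    total_count: input.len(),
                })
            }
        },
    );

    // The resulting stream yields individual entities, not pages.
    let all = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(7, all.len());
}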
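More broadly, everything removed in this patch revolves around a three-method trait surface: `get_dataset_owners`, `get_owned_datasets`, and `is_dataset_owned_by`. A hedged consumer sketch against that surface, assuming the trait keeps these exact signatures in its new `kamu_datasets` home; the `ensure_owner` helper is hypothetical and not part of this patch:

use internal_error::InternalError;
use kamu_datasets::DatasetOwnershipService;
use opendatafabric::{AccountID, DatasetID};

// Hypothetical guard: succeed only if `account_id` is among the owners of
// `dataset_id` -- the same question the deleted in-memory service answered
// from its two hash maps.
async fn ensure_owner(
    ownership: &dyn DatasetOwnershipService,
    dataset_id: &DatasetID,
    account_id: &AccountID,
) -> Result<(), InternalError> {
    if ownership.is_dataset_owned_by(dataset_id, account_id).await? {
        Ok(())
    } else {
        // `InternalError::bail` is used the same way as in the removed service code.
        InternalError::bail("Account does not own the dataset")
    }
}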