Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DatasetOwnershipService: moved to the kamu-dataset area & implemented via DatasetEntryServiceImpl #1004

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Recommendation: for ease of reading, use the following order:
- OSO: added resource storage for access speed
- E2E: Using the correct account in multi-tenant mode
- And also the possibility of setting it up
- `DatasetOwnershipService`: moved to the `kamu-dataset` crate area & implemented via `DatasetEntryServiceImpl`

## [0.213.1] - 2024-12-18
### Fixed
Expand Down
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_core::DatasetOwnershipService;
use kamu_datasets::DatasetOwnershipService;
use opendatafabric as odf;

use crate::prelude::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use chrono::Utc;
use fs::FlowConfigurationService;
use kamu_accounts::Account;
use kamu_core::DatasetOwnershipService;
use kamu_datasets::DatasetOwnershipService;
use kamu_flow_system as fs;
use opendatafabric::DatasetID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use futures::StreamExt;
use kamu_accounts::Account as AccountEntity;
use kamu_core::DatasetOwnershipService;
use kamu_datasets::DatasetOwnershipService;
use kamu_flow_system::FlowConfigurationService;

use crate::prelude::*;
Expand Down
12 changes: 4 additions & 8 deletions src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ use indoc::indoc;
use kamu::testing::{MetadataFactory, MockDatasetActionAuthorizer, MockDatasetChangesService};
use kamu::{
CreateDatasetFromSnapshotUseCaseImpl,
DatasetOwnershipServiceInMemory,
DatasetOwnershipServiceInMemoryStateInitializer,
DatasetRegistryRepoBridge,
DatasetRepositoryLocalFs,
DatasetRepositoryWriter,
MetadataQueryServiceImpl,
Expand All @@ -28,8 +25,8 @@ use kamu_accounts::{JwtAuthenticationConfig, DEFAULT_ACCOUNT_NAME, DEFAULT_ACCOU
use kamu_accounts_inmem::InMemoryAccessTokenRepository;
use kamu_accounts_services::{AccessTokenServiceImpl, AuthenticationServiceImpl};
use kamu_core::*;
use kamu_datasets_inmem::InMemoryDatasetDependencyRepository;
use kamu_datasets_services::DependencyGraphServiceImpl;
use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEntryRepository};
use kamu_datasets_services::{DatasetEntryServiceImpl, DependencyGraphServiceImpl};
use kamu_flow_system::FlowAgentConfig;
use kamu_flow_system_inmem::{InMemoryFlowConfigurationEventStore, InMemoryFlowEventStore};
use kamu_task_system_inmem::InMemoryTaskEventStore;
Expand Down Expand Up @@ -643,7 +640,6 @@ impl FlowConfigHarness {
.add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir))
.bind::<dyn DatasetRepository, DatasetRepositoryLocalFs>()
.bind::<dyn DatasetRepositoryWriter, DatasetRepositoryLocalFs>()
.add::<DatasetRegistryRepoBridge>()
.add::<MetadataQueryServiceImpl>()
.add::<CreateDatasetFromSnapshotUseCaseImpl>()
.add_value(dataset_changes_mock)
Expand All @@ -665,8 +661,8 @@ impl FlowConfigHarness {
))
.add::<TaskSchedulerImpl>()
.add::<InMemoryTaskEventStore>()
.add::<DatasetOwnershipServiceInMemory>()
.add::<DatasetOwnershipServiceInMemoryStateInitializer>()
.add::<DatasetEntryServiceImpl>()
.add::<InMemoryDatasetEntryRepository>()
.add::<DatabaseTransactionRunner>();

NoOpDatabasePlugin::init_database_components(&mut b);
Expand Down
10 changes: 4 additions & 6 deletions src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use indoc::indoc;
use kamu::testing::{MetadataFactory, MockDatasetChangesService};
use kamu::{
CreateDatasetFromSnapshotUseCaseImpl,
DatasetOwnershipServiceInMemory,
DatasetRegistryRepoBridge,
DatasetRepositoryLocalFs,
DatasetRepositoryWriter,
MetadataQueryServiceImpl,
Expand Down Expand Up @@ -47,8 +45,8 @@ use kamu_core::{
TenancyConfig,
MESSAGE_PRODUCER_KAMU_CORE_DATASET_SERVICE,
};
use kamu_datasets_inmem::InMemoryDatasetDependencyRepository;
use kamu_datasets_services::DependencyGraphServiceImpl;
use kamu_datasets_inmem::{InMemoryDatasetDependencyRepository, InMemoryDatasetEntryRepository};
use kamu_datasets_services::{DatasetEntryServiceImpl, DependencyGraphServiceImpl};
use kamu_flow_system::{
Flow,
FlowAgentConfig,
Expand Down Expand Up @@ -3111,7 +3109,6 @@ impl FlowRunsHarness {
.add_builder(DatasetRepositoryLocalFs::builder().with_root(datasets_dir))
.bind::<dyn DatasetRepository, DatasetRepositoryLocalFs>()
.bind::<dyn DatasetRepositoryWriter, DatasetRepositoryLocalFs>()
.add::<DatasetRegistryRepoBridge>()
.add::<MetadataQueryServiceImpl>()
.add::<CreateDatasetFromSnapshotUseCaseImpl>()
.add_value(dataset_changes_mock)
Expand All @@ -3132,7 +3129,8 @@ impl FlowRunsHarness {
.add::<AccessTokenServiceImpl>()
.add::<InMemoryAccessTokenRepository>()
.add_value(JwtAuthenticationConfig::default())
.add::<DatasetOwnershipServiceInMemory>()
.add::<DatasetEntryServiceImpl>()
.add::<InMemoryDatasetEntryRepository>()
.add::<DatabaseTransactionRunner>();

NoOpDatabasePlugin::init_database_components(&mut b);
Expand Down
18 changes: 10 additions & 8 deletions src/adapter/graphql/tests/utils/auth_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ use kamu_adapter_graphql::ANONYMOUS_ACCESS_FORBIDDEN_MESSAGE;
pub async fn authentication_catalogs(
base_catalog: &dill::Catalog,
) -> (dill::Catalog, dill::Catalog) {
let catalog_anonymous = dill::CatalogBuilder::new_chained(base_catalog)
.add_value(CurrentAccountSubject::anonymous(
AnonymousAccountReason::NoAuthenticationProvided,
))
.build();

let current_account_subject = CurrentAccountSubject::new_test();
let mut predefined_accounts_config = PredefinedAccountsConfig::new();

Expand All @@ -36,12 +30,20 @@ pub async fn authentication_catalogs(
panic!()
}

let catalog_authorized = dill::CatalogBuilder::new_chained(base_catalog)
let base_auth_catalog = dill::CatalogBuilder::new_chained(base_catalog)
.add::<LoginPasswordAuthProvider>()
.add::<PredefinedAccountsRegistrator>()
.add::<InMemoryAccountRepository>()
.add_value(predefined_accounts_config.clone())
.build();

let catalog_anonymous = dill::CatalogBuilder::new_chained(&base_auth_catalog)
.add_value(CurrentAccountSubject::anonymous(
AnonymousAccountReason::NoAuthenticationProvided,
))
.build();
let catalog_authorized = dill::CatalogBuilder::new_chained(&base_auth_catalog)
.add_value(current_account_subject)
.add_value(predefined_accounts_config)
.build();

init_on_startup::run_startup_jobs(&catalog_authorized)
Expand Down
8 changes: 4 additions & 4 deletions src/adapter/http/tests/harness/client_side_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub(crate) struct ClientSideHarness {
catalog: dill::Catalog,
pull_dataset_use_case: Arc<dyn PullDatasetUseCase>,
push_dataset_use_case: Arc<dyn PushDatasetUseCase>,
access_token_resover: Arc<dyn OdfServerAccessTokenResolver>,
access_token_resolver: Arc<dyn OdfServerAccessTokenResolver>,
options: ClientSideHarnessOptions,
}

Expand Down Expand Up @@ -162,7 +162,7 @@ impl ClientSideHarness {

let pull_dataset_use_case = catalog.get_one::<dyn PullDatasetUseCase>().unwrap();
let push_dataset_use_case = catalog.get_one::<dyn PushDatasetUseCase>().unwrap();
let access_token_resover = catalog
let access_token_resolver = catalog
.get_one::<dyn OdfServerAccessTokenResolver>()
.unwrap();

Expand All @@ -171,7 +171,7 @@ impl ClientSideHarness {
catalog,
pull_dataset_use_case,
push_dataset_use_case,
access_token_resover,
access_token_resolver,
options,
}
}
Expand Down Expand Up @@ -341,7 +341,7 @@ impl ClientSideHarness {
let mut ws_url = url.odf_to_transport_protocol().unwrap();
ws_url.ensure_trailing_slash();
let maybe_access_token = self
.access_token_resover
.access_token_resolver
.resolve_odf_dataset_access_token(&ws_url);

ws_url = ws_url.join(method).unwrap();
Expand Down
3 changes: 0 additions & 3 deletions src/app/cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,6 @@ pub fn configure_server_catalog(base_catalog: &Catalog) -> CatalogBuilder {

b.add::<DatasetChangesServiceImpl>();

b.add::<DatasetOwnershipServiceInMemory>();
b.add::<DatasetOwnershipServiceInMemoryStateInitializer>();

kamu_task_system_services::register_dependencies(&mut b);

b.add_value(kamu_flow_system_inmem::domain::FlowAgentConfig::new(
Expand Down
2 changes: 0 additions & 2 deletions src/domain/core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub use transform::*;
pub use watermark::*;

pub mod dataset_changes_service;
pub mod dataset_ownership_service;
pub mod dataset_registry;
pub mod dependency_graph_service;
pub mod engine_provisioner;
Expand All @@ -43,7 +42,6 @@ pub mod sync_service;
pub mod verification_service;

pub use dataset_changes_service::*;
pub use dataset_ownership_service::*;
pub use dataset_registry::*;
pub use dependency_graph_service::*;
pub use engine_provisioner::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub trait DatasetEntryService: Sync + Send {
// TODO: Private Datasets: extract to DatasetEntryRegistry?
fn all_entries(&self) -> DatasetEntryStream;

fn entries_owned_by(&self, owner_id: &odf::AccountID) -> DatasetEntryStream;

async fn list_all_entries(
&self,
pagination: PaginationOpts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,26 @@
// by the Apache License, Version 2.0.

use internal_error::InternalError;
use opendatafabric::{AccountID, DatasetID};
use opendatafabric as odf;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// TODO: Private Datasets: replace with DatasetEntry-related service
#[async_trait::async_trait]
pub trait DatasetOwnershipService: Sync + Send {
async fn get_dataset_owners(
async fn get_dataset_owner(
&self,
dataset_id: &DatasetID,
) -> Result<Vec<AccountID>, InternalError>;
dataset_id: &odf::DatasetID,
) -> Result<odf::AccountID, InternalError>;

async fn get_owned_datasets(
&self,
account_id: &AccountID,
) -> Result<Vec<DatasetID>, InternalError>;
account_id: &odf::AccountID,
) -> Result<Vec<odf::DatasetID>, InternalError>;

async fn is_dataset_owned_by(
&self,
dataset_id: &DatasetID,
account_id: &AccountID,
dataset_id: &odf::DatasetID,
account_id: &odf::AccountID,
) -> Result<bool, InternalError>;
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/datasets/domain/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
mod dataset_entry_service;
mod dataset_env_var_service;
mod dataset_key_value_service;
mod dataset_ownership_service;

pub use dataset_entry_service::*;
pub use dataset_env_var_service::*;
pub use dataset_key_value_service::*;
pub use dataset_ownership_service::*;
67 changes: 66 additions & 1 deletion src/domain/datasets/services/src/dataset_entry_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use database_common::{EntityPageListing, EntityPageStreamer, PaginationOpts};
use dill::{component, interface, meta, Catalog};
use internal_error::{InternalError, ResultIntoInternal};
use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
use kamu_accounts::{AccountRepository, CurrentAccountSubject};
use kamu_core::{
DatasetHandleStream,
Expand Down Expand Up @@ -68,6 +68,7 @@ struct AccountsCache {
#[component(pub)]
#[interface(dyn DatasetEntryService)]
#[interface(dyn DatasetRegistry)]
#[interface(dyn DatasetOwnershipService)]
#[interface(dyn MessageConsumer)]
#[interface(dyn MessageConsumerT<DatasetLifecycleMessage>)]
#[meta(MessageConsumerMeta {
Expand Down Expand Up @@ -336,6 +337,19 @@ impl DatasetEntryService for DatasetEntryServiceImpl {
)
}

/// Streams all dataset entries owned by the given account, fetching
/// them page by page from the entry repository.
fn entries_owned_by(&self, owner_id: &odf::AccountID) -> DatasetEntryStream {
    // Clone so the owned ID can be moved into the paging closures below.
    let owner_id = owner_id.clone();

    EntityPageStreamer::default().into_stream(
        // Seed state: share the owner ID across page fetches.
        || async { Ok(Arc::new(owner_id)) },
        // Page fetcher: load one page of entries for this owner.
        move |owner_id, pagination| async move {
            self.list_entries_owned_by(&owner_id, pagination)
                .await
                .int_err()
        },
    )
}

async fn list_all_entries(
&self,
pagination: PaginationOpts,
Expand Down Expand Up @@ -511,6 +525,57 @@ impl DatasetRegistry for DatasetEntryServiceImpl {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
impl DatasetOwnershipService for DatasetEntryServiceImpl {
    /// Resolves the owning account of a dataset via its entry record.
    /// Any lookup failure (including a missing entry) is surfaced as an
    /// internal error, since callers expect the dataset to exist.
    async fn get_dataset_owner(
        &self,
        dataset_id: &odf::DatasetID,
    ) -> Result<odf::AccountID, InternalError> {
        let entry = self
            .dataset_entry_repo
            .get_dataset_entry(dataset_id)
            .await
            .int_err()?;

        Ok(entry.owner_id)
    }

    /// Lists the IDs of all datasets owned by the given account by
    /// draining the paginated entry stream and projecting out the IDs.
    async fn get_owned_datasets(
        &self,
        account_id: &odf::AccountID,
    ) -> Result<Vec<odf::DatasetID>, InternalError> {
        use futures::TryStreamExt;

        self.entries_owned_by(account_id)
            .map_ok(|entry| entry.id)
            .try_collect()
            .await
    }

    /// Checks whether the dataset is owned by the given account.
    /// A dataset with no entry is reported as "not owned" rather than
    /// as an error.
    async fn is_dataset_owned_by(
        &self,
        dataset_id: &odf::DatasetID,
        account_id: &odf::AccountID,
    ) -> Result<bool, InternalError> {
        match self.dataset_entry_repo.get_dataset_entry(dataset_id).await {
            Ok(entry) => Ok(entry.owner_id == *account_id),
            Err(GetDatasetEntryError::NotFound(_)) => Ok(false),
            Err(unexpected) => Err(unexpected.int_err()),
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

impl MessageConsumer for DatasetEntryServiceImpl {}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading
Loading