GQL, DatasetMetadata: be prepared for not accessed datasets #1011

Merged 1 commit on Dec 24, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,8 @@ Recommendation: for ease of reading, use the following order:
- E2E: Using the correct account in multi-tenant mode
- And also the possibility of setting it up
- `DatasetOwnershipService`: moved to the `kamu-dataset` crate area & implemented via `DatasetEntryServiceImpl`
- GQL, `DatasetMetadata.currentUpstreamDependencies`: indicate datasets that are not found or cannot be accessed
- GQL, `DatasetMetadata.currentDownstreamDependencies`: exclude datasets that cannot be accessed

## [0.213.1] - 2024-12-18
### Fixed
23 changes: 19 additions & 4 deletions resources/schema.gql
@@ -646,7 +646,7 @@ type DatasetMetadata {
"""
Current upstream dependencies of a dataset
"""
currentUpstreamDependencies: [Dataset!]!
currentUpstreamDependencies: [UpstreamDatasetResult!]!
"""
Current downstream dependencies of a dataset
"""
@@ -725,7 +725,7 @@ type DatasetMut {
"""
Set visibility for the dataset
"""
setVisibility(visibility: DatasetVisibilityInput!): SetDatasetPropertyResult!
setVisibility(visibility: DatasetVisibilityInput!): SetDatasetVisibilityResult!
}

scalar DatasetName
@@ -1758,11 +1758,11 @@ type SetDataSchema {
schema: DataSchema!
}

interface SetDatasetPropertyResult {
interface SetDatasetVisibilityResult {
message: String!
}

type SetDatasetPropertyResultSuccess implements SetDatasetPropertyResult {
type SetDatasetVisibilityResultSuccess implements SetDatasetVisibilityResult {
dummy: String
message: String!
}
@@ -1996,6 +1996,21 @@ interface UpdateReadmeResult {
message: String!
}

interface UpstreamDatasetResult {
message: String!
}

type UpstreamDatasetResultFound implements UpstreamDatasetResult {
dataset: Dataset!
message: String!
}

type UpstreamDatasetResultNotFound implements UpstreamDatasetResult {
datasetId: DatasetID!
datasetAlias: DatasetAlias!
message: String!
}

type ViewAccessToken {
"""
Unique identifier of the access token
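For context, a sketch of how a client might query the new `UpstreamDatasetResult` interface with inline fragments. The `datasets.byId(...).metadata` path and the `name` field on `Dataset` are assumptions from the broader API, not part of this diff:

```graphql
query UpstreamDeps($datasetId: DatasetID!) {
  datasets {
    byId(datasetId: $datasetId) {
      metadata {
        currentUpstreamDependencies {
          message
          ... on UpstreamDatasetResultFound {
            # `name` is an assumed Dataset field, not shown in this diff
            dataset {
              name
            }
          }
          ... on UpstreamDatasetResultNotFound {
            datasetId
            datasetAlias
          }
        }
      }
    }
  }
}
```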
14 changes: 7 additions & 7 deletions src/adapter/graphql/src/mutations/dataset_mut/dataset_mut.rs
@@ -162,7 +162,7 @@ impl DatasetMut {
&self,
ctx: &Context<'_>,
visibility: DatasetVisibilityInput,
) -> Result<SetDatasetPropertyResult> {
) -> Result<SetDatasetVisibilityResult> {
ensure_account_owns_dataset(ctx, &self.dataset_handle).await?;

let rebac_svc = from_catalog_n!(ctx, dyn kamu_auth_rebac::RebacService);
@@ -186,7 +186,7 @@
.int_err()?;
}

Ok(SetDatasetPropertyResultSuccess::default().into())
Ok(SetDatasetVisibilityResultSuccess::default().into())
}
}

@@ -292,20 +292,20 @@ pub enum DatasetVisibilityInput {

#[derive(Interface, Debug)]
#[graphql(field(name = "message", ty = "String"))]
pub enum SetDatasetPropertyResult {
Success(SetDatasetPropertyResultSuccess),
pub enum SetDatasetVisibilityResult {
Success(SetDatasetVisibilityResultSuccess),
}

#[derive(SimpleObject, Debug, Default)]
#[graphql(complex)]
pub struct SetDatasetPropertyResultSuccess {
pub struct SetDatasetVisibilityResultSuccess {
_dummy: Option<String>,
}

#[ComplexObject]
impl SetDatasetPropertyResultSuccess {
impl SetDatasetVisibilityResultSuccess {
async fn message(&self) -> String {
"Updated".to_string()
"Success".to_string()
}
}

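A matching mutation sketch against the renamed result type; the `datasets.byId` mutation path is an assumption, and the concrete `DatasetVisibilityInput` value is left to a variable since its variants are not shown in this diff:

```graphql
mutation SetVisibility($datasetId: DatasetID!, $visibility: DatasetVisibilityInput!) {
  datasets {
    byId(datasetId: $datasetId) {
      # Return type is now SetDatasetVisibilityResult; on success,
      # `message` resolves to "Success" (previously "Updated")
      setVisibility(visibility: $visibility) {
        message
      }
    }
  }
}
```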
142 changes: 116 additions & 26 deletions src/adapter/graphql/src/queries/datasets/dataset_metadata.rs
@@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use chrono::prelude::*;
use kamu_core::auth::{ClassifyByAllowanceResponse, DatasetAction};
use kamu_core::{
self as domain,
MetadataChainExt,
@@ -25,8 +26,6 @@ use crate::utils::get_dataset;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct DatasetMetadata {
dataset_handle: odf::DatasetHandle,
}
@@ -81,31 +80,50 @@ impl DatasetMetadata {
}
}

// TODO: Private Datasets: tests
/// Current upstream dependencies of a dataset
async fn current_upstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let (dependency_graph_service, dataset_registry) = from_catalog_n!(
async fn current_upstream_dependencies(
&self,
ctx: &Context<'_>,
) -> Result<Vec<UpstreamDatasetResult>> {
let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
ctx,
dyn domain::DependencyGraphService,
dyn domain::DatasetRegistry
dyn domain::DatasetRegistry,
dyn kamu_core::auth::DatasetActionAuthorizer
);

use tokio_stream::StreamExt;
let upstream_dataset_ids: Vec<_> = dependency_graph_service
use futures::{StreamExt, TryStreamExt};

let upstream_dataset_handles = dependency_graph_service
.get_upstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let mut upstream = Vec::with_capacity(upstream_dataset_ids.len());
for upstream_dataset_id in upstream_dataset_ids {
let hdl = dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()?;
.then(|upstream_dataset_id| {
let dataset_registry = dataset_registry.clone();
async move {
dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()
}
})
.try_collect::<Vec<_>>()
.await?;

let upstream_dataset_handles_len = upstream_dataset_handles.len();
let ClassifyByAllowanceResponse {
authorized_handles,
unauthorized_handles_with_errors,
} = dataset_action_authorizer
.classify_datasets_by_allowance(upstream_dataset_handles, DatasetAction::Read)
.await?;

let mut upstream = Vec::with_capacity(upstream_dataset_handles_len);
for hdl in authorized_handles {
let maybe_account = Account::from_dataset_alias(ctx, &hdl.alias).await?;
if let Some(account) = maybe_account {
upstream.push(Dataset::new(account, hdl));
upstream.push(UpstreamDatasetResult::found(Dataset::new(account, hdl)));
} else {
tracing::warn!(
"Skipped upstream dataset '{}' with unresolved account",
@@ -114,28 +132,51 @@ impl DatasetMetadata {
}
}

upstream.extend(
unauthorized_handles_with_errors
.into_iter()
.map(|(hdl, _)| UpstreamDatasetResult::not_found(hdl)),
);

Ok(upstream)
}

// TODO: Convert to collection
// TODO: Private Datasets: tests
/// Current downstream dependencies of a dataset
async fn current_downstream_dependencies(&self, ctx: &Context<'_>) -> Result<Vec<Dataset>> {
let (dependency_graph_service, dataset_registry) = from_catalog_n!(
let (dependency_graph_service, dataset_registry, dataset_action_authorizer) = from_catalog_n!(
ctx,
dyn domain::DependencyGraphService,
dyn domain::DatasetRegistry
dyn domain::DatasetRegistry,
dyn kamu_core::auth::DatasetActionAuthorizer
);

use tokio_stream::StreamExt;
let downstream_dataset_ids: Vec<_> = dependency_graph_service
use futures::{StreamExt, TryStreamExt};

let downstream_dataset_handles = dependency_graph_service
.get_downstream_dependencies(&self.dataset_handle.id)
.await
.int_err()?
.collect()
.await;

let mut downstream = Vec::with_capacity(downstream_dataset_ids.len());
for downstream_dataset_id in downstream_dataset_ids {
.then(|upstream_dataset_id| {
let dataset_registry = dataset_registry.clone();
async move {
dataset_registry
.resolve_dataset_handle_by_ref(&upstream_dataset_id.as_local_ref())
.await
.int_err()
}
})
.try_collect::<Vec<_>>()
.await?;

let authorized_downstream_dataset_ids = dataset_action_authorizer
.classify_datasets_by_allowance(downstream_dataset_handles, DatasetAction::Read)
.await?
.authorized_handles;

let mut downstream = Vec::with_capacity(authorized_downstream_dataset_ids.len());
for downstream_dataset_id in authorized_downstream_dataset_ids {
let hdl = dataset_registry
.resolve_dataset_handle_by_ref(&downstream_dataset_id.as_local_ref())
.await
@@ -284,3 +325,52 @@ impl DatasetMetadata {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Interface, Debug, Clone)]
#[graphql(field(name = "message", ty = "String"))]
enum UpstreamDatasetResult {
Found(UpstreamDatasetResultFound),
NotFound(UpstreamDatasetResultNotFound),
}

impl UpstreamDatasetResult {
pub fn found(dataset: Dataset) -> Self {
Self::Found(UpstreamDatasetResultFound { dataset })
}

pub fn not_found(dataset_handle: odf::DatasetHandle) -> Self {
Self::NotFound(UpstreamDatasetResultNotFound {
dataset_id: dataset_handle.id.into(),
dataset_alias: dataset_handle.alias.into(),
})
}
}

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct UpstreamDatasetResultFound {
pub dataset: Dataset,
}

#[ComplexObject]
impl UpstreamDatasetResultFound {
async fn message(&self) -> String {
"Found".to_string()
}
}

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct UpstreamDatasetResultNotFound {
pub dataset_id: DatasetID,
pub dataset_alias: DatasetAlias,
}

#[ComplexObject]
impl UpstreamDatasetResultNotFound {
async fn message(&self) -> String {
"Not found".to_string()
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
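Taken together, the two resolvers now behave asymmetrically, as a client-side sketch makes clear: upstream dependencies report missing or unauthorized datasets as explicit `UpstreamDatasetResultNotFound` entries, while downstream dependencies silently drop them. Field names beyond those in this diff are assumptions:

```graphql
query Dependencies($datasetId: DatasetID!) {
  datasets {
    byId(datasetId: $datasetId) {
      metadata {
        # Every upstream dependency yields an entry: Found or NotFound
        currentUpstreamDependencies {
          message
          ... on UpstreamDatasetResultNotFound {
            datasetAlias
          }
        }
        # Still [Dataset!]!: inaccessible datasets are simply omitted
        currentDownstreamDependencies {
          name # assumed Dataset field
        }
      }
    }
  }
}
```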