Skip to content

Commit

Permalink
Covered new GraphQL API for flow config with unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zaychenko-sergei committed Dec 26, 2023
1 parent 75a32b7 commit 88c4b39
Show file tree
Hide file tree
Showing 8 changed files with 780 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type CreateDatasetResultSuccess implements CreateDatasetResult & CreateDatasetFr
}

type CronExpression {
expression: String!
cronExpression: String!
}

type DataBatch {
Expand Down Expand Up @@ -302,9 +302,9 @@ type DatasetFlowConfigs {
}

type DatasetFlowConfigsMut {
setConfigTimeDelta(datasetFlowType: DatasetFlowType!, paused: Boolean!, every: TimeDeltaInput!): Boolean!
setConfigCronExpression(datasetFlowType: DatasetFlowType!, paused: Boolean!, cronExpression: String!): Boolean!
setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, throttlingPeriod: TimeDeltaInput, minimalDataBatch: Int): Boolean!
setConfigTimeDelta(datasetFlowType: DatasetFlowType!, paused: Boolean!, every: TimeDeltaInput!): FlowConfiguration!
setConfigCronExpression(datasetFlowType: DatasetFlowType!, paused: Boolean!, cronExpression: String!): FlowConfiguration!
setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, throttlingPeriod: TimeDeltaInput, minimalDataBatch: Int): FlowConfiguration!
}

enum DatasetFlowType {
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ thiserror = { version = "1", default-features = false }

[dev-dependencies]
# TODO: Limit to mock or in-memory implementations only
event-bus = { workspace = true }
kamu = { workspace = true }
event-bus = {workspace = true }
kamu-flow-system-inmem = { workspace = true }

env_logger = "0.10"
mockall = "0.11"
Expand Down
22 changes: 11 additions & 11 deletions src/adapter/graphql/src/mutations/dataset_flow_configs_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use kamu_flow_system::{
use opendatafabric as odf;

use crate::prelude::*;
use crate::queries::{DatasetFlowType, TimeUnit};
use crate::queries::{DatasetFlowType, FlowConfiguration, TimeUnit};
use crate::{utils, LoggedInGuard};

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -45,14 +45,14 @@ impl DatasetFlowConfigsMut {
dataset_flow_type: DatasetFlowType,
paused: bool,
every: TimeDeltaInput,
) -> Result<bool> {
) -> Result<FlowConfiguration> {
self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
.await?;
self.ensure_scheduling_permission(ctx).await?;

let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();

flow_config_service
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
Expand All @@ -67,7 +67,7 @@ impl DatasetFlowConfigsMut {
SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
})?;

Ok(true)
Ok(res.into())
}

#[graphql(guard = "LoggedInGuard::new()")]
Expand All @@ -77,29 +77,29 @@ impl DatasetFlowConfigsMut {
dataset_flow_type: DatasetFlowType,
paused: bool,
cron_expression: String,
) -> Result<bool> {
) -> Result<FlowConfiguration> {
self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
.await?;
self.ensure_scheduling_permission(ctx).await?;

let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();

flow_config_service
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
paused,
FlowConfigurationRule::Schedule(Schedule::CronExpression(ScheduleCronExpression {
expression: cron_expression,
cron_expression,
})),
)
.await
.map_err(|e| match e {
SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
})?;

Ok(true)
Ok(res.into())
}

#[graphql(guard = "LoggedInGuard::new()")]
Expand All @@ -110,14 +110,14 @@ impl DatasetFlowConfigsMut {
paused: bool,
throttling_period: Option<TimeDeltaInput>,
minimal_data_batch: Option<i32>,
) -> Result<bool> {
) -> Result<FlowConfiguration> {
self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
.await?;
self.ensure_scheduling_permission(ctx).await?;

let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();

flow_config_service
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
Expand All @@ -133,7 +133,7 @@ impl DatasetFlowConfigsMut {
SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
})?;

Ok(true)
Ok(res.into())
}

#[graphql(skip)]
Expand Down
56 changes: 30 additions & 26 deletions src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,40 @@ impl DatasetFlowConfigs {
.await
.int_err()?;

Ok(maybe_flow_config.map(|flow_config| FlowConfiguration {
paused: !flow_config.is_active(),
batching: if let FlowConfigurationRule::StartCondition(condition) = &flow_config.rule {
Ok(maybe_flow_config.map(|flow_config| flow_config.into()))
}
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Enum, Copy, Clone, Eq, PartialEq)]
#[graphql(remote = "kamu_flow_system::DatasetFlowType")]
pub enum DatasetFlowType {
Ingest,
ExecuteQuery,
Compaction,
}

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct FlowConfiguration {
pub paused: bool,
pub schedule: Option<FlowConfigurationSchedule>,
pub batching: Option<FlowConfigurationBatching>,
}

impl From<kamu_flow_system::FlowConfigurationState> for FlowConfiguration {
fn from(value: kamu_flow_system::FlowConfigurationState) -> Self {
Self {
paused: !value.is_active(),
batching: if let FlowConfigurationRule::StartCondition(condition) = &value.rule {
Some(FlowConfigurationBatching {
throttling_period: condition.throttling_period.map(|tp| tp.into()),
minimal_data_batch: condition.minimal_data_batch,
})
} else {
None
},
schedule: if let FlowConfigurationRule::Schedule(schedule) = flow_config.rule {
schedule: if let FlowConfigurationRule::Schedule(schedule) = value.rule {
match schedule {
Schedule::TimeDelta(time_delta) => Some(FlowConfigurationSchedule::TimeDelta(
time_delta.every.into(),
Expand All @@ -83,35 +106,16 @@ impl DatasetFlowConfigs {
} else {
None
},
}))
}
}
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Enum, Copy, Clone, Eq, PartialEq)]
#[graphql(remote = "kamu_flow_system::DatasetFlowType")]
pub enum DatasetFlowType {
Ingest,
ExecuteQuery,
Compaction,
}

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct FlowConfiguration {
pub paused: bool,
pub schedule: Option<FlowConfigurationSchedule>,
pub batching: Option<FlowConfigurationBatching>,
}

#[derive(Union, Debug, Clone, PartialEq, Eq)]
pub enum FlowConfigurationSchedule {
TimeDelta(TimeDelta),
Cron(CronExpression),
}

/////////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct FlowConfigurationBatching {
pub throttling_period: Option<TimeDelta>,
Expand All @@ -122,13 +126,13 @@ pub struct FlowConfigurationBatching {

#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
pub struct CronExpression {
pub expression: String,
pub cron_expression: String,
}

impl From<ScheduleCronExpression> for CronExpression {
fn from(value: ScheduleCronExpression) -> Self {
Self {
expression: value.expression,
cron_expression: value.cron_expression,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/graphql/tests/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod test_accounts;
mod test_auth;
mod test_error_handling;
mod test_gql_data;
mod test_gql_dataset_flow_configs;
mod test_gql_datasets;
mod test_gql_metadata;
mod test_gql_metadata_chain;
Expand Down
Loading

0 comments on commit 88c4b39

Please sign in to comment.