Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

398 flows provide graphql api to manually cancel a flow #416

4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - 2024-01-05
### Added
- New GraphQL API to manually schedule dataset flows
- New GraphQL APIs to manually schedule and cancel dataset flows
- Cron expression implementation for dataset flows
### Changed
- Automatically cancelling scheduled tasks if parent flow is cancelled or aborted

## [0.150.1] - 2023-12-29
### Added
Expand Down
46 changes: 45 additions & 1 deletion resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ type BlockRef {
}


type CancelFlowAlreadyFinished implements CancelFlowResult {
  flowId: FlowID!
  message: String!
}

interface CancelFlowResult {
message: String!
}

type CancelFlowSuccess implements CancelFlowResult {
flow: Flow!
message: String!
}

type Checkpoint {
physicalHash: Multihash!
size: Int!
Expand Down Expand Up @@ -310,6 +324,7 @@ type DatasetFlowConfigsMut {

type DatasetFlowRunsMut {
triggerFlow(datasetFlowType: DatasetFlowType!): TriggerFlowResult!
cancelFlow(flowId: FlowID!): CancelFlowResult!
}

enum DatasetFlowType {
Expand Down Expand Up @@ -575,7 +590,9 @@ type FetchStepUrl {


type Flow {
  flowId: FlowID!
  status: FlowStatus!
  outcome: FlowOutcome
}

type FlowConfiguration {
Expand All @@ -591,12 +608,39 @@ type FlowConfigurationBatching {

union FlowConfigurationSchedule = TimeDelta | CronExpression

scalar FlowID

type FlowIncompatibleDatasetKind implements SetFlowConfigResult & TriggerFlowResult {
expectedDatasetKind: DatasetKind!
actualDatasetKind: DatasetKind!
message: String!
}

type FlowNotFound implements CancelFlowResult {
flowId: FlowID!
message: String!
}

enum FlowOutcome {
SUCCESS
FAILED
CANCELLED
ABORTED
}

enum FlowStatus {
DRAFT
QUEUED
SCHEDULED
FINISHED
}

type FlowUnmatchedInDataset implements CancelFlowResult {
  flowId: FlowID!
  datasetAlias: DatasetAlias!
  message: String!
}


type InputSlice {
datasetId: DatasetID!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ impl From<TimeDeltaInput> for chrono::Duration {

///////////////////////////////////////////////////////////////////////////////

#[derive(Interface, Debug, Clone)]
/// Result of setting a flow configuration: either the configuration was
/// accepted (`Success`), or the dataset kind is incompatible with the
/// requested flow type (`IncompatibleDatasetKind`).
/// The GraphQL interface exposes a single `message: String` field on all variants.
#[derive(Interface, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub enum SetFlowConfigResult {
    Success(SetFlowConfigSuccess),
    IncompatibleDatasetKind(FlowIncompatibleDatasetKind),
}

#[derive(SimpleObject, Debug, Clone)]
#[derive(SimpleObject, Clone)]
#[graphql(complex)]
pub struct SetFlowConfigSuccess {
pub config: FlowConfiguration,
Expand Down
50 changes: 50 additions & 0 deletions src/adapter/graphql/src/mutations/flows_mut/dataset_flow_errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_flow_system as fs;

use crate::prelude::*;

///////////////////////////////////////////////////////////////////////////////

/// GraphQL error object returned when no flow exists for the requested id.
#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub(crate) struct FlowNotFound {
    pub flow_id: FlowID,
}

#[ComplexObject]
impl FlowNotFound {
    /// Human-readable description of the error for the `message` field.
    pub async fn message(&self) -> String {
        // Convert the GraphQL scalar into the domain flow id for display.
        let id: fs::FlowID = self.flow_id.into();
        format!("Flow '{id}' was not found")
    }
}

///////////////////////////////////////////////////////////////////////////////

/// GraphQL error object returned when a flow exists but does not target the
/// dataset the mutation was addressed to.
#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub(crate) struct FlowUnmatchedInDataset {
    pub flow_id: FlowID,
    pub dataset_alias: DatasetAlias,
}

#[ComplexObject]
impl FlowUnmatchedInDataset {
    /// Human-readable description of the error for the `message` field.
    pub async fn message(&self) -> String {
        // Convert the GraphQL scalar into the domain flow id for display.
        let id: fs::FlowID = self.flow_id.into();
        format!("Flow '{id}' does not belong to dataset '{}'", self.dataset_alias)
    }
}

///////////////////////////////////////////////////////////////////////////////
107 changes: 90 additions & 17 deletions src/adapter/graphql/src/mutations/flows_mut/dataset_flow_runs_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
// by the Apache License, Version 2.0.

use chrono::Utc;
use kamu_core::CurrentAccountSubject;
use kamu_flow_system::{FlowKeyDataset, FlowService, RequestFlowError};
use opendatafabric as odf;
use {kamu_flow_system as fs, opendatafabric as odf};

use super::{
check_if_flow_belongs_to_dataset,
ensure_expected_dataset_kind,
ensure_scheduling_permission,
FlowInDatasetError,
FlowIncompatibleDatasetKind,
FlowNotFound,
FlowUnmatchedInDataset,
};
use crate::prelude::*;
use crate::LoggedInGuard;
use crate::{utils, LoggedInGuard};

///////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -47,44 +49,78 @@ impl DatasetFlowRunsMut {

ensure_scheduling_permission(ctx, &self.dataset_handle).await?;

let flow_service = from_catalog::<dyn FlowService>(ctx).unwrap();
let current_account_subject = from_catalog::<CurrentAccountSubject>(ctx).unwrap();
let logged_account = match current_account_subject.as_ref() {
CurrentAccountSubject::Logged(la) => la,
CurrentAccountSubject::Anonymous(_) => {
unreachable!("LoggedInGuard would not allow anonymous accounts here")
}
};
let flow_service = from_catalog::<dyn fs::FlowService>(ctx).unwrap();
let logged_account = utils::get_logged_account(ctx);

let res = flow_service
.trigger_manual_flow(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
fs::FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
odf::AccountID::from(odf::FAKE_ACCOUNT_ID),
logged_account.account_name.clone(),
logged_account.account_name,
)
.await
.map_err(|e| match e {
RequestFlowError::Internal(e) => GqlError::Internal(e),
fs::RequestFlowError::Internal(e) => GqlError::Internal(e),
})?;

Ok(TriggerFlowResult::Success(TriggerFlowSuccess {
flow: res.into(),
}))
}

#[graphql(guard = "LoggedInGuard::new()")]
async fn cancel_flow(&self, ctx: &Context<'_>, flow_id: FlowID) -> Result<CancelFlowResult> {
ensure_scheduling_permission(ctx, &self.dataset_handle).await?;

if let Some(error) =
check_if_flow_belongs_to_dataset(ctx, flow_id, &self.dataset_handle).await?
{
return Ok(match error {
FlowInDatasetError::NotFound(e) => CancelFlowResult::NotFound(e),
FlowInDatasetError::UnmatchedInDataset(e) => {
CancelFlowResult::UnmatchedInDataset(e)
}
});
}

let flow_service = from_catalog::<dyn fs::FlowService>(ctx).unwrap();
let logged_account = utils::get_logged_account(ctx);

let res = flow_service
.cancel_flow(
flow_id.into(),
odf::AccountID::from(odf::FAKE_ACCOUNT_ID),
logged_account.account_name,
)
.await;

match res {
Ok(flow_state) => Ok(CancelFlowResult::Success(CancelFlowSuccess {
flow: flow_state.into(),
})),
Err(e) => match e {
fs::CancelFlowError::AlreadyFinished(_) => Ok(CancelFlowResult::AlreadyFinished(
CancelFlowAlreadyFinished { flow_id },
)),
fs::CancelFlowError::NotFound(_) => unreachable!("Flow checked already"),
fs::CancelFlowError::Internal(e) => Err(GqlError::Internal(e)),
},
}
}
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Interface, Debug, Clone)]
/// Result of the `triggerFlow` mutation: either the flow was triggered
/// (`Success`), or the dataset kind does not support the requested flow type
/// (`IncompatibleDatasetKind`).
/// The GraphQL interface exposes a single `message: String` field on all variants.
#[derive(Interface, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub enum TriggerFlowResult {
    Success(TriggerFlowSuccess),
    IncompatibleDatasetKind(FlowIncompatibleDatasetKind),
}

#[derive(SimpleObject, Debug, Clone)]
#[derive(SimpleObject, Clone)]
#[graphql(complex)]
pub struct TriggerFlowSuccess {
pub flow: Flow,
Expand All @@ -98,3 +134,40 @@ impl TriggerFlowSuccess {
}

///////////////////////////////////////////////////////////////////////////////

/// Result of the `cancelFlow` mutation: the flow was cancelled (`Success`),
/// it had already reached a finished state (`AlreadyFinished`), no flow with
/// the given id exists (`NotFound`), or the flow belongs to a different
/// dataset (`UnmatchedInDataset`).
/// The GraphQL interface exposes a single `message: String` field on all variants.
#[derive(Interface, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub enum CancelFlowResult {
    Success(CancelFlowSuccess),
    AlreadyFinished(CancelFlowAlreadyFinished),
    NotFound(FlowNotFound),
    UnmatchedInDataset(FlowUnmatchedInDataset),
}

/// GraphQL object returned when the flow was successfully cancelled.
#[derive(SimpleObject, Clone)]
#[graphql(complex)]
pub struct CancelFlowSuccess {
    pub flow: Flow,
}

#[ComplexObject]
impl CancelFlowSuccess {
    /// Static status message for the success case.
    pub async fn message(&self) -> String {
        // `format!` with no interpolation is pointless (clippy: useless_format).
        "Success".to_string()
    }
}

/// GraphQL object returned when cancellation was rejected because the flow
/// had already reached a finished state.
#[derive(SimpleObject, Clone)]
#[graphql(complex)]
pub struct CancelFlowAlreadyFinished {
    pub flow_id: FlowID,
}

#[ComplexObject]
impl CancelFlowAlreadyFinished {
    /// Static explanation message for this rejection.
    pub async fn message(&self) -> String {
        // `format!` with no interpolation is pointless (clippy: useless_format).
        "Cancelling already finished flows is not allowed".to_string()
    }
}

///////////////////////////////////////////////////////////////////////////////
51 changes: 50 additions & 1 deletion src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
// by the Apache License, Version 2.0.

use kamu_core::GetSummaryOpts;
use opendatafabric as odf;
use {kamu_flow_system as fs, opendatafabric as odf};

use super::{FlowNotFound, FlowUnmatchedInDataset};
use crate::prelude::*;
use crate::utils;

Expand Down Expand Up @@ -37,6 +38,54 @@ pub(crate) async fn ensure_scheduling_permission(

///////////////////////////////////////////////////////////////////////////////

/// Error cases detected when validating that a flow id belongs to a given
/// dataset: the flow does not exist (`NotFound`), or it targets a different
/// dataset / is a system flow (`UnmatchedInDataset`).
/// The GraphQL interface exposes a single `message: String` field on all variants.
#[derive(Interface, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub(crate) enum FlowInDatasetError {
    NotFound(FlowNotFound),
    UnmatchedInDataset(FlowUnmatchedInDataset),
}

/// Validates that `flow_id` refers to an existing flow that targets
/// `dataset_handle`. Returns `Ok(None)` when the flow belongs to the dataset,
/// `Ok(Some(error))` when it is missing or unmatched, and `Err` only on
/// internal failures.
pub(crate) async fn check_if_flow_belongs_to_dataset(
    ctx: &Context<'_>,
    flow_id: FlowID,
    dataset_handle: &odf::DatasetHandle,
) -> Result<Option<FlowInDatasetError>> {
    let flow_service = from_catalog::<dyn fs::FlowService>(ctx).unwrap();

    match flow_service.get_flow(flow_id.into()).await {
        Ok(flow_state) => {
            // A flow belongs to the dataset only when its key is a dataset key
            // with the exact same dataset id; system flows and flows of other
            // datasets both count as unmatched.
            let belongs = matches!(
                &flow_state.flow_key,
                fs::FlowKey::Dataset(fk) if fk.dataset_id == dataset_handle.id
            );
            if belongs {
                Ok(None)
            } else {
                // Single construction site for the unmatched error (was
                // duplicated across the Dataset and System match arms).
                Ok(Some(FlowInDatasetError::UnmatchedInDataset(
                    FlowUnmatchedInDataset {
                        flow_id,
                        dataset_alias: dataset_handle.alias.clone().into(),
                    },
                )))
            }
        }
        Err(fs::GetFlowError::NotFound(_)) => {
            Ok(Some(FlowInDatasetError::NotFound(FlowNotFound { flow_id })))
        }
        Err(fs::GetFlowError::Internal(e)) => Err(GqlError::Internal(e)),
    }
}

///////////////////////////////////////////////////////////////////////////////

pub(crate) async fn ensure_expected_dataset_kind(
ctx: &Context<'_>,
dataset_handle: &odf::DatasetHandle,
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/graphql/src/mutations/flows_mut/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
// by the Apache License, Version 2.0.

mod dataset_flow_configs_mut;
mod dataset_flow_errors;
mod dataset_flow_runs_mut;
mod dataset_flows_mut;
mod flows_mut_utils;

pub(crate) use dataset_flow_configs_mut::*;
pub(crate) use dataset_flow_errors::*;
pub(crate) use dataset_flow_runs_mut::*;
pub(crate) use dataset_flows_mut::*;
pub(crate) use flows_mut_utils::*;
Loading
Loading