diff --git a/Cargo.lock b/Cargo.lock
index e6b890dfc3..557ffecfc8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3282,6 +3282,7 @@ dependencies = [
  "kamu-core",
  "kamu-data-utils",
  "kamu-flow-system",
+ "kamu-flow-system-inmem",
  "kamu-task-system",
  "mockall",
  "opendatafabric",
diff --git a/resources/schema.gql b/resources/schema.gql
index 2333ce3b52..a95f838c92 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -142,7 +142,7 @@ type CreateDatasetResultSuccess implements CreateDatasetResult & CreateDatasetFr
 }
 
 type CronExpression {
-  expression: String!
+  cronExpression: String!
 }
 
 type DataBatch {
@@ -302,9 +302,9 @@ type DatasetFlowConfigs {
 }
 
 type DatasetFlowConfigsMut {
-  setConfigTimeDelta(datasetFlowType: DatasetFlowType!, paused: Boolean!, every: TimeDeltaInput!): Boolean!
-  setConfigCronExpression(datasetFlowType: DatasetFlowType!, paused: Boolean!, cronExpression: String!): Boolean!
-  setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, throttlingPeriod: TimeDeltaInput, minimalDataBatch: Int): Boolean!
+  setConfigTimeDelta(datasetFlowType: DatasetFlowType!, paused: Boolean!, every: TimeDeltaInput!): FlowConfiguration!
+  setConfigCronExpression(datasetFlowType: DatasetFlowType!, paused: Boolean!, cronExpression: String!): FlowConfiguration!
+  setConfigBatching(datasetFlowType: DatasetFlowType!, paused: Boolean!, throttlingPeriod: TimeDeltaInput, minimalDataBatch: Int): FlowConfiguration!
 }
 
 enum DatasetFlowType {
diff --git a/src/adapter/graphql/Cargo.toml b/src/adapter/graphql/Cargo.toml
index b4b2a8aeea..4d728fc71a 100644
--- a/src/adapter/graphql/Cargo.toml
+++ b/src/adapter/graphql/Cargo.toml
@@ -44,8 +44,9 @@ thiserror = { version = "1", default-features = false }
 
 [dev-dependencies]
 # TODO: Limit to mock or in-memory implementations only
+event-bus = { workspace = true }
 kamu = { workspace = true }
-event-bus = {workspace = true }
+kamu-flow-system-inmem = { workspace = true }
 env_logger = "0.10"
 mockall = "0.11"
diff --git a/src/adapter/graphql/src/mutations/dataset_flow_configs_mut.rs b/src/adapter/graphql/src/mutations/dataset_flow_configs_mut.rs
index 4909ad1714..db8227c971 100644
--- a/src/adapter/graphql/src/mutations/dataset_flow_configs_mut.rs
+++ b/src/adapter/graphql/src/mutations/dataset_flow_configs_mut.rs
@@ -22,7 +22,7 @@ use kamu_flow_system::{
 use opendatafabric as odf;
 
 use crate::prelude::*;
-use crate::queries::{DatasetFlowType, TimeUnit};
+use crate::queries::{DatasetFlowType, FlowConfiguration, TimeUnit};
 use crate::{utils, LoggedInGuard};
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -45,14 +45,14 @@ impl DatasetFlowConfigsMut {
         dataset_flow_type: DatasetFlowType,
         paused: bool,
         every: TimeDeltaInput,
-    ) -> Result<bool> {
+    ) -> Result<FlowConfiguration> {
         self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
             .await?;
         self.ensure_scheduling_permission(ctx).await?;
 
         let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();
 
-        flow_config_service
+        let res = flow_config_service
             .set_configuration(
                 Utc::now(),
                 FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
@@ -67,7 +67,7 @@ impl DatasetFlowConfigsMut {
                 SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
             })?;
 
-        Ok(true)
+        Ok(res.into())
     }
 
     #[graphql(guard = "LoggedInGuard::new()")]
@@ -77,21 +77,21 @@ impl DatasetFlowConfigsMut {
         dataset_flow_type: DatasetFlowType,
         paused: bool,
         cron_expression: String,
-    ) -> Result<bool> {
+    ) -> Result<FlowConfiguration> {
         self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
             .await?;
         self.ensure_scheduling_permission(ctx).await?;
 
         let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();
 
-        flow_config_service
+        let res = flow_config_service
             .set_configuration(
                 Utc::now(),
                 FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
                     .into(),
                 paused,
                 FlowConfigurationRule::Schedule(Schedule::CronExpression(ScheduleCronExpression {
-                    expression: cron_expression,
+                    cron_expression,
                 })),
             )
             .await
@@ -99,7 +99,7 @@ impl DatasetFlowConfigsMut {
                 SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
             })?;
 
-        Ok(true)
+        Ok(res.into())
     }
 
     #[graphql(guard = "LoggedInGuard::new()")]
@@ -110,14 +110,14 @@ impl DatasetFlowConfigsMut {
         paused: bool,
         throttling_period: Option<TimeDeltaInput>,
         minimal_data_batch: Option<i32>,
-    ) -> Result<bool> {
+    ) -> Result<FlowConfiguration> {
         self.ensure_expected_dataset_kind(ctx, dataset_flow_type)
             .await?;
         self.ensure_scheduling_permission(ctx).await?;
 
         let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();
 
-        flow_config_service
+        let res = flow_config_service
             .set_configuration(
                 Utc::now(),
                 FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
@@ -133,7 +133,7 @@ impl DatasetFlowConfigsMut {
                 SetFlowConfigurationError::Internal(e) => GqlError::Internal(e),
             })?;
 
-        Ok(true)
+        Ok(res.into())
     }
 
     #[graphql(skip)]
diff --git a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
index ab7332fc68..f168dbe68d 100644
--- a/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
+++ b/src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
@@ -61,9 +61,32 @@ impl DatasetFlowConfigs {
             .await
             .int_err()?;
 
-        Ok(maybe_flow_config.map(|flow_config| FlowConfiguration {
-            paused: !flow_config.is_active(),
-            batching: if let FlowConfigurationRule::StartCondition(condition) = &flow_config.rule {
+        Ok(maybe_flow_config.map(|flow_config| flow_config.into()))
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+#[derive(Enum, Copy, Clone, Eq, PartialEq)]
+#[graphql(remote = "kamu_flow_system::DatasetFlowType")]
+pub enum DatasetFlowType {
+    Ingest,
+    ExecuteQuery,
+    Compaction,
+}
+
+#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
+pub struct FlowConfiguration {
+    pub paused: bool,
+    pub schedule: Option<FlowConfigurationSchedule>,
+    pub batching: Option<FlowConfigurationBatching>,
+}
+
+impl From<kamu_flow_system::FlowConfigurationState> for FlowConfiguration {
+    fn from(value: kamu_flow_system::FlowConfigurationState) -> Self {
+        Self {
+            paused: !value.is_active(),
+            batching: if let FlowConfigurationRule::StartCondition(condition) = &value.rule {
                 Some(FlowConfigurationBatching {
                     throttling_period: condition.throttling_period.map(|tp| tp.into()),
                     minimal_data_batch: condition.minimal_data_batch,
@@ -71,7 +94,7 @@ impl DatasetFlowConfigs {
             } else {
                 None
             },
-            schedule: if let FlowConfigurationRule::Schedule(schedule) = flow_config.rule {
+            schedule: if let FlowConfigurationRule::Schedule(schedule) = value.rule {
                 match schedule {
                     Schedule::TimeDelta(time_delta) => Some(FlowConfigurationSchedule::TimeDelta(
                         time_delta.every.into(),
@@ -83,35 +106,16 @@ impl DatasetFlowConfigs {
             } else {
                 None
             },
-        }))
+        }
     }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-
-#[derive(Enum, Copy, Clone, Eq, PartialEq)]
-#[graphql(remote = "kamu_flow_system::DatasetFlowType")]
-pub enum DatasetFlowType {
-    Ingest,
-    ExecuteQuery,
-    Compaction,
-}
-
-#[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
-pub struct FlowConfiguration {
-    pub paused: bool,
-    pub schedule: Option<FlowConfigurationSchedule>,
-    pub batching: Option<FlowConfigurationBatching>,
-}
-
 #[derive(Union, Debug, Clone, PartialEq, Eq)]
 pub enum FlowConfigurationSchedule {
     TimeDelta(TimeDelta),
     Cron(CronExpression),
 }
 
-/////////////////////////////////////////////////////////////////////////////////
-
 #[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
 pub struct FlowConfigurationBatching {
     pub throttling_period: Option<TimeDelta>,
@@ -122,13 +126,13 @@ pub struct FlowConfigurationBatching {
 
 #[derive(SimpleObject, Debug, Clone, PartialEq, Eq)]
 pub struct CronExpression {
-    pub expression: String,
+    pub cron_expression: String,
 }
 
 impl From<ScheduleCronExpression> for CronExpression {
     fn from(value: ScheduleCronExpression) -> Self {
         Self {
-            expression: value.expression,
+            cron_expression: value.cron_expression,
         }
     }
 }
diff --git a/src/adapter/graphql/tests/tests/mod.rs b/src/adapter/graphql/tests/tests/mod.rs
index 35b5dfdc43..6acfc56868 100644
--- a/src/adapter/graphql/tests/tests/mod.rs
+++ b/src/adapter/graphql/tests/tests/mod.rs
@@ -11,6 +11,7 @@ mod test_accounts;
 mod test_auth;
 mod test_error_handling;
 mod test_gql_data;
+mod test_gql_dataset_flow_configs;
 mod test_gql_datasets;
 mod test_gql_metadata;
 mod test_gql_metadata_chain;
diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs
new file mode 100644
index 0000000000..bbbb302004
--- /dev/null
+++ b/src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs
@@ -0,0 +1,730 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use async_graphql::{value, Value};
+use dill::Component;
+use event_bus::EventBus;
+use indoc::indoc;
+use kamu::testing::MetadataFactory;
+use kamu::{DatasetRepositoryLocalFs, DependencyGraphServiceInMemory};
+use kamu_core::{auth, CreateDatasetResult, DatasetRepository, SystemTimeSourceDefault};
+use kamu_flow_system_inmem::{FlowConfigurationEventStoreInMem, FlowConfigurationServiceInMemory};
+use opendatafabric::*;
+
+use crate::utils::{authentication_catalogs, expect_anonymous_access_error};
+
+////////////////////////////////////////////////////////////////////////////////////////
+
+#[test_log::test(tokio::test)]
+async fn test_crud_time_delta_root_dataset() {
+    let harness = FlowConfigHarness::new();
+
+    let create_result = harness.create_root_dataset().await;
+
+    let request_code = indoc!(
+        r#"
+        {
+            datasets {
+                byId (datasetId: "<id>") {
+                    flowConfigs {
+                        flowConfig (datasetFlowType: "INGEST") {
+                            __typename
+                            paused
+                            schedule {
+                                __typename
+                                ... on TimeDelta {
+                                    every
+                                    unit
+                                }
+                            }
+                            batching {
+                                __typename
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        "#
+    )
+    .replace("<id>", &create_result.dataset_handle.id.to_string());
+
+    let schema = kamu_adapter_graphql::schema_quiet();
+    let res = schema
+        .execute(
+            async_graphql::Request::new(request_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "flowConfig": Value::Null
+                    }
+                }
+            }
+        })
+    );
+
+    let mutation_code = FlowConfigHarness::set_config_time_delta_mutation(
+        &create_result.dataset_handle.id,
+        "INGEST",
+        false,
+        1,
+        "DAYS",
+    );
+
+    let res = schema
+        .execute(
+            async_graphql::Request::new(mutation_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "setConfigTimeDelta": {
+                            "__typename": "FlowConfiguration",
+                            "paused": false,
+                            "schedule": {
+                                "__typename": "TimeDelta",
+                                "every": 1,
+                                "unit": "DAYS"
+                            },
+                            "batching": Value::Null
+                        }
+                    }
+                }
+            }
+        })
+    );
+
+    let mutation_code = FlowConfigHarness::set_config_time_delta_mutation(
+        &create_result.dataset_handle.id,
+        "INGEST",
+        true,
+        2,
+        "HOURS",
+    );
+
+    let res = schema
+        .execute(
+            async_graphql::Request::new(mutation_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "setConfigTimeDelta": {
+                            "__typename": "FlowConfiguration",
+                            "paused": true,
+                            "schedule": {
+                                "__typename": "TimeDelta",
+                                "every": 2,
+                                "unit": "HOURS"
+                            },
+                            "batching": Value::Null
+                        }
+                    }
+                }
+            }
+        })
+    );
+}
+
+////////////////////////////////////////////////////////////////////////////////////////
+
+#[test_log::test(tokio::test)]
+async fn test_crud_cron_root_dataset() {
+    let harness = FlowConfigHarness::new();
+
+    let create_result = harness.create_root_dataset().await;
+
+    let request_code = indoc!(
+        r#"
+        {
+            datasets {
+                byId (datasetId: "<id>") {
+                    flowConfigs {
+                        flowConfig (datasetFlowType: "INGEST") {
+                            __typename
+                            paused
+                            schedule {
+                                __typename
+                                ... on CronExpression {
+                                    cronExpression
+                                }
+                            }
+                            batching {
+                                __typename
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        "#
+    )
+    .replace("<id>", &create_result.dataset_handle.id.to_string());
+
+    let schema = kamu_adapter_graphql::schema_quiet();
+    let res = schema
+        .execute(
+            async_graphql::Request::new(request_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "flowConfig": Value::Null
+                    }
+                }
+            }
+        })
+    );
+
+    let mutation_code = FlowConfigHarness::set_config_cron_expression_mutation(
+        &create_result.dataset_handle.id,
+        "INGEST",
+        false,
+        "0 */2 * * *",
+    );
+
+    let res = schema
+        .execute(
+            async_graphql::Request::new(mutation_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "setConfigCronExpression": {
+                            "__typename": "FlowConfiguration",
+                            "paused": false,
+                            "schedule": {
+                                "__typename": "CronExpression",
+                                "cronExpression": "0 */2 * * *",
+                            },
+                            "batching": Value::Null
+                        }
+                    }
+                }
+            }
+        })
+    );
+
+    let mutation_code = FlowConfigHarness::set_config_cron_expression_mutation(
+        &create_result.dataset_handle.id,
+        "INGEST",
+        true,
+        "0 0 */1 * *",
+    );
+
+    let res = schema
+        .execute(
+            async_graphql::Request::new(mutation_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "setConfigCronExpression": {
+                            "__typename": "FlowConfiguration",
+                            "paused": true,
+                            "schedule": {
+                                "__typename": "CronExpression",
+                                "cronExpression": "0 0 */1 * *",
+                            },
+                            "batching": Value::Null
+                        }
+                    }
+                }
+            }
+        })
+    );
+}
+
+////////////////////////////////////////////////////////////////////////////////////////
+
+#[test_log::test(tokio::test)]
+async fn test_crud_batching_derived_dataset() {
+    let harness = FlowConfigHarness::new();
+
+    harness.create_root_dataset().await;
+    let create_derived_result = harness.create_derived_dataset().await;
+
+    let request_code = indoc!(
+        r#"
+        {
+            datasets {
+                byId (datasetId: "<id>") {
+                    flowConfigs {
+                        flowConfig (datasetFlowType: "EXECUTE_QUERY") {
+                            __typename
+                            paused
+                            schedule {
+                                __typename
+                            }
+                            batching {
+                                __typename
+                                throttlingPeriod {
+                                    every
+                                    unit
+                                }
+                                minimalDataBatch
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        "#
+    )
+    .replace("<id>", &create_derived_result.dataset_handle.id.to_string());
+
+    let schema = kamu_adapter_graphql::schema_quiet();
+    let res = schema
+        .execute(
+            async_graphql::Request::new(request_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "flowConfig": Value::Null
+                    }
+                }
+            }
+        })
+    );
+
+    let mutation_code = FlowConfigHarness::set_config_batching_mutation(
+        &create_derived_result.dataset_handle.id,
+        "EXECUTE_QUERY",
+        false,
+        (30, "MINUTES"),
+        100,
+    );
+
+    let res = schema
+        .execute(
+            async_graphql::Request::new(mutation_code.clone())
+                .data(harness.catalog_authorized.clone()),
+        )
+        .await;
+
+    assert!(res.is_ok(), "{:?}", res);
+    assert_eq!(
+        res.data,
+        value!({
+            "datasets": {
+                "byId": {
+                    "flowConfigs": {
+                        "setConfigBatching": {
+                            "__typename": "FlowConfiguration",
+                            "paused": false,
+                            "schedule": Value::Null,
+                            "batching": {
+                                "__typename": "FlowConfigurationBatching",
"throttlingPeriod": { + "every": 30, + "unit": "MINUTES" + }, + "minimalDataBatch": 100 + } + } + } + } + } + }) + ); +} + +//////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_incorrect_dataset_kinds_for_flow_type() { + let harness = FlowConfigHarness::new(); + + let create_root_result = harness.create_root_dataset().await; + let create_derived_result = harness.create_derived_dataset().await; + + //// + + let mutation_code = FlowConfigHarness::set_config_batching_mutation( + &create_root_result.dataset_handle.id, + "EXECUTE_QUERY", + false, + (30, "MINUTES"), + 100, + ); + + let schema = kamu_adapter_graphql::schema_quiet(); + + let res = schema + .execute( + async_graphql::Request::new(mutation_code.clone()) + .data(harness.catalog_authorized.clone()), + ) + .await; + + assert!(res.is_err()); + assert_eq!( + res.errors + .into_iter() + .map(|e| e.message) + .collect::>(), + vec!["Expected Derivative dataset kind".to_string()] + ); + + //// + + let mutation_code = FlowConfigHarness::set_config_cron_expression_multation( + &create_derived_result.dataset_handle.id, + "INGEST", + false, + "0 */2 * * *", + ); + + let res = schema + .execute( + async_graphql::Request::new(mutation_code.clone()) + .data(harness.catalog_authorized.clone()), + ) + .await; + + assert!(res.is_err()); + assert_eq!( + res.errors + .into_iter() + .map(|e| e.message) + .collect::>(), + vec!["Expected Root dataset kind".to_string()] + ); + + //// + + let mutation_code = FlowConfigHarness::set_config_time_delta_mutation( + &create_derived_result.dataset_handle.id, + "INGEST", + false, + 2, + "HOURS", + ); + + let res = schema + .execute( + async_graphql::Request::new(mutation_code.clone()) + .data(harness.catalog_authorized.clone()), + ) + .await; + + assert!(res.is_err()); + assert_eq!( + res.errors + .into_iter() + .map(|e| e.message) + .collect::>(), + vec!["Expected Root dataset kind".to_string()] + ); +} + +//////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_anonymous_setters_fail() { + let harness = FlowConfigHarness::new(); + + let create_root_result = harness.create_root_dataset().await; + let create_derived_result = harness.create_derived_dataset().await; + + let mutation_codes = [ + FlowConfigHarness::set_config_time_delta_mutation( + &create_root_result.dataset_handle.id, + "INGEST", + false, + 30, + "MINUTES", + ), + FlowConfigHarness::set_config_cron_expression_multation( + &create_root_result.dataset_handle.id, + "INGEST", + false, + "* */2 * * *", + ), + FlowConfigHarness::set_config_batching_mutation( + &create_derived_result.dataset_handle.id, + "EXECUTE_QUERY", + false, + (30, "MINUTES"), + 100, + ), + ]; + + let schema = kamu_adapter_graphql::schema_quiet(); + for mutation_code in mutation_codes { + let res = schema + .execute( + async_graphql::Request::new(mutation_code.clone()) + .data(harness.catalog_anonymous.clone()), + ) + .await; + + expect_anonymous_access_error(res); + } +} + +//////////////////////////////////////////////////////////////////////////////////////// + +struct FlowConfigHarness { + _tempdir: tempfile::TempDir, + _catalog_base: dill::Catalog, + catalog_anonymous: dill::Catalog, + catalog_authorized: dill::Catalog, + dataset_repo: Arc, +} + +impl FlowConfigHarness { + fn new() -> Self { + let tempdir = tempfile::tempdir().unwrap(); + + let catalog_base = dill::CatalogBuilder::new() + .add::() + .add_builder( + 
+                DatasetRepositoryLocalFs::builder()
+                    .with_root(tempdir.path().join("datasets"))
+                    .with_multi_tenant(false),
+            )
+            .bind::<dyn DatasetRepository, DatasetRepositoryLocalFs>()
+            .add::<SystemTimeSourceDefault>()
+            .add::<auth::AlwaysHappyDatasetActionAuthorizer>()
+            .add::<DependencyGraphServiceInMemory>()
+            .add::<FlowConfigurationServiceInMemory>()
+            .add::<FlowConfigurationEventStoreInMem>()
+            .build();
+
+        // Init dataset with no sources
+        let (catalog_anonymous, catalog_authorized) = authentication_catalogs(&catalog_base);
+
+        let dataset_repo = catalog_authorized
+            .get_one::<dyn DatasetRepository>()
+            .unwrap();
+
+        Self {
+            _tempdir: tempdir,
+            _catalog_base: catalog_base,
+            catalog_anonymous,
+            catalog_authorized,
+            dataset_repo,
+        }
+    }
+
+    async fn create_root_dataset(&self) -> CreateDatasetResult {
+        self.dataset_repo
+            .create_dataset_from_snapshot(
+                None,
+                MetadataFactory::dataset_snapshot()
+                    .kind(DatasetKind::Root)
+                    .name("foo")
+                    .build(),
+            )
+            .await
+            .unwrap()
+    }
+
+    async fn create_derived_dataset(&self) -> CreateDatasetResult {
+        self.dataset_repo
+            .create_dataset_from_snapshot(
+                None,
+                MetadataFactory::dataset_snapshot()
+                    .name("bar")
+                    .kind(DatasetKind::Derivative)
+                    .push_event(MetadataFactory::set_transform(["foo"]).build())
+                    .build(),
+            )
+            .await
+            .unwrap()
+    }
+
+    fn set_config_time_delta_mutation(
+        id: &DatasetID,
+        dataset_flow_type: &str,
+        paused: bool,
+        every: u64,
+        unit: &str,
+    ) -> String {
+        indoc!(
+            r#"
+            mutation {
+                datasets {
+                    byId (datasetId: "<id>") {
+                        flowConfigs {
+                            setConfigTimeDelta (
+                                datasetFlowType: "<dataset_flow_type>",
+                                paused: <paused>,
+                                every: { every: <every>, unit: "<unit>" }
+                            ) {
+                                __typename,
+                                paused
+                                schedule {
+                                    __typename
+                                    ... on TimeDelta {
+                                        every
+                                        unit
+                                    }
+                                }
+                                batching {
+                                    __typename
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            "#
+        )
+        .replace("<id>", &id.to_string())
+        .replace("<dataset_flow_type>", dataset_flow_type)
+        .replace("<paused>", if paused { "true" } else { "false" })
+        .replace("<every>", every.to_string().as_str())
+        .replace("<unit>", unit)
+    }
+
+    fn set_config_cron_expression_mutation(
+        id: &DatasetID,
+        dataset_flow_type: &str,
+        paused: bool,
+        cron_expression: &str,
+    ) -> String {
+        indoc!(
+            r#"
+            mutation {
+                datasets {
+                    byId (datasetId: "<id>") {
+                        flowConfigs {
+                            setConfigCronExpression (
+                                datasetFlowType: "<dataset_flow_type>",
+                                paused: <paused>,
+                                cronExpression: "<cron_expression>"
+                            ) {
+                                __typename,
+                                paused
+                                schedule {
+                                    __typename
+                                    ... on CronExpression {
+                                        cronExpression
+                                    }
+                                }
+                                batching {
+                                    __typename
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            "#
+        )
+        .replace("<id>", &id.to_string())
+        .replace("<dataset_flow_type>", dataset_flow_type)
+        .replace("<paused>", if paused { "true" } else { "false" })
+        .replace("<cron_expression>", cron_expression)
+    }
+
+    fn set_config_batching_mutation(
+        id: &DatasetID,
+        dataset_flow_type: &str,
+        paused: bool,
+        throttling_period: (u64, &str),
+        minimal_data_batch: u64,
+    ) -> String {
+        indoc!(
+            r#"
+            mutation {
+                datasets {
+                    byId (datasetId: "<id>") {
+                        flowConfigs {
+                            setConfigBatching (
+                                datasetFlowType: "<dataset_flow_type>",
+                                paused: <paused>,
+                                throttlingPeriod: { every: <throttling_period_every>, unit: "<throttling_period_unit>" },
+                                minimalDataBatch: <minimal_data_batch>
+                            ) {
+                                __typename,
+                                paused
+                                schedule {
+                                    __typename
+                                }
+                                batching {
+                                    __typename
+                                    throttlingPeriod {
+                                        every
+                                        unit
+                                    }
+                                    minimalDataBatch
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            "#
+        )
+        .replace("<id>", &id.to_string())
+        .replace("<dataset_flow_type>", dataset_flow_type)
+        .replace("<paused>", if paused { "true" } else { "false" })
+        .replace("<throttling_period_every>", &throttling_period.0.to_string())
+        .replace("<throttling_period_unit>", throttling_period.1)
+        .replace("<minimal_data_batch>", &minimal_data_batch.to_string())
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/flow-system/src/entities/shared/schedule.rs b/src/domain/flow-system/src/entities/shared/schedule.rs
index 7967490fe0..17fc9fba7c 100644
--- a/src/domain/flow-system/src/entities/shared/schedule.rs
+++ b/src/domain/flow-system/src/entities/shared/schedule.rs
@@ -31,7 +31,7 @@ pub struct ScheduleTimeDelta {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ScheduleCronExpression {
-    pub expression: String,
+    pub cron_expression: String,
 }
 
 /////////////////////////////////////////////////////////////////////////////////////////
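Note: after this change the three setConfig* mutations return the updated FlowConfiguration object rather than a bare Boolean, so a client can read back the effective schedule in the same round trip. A minimal illustrative call shape is sketched below; the dataset ID and values are placeholders, not taken from this patch.

mutation {
  datasets {
    byId (datasetId: "did:odf:...") {
      flowConfigs {
        setConfigTimeDelta (
          datasetFlowType: "INGEST",
          paused: false,
          every: { every: 1, unit: "DAYS" }
        ) {
          paused
          schedule {
            __typename
            ... on TimeDelta { every unit }
            ... on CronExpression { cronExpression }
          }
          batching { __typename }
        }
      }
    }
  }
}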