Skip to content

Commit

Permalink
Fix manual trigger config merging (#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
rmn-boiko authored Dec 13, 2024
1 parent 83ae80f commit 78f8c7f
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 4 deletions.
6 changes: 6 additions & 0 deletions resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,12 @@ type FlowAbortedResult {
message: String!
}

type FlowConfigSnapshotModified implements FlowEvent {
eventId: EventID!
eventTime: DateTime!
configSnapshot: FlowConfigurationSnapshot!
}

type FlowConfiguration {
ingest: FlowConfigurationIngest
compaction: FlowConfigurationCompaction
Expand Down
26 changes: 25 additions & 1 deletion src/adapter/graphql/src/queries/flows/flow_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use chrono::{DateTime, Utc};
use {event_sourcing as evs, kamu_flow_system as fs, kamu_task_system as ts};

use super::{FlowStartCondition, FlowTriggerType};
use super::{FlowConfigurationSnapshot, FlowStartCondition, FlowTriggerType};
use crate::prelude::*;
use crate::queries::Task;
use crate::utils;
Expand All @@ -27,6 +27,8 @@ pub enum FlowEvent {
Initiated(FlowEventInitiated),
/// Start condition defined
StartConditionUpdated(FlowEventStartConditionUpdated),
/// Config snapshot modified
ConfigSnapshotModified(FlowConfigSnapshotModified),
/// Flow scheduled for activation
ScheduledForActivation(FlowEventScheduledForActivation),
/// Secondary trigger added
Expand Down Expand Up @@ -61,6 +63,9 @@ impl FlowEvent {
start_condition,
))
}
fs::FlowEvent::ConfigSnapshotModified(e) => {
Self::ConfigSnapshotModified(FlowConfigSnapshotModified::build(event_id, e))
}
fs::FlowEvent::TriggerAdded(e) => {
Self::TriggerAdded(FlowEventTriggerAdded::build(event_id, e, ctx).await?)
}
Expand Down Expand Up @@ -165,6 +170,25 @@ impl FlowEventTriggerAdded {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject)]
pub struct FlowConfigSnapshotModified {
event_id: EventID,
event_time: DateTime<Utc>,
config_snapshot: FlowConfigurationSnapshot,
}

impl FlowConfigSnapshotModified {
pub(crate) fn build(event_id: evs::EventID, event: fs::FlowConfigSnapshotModified) -> Self {
Self {
event_id: event_id.into(),
event_time: event.event_time,
config_snapshot: event.config_snapshot.into(),
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject)]
#[graphql(complex)]
pub struct FlowEventScheduledForActivation {
Expand Down
14 changes: 14 additions & 0 deletions src/domain/flow-system/domain/src/aggregates/flow/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ impl Flow {
}
}

/// Define config snapshot
pub fn modify_config_snapshot(
&mut self,
now: DateTime<Utc>,
config_snapshot: FlowConfigurationSnapshot,
) -> Result<(), ProjectionError<FlowState>> {
let event = FlowConfigSnapshotModified {
event_time: now,
flow_id: self.flow_id,
config_snapshot,
};
self.apply(event)
}

/// Add extra trigger, if it's unique
pub fn add_trigger_if_unique(
&mut self,
Expand Down
18 changes: 18 additions & 0 deletions src/domain/flow-system/domain/src/entities/flow/flow_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum FlowEvent {
Initiated(FlowEventInitiated),
/// Start condition updated
StartConditionUpdated(FlowEventStartConditionUpdated),
/// Config snapshot modified
ConfigSnapshotModified(FlowConfigSnapshotModified),
/// Secondary trigger added
TriggerAdded(FlowEventTriggerAdded),
/// Scheduled for activation at a particular time
Expand All @@ -41,6 +43,7 @@ impl FlowEvent {
match self {
FlowEvent::Initiated(_) => "FlowEventInitiated",
FlowEvent::StartConditionUpdated(_) => "FlowEventStartConditionUpdated",
FlowEvent::ConfigSnapshotModified(_) => "FlowEventConfigSnapshotModified",
FlowEvent::TriggerAdded(_) => "FlowEventTriggerAdded",
FlowEvent::ScheduledForActivation(_) => "FlowEventScheduledForActivation",
FlowEvent::TaskScheduled(_) => "FlowEventTaskScheduled",
Expand Down Expand Up @@ -83,6 +86,15 @@ pub struct FlowEventTriggerAdded {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlowConfigSnapshotModified {
pub event_time: DateTime<Utc>,
pub flow_id: FlowID,
pub config_snapshot: FlowConfigurationSnapshot,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlowEventScheduledForActivation {
pub event_time: DateTime<Utc>,
Expand Down Expand Up @@ -133,6 +145,7 @@ impl FlowEvent {
match self {
FlowEvent::Initiated(e) => e.flow_id,
FlowEvent::StartConditionUpdated(e) => e.flow_id,
FlowEvent::ConfigSnapshotModified(e) => e.flow_id,
FlowEvent::TriggerAdded(e) => e.flow_id,
FlowEvent::ScheduledForActivation(e) => e.flow_id,
FlowEvent::TaskScheduled(e) => e.flow_id,
Expand All @@ -146,6 +159,7 @@ impl FlowEvent {
match self {
FlowEvent::Initiated(e) => e.event_time,
FlowEvent::StartConditionUpdated(e) => e.event_time,
FlowEvent::ConfigSnapshotModified(e) => e.event_time,
FlowEvent::TriggerAdded(e) => e.event_time,
FlowEvent::ScheduledForActivation(e) => e.event_time,
FlowEvent::TaskScheduled(e) => e.event_time,
Expand All @@ -159,6 +173,7 @@ impl FlowEvent {
match self {
FlowEvent::Initiated(_) => Some(FlowStatus::Waiting),
FlowEvent::StartConditionUpdated(_)
| FlowEvent::ConfigSnapshotModified(_)
| FlowEvent::TriggerAdded(_)
| FlowEvent::ScheduledForActivation(_)
| FlowEvent::TaskScheduled(_) => None,
Expand All @@ -174,6 +189,9 @@ impl_enum_variant!(FlowEvent::Initiated(FlowEventInitiated));
impl_enum_variant!(FlowEvent::StartConditionUpdated(
FlowEventStartConditionUpdated
));
impl_enum_variant!(FlowEvent::ConfigSnapshotModified(
FlowConfigSnapshotModified
));
impl_enum_variant!(FlowEvent::TriggerAdded(FlowEventTriggerAdded));
impl_enum_variant!(FlowEvent::ScheduledForActivation(
FlowEventScheduledForActivation
Expand Down
7 changes: 7 additions & 0 deletions src/domain/flow-system/domain/src/entities/flow/flow_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ impl Projection for FlowState {
})
}
}
E::ConfigSnapshotModified(FlowConfigSnapshotModified {
config_snapshot,
..
}) => Ok(FlowState {
config_snapshot: Some(config_snapshot),
..s
}),
E::TriggerAdded(FlowEventTriggerAdded { ref trigger, .. }) => {
if s.outcome.is_some() {
Err(ProjectionError::new(Some(s), event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ impl FlowSchedulingHelper {
throttling_boundary_time,
trigger_time,
)?;
};

if let Some(config_snapshot) = config_snapshot_maybe {
flow.modify_config_snapshot(trigger_time, config_snapshot)
.int_err()?;
}

// Schedule the flow earlier than previously planned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ async fn test_manual_trigger() {
finish_in_with: Some((Duration::milliseconds(10), TaskOutcome::Success(TaskResult::Empty))),
expected_logical_plan: LogicalPlan::UpdateDataset(LogicalPlanUpdateDataset {
dataset_id: bar_id.clone(),
fetch_uncacheable: false
fetch_uncacheable: true
}),
});
let task2_handle = task2_driver.run();
Expand All @@ -501,6 +501,7 @@ async fn test_manual_trigger() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(40),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -509,6 +510,9 @@ async fn test_manual_trigger() {
flow_key: bar_flow_key,
run_since_start: Duration::milliseconds(80),
initiator_id: None,
flow_configuration_snapshot_maybe: Some(FlowConfigurationSnapshot::Ingest(IngestRule {
fetch_uncacheable: true
})),
});
let trigger1_handle = trigger1_driver.run();

Expand Down Expand Up @@ -718,6 +722,7 @@ async fn test_ingest_trigger_with_ingest_config() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(40),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -726,6 +731,7 @@ async fn test_ingest_trigger_with_ingest_config() {
flow_key: bar_flow_key,
run_since_start: Duration::milliseconds(80),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger1_handle = trigger1_driver.run();

Expand Down Expand Up @@ -908,6 +914,7 @@ async fn test_manual_trigger_compaction() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -916,6 +923,7 @@ async fn test_manual_trigger_compaction() {
flow_key: bar_flow_key,
run_since_start: Duration::milliseconds(50),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger1_handle = trigger1_driver.run();

Expand Down Expand Up @@ -1054,6 +1062,7 @@ async fn test_manual_trigger_reset() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -1170,6 +1179,7 @@ async fn test_reset_trigger_keep_metadata_compaction_for_derivatives() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -1383,6 +1393,7 @@ async fn test_manual_trigger_compaction_with_config() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(20),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -1487,6 +1498,7 @@ async fn test_full_hard_compaction_trigger_keep_metadata_compaction_for_derivati
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -1714,6 +1726,7 @@ async fn test_manual_trigger_keep_metadata_only_with_recursive_compaction() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -1943,6 +1956,7 @@ async fn test_manual_trigger_keep_metadata_only_without_recursive_compaction() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -2099,6 +2113,7 @@ async fn test_manual_trigger_keep_metadata_only_compaction_multiple_accounts() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand Down Expand Up @@ -3319,6 +3334,7 @@ async fn test_throttling_manual_triggers() {
flow_key: foo_flow_key.clone(),
run_since_start: Duration::milliseconds(20),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -3327,6 +3343,7 @@ async fn test_throttling_manual_triggers() {
flow_key: foo_flow_key.clone(),
run_since_start: Duration::milliseconds(30),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger1_handle = trigger1_driver.run();

Expand All @@ -3335,6 +3352,7 @@ async fn test_throttling_manual_triggers() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(70),
initiator_id: None,
flow_configuration_snapshot_maybe: None,
});
let trigger2_handle = trigger2_driver.run();

Expand Down Expand Up @@ -5389,6 +5407,7 @@ async fn test_list_all_flow_initiators() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: Some(foo_account_id.clone()),
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -5397,6 +5416,7 @@ async fn test_list_all_flow_initiators() {
flow_key: bar_flow_key,
run_since_start: Duration::milliseconds(50),
initiator_id: Some(bar_account_id.clone()),
flow_configuration_snapshot_maybe: None,
});
let trigger1_handle = trigger1_driver.run();

Expand Down Expand Up @@ -5549,6 +5569,7 @@ async fn test_list_all_datasets_with_flow() {
flow_key: foo_flow_key,
run_since_start: Duration::milliseconds(10),
initiator_id: Some(foo_account_id.clone()),
flow_configuration_snapshot_maybe: None,
});
let trigger0_handle = trigger0_driver.run();

Expand All @@ -5557,6 +5578,7 @@ async fn test_list_all_datasets_with_flow() {
flow_key: bar_flow_key,
run_since_start: Duration::milliseconds(50),
initiator_id: Some(bar_account_id.clone()),
flow_configuration_snapshot_maybe: None,
});
let trigger1_handle = trigger1_driver.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use chrono::{DateTime, Duration, Utc};
use database_common_macros::transactional_method1;
use dill::Catalog;
use kamu_accounts::DEFAULT_ACCOUNT_ID;
use kamu_flow_system::{FlowKey, FlowQueryService, RequestFlowError};
use kamu_flow_system::{FlowConfigurationSnapshot, FlowKey, FlowQueryService, RequestFlowError};
use opendatafabric::AccountID;
use time_source::SystemTimeSource;

Expand All @@ -29,6 +29,7 @@ pub(crate) struct ManualFlowTriggerArgs {
pub(crate) flow_key: FlowKey,
pub(crate) run_since_start: Duration,
pub(crate) initiator_id: Option<AccountID>,
pub(crate) flow_configuration_snapshot_maybe: Option<FlowConfigurationSnapshot>,
}

impl ManualFlowTriggerDriver {
Expand Down Expand Up @@ -65,7 +66,7 @@ impl ManualFlowTriggerDriver {
.initiator_id
.clone()
.unwrap_or(DEFAULT_ACCOUNT_ID.clone()),
None,
self.args.flow_configuration_snapshot_maybe.clone(),
)
.await?;
Ok(())
Expand Down

0 comments on commit 78f8c7f

Please sign in to comment.