Skip to content

Commit

Permalink
Fix review comments - Iter 2
Browse files Browse the repository at this point in the history
  • Loading branch information
rmn-boiko committed Dec 16, 2024
1 parent 78f8c7f commit dfc1cd9
Show file tree
Hide file tree
Showing 25 changed files with 91 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE TABLE flow_trigger_events
event_payload JSONB NOT NULL
);

CREATE INDEX idx_flow_trigger_events_dataset_flow_key
CREATE UNIQUE INDEX idx_flow_trigger_events_dataset_flow_key
ON flow_trigger_events (dataset_id, dataset_flow_type)
WHERE dataset_id IS NOT NULL;

Expand Down
10 changes: 5 additions & 5 deletions src/adapter/graphql/src/queries/flows/flow_config_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ pub struct FlowConfigurationCompactionRule {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

impl From<fs::FlowConfigurationSnapshot> for FlowConfigurationSnapshot {
fn from(value: fs::FlowConfigurationSnapshot) -> Self {
impl From<fs::FlowConfigurationRule> for FlowConfigurationSnapshot {
fn from(value: fs::FlowConfigurationRule) -> Self {
match value {
fs::FlowConfigurationSnapshot::Ingest(ingest_rule) => Self::Ingest(ingest_rule.into()),
fs::FlowConfigurationSnapshot::Reset(reset_rule) => Self::Reset(reset_rule.into()),
fs::FlowConfigurationSnapshot::Compaction(compaction_rule) => {
fs::FlowConfigurationRule::IngestRule(ingest_rule) => Self::Ingest(ingest_rule.into()),
fs::FlowConfigurationRule::ResetRule(reset_rule) => Self::Reset(reset_rule.into()),
fs::FlowConfigurationRule::CompactionRule(compaction_rule) => {
Self::Compaction(FlowConfigurationCompactionRule {
compaction_rule: match compaction_rule {
fs::CompactionRule::Full(compaction_full_rule) => {
Expand Down
11 changes: 5 additions & 6 deletions src/adapter/graphql/src/scalars/flow_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use kamu_flow_system::{
CompactionRuleFull,
CompactionRuleMetadataOnly,
FlowConfigurationRule,
FlowConfigurationSnapshot,
IngestRule,
ResetRule,
};
Expand Down Expand Up @@ -274,12 +273,12 @@ impl FlowRunConfiguration {
dataset_flow_type: &DatasetFlowType,
dataset_handle: &DatasetHandle,
flow_run_configuration_maybe: Option<&FlowRunConfiguration>,
) -> Result<Option<FlowConfigurationSnapshot>, FlowInvalidRunConfigurations> {
) -> Result<Option<FlowConfigurationRule>, FlowInvalidRunConfigurations> {
match dataset_flow_type {
DatasetFlowType::Ingest => {
if let Some(flow_run_configuration) = flow_run_configuration_maybe {
if let Self::Ingest(ingest_input) = flow_run_configuration {
return Ok(Some(FlowConfigurationSnapshot::Ingest(IngestRule {
return Ok(Some(FlowConfigurationRule::IngestRule(IngestRule {
fetch_uncacheable: ingest_input.fetch_uncacheable,
})));
}
Expand All @@ -293,7 +292,7 @@ impl FlowRunConfiguration {
DatasetFlowType::HardCompaction => {
if let Some(flow_run_configuration) = flow_run_configuration_maybe {
if let Self::Compaction(compaction_input) = flow_run_configuration {
return Ok(Some(FlowConfigurationSnapshot::Compaction(
return Ok(Some(FlowConfigurationRule::CompactionRule(
match compaction_input {
CompactionConditionInput::Full(compaction_input) => {
CompactionRule::Full(
Expand Down Expand Up @@ -344,7 +343,7 @@ impl FlowRunConfiguration {
} else {
current_head_hash
};
return Ok(Some(FlowConfigurationSnapshot::Reset(ResetRule {
return Ok(Some(FlowConfigurationRule::ResetRule(ResetRule {
new_head_hash: reset_input.new_head_hash().map(Into::into),
old_head_hash,
recursive: reset_input.recursive,
Expand All @@ -355,7 +354,7 @@ impl FlowRunConfiguration {
.to_string(),
});
}
return Ok(Some(FlowConfigurationSnapshot::Reset(ResetRule {
return Ok(Some(FlowConfigurationRule::ResetRule(ResetRule {
new_head_hash: None,
old_head_hash: current_head_hash,
recursive: false,
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/graphql/src/scalars/flow_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl From<kamu_flow_system::FlowTriggerState> for FlowTrigger {
fn from(value: kamu_flow_system::FlowTriggerState) -> Self {
Self {
paused: !value.is_active(),
batching: if let Some(FlowTriggerRule::Batching(condition)) = &value.rule {
batching: if let FlowTriggerRule::Batching(condition) = &value.rule {
Some((*condition).into())
} else {
None
},
schedule: if let Some(FlowTriggerRule::Schedule(schedule_rule)) = &value.rule {
schedule: if let FlowTriggerRule::Schedule(schedule_rule) = &value.rule {
Some(schedule_rule.clone().into())
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions src/domain/flow-system/domain/src/aggregates/flow/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Flow {
flow_id: FlowID,
flow_key: FlowKey,
trigger: FlowTriggerType,
config_snapshot: Option<FlowConfigurationSnapshot>,
config_snapshot: Option<FlowConfigurationRule>,
) -> Self {
Self(
Aggregate::new(
Expand Down Expand Up @@ -65,7 +65,7 @@ impl Flow {
pub fn modify_config_snapshot(
&mut self,
now: DateTime<Utc>,
config_snapshot: FlowConfigurationSnapshot,
config_snapshot: FlowConfigurationRule,
) -> Result<(), ProjectionError<FlowState>> {
let event = FlowConfigSnapshotModified {
event_time: now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ pub struct FlowTrigger(Aggregate<FlowTriggerState, (dyn FlowTriggerEventStore +

impl FlowTrigger {
/// Creates a flow trigger rule
pub fn new(
now: DateTime<Utc>,
flow_key: FlowKey,
paused: bool,
rule: Option<FlowTriggerRule>,
) -> Self {
pub fn new(now: DateTime<Utc>, flow_key: FlowKey, paused: bool, rule: FlowTriggerRule) -> Self {
Self(
Aggregate::new(
flow_key.clone(),
Expand All @@ -50,7 +45,7 @@ impl FlowTrigger {
event_time: now,
flow_key: self.flow_key.clone(),
paused,
rule: Some(new_rule),
rule: new_rule,
};
self.apply(event)
}
Expand Down
4 changes: 2 additions & 2 deletions src/domain/flow-system/domain/src/entities/flow/flow_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct FlowEventInitiated {
pub flow_id: FlowID,
pub flow_key: FlowKey,
pub trigger: FlowTriggerType,
pub config_snapshot: Option<FlowConfigurationSnapshot>,
pub config_snapshot: Option<FlowConfigurationRule>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -90,7 +90,7 @@ pub struct FlowEventTriggerAdded {
pub struct FlowConfigSnapshotModified {
pub event_time: DateTime<Utc>,
pub flow_id: FlowID,
pub config_snapshot: FlowConfigurationSnapshot,
pub config_snapshot: FlowConfigurationRule,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct FlowState {
/// Flow outcome
pub outcome: Option<FlowOutcome>,
/// Flow config snapshot on the moment when flow was initiated
pub config_snapshot: Option<FlowConfigurationSnapshot>,
pub config_snapshot: Option<FlowConfigurationRule>,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct FlowTriggerEventCreated {
pub event_time: DateTime<Utc>,
pub flow_key: FlowKey,
pub paused: bool,
pub rule: Option<FlowTriggerRule>,
pub rule: FlowTriggerRule,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -39,7 +39,7 @@ pub struct FlowTriggerEventModified {
pub event_time: DateTime<Utc>,
pub flow_key: FlowKey,
pub paused: bool,
pub rule: Option<FlowTriggerRule>,
pub rule: FlowTriggerRule,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct FlowTriggerState {
/// Flow key
pub flow_key: FlowKey,
/// Trigger rule
pub rule: Option<FlowTriggerRule>,
pub rule: FlowTriggerRule,
/// Trigger status
pub status: FlowTriggerStatus,
}
Expand All @@ -31,17 +31,17 @@ impl FlowTriggerState {
}

pub fn try_get_schedule_rule(self) -> Option<Schedule> {
self.rule.and_then(|rule| match rule {
match self.rule {
FlowTriggerRule::Schedule(schedule) => Some(schedule),
FlowTriggerRule::Batching(_) => None,
})
}
}

pub fn try_get_batching_rule(self) -> Option<BatchingRule> {
self.rule.and_then(|rule| match rule {
match self.rule {
FlowTriggerRule::Batching(batching) => Some(batching),
FlowTriggerRule::Schedule(_) => None,
})
}
}
}

Expand Down

This file was deleted.

2 changes: 0 additions & 2 deletions src/domain/flow-system/domain/src/entities/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
mod batching_rule;
mod compaction_rule;
mod flow_key;
mod flow_run_snapshot;
mod flow_task_metadata;
mod flow_type;
mod ingest_rule;
Expand All @@ -20,7 +19,6 @@ mod schedule;
pub use batching_rule::*;
pub use compaction_rule::*;
pub use flow_key::*;
pub use flow_run_snapshot::*;
pub use flow_task_metadata::*;
pub use flow_type::*;
pub use ingest_rule::*;
Expand Down
2 changes: 1 addition & 1 deletion src/domain/flow-system/domain/src/flow_messages_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub struct FlowTriggerUpdatedMessage {
pub event_time: DateTime<Utc>,
pub flow_key: FlowKey,
pub paused: bool,
pub rule: Option<FlowTriggerRule>,
pub rule: FlowTriggerRule,
}

impl Message for FlowTriggerUpdatedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio_stream::Stream;
use crate::{
AccountFlowFilters,
DatasetFlowFilters,
FlowConfigurationSnapshot,
FlowConfigurationRule,
FlowID,
FlowKey,
FlowState,
Expand Down Expand Up @@ -89,7 +89,7 @@ pub trait FlowQueryService: Sync + Send {
trigger_time: DateTime<Utc>,
flow_key: FlowKey,
initiator_account_id: AccountID,
flow_run_snapshots_maybe: Option<FlowConfigurationSnapshot>,
flow_run_snapshot_maybe: Option<FlowConfigurationRule>,
) -> Result<FlowState, RequestFlowError>;

/// Attempts to cancel the tasks already scheduled for the given flow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub trait FlowConfigurationServiceExt {
async fn try_get_config_snapshot_by_key(
&self,
flow_key: FlowKey,
) -> Result<Option<FlowConfigurationSnapshot>, FindFlowConfigurationError>;
) -> Result<Option<FlowConfigurationRule>, FindFlowConfigurationError>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<T: FlowConfigurationService + ?Sized> FlowConfigurationServiceExt for T {
async fn try_get_config_snapshot_by_key(
&self,
flow_key: FlowKey,
) -> Result<Option<FlowConfigurationSnapshot>, FindFlowConfigurationError> {
) -> Result<Option<FlowConfigurationRule>, FindFlowConfigurationError> {
let maybe_snapshot = match flow_key {
FlowKey::System(_) => None,
FlowKey::Dataset(dataset_flow_key) => match dataset_flow_key.flow_type {
Expand All @@ -142,21 +142,21 @@ impl<T: FlowConfigurationService + ?Sized> FlowConfigurationServiceExt for T {
dataset_flow_key.flow_type,
)
.await?
.map(FlowConfigurationSnapshot::Ingest),
.map(FlowConfigurationRule::IngestRule),
DatasetFlowType::Reset => self
.try_get_dataset_reset_rule(
dataset_flow_key.dataset_id,
dataset_flow_key.flow_type,
)
.await?
.map(FlowConfigurationSnapshot::Reset),
.map(FlowConfigurationRule::ResetRule),
DatasetFlowType::HardCompaction => self
.try_get_dataset_compaction_rule(
dataset_flow_key.dataset_id,
dataset_flow_key.flow_type,
)
.await?
.map(FlowConfigurationSnapshot::Compaction),
.map(FlowConfigurationRule::CompactionRule),
},
};

Expand Down
12 changes: 6 additions & 6 deletions src/domain/flow-system/services/src/flow/flow_executor_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl FlowExecutorImpl {
// Split triggers by those which have a schedule or different rules
let (schedule_triggers, non_schedule_triggers): (Vec<_>, Vec<_>) = enabled_triggers
.into_iter()
.partition(|config| matches!(config.rule, Some(FlowTriggerRule::Schedule(_))));
.partition(|config| matches!(config.rule, FlowTriggerRule::Schedule(_)));

let scheduling_helper = target_catalog.get_one::<FlowSchedulingHelper>().unwrap();

Expand Down Expand Up @@ -363,14 +363,14 @@ impl FlowExecutorImpl {
pub fn make_task_logical_plan(
&self,
flow_key: &FlowKey,
maybe_config_snapshot: Option<&FlowConfigurationSnapshot>,
maybe_config_snapshot: Option<&FlowConfigurationRule>,
) -> Result<LogicalPlan, InternalError> {
match flow_key {
FlowKey::Dataset(flow_key) => match flow_key.flow_type {
DatasetFlowType::Ingest | DatasetFlowType::ExecuteTransform => {
let mut fetch_uncacheable = false;
if let Some(config_snapshot) = maybe_config_snapshot
&& let FlowConfigurationSnapshot::Ingest(ingest_rule) = config_snapshot
&& let FlowConfigurationRule::IngestRule(ingest_rule) = config_snapshot
{
fetch_uncacheable = ingest_rule.fetch_uncacheable;
}
Expand All @@ -385,7 +385,7 @@ impl FlowExecutorImpl {
let mut keep_metadata_only = false;

if let Some(config_snapshot) = maybe_config_snapshot
&& let FlowConfigurationSnapshot::Compaction(compaction_rule) =
&& let FlowConfigurationRule::CompactionRule(compaction_rule) =
config_snapshot
{
max_slice_size = compaction_rule.max_slice_size();
Expand All @@ -405,7 +405,7 @@ impl FlowExecutorImpl {
}
DatasetFlowType::Reset => {
if let Some(config_rule) = maybe_config_snapshot
&& let FlowConfigurationSnapshot::Reset(reset_rule) = config_rule
&& let FlowConfigurationRule::ResetRule(reset_rule) = config_rule
{
return Ok(LogicalPlan::ResetDataset(LogicalPlanResetDataset {
dataset_id: flow_key.dataset_id.clone(),
Expand Down Expand Up @@ -744,7 +744,7 @@ pub enum FlowTriggerContext {
pub struct DownstreamDependencyFlowPlan {
pub flow_key: FlowKey,
pub flow_trigger_rule: Option<FlowTriggerRule>,
pub maybe_config_snapshot: Option<FlowConfigurationSnapshot>,
pub maybe_config_snapshot: Option<FlowConfigurationRule>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl FlowQueryService for FlowQueryServiceImpl {
trigger_time: DateTime<Utc>,
flow_key: FlowKey,
initiator_account_id: AccountID,
config_snapshot_maybe: Option<FlowConfigurationSnapshot>,
config_snapshot_maybe: Option<FlowConfigurationRule>,
) -> Result<FlowState, RequestFlowError> {
let activation_time = self.executor_config.round_time(trigger_time)?;

Expand Down
Loading

0 comments on commit dfc1cd9

Please sign in to comment.