Flow configuration separation (#982)
* Add flow trigger

* Add gql apis

* Add trigger event repo tests

* Rename account flow mut

* Fix review comments - Iter 1

* Fix crash after flow trigger set

* Fix manual trigger config merging (#990)

* Fix review comments - Iter 2

* Extend clean up migration

* Experiment jsonb migration

* Update changelog
rmn-boiko authored Dec 27, 2024
1 parent 741548d commit 6637ed5
Showing 127 changed files with 6,610 additions and 3,574 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,12 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Added
- New entity `FlowTrigger`, which is now responsible for flow activation and scheduling
### Changed
- `DatasetFlowConfigsMut` now exposes a single `setConfig` method for all configuration types

## [0.214.0] - 2024-12-23
### Added
- New `kamu system decode` command that can decode an arbitrary block file for debugging
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

75 changes: 75 additions & 0 deletions migrations/postgres/20241204155207_cleanup_flow_configurations.sql
@@ -0,0 +1,75 @@
-- delete from outbox_message_consumptions where producer_name='dev.kamu.domain.flow-system.FlowConfigurationService';

DO $$
BEGIN
-- Update `event_payload` to remove "paused" and "schedule_condition" for IngestRule in both 'Created' and 'Modified'
UPDATE flow_configuration_events
SET event_payload =
CASE
-- Handle Ingest 'Created' case
WHEN event_payload @> '{"Created": {"rule": {"IngestRule": {}}}}'::jsonb THEN
jsonb_set(
event_payload - 'Created',
'{Created}',
jsonb_set(
(event_payload->'Created') - 'paused',
'{rule,IngestRule}',
(event_payload->'Created'->'rule'->'IngestRule') - 'schedule_condition'
)
)
-- Handle Ingest 'Modified' case
WHEN event_payload @> '{"Modified": {"rule": {"IngestRule": {}}}}'::jsonb THEN
jsonb_set(
event_payload - 'Modified',
'{Modified}',
jsonb_set(
(event_payload->'Modified') - 'paused',
'{rule,IngestRule}',
(event_payload->'Modified'->'rule'->'IngestRule') - 'schedule_condition'
)
)
-- Handle Compaction 'Created' case
WHEN event_payload @> '{"Created": {"rule": {"CompactionRule": {}}}}'::jsonb THEN
jsonb_set(
event_payload - 'Created',
'{Created}',
(event_payload->'Created') - 'paused'
)
-- Handle Compaction 'Modified' case
WHEN event_payload @> '{"Modified": {"rule": {"CompactionRule": {}}}}'::jsonb THEN
jsonb_set(
event_payload - 'Modified',
'{Modified}',
(event_payload->'Modified') - 'paused'
)
ELSE
event_payload
END
WHERE event_payload @> '{"Created": {"rule": {"IngestRule": {}}}}'::jsonb
OR event_payload @> '{"Modified": {"rule": {"IngestRule": {}}}}'::jsonb
OR event_payload @> '{"Created": {"rule": {"CompactionRule": {}}}}'::jsonb
OR event_payload @> '{"Modified": {"rule": {"CompactionRule": {}}}}'::jsonb;

-- Delete rows where event_payload matches TransformRule pattern for both 'Created' and 'Modified'
DELETE FROM flow_configuration_events
WHERE event_payload @> '{"Created": {"rule": {"TransformRule": {}}}}'::jsonb
OR event_payload @> '{"Modified": {"rule": {"TransformRule": {}}}}'::jsonb;
END $$;


-- Update flow_events payload
UPDATE flow_events
SET event_payload = jsonb_set(
event_payload - 'Initiated',
'{Initiated}',
jsonb_set(
event_payload->'Initiated',
'{config_snapshot}',
jsonb_set(
event_payload->'Initiated'->'config_snapshot',
'{IngestRule}',
(event_payload->'Initiated'->'config_snapshot'->'Ingest') - 'schedule_condition'
) - 'Ingest'
)
)
WHERE event_payload @> '{"Initiated": {"config_snapshot": {"Ingest": {"schedule_condition": {}}}}}'::jsonb;
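
The effect of this migration is easier to see on a concrete payload. The sketch below is not part of the migration: it applies the same expressions to hand-written sample events, and the field names inside the rules and snapshots (`fetch_uncacheable`, the shape of `schedule_condition`) are assumptions for illustration only.

-- Illustration only: expected effect on a sample IngestRule 'Created' configuration event.
SELECT jsonb_set(
           payload - 'Created',
           '{Created}',
           jsonb_set(
               (payload->'Created') - 'paused',
               '{rule,IngestRule}',
               (payload->'Created'->'rule'->'IngestRule') - 'schedule_condition'
           )
       ) AS migrated
FROM (
    SELECT '{"Created": {"paused": false, "rule": {"IngestRule": {"fetch_uncacheable": true, "schedule_condition": {"Schedule": {"every": "1h"}}}}}}'::jsonb AS payload
) AS sample;
-- Expected result: {"Created": {"rule": {"IngestRule": {"fetch_uncacheable": true}}}}

-- Illustration only: expected effect on a sample 'Initiated' flow event carrying an Ingest config snapshot.
SELECT jsonb_set(
           payload - 'Initiated',
           '{Initiated}',
           jsonb_set(
               payload->'Initiated',
               '{config_snapshot}',
               jsonb_set(
                   payload->'Initiated'->'config_snapshot',
                   '{IngestRule}',
                   (payload->'Initiated'->'config_snapshot'->'Ingest') - 'schedule_condition'
               ) - 'Ingest'
           )
       ) AS migrated
FROM (
    SELECT '{"Initiated": {"config_snapshot": {"Ingest": {"fetch_uncacheable": true, "schedule_condition": {"Schedule": {"every": "1h"}}}}}}'::jsonb AS payload
) AS sample;
-- Expected result: {"Initiated": {"config_snapshot": {"IngestRule": {"fetch_uncacheable": true}}}}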
26 changes: 26 additions & 0 deletions migrations/postgres/20241204175207_flow_trigger_events_table.sql
@@ -0,0 +1,26 @@
/* ------------------------------ */

CREATE SEQUENCE flow_trigger_event_id_seq AS BIGINT;

/* ------------------------------ */

CREATE TABLE flow_trigger_events
(
event_id BIGINT PRIMARY KEY DEFAULT NEXTVAL('flow_trigger_event_id_seq'),
dataset_id VARCHAR(100),
dataset_flow_type dataset_flow_type,
system_flow_type system_flow_type,
event_type VARCHAR(50) NOT NULL,
event_time TIMESTAMPTZ NOT NULL,
event_payload JSONB NOT NULL
);

CREATE INDEX idx_flow_trigger_events_dataset_flow_key
ON flow_trigger_events (dataset_id, dataset_flow_type)
WHERE dataset_id IS NOT NULL;

CREATE INDEX idx_flow_trigger_events_system_flow_key
ON flow_trigger_events (system_flow_type)
WHERE system_flow_type IS NOT NULL;

/* ------------------------------ */
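
The two partial indexes split the key space: dataset-scoped events are addressed by `(dataset_id, dataset_flow_type)`, system-scoped ones by `system_flow_type` alone. A hypothetical lookup of one dataset flow's trigger event stream could look like the sketch below; the dataset ID is made up, and the `dataset_flow_type` value assumes the same lowercase variants listed in the SQLite migration further down.

-- Illustration only: the equality filter implies dataset_id IS NOT NULL,
-- so idx_flow_trigger_events_dataset_flow_key can serve this lookup.
SELECT event_id, event_type, event_time, event_payload
FROM flow_trigger_events
WHERE dataset_id = 'did:odf:example'   -- hypothetical dataset ID
  AND dataset_flow_type = 'ingest'     -- assumed enum variant
ORDER BY event_id;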
@@ -0,0 +1,5 @@
DELETE FROM outbox_message_consumptions WHERE producer_name = 'dev.kamu.domain.flow-system.FlowConfigurationService';

DELETE FROM flow_configuration_events;
DELETE FROM flow_events;
DELETE FROM flows;
33 changes: 33 additions & 0 deletions migrations/sqlite/20241204175207_flow_trigger_events_table.sql
@@ -0,0 +1,33 @@
/* ------------------------------ */

CREATE TABLE flow_trigger_events
(
event_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
created_time TIMESTAMPTZ NOT NULL,
dataset_id VARCHAR(100),
dataset_flow_type VARCHAR(20) CHECK (
dataset_flow_type IN (
'ingest',
'execute_transform',
'hard_compaction',
'reset'
)
),
system_flow_type VARCHAR(10) CHECK (
system_flow_type IN ('gc')
),
event_type VARCHAR(50) NOT NULL,
event_time TIMESTAMPTZ NOT NULL,
event_payload JSONB NOT NULL
);


CREATE INDEX idx_flow_trigger_events_dataset_id_idx
ON flow_trigger_events (dataset_id, dataset_flow_type)
WHERE dataset_id IS NOT NULL;

CREATE INDEX idx_flow_trigger_events_system_flow_type_idx
ON flow_trigger_events (system_flow_type)
WHERE system_flow_type IS NOT NULL;

/* ------------------------------ */
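
Since SQLite has no native enum types, the same constraints are expressed as CHECK lists on plain VARCHAR columns, so inserting a value outside those lists fails. A minimal sketch of the accepted shape; the dataset ID and event_type values below are made up for illustration.

-- Illustration only: identifier and event_type values are hypothetical.
INSERT INTO flow_trigger_events (created_time, dataset_id, dataset_flow_type, event_type, event_time, event_payload)
VALUES ('2024-12-27 00:00:00+00', 'did:odf:example', 'ingest', 'TriggerEventCreated', '2024-12-27 00:00:00+00', '{}');
-- Accepted: 'ingest' is in the dataset_flow_type CHECK list;
-- a value such as 'backfill' would violate the constraint and be rejected.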