diff --git a/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs b/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs index 3dce1eca1f..3f2ba4190f 100644 --- a/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs +++ b/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs @@ -482,25 +482,7 @@ impl FlowEventStore for InMemoryFlowEventStore { filters: &DatasetFlowFilters, pagination: PaginationOpts, ) -> FlowIDStream { - let flow_ids_page: Vec<_> = { - let state = self.inner.as_state(); - let g = state.lock().unwrap(); - g.all_flows_by_dataset - .get(dataset_id) - .map(|dataset_flow_ids| { - dataset_flow_ids - .iter() - .rev() - .filter(|flow_id| g.matches_dataset_flow(**flow_id, filters)) - .skip(pagination.offset) - .take(pagination.limit) - .map(|flow_id| Ok(*flow_id)) - .collect() - }) - .unwrap_or_default() - }; - - Box::pin(futures::stream::iter(flow_ids_page)) + self.get_all_flow_ids_by_datasets(HashSet::from([dataset_id.clone()]), filters, pagination) } #[tracing::instrument(level = "debug", skip_all, fields(%dataset_id))] diff --git a/src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json b/src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json deleted file mode 100644 index 7d37630659..0000000000 --- a/src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT flow_id FROM flows\n WHERE dataset_id = $1\n AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))\n ORDER BY flow_id DESC\n LIMIT $5 OFFSET $6\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "flow_id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Text", - { - "Custom": { - "name": "dataset_flow_type", - "kind": { - "Enum": [ - "ingest", - "execute_transform", - "hard_compaction", - "reset" - ] - } - } - }, - { - "Custom": { - "name": "flow_status_type", - "kind": { - "Enum": [ - "waiting", - "running", - "finished" - ] - } - } - }, - "TextArray", - "Int8", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae" -} diff --git a/src/infra/flow-system/postgres/.sqlx/query-adf43305610c10c7eff3e20f4802ddeddc95dbfade44ed761aea26780b813319.json b/src/infra/flow-system/postgres/.sqlx/query-adf43305610c10c7eff3e20f4802ddeddc95dbfade44ed761aea26780b813319.json new file mode 100644 index 0000000000..ad5185db99 --- /dev/null +++ b/src/infra/flow-system/postgres/.sqlx/query-adf43305610c10c7eff3e20f4802ddeddc95dbfade44ed761aea26780b813319.json @@ -0,0 +1,50 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH unsorted_flows AS\n (SELECT\n f.flow_id,\n f.flow_status,\n MAX(e.event_time) as last_event_time\n FROM flows f\n LEFT JOIN flow_events e USING(flow_id)\n WHERE\n f.dataset_id = $1\n AND (cast($2 as dataset_flow_type) IS NULL OR f.dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR f.flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR f.initiator = ANY($4))\n GROUP BY f.flow_id, f.flow_status)\n SELECT flow_id FROM unsorted_flows\n ORDER BY flow_status, last_event_time DESC\n LIMIT $5 OFFSET $6\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "flow_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + { + "Custom": { + "name": "dataset_flow_type", + "kind": { + "Enum": [ + "ingest", + "execute_transform", + "hard_compaction", + "reset" + ] + } + } + }, + { + "Custom": { + "name": "flow_status_type", + "kind": { + "Enum": [ + "waiting", + "running", + "finished" + ] + } + } + }, + "TextArray", + "Int8", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "adf43305610c10c7eff3e20f4802ddeddc95dbfade44ed761aea26780b813319" +} diff --git a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs index e62a14ca76..bd4b255f6f 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs @@ -592,12 +592,21 @@ impl FlowEventStore for PostgresFlowEventStore { let mut query_stream = sqlx::query!( r#" - SELECT flow_id FROM flows - WHERE dataset_id = $1 - AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2) - AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3) - AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4)) - ORDER BY flow_id DESC + WITH unsorted_flows AS + (SELECT + f.flow_id, + f.flow_status, + MAX(e.event_time) as last_event_time + FROM flows f + LEFT JOIN flow_events e USING(flow_id) + WHERE + f.dataset_id = $1 + AND (cast($2 as dataset_flow_type) IS NULL OR f.dataset_flow_type = $2) + AND (cast($3 as flow_status_type) IS NULL OR f.flow_status = $3) + AND (cast($4 as TEXT[]) IS NULL OR f.initiator = ANY($4)) + GROUP BY f.flow_id, f.flow_status) + SELECT flow_id FROM unsorted_flows + ORDER BY flow_status, last_event_time DESC LIMIT $5 OFFSET $6 "#, dataset_id, diff --git a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs index 60faeb93dd..cd671044c8 100644 --- a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs +++ b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs @@ -646,12 +646,25 @@ impl FlowEventStore for SqliteFlowEventStore { let query_str = format!( r#" - SELECT flow_id FROM flows + WITH unsorted_flows AS + (SELECT + f.flow_id, + f.flow_status, + MAX(e.event_time) as last_event_time, + (CASE + WHEN f.flow_status = 'waiting' THEN 1 + WHEN f.flow_status = 'running' THEN 2 + ELSE 3 + END) AS ord_status + FROM flows f + LEFT JOIN flow_events e USING(flow_id) WHERE dataset_id = $1 AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2) AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3) - AND ($4 = 0 OR initiator IN ({})) - ORDER BY flow_id DESC + AND ($4 = 0 OR initiator in ({})) + GROUP BY f.flow_id, f.flow_status) + SELECT flow_id FROM unsorted_flows + ORDER BY ord_status, last_event_time DESC LIMIT $5 OFFSET $6 "#, maybe_initiators