diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bf50b6d4f..2493f57db2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Recommendation: for ease of reading, use the following order: --> ## [Unreleased] +### Changed +- GraphQL: accountListFlows returns list sorted by status and last event time ### Fixed - GQL api flows queries now fetch dataset polling source only once per dataset(and only if Ingest flow type is here) - Flow trigger status now become disable on flow fail diff --git a/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs b/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs index 1c40c0aadf..72b4a86195 100644 --- a/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs +++ b/src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs @@ -482,25 +482,7 @@ impl FlowEventStore for InMemoryFlowEventStore { filters: &DatasetFlowFilters, pagination: PaginationOpts, ) -> FlowIDStream { - let flow_ids_page: Vec<_> = { - let state = self.inner.as_state(); - let g = state.lock().unwrap(); - g.all_flows_by_dataset - .get(dataset_id) - .map(|dataset_flow_ids| { - dataset_flow_ids - .iter() - .rev() - .filter(|flow_id| g.matches_dataset_flow(**flow_id, filters)) - .skip(pagination.offset) - .take(pagination.limit) - .map(|flow_id| Ok(*flow_id)) - .collect() - }) - .unwrap_or_default() - }; - - Box::pin(futures::stream::iter(flow_ids_page)) + self.get_all_flow_ids_by_datasets(HashSet::from([dataset_id.clone()]), filters, pagination) } #[tracing::instrument(level = "debug", skip_all, fields(%dataset_id))] @@ -548,25 +530,62 @@ impl FlowEventStore for InMemoryFlowEventStore { let flow_ids_page: Vec<_> = { let state = self.inner.as_state(); let g = state.lock().unwrap(); - let mut result: Vec> = vec![]; - let mut total_count = 0; - for flow_id in g.all_flows.iter().rev() { + + // Collect FlowID -> Most recent event time, for sorting purposes + let recent_events: HashMap> = g.events.iter().fold( + HashMap::new(), + |mut acc: 
HashMap<FlowID, DateTime<Utc>>, i: &FlowEvent| { + let event_time = i.event_time(); + acc.entry(i.flow_id()) + .and_modify(|val| { + if event_time.gt(val) { + *val = event_time; + }; + }) + .or_insert(event_time); + acc + }, + ); + + // Split events by type + let mut waiting_flows: Vec<_> = vec![]; + let mut running_flows: Vec<_> = vec![]; + let mut finished_flows: Vec<_> = vec![]; + for flow_id in &g.all_flows { + // Also apply given filters on this stage in order to reduce amount of + // items to process in further steps let flow_key = g.flow_key_by_flow_id.get(flow_id).unwrap(); if let FlowKey::Dataset(flow_key_dataset) = flow_key { if dataset_ids.contains(&flow_key_dataset.dataset_id) && g.matches_dataset_flow(*flow_id, filters) { - if result.len() >= pagination.limit { - break; - } - if total_count >= pagination.offset { - result.push(Ok(*flow_id)); + if let Some(flow) = g.flow_search_index.get(flow_id) { + let item = (flow_id, recent_events.get(flow_id)); + match flow.flow_status { + FlowStatus::Waiting => waiting_flows.push(item), + FlowStatus::Running => running_flows.push(item), + FlowStatus::Finished => finished_flows.push(item), + } } - total_count += 1; } - }; + } } - result + // Sort every group separately + waiting_flows.sort_by(|a, b| b.cmp(a)); + running_flows.sort_by(|a, b| b.cmp(a)); + finished_flows.sort_by(|a, b| b.cmp(a)); + + let mut ordered_flows = vec![]; + ordered_flows.append(&mut waiting_flows); + ordered_flows.append(&mut running_flows); + ordered_flows.append(&mut finished_flows); + + ordered_flows + .iter() + .skip(pagination.offset) + .take(pagination.limit) + .map(|(flow_id, _)| Ok(**flow_id)) + .collect() }; Box::pin(futures::stream::iter(flow_ids_page)) diff --git a/src/infra/flow-system/postgres/.sqlx/query-849e4e06bc203af6f1b895b839dc64fb200f2a9b93a80a5cc9ab8f7471047639.json b/src/infra/flow-system/postgres/.sqlx/query-82f1523001347f5984f9c9a44f1aece20ea61b689ec9e98a3a955e52f2a7b782.json similarity index 88% rename from 
src/infra/flow-system/postgres/.sqlx/query-849e4e06bc203af6f1b895b839dc64fb200f2a9b93a80a5cc9ab8f7471047639.json rename to src/infra/flow-system/postgres/.sqlx/query-82f1523001347f5984f9c9a44f1aece20ea61b689ec9e98a3a955e52f2a7b782.json index e77580e5dd..4b816f6090 100644 --- a/src/infra/flow-system/postgres/.sqlx/query-849e4e06bc203af6f1b895b839dc64fb200f2a9b93a80a5cc9ab8f7471047639.json +++ b/src/infra/flow-system/postgres/.sqlx/query-82f1523001347f5984f9c9a44f1aece20ea61b689ec9e98a3a955e52f2a7b782.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT flow_id FROM flows\n WHERE dataset_id = ANY($1)\n AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))\n ORDER BY flow_id DESC\n LIMIT $5 OFFSET $6\n ", + "query": "\n SELECT flow_id FROM flows\n WHERE dataset_id = ANY($1)\n AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))\n ORDER BY flow_status, last_event_id DESC\n LIMIT $5 OFFSET $6\n ", "describe": { "columns": [ { @@ -46,5 +46,5 @@ false ] }, - "hash": "849e4e06bc203af6f1b895b839dc64fb200f2a9b93a80a5cc9ab8f7471047639" + "hash": "82f1523001347f5984f9c9a44f1aece20ea61b689ec9e98a3a955e52f2a7b782" } diff --git a/src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json b/src/infra/flow-system/postgres/.sqlx/query-c6c83bf1eda6fcc75731d78f7926dda132b5d484ee0e88da24759583ead5ccb6.json similarity index 84% rename from src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json rename to src/infra/flow-system/postgres/.sqlx/query-c6c83bf1eda6fcc75731d78f7926dda132b5d484ee0e88da24759583ead5ccb6.json index 7d37630659..efce10ea96 100644 --- 
a/src/infra/flow-system/postgres/.sqlx/query-617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae.json +++ b/src/infra/flow-system/postgres/.sqlx/query-c6c83bf1eda6fcc75731d78f7926dda132b5d484ee0e88da24759583ead5ccb6.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT flow_id FROM flows\n WHERE dataset_id = $1\n AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))\n ORDER BY flow_id DESC\n LIMIT $5 OFFSET $6\n ", + "query": "\n SELECT flow_id FROM flows\n WHERE dataset_id = $1\n AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)\n AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)\n AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))\n ORDER BY flow_status, last_event_id DESC\n LIMIT $5 OFFSET $6\n ", "describe": { "columns": [ { @@ -46,5 +46,5 @@ false ] }, - "hash": "617464a1636be54a17ae8c7cdb8a328dfb878f37aa1c1f8b3d2e073a12292cae" + "hash": "c6c83bf1eda6fcc75731d78f7926dda132b5d484ee0e88da24759583ead5ccb6" } diff --git a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs index 061dfaf63d..170594f01a 100644 --- a/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs +++ b/src/infra/flow-system/postgres/src/postgres_flow_event_store.rs @@ -597,7 +597,7 @@ impl FlowEventStore for PostgresFlowEventStore { AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2) AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3) AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4)) - ORDER BY flow_id DESC + ORDER BY flow_status, last_event_id DESC LIMIT $5 OFFSET $6 "#, dataset_id, @@ -683,7 +683,7 @@ impl FlowEventStore for PostgresFlowEventStore { AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2) AND (cast($3 as flow_status_type) IS NULL OR 
flow_status = $3) AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4)) - ORDER BY flow_id DESC + ORDER BY flow_status, last_event_id DESC LIMIT $5 OFFSET $6 "#, dataset_ids as Vec, diff --git a/src/infra/flow-system/repo-tests/src/test_flow_event_store.rs b/src/infra/flow-system/repo-tests/src/test_flow_event_store.rs index 0d5579bed1..7eaf5a038d 100644 --- a/src/infra/flow-system/repo-tests/src/test_flow_event_store.rs +++ b/src/infra/flow-system/repo-tests/src/test_flow_event_store.rs @@ -39,12 +39,12 @@ pub async fn test_dataset_flow_empty_filters_distingush_dataset(catalog: &Catalo }, 6, vec![ - foo_cases.compaction_flow_ids.flow_id_finished, - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_finished, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.ingest_flow_ids.flow_id_finished, ], ) .await; @@ -59,12 +59,12 @@ pub async fn test_dataset_flow_empty_filters_distingush_dataset(catalog: &Catalo }, 6, vec![ - bar_cases.compaction_flow_ids.flow_id_finished, - bar_cases.compaction_flow_ids.flow_id_running, bar_cases.compaction_flow_ids.flow_id_waiting, - bar_cases.ingest_flow_ids.flow_id_finished, - bar_cases.ingest_flow_ids.flow_id_running, bar_cases.ingest_flow_ids.flow_id_waiting, + bar_cases.compaction_flow_ids.flow_id_running, + bar_cases.ingest_flow_ids.flow_id_running, + bar_cases.compaction_flow_ids.flow_id_finished, + bar_cases.ingest_flow_ids.flow_id_finished, ], ) .await; @@ -140,9 +140,9 @@ pub async fn test_dataset_flow_filter_by_flow_type(catalog: &Catalog) { ..Default::default() }, vec![ - foo_cases.ingest_flow_ids.flow_id_finished, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.ingest_flow_ids.flow_id_running, + 
foo_cases.ingest_flow_ids.flow_id_finished, ], ), ( @@ -151,9 +151,9 @@ pub async fn test_dataset_flow_filter_by_flow_type(catalog: &Catalog) { ..Default::default() }, vec![ - foo_cases.compaction_flow_ids.flow_id_finished, - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, ], ), ( @@ -261,10 +261,10 @@ pub async fn test_dataset_flow_filter_by_initiator_with_multiple_variants(catalo ..Default::default() }, vec![ - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, ], ), // should return the same amount even if some non existing user was provided @@ -274,10 +274,10 @@ pub async fn test_dataset_flow_filter_by_initiator_with_multiple_variants(catalo ..Default::default() }, vec![ - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, ], ), ]; @@ -362,29 +362,29 @@ pub async fn test_dataset_flow_filter_by_datasets(catalog: &Catalog) { ( vec![foo_cases.dataset_id.clone()], vec![ - foo_cases.compaction_flow_ids.flow_id_finished, - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_finished, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.ingest_flow_ids.flow_id_finished, ], ), ( vec![foo_cases.dataset_id.clone(), 
bar_cases.dataset_id.clone()], vec![ - bar_cases.compaction_flow_ids.flow_id_finished, - bar_cases.compaction_flow_ids.flow_id_running, bar_cases.compaction_flow_ids.flow_id_waiting, - bar_cases.ingest_flow_ids.flow_id_finished, - bar_cases.ingest_flow_ids.flow_id_running, bar_cases.ingest_flow_ids.flow_id_waiting, - foo_cases.compaction_flow_ids.flow_id_finished, - foo_cases.compaction_flow_ids.flow_id_running, foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_finished, - foo_cases.ingest_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_waiting, + bar_cases.compaction_flow_ids.flow_id_running, + bar_cases.ingest_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, + bar_cases.compaction_flow_ids.flow_id_finished, + bar_cases.ingest_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.ingest_flow_ids.flow_id_finished, ], ), (vec![DatasetID::new_seeded_ed25519(b"wrong")], vec![]), @@ -461,13 +461,26 @@ pub async fn test_dataset_flow_filter_by_datasets_with_pagination(catalog: &Cata let bar_cases = make_dataset_test_case(flow_event_store.clone()).await; make_system_test_case(flow_event_store.clone()).await; + // Expected order: + // bar compact waiting + // bar ingest waiting <- (foo+bar) offset: 1 + // foo compact waiting + // foo ingest waiting + // bar compact running + // bar ingest running + // foo compact running <- (foo) offset: 2 + // foo ingest running <- (foo+bar) offset: 1, limit: 7 + // bar compact finished + // bar ingest finished + // foo compact finished <- (foo) offset: 2, limit: 3 + // foo ingest finished let cases = vec![ ( vec![foo_cases.dataset_id.clone()], vec![ - foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, ], 
PaginationOpts { offset: 2, @@ -477,13 +490,13 @@ pub async fn test_dataset_flow_filter_by_datasets_with_pagination(catalog: &Cata ( vec![foo_cases.dataset_id.clone(), bar_cases.dataset_id.clone()], vec![ + bar_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_waiting, + foo_cases.ingest_flow_ids.flow_id_waiting, bar_cases.compaction_flow_ids.flow_id_running, - bar_cases.compaction_flow_ids.flow_id_waiting, - bar_cases.ingest_flow_ids.flow_id_finished, bar_cases.ingest_flow_ids.flow_id_running, - bar_cases.ingest_flow_ids.flow_id_waiting, - foo_cases.compaction_flow_ids.flow_id_finished, foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.ingest_flow_ids.flow_id_running, ], PaginationOpts { offset: 1, @@ -526,8 +539,8 @@ pub async fn test_dataset_flow_pagination(catalog: &Catalog) { limit: 2, }, vec![ - foo_cases.compaction_flow_ids.flow_id_finished, - foo_cases.compaction_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_waiting, + foo_cases.ingest_flow_ids.flow_id_waiting, ], ), ( @@ -536,9 +549,9 @@ pub async fn test_dataset_flow_pagination(catalog: &Catalog) { limit: 3, }, vec![ - foo_cases.compaction_flow_ids.flow_id_waiting, - foo_cases.ingest_flow_ids.flow_id_finished, + foo_cases.compaction_flow_ids.flow_id_running, foo_cases.ingest_flow_ids.flow_id_running, + foo_cases.compaction_flow_ids.flow_id_finished, ], ), ( @@ -547,8 +560,8 @@ pub async fn test_dataset_flow_pagination(catalog: &Catalog) { limit: 2, }, vec![ - foo_cases.ingest_flow_ids.flow_id_running, - foo_cases.ingest_flow_ids.flow_id_waiting, + foo_cases.compaction_flow_ids.flow_id_finished, + foo_cases.ingest_flow_ids.flow_id_finished, ], ), ( @@ -556,7 +569,7 @@ pub async fn test_dataset_flow_pagination(catalog: &Catalog) { offset: 5, limit: 2, }, - vec![foo_cases.ingest_flow_ids.flow_id_waiting], + vec![foo_cases.ingest_flow_ids.flow_id_finished], ), ( PaginationOpts { @@ -599,7 +612,7 @@ pub async fn 
test_dataset_flow_pagination_with_filters(catalog: &Catalog) { }, 3, vec![ - foo_cases.ingest_flow_ids.flow_id_finished, + foo_cases.ingest_flow_ids.flow_id_waiting, foo_cases.ingest_flow_ids.flow_id_running, ], ), @@ -2236,21 +2249,23 @@ struct TestFlowIDs { async fn make_dataset_test_case(flow_event_store: Arc) -> DatasetTestCase { let (_, dataset_id) = DatasetID::new_generated_ed25519(); + let ingest_flow_ids = make_dataset_test_flows( + &dataset_id, + DatasetFlowType::Ingest, + flow_event_store.clone(), + ) + .await; + let compaction_flow_ids = make_dataset_test_flows( + &dataset_id, + DatasetFlowType::HardCompaction, + flow_event_store, + ) + .await; DatasetTestCase { dataset_id: dataset_id.clone(), - ingest_flow_ids: make_dataset_test_flows( - &dataset_id, - DatasetFlowType::Ingest, - flow_event_store.clone(), - ) - .await, - compaction_flow_ids: make_dataset_test_flows( - &dataset_id, - DatasetFlowType::HardCompaction, - flow_event_store, - ) - .await, + ingest_flow_ids, + compaction_flow_ids, } } diff --git a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs index 44e6a2b209..dc6281ded7 100644 --- a/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs +++ b/src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs @@ -651,7 +651,7 @@ impl FlowEventStore for SqliteFlowEventStore { AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2) AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3) AND ($4 = 0 OR initiator IN ({})) - ORDER BY flow_id DESC + ORDER BY flow_status DESC, last_event_id DESC LIMIT $5 OFFSET $6 "#, maybe_initiators @@ -762,7 +762,7 @@ impl FlowEventStore for SqliteFlowEventStore { AND (cast($1 as dataset_flow_type) IS NULL OR dataset_flow_type = $1) AND (cast($2 as flow_status_type) IS NULL OR flow_status = $2) AND ($3 = 0 OR initiator in ({})) - ORDER BY flow_id DESC + ORDER BY flow_status DESC, last_event_id DESC LIMIT $4 OFFSET $5 "#, 
sqlite_generate_placeholders_list(dataset_ids.len(), 6),