Skip to content

Commit

Permalink
Improve sorting for GQL::getDatasetListFlows query
Browse files Browse the repository at this point in the history
  • Loading branch information
andriidemus committed Jan 3, 2025
1 parent 51fba2a commit 4403354
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 78 deletions.
20 changes: 1 addition & 19 deletions src/infra/flow-system/inmem/src/flow/inmem_flow_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,25 +482,7 @@ impl FlowEventStore for InMemoryFlowEventStore {
filters: &DatasetFlowFilters,
pagination: PaginationOpts,
) -> FlowIDStream {
let flow_ids_page: Vec<_> = {
let state = self.inner.as_state();
let g = state.lock().unwrap();
g.all_flows_by_dataset
.get(dataset_id)
.map(|dataset_flow_ids| {
dataset_flow_ids
.iter()
.rev()
.filter(|flow_id| g.matches_dataset_flow(**flow_id, filters))
.skip(pagination.offset)
.take(pagination.limit)
.map(|flow_id| Ok(*flow_id))
.collect()
})
.unwrap_or_default()
};

Box::pin(futures::stream::iter(flow_ids_page))
self.get_all_flow_ids_by_datasets(HashSet::from([dataset_id.clone()]), filters, pagination)
}

#[tracing::instrument(level = "debug", skip_all, fields(%dataset_id))]
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions src/infra/flow-system/postgres/src/postgres_flow_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,12 +592,21 @@ impl FlowEventStore for PostgresFlowEventStore {

let mut query_stream = sqlx::query!(
r#"
SELECT flow_id FROM flows
WHERE dataset_id = $1
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))
ORDER BY flow_id DESC
WITH unsorted_flows AS
(SELECT
f.flow_id,
f.flow_status,
MAX(e.event_time) as last_event_time
FROM flows f
LEFT JOIN flow_events e USING(flow_id)
WHERE
f.dataset_id = $1
AND (cast($2 as dataset_flow_type) IS NULL OR f.dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR f.flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR f.initiator = ANY($4))
GROUP BY f.flow_id, f.flow_status)
SELECT flow_id FROM unsorted_flows
ORDER BY flow_status, last_event_time DESC
LIMIT $5 OFFSET $6
"#,
dataset_id,
Expand Down
19 changes: 16 additions & 3 deletions src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,25 @@ impl FlowEventStore for SqliteFlowEventStore {

let query_str = format!(
r#"
SELECT flow_id FROM flows
WITH unsorted_flows AS
(SELECT
f.flow_id,
f.flow_status,
MAX(e.event_time) as last_event_time,
(CASE
WHEN f.flow_status = 'waiting' THEN 1
WHEN f.flow_status = 'running' THEN 2
ELSE 3
END) AS ord_status
FROM flows f
LEFT JOIN flow_events e USING(flow_id)
WHERE dataset_id = $1
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND ($4 = 0 OR initiator IN ({}))
ORDER BY flow_id DESC
AND ($4 = 0 OR initiator in ({}))
GROUP BY f.flow_id, f.flow_status)
SELECT flow_id FROM unsorted_flows
ORDER BY ord_status, last_event_time DESC
LIMIT $5 OFFSET $6
"#,
maybe_initiators
Expand Down

0 comments on commit 4403354

Please sign in to comment.