Skip to content

Commit

Permalink
Preserve original formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
andriidemus committed Jan 8, 2025
1 parent 66ee429 commit c10512b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 31 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 4 additions & 10 deletions src/infra/flow-system/postgres/src/postgres_flow_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,11 +592,8 @@ impl FlowEventStore for PostgresFlowEventStore {

let mut query_stream = sqlx::query!(
r#"
SELECT
flow_id
FROM flows
WHERE
dataset_id = $1
SELECT flow_id FROM flows
WHERE dataset_id = $1
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))
Expand Down Expand Up @@ -681,11 +678,8 @@ impl FlowEventStore for PostgresFlowEventStore {

let mut query_stream = sqlx::query!(
r#"
SELECT
flow_id
FROM flows
WHERE
dataset_id = ANY($1)
SELECT flow_id FROM flows
WHERE dataset_id = ANY($1)
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND (cast($4 as TEXT[]) IS NULL OR initiator = ANY($4))
Expand Down
28 changes: 11 additions & 17 deletions src/infra/flow-system/sqlite/src/sqlite_flow_event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ impl SqliteFlowEventStore {
dataset_flow_type,
initiator,
)
.execute(connection_mut)
.await
.map_err(ErrorIntoInternal::int_err)?;
.execute(connection_mut)
.await
.map_err(ErrorIntoInternal::int_err)?;
}
FlowKey::System(fk_system) => {
let system_flow_type = fk_system.flow_type;
Expand Down Expand Up @@ -165,9 +165,9 @@ impl SqliteFlowEventStore {
maybe_scheduled_for_activation_at,
maybe_prev_stored_event_id,
)
.fetch_all(connection_mut)
.await
.map_err(|e| SaveEventsError::Internal(e.int_err()))?;
.fetch_all(connection_mut)
.await
.map_err(|e| SaveEventsError::Internal(e.int_err()))?;

// If a previously stored event id does not match the expected,
// this means we've just detected a concurrent modification (version conflict)
Expand Down Expand Up @@ -646,14 +646,11 @@ impl FlowEventStore for SqliteFlowEventStore {

let query_str = format!(
r#"
SELECT
flow_id
FROM flows
WHERE
dataset_id = $1
SELECT flow_id FROM flows
WHERE dataset_id = $1
AND (cast($2 as dataset_flow_type) IS NULL OR dataset_flow_type = $2)
AND (cast($3 as flow_status_type) IS NULL OR flow_status = $3)
AND ($4 = 0 OR initiator in ({}))
AND ($4 = 0 OR initiator IN ({}))
ORDER BY flow_status DESC, last_event_id DESC
LIMIT $5 OFFSET $6
"#,
Expand Down Expand Up @@ -760,11 +757,8 @@ impl FlowEventStore for SqliteFlowEventStore {

let query_str = format!(
r#"
SELECT
flow_id
FROM flows
WHERE
dataset_id in ({})
SELECT flow_id FROM flows
WHERE dataset_id in ({})
AND (cast($1 as dataset_flow_type) IS NULL OR dataset_flow_type = $1)
AND (cast($2 as flow_status_type) IS NULL OR flow_status = $2)
AND ($3 = 0 OR initiator in ({}))
Expand Down

0 comments on commit c10512b

Please sign in to comment.