Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions nexus/db-queries/src/db/datastore/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use nexus_auth::context::OpContext;
use nexus_db_errors::ErrorHandler;
use nexus_db_errors::public_error_from_diesel;
use nexus_db_model::SagaState;
use omicron_common::api::external::DataPageParams;
use omicron_common::api::external::Error;
use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use std::ops::Add;
use uuid::Uuid;

impl DataStore {
pub async fn saga_create(
Expand Down Expand Up @@ -130,6 +132,27 @@ impl DataStore {
}
}

/// Returns a single page of unfinished sagas assigned to SEC `sec_id`.
pub async fn saga_list_recovery_candidates(
&self,
opctx: &OpContext,
sec_id: db::saga_types::SecId,
pagparams: &DataPageParams<'_, Uuid>,
) -> Result<Vec<db::saga_types::Saga>, Error> {
use nexus_db_schema::schema::saga::dsl;
let conn = self.pool_connection_authorized(opctx).await?;
paginated(dsl::saga, dsl::id, pagparams)
.filter(
dsl::saga_state.eq_any(SagaState::RECOVERY_CANDIDATE_STATES),
)
.filter(dsl::current_sec.eq(sec_id))
.select(db::saga_types::Saga::as_select())
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}

/// many queries as needed (in batches) to get them all
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a stray line

/// Returns a list of unfinished sagas assigned to SEC `sec_id`, making as
/// many queries as needed (in batches) to get them all
pub async fn saga_list_recovery_candidates_batched(
Expand All @@ -142,25 +165,16 @@ impl DataStore {
SQL_BATCH_SIZE,
dropshot::PaginationOrder::Ascending,
);
let conn = self.pool_connection_authorized(opctx).await?;
while let Some(p) = paginator.next() {
use nexus_db_schema::schema::saga::dsl;

let mut batch =
paginated(dsl::saga, dsl::id, &p.current_pagparams())
.filter(
dsl::saga_state
.eq_any(SagaState::RECOVERY_CANDIDATE_STATES),
)
.filter(dsl::current_sec.eq(sec_id))
.select(db::saga_types::Saga::as_select())
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?;
let mut batch = self
.saga_list_recovery_candidates(
opctx,
sec_id,
&p.current_pagparams(),
)
.await?;

paginator = p.found_batch(&batch, &|row| row.id);
paginator = p.found_batch(&batch, &|row| row.id.0.0);
sagas.append(&mut batch);
}
Ok(sagas)
Expand Down
Loading
Loading