Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flowctl: Batch draft spec upserts, rather than incorrectly attempting to paginate the response #1629

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ mod test;
use crate::{
api_exec, api_exec_paginated, controlplane,
output::{to_table_row, CliOutput, JsonCell},
pagination::into_items,
};
use anyhow::Context;
use futures::{
stream::{FuturesUnordered, StreamExt},
TryStreamExt,
};
use futures::stream::{FuturesUnordered, StreamExt};
use itertools::Itertools;
use models::{CatalogType, RawValue};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -262,7 +258,10 @@ where
// not the original builder. And also to clarify that we're not
// moving `batch` into the async block.
let builder = builder.clone().in_("catalog_name", batch);
async move { into_items::<T>(builder).try_collect::<Vec<T>>().await }
async move {
// No need for pagination because we're paginating the inputs.
api_exec::<Vec<T>>(builder).await
}
})
.collect::<FuturesUnordered<_>>();
let mut rows = Vec::with_capacity(list.name_selector.name.len());
Expand Down Expand Up @@ -596,7 +595,7 @@ async fn do_draft(
};
tracing::debug!(?draft_spec, "inserting draft");

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
let rows: Vec<SpecSummaryItem> = api_exec(
ctx.controlplane_client()
.await?
.from("draft_specs")
Expand Down
96 changes: 48 additions & 48 deletions crates/flowctl/src/draft/author.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{api_exec, api_exec_paginated, catalog::SpecSummaryItem, controlplane, local_specs};
use crate::{api_exec, catalog::SpecSummaryItem, controlplane, local_specs};
use anyhow::Context;
use futures::{stream::FuturesOrdered, StreamExt};
use serde::Serialize;

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -46,83 +47,82 @@ pub async fn upsert_draft_specs(
expect_pub_id: Option<models::Id>,
}

let mut body: Vec<u8> = Vec::new();
body.push('[' as u8);
// Serialize DraftSpecs directly to JSON, without going through
// serde_json::Value, in order to avoid re-ordering fields, which
// would break sops HMAC hashes.
let mut draft_specs: Vec<String> = vec![];

for row in collections.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_specs.push(
serde_json::to_string(&DraftSpec {
draft_id,
catalog_name: row.collection.to_string(),
spec_type: "collection",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
})
.unwrap(),
);
}
for row in captures.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_specs.push(
serde_json::to_string(&DraftSpec {
draft_id,
catalog_name: row.capture.to_string(),
spec_type: "capture",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
})
.unwrap(),
);
}
for row in materializations.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_specs.push(
serde_json::to_string(&DraftSpec {
draft_id,
catalog_name: row.materialization.to_string(),
spec_type: "materialization",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
})
.unwrap(),
);
}
for row in tests.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_specs.push(
serde_json::to_string(&DraftSpec {
draft_id,
catalog_name: row.test.to_string(),
spec_type: "test",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
})
.unwrap(),
);
}

const BATCH_SIZE: usize = 100;

// Upsert draft specs in batches
let mut futures = draft_specs
.chunks(BATCH_SIZE)
.map(|batch| {
let builder = client
.clone()
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(format!("[{}]", batch.join(",")))
.on_conflict("draft_id,catalog_name");
async move { api_exec::<Vec<SpecSummaryItem>>(builder).await }
})
.collect::<FuturesOrdered<_>>();

let mut rows = Vec::new();

while let Some(result) = futures.next().await {
rows.extend(result.context("executing live_specs_ext fetch")?);
}
jshearer marked this conversation as resolved.
Show resolved Hide resolved
body.push(']' as u8);

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
client
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(String::from_utf8(body).expect("serialized JSON is always UTF-8"))
.on_conflict("draft_id,catalog_name"),
)
.await?;
Ok(rows)
}

Expand Down
4 changes: 3 additions & 1 deletion crates/flowctl/src/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ where
) -> PageTurnerOutput<Self, PaginationRequest> {
let resp: Vec<Item> = api_exec::<Vec<Item>>(request.builder.clone()).await?;

if resp.len() == request.page_size
// Sometimes, it seems, we can get back more than the requested page size.
// So far I've only seen this on a request of 1,000 and a response of 1,001.
if resp.len() >= request.page_size
// If the original builder had a limit set to the same value as page_size
// this ensures that we stop right at the limit, instead of issuing an extra
// request for 0 rows.
Expand Down
Loading