Skip to content

Commit

Permalink
flowctl: Batch draft spec upserts, rather than incorrectly attempting…
Browse files Browse the repository at this point in the history
… to paginate the response

Also revert `--name` back to its previous behavior since pagination doesn't make sense there.
  • Loading branch information
jshearer committed Sep 16, 2024
1 parent f618926 commit afacaea
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 39 deletions.
13 changes: 6 additions & 7 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ mod test;
use crate::{
api_exec, api_exec_paginated, controlplane,
output::{to_table_row, CliOutput, JsonCell},
pagination::into_items,
};
use anyhow::Context;
use futures::{
stream::{FuturesUnordered, StreamExt},
TryStreamExt,
};
use futures::stream::{FuturesUnordered, StreamExt};
use itertools::Itertools;
use models::{CatalogType, RawValue};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -262,7 +258,10 @@ where
// not the original builder. And also to clarify that we're not
// moving `batch` into the async block.
let builder = builder.clone().in_("catalog_name", batch);
async move { into_items::<T>(builder).try_collect::<Vec<T>>().await }
async move {
// No need for pagination because we're paginating the inputs.
api_exec::<Vec<T>>(builder).await
}
})
.collect::<FuturesUnordered<_>>();
let mut rows = Vec::with_capacity(list.name_selector.name.len());
Expand Down Expand Up @@ -592,7 +591,7 @@ async fn do_draft(
};
tracing::debug!(?draft_spec, "inserting draft");

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
let rows: Vec<SpecSummaryItem> = api_exec(
ctx.controlplane_client()
.await?
.from("draft_specs")
Expand Down
72 changes: 40 additions & 32 deletions crates/flowctl/src/draft/author.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{api_exec, api_exec_paginated, catalog::SpecSummaryItem, controlplane, local_specs};
use crate::{api_exec, catalog::SpecSummaryItem, controlplane, local_specs};
use anyhow::Context;
use futures::{stream::FuturesUnordered, StreamExt};
use serde::Serialize;

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -46,13 +47,13 @@ pub async fn upsert_draft_specs(
expect_pub_id: Option<models::Id>,
}

let mut body: Vec<u8> = Vec::new();
body.push('[' as u8);
// Serialize DraftSpecs directly to JSON without going through
// serde_json::Value in order to avoid re-ordering fields which
// breaks sops hmac hashes.
let mut draft_specs: Vec<String> = vec![];

for row in collections.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
let mut body = vec![];
serde_json::to_writer(
&mut body,
&DraftSpec {
Expand All @@ -62,13 +63,11 @@ pub async fn upsert_draft_specs(
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
)?;
draft_specs.push(String::from_utf8(body).expect("serialized JSON is always UTF-8"));
}
for row in captures.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
let mut body = vec![];
serde_json::to_writer(
&mut body,
&DraftSpec {
Expand All @@ -78,13 +77,11 @@ pub async fn upsert_draft_specs(
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
)?;
draft_specs.push(String::from_utf8(body).expect("serialized JSON is always UTF-8"));
}
for row in materializations.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
let mut body = vec![];
serde_json::to_writer(
&mut body,
&DraftSpec {
Expand All @@ -94,13 +91,11 @@ pub async fn upsert_draft_specs(
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
)?;
draft_specs.push(String::from_utf8(body).expect("serialized JSON is always UTF-8"));
}
for row in tests.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
let mut body = vec![];
serde_json::to_writer(
&mut body,
&DraftSpec {
Expand All @@ -110,19 +105,32 @@ pub async fn upsert_draft_specs(
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
)?;
draft_specs.push(String::from_utf8(body).expect("serialized JSON is always UTF-8"));
}

const BATCH_SIZE: usize = 100;

// Upsert draft specs in batches
let mut futures = draft_specs
.chunks(BATCH_SIZE)
.map(|batch| {
let builder = client
.clone()
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(format!("[{}]", batch.join(",")))
.on_conflict("draft_id,catalog_name");
async move { api_exec::<Vec<SpecSummaryItem>>(builder).await }
})
.collect::<FuturesUnordered<_>>();

let mut rows = Vec::new();

while let Some(result) = futures.next().await {
rows.extend(result.context("executing live_specs_ext fetch")?);
}
body.push(']' as u8);

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
client
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(String::from_utf8(body).expect("serialized JSON is always UTF-8"))
.on_conflict("draft_id,catalog_name"),
)
.await?;
Ok(rows)
}

Expand Down

0 comments on commit afacaea

Please sign in to comment.