Skip to content

Commit

Permalink
flowctl: Batch draft spec upserts, rather than incorrectly attempting…
Browse files Browse the repository at this point in the history
… to paginate the response

Also revert `--name` back to its previous behavior since pagination doesn't make sense there.
  • Loading branch information
jshearer committed Sep 16, 2024
1 parent f618926 commit 1df9e8f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 70 deletions.
7 changes: 5 additions & 2 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ where
// not the original builder. And also to clarify that we're not
// moving `batch` into the async block.
let builder = builder.clone().in_("catalog_name", batch);
async move { into_items::<T>(builder).try_collect::<Vec<T>>().await }
async move {
// No need for pagination because we're paginating the inputs.
api_exec::<Vec<T>>(builder).await
}
})
.collect::<FuturesUnordered<_>>();
let mut rows = Vec::with_capacity(list.name_selector.name.len());
Expand Down Expand Up @@ -592,7 +595,7 @@ async fn do_draft(
};
tracing::debug!(?draft_spec, "inserting draft");

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
let rows: Vec<SpecSummaryItem> = api_exec(
ctx.controlplane_client()
.await?
.from("draft_specs")
Expand Down
121 changes: 53 additions & 68 deletions crates/flowctl/src/draft/author.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{api_exec, api_exec_paginated, catalog::SpecSummaryItem, controlplane, local_specs};
use crate::{api_exec, catalog::SpecSummaryItem, controlplane, local_specs};
use anyhow::Context;
use futures::{stream::FuturesUnordered, StreamExt};
use serde::Serialize;

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -46,83 +47,67 @@ pub async fn upsert_draft_specs(
expect_pub_id: Option<models::Id>,
}

let mut body: Vec<u8> = Vec::new();
body.push('[' as u8);
let mut draft_specs: Vec<serde_json::Value> = vec![];

for row in collections.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_id,
catalog_name: row.collection.to_string(),
spec_type: "collection",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
draft_specs.push(serde_json::to_value(DraftSpec {
draft_id,
catalog_name: row.collection.to_string(),
spec_type: "collection",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
})?);
}
for row in captures.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_id,
catalog_name: row.capture.to_string(),
spec_type: "capture",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
draft_specs.push(serde_json::to_value(DraftSpec {
draft_id,
catalog_name: row.capture.to_string(),
spec_type: "capture",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
})?);
}
for row in materializations.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_id,
catalog_name: row.materialization.to_string(),
spec_type: "materialization",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
draft_specs.push(serde_json::to_value(DraftSpec {
draft_id,
catalog_name: row.materialization.to_string(),
spec_type: "materialization",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
})?);
}
for row in tests.iter() {
if body.len() != 1 {
body.push(',' as u8);
}
serde_json::to_writer(
&mut body,
&DraftSpec {
draft_id,
catalog_name: row.test.to_string(),
spec_type: "test",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
},
)
.unwrap();
draft_specs.push(serde_json::to_value(DraftSpec {
draft_id,
catalog_name: row.test.to_string(),
spec_type: "test",
spec: &row.model,
expect_pub_id: row.expect_pub_id,
})?);
}

const BATCH_SIZE: usize = 100;

// Upsert draft specs in batches
let mut futures = draft_specs
.chunks(BATCH_SIZE)
.map(|batch| {
let builder = client
.clone()
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(serde_json::to_string(&batch).expect("serializing draft specs"))
.on_conflict("draft_id,catalog_name");
async move { api_exec::<Vec<SpecSummaryItem>>(builder).await }
})
.collect::<FuturesUnordered<_>>();

let mut rows = Vec::new();

while let Some(result) = futures.next().await {
rows.extend(result.context("executing live_specs_ext fetch")?);
}
body.push(']' as u8);

let rows: Vec<SpecSummaryItem> = api_exec_paginated(
client
.from("draft_specs")
.select("catalog_name,spec_type")
.upsert(String::from_utf8(body).expect("serialized JSON is always UTF-8"))
.on_conflict("draft_id,catalog_name"),
)
.await?;
Ok(rows)
}

Expand Down

0 comments on commit 1df9e8f

Please sign in to comment.