Skip to content

Commit

Permalink
dekaf: refactor fetch_all_collection_names to use pagination
Browse files Browse the repository at this point in the history
It was running up against the query result limit and not returning all of the collections. Fortunately we already had a convenient paginated stream API ready to go!
  • Loading branch information
jshearer committed Sep 12, 2024
1 parent 076a8c1 commit 8c90141
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ doc = { path = "../doc" }
gazette = { path = "../gazette" }
labels = { path = "../labels" }
ops = { path = "../ops" }
flowctl = { path = "../flowctl" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }
simd-doc = { path = "../simd-doc" }
Expand Down
22 changes: 10 additions & 12 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context;
use futures::{StreamExt, TryStreamExt};
use gazette::{broker, journal, uuid};
use proto_flow::flow;

Expand All @@ -11,21 +12,18 @@ pub async fn fetch_all_collection_names(
struct Row {
catalog_name: String,
}
let rows: Vec<Row> = client
let rows_builder = client
.from("live_specs_ext")
.eq("spec_type", "collection")
.select("catalog_name")
.execute()
.select("catalog_name");

let items = flowctl::pagination::into_items::<Row>(rows_builder)
.map(|res| res.map(|Row { catalog_name }| catalog_name))
.try_collect()
.await
.and_then(|r| r.error_for_status())
.context("listing current catalog specifications")?
.json()
.await?;

Ok(rows
.into_iter()
.map(|Row { catalog_name }| catalog_name)
.collect())
.context("listing current catalog specifications")?;

Ok(items)
}

/// Collection is the assembled metadata of a collection being accessed as a Kafka topic.
Expand Down
5 changes: 2 additions & 3 deletions crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ mod generate;
mod local_specs;
mod ops;
mod output;
mod pagination;
pub mod pagination;
mod poll;
mod preview;
mod raw;

use output::{Output, OutputType};
use pagination::into_items;
use poll::poll_while_queued;

use crate::pagination::into_items;

/// A command-line tool for working with Estuary Flow.
#[derive(Debug, Parser)]
#[clap(
Expand Down
4 changes: 3 additions & 1 deletion crates/flowctl/src/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ where
) -> PageTurnerOutput<Self, PaginationRequest> {
let resp: Vec<Item> = api_exec::<Vec<Item>>(request.builder.clone()).await?;

if resp.len() == request.page_size
// Sometimes, it seems, we can get back more than the requested page size.
// So far I've only seen this on a request of 1,000 and a response of 1,001.
if resp.len() >= request.page_size
// If the original builder had a limit set to the same value as page_size
// this ensures that we stop right at the limit, instead of issuing an extra
// request for 0 rows.
Expand Down

0 comments on commit 8c90141

Please sign in to comment.