Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 103 additions & 16 deletions apps/api/src/routes/v1_analytics_getVerifications.happy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,35 @@ describe("with no data", () => {
describe.each([
// generate and query times are different to ensure the query covers the entire generate interval
// and the used toStartOf function in clickhouse
// NOTE: Using dynamic dates to avoid TTL deletion (table has 1 month TTL)
{
granularity: "hour",
generate: { start: "2024-12-05", end: "2024-12-07" },
query: { start: "2024-12-04", end: "2024-12-10" },
generateDaysAgo: { start: 5, end: 3 }, // 5 days ago to 3 days ago
queryDaysAgo: { start: 6, end: 0 }, // 6 days ago to now
},
{
granularity: "day",
generate: { start: "2024-12-05", end: "2024-12-07" },
query: { start: "2024-12-01", end: "2024-12-10" },
generateDaysAgo: { start: 5, end: 3 },
queryDaysAgo: { start: 9, end: 0 },
},
{
granularity: "month",
generate: { start: "2024-10-1", end: "2025-10-12" },
query: { start: "2023-12-01", end: "2026-12-10" },
generateDaysAgo: { start: 25, end: 5 },
queryDaysAgo: { start: 28, end: 0 },
},
])("per $granularity", (tc) => {
test("all verifications are accounted for", { timeout: 120_000 }, async (t) => {
const h = await IntegrationHarness.init(t);

const now = Date.now();
const generateStart = now - tc.generateDaysAgo.start * 24 * 60 * 60 * 1000;
const generateEnd = now - tc.generateDaysAgo.end * 24 * 60 * 60 * 1000;
const queryStart = now - tc.queryDaysAgo.start * 24 * 60 * 60 * 1000;
const queryEnd = now - tc.queryDaysAgo.end * 24 * 60 * 60 * 1000;

const verifications = generate({
start: new Date(tc.generate.start).getTime(),
end: new Date(tc.generate.end).getTime(),
start: generateStart,
end: generateEnd,
length: 10_000,
workspaceId: h.resources.userWorkspace.id,
keySpaceId: h.resources.userKeyAuth.id,
Expand All @@ -73,12 +80,13 @@ describe.each([

const inserted = await h.ch.querier.query({
query:
"SELECT COUNT(*) AS count from verifications.raw_key_verifications_v1 WHERE workspace_id={workspaceId:String}",
"SELECT COUNT(*) AS count from default.key_verifications_raw_v2 WHERE workspace_id={workspaceId:String}",
params: z.object({ workspaceId: z.string() }),
schema: z.object({ count: z.number() }),
})({
workspaceId: h.resources.userWorkspace.id,
});

expect(inserted.err).toEqual(undefined);
expect(inserted.val!.at(0)?.count).toEqual(verifications.length);

Expand All @@ -87,8 +95,8 @@ describe.each([
const res = await h.get<V1AnalyticsGetVerificationsResponse>({
url: "/v1/analytics.getVerifications",
searchparams: {
start: new Date(tc.query.start).getTime().toString(),
end: new Date(tc.query.end).getTime().toString(),
start: queryStart.toString(),
end: queryEnd.toString(),
groupBy: tc.granularity,
apiId: h.resources.userApi.id,
},
Expand All @@ -99,18 +107,78 @@ describe.each([

expect(res.status, `expected 200, received: ${JSON.stringify(res, null, 2)}`).toBe(200);

// For month/day granularity with Date columns, only count data from buckets
// that fall within the query date range (ignoring time component)
const queryStartDate = new Date(queryStart);
const queryEndDate = new Date(queryEnd);
const shouldCountVerification = (v: any) => {
if (tc.granularity === "month") {
// Get the bucket (month start) for this verification
const vDate = new Date(v.time);
const bucketDate = new Date(Date.UTC(vDate.getUTCFullYear(), vDate.getUTCMonth(), 1));

// Compare bucket date against query range (as Date, no time)
const startDateOnly = new Date(
Date.UTC(
queryStartDate.getUTCFullYear(),
queryStartDate.getUTCMonth(),
queryStartDate.getUTCDate(),
),
);
const endDateOnly = new Date(
Date.UTC(
queryEndDate.getUTCFullYear(),
queryEndDate.getUTCMonth(),
queryEndDate.getUTCDate(),
),
);

return bucketDate >= startDateOnly && bucketDate <= endDateOnly;
}
if (tc.granularity === "day") {
// Get the bucket (day start) for this verification
const vDate = new Date(v.time);
const bucketDate = new Date(
Date.UTC(vDate.getUTCFullYear(), vDate.getUTCMonth(), vDate.getUTCDate()),
);

const startDateOnly = new Date(
Date.UTC(
queryStartDate.getUTCFullYear(),
queryStartDate.getUTCMonth(),
queryStartDate.getUTCDate(),
),
);
const endDateOnly = new Date(
Date.UTC(
queryEndDate.getUTCFullYear(),
queryEndDate.getUTCMonth(),
queryEndDate.getUTCDate(),
),
);

return bucketDate >= startDateOnly && bucketDate <= endDateOnly;
}
return true; // For hour granularity, count all
};

let expectedTotal = 0;
const outcomes = verifications.reduce(
(acc, v) => {
if (!shouldCountVerification(v)) {
return acc;
}
if (!acc[v.outcome]) {
acc[v.outcome] = 0;
}
acc[v.outcome]++;
expectedTotal++;
return acc;
},
{} as { [K in (typeof POSSIBLE_OUTCOMES)[number]]: number },
);

expect(res.body.reduce((sum, d) => sum + d.total, 0)).toEqual(verifications.length);
expect(res.body.reduce((sum, d) => sum + d.total, 0)).toEqual(expectedTotal);
expect(res.body.reduce((sum, d) => sum + (d.valid ?? 0), 0)).toEqual(outcomes.VALID);
expect(res.body.reduce((sum, d) => sum + (d.notFound ?? 0), 0)).toEqual(0);
expect(res.body.reduce((sum, d) => sum + (d.forbidden ?? 0), 0)).toEqual(0);
Expand Down Expand Up @@ -420,13 +488,32 @@ describe("RFC scenarios", () => {

expect(res.body.length).lte(2);
expect(res.body.length).gte(1);

// With Date columns, the API compares Date('start') to Date(bucket)
// This includes all months where Date(month_start) falls between Date(start) and Date(end)
const startDate = new Date(start);
const endDate = new Date(end);

let total = 0;
const outcomes = verifications.reduce(
(acc, v) => {
if (
v.identity_id !== identity.id ||
new Date(v.time).getUTCMonth() !== new Date(now).getUTCMonth()
) {
if (v.identity_id !== identity.id) {
return acc;
}

// Check if verification date (as Date, no time) falls within range
const vDate = new Date(v.time);
const vDateOnly = new Date(
Date.UTC(vDate.getUTCFullYear(), vDate.getUTCMonth(), vDate.getUTCDate()),
);
const startDateOnly = new Date(
Date.UTC(startDate.getUTCFullYear(), startDate.getUTCMonth(), startDate.getUTCDate()),
);
const endDateOnly = new Date(
Date.UTC(endDate.getUTCFullYear(), endDate.getUTCMonth(), endDate.getUTCDate()),
);

if (vDateOnly < startDateOnly || vDateOnly > endDateOnly) {
return acc;
}

Expand Down
26 changes: 17 additions & 9 deletions apps/api/src/routes/v1_analytics_getVerifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,24 @@ export const registerV1AnalyticsGetVerifications = (app: App) =>

const tables = {
hour: {
name: "verifications.key_verifications_per_hour_v3",
name: "default.key_verifications_per_hour_v2",
fill: `WITH FILL
FROM toStartOfHour(fromUnixTimestamp64Milli({ start: Int64 }))
TO toStartOfHour(fromUnixTimestamp64Milli({ end: Int64 }))
STEP INTERVAL 1 HOUR`,
},
day: {
name: "verifications.key_verifications_per_day_v3",
name: "default.key_verifications_per_day_v2",
fill: `WITH FILL
FROM toStartOfDay(fromUnixTimestamp64Milli({ start: Int64 }))
TO toStartOfDay(fromUnixTimestamp64Milli({ end: Int64 }))
FROM toDate(toStartOfDay(fromUnixTimestamp64Milli({ start: Int64 })))
TO toDate(toStartOfDay(fromUnixTimestamp64Milli({ end: Int64 })))
STEP INTERVAL 1 DAY`,
},
month: {
name: "verifications.key_verifications_per_month_v3",
name: "default.key_verifications_per_month_v2",
fill: `WITH FILL
FROM toDateTime(toStartOfMonth(fromUnixTimestamp64Milli({ start: Int64 })))
TO toDateTime(toStartOfMonth(fromUnixTimestamp64Milli({ end: Int64 })))
FROM toDate(toStartOfMonth(fromUnixTimestamp64Milli({ start: Int64 })))
TO toDate(toStartOfMonth(fromUnixTimestamp64Milli({ end: Int64 })))
STEP INTERVAL 1 MONTH`,
},
} as const;
Expand Down Expand Up @@ -389,8 +389,16 @@ STEP INTERVAL 1 MONTH`,
if (filteredKeyIds && filteredKeyIds.length > 0) {
where.push("AND key_id IN {keyIds:Array(String)}");
}
where.push("AND time >= fromUnixTimestamp64Milli({start:Int64})");
where.push("AND time <= fromUnixTimestamp64Milli({end:Int64})");

// For month and day tables, the time column is Date not DateTime
// Convert the timestamp to Date for proper comparison
if (table === tables.month || table === tables.day) {
where.push("AND time >= toDate(fromUnixTimestamp64Milli({start:Int64}))");
where.push("AND time <= toDate(fromUnixTimestamp64Milli({end:Int64}))");
} else {
where.push("AND time >= fromUnixTimestamp64Milli({start:Int64})");
where.push("AND time <= fromUnixTimestamp64Milli({end:Int64})");
}

const query: string[] = [];
query.push(`SELECT ${[...new Set(select)].join(", ")}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ Users query against friendly table names that map to actual ClickHouse tables:

```go
TableAliases: map[string]string{
"key_verifications": "default.key_verifications_raw_v2",
"key_verifications_per_minute": "default.key_verifications_per_minute_v2",
"key_verifications_per_hour": "default.key_verifications_per_hour_v2",
"key_verifications_per_day": "default.key_verifications_per_day_v2",
"key_verifications_per_month": "default.key_verifications_per_month_v2",
"key_verifications_v1": "default.key_verifications_raw_v2",
"key_verifications_per_minute_v1": "default.key_verifications_per_minute_v2",
"key_verifications_per_hour_v1": "default.key_verifications_per_hour_v2",
"key_verifications_per_day_v1": "default.key_verifications_per_day_v2",
"key_verifications_per_month_v1": "default.key_verifications_per_month_v2",
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ If you need to understand or optimize a query:
1. Use the client's query method directly for custom queries:
```typescript
const result = await ch.querier.query({
query: `SELECT count() FROM verifications.raw_key_verifications_v1 WHERE workspace_id = {workspaceId: String}`,
query: `SELECT count() FROM default.key_verifications_raw_v2 WHERE workspace_id = {workspaceId: String}`,
params: z.object({ workspaceId: z.string() }),
schema: z.object({ count: z.number() })
})({ workspaceId: "ws_123" });
Expand All @@ -179,7 +179,7 @@ If you need to understand or optimize a query:
2. Check performance with `EXPLAIN`:
```typescript
const explain = await ch.querier.query({
query: `EXPLAIN SELECT * FROM verifications.raw_key_verifications_v1 WHERE workspace_id = {workspaceId: String}`,
query: `EXPLAIN SELECT * FROM default.key_verifications_raw_v2 WHERE workspace_id = {workspaceId: String}`,
params: z.object({ workspaceId: z.string() }),
schema: z.object({ explain: z.string() })
})({ workspaceId: "ws_123" });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ This ensures all values are properly escaped by ClickHouse's query engine.

### SQL Performance - ClickHouse

When querying ClickHouse, we have to be careful about what we are joining or querying since that particular data might be huge. For example, not handling queries to metrics.raw_api_requests_v1 properly can be quite problematic.
When querying ClickHouse, we have to be careful about what we are joining or querying since that particular data might be huge. For example, not handling queries to api_requests_raw_v2 properly can be quite problematic.

To improve query performance:
- Filter data as early as possible in the query
Expand All @@ -163,8 +163,8 @@ To improve query performance:
r.identifier,
r.passed,
m.host,
FROM ratelimits.raw_ratelimits_v1 r
LEFT JOIN metrics.raw_api_requests_v1 m ON
FROM ratelimits_raw_v2 r
LEFT JOIN api_requests_raw_v2 m ON
r.request_id = m.request_id
WHERE r.workspace_id = {workspaceId: String}
AND r.namespace_id = {namespaceId: String}
Expand All @@ -177,8 +177,8 @@ To improve query performance:
LIMIT {limit: Int}
````

Imagine this query: we are trying to get ratelimit details by joining with raw_api_requests_v1.
Since raw_api_requests_v1 might be huge, the query can take a long time to execute or consume too much memory.
Imagine this query: we are trying to get ratelimit details by joining with api_requests_raw_v2.
Since api_requests_raw_v2 might be huge, the query can take a long time to execute or consume too much memory.
To optimize this, we should first filter as much as possible, with time and workspace_id being the most important factors.
We can make the query much faster by eliminating unrelated workspaceId's and irrelevant time ranges.

Expand All @@ -192,7 +192,7 @@ WITH filtered_ratelimits AS (
namespace_id,
identifier,
toUInt8(passed) as status
FROM ratelimits.raw_ratelimits_v1 r
FROM ratelimits_raw_v2 r
WHERE workspace_id = {workspaceId: String}
AND namespace_id = {namespaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
Expand All @@ -212,7 +212,7 @@ SELECT
m.host,
FROM filtered_ratelimits fr
LEFT JOIN (
SELECT * FROM metrics.raw_api_requests_v1
SELECT * FROM api_requests_raw_v2
-- Those two filters are doing the heavy lifting now
WHERE workspace_id = {workspaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
Expand All @@ -228,19 +228,19 @@ Finally, don't forget about adding indexes for our filters. While this step is o
-- Composite index for workspace + time filtering
-- Most effective when filtering workspace_id first, then time
-- MINMAX type creates a sparse index with min/max values
ALTER TABLE ratelimits.raw_ratelimits_v1
ALTER TABLE ratelimits_raw_v2
ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;

-- Single-column index for JOIN operations
-- Speeds up request_id matching between tables
-- GRANULARITY 1 means finest possible index precision
ALTER TABLE ratelimits.raw_ratelimits_v1
ALTER TABLE ratelimits_raw_v2
ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;

-- Same indexes on metrics table to optimize both sides of JOIN
ALTER TABLE metrics.raw_api_requests_v1
ALTER TABLE api_requests_raw_v2
ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;
ALTER TABLE metrics.raw_api_requests_v1
ALTER TABLE api_requests_raw_v2
ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;
```

Expand Down
4 changes: 2 additions & 2 deletions go/apps/api/integration/multi_node_ratelimiting/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func RunRateLimitTest(
data, selectErr := clickhouse.Select[aggregatedCounts](
ctx,
h.CH.Conn(),
`SELECT count(*) as total_requests, countIf(passed > 0) as success_count, countIf(passed = 0) as failure_count FROM ratelimits.raw_ratelimits_v1 WHERE workspace_id = {workspace_id:String} AND namespace_id = {namespace_id:String}`,
`SELECT count(*) as total_requests, countIf(passed > 0) as success_count, countIf(passed = 0) as failure_count FROM default.ratelimits_raw_v2 WHERE workspace_id = {workspace_id:String} AND namespace_id = {namespace_id:String}`,
map[string]string{
"workspace_id": h.Resources().UserWorkspace.ID,
"namespace_id": namespaceID,
Expand All @@ -205,7 +205,7 @@ func RunRateLimitTest(

metricsCount := uint64(0)
uniqueCount := uint64(0)
row := h.CH.Conn().QueryRow(ctx, fmt.Sprintf(`SELECT count(*) as total_requests, count(DISTINCT request_id) as unique_requests FROM metrics.raw_api_requests_v1 WHERE workspace_id = '%s';`, h.Resources().UserWorkspace.ID))
row := h.CH.Conn().QueryRow(ctx, fmt.Sprintf(`SELECT count(*) as total_requests, count(DISTINCT request_id) as unique_requests FROM default.api_requests_raw_v2 WHERE workspace_id = '%s';`, h.Resources().UserWorkspace.ID))

err = row.Scan(&metricsCount, &uniqueCount)

Expand Down
4 changes: 2 additions & 2 deletions go/apps/api/integration/multi_node_usagelimiting/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func RunUsageLimitTest(
data, selectErr := clickhouse.Select[aggregatedCounts](
ctx,
h.CH.Conn(),
`SELECT count(*) as total_requests, countIf(outcome = 'VALID') as success_count, countIf(outcome = 'USAGE_EXCEEDED') as failure_count FROM verifications.raw_key_verifications_v1 WHERE workspace_id = {workspace_id:String} AND key_id = {key_id:String}`,
`SELECT count(*) as total_requests, countIf(outcome = 'VALID') as success_count, countIf(outcome = 'USAGE_EXCEEDED') as failure_count FROM default.key_verifications_raw_v2 WHERE workspace_id = {workspace_id:String} AND key_id = {key_id:String}`,
map[string]string{
"workspace_id": h.Resources().UserWorkspace.ID,
"key_id": keyResponse.KeyID,
Expand All @@ -203,7 +203,7 @@ func RunUsageLimitTest(
require.Eventually(t, func() bool {
metricsCount := uint64(0)
uniqueCount := uint64(0)
row := h.CH.Conn().QueryRow(ctx, fmt.Sprintf(`SELECT count(*) as total_requests, count(DISTINCT request_id) as unique_requests FROM metrics.raw_api_requests_v1 WHERE workspace_id = '%s';`, h.Resources().UserWorkspace.ID))
row := h.CH.Conn().QueryRow(ctx, fmt.Sprintf(`SELECT count(*) as total_requests, count(DISTINCT request_id) as unique_requests FROM default.api_requests_raw_v2 WHERE workspace_id = '%s';`, h.Resources().UserWorkspace.ID))

err := row.Scan(&metricsCount, &uniqueCount)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions go/apps/api/routes/v2_ratelimit_limit/200_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func TestLimitSuccessfully(t *testing.T) {
res := testutil.CallRoute[handler.Request, handler.Response](h, route, headers, req)
require.Equal(t, 200, res.Status, "expected 200, received: %v", res.Body)

row := schema.RatelimitRequestV1{}
row := schema.RatelimitV2{}
require.Eventually(t, func() bool {

data, err := clickhouse.Select[schema.RatelimitRequestV1](
data, err := clickhouse.Select[schema.RatelimitV2](
ctx,
h.ClickHouse.Conn(),
"SELECT * FROM ratelimits.raw_ratelimits_v1 WHERE workspace_id = {workspace_id:String} AND namespace_id = {namespace_id:String}",
"SELECT * FROM default.ratelimits_raw_v2 WHERE workspace_id = {workspace_id:String} AND namespace_id = {namespace_id:String}",
map[string]string{
"workspace_id": h.Resources().UserWorkspace.ID,
"namespace_id": namespaceID,
Expand Down
Loading