diff --git a/pkg/clickhouse/migrations/20260129000000.sql b/pkg/clickhouse/migrations/20260129000000.sql new file mode 100644 index 0000000000..379a227d46 --- /dev/null +++ b/pkg/clickhouse/migrations/20260129000000.sql @@ -0,0 +1,30 @@ +-- Create "key_last_used_v1" table with AggregatingMergeTree for pre-aggregated data +CREATE TABLE IF NOT EXISTS `default`.`key_last_used_v1` ( + `workspace_id` String, + `key_space_id` String, + `key_id` String, + `identity_id` String, + `time` SimpleAggregateFunction(max, Int64), + `request_id` SimpleAggregateFunction(anyLast, String), + `outcome` SimpleAggregateFunction(anyLast, LowCardinality(String)), + `tags` SimpleAggregateFunction(anyLast, Array(String)) +) ENGINE = AggregatingMergeTree() +ORDER BY (`workspace_id`, `key_space_id`, `key_id`) +TTL toDateTime(time / 1000) + INTERVAL 90 DAY +SETTINGS index_granularity = 8192; + +-- Create "key_last_used_mv_v1" materialized view that pre-aggregates per key +CREATE MATERIALIZED VIEW IF NOT EXISTS `default`.`key_last_used_mv_v1` +TO `default`.`key_last_used_v1` +AS +SELECT + workspace_id, + key_space_id, + key_id, + anyLast(identity_id) as identity_id, + max(time) as time, + anyLast(request_id) as request_id, + anyLast(outcome) as outcome, + anyLast(tags) as tags +FROM `default`.`key_verifications_raw_v2` +GROUP BY workspace_id, key_space_id, key_id; diff --git a/pkg/clickhouse/schema/024_keys_last_used_v1.sql b/pkg/clickhouse/schema/024_keys_last_used_v1.sql new file mode 100644 index 0000000000..c755fdb4a5 --- /dev/null +++ b/pkg/clickhouse/schema/024_keys_last_used_v1.sql @@ -0,0 +1,37 @@ +-- Materialized view to track the last verification time for each key and identity +-- This dramatically improves query performance for the dashboard's "API Requests" page +-- +-- IMPORTANT: Stores ONE row per key (latest verification regardless of outcome). +-- Uses AggregatingMergeTree for automatic aggregation during merges. 
+-- Can be queried by key_id OR identity_id for flexible last-used tracking. + +-- Target table that stores the latest verification per key +CREATE TABLE IF NOT EXISTS `default`.`key_last_used_v1` ( + `workspace_id` String, + `key_space_id` String, + `key_id` String, + `identity_id` String, + `time` SimpleAggregateFunction(max, Int64), + `request_id` SimpleAggregateFunction(anyLast, String), + `outcome` SimpleAggregateFunction(anyLast, LowCardinality(String)), + `tags` SimpleAggregateFunction(anyLast, Array(String)) +) ENGINE = AggregatingMergeTree() +ORDER BY (`workspace_id`, `key_space_id`, `key_id`) +TTL toDateTime(time / 1000) + INTERVAL 90 DAY +SETTINGS index_granularity = 8192; + +-- Materialized view that automatically populates the table from new inserts +CREATE MATERIALIZED VIEW IF NOT EXISTS `default`.`key_last_used_mv_v1` +TO `default`.`key_last_used_v1` +AS +SELECT + workspace_id, + key_space_id, + key_id, + anyLast(identity_id) as identity_id, + max(time) as time, + anyLast(request_id) as request_id, + anyLast(outcome) as outcome, + anyLast(tags) as tags +FROM `default`.`key_verifications_raw_v2` +GROUP BY workspace_id, key_space_id, key_id; diff --git a/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/hooks/use-logs-query.ts b/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/hooks/use-logs-query.ts index c5d6ac96be..02aee0bb37 100644 --- a/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/hooks/use-logs-query.ts +++ b/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/hooks/use-logs-query.ts @@ -25,6 +25,11 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara const { queryTime: timestamp } = useQueryTime(); + // Check if user explicitly set a time frame filter + const hasTimeFrameFilter = useMemo(() => { + return filters.some((filter) => filter.field === "startTime" || filter.field 
=== "endTime"); + }, [filters]); + const queryParams = useMemo(() => { const params: KeysQueryOverviewLogsPayload = { limit, @@ -38,6 +43,10 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara apiId, since: "", sorts: sorts.length > 0 ? sorts : null, + // Flag to indicate if user explicitly filtered by time frame + // If true, use new logic to find keys with ANY usage in the time frame + // If false or undefined, use the MV directly for speed + useTimeFrameFilter: hasTimeFrameFilter, }; filters.forEach((filter) => { @@ -119,7 +128,7 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara }); return params; - }, [filters, limit, timestamp, apiId, sorts]); + }, [filters, limit, timestamp, apiId, sorts, hasTimeFrameFilter]); // Main query for historical data const { diff --git a/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/query-logs.schema.ts b/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/query-logs.schema.ts index 888a97bd3a..2527a0f032 100644 --- a/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/query-logs.schema.ts +++ b/web/apps/dashboard/app/(app)/[workspaceSlug]/apis/[apiId]/_overview/components/table/query-logs.schema.ts @@ -12,6 +12,10 @@ export const keysQueryOverviewLogsPayload = z.object({ apiId: z.string(), since: z.string(), cursor: z.number().nullable().optional().nullable(), + // Flag to indicate if user explicitly filtered by time frame + // If true, use new logic to find keys with ANY usage in the time frame + // If false or undefined, use the MV directly for speed + useTimeFrameFilter: z.boolean().optional(), outcomes: z .array( z.object({ diff --git a/web/apps/dashboard/lib/trpc/routers/api/keys/query-overview-logs/index.ts b/web/apps/dashboard/lib/trpc/routers/api/keys/query-overview-logs/index.ts index 4b54245da6..0176ae3db7 100644 --- 
a/web/apps/dashboard/lib/trpc/routers/api/keys/query-overview-logs/index.ts +++ b/web/apps/dashboard/lib/trpc/routers/api/keys/query-overview-logs/index.ts @@ -23,6 +23,7 @@ type KeysOverviewLogsResponse = z.infer; */ export const queryKeysOverviewLogs = workspaceProcedure .use(withRatelimit(ratelimit.read)) + .meta({ skipBatch: true }) .input(keysQueryOverviewLogsPayload) .output(KeysOverviewLogsResponse) .query(async ({ ctx, input }) => { @@ -42,13 +43,18 @@ export const queryKeysOverviewLogs = workspaceProcedure cursorTime: input.cursor ?? null, workspaceId: ctx.workspace.id, keyspaceId: keyspaceId, + // Flag to indicate if user explicitly filtered by time frame + // If true, use new logic to find keys with ANY usage in the time frame + // If false or undefined, use the MV directly for speed + useTimeFrameFilter: input.useTimeFrameFilter ?? false, // Only include keyIds filters if explicitly provided in the input keyIds: input.keyIds ? transformedInputs.keyIds : null, // Pass tags to ClickHouse for filtering tags: transformedInputs.tags, // Nullify these as we'll filter in the database - names: null, - identities: null, + // Use nullish coalescing to properly handle empty arrays vs null + names: input.names ?? null, + identities: input.identities ?? 
null, }); if (!clickhouseResult || clickhouseResult.err) { diff --git a/web/internal/clickhouse/src/keys/keys.ts b/web/internal/clickhouse/src/keys/keys.ts index c960ba8a77..d56eded034 100644 --- a/web/internal/clickhouse/src/keys/keys.ts +++ b/web/internal/clickhouse/src/keys/keys.ts @@ -66,6 +66,7 @@ export const keysOverviewLogsParams = z.object({ }), ) .nullable(), + useTimeFrameFilter: z.boolean().optional(), }); export const roleSchema = z.object({ @@ -241,104 +242,180 @@ export function getKeysOverviewLogs(ch: Querier) { [...orderByWithoutTime, `time ${timeDirection}`].join(", ") || "time DESC"; // Fallback if empty // Create cursor condition based on time direction - let cursorCondition: string; + let havingCursorCondition: string; - // For first page or no cursor provided if (args.cursorTime) { // For subsequent pages, use cursor based on time direction if (timeDirection === "ASC") { - cursorCondition = ` - AND (time > {cursorTime: Nullable(UInt64)}) - `; + havingCursorCondition = "\n AND (last_time > {cursorTime: Nullable(UInt64)})"; } else { - cursorCondition = ` - AND (time < {cursorTime: Nullable(UInt64)}) - `; + havingCursorCondition = "\n AND (last_time < {cursorTime: Nullable(UInt64)})"; } } else { - cursorCondition = ` - AND ({cursorTime: Nullable(UInt64)} IS NULL) - `; + havingCursorCondition = ""; } + // Detect if this is a rolling/relative time window (last X hours/days) + // vs an explicit historical range + const now = Date.now(); + // If user explicitly filtered by time, use historical path to find ALL keys with activity in window + // Otherwise use MV fast path for recent "last used" data + const isRollingWindow = !args.useTimeFrameFilter && args.endTime >= now - 5 * 60 * 1000; + const extendedParamsSchema = keysOverviewLogsParams.extend(paramSchemaExtension); - const query = ch.query({ - query: ` -WITH - -- First CTE: Filter raw verification records based on conditions from client - filtered_keys AS ( + + // Build top_keys CTE based on 
query type + const topKeysCTE = isRollingWindow + ? `top_keys AS ( SELECT - request_id, - time, key_id, - tags, - outcome - FROM default.key_verifications_raw_v2 + workspace_id, + key_space_id, + max(time) as last_time, + anyLast(request_id) as last_request_id, + anyLast(tags) as last_tags + FROM default.key_last_used_v1 WHERE workspace_id = {workspaceId: String} AND key_space_id = {keyspaceId: String} - AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} - -- Apply dynamic key ID filtering (equals or contains) AND (${keyIdConditions}) - -- Apply dynamic outcome filtering + AND time >= {startTime: UInt64} + AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} + GROUP BY key_id, workspace_id, key_space_id + HAVING last_time > 0${havingCursorCondition} + ORDER BY last_time ${timeDirection} + LIMIT {limit: Int} + )` + : `top_keys AS ( + SELECT + key_id, + workspace_id, + key_space_id, + max(time) as last_time, + anyLast(request_id) as last_request_id, + anyLast(tags) as last_tags + FROM ( + -- Get activity from hourly aggregates (complete hours) + SELECT + key_id, + workspace_id, + key_space_id, + toInt64(toUnixTimestamp(time) * 1000) as time, + '' as request_id, + tags + FROM default.key_verifications_per_hour_v2 + WHERE workspace_id = {workspaceId: String} + AND key_space_id = {keyspaceId: String} + AND time BETWEEN toDateTime(fromUnixTimestamp64Milli({startTime: UInt64})) + AND toDateTime(fromUnixTimestamp64Milli({endTime: UInt64})) + AND (${keyIdConditions}) + + UNION ALL + + -- Get activity from raw table (current incomplete hour) + SELECT + key_id, + workspace_id, + key_space_id, + time, + request_id, + tags + FROM default.key_verifications_raw_v2 + WHERE workspace_id = {workspaceId: String} + AND key_space_id = {keyspaceId: String} + AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} + AND (${keyIdConditions}) + ) + GROUP BY key_id, workspace_id, key_space_id + HAVING last_time > 0 + ${havingCursorCondition} + ORDER BY last_time 
${timeDirection} + LIMIT {limit: Int} + )`; + + const query = ch.query({ + query: ` +WITH + ${topKeysCTE}, + -- Second CTE: Get counts from hourly table (complete hours only) + hourly_counts AS ( + SELECT + h.key_id, + h.outcome, + toUInt64(sum(h.count)) as count + FROM default.key_verifications_per_hour_v2 h + INNER JOIN top_keys t ON h.key_id = t.key_id + WHERE h.workspace_id = {workspaceId: String} + AND h.key_space_id = {keyspaceId: String} + AND h.time BETWEEN toDateTime(fromUnixTimestamp64Milli({startTime: UInt64})) + AND toDateTime(fromUnixTimestamp64Milli({endTime: UInt64})) + AND h.time < toStartOfHour(now()) -- Only complete hours + AND (${outcomeCondition}) + AND (${tagConditions}) + GROUP BY h.key_id, h.outcome + ), + -- Third CTE: Get counts from raw table for current incomplete hour + recent_counts AS ( + SELECT + v.key_id, + v.outcome, + toUInt64(count(*)) as count + FROM default.key_verifications_raw_v2 v + INNER JOIN top_keys t ON v.key_id = t.key_id + WHERE v.workspace_id = {workspaceId: String} + AND v.key_space_id = {keyspaceId: String} + AND v.time >= toUnixTimestamp(toStartOfHour(now())) * 1000 + AND v.time BETWEEN {startTime: UInt64} AND {endTime: UInt64} AND (${outcomeCondition}) - -- Apply dynamic tag filtering AND (${tagConditions}) - -- Handle pagination using only time as cursor - ${cursorCondition} + GROUP BY v.key_id, v.outcome ), - -- Second CTE: Calculate per-key aggregated metrics - -- This groups all verifications by key_id to get summary counts and most recent activity - aggregated_data AS ( + -- Fourth CTE: Combine hourly and recent counts + combined_counts AS ( + SELECT key_id, outcome, count FROM hourly_counts + UNION ALL + SELECT key_id, outcome, count FROM recent_counts + ), + -- Fifth CTE: Aggregate combined counts + aggregated_counts AS ( SELECT key_id, - -- Find the timestamp of the latest verification for this key - max(time) as last_request_time, - -- Get the request_id of the latest verification (based on time) - 
argMax(request_id, time) as last_request_id, - -- Get the tags from the latest verification (based on time) - argMax(tags, time) as tags, - -- Count valid verifications - countIf(outcome = 'VALID') as valid_count, - -- Count all non-valid verifications - countIf(outcome != 'VALID') as error_count - FROM filtered_keys + sumIf(count, outcome = 'VALID') as valid_count, + sumIf(count, outcome != 'VALID') as error_count + FROM combined_counts GROUP BY key_id ), - -- Third CTE: Build detailed outcome distribution - -- This provides a breakdown of the exact counts for each outcome type + -- Sixth CTE: Build outcome distribution outcome_counts AS ( SELECT key_id, outcome, - -- Convert to UInt32 for consistency - toUInt32(count(*)) as count - FROM filtered_keys + toUInt32(sum(count)) as count + FROM combined_counts GROUP BY key_id, outcome ) - -- Main query: Join the aggregated data with detailed outcome counts + -- Main query: Join metadata from MV with aggregated counts SELECT - a.key_id, - a.last_request_time as time, - a.last_request_id as request_id, - a.tags, - a.valid_count, - a.error_count, - -- Create an array of tuples containing all outcomes and their counts - -- This will be transformed into an object in the application code - groupArray((o.outcome, o.count)) as outcome_counts_array - FROM aggregated_data a - LEFT JOIN outcome_counts o ON a.key_id = o.key_id - -- Group by all non-aggregated fields to allow the groupArray operation + t.key_id as key_id, + t.last_time as time, + t.last_request_id as request_id, + t.last_tags as tags, + COALESCE(a.valid_count, 0) as valid_count, + COALESCE(a.error_count, 0) as error_count, + arrayFilter(x -> tupleElement(x, 1) IS NOT NULL, + groupArray(tuple(o.outcome, o.count)) + ) as outcome_counts_array + FROM top_keys t + LEFT JOIN aggregated_counts a ON t.key_id = a.key_id + LEFT JOIN outcome_counts o ON t.key_id = o.key_id GROUP BY - a.key_id, - a.last_request_time, - a.last_request_id, - a.tags, + t.key_id, + t.last_time, + 
t.last_request_id, + t.last_tags, a.valid_count, a.error_count - -- Sort results with most recent verification first + HAVING COALESCE(a.valid_count, 0) > 0 OR COALESCE(a.error_count, 0) > 0 ORDER BY ${orderByClause} - -- Limit results for pagination LIMIT {limit: Int} `, params: extendedParamsSchema, diff --git a/web/internal/clickhouse/src/latest_verifications.ts b/web/internal/clickhouse/src/latest_verifications.ts index 6865a123b1..4500f98e97 100644 --- a/web/internal/clickhouse/src/latest_verifications.ts +++ b/web/internal/clickhouse/src/latest_verifications.ts @@ -18,9 +18,9 @@ export function getLatestVerifications(ch: Querier) { region, tags FROM default.key_verifications_raw_v2 - WHERE workspace_id = {workspaceId: String} + PREWHERE workspace_id = {workspaceId: String} AND key_space_id = {keySpaceId: String} - AND key_id = {keyId: String} + WHERE key_id = {keyId: String} ORDER BY time DESC LIMIT {limit: Int}`, params, diff --git a/web/internal/clickhouse/src/logs.ts b/web/internal/clickhouse/src/logs.ts index 6dabe2f39f..94d9b8ed48 100644 --- a/web/internal/clickhouse/src/logs.ts +++ b/web/internal/clickhouse/src/logs.ts @@ -73,12 +73,17 @@ export function getLogs(ch: Querier) { const extendedParamsSchema = getLogsClickhousePayload.extend(paramSchemaExtension); - const filterConditions = ` + // PREWHERE clause for indexed columns (workspace_id, time) + // This filters rows before reading other columns, dramatically reducing I/O + const prewhereConditions = ` workspace_id = {workspaceId: String} AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} + `; + // WHERE clause for non-indexed filters + const whereConditions = ` ---------- Apply request ID filter if present (highest priority) - AND ( + ( CASE WHEN length({requestIds: Array(String)}) > 0 THEN request_id IN {requestIds: Array(String)} @@ -142,7 +147,8 @@ export function getLogs(ch: Querier) { SELECT count(request_id) as total_count FROM default.api_requests_raw_v2 - WHERE ${filterConditions}`, 
+ PREWHERE ${prewhereConditions} + WHERE ${whereConditions}`, params: extendedParamsSchema, schema: z.object({ total_count: z.int(), @@ -166,7 +172,8 @@ export function getLogs(ch: Querier) { error, service_latency FROM default.api_requests_raw_v2 - WHERE ${filterConditions} AND ({cursorTime: Nullable(UInt64)} IS NULL OR time < {cursorTime: Nullable(UInt64)}) + PREWHERE ${prewhereConditions} + WHERE ${whereConditions} AND ({cursorTime: Nullable(UInt64)} IS NULL OR time < {cursorTime: Nullable(UInt64)}) ORDER BY time DESC LIMIT {limit: Int}`, params: extendedParamsSchema, diff --git a/web/internal/clickhouse/src/verifications.ts b/web/internal/clickhouse/src/verifications.ts index ee579dfe51..22ea8c8915 100644 --- a/web/internal/clickhouse/src/verifications.ts +++ b/web/internal/clickhouse/src/verifications.ts @@ -132,12 +132,17 @@ export function getKeyDetailsLogs(ch: Querier) { const extendedParamsSchema = keyDetailsLogsParams.extend(paramSchemaExtension); - const baseConditions = ` + // PREWHERE clause for indexed columns + const prewhereConditions = ` workspace_id = {workspaceId: String} AND key_space_id = {keyspaceId: String} AND key_id = {keyId: String} AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} - AND (${tagCondition}) + `; + + // WHERE clause for non-indexed filters + const whereConditions = ` + (${tagCondition}) AND (${outcomeCondition}) `; @@ -147,7 +152,8 @@ export function getKeyDetailsLogs(ch: Querier) { SELECT count(request_id) as total_count FROM default.key_verifications_raw_v2 - WHERE ${baseConditions}`, + PREWHERE ${prewhereConditions} + WHERE ${whereConditions}`, params: extendedParamsSchema, schema: z.object({ total_count: z.int(), @@ -163,7 +169,8 @@ export function getKeyDetailsLogs(ch: Querier) { outcome, tags FROM default.key_verifications_raw_v2 - WHERE ${baseConditions} + PREWHERE ${prewhereConditions} + WHERE ${whereConditions} -- Handle pagination using time as cursor ${cursorCondition} ORDER BY time DESC @@ -304,10 +311,15 
@@ export function getIdentityLogs(ch: Querier) { const extendedParamsSchema = identityLogsParams.extend(paramSchemaExtension); - const baseConditions = ` + // PREWHERE clause for indexed columns + const prewhereConditions = ` workspace_id = {workspaceId: String} - AND (${keyIdConditions}) AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64} + `; + + // WHERE clause for non-indexed filters + const whereConditions = ` + (${keyIdConditions}) AND (${tagCondition}) AND (${outcomeCondition}) `; @@ -318,7 +330,8 @@ export function getIdentityLogs(ch: Querier) { SELECT count(request_id) as total_count FROM default.key_verifications_raw_v2 - WHERE ${baseConditions}`, + PREWHERE ${prewhereConditions} + WHERE ${whereConditions}`, params: extendedParamsSchema, schema: z.object({ total_count: z.int(), @@ -335,7 +348,8 @@ export function getIdentityLogs(ch: Querier) { tags, key_id as keyId FROM default.key_verifications_raw_v2 - WHERE ${baseConditions} + PREWHERE ${prewhereConditions} + WHERE ${whereConditions} ${cursorCondition} ORDER BY time DESC LIMIT {limit: Int}