Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pkg/clickhouse/migrations/20260129000000.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Create "key_last_used_v1" table with AggregatingMergeTree for pre-aggregated data.
-- Stores one row per (workspace_id, key_space_id, key_id): the latest verification metadata.
CREATE TABLE IF NOT EXISTS `default`.`key_last_used_v1` (
`workspace_id` String,
`key_space_id` String,
`key_id` String,
-- Must be an aggregate column: in AggregatingMergeTree, a plain non-key
-- column keeps an arbitrary value on merge, so a bare String here could
-- silently desync identity_id from the other anyLast columns.
`identity_id` SimpleAggregateFunction(anyLast, String),
-- `time` is epoch milliseconds of the most recent verification.
`time` SimpleAggregateFunction(max, Int64),
`request_id` SimpleAggregateFunction(anyLast, String),
`outcome` SimpleAggregateFunction(anyLast, LowCardinality(String)),
`tags` SimpleAggregateFunction(anyLast, Array(String))
) ENGINE = AggregatingMergeTree()
ORDER BY (`workspace_id`, `key_space_id`, `key_id`)
-- Expire a key's row 90 days after its last recorded use (time is in ms).
TTL toDateTime(time / 1000) + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Create "key_last_used_mv_v1" materialized view that pre-aggregates per key
CREATE MATERIALIZED VIEW IF NOT EXISTS `default`.`key_last_used_mv_v1`
TO `default`.`key_last_used_v1`
AS
SELECT
workspace_id,
key_space_id,
key_id,
-- NOTE(review): anyLast is evaluated per insert block and is not ordered by
-- `time`; it is kept here because the target columns are declared as
-- SimpleAggregateFunction(anyLast, ...), so merges apply the same function.
anyLast(identity_id) as identity_id,
max(time) as time,
anyLast(request_id) as request_id,
anyLast(outcome) as outcome,
anyLast(tags) as tags
FROM `default`.`key_verifications_raw_v2`
GROUP BY workspace_id, key_space_id, key_id;
37 changes: 37 additions & 0 deletions pkg/clickhouse/schema/024_keys_last_used_v1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Materialized view to track the last verification time for each key and identity
-- This dramatically improves query performance for the dashboard's "API Requests" page
--
-- IMPORTANT: Stores ONE row per key (latest verification regardless of outcome).
-- Uses AggregatingMergeTree for automatic aggregation during merges.
-- Can be queried by key_id OR identity_id for flexible last-used tracking.

-- Target table that stores the latest verification per key
CREATE TABLE IF NOT EXISTS `default`.`key_last_used_v1` (
`workspace_id` String,
`key_space_id` String,
`key_id` String,
-- Must be an aggregate column: in AggregatingMergeTree, a plain non-key
-- column keeps an arbitrary value on merge, so a bare String here could
-- silently desync identity_id from the other anyLast columns.
`identity_id` SimpleAggregateFunction(anyLast, String),
-- `time` is epoch milliseconds of the most recent verification.
`time` SimpleAggregateFunction(max, Int64),
`request_id` SimpleAggregateFunction(anyLast, String),
`outcome` SimpleAggregateFunction(anyLast, LowCardinality(String)),
`tags` SimpleAggregateFunction(anyLast, Array(String))
) ENGINE = AggregatingMergeTree()
ORDER BY (`workspace_id`, `key_space_id`, `key_id`)
-- Expire a key's row 90 days after its last recorded use (time is in ms).
TTL toDateTime(time / 1000) + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Materialized view that automatically populates the table from new inserts
CREATE MATERIALIZED VIEW IF NOT EXISTS `default`.`key_last_used_mv_v1`
TO `default`.`key_last_used_v1`
AS
SELECT
workspace_id,
key_space_id,
key_id,
-- NOTE(review): anyLast is evaluated per insert block and is not ordered by
-- `time`; it is kept here because the target columns are declared as
-- SimpleAggregateFunction(anyLast, ...), so merges apply the same function.
anyLast(identity_id) as identity_id,
max(time) as time,
anyLast(request_id) as request_id,
anyLast(outcome) as outcome,
anyLast(tags) as tags
FROM `default`.`key_verifications_raw_v2`
GROUP BY workspace_id, key_space_id, key_id;
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara

const { queryTime: timestamp } = useQueryTime();

// Check if user explicitly set a time frame filter
const hasTimeFrameFilter = useMemo(() => {
return filters.some((filter) => filter.field === "startTime" || filter.field === "endTime");
}, [filters]);

const queryParams = useMemo(() => {
const params: KeysQueryOverviewLogsPayload = {
limit,
Expand All @@ -38,6 +43,10 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara
apiId,
since: "",
sorts: sorts.length > 0 ? sorts : null,
// Flag to indicate if user explicitly filtered by time frame
// If true, use new logic to find keys with ANY usage in the time frame
// If false or undefined, use the MV directly for speed
useTimeFrameFilter: hasTimeFrameFilter,
};

filters.forEach((filter) => {
Expand Down Expand Up @@ -119,7 +128,7 @@ export function useKeysOverviewLogsQuery({ apiId, limit = 50 }: UseLogsQueryPara
});

return params;
}, [filters, limit, timestamp, apiId, sorts]);
}, [filters, limit, timestamp, apiId, sorts, hasTimeFrameFilter]);

// Main query for historical data
const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export const keysQueryOverviewLogsPayload = z.object({
apiId: z.string(),
since: z.string(),
cursor: z.number().nullable().optional().nullable(),
// Flag to indicate if user explicitly filtered by time frame
// If true, use new logic to find keys with ANY usage in the time frame
// If false or undefined, use the MV directly for speed
useTimeFrameFilter: z.boolean().optional(),
outcomes: z
.array(
z.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type KeysOverviewLogsResponse = z.infer<typeof KeysOverviewLogsResponse>;
*/
export const queryKeysOverviewLogs = workspaceProcedure
.use(withRatelimit(ratelimit.read))
.meta({ skipBatch: true })
.input(keysQueryOverviewLogsPayload)
.output(KeysOverviewLogsResponse)
.query(async ({ ctx, input }) => {
Expand All @@ -42,13 +43,18 @@ export const queryKeysOverviewLogs = workspaceProcedure
cursorTime: input.cursor ?? null,
workspaceId: ctx.workspace.id,
keyspaceId: keyspaceId,
// Flag to indicate if user explicitly filtered by time frame
// If true, use new logic to find keys with ANY usage in the time frame
// If false or undefined, use the MV directly for speed
useTimeFrameFilter: input.useTimeFrameFilter ?? false,
// Only include keyIds filters if explicitly provided in the input
keyIds: input.keyIds ? transformedInputs.keyIds : null,
// Pass tags to ClickHouse for filtering
tags: transformedInputs.tags,
// Nullify these as we'll filter in the database
names: null,
identities: null,
// Use nullish coalescing to properly handle empty arrays vs null
names: input.names ?? null,
identities: input.identities ?? null,
});

if (!clickhouseResult || clickhouseResult.err) {
Expand Down
207 changes: 142 additions & 65 deletions web/internal/clickhouse/src/keys/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const keysOverviewLogsParams = z.object({
}),
)
.nullable(),
useTimeFrameFilter: z.boolean().optional(),
});

export const roleSchema = z.object({
Expand Down Expand Up @@ -241,104 +242,180 @@ export function getKeysOverviewLogs(ch: Querier) {
[...orderByWithoutTime, `time ${timeDirection}`].join(", ") || "time DESC"; // Fallback if empty

// Create cursor condition based on time direction
let cursorCondition: string;
let havingCursorCondition: string;

// For first page or no cursor provided
if (args.cursorTime) {
// For subsequent pages, use cursor based on time direction
if (timeDirection === "ASC") {
cursorCondition = `
AND (time > {cursorTime: Nullable(UInt64)})
`;
havingCursorCondition = "\n AND (last_time > {cursorTime: Nullable(UInt64)})";
} else {
cursorCondition = `
AND (time < {cursorTime: Nullable(UInt64)})
`;
havingCursorCondition = "\n AND (last_time < {cursorTime: Nullable(UInt64)})";
}
} else {
cursorCondition = `
AND ({cursorTime: Nullable(UInt64)} IS NULL)
`;
havingCursorCondition = "";
}

// Detect if this is a rolling/relative time window (last X hours/days)
// vs an explicit historical range
const now = Date.now();
// If user explicitly filtered by time, use historical path to find ALL keys with activity in window
// Otherwise use MV fast path for recent "last used" data
const isRollingWindow = !args.useTimeFrameFilter && args.endTime >= now - 5 * 60 * 1000;

const extendedParamsSchema = keysOverviewLogsParams.extend(paramSchemaExtension);
const query = ch.query({
query: `
WITH
-- First CTE: Filter raw verification records based on conditions from client
filtered_keys AS (

// Build top_keys CTE based on query type
const topKeysCTE = isRollingWindow
? `top_keys AS (
SELECT
request_id,
time,
key_id,
tags,
outcome
FROM default.key_verifications_raw_v2
workspace_id,
key_space_id,
max(time) as last_time,
anyLast(request_id) as last_request_id,
anyLast(tags) as last_tags
FROM default.key_last_used_v1
WHERE workspace_id = {workspaceId: String}
AND key_space_id = {keyspaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
-- Apply dynamic key ID filtering (equals or contains)
AND (${keyIdConditions})
-- Apply dynamic outcome filtering
AND time >= {startTime: UInt64}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
GROUP BY key_id, workspace_id, key_space_id
HAVING last_time > 0${havingCursorCondition}
ORDER BY last_time ${timeDirection}
LIMIT {limit: Int}
)`
: `top_keys AS (
SELECT
key_id,
workspace_id,
key_space_id,
max(time) as last_time,
anyLast(request_id) as last_request_id,
anyLast(tags) as last_tags
FROM (
-- Get activity from hourly aggregates (complete hours)
SELECT
key_id,
workspace_id,
key_space_id,
toInt64(toUnixTimestamp(time) * 1000) as time,
'' as request_id,
tags
FROM default.key_verifications_per_hour_v2
WHERE workspace_id = {workspaceId: String}
AND key_space_id = {keyspaceId: String}
AND time BETWEEN toDateTime(fromUnixTimestamp64Milli({startTime: UInt64}))
AND toDateTime(fromUnixTimestamp64Milli({endTime: UInt64}))
AND (${keyIdConditions})

UNION ALL

-- Get activity from raw table (current incomplete hour)
SELECT
key_id,
workspace_id,
key_space_id,
time,
request_id,
tags
FROM default.key_verifications_raw_v2
WHERE workspace_id = {workspaceId: String}
AND key_space_id = {keyspaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
AND (${keyIdConditions})
)
GROUP BY key_id, workspace_id, key_space_id
HAVING last_time > 0
${havingCursorCondition}
ORDER BY last_time ${timeDirection}
LIMIT {limit: Int}
)`;

const query = ch.query({
query: `
WITH
${topKeysCTE},
-- Second CTE: Get counts from hourly table (complete hours only)
hourly_counts AS (
SELECT
h.key_id,
h.outcome,
toUInt64(sum(h.count)) as count
FROM default.key_verifications_per_hour_v2 h
INNER JOIN top_keys t ON h.key_id = t.key_id
WHERE h.workspace_id = {workspaceId: String}
AND h.key_space_id = {keyspaceId: String}
AND h.time BETWEEN toDateTime(fromUnixTimestamp64Milli({startTime: UInt64}))
AND toDateTime(fromUnixTimestamp64Milli({endTime: UInt64}))
AND h.time < toStartOfHour(now()) -- Only complete hours
AND (${outcomeCondition})
AND (${tagConditions})
GROUP BY h.key_id, h.outcome
),
-- Third CTE: Get counts from raw table for current incomplete hour
recent_counts AS (
SELECT
v.key_id,
v.outcome,
toUInt64(count(*)) as count
FROM default.key_verifications_raw_v2 v
INNER JOIN top_keys t ON v.key_id = t.key_id
WHERE v.workspace_id = {workspaceId: String}
AND v.key_space_id = {keyspaceId: String}
AND v.time >= toUnixTimestamp(toStartOfHour(now())) * 1000
AND v.time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
AND (${outcomeCondition})
-- Apply dynamic tag filtering
AND (${tagConditions})
-- Handle pagination using only time as cursor
${cursorCondition}
GROUP BY v.key_id, v.outcome
),
-- Second CTE: Calculate per-key aggregated metrics
-- This groups all verifications by key_id to get summary counts and most recent activity
aggregated_data AS (
-- Fourth CTE: Combine hourly and recent counts
combined_counts AS (
SELECT key_id, outcome, count FROM hourly_counts
UNION ALL
SELECT key_id, outcome, count FROM recent_counts
),
-- Fifth CTE: Aggregate combined counts
aggregated_counts AS (
SELECT
key_id,
-- Find the timestamp of the latest verification for this key
max(time) as last_request_time,
-- Get the request_id of the latest verification (based on time)
argMax(request_id, time) as last_request_id,
-- Get the tags from the latest verification (based on time)
argMax(tags, time) as tags,
-- Count valid verifications
countIf(outcome = 'VALID') as valid_count,
-- Count all non-valid verifications
countIf(outcome != 'VALID') as error_count
FROM filtered_keys
sumIf(count, outcome = 'VALID') as valid_count,
sumIf(count, outcome != 'VALID') as error_count
FROM combined_counts
GROUP BY key_id
),
-- Third CTE: Build detailed outcome distribution
-- This provides a breakdown of the exact counts for each outcome type
-- Sixth CTE: Build outcome distribution
outcome_counts AS (
SELECT
key_id,
outcome,
-- Convert to UInt32 for consistency
toUInt32(count(*)) as count
FROM filtered_keys
toUInt32(sum(count)) as count
FROM combined_counts
GROUP BY key_id, outcome
)
-- Main query: Join the aggregated data with detailed outcome counts
-- Main query: Join metadata from MV with aggregated counts
SELECT
a.key_id,
a.last_request_time as time,
a.last_request_id as request_id,
a.tags,
a.valid_count,
a.error_count,
-- Create an array of tuples containing all outcomes and their counts
-- This will be transformed into an object in the application code
groupArray((o.outcome, o.count)) as outcome_counts_array
FROM aggregated_data a
LEFT JOIN outcome_counts o ON a.key_id = o.key_id
-- Group by all non-aggregated fields to allow the groupArray operation
t.key_id as key_id,
t.last_time as time,
t.last_request_id as request_id,
t.last_tags as tags,
COALESCE(a.valid_count, 0) as valid_count,
COALESCE(a.error_count, 0) as error_count,
arrayFilter(x -> tupleElement(x, 1) IS NOT NULL,
groupArray(tuple(o.outcome, o.count))
) as outcome_counts_array
FROM top_keys t
LEFT JOIN aggregated_counts a ON t.key_id = a.key_id
LEFT JOIN outcome_counts o ON t.key_id = o.key_id
GROUP BY
a.key_id,
a.last_request_time,
a.last_request_id,
a.tags,
t.key_id,
t.last_time,
t.last_request_id,
t.last_tags,
a.valid_count,
a.error_count
-- Sort results with most recent verification first
HAVING COALESCE(a.valid_count, 0) > 0 OR COALESCE(a.error_count, 0) > 0
ORDER BY ${orderByClause}
-- Limit results for pagination
LIMIT {limit: Int}
`,
params: extendedParamsSchema,
Expand Down
Loading