diff --git a/internal/clickhouse/schema/048_raw_ratelimits_metrics_indexes_v1.sql b/internal/clickhouse/schema/048_raw_ratelimits_metrics_indexes_v1.sql
new file mode 100644
index 0000000000..41d6116ac6
--- /dev/null
+++ b/internal/clickhouse/schema/048_raw_ratelimits_metrics_indexes_v1.sql
@@ -0,0 +1,13 @@
+-- +goose up
+ALTER TABLE ratelimits.raw_ratelimits_v1
+    ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;
+
+ALTER TABLE ratelimits.raw_ratelimits_v1
+    ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;
+
+-- +goose down
+ALTER TABLE ratelimits.raw_ratelimits_v1
+    DROP INDEX idx_workspace_time;
+
+ALTER TABLE ratelimits.raw_ratelimits_v1
+    DROP INDEX idx_request_id;
diff --git a/internal/clickhouse/schema/049_raw_api_metrics_ratelimit_indexes_v1.sql b/internal/clickhouse/schema/049_raw_api_metrics_ratelimit_indexes_v1.sql
new file mode 100644
index 0000000000..882d430d74
--- /dev/null
+++ b/internal/clickhouse/schema/049_raw_api_metrics_ratelimit_indexes_v1.sql
@@ -0,0 +1,13 @@
+-- +goose up
+ALTER TABLE metrics.raw_api_requests_v1
+    ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;
+
+ALTER TABLE metrics.raw_api_requests_v1
+    ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;
+
+-- +goose down
+ALTER TABLE metrics.raw_api_requests_v1
+    DROP INDEX idx_workspace_time;
+
+ALTER TABLE metrics.raw_api_requests_v1
+    DROP INDEX idx_request_id;
diff --git a/internal/clickhouse/src/ratelimits.ts b/internal/clickhouse/src/ratelimits.ts
index 152f838360..977c9b647c 100644
--- a/internal/clickhouse/src/ratelimits.ts
+++ b/internal/clickhouse/src/ratelimits.ts
@@ -250,123 +250,110 @@ export function getRatelimitLogs(ch: Querier) {
     const paramSchemaExtension: Record = {};
     const parameters: ExtendedParams = { ...args };
 
-    const statusCondition =
-      args.status
-        ?.map((filter, index) => {
-          if (filter.operator === "is") {
-            const paramName = `statusValue_${index}`;
-            paramSchemaExtension[paramName] = z.boolean();
-            parameters[paramName] = filter.value === "passed";
-            return `passed = {${paramName}: Boolean}`;
-          }
-          return null;
-        })
-        .filter(Boolean)
-        .join(" OR ") || "TRUE";
+    const hasRequestIds = args.requestIds && args.requestIds.length > 0;
+    const hasStatusFilters = args.status && args.status.length > 0;
+    const hasIdentifierFilters = args.identifiers && args.identifiers.length > 0;
 
-    const identifierConditions =
-      args.identifiers
-        ?.map((p, index) => {
-          const paramName = `identifierValue_${index}`;
-          paramSchemaExtension[paramName] = z.string();
-          parameters[paramName] = p.value;
+    const statusCondition = !hasStatusFilters
+      ? "TRUE"
+      : args.status
+          ?.map((filter, index) => {
+            if (filter.operator === "is") {
+              const paramName = `statusValue_${index}`;
+              paramSchemaExtension[paramName] = z.boolean();
+              parameters[paramName] = filter.value === "passed";
+              return `passed = {${paramName}: Boolean}`;
+            }
+            return null;
+          })
+          .filter(Boolean)
+          .join(" OR ") || "TRUE";
 
-          switch (p.operator) {
-            case "is":
-              return `identifier = {${paramName}: String}`;
-            case "contains":
-              return `like(identifier, CONCAT('%', {${paramName}: String}, '%'))`;
-            default:
-              return null;
-          }
-        })
-        .filter(Boolean)
-        .join(" OR ") || "TRUE";
+    const identifierConditions = !hasIdentifierFilters
+      ? "TRUE"
+      : args.identifiers
+          ?.map((p, index) => {
+            const paramName = `identifierValue_${index}`;
+            paramSchemaExtension[paramName] = z.string();
+            parameters[paramName] = p.value;
+            switch (p.operator) {
+              case "is":
+                return `identifier = {${paramName}: String}`;
+              case "contains":
+                // ClickHouse's signature is position(haystack, needle): search inside the column.
+                return `position(identifier, {${paramName}: String}) > 0`;
+              default:
+                return null;
+            }
+          })
+          .filter(Boolean)
+          .join(" OR ") || "TRUE";
 
     const extendedParamsSchema = ratelimitLogsParams.extend(paramSchemaExtension);
 
     const query = ch.query({
       query: `
-        WITH filtered_requests AS (
-          SELECT
-            -- Rate limits fields
-            r.request_id,
-            r.time,
-            r.workspace_id,
-            r.namespace_id,
-            r.identifier,
-            r.passed,
-
-            -- Metrics fields
-            m.host,
-            m.method,
-            m.path,
-            m.request_headers,
-            m.request_body,
-            m.response_status,
-            m.response_headers,
-            m.response_body,
-            m.service_latency,
-            m.user_agent,
-            m.colo
-          FROM ratelimits.raw_ratelimits_v1 r
-          LEFT JOIN metrics.raw_api_requests_v1 m ON
-            r.request_id = m.request_id
-          WHERE r.workspace_id = {workspaceId: String}
-            AND r.namespace_id = {namespaceId: String}
-            AND r.time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
-            ---------- Apply request ID filter if present (highest priority)
-            AND (
-              CASE
-                WHEN length({requestIds: Array(String)}) > 0 THEN
-                  r.request_id IN {requestIds: Array(String)}
-                ELSE TRUE
-              END
-            )
-
-            ---------- Apply identifier filter
-            AND (${identifierConditions})
-            ---------- Apply status filter
-            AND (${statusCondition})
-
-            -- Apply cursor pagination last
-            AND (
-              CASE
-                WHEN {cursorTime: Nullable(UInt64)} IS NOT NULL
-                  AND {cursorRequestId: Nullable(String)} IS NOT NULL
-                THEN (r.time, r.request_id) < (
-                  {cursorTime: Nullable(UInt64)},
-                  {cursorRequestId: Nullable(String)}
-                )
-                ELSE TRUE
-              END
-            )
-        )
-
-        SELECT
-          request_id,
-          time,
-          workspace_id,
-          namespace_id,
-          identifier,
-          toUInt8(passed) as status,
-          host,
-          method,
-          path,
-          request_headers,
-          request_body,
-          response_status,
-          response_headers,
-          response_body,
-          service_latency,
-          user_agent,
-          colo
-        FROM filtered_requests
-        ORDER BY time DESC, request_id DESC
-        LIMIT {limit: Int}`,
+WITH filtered_ratelimits AS (
+  SELECT
+    request_id,
+    time,
+    workspace_id,
+    namespace_id,
+    identifier,
+    toUInt8(passed) as status
+  FROM ratelimits.raw_ratelimits_v1 r
+  WHERE workspace_id = {workspaceId: String}
+    AND namespace_id = {namespaceId: String}
+    AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
+    ${hasRequestIds ? "AND request_id IN {requestIds: Array(String)}" : ""}
+    AND (${identifierConditions})
+    AND (${statusCondition})
+    AND (({cursorTime: Nullable(UInt64)} IS NULL AND {cursorRequestId: Nullable(String)} IS NULL)
+         OR (time, request_id) < ({cursorTime: Nullable(UInt64)}, {cursorRequestId: Nullable(String)}))
+)
+SELECT
+  fr.request_id,
+  fr.time,
+  fr.workspace_id,
+  fr.namespace_id,
+  fr.identifier,
+  fr.status,
+  m.host,
+  m.method,
+  m.path,
+  m.request_headers,
+  m.request_body,
+  m.response_status,
+  m.response_headers,
+  m.response_body,
+  m.service_latency,
+  m.user_agent,
+  m.colo
+FROM filtered_ratelimits fr
+LEFT JOIN (
+  SELECT
+    request_id,
+    host,
+    method,
+    path,
+    request_headers,
+    request_body,
+    response_status,
+    response_headers,
+    response_body,
+    service_latency,
+    user_agent,
+    colo
+  FROM metrics.raw_api_requests_v1
+  WHERE workspace_id = {workspaceId: String}
+    AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
+) m ON fr.request_id = m.request_id
+ORDER BY fr.time DESC, fr.request_id DESC
+LIMIT {limit: Int}`,
       params: extendedParamsSchema,
       schema: ratelimitLogs,
     });
+
     return query(parameters);
   };
 }