Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose up
ALTER TABLE ratelimits.raw_ratelimits_v1
ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;

ALTER TABLE ratelimits.raw_ratelimits_v1
ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;

-- +goose down
ALTER TABLE ratelimits.raw_ratelimits_v1
DROP INDEX idx_workspace_time;

ALTER TABLE ratelimits.raw_ratelimits_v1
DROP INDEX idx_request_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose up
ALTER TABLE metrics.raw_api_requests_v1
ADD INDEX idx_workspace_time (workspace_id, time) TYPE minmax GRANULARITY 1;

ALTER TABLE metrics.raw_api_requests_v1
ADD INDEX idx_request_id (request_id) TYPE minmax GRANULARITY 1;

-- +goose down
ALTER TABLE metrics.raw_api_requests_v1
DROP INDEX idx_workspace_time;

ALTER TABLE metrics.raw_api_requests_v1
DROP INDEX idx_request_id;
200 changes: 93 additions & 107 deletions internal/clickhouse/src/ratelimits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,123 +250,109 @@ export function getRatelimitLogs(ch: Querier) {
const paramSchemaExtension: Record<string, z.ZodType> = {};
const parameters: ExtendedParams = { ...args };

const statusCondition =
args.status
?.map((filter, index) => {
if (filter.operator === "is") {
const paramName = `statusValue_${index}`;
paramSchemaExtension[paramName] = z.boolean();
parameters[paramName] = filter.value === "passed";
return `passed = {${paramName}: Boolean}`;
}
return null;
})
.filter(Boolean)
.join(" OR ") || "TRUE";
const hasRequestIds = args.requestIds && args.requestIds.length > 0;
const hasStatusFilters = args.status && args.status.length > 0;
const hasIdentifierFilters = args.identifiers && args.identifiers.length > 0;

const identifierConditions =
args.identifiers
?.map((p, index) => {
const paramName = `identifierValue_${index}`;
paramSchemaExtension[paramName] = z.string();
parameters[paramName] = p.value;
const statusCondition = !hasStatusFilters
? "TRUE"
: args.status
?.map((filter, index) => {
if (filter.operator === "is") {
const paramName = `statusValue_${index}`;
paramSchemaExtension[paramName] = z.boolean();
parameters[paramName] = filter.value === "passed";
return `passed = {${paramName}: Boolean}`;
}
return null;
})
.filter(Boolean)
.join(" OR ") || "TRUE";

switch (p.operator) {
case "is":
return `identifier = {${paramName}: String}`;
case "contains":
return `like(identifier, CONCAT('%', {${paramName}: String}, '%'))`;
default:
return null;
}
})
.filter(Boolean)
.join(" OR ") || "TRUE";
const identifierConditions = !hasIdentifierFilters
? "TRUE"
: args.identifiers
?.map((p, index) => {
const paramName = `identifierValue_${index}`;
paramSchemaExtension[paramName] = z.string();
parameters[paramName] = p.value;
switch (p.operator) {
case "is":
return `identifier = {${paramName}: String}`;
case "contains":
return `position({${paramName}: String}, identifier) > 0`;
default:
return null;
}
})
.filter(Boolean)
.join(" OR ") || "TRUE";

const extendedParamsSchema = ratelimitLogsParams.extend(paramSchemaExtension);

const query = ch.query({
query: `
WITH filtered_requests AS (
SELECT
-- Rate limits fields
r.request_id,
r.time,
r.workspace_id,
r.namespace_id,
r.identifier,
r.passed,

-- Metrics fields
m.host,
m.method,
m.path,
m.request_headers,
m.request_body,
m.response_status,
m.response_headers,
m.response_body,
m.service_latency,
m.user_agent,
m.colo
FROM ratelimits.raw_ratelimits_v1 r
LEFT JOIN metrics.raw_api_requests_v1 m ON
r.request_id = m.request_id
WHERE r.workspace_id = {workspaceId: String}
AND r.namespace_id = {namespaceId: String}
AND r.time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
---------- Apply request ID filter if present (highest priority)
AND (
CASE
WHEN length({requestIds: Array(String)}) > 0 THEN
r.request_id IN {requestIds: Array(String)}
ELSE TRUE
END
)

---------- Apply identifier filter
AND (${identifierConditions})
---------- Apply status filter
AND (${statusCondition})

-- Apply cursor pagination last
AND (
CASE
WHEN {cursorTime: Nullable(UInt64)} IS NOT NULL
AND {cursorRequestId: Nullable(String)} IS NOT NULL
THEN (r.time, r.request_id) < (
{cursorTime: Nullable(UInt64)},
{cursorRequestId: Nullable(String)}
)
ELSE TRUE
END
)
)

SELECT
request_id,
time,
workspace_id,
namespace_id,
identifier,
toUInt8(passed) as status,
host,
method,
path,
request_headers,
request_body,
response_status,
response_headers,
response_body,
service_latency,
user_agent,
colo
FROM filtered_requests
ORDER BY time DESC, request_id DESC
LIMIT {limit: Int}`,
WITH filtered_ratelimits AS (
SELECT
request_id,
time,
workspace_id,
namespace_id,
identifier,
toUInt8(passed) as status
FROM ratelimits.raw_ratelimits_v1 r
WHERE workspace_id = {workspaceId: String}
AND namespace_id = {namespaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
${hasRequestIds ? "AND request_id IN {requestIds: Array(String)}" : ""}
AND (${identifierConditions})
AND (${statusCondition})
AND (({cursorTime: Nullable(UInt64)} IS NULL AND {cursorRequestId: Nullable(String)} IS NULL)
OR (time, request_id) < ({cursorTime: Nullable(UInt64)}, {cursorRequestId: Nullable(String)}))
)
SELECT
fr.request_id,
fr.time,
fr.workspace_id,
fr.namespace_id,
fr.identifier,
fr.status,
m.host,
m.method,
m.path,
m.request_headers,
m.request_body,
m.response_status,
m.response_headers,
m.response_body,
m.service_latency,
m.user_agent,
m.colo
FROM filtered_ratelimits fr
LEFT JOIN (
SELECT
request_id,
host,
method,
path,
request_headers,
request_body,
response_status,
response_headers,
response_body,
service_latency,
user_agent,
colo
FROM metrics.raw_api_requests_v1
WHERE workspace_id = {workspaceId: String}
AND time BETWEEN {startTime: UInt64} AND {endTime: UInt64}
) m ON fr.request_id = m.request_id
ORDER BY fr.time DESC, fr.request_id DESC
LIMIT {limit: Int}`,
params: extendedParamsSchema,
schema: ratelimitLogs,
});

return query(parameters);
};
}