Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions web/apps/dashboard/app/(app)/[workspaceSlug]/logs/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { Log } from "@unkey/clickhouse/src/logs";
import type { RatelimitLog } from "@unkey/clickhouse/src/ratelimits";

type LogWithRequestResponse = {
response_body: string;
request_headers: string[];
};

type V1ResponseBody = {
keyId: string;
Expand All @@ -24,7 +28,7 @@ type V2ResponseBody = {
export type ResponseBody = V1ResponseBody | V2ResponseBody;

export const extractResponseField = <K extends keyof V1ResponseBody>(
log: Log | RatelimitLog,
log: Log | LogWithRequestResponse,
fieldName: K,
): V1ResponseBody[K] | null => {
if (!log?.response_body) {
Expand All @@ -47,7 +51,10 @@ export const extractResponseField = <K extends keyof V1ResponseBody>(
}
};

export const getRequestHeader = (log: Log | RatelimitLog, headerName: string): string | null => {
export const getRequestHeader = (
log: Log | LogWithRequestResponse,
headerName: string,
): string | null => {
if (!headerName.trim()) {
console.error("Invalid header name provided");
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { Log } from "@unkey/clickhouse/src/logs";
import type { RatelimitLog } from "@unkey/clickhouse/src/ratelimits";

type LogWithRequestResponse = {
response_body: string;
request_headers: string[];
};

export type ResponseBody = {
keyId: string;
Expand All @@ -18,7 +22,7 @@ export type ResponseBody = {
};

export const extractResponseField = <K extends keyof ResponseBody>(
log: Log | RatelimitLog,
log: Log | LogWithRequestResponse,
fieldName: K,
): ResponseBody[K] | null => {
if (!log?.response_body) {
Expand All @@ -35,7 +39,10 @@ export const extractResponseField = <K extends keyof ResponseBody>(
}
};

export const getRequestHeader = (log: Log | RatelimitLog, headerName: string): string | null => {
export const getRequestHeader = (
log: Log | LogWithRequestResponse,
headerName: string,
): string | null => {
if (!headerName.trim()) {
console.error("Invalid header name provided");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,12 @@ export const RatelimitOverviewLogsTable = ({
<span className="text-accent-12">
{new Intl.NumberFormat().format(historicalLogs.length)}
</span>
<span>of</span>
{new Intl.NumberFormat().format(totalCount)}
{totalCount >= 0 && (
<>
<span>of</span>
{new Intl.NumberFormat().format(totalCount)}
</>
)}
<span>rate limit identifiers</span>
</div>
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,38 @@
import { HISTORICAL_DATA_WINDOW } from "@/components/logs/constants";
import { trpc } from "@/lib/trpc/client";
import { useQueryTime } from "@/providers/query-time-provider";
import type { RatelimitLog } from "@unkey/clickhouse/src/ratelimits";
import { useCallback, useEffect, useMemo, useState } from "react";
import type { RatelimitLog, RatelimitLogEnrichment } from "@unkey/clickhouse/src/ratelimits";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useFilters } from "../../../hooks/use-filters";
import type { RatelimitQueryLogsPayload } from "../query-logs.schema";

export type EnrichedRatelimitLog = RatelimitLog & RatelimitLogEnrichment;

// Zero-value placeholders for every enrichment column (except request_id),
// applied before the log's own fields so an EnrichedRatelimitLog is always
// fully populated even when no enrichment row has been fetched yet.
const ENRICHMENT_DEFAULTS: Omit<RatelimitLogEnrichment, "request_id"> = {
  host: "",
  method: "",
  path: "",
  request_headers: [],
  request_body: "",
  response_status: 0,
  response_headers: [],
  response_body: "",
  service_latency: 0,
  user_agent: "",
  region: "",
};

// Merge each log with its cached enrichment row (if any), falling back to
// ENRICHMENT_DEFAULTS for columns with no data. Precedence, lowest to
// highest: defaults, then the log itself, then the fetched enrichment.
function enrichLogs(
  logs: RatelimitLog[],
  enrichmentMap: Map<string, RatelimitLogEnrichment>,
): EnrichedRatelimitLog[] {
  const merged: EnrichedRatelimitLog[] = [];
  for (const log of logs) {
    const enrichment = enrichmentMap.get(log.request_id);
    merged.push({
      ...ENRICHMENT_DEFAULTS,
      ...log,
      ...(enrichment ?? {}),
    });
  }
  return merged;
}

type UseLogsQueryParams = {
limit?: number;
pollIntervalMs?: number;
Expand All @@ -20,8 +47,14 @@ export function useRatelimitLogsQuery({
pollIntervalMs = 5000,
startPolling = false,
}: UseLogsQueryParams) {
const [historicalLogsMap, setHistoricalLogsMap] = useState(() => new Map<string, RatelimitLog>());
const [realtimeLogsMap, setRealtimeLogsMap] = useState(() => new Map<string, RatelimitLog>());
const [historicalLogsMap, setHistoricalLogsMap] = useState(
() => new Map<string, EnrichedRatelimitLog>(),
);
const [realtimeLogsMap, setRealtimeLogsMap] = useState(
() => new Map<string, EnrichedRatelimitLog>(),
);
const enrichmentMapRef = useRef(new Map<string, RatelimitLogEnrichment>());
const [enrichmentVersion, setEnrichmentVersion] = useState(0);
const [totalCount, setTotalCount] = useState(0);

const { queryTime: timestamp } = useQueryTime();
Expand Down Expand Up @@ -109,6 +142,61 @@ export function useRatelimitLogsQuery({
refetchOnWindowFocus: false,
});

// Fetch request/response enrichment rows for a batch of logs and cache them
// in enrichmentMapRef. Only ids not already cached are requested; a version
// bump notifies dependent effects when new rows arrive.
const fetchEnrichment = useCallback(
  async (logs: RatelimitLog[]) => {
    if (logs.length === 0) {
      return;
    }

    // Only request ids we have not enriched yet.
    const unenrichedIds = logs
      .filter((log) => !enrichmentMapRef.current.has(log.request_id))
      .map((log) => log.request_id);

    if (unenrichedIds.length === 0) {
      return;
    }

    // Time window covering every log in the batch. Computed with a single
    // pass instead of Math.min(...times)/Math.max(...times): spreading a
    // very large array as arguments can throw a RangeError once the batch
    // exceeds the engine's argument limit.
    let minTime = logs[0].time;
    let maxTime = logs[0].time;
    for (const log of logs) {
      if (log.time < minTime) {
        minTime = log.time;
      }
      if (log.time > maxTime) {
        maxTime = log.time;
      }
    }

    // Batch into chunks of 100 to stay within the tRPC endpoint limit
    const BATCH_SIZE = 100;
    const chunks: string[][] = [];
    for (let i = 0; i < unenrichedIds.length; i += BATCH_SIZE) {
      chunks.push(unenrichedIds.slice(i, i + BATCH_SIZE));
    }

    try {
      const results = await Promise.all(
        chunks.map((chunk) =>
          queryClient.ratelimit.logs.enrichment.fetch({
            requestIds: chunk,
            startTime: minTime,
            endTime: maxTime,
          }),
        ),
      );

      // Merge fetched rows into the cache; bump the version once so
      // consumers re-merge enrichment into the log maps.
      let added = false;
      for (const result of results) {
        for (const item of result.enrichment) {
          enrichmentMapRef.current.set(item.request_id, item);
          added = true;
        }
      }

      if (added) {
        setEnrichmentVersion((v) => v + 1);
      }
    } catch (error) {
      console.error("Error fetching log enrichment:", error);
    }
  },
  [queryClient],
);

// Query for new logs (polling)
const pollForNewLogs = useCallback(async () => {
try {
Expand All @@ -123,34 +211,46 @@ export function useRatelimitLogsQuery({
return;
}

setRealtimeLogsMap((prevMap) => {
const newMap = new Map(prevMap);
let added = 0;
// Build the list of new logs outside the updater to avoid mutation inside React setState
const newLogs: RatelimitLog[] = [];
for (const log of result.ratelimitLogs) {
if (realtimeLogsMap.has(log.request_id) || historicalLogsMap.has(log.request_id)) {
continue;
}
newLogs.push(log);
}

if (newLogs.length > 0) {
setRealtimeLogsMap((prevMap) => {
const newMap = new Map(prevMap);

for (const log of result.ratelimitLogs) {
// Skip if exists in either map
if (newMap.has(log.request_id) || historicalLogsMap.has(log.request_id)) {
continue;
}
for (const log of newLogs) {
if (newMap.has(log.request_id)) {
continue;
}

newMap.set(log.request_id, log);
added++;

// Remove oldest entries when exceeding the size limit to prevent memory issues
// We use min(limit, REALTIME_DATA_LIMIT) to ensure a reasonable upper bound
if (newMap.size > Math.min(limit, REALTIME_DATA_LIMIT)) {
// Find and remove the entry with the oldest timestamp
const entries = Array.from(newMap.entries());
const oldestEntry = entries.reduce((oldest, current) => {
return oldest[1].time < current[1].time ? oldest : current;
});
newMap.delete(oldestEntry[0]);
const enriched: EnrichedRatelimitLog = {
...ENRICHMENT_DEFAULTS,
...log,
...(enrichmentMapRef.current.get(log.request_id) ?? {}),
};
newMap.set(log.request_id, enriched);

if (newMap.size > Math.min(limit, REALTIME_DATA_LIMIT)) {
const entries = Array.from(newMap.entries());
const oldestEntry = entries.reduce((oldest, current) => {
return oldest[1].time < current[1].time ? oldest : current;
});
newMap.delete(oldestEntry[0]);
}
}
}

// If nothing was added, return old map to prevent re-render
return added > 0 ? newMap : prevMap;
});
return newMap;
});

// Fire enrichment for new logs in background
fetchEnrichment(newLogs);
}
} catch (error) {
console.error("Error polling for new logs:", error);
}
Expand All @@ -160,8 +260,10 @@ export function useRatelimitLogsQuery({
limit,
pollIntervalMs,
historicalLogsMap,
realtimeLogsMap,
realtimeLogs,
historicalLogs,
fetchEnrichment,
]);

// Set up polling effect
Expand All @@ -172,21 +274,68 @@ export function useRatelimitLogsQuery({
}
}, [startPolling, pollForNewLogs, pollIntervalMs]);

// Update historical logs effect
// Fetch enrichment when new initial data arrives
useEffect(() => {
if (initialData) {
const newMap = new Map<string, RatelimitLog>();
const allLogs: RatelimitLog[] = [];
initialData.pages.forEach((page) => {
page.ratelimitLogs.forEach((log) => {
newMap.set(log.request_id, log);
});
for (const log of page.ratelimitLogs) {
allLogs.push(log);
}
});

if (initialData.pages.length > 0) {
setTotalCount(initialData.pages[0].total);
}

fetchEnrichment(allLogs);
}
}, [initialData, fetchEnrichment]);

// Re-merge enrichment into historical and realtime logs when enrichment changes
// biome-ignore lint/correctness/useExhaustiveDependencies: enrichmentVersion triggers re-merge when ref updates
useEffect(() => {
if (initialData) {
const allLogs: RatelimitLog[] = [];
initialData.pages.forEach((page) => {
for (const log of page.ratelimitLogs) {
allLogs.push(log);
}
});

const newMap = new Map<string, EnrichedRatelimitLog>();
for (const log of enrichLogs(allLogs, enrichmentMapRef.current)) {
newMap.set(log.request_id, log);
}
setHistoricalLogsMap(newMap);
}
}, [initialData]);

// Also re-merge enrichment into realtime logs
setRealtimeLogsMap((prevMap) => {
if (prevMap.size === 0) {
return prevMap;
}
const newMap = new Map<string, EnrichedRatelimitLog>();
let changed = false;
for (const [id, log] of prevMap) {
const enrichment = enrichmentMapRef.current.get(id);
if (enrichment) {
newMap.set(id, { ...log, ...enrichment });
changed = true;
} else {
newMap.set(id, log);
}
}
return changed ? newMap : prevMap;
});
}, [initialData, enrichmentVersion]);

// Clear enrichment cache when query params change (filters, time range, namespace)
// biome-ignore lint/correctness/useExhaustiveDependencies: queryParams covers all filter/time/namespace changes
useEffect(() => {
enrichmentMapRef.current.clear();
setEnrichmentVersion(0);
}, [queryParams]);

// Reset realtime logs effect
useEffect(() => {
Expand All @@ -207,6 +356,6 @@ export function useRatelimitLogsQuery({
};
}

const sortLogs = (logs: RatelimitLog[]) => {
// Order logs newest-first by timestamp without mutating the input array.
const sortLogs = (logs: EnrichedRatelimitLog[]) => {
  const copy = [...logs];
  copy.sort((a, b) => b.time - a.time);
  return copy;
};
Loading