Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/yellow-sites-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@kopai/clickhouse-datasource": minor
"@kopai/sqlite-datasource": minor
"@kopai/collector": minor
"@kopai/core": minor
"@kopai/api": minor
"@kopai/app": minor
"@kopai/cli": minor
"@kopai/sdk": minor
"@kopai/ui": minor
---

Add aggregate metrics
6 changes: 6 additions & 0 deletions packages/api/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ describe("apiRoutes", () => {
let getMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["getMetrics"]>
>;
let getAggregatedMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["getAggregatedMetrics"]>
>;
let discoverMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["discoverMetrics"]>
>;
Expand All @@ -37,6 +40,8 @@ describe("apiRoutes", () => {
getTracesSpy = vi.fn<datasource.ReadTracesDatasource["getTraces"]>();
getLogsSpy = vi.fn<datasource.ReadLogsDatasource["getLogs"]>();
getMetricsSpy = vi.fn<datasource.ReadMetricsDatasource["getMetrics"]>();
getAggregatedMetricsSpy =
vi.fn<datasource.ReadMetricsDatasource["getAggregatedMetrics"]>();
discoverMetricsSpy =
vi.fn<datasource.ReadMetricsDatasource["discoverMetrics"]>();
getServicesSpy =
Expand All @@ -51,6 +56,7 @@ describe("apiRoutes", () => {
getTraces: getTracesSpy,
getLogs: getLogsSpy,
getMetrics: getMetricsSpy,
getAggregatedMetrics: getAggregatedMetricsSpy,
discoverMetrics: discoverMetricsSpy,
getServices: getServicesSpy,
getOperations: getOperationsSpy,
Expand Down
15 changes: 10 additions & 5 deletions packages/api/src/routes/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@ export const metricsRoutes: FastifyPluginAsyncZod<{
nextCursor: z.string().nullable(),
});

const aggregatedResponseSchema = z.object({
data: z.array(denormalizedSignals.aggregatedMetricSchema),
nextCursor: z.null(),
});

fastify.route({
method: "POST",
url: "/signals/metrics/search",
schema: {
description: "Search metrics matching a filter",
body: dataFilterSchemas.metricsDataFilterSchema,
response: {
200: searchResponseSchema,
200: z.union([searchResponseSchema, aggregatedResponseSchema]),
"4xx": problemDetailsSchema,
"5xx": problemDetailsSchema,
},
},
handler: async (req, res) => {
const result = await opts.readMetricsDatasource.getMetrics({
...req.body,
requestContext: req.requestContext,
});
const params = { ...req.body, requestContext: req.requestContext };
const result = req.body.aggregate
? await opts.readMetricsDatasource.getAggregatedMetrics(params)
: await opts.readMetricsDatasource.getMetrics(params);
res.send(result);
},
});
Expand Down
79 changes: 79 additions & 0 deletions packages/api/src/signals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ describe("signalsRoutes", () => {
let getMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["getMetrics"]>
>;
let getAggregatedMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["getAggregatedMetrics"]>
>;
let discoverMetricsSpy: ReturnType<
typeof vi.fn<datasource.ReadMetricsDatasource["discoverMetrics"]>
>;
Expand All @@ -37,6 +40,8 @@ describe("signalsRoutes", () => {
getTracesSpy = vi.fn<datasource.ReadTracesDatasource["getTraces"]>();
getLogsSpy = vi.fn<datasource.ReadLogsDatasource["getLogs"]>();
getMetricsSpy = vi.fn<datasource.ReadMetricsDatasource["getMetrics"]>();
getAggregatedMetricsSpy =
vi.fn<datasource.ReadMetricsDatasource["getAggregatedMetrics"]>();
discoverMetricsSpy =
vi.fn<datasource.ReadMetricsDatasource["discoverMetrics"]>();
getServicesSpy =
Expand All @@ -51,6 +56,7 @@ describe("signalsRoutes", () => {
getTraces: getTracesSpy,
getLogs: getLogsSpy,
getMetrics: getMetricsSpy,
getAggregatedMetrics: getAggregatedMetricsSpy,
discoverMetrics: discoverMetricsSpy,
getServices: getServicesSpy,
getOperations: getOperationsSpy,
Expand Down Expand Up @@ -288,6 +294,79 @@ describe("signalsRoutes", () => {
title: "Internal server error",
});
});

it("calls getAggregatedMetrics when aggregate is set", async () => {
const aggregatedResult = {
data: [{ groups: { signal: "/v1/traces" }, value: 1024 }],
nextCursor: null,
};
getAggregatedMetricsSpy.mockResolvedValue(aggregatedResult);

const filter = {
metricType: "Sum" as const,
metricName: "kopai.ingestion.bytes",
aggregate: "sum" as const,
groupBy: ["signal"],
};
const response = await server.inject({
method: "POST",
url: "/signals/metrics/search",
payload: filter,
});

expect(response.statusCode).toBe(200);
expect(response.json()).toEqual(aggregatedResult);
expect(getAggregatedMetricsSpy).toHaveBeenCalled();
expect(getMetricsSpy).not.toHaveBeenCalled();
});

it("calls getMetrics (not getAggregatedMetrics) when aggregate is absent", async () => {
getMetricsSpy.mockResolvedValue({ data: [mockMetric], nextCursor: null });

const response = await server.inject({
method: "POST",
url: "/signals/metrics/search",
payload: { metricType: "Gauge" },
});

expect(response.statusCode).toBe(200);
expect(getMetricsSpy).toHaveBeenCalled();
expect(getAggregatedMetricsSpy).not.toHaveBeenCalled();
});

it("rejects groupBy without aggregate", async () => {
const response = await server.inject({
method: "POST",
url: "/signals/metrics/search",
payload: { metricType: "Sum", groupBy: ["signal"] },
});

expect(response.statusCode).toBe(400);
});

it("rejects aggregate on Histogram metric type", async () => {
const response = await server.inject({
method: "POST",
url: "/signals/metrics/search",
payload: { metricType: "Histogram", aggregate: "sum" },
});

expect(response.statusCode).toBe(400);
});

it("rejects cursor with aggregate", async () => {
const response = await server.inject({
method: "POST",
url: "/signals/metrics/search",
payload: {
metricType: "Sum",
aggregate: "sum",
cursor: "123:456",
},
});

expect(response.statusCode).toBe(400);
});
});

describe("GET /signals/metrics/discover", () => {
Expand Down
5 changes: 4 additions & 1 deletion packages/app/src/collector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ import type { datasource } from "@kopai/core";
export const otelCollectorRoutes: FastifyPluginAsyncZod<{
telemetryDatasource: datasource.WriteTelemetryDatasource;
}> = async function (fastify, opts) {
fastify.register(collectorRoutes, opts);
fastify.register(collectorRoutes, {
...opts,
ingestionMetricsDatasource: opts.telemetryDatasource,
});
};
5 changes: 5 additions & 0 deletions packages/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Search metrics by type.
kopai metrics search --type Gauge --name cpu_usage
kopai metrics search --type Histogram --service my-api
kopai metrics search --type Sum --attr "host=server1"

# Aggregate: total bytes ingested grouped by signal
kopai metrics search --type Sum --name kopai.ingestion.bytes --aggregate sum --group-by signal
```

**Types:** Gauge, Sum, Histogram, ExponentialHistogram, Summary
Expand All @@ -151,6 +154,8 @@ kopai metrics search --type Sum --attr "host=server1"
- `--resource-attr <key=value>` - repeatable
- `--scope-attr <key=value>` - repeatable
- `--sort` - ASC or DESC
- `--aggregate <fn>` - aggregation function (sum, avg, min, max, count). Gauge/Sum only
- `--group-by <attr>` - group by attribute key (repeatable, requires --aggregate)

**Fields:** TimeUnix, StartTimeUnix, MetricType, MetricName, MetricDescription, MetricUnit, ServiceName, Value, Count, Sum, Min, Max, Attributes, ResourceAttributes, ScopeName, ScopeAttributes, Exemplars, BucketCounts, ExplicitBounds

Expand Down
42 changes: 40 additions & 2 deletions packages/cli/src/commands/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Command } from "commander";
import { Command, InvalidArgumentError } from "commander";
import {
createClient,
parseAttributes,
Expand All @@ -22,6 +22,8 @@ interface MetricsSearchOptions extends ClientOptions {
resourceAttr?: string[];
scopeAttr?: string[];
sort?: string;
aggregate?: string;
groupBy?: string[];
}

interface MetricsDiscoverOptions extends ClientOptions {
Expand Down Expand Up @@ -69,6 +71,16 @@ export function createMetricsCommand(): Command {
[]
)
.option("--sort <order>", "Sort order (ASC|DESC)")
.option(
"--aggregate <fn>",
"Aggregation function (sum|avg|min|max|count)"
)
.option(
"--group-by <attr>",
"Group by attribute key (repeatable)",
collect,
[]
)
).action(async (opts: MetricsSearchOptions) => {
const format = detectFormat(opts.json, opts.table);
const fields = parseFields(opts.fields);
Expand All @@ -93,9 +105,17 @@ export function createMetricsCommand(): Command {
scopeAttributes: parseAttributes(opts.scopeAttr),
limit,
sortOrder: opts.sort as "ASC" | "DESC" | undefined,
aggregate: toAggregateFn(opts.aggregate),
groupBy:
opts.groupBy && opts.groupBy.length > 0 ? opts.groupBy : undefined,
};

const result = await client.searchMetricsPage(filter);
const result = filter.aggregate
? await client.searchAggregatedMetrics({
...filter,
aggregate: filter.aggregate,
})
: await client.searchMetricsPage(filter);
output(result.data, { format, fields });
} catch (err) {
outputError(err, format === "json");
Expand Down Expand Up @@ -129,3 +149,21 @@ export function createMetricsCommand(): Command {
function collect(value: string, previous: string[]): string[] {
return previous.concat([value]);
}

type AggregateFn = "sum" | "avg" | "min" | "max" | "count";

function isAggregateFn(value: string): value is AggregateFn {
return (
value === "sum" ||
value === "avg" ||
value === "min" ||
value === "max" ||
value === "count"
);
}

function toAggregateFn(value: string | undefined): AggregateFn | undefined {
if (value === undefined) return undefined;
if (isAggregateFn(value)) return value;
throw new InvalidArgumentError(`Invalid aggregate function: ${value}`);
}
79 changes: 79 additions & 0 deletions packages/clickhouse-datasource/src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import { buildLogsQuery } from "./query-logs.js";
import {
buildMetricsQuery,
buildAggregatedMetricsQuery,
buildDiscoverMetricsFromMV,
} from "./query-metrics.js";
import {
Expand Down Expand Up @@ -66,6 +67,11 @@ function isChError(err: unknown, code: string): boolean {
);
}

/** Type predicate: narrows unknown to a string-keyed record. */
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}

/** Collect all rows from a ResultSet stream, parsing each with the given schema. */
async function streamParse<S extends z.ZodType>(
resultSet: ResultSet<"JSONEachRow">,
Expand Down Expand Up @@ -311,6 +317,79 @@ export class ClickHouseReadDatasource
return { data, nextCursor };
}

async getAggregatedMetrics(
filter: dataFilterSchemas.MetricsDataFilter & {
requestContext?: unknown;
}
): Promise<{
data: denormalizedSignals.AggregatedMetricRow[];
nextCursor: null;
}> {
assertClickHouseRequestContext(filter.requestContext);
const { database, username, password } = filter.requestContext;
const log = getLogger(filter.requestContext);
const start = performance.now();

let chNode: string | undefined;
try {
const { query, params } = buildAggregatedMetricsQuery(filter);

const resultSet = await this.client.query({
query,
query_params: params,
format: "JSONEachRow",
auth: { username, password },
http_headers: { "X-ClickHouse-Database": database },
});
chNode = getChNode(resultSet);

const groupByKeys = filter.groupBy ?? [];
const data: denormalizedSignals.AggregatedMetricRow[] = [];
for await (const batch of resultSet.stream()) {
for (const row of batch) {
const json = row.json();
if (!isRecord(json)) continue;
const groups: Record<string, string> = {};
for (let i = 0; i < groupByKeys.length; i++) {
const key = groupByKeys[i];
if (key !== undefined) {
groups[key] = String(json[`group_${String(i)}`] ?? "");
}
}
data.push({ groups, value: Number(json.value) });
}
}

const durationMs = Math.round(performance.now() - start);
log.info(
{
database,
username,
method: "getAggregatedMetrics",
durationMs,
rowCount: data.length,
chNode,
},
"query complete"
);
return { data, nextCursor: null };
} catch (err) {
const durationMs = Math.round(performance.now() - start);
log.error(
{
database,
username,
method: "getAggregatedMetrics",
durationMs,
chNode,
err,
},
"query failed"
);
throw err;
}
}

async getServices(opts?: {
requestContext?: unknown;
}): Promise<{ services: string[] }> {
Expand Down
Loading