Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/vast-cats-own.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@kopai/clickhouse-datasource": patch
---

Add option to optimize metrics discovery performance using materialized views
236 changes: 236 additions & 0 deletions packages/clickhouse-datasource/src/datasource.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
type StartedTestContainer,
} from "testcontainers";
import { ClickHouseReadDatasource } from "./datasource.js";
import { getDiscoverMVSchema } from "./discover-mv-schema.js";
import { DISCOVER_NAMES_TABLE, DISCOVER_ATTRS_TABLE } from "./query-metrics.js";

/** Returns the first element of an array, failing the test if the array is empty. */
function firstRow<T>(data: T[]): T {
Expand Down Expand Up @@ -1393,6 +1395,16 @@ describe("ClickHouseReadDatasource", () => {
});

describe("discoverMetrics", () => {
beforeAll(async () => {
// Ensure no MV tables exist regardless of test ordering
await adminClient.command({
query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}`,
});
await adminClient.command({
query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`,
});
});

it("discovers all metric names and types", async () => {
const result = await ds.discoverMetrics({
requestContext: requestContext(),
Expand Down Expand Up @@ -1508,6 +1520,230 @@ describe("ClickHouseReadDatasource", () => {
});
});

describe("discoverMetrics falls back without MVs", () => {
  // Start every run of this suite with neither discover table present, so the
  // datasource cannot take the MV path because of leftovers from another suite.
  beforeAll(async () => {
    await adminClient.command({
      query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}`,
    });
    await adminClient.command({
      query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`,
    });
  });

  it("returns results via full-scan when MV tables do not exist", async () => {
    const result = await ds.discoverMetrics({
      requestContext: requestContext(),
    });

    expect(result.metrics.length).toBeGreaterThan(0);
    // toContain does not care about ordering, so no sort is needed here.
    const names = result.metrics.map((m) => m.name);
    expect(names).toContain("system.cpu.utilization");
  });

  it("falls back when only names MV table exists", async () => {
    // Create just the names table; the attrs table stays absent.
    const namesOnly = `CREATE TABLE IF NOT EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}
(MetricName String, MetricType LowCardinality(String), MetricDescription String, MetricUnit String)
ENGINE = ReplacingMergeTree ORDER BY (MetricName, MetricType)`;
    await adminClient.command({ query: namesOnly });

    try {
      const result = await ds.discoverMetrics({
        requestContext: requestContext(),
      });

      // Should still work via fallback
      expect(result.metrics.length).toBeGreaterThan(0);
    } finally {
      // Drop the table even when the assertion above fails, so a red test
      // does not leave a half-configured database behind for later suites.
      await adminClient.command({
        query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`,
      });
    }
  });
});

describe("discoverMetrics with materialized views", () => {
  /** Runs discovery with a fresh request context and returns the metric list. */
  const discoverAll = async () => {
    const { metrics } = await ds.discoverMetrics({
      requestContext: requestContext(),
    });
    return metrics;
  };

  beforeAll(async () => {
    const { targetTables, materializedViews } =
      getDiscoverMVSchema(TEST_DATABASE);

    // Run the DDL in schema order: target tables first, then the views.
    for (const ddl of [...targetTables, ...materializedViews]) {
      await adminClient.command({ query: ddl });
    }

    // The views only see rows inserted after they were created, so the rows
    // that are already in the source tables are copied over by hand.
    const sources: [string, string][] = [
      ["Gauge", "otel_metrics_gauge"],
      ["Sum", "otel_metrics_sum"],
      ["Histogram", "otel_metrics_histogram"],
      ["ExponentialHistogram", "otel_metrics_exponential_histogram"],
      ["Summary", "otel_metrics_summary"],
    ];

    for (const [type, table] of sources) {
      // Order per source table: names, then attributes, then resource attributes.
      const backfillStatements = [
        `INSERT INTO ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}
SELECT MetricName, '${type}' AS MetricType, MetricDescription, MetricUnit
FROM ${TEST_DATABASE}.${table}`,
        `INSERT INTO ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}
SELECT MetricName, '${type}' AS MetricType, 'attr' AS source, attr_key,
groupUniqArrayState(101)(Attributes[attr_key]) AS attr_values
FROM ${TEST_DATABASE}.${table}
ARRAY JOIN mapKeys(Attributes) AS attr_key
WHERE notEmpty(Attributes)
GROUP BY MetricName, MetricType, source, attr_key`,
        `INSERT INTO ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}
SELECT MetricName, '${type}' AS MetricType, 'res_attr' AS source, attr_key,
groupUniqArrayState(101)(ResourceAttributes[attr_key]) AS attr_values
FROM ${TEST_DATABASE}.${table}
ARRAY JOIN mapKeys(ResourceAttributes) AS attr_key
WHERE notEmpty(ResourceAttributes)
GROUP BY MetricName, MetricType, source, attr_key`,
      ];
      for (const query of backfillStatements) {
        await adminClient.command({ query });
      }
    }
  });

  it("discovers all metric names via MV fast path", async () => {
    const metrics = await discoverAll();

    expect(metrics.length).toBe(8);

    const sortedNames = metrics.map((metric) => metric.name).sort();
    expect(sortedNames).toEqual([
      "dup.ts.gauge",
      "http.server.request.count",
      "http.server.request.duration",
      "http.server.request.duration.exp",
      "rpc.server.duration.summary",
      "system.cpu.utilization",
      "test.multi.attr",
      "test.truncation.metric",
    ]);
  });

  it("returns correct metric type via MVs", async () => {
    const metrics = await discoverAll();

    const cpu = metrics.find(
      (metric) => metric.name === "system.cpu.utilization"
    );
    expect(cpu?.type).toBe("Gauge");
    expect(cpu?.unit).toBe("1");
    expect(cpu?.description).toBe("CPU utilization");
  });

  it("returns attribute keys and values via MVs", async () => {
    const metrics = await discoverAll();

    const cpu = metrics.find(
      (metric) => metric.name === "system.cpu.utilization"
    );
    expect(cpu?.attributes.values).toHaveProperty("cpu");
    expect(cpu?.attributes.values["cpu"]).toContain("0");
  });

  it("returns resource attributes via MVs", async () => {
    const metrics = await discoverAll();

    const cpu = metrics.find(
      (metric) => metric.name === "system.cpu.utilization"
    );
    expect(cpu?.resourceAttributes.values).toHaveProperty("service.version");
  });

  it("sets _truncated when attribute values exceed 100 via MVs", async () => {
    const metrics = await discoverAll();

    const truncated = defined(
      metrics.find((metric) => metric.name === "test.truncation.metric"),
      "truncation metric"
    );
    expect(truncated.attributes._truncated).toBe(true);

    const idxValues = defined(truncated.attributes.values["idx"], "idx values");
    expect(idxValues.length).toBeLessThanOrEqual(100);
  });

  it("returns correct multi-attr keys via MVs", async () => {
    const metrics = await discoverAll();

    const multi = defined(
      metrics.find((metric) => metric.name === "test.multi.attr"),
      "multi-attr metric"
    );
    const { attributes, resourceAttributes } = multi;

    // Data-point attributes: three keys, one value each.
    expect(Object.keys(attributes.values).sort()).toEqual([
      "env",
      "region",
      "tier",
    ]);
    expect(attributes.values["region"]).toEqual(["us-east"]);
    expect(attributes.values["env"]).toEqual(["prod"]);
    expect(attributes.values["tier"]).toEqual(["premium"]);

    // Resource attributes: a single key.
    expect(Object.keys(resourceAttributes.values)).toEqual(["cloud.provider"]);
    expect(resourceAttributes.values["cloud.provider"]).toEqual(["aws"]);
  });

  it("does not set _truncated when within limit via MVs", async () => {
    const metrics = await discoverAll();

    const cpu = defined(
      metrics.find((metric) => metric.name === "system.cpu.utilization"),
      "gauge metric"
    );
    expect(cpu.attributes._truncated).toBeUndefined();
  });
});

describe("getDiscoverMVSchema validation", () => {
  // Every rejection case is expected to fail with this message.
  const invalidNameError = /Invalid database name/;

  it("rejects database names with SQL injection", () => {
    const build = () => getDiscoverMVSchema("db; DROP TABLE x");
    expect(build).toThrow(invalidNameError);
  });

  it("rejects empty database name", () => {
    const build = () => getDiscoverMVSchema("");
    expect(build).toThrow(invalidNameError);
  });

  it("rejects database names starting with a digit", () => {
    const build = () => getDiscoverMVSchema("1bad");
    expect(build).toThrow(invalidNameError);
  });

  it("accepts valid database names", () => {
    for (const database of ["otel_default", "_private"]) {
      expect(() => getDiscoverMVSchema(database)).not.toThrow();
    }
  });
});

describe("multi-tenant isolation", () => {
it("routes traces to the correct database", async () => {
const tenantA = await ds.getTraces({
Expand Down
102 changes: 88 additions & 14 deletions packages/clickhouse-datasource/src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import { buildLogsQuery } from "./query-logs.js";
import {
buildMetricsQuery,
buildDiscoverMetricsQueries,
buildDiscoverMetricsFromMV,
buildDetectDiscoverMVQuery,
DISCOVER_NAMES_TABLE,
DISCOVER_ATTRS_TABLE,
} from "./query-metrics.js";
import {
parseChRow,
Expand Down Expand Up @@ -196,30 +200,100 @@ export class ClickHouseReadDatasource
return { data, nextCursor };
}

/**
 * Reports whether the discover fast path can be used for this database.
 *
 * Runs the detection query from `buildDetectDiscoverMVQuery()` with the
 * caller's credentials and the database passed as a header, and collects the
 * `name` field of every returned row.
 *
 * @param auth - Credentials and target database for the detection query.
 * @returns `true` only when both `DISCOVER_NAMES_TABLE` and
 *   `DISCOVER_ATTRS_TABLE` appear in the result; `false` otherwise.
 */
private async hasDiscoverMVs(auth: {
  username: string;
  password: string;
  database: string;
}): Promise<boolean> {
  const resultSet = await this.client.query({
    query: buildDetectDiscoverMVQuery(),
    format: "JSONEachRow",
    auth: { username: auth.username, password: auth.password },
    http_headers: { "X-ClickHouse-Database": auth.database },
  });

  // The stream yields rows in batches; gather every reported table name.
  const tableNames = new Set<string>();
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      const { name } = row.json() as { name: string };
      tableNames.add(name);
    }
  }

  // A partial setup (only one of the two tables) does not count.
  return [DISCOVER_NAMES_TABLE, DISCOVER_ATTRS_TABLE].every((table) =>
    tableNames.has(table)
  );
}
Comment on lines +207 to +226
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

MV fast-path gating can return incomplete discovery during setup.

hasDiscoverMVs only checks target tables. If targets are created before MVs/backfill (the documented setup order), discoverMetrics can take MV path too early and return incomplete/empty results instead of degrading to full-scan.

💡 Suggested guard to prevent false-positive MV fast-path
     if (useMV) {
       try {
         const { namesQuery, attributesQuery } = buildDiscoverMetricsFromMV();
         [nameRows, attrRows] = await Promise.all([
           this.client
             .query({
               query: namesQuery,
               format: "JSONEachRow",
               auth,
               http_headers,
             })
             .then((rs) => streamParse(rs, chDiscoverNameRowSchema)),
           this.client
             .query({
               query: attributesQuery,
               format: "JSONEachRow",
               auth,
               http_headers,
             })
             .then((rs) => streamParse(rs, chDiscoverAttrRowSchema)),
         ]);
+        // Guard rollout/setup window: target tables may exist before MV pipeline is ready.
+        if (nameRows.length === 0 && attrRows.length === 0) {
+          useMV = false;
+        }
       } catch {
         // MV query failed — fall back to full-scan
         useMV = false;
       }
     }

Also applies to: 249-276

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/clickhouse-datasource/src/datasource.ts` around lines 207 - 226,
hasDiscoverMVs currently returns true if the two target tables exist, which can
false-positive during setup; update hasDiscoverMVs (and the analogous check at
the other occurrence) to also verify the tables contain data before enabling the
MV fast-path: after detecting DISCOVER_NAMES_TABLE and DISCOVER_ATTRS_TABLE, run
a lightweight count (e.g., SELECT count() or SELECT any() LIMIT 1) or equivalent
minimal row-existence check against both DISCOVER_NAMES_TABLE and
DISCOVER_ATTRS_TABLE and only return true when both exist and have at least one
row; modify the functions referencing hasDiscoverMVs accordingly so the
fast-path is gated by existence+non-empty checks instead of existence-only.


async discoverMetrics(options?: {
requestContext?: unknown;
}): Promise<datasource.MetricsDiscoveryResult> {
const ctx = options?.requestContext;
assertClickHouseRequestContext(ctx);
const { database, username, password } = ctx;

const { namesQuery, attributesQuery } = buildDiscoverMetricsQueries();
const auth = { username, password };
const http_headers = { "X-ClickHouse-Database": database };

const [nameRows, attrRows] = await Promise.all([
this.client
.query({ query: namesQuery, format: "JSONEachRow", auth, http_headers })
.then((rs) => streamParse(rs, chDiscoverNameRowSchema)),
this.client
.query({
query: attributesQuery,
format: "JSONEachRow",
auth,
http_headers,
})
.then((rs) => streamParse(rs, chDiscoverAttrRowSchema)),
]);
// Try MV fast path; degrade to full-scan on any failure
let nameRows: z.infer<typeof chDiscoverNameRowSchema>[] = [];
let attrRows: z.infer<typeof chDiscoverAttrRowSchema>[] = [];

let useMV = false;
try {
useMV = await this.hasDiscoverMVs({ username, password, database });
} catch {
// detection failed — fall through to full-scan
}

if (useMV) {
try {
const { namesQuery, attributesQuery } = buildDiscoverMetricsFromMV();
[nameRows, attrRows] = await Promise.all([
this.client
.query({
query: namesQuery,
format: "JSONEachRow",
auth,
http_headers,
})
.then((rs) => streamParse(rs, chDiscoverNameRowSchema)),
this.client
.query({
query: attributesQuery,
format: "JSONEachRow",
auth,
http_headers,
})
.then((rs) => streamParse(rs, chDiscoverAttrRowSchema)),
]);
} catch {
// MV query failed — fall back to full-scan
useMV = false;
}
}

if (!useMV) {
const { namesQuery, attributesQuery } = buildDiscoverMetricsQueries();
[nameRows, attrRows] = await Promise.all([
this.client
.query({
query: namesQuery,
format: "JSONEachRow",
auth,
http_headers,
})
.then((rs) => streamParse(rs, chDiscoverNameRowSchema)),
this.client
.query({
query: attributesQuery,
format: "JSONEachRow",
auth,
http_headers,
})
.then((rs) => streamParse(rs, chDiscoverAttrRowSchema)),
]);
}

// Build lookup map for attributes
const attrMap = new Map<
Expand Down
Loading