diff --git a/.changeset/vast-cats-own.md b/.changeset/vast-cats-own.md new file mode 100644 index 0000000..638d7a9 --- /dev/null +++ b/.changeset/vast-cats-own.md @@ -0,0 +1,5 @@ +--- +"@kopai/clickhouse-datasource": patch +--- + +Add option to optimize metrics discover performance using materialized views diff --git a/packages/clickhouse-datasource/src/datasource.test.ts b/packages/clickhouse-datasource/src/datasource.test.ts index 4a5e97e..6a02047 100644 --- a/packages/clickhouse-datasource/src/datasource.test.ts +++ b/packages/clickhouse-datasource/src/datasource.test.ts @@ -8,6 +8,8 @@ import { type StartedTestContainer, } from "testcontainers"; import { ClickHouseReadDatasource } from "./datasource.js"; +import { getDiscoverMVSchema } from "./discover-mv-schema.js"; +import { DISCOVER_NAMES_TABLE, DISCOVER_ATTRS_TABLE } from "./query-metrics.js"; /** Returns the first element of an array, failing the test if the array is empty. */ function firstRow(data: T[]): T { @@ -1393,6 +1395,16 @@ describe("ClickHouseReadDatasource", () => { }); describe("discoverMetrics", () => { + beforeAll(async () => { + // Ensure no MV tables exist regardless of test ordering + await adminClient.command({ + query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}`, + }); + await adminClient.command({ + query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`, + }); + }); + it("discovers all metric names and types", async () => { const result = await ds.discoverMetrics({ requestContext: requestContext(), @@ -1508,6 +1520,230 @@ describe("ClickHouseReadDatasource", () => { }); }); + describe("discoverMetrics falls back without MVs", () => { + beforeAll(async () => { + // Ensure no MV tables exist regardless of test ordering + await adminClient.command({ + query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE}`, + }); + await adminClient.command({ + query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`, + }); + }); + + it("returns results via full-scan when MV tables do not exist", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + expect(result.metrics.length).toBeGreaterThan(0); + const names = result.metrics.map((m) => m.name).sort(); + expect(names).toContain("system.cpu.utilization"); + }); + + it("falls back when only names MV table exists", async () => { + const namesOnly = `CREATE TABLE IF NOT EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE} +(MetricName String, MetricType LowCardinality(String), MetricDescription String, MetricUnit String) +ENGINE = ReplacingMergeTree ORDER BY (MetricName, MetricType)`; + await adminClient.command({ query: namesOnly }); + + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + // Should still work via fallback + expect(result.metrics.length).toBeGreaterThan(0); + + await adminClient.command({ + query: `DROP TABLE IF EXISTS ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE}`, + }); + }); + }); + + describe("discoverMetrics with materialized views", () => { + beforeAll(async () => { + const schema = getDiscoverMVSchema(TEST_DATABASE); + + // Create target tables + for (const stmt of schema.targetTables) { + await adminClient.command({ query: stmt }); + } + + // Create materialized views + for (const stmt of schema.materializedViews) { + await adminClient.command({ query: stmt }); + } + + // Backfill MV target tables from existing source data + // (MVs only capture new inserts; existing data needs manual backfill) + const metricTypes = [ + { type: "Gauge", table: "otel_metrics_gauge" }, + { type: "Sum", table: "otel_metrics_sum" }, + { type: "Histogram", table: "otel_metrics_histogram" }, + { + type: "ExponentialHistogram", + table: "otel_metrics_exponential_histogram", + }, + { type: "Summary", table: "otel_metrics_summary" }, + ]; + for (const { type, table } of metricTypes) { + await adminClient.command({ + query: `INSERT INTO ${TEST_DATABASE}.${DISCOVER_NAMES_TABLE} +SELECT MetricName, '${type}' AS MetricType, MetricDescription, MetricUnit +FROM ${TEST_DATABASE}.${table}`, + }); + await adminClient.command({ + query: `INSERT INTO ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE} +SELECT MetricName, '${type}' AS MetricType, 'attr' AS source, attr_key, + groupUniqArrayState(101)(Attributes[attr_key]) AS attr_values +FROM ${TEST_DATABASE}.${table} +ARRAY JOIN mapKeys(Attributes) AS attr_key +WHERE notEmpty(Attributes) +GROUP BY MetricName, MetricType, source, attr_key`, + }); + await adminClient.command({ + query: `INSERT INTO ${TEST_DATABASE}.${DISCOVER_ATTRS_TABLE} +SELECT MetricName, '${type}' AS MetricType, 'res_attr' AS source, attr_key, + groupUniqArrayState(101)(ResourceAttributes[attr_key]) AS attr_values +FROM ${TEST_DATABASE}.${table} +ARRAY JOIN mapKeys(ResourceAttributes) AS attr_key +WHERE notEmpty(ResourceAttributes) +GROUP BY MetricName, MetricType, source, attr_key`, + }); + } + }); + + it("discovers all metric names via MV fast path", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + expect(result.metrics.length).toBe(8); + + const names = result.metrics.map((m) => m.name).sort(); + expect(names).toEqual([ + "dup.ts.gauge", + "http.server.request.count", + "http.server.request.duration", + "http.server.request.duration.exp", + "rpc.server.duration.summary", + "system.cpu.utilization", + "test.multi.attr", + "test.truncation.metric", + ]); + }); + + it("returns correct metric type via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const gauge = result.metrics.find( + (m) => m.name === "system.cpu.utilization" + ); + expect(gauge?.type).toBe("Gauge"); + expect(gauge?.unit).toBe("1"); + expect(gauge?.description).toBe("CPU utilization"); + }); + + it("returns attribute keys and values via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const gauge = result.metrics.find( + (m) => m.name === "system.cpu.utilization" + ); + expect(gauge?.attributes.values).toHaveProperty("cpu"); + expect(gauge?.attributes.values["cpu"]).toContain("0"); + }); + + it("returns resource attributes via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const gauge = result.metrics.find( + (m) => m.name === "system.cpu.utilization" + ); + expect(gauge?.resourceAttributes.values).toHaveProperty( + "service.version" + ); + }); + + it("sets _truncated when attribute values exceed 100 via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const metric = defined( + result.metrics.find((m) => m.name === "test.truncation.metric"), + "truncation metric" + ); + expect(metric.attributes._truncated).toBe(true); + const idxValues = defined(metric.attributes.values["idx"], "idx values"); + expect(idxValues.length).toBeLessThanOrEqual(100); + }); + + it("returns correct multi-attr keys via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const metric = defined( + result.metrics.find((m) => m.name === "test.multi.attr"), + "multi-attr metric" + ); + + const attrKeys = Object.keys(metric.attributes.values).sort(); + expect(attrKeys).toEqual(["env", "region", "tier"]); + expect(metric.attributes.values["region"]).toEqual(["us-east"]); + expect(metric.attributes.values["env"]).toEqual(["prod"]); + expect(metric.attributes.values["tier"]).toEqual(["premium"]); + + const resKeys = Object.keys(metric.resourceAttributes.values); + expect(resKeys).toEqual(["cloud.provider"]); + expect(metric.resourceAttributes.values["cloud.provider"]).toEqual([ + "aws", + ]); + }); + + it("does not set _truncated when within limit via MVs", async () => { + const result = await ds.discoverMetrics({ + requestContext: requestContext(), + }); + + const gauge = defined( + result.metrics.find((m) => m.name === "system.cpu.utilization"), + "gauge metric" + ); + expect(gauge.attributes._truncated).toBeUndefined(); + }); + }); + + describe("getDiscoverMVSchema validation", () => { + it("rejects database names with SQL injection", () => { + expect(() => getDiscoverMVSchema("db; DROP TABLE x")).toThrow( + /Invalid database name/ + ); + }); + + it("rejects empty database name", () => { + expect(() => getDiscoverMVSchema("")).toThrow(/Invalid database name/); + }); + + it("rejects database names starting with a digit", () => { + expect(() => getDiscoverMVSchema("1bad")).toThrow( + /Invalid database name/ + ); + }); + + it("accepts valid database names", () => { + expect(() => getDiscoverMVSchema("otel_default")).not.toThrow(); + expect(() => getDiscoverMVSchema("_private")).not.toThrow(); + }); + }); + describe("multi-tenant isolation", () => { it("routes traces to the correct database", async () => { const tenantA = await ds.getTraces({ diff --git a/packages/clickhouse-datasource/src/datasource.ts b/packages/clickhouse-datasource/src/datasource.ts index 8c48fd3..efec022 100644 --- a/packages/clickhouse-datasource/src/datasource.ts +++ b/packages/clickhouse-datasource/src/datasource.ts @@ -12,6 +12,10 @@ import { buildLogsQuery } from "./query-logs.js"; import { buildMetricsQuery, buildDiscoverMetricsQueries, + buildDiscoverMetricsFromMV, + buildDetectDiscoverMVQuery, + DISCOVER_NAMES_TABLE, + DISCOVER_ATTRS_TABLE, } from "./query-metrics.js"; import { parseChRow, @@ -196,6 +200,31 @@ export class ClickHouseReadDatasource return { data, nextCursor }; } + /** + * Detect whether both MV target tables exist in the given database. + * Returns true only if both names and attrs tables are present. + */ + private async hasDiscoverMVs(auth: { + username: string; + password: string; + database: string; + }): Promise { + const rs = await this.client.query({ + query: buildDetectDiscoverMVQuery(), + format: "JSONEachRow", + auth: { username: auth.username, password: auth.password }, + http_headers: { "X-ClickHouse-Database": auth.database }, + }); + const found = new Set(); + for await (const batch of rs.stream()) { + for (const row of batch) { + const json = row.json() as { name: string }; + found.add(json.name); + } + } + return found.has(DISCOVER_NAMES_TABLE) && found.has(DISCOVER_ATTRS_TABLE); + } + async discoverMetrics(options?: { requestContext?: unknown; }): Promise { @@ -203,23 +232,68 @@ export class ClickHouseReadDatasource assertClickHouseRequestContext(ctx); const { database, username, password } = ctx; - const { namesQuery, attributesQuery } = buildDiscoverMetricsQueries(); const auth = { username, password }; const http_headers = { "X-ClickHouse-Database": database }; - const [nameRows, attrRows] = await Promise.all([ - this.client - .query({ query: namesQuery, format: "JSONEachRow", auth, http_headers }) - .then((rs) => streamParse(rs, chDiscoverNameRowSchema)), - this.client - .query({ - query: attributesQuery, - format: "JSONEachRow", - auth, - http_headers, - }) - .then((rs) => streamParse(rs, chDiscoverAttrRowSchema)), - ]); + // Try MV fast path; degrade to full-scan on any failure + let nameRows: z.infer[] = []; + let attrRows: z.infer[] = []; + + let useMV = false; + try { + useMV = await this.hasDiscoverMVs({ username, password, database }); + } catch { + // detection failed — fall through to full-scan + } + + if (useMV) { + try { + const { namesQuery, attributesQuery } = buildDiscoverMetricsFromMV(); + [nameRows, attrRows] = await Promise.all([ + this.client + .query({ + query: namesQuery, + format: "JSONEachRow", + auth, + http_headers, + }) + .then((rs) => streamParse(rs, chDiscoverNameRowSchema)), + this.client + .query({ + query: attributesQuery, + format: "JSONEachRow", + auth, + http_headers, + }) + .then((rs) => streamParse(rs, chDiscoverAttrRowSchema)), + ]); + } catch { + // MV query failed — fall back to full-scan + useMV = false; + } + } + + if (!useMV) { + const { namesQuery, attributesQuery } = buildDiscoverMetricsQueries(); + [nameRows, attrRows] = await Promise.all([ + this.client + .query({ + query: namesQuery, + format: "JSONEachRow", + auth, + http_headers, + }) + .then((rs) => streamParse(rs, chDiscoverNameRowSchema)), + this.client + .query({ + query: attributesQuery, + format: "JSONEachRow", + auth, + http_headers, + }) + .then((rs) => streamParse(rs, chDiscoverAttrRowSchema)), + ]); + } // Build lookup map for attributes const attrMap = new Map< diff --git a/packages/clickhouse-datasource/src/discover-mv-schema.ts b/packages/clickhouse-datasource/src/discover-mv-schema.ts new file mode 100644 index 0000000..72ebd42 --- /dev/null +++ b/packages/clickhouse-datasource/src/discover-mv-schema.ts @@ -0,0 +1,109 @@ +/** + * DDL statements for optional materialized views that accelerate metrics discovery. + * + * These MVs are NOT required — the datasource falls back to full table scans + * when they don't exist. Create them for near-instant discover on large datasets. + * + * Usage: + * 1. Replace `otel_default` with your database name + * 2. For replicated setups, replace `ReplacingMergeTree` with + * `ReplicatedReplacingMergeTree` and `AggregatingMergeTree` with + * `ReplicatedAggregatingMergeTree` + * 3. Run target table DDLs first, then MVs + * + * See ADR-044 for design rationale and benchmarks. + */ + +import { + DISCOVER_NAMES_TABLE, + DISCOVER_ATTRS_TABLE, + METRIC_TABLES, +} from "./query-metrics.js"; + +const DB_IDENTIFIER_RE = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + +function targetTableDDL(db: string): string[] { + return [ + `CREATE TABLE IF NOT EXISTS ${db}.${DISCOVER_NAMES_TABLE} +( + \`MetricName\` String CODEC(ZSTD(1)), + \`MetricType\` LowCardinality(String) CODEC(ZSTD(1)), + \`MetricDescription\` String CODEC(ZSTD(1)), + \`MetricUnit\` String CODEC(ZSTD(1)) +) +ENGINE = ReplacingMergeTree +ORDER BY (MetricName, MetricType) +SETTINGS index_granularity = 8192`, + + `CREATE TABLE IF NOT EXISTS ${db}.${DISCOVER_ATTRS_TABLE} +( + \`MetricName\` String CODEC(ZSTD(1)), + \`MetricType\` LowCardinality(String) CODEC(ZSTD(1)), + \`source\` LowCardinality(String) CODEC(ZSTD(1)), + \`attr_key\` LowCardinality(String) CODEC(ZSTD(1)), + \`attr_values\` AggregateFunction(groupUniqArray(101), String) CODEC(ZSTD(1)) +) +ENGINE = AggregatingMergeTree +ORDER BY (MetricName, MetricType, source, attr_key) +SETTINGS index_granularity = 8192`, + ]; +} + +function materializedViewDDL(db: string): string[] { + const stmts: string[] = []; + + for (const { type, table } of METRIC_TABLES) { + // Names MV + stmts.push( + `CREATE MATERIALIZED VIEW IF NOT EXISTS ${db}.${DISCOVER_NAMES_TABLE}_${table.replace("otel_metrics_", "")}_mv +TO ${db}.${DISCOVER_NAMES_TABLE} +AS SELECT MetricName, '${type}' AS MetricType, MetricDescription, MetricUnit +FROM ${db}.${table}` + ); + + // Attributes MV + stmts.push( + `CREATE MATERIALIZED VIEW IF NOT EXISTS ${db}.${DISCOVER_ATTRS_TABLE}_${table.replace("otel_metrics_", "")}_attr_mv +TO ${db}.${DISCOVER_ATTRS_TABLE} +AS SELECT MetricName, '${type}' AS MetricType, 'attr' AS source, attr_key, + groupUniqArrayState(101)(Attributes[attr_key]) AS attr_values +FROM ${db}.${table} +ARRAY JOIN mapKeys(Attributes) AS attr_key +WHERE notEmpty(Attributes) +GROUP BY MetricName, MetricType, source, attr_key` + ); + + // ResourceAttributes MV + stmts.push( + `CREATE MATERIALIZED VIEW IF NOT EXISTS ${db}.${DISCOVER_ATTRS_TABLE}_${table.replace("otel_metrics_", "")}_res_attr_mv +TO ${db}.${DISCOVER_ATTRS_TABLE} +AS SELECT MetricName, '${type}' AS MetricType, 'res_attr' AS source, attr_key, + groupUniqArrayState(101)(ResourceAttributes[attr_key]) AS attr_values +FROM ${db}.${table} +ARRAY JOIN mapKeys(ResourceAttributes) AS attr_key +WHERE notEmpty(ResourceAttributes) +GROUP BY MetricName, MetricType, source, attr_key` + ); + } + + return stmts; +} + +/** + * Generate all DDL statements needed to set up metrics discover MVs. + * + * @param database - The ClickHouse database name (e.g., "otel_default") + * @returns Object with arrays of SQL statements for each phase + */ +export function getDiscoverMVSchema(database: string): { + targetTables: string[]; + materializedViews: string[]; +} { + if (!DB_IDENTIFIER_RE.test(database)) { + throw new Error(`Invalid database name: ${database}`); + } + return { + targetTables: targetTableDDL(database), + materializedViews: materializedViewDDL(database), + }; +} diff --git a/packages/clickhouse-datasource/src/index.ts b/packages/clickhouse-datasource/src/index.ts index 66a3f86..8da9fcc 100644 --- a/packages/clickhouse-datasource/src/index.ts +++ b/packages/clickhouse-datasource/src/index.ts @@ -1,3 +1,4 @@ export { ClickHouseReadDatasource } from "./datasource.js"; export type { ClickHouseRequestContext } from "./types.js"; export { assertClickHouseRequestContext } from "./types.js"; +export { getDiscoverMVSchema } from "./discover-mv-schema.js"; diff --git a/packages/clickhouse-datasource/src/query-metrics.ts b/packages/clickhouse-datasource/src/query-metrics.ts index 69fbbf1..0feee4d 100644 --- a/packages/clickhouse-datasource/src/query-metrics.ts +++ b/packages/clickhouse-datasource/src/query-metrics.ts @@ -1,13 +1,17 @@ import type { dataFilterSchemas, datasource } from "@kopai/core"; import { nanosToDateTime64 } from "./timestamp.js"; -const TABLE_MAP: Record = { - Gauge: "otel_metrics_gauge", - Sum: "otel_metrics_sum", - Histogram: "otel_metrics_histogram", - ExponentialHistogram: "otel_metrics_exponential_histogram", - Summary: "otel_metrics_summary", -}; +export const METRIC_TABLES = [ + { type: "Gauge", table: "otel_metrics_gauge" }, + { type: "Sum", table: "otel_metrics_sum" }, + { type: "Histogram", table: "otel_metrics_histogram" }, + { type: "ExponentialHistogram", table: "otel_metrics_exponential_histogram" }, + { type: "Summary", table: "otel_metrics_summary" }, +] as const; + +const TABLE_MAP: Record = Object.fromEntries( + METRIC_TABLES.map(({ type, table }) => [type, table]) +) as Record; const COMMON_COLUMNS = [ "ResourceAttributes", @@ -188,8 +192,46 @@ LIMIT {limit:UInt32}`; return { query, params }; } +// --------------------------------------------------------------------------- +// Materialized-view target table names for metrics discovery. +// When these tables exist, discoverMetrics uses them for near-instant results. +// --------------------------------------------------------------------------- + +export const DISCOVER_NAMES_TABLE = "otel_metrics_discover_names"; +export const DISCOVER_ATTRS_TABLE = "otel_metrics_discover_attrs"; + +/** + * Query to detect whether the MV target tables exist in the current database. + * Returns rows with a `name` column for each table found. + */ +export function buildDetectDiscoverMVQuery(): string { + return `SELECT name FROM system.tables WHERE database = currentDatabase() AND name IN ('${DISCOVER_NAMES_TABLE}', '${DISCOVER_ATTRS_TABLE}')`; +} + +/** + * Build queries that read from the MV target tables. + */ +export function buildDiscoverMetricsFromMV(): { + namesQuery: string; + attributesQuery: string; +} { + const namesQuery = ` +SELECT MetricName, MetricType, MetricDescription, MetricUnit +FROM ${DISCOVER_NAMES_TABLE} FINAL +ORDER BY MetricName, MetricType`; + + const attributesQuery = ` +SELECT MetricName, MetricType, source, attr_key, + groupUniqArrayMerge(101)(attr_values) AS attr_values +FROM ${DISCOVER_ATTRS_TABLE} +GROUP BY MetricName, MetricType, source, attr_key +ORDER BY MetricName, MetricType, source, attr_key`; + + return { namesQuery, attributesQuery }; +} + /** - * Build the two queries for discoverMetrics. + * Build the two queries for discoverMetrics (full table scan fallback). */ export function buildDiscoverMetricsQueries(): { namesQuery: string;