Implement optimized SQLite datasource with transactional writes#28
Conversation
- discover metrics cache
- store discoverMetrics in memory to avoid extremely slow query
📝 WalkthroughWalkthroughAdds an in-memory OptimizedDatasource wrapper and factory, renames core implementation to DbDatasource, converts metric/trace/log writes to explicit transactions, moves datasource instantiation to module scope in the app server, and makes attributeValue a recursive Zod/type supporting nested arrays and objects. Changes
Sequence DiagramsequenceDiagram
participant App as App Server
participant ODS as OptimizedDatasource
participant DDS as DbDatasource
participant Cache as In-Memory Discovery
participant DB as SQLite DB
rect rgba(100, 150, 200, 0.5)
Note over App,DB: Write Path
App->>ODS: writeMetrics(metrics)
ODS->>DDS: writeMetrics(metrics)
DDS->>DB: BEGIN TRANSACTION
DDS->>DB: INSERT/UPSERT metric/point rows
DDS->>DB: INSERT/UPSERT trace/log rows
DB-->>DDS: Success
DDS->>DB: COMMIT
ODS->>Cache: Update metric attributes & resource attrs
ODS-->>App: Write complete
end
rect rgba(150, 200, 100, 0.5)
Note over App,Cache: Discovery Path
App->>ODS: discoverMetrics(filter)
ODS->>Cache: Retrieve aggregated metrics state
Cache-->>ODS: Per-metric attributes (with truncation flags)
ODS-->>App: DiscoveredMetric[]
end
rect rgba(200, 150, 100, 0.5)
Note over App,DB: Read Path
App->>ODS: getTraces/getLogs/getMetrics(filter)
ODS->>DDS: getTraces/getLogs/getMetrics(filter)
DDS->>DB: SELECT rows
DB-->>DDS: Results
DDS-->>ODS: Typed results
ODS-->>App: Results
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~65 minutes Possibly Related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/sqlite-datasource/src/db-datasource.ts (2)
44-165:⚠️ Potential issue | 🟠 MajorAvoid duplicate metric inserts across resourceMetrics.
Because the row buffers are defined once and the transaction is executed inside theresourceMetricsloop (Line 138), each iteration re-inserts all previously collected rows, duplicating data when there are multipleresourceMetrics. Consider moving the transaction outside the loop (single insert pass), or re-scope/reset buffers per iteration.🔧 Proposed fix (single transaction after row collection)
- for (const resourceMetric of metricsData.resourceMetrics ?? []) { + for (const resourceMetric of metricsData.resourceMetrics ?? []) { const { resource, schemaUrl: resourceSchemaUrl } = resourceMetric; ... - this.sqliteConnection.exec("BEGIN"); - try { - for (const { table, rows } of [ - { table: "otel_metrics_gauge" as const, rows: gaugeRows }, - ... - ]) { - for (const row of rows) { - const { sql, parameters } = queryBuilder - .insertInto(table) - .values(row) - .compile(); - this.sqliteConnection - .prepare(sql) - .run(...(parameters as (string | number | bigint | null)[])); - } - } - this.sqliteConnection.exec("COMMIT"); - } catch (error) { - this.sqliteConnection.exec("ROLLBACK"); - throw error; - } } + + this.sqliteConnection.exec("BEGIN"); + try { + for (const { table, rows } of [ + { table: "otel_metrics_gauge" as const, rows: gaugeRows }, + { table: "otel_metrics_sum" as const, rows: sumRows }, + { table: "otel_metrics_histogram" as const, rows: histogramRows }, + { + table: "otel_metrics_exponential_histogram" as const, + rows: expHistogramRows, + }, + { table: "otel_metrics_summary" as const, rows: summaryRows }, + ]) { + for (const row of rows) { + const { sql, parameters } = queryBuilder + .insertInto(table) + .values(row) + .compile(); + this.sqliteConnection + .prepare(sql) + .run(...(parameters as (string | number | bigint | null)[])); + } + } + this.sqliteConnection.exec("COMMIT"); + } catch (error) { + this.sqliteConnection.exec("ROLLBACK"); + throw error; + }
775-903:⚠️ Potential issue | 🟠 MajorDbDatasource.discoverMetrics() misses metrics with empty attributes/resourceAttributes.
The
json_each(Attributes)andjson_each(ResourceAttributes)queries produce no rows when those JSON objects are{}, causing metrics with completely empty attributes to be omitted from discovery results. While the application usesOptimizedDatasource(which maintains in-memory discovery state), the underlyingDbDatasource.discoverMetrics()method has this bug and should be fixed.Implement the proposed fix by seeding
discoveryStatewith a base metrics list queried withoutjson_each()before processing attribute tuples. Additionally, add a test onDbDatasourcedirectly (not justOptimizedDatasource) that inserts a metric with{}for bothAttributesandResourceAttributes, and verifies it appears in the discovery result.🔧 Proposed fix (seed base metrics before tuple aggregation)
// Build discovery state from DB const discoveryState = new Map<string, DiscoveryMetricState>(); + // Seed base metric list (covers empty Attributes/ResourceAttributes) + const baseMetricsSql = METRIC_TABLES.map( + ({ table, type }) => + `SELECT DISTINCT MetricName, MetricUnit, MetricDescription, '${type}' as MetricType + FROM ${table}` + ).join(" UNION ALL "); + + const baseMetrics = this.sqliteConnection.prepare(baseMetricsSql).all() as { + MetricName: string; + MetricUnit: string | null; + MetricDescription: string | null; + MetricType: datasource.MetricType; + }[]; + + for (const m of baseMetrics) { + const metricKey = `${m.MetricName}:${m.MetricType}`; + if (!discoveryState.has(metricKey)) { + discoveryState.set(metricKey, { + name: m.MetricName, + type: m.MetricType, + unit: m.MetricUnit || undefined, + description: m.MetricDescription || undefined, + attributes: new Map(), + resourceAttributes: new Map(), + }); + } + }
🤖 Fix all issues with AI agents
In `@packages/sqlite-datasource/src/optimized-datasource.ts`:
- Around line 233-242: The extractAnyValue function currently only returns
primitive fields and leaves arrayValue/kvlistValue as opaque objects; update
extractAnyValue to recursively handle nested AnyValue shapes: if v.arrayValue
exists, map each element through extractAnyValue and return an array; if
v.kvlistValue (or kvListValue) exists, convert the list of {key, value} entries
into an object mapping each key to extractAnyValue(entry.value); continue to
return stringValue/boolValue/intValue/doubleValue/bytesValue when present and
fall back to the original value only if no known fields match—this will match
the recursive behavior of anyValueToSimple and fix discoverMetrics for nested
attributes.
🧹 Nitpick comments (1)
packages/core/src/denormalized-signals-zod.ts (1)
10-18: Add a max-depth guard to avoid pathological recursion.
z.lazynow enables arbitrary nesting depth; a deeply nested payload can cause long parse times or a stack overflow. Consider enforcing a reasonable max depth.🛡️ Proposed depth guard
type AttributeValue = | string | number | boolean | AttributeValue[] | { [key: string]: AttributeValue }; +const MAX_ATTRIBUTE_DEPTH = 32; +const withinAttributeDepth = (value: AttributeValue, depth = 0): boolean => { + if (depth > MAX_ATTRIBUTE_DEPTH) return false; + if (Array.isArray(value)) { + return value.every((item) => withinAttributeDepth(item, depth + 1)); + } + if (value && typeof value === "object") { + return Object.values(value).every((item) => + withinAttributeDepth(item, depth + 1) + ); + } + return true; +}; + const attributeValue: z.ZodType<AttributeValue> = z.lazy(() => z.union([ z.string(), z.number(), z.boolean(), z.array(attributeValue), z.record(z.string(), attributeValue), ]) -); +).refine((value) => withinAttributeDepth(value), { + message: `AttributeValue exceeds max depth of ${MAX_ATTRIBUTE_DEPTH}`, +});
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests
Chores