diff --git a/packages/o11ylogsdb/test/chunk.test.ts b/packages/o11ylogsdb/test/chunk.test.ts new file mode 100644 index 00000000..35412f67 --- /dev/null +++ b/packages/o11ylogsdb/test/chunk.test.ts @@ -0,0 +1,180 @@ +import { describe, expect, it } from "vitest"; + +import { + CHUNK_VERSION, + ChunkBuilder, + type ChunkPolicy, + chunkWireSize, + DefaultChunkPolicy, + deserializeChunk, + readRecords, + serializeChunk, +} from "../src/chunk.js"; +import { defaultRegistry } from "../src/codec-baseline.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const resource: Resource = { attributes: [{ key: "service.name", value: "test" }] }; +const scope: InstrumentationScope = { name: "test-scope" }; +const registry = defaultRegistry(); + +function rec(partial: Partial & { timeUnixNano: bigint }): LogRecord { + return { + severityNumber: 9, + severityText: "INFO", + body: "hello", + attributes: [], + ...partial, + }; +} + +describe("ChunkBuilder", () => { + it("freezes an empty chunk with sentinel severity range and zero time", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + const chunk = builder.freeze(); + expect(chunk.header.nLogs).toBe(0); + expect(chunk.header.severityRange).toEqual({ min: 1, max: 24 }); + expect(chunk.header.timeRange).toEqual({ minNano: "0", maxNano: "0" }); + }); + + it("computes time range from first/last record", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + builder.append(rec({ timeUnixNano: 100n })); + builder.append(rec({ timeUnixNano: 200n })); + builder.append(rec({ timeUnixNano: 300n })); + const chunk = builder.freeze(); + expect(chunk.header.timeRange).toEqual({ minNano: "100", maxNano: "300" }); + }); + + it("computes severity range across all records", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + builder.append(rec({ timeUnixNano: 1n, severityNumber: 17 })); // ERROR + builder.append(rec({ timeUnixNano: 2n, severityNumber: 5 })); // DEBUG + builder.append(rec({ timeUnixNano: 3n, severityNumber: 9 })); // INFO + const chunk = builder.freeze(); + expect(chunk.header.severityRange).toEqual({ min: 5, max: 17 }); + }); + + it("hoists resource and scope into the header", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + builder.append(rec({ timeUnixNano: 1n })); + const chunk = builder.freeze(); + expect(chunk.header.resource).toEqual(resource); + expect(chunk.header.scope).toEqual(scope); + }); + + it("size() reflects appended records and reset() clears them", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + expect(builder.size()).toBe(0); + builder.append(rec({ timeUnixNano: 1n })); + builder.append(rec({ timeUnixNano: 2n })); + expect(builder.size()).toBe(2); + builder.reset(); + expect(builder.size()).toBe(0); + }); +}); + +describe("chunk wire format", () => { + it("round-trips a chunk through serialize/deserialize", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + builder.append(rec({ timeUnixNano: 1n, body: "first" })); + builder.append(rec({ timeUnixNano: 2n, body: "second" })); + const chunk = builder.freeze(); + const bytes = serializeChunk(chunk); + const parsed = deserializeChunk(bytes); + expect(parsed.header).toEqual(chunk.header); + expect(Array.from(parsed.payload)).toEqual(Array.from(chunk.payload)); + }); + + it("rejects chunks with bad magic bytes", () => { + const bytes = new Uint8Array([0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00]); + expect(() => deserializeChunk(bytes)).toThrow(/bad chunk magic/); + }); + + it("schemaVersion in the header equals CHUNK_VERSION", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + const chunk = builder.freeze(); + expect(chunk.header.schemaVersion).toBe(CHUNK_VERSION); + }); +}); + +describe("chunkWireSize", () => { + it("matches serializeChunk(c).length without materializing the buffer", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + for (let i = 0; i < 10; i++) { + builder.append(rec({ timeUnixNano: BigInt(i), body: `record-${i}` })); + } + const chunk = builder.freeze(); + expect(chunkWireSize(chunk)).toBe(serializeChunk(chunk).length); + }); +}); + +describe("readRecords with default policy", () => { + it("round-trips records through NDJSON encode/decode", () => { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + const inputs = [ + rec({ timeUnixNano: 100n, body: "alpha", severityNumber: 9 }), + rec({ timeUnixNano: 200n, body: "beta", severityNumber: 13 }), + ]; + for (const r of inputs) builder.append(r); + const chunk = builder.freeze(); + const records = readRecords(chunk, registry); + expect(records.length).toBe(2); + expect(records[0]?.body).toBe("alpha"); + expect(records[0]?.timeUnixNano).toBe(100n); + expect(records[1]?.body).toBe("beta"); + expect(records[1]?.severityNumber).toBe(13); + }); +}); + +describe("ChunkPolicy hooks", () => { + it("preEncode/postDecode round-trips a meta blob", () => { + const seenMeta: unknown[] = []; + const policy: ChunkPolicy = { + bodyCodec: () => "zstd-19", + preEncode: (records) => ({ records, meta: { tag: "preEncodeMeta" } }), + postDecode: (records, meta) => { + seenMeta.push(meta); + return records; + }, + }; + const builder = new ChunkBuilder(resource, scope, policy, registry); + builder.append(rec({ timeUnixNano: 1n })); + const chunk = builder.freeze(); + expect(chunk.header.codecMeta).toEqual({ tag: "preEncodeMeta" }); + readRecords(chunk, registry, policy); + expect(seenMeta).toEqual([{ tag: "preEncodeMeta" }]); + }); + + it("encodePayload/decodePayload bypasses NDJSON entirely", () => { + let encodeCalls = 0; + let decodeCalls = 0; + const policy: ChunkPolicy = { + bodyCodec: () => "raw", + encodePayload: (records) => { + encodeCalls++; + const json = JSON.stringify(records.map((r) => r.body)); + return { payload: new TextEncoder().encode(json), meta: { kind: "binary" } }; + }, + decodePayload: (buf, _n, meta) => { + decodeCalls++; + expect(meta).toEqual({ kind: "binary" }); + const bodies = JSON.parse(new TextDecoder().decode(buf)) as string[]; + return bodies.map((body, i) => ({ + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body, + attributes: [], + })); + }, + }; + const builder = new ChunkBuilder(resource, scope, policy, registry); + builder.append(rec({ timeUnixNano: 1n, body: "x" })); + builder.append(rec({ timeUnixNano: 2n, body: "y" })); + const chunk = builder.freeze(); + expect(encodeCalls).toBe(1); + const records = readRecords(chunk, registry, policy); + expect(decodeCalls).toBe(1); + expect(records.map((r) => r.body)).toEqual(["x", "y"]); + }); +}); diff --git a/packages/o11ylogsdb/test/compact.test.ts b/packages/o11ylogsdb/test/compact.test.ts new file mode 100644 index 00000000..fdecb113 --- /dev/null +++ b/packages/o11ylogsdb/test/compact.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; + +import { ChunkBuilder, DefaultChunkPolicy, readRecords } from "../src/chunk.js"; +import { defaultRegistry } from "../src/codec-baseline.js"; +import { compactChunk } from "../src/compact.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const resource: Resource = { attributes: [{ key: "service.name", value: "test" }] }; +const scope: InstrumentationScope = { name: "test-scope" }; +const registry = defaultRegistry(); + +function buildZ19Chunk(n: number) { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy("zstd-19"), registry); + for (let i = 0; i < n; i++) { + const r: LogRecord = { + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body: `record ${i} payload-payload-payload-payload`, + attributes: [], + }; + builder.append(r); + } + return builder.freeze(); +} + +describe("compactChunk", () => { + it("re-encodes the payload under a new codec and preserves records", () => { + const z19 = buildZ19Chunk(50); + const { chunk: z3 } = compactChunk(z19, registry, "zstd-3"); + expect(z3.header.codecName).toBe("zstd-3"); + expect(z3.header.payloadBytes).toBe(z3.payload.length); + // Round-trip equivalence — same record sequence. + const before = readRecords(z19, registry); + const after = readRecords(z3, registry); + expect(after.length).toBe(before.length); + for (let i = 0; i < before.length; i++) { + expect(after[i]?.body).toBe(before[i]?.body); + expect(after[i]?.timeUnixNano).toBe(before[i]?.timeUnixNano); + } + }); + + it("reports stats matching the new and old payload sizes", () => { + const z19 = buildZ19Chunk(50); + const { chunk: z3, stats } = compactChunk(z19, registry, "zstd-3"); + expect(stats.inputBytes).toBe(z19.payload.length); + expect(stats.outputBytes).toBe(z3.payload.length); + expect(stats.decodeMillis).toBeGreaterThanOrEqual(0); + expect(stats.encodeMillis).toBeGreaterThanOrEqual(0); + }); + + it("is a no-op when target codec equals current codec", () => { + const z19 = buildZ19Chunk(10); + const { chunk, stats } = compactChunk(z19, registry, "zstd-19"); + expect(chunk).toBe(z19); + expect(stats.decodeMillis).toBe(0); + expect(stats.encodeMillis).toBe(0); + }); + + it("does not mutate the input chunk", () => { + const z19 = buildZ19Chunk(10); + const originalCodec = z19.header.codecName; + const originalPayloadLen = z19.payload.length; + compactChunk(z19, registry, "zstd-3"); + expect(z19.header.codecName).toBe(originalCodec); + expect(z19.payload.length).toBe(originalPayloadLen); + }); +}); diff --git a/packages/o11ylogsdb/test/drain.test.ts b/packages/o11ylogsdb/test/drain.test.ts new file mode 100644 index 00000000..885b784b --- /dev/null +++ b/packages/o11ylogsdb/test/drain.test.ts @@ -0,0 +1,138 @@ +import { describe, expect, it } from "vitest"; + +import { + DRAIN_DEFAULT_CONFIG, + Drain, + mergeTemplate, + PARAM_STR, + similarity, + tokenize, +} from "../src/drain.js"; + +describe("tokenize", () => { + it("splits on whitespace runs", () => { + expect(tokenize("foo bar baz")).toEqual(["foo", "bar", "baz"]); + }); + + it("returns an empty array for empty/whitespace-only input", () => { + expect(tokenize("")).toEqual([]); + expect(tokenize(" ")).toEqual([]); + }); +}); + +describe("similarity", () => { + it("returns 1.0 for identical token lists with no params", () => { + const [sim, paramCount] = similarity(["a", "b", "c"], ["a", "b", "c"]); + expect(sim).toBe(1); + expect(paramCount).toBe(0); + }); + + it("treats wildcard positions as non-matches in the numerator", () => { + const [sim, paramCount] = similarity([PARAM_STR, "b", "c"], ["a", "b", "c"]); + expect(sim).toBeCloseTo(2 / 3); + expect(paramCount).toBe(1); + }); + + it("returns 0 when no positions match", () => { + const [sim] = similarity(["a", "b", "c"], ["x", "y", "z"]); + expect(sim).toBe(0); + }); +}); + +describe("mergeTemplate", () => { + it("replaces mismatched positions with PARAM_STR and reports change", () => { + const tpl = ["GET", "/api/users/123", "200"]; + const changed = mergeTemplate(tpl, ["GET", "/api/users/456", "200"]); + expect(tpl).toEqual(["GET", PARAM_STR, "200"]); + expect(changed).toBe(true); + }); + + it("returns false when nothing changes", () => { + const tpl = ["GET", "/health", "200"]; + const changed = mergeTemplate(tpl, ["GET", "/health", "200"]); + expect(changed).toBe(false); + }); +}); + +describe("Drain.matchOrAdd", () => { + it("creates a new cluster for the first line of a shape", () => { + const drain = new Drain(); + const r = drain.matchOrAdd("user 42 logged in"); + expect(r.isNew).toBe(true); + expect(r.templateId).toBe(1); + expect(drain.templateCount()).toBe(1); + }); + + it("matches a similar second line and merges variable positions", () => { + const drain = new Drain(); + const a = drain.matchOrAdd("user 42 logged in"); + const b = drain.matchOrAdd("user 99 logged in"); + expect(b.isNew).toBe(false); + expect(b.templateId).toBe(a.templateId); + expect(b.vars).toEqual(["99"]); + expect(drain.templateCount()).toBe(1); + }); + + it("creates a separate cluster for a different token-count shape", () => { + const drain = new Drain(); + drain.matchOrAdd("user 42 logged in"); + drain.matchOrAdd("connection lost"); + expect(drain.templateCount()).toBe(2); + }); + + it("emits stable, sequential cluster ids", () => { + const drain = new Drain(); + expect(drain.matchOrAdd("a x b").templateId).toBe(1); + expect(drain.matchOrAdd("c y d").templateId).toBe(2); + expect(drain.matchOrAdd("a z b").templateId).toBe(1); + }); +}); + +describe("Drain.matchTemplate", () => { + it("returns undefined for a never-seen line", () => { + const drain = new Drain(); + expect(drain.matchTemplate("nothing here")).toBeUndefined(); + }); + + it("does not mutate state", () => { + const drain = new Drain(); + drain.matchOrAdd("user 42 logged in"); + const before = drain.templateCount(); + drain.matchTemplate("user 99 logged in"); + drain.matchTemplate("brand new line never seen"); + expect(drain.templateCount()).toBe(before); + }); +}); + +describe("Drain.reconstruct", () => { + it("produces a single-space-joined line that round-trips simple templates", () => { + const drain = new Drain(); + drain.matchOrAdd("user 42 logged in"); + const r = drain.matchOrAdd("user 99 logged in"); + const cluster = [...drain.templates()][0]; + expect(cluster).toBeDefined(); + const tokens = (cluster?.template ?? "").split(" "); + expect(Drain.reconstruct(tokens, r.vars)).toBe("user 99 logged in"); + }); + + it("normalizes runs of whitespace to single spaces (the documented contract)", () => { + const drain = new Drain(); + drain.matchOrAdd("user 42 logged in"); + const r = drain.matchOrAdd("user 99 logged in"); + const cluster = [...drain.templates()][0]; + expect(cluster).toBeDefined(); + const tokens = (cluster?.template ?? "").split(" "); + expect(Drain.reconstruct(tokens, r.vars)).toBe("user 99 logged in"); + }); +}); + +describe("DRAIN_DEFAULT_CONFIG", () => { + it("matches the published reference defaults", () => { + expect(DRAIN_DEFAULT_CONFIG).toEqual({ + depth: 4, + simTh: 0.4, + maxChildren: 100, + parametrizeNumericTokens: true, + }); + }); +}); diff --git a/packages/o11ylogsdb/test/engine.test.ts b/packages/o11ylogsdb/test/engine.test.ts new file mode 100644 index 00000000..197e0d3d --- /dev/null +++ b/packages/o11ylogsdb/test/engine.test.ts @@ -0,0 +1,126 @@ +import { describe, expect, it } from "vitest"; + +import { DefaultChunkPolicy } from "../src/chunk.js"; +import { LogStore } from "../src/engine.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const resource: Resource = { attributes: [{ key: "service.name", value: "test" }] }; +const scope: InstrumentationScope = { name: "test-scope" }; + +function rec(partial: Partial & { timeUnixNano: bigint }): LogRecord { + return { + severityNumber: 9, + severityText: "INFO", + body: "hello", + attributes: [], + ...partial, + }; +} + +describe("LogStore.append", () => { + it("interns one stream per (resource, scope) tuple", () => { + const store = new LogStore(); + store.append(resource, scope, rec({ timeUnixNano: 1n })); + store.append(resource, scope, rec({ timeUnixNano: 2n })); + expect(store.streams.size()).toBe(1); + }); + + it("interns separate streams for different resources", () => { + const store = new LogStore(); + store.append( + { attributes: [{ key: "service.name", value: "a" }] }, + scope, + rec({ timeUnixNano: 1n }) + ); + store.append( + { attributes: [{ key: "service.name", value: "b" }] }, + scope, + rec({ timeUnixNano: 2n }) + ); + expect(store.streams.size()).toBe(2); + }); + + it("freezes a chunk when rowsPerChunk is reached", () => { + const store = new LogStore({ rowsPerChunk: 4 }); + for (let i = 0; i < 4; i++) { + store.append(resource, scope, rec({ timeUnixNano: BigInt(i) })); + } + const stats = store.stats(); + expect(stats.chunks).toBe(1); + expect(stats.totalLogs).toBe(4); + }); + + it("does not freeze in-flight chunks until flush", () => { + const store = new LogStore({ rowsPerChunk: 1024 }); + for (let i = 0; i < 5; i++) { + store.append(resource, scope, rec({ timeUnixNano: BigInt(i) })); + } + expect(store.stats().chunks).toBe(0); + store.flush(); + expect(store.stats().chunks).toBe(1); + expect(store.stats().totalLogs).toBe(5); + }); +}); + +describe("LogStore.iterRecords", () => { + it("round-trips appended records via the default policy", () => { + const store = new LogStore({ rowsPerChunk: 4 }); + const inputs = [1n, 2n, 3n, 4n, 5n]; + for (const t of inputs) store.append(resource, scope, rec({ timeUnixNano: t, body: `r${t}` })); + store.flush(); + const collected: LogRecord[] = []; + for (const { records } of store.iterRecords()) collected.push(...records); + expect(collected.length).toBe(inputs.length); + expect(collected.map((r) => r.timeUnixNano)).toEqual(inputs); + }); +}); + +describe("LogStore.stats", () => { + it("reports zero bytes per log when empty", () => { + const store = new LogStore(); + const stats = store.stats(); + expect(stats.streams).toBe(0); + expect(stats.totalLogs).toBe(0); + expect(stats.bytesPerLog).toBe(0); + }); + + it("counts every chunk's wire size, including header overhead", () => { + const store = new LogStore({ rowsPerChunk: 4 }); + for (let i = 0; i < 8; i++) { + store.append(resource, scope, rec({ timeUnixNano: BigInt(i) })); + } + const stats = store.stats(); + expect(stats.chunks).toBe(2); + expect(stats.totalLogs).toBe(8); + expect(stats.totalChunkBytes).toBeGreaterThan(0); + expect(stats.bytesPerLog).toBeCloseTo(stats.totalChunkBytes / stats.totalLogs); + }); +}); + +describe("LogStore.policyFactory", () => { + it("creates exactly one policy per stream and reuses it across chunks", () => { + const calls: string[] = []; + const store = new LogStore({ + rowsPerChunk: 2, + policyFactory: (_id, r) => { + const name = r.attributes.find((kv) => kv.key === "service.name")?.value ?? "?"; + calls.push(String(name)); + return new DefaultChunkPolicy(); + }, + }); + const resA: Resource = { attributes: [{ key: "service.name", value: "A" }] }; + const resB: Resource = { attributes: [{ key: "service.name", value: "B" }] }; + // 4 records on A → 2 chunks via the same policy instance. + for (let i = 0; i < 4; i++) store.append(resA, scope, rec({ timeUnixNano: BigInt(i) })); + // 2 records on B → 1 chunk via a *separate* policy instance. + for (let i = 0; i < 2; i++) store.append(resB, scope, rec({ timeUnixNano: BigInt(i) })); + expect(calls).toEqual(["A", "B"]); + }); + + it("falls back to the configured policy when no factory is set", () => { + const policy = new DefaultChunkPolicy(); + const store = new LogStore({ policy }); + const id = store.streams.intern(resource, scope); + expect(store.policyFor(id)).toBe(policy); + }); +}); diff --git a/packages/o11ylogsdb/test/policy-roundtrip.test.ts b/packages/o11ylogsdb/test/policy-roundtrip.test.ts new file mode 100644 index 00000000..0ef208df --- /dev/null +++ b/packages/o11ylogsdb/test/policy-roundtrip.test.ts @@ -0,0 +1,174 @@ +import { describe, expect, it } from "vitest"; + +import { ChunkBuilder, type ChunkPolicy, DefaultChunkPolicy, readRecords } from "../src/chunk.js"; +import { defaultRegistry } from "../src/codec-baseline.js"; +import { ColumnarDrainPolicy, ColumnarRawPolicy } from "../src/codec-columnar.js"; +import { DrainChunkPolicy } from "../src/codec-drain.js"; +import { TypedColumnarDrainPolicy } from "../src/codec-typed.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const resource: Resource = { attributes: [{ key: "service.name", value: "test" }] }; +const scope: InstrumentationScope = { name: "test-scope" }; +const registry = defaultRegistry(); + +function makeRecords(): LogRecord[] { + // Mix of templated lines (similar shape) and a varied body — exercises + // every policy's body classifier path. + const records: LogRecord[] = []; + const userIds = ["alice", "bob", "carol", "dave", "eve"]; + for (let i = 0; i < 60; i++) { + records.push({ + timeUnixNano: BigInt(1_000_000_000 + i * 1000), + severityNumber: i % 3 === 0 ? 13 : 9, + severityText: i % 3 === 0 ? "WARN" : "INFO", + body: `user ${userIds[i % userIds.length]} request ${i} completed`, + attributes: [{ key: "host", value: `node-${i % 3}` }], + }); + } + return records; +} + +function freezeWith(policy: ChunkPolicy, records: readonly LogRecord[]) { + const builder = new ChunkBuilder(resource, scope, policy, registry); + for (const r of records) builder.append(r); + return builder.freeze(); +} + +interface PolicyCase { + name: string; + make: () => ChunkPolicy; +} + +const policies: PolicyCase[] = [ + { name: "DefaultChunkPolicy", make: () => new DefaultChunkPolicy() }, + { name: "ColumnarRawPolicy", make: () => new ColumnarRawPolicy() }, + { name: "ColumnarDrainPolicy", make: () => new ColumnarDrainPolicy() }, + { name: "DrainChunkPolicy", make: () => new DrainChunkPolicy() }, + { name: "TypedColumnarDrainPolicy", make: () => new TypedColumnarDrainPolicy() }, +]; + +describe.each(policies)("$name", ({ make }) => { + it("round-trips a templated-text chunk", () => { + const policy = make(); + const records = makeRecords(); + const chunk = freezeWith(policy, records); + const decoded = readRecords(chunk, registry, policy); + expect(decoded.length).toBe(records.length); + for (let i = 0; i < records.length; i++) { + expect(decoded[i]?.timeUnixNano).toBe(records[i]?.timeUnixNano); + expect(decoded[i]?.severityNumber).toBe(records[i]?.severityNumber); + // Body is canonicalized through Drain whitespace normalization on + // policies that template; our records have only single spaces, so + // strict equality holds for the templated path too. + expect(decoded[i]?.body).toBe(records[i]?.body); + } + }); + + it("round-trips an empty chunk", () => { + const policy = make(); + const chunk = freezeWith(policy, []); + const decoded = readRecords(chunk, registry, policy); + expect(decoded.length).toBe(0); + }); + + it("round-trips a single-record chunk", () => { + const policy = make(); + const record: LogRecord = { + timeUnixNano: 42n, + severityNumber: 9, + severityText: "INFO", + body: "single record", + attributes: [], + }; + const chunk = freezeWith(policy, [record]); + const decoded = readRecords(chunk, registry, policy); + expect(decoded.length).toBe(1); + expect(decoded[0]?.body).toBe(record.body); + expect(decoded[0]?.timeUnixNano).toBe(record.timeUnixNano); + }); +}); + +describe("TypedColumnarDrainPolicy slot detectors", () => { + it("round-trips PREFIXED_INT64 slots (e.g., blk_)", () => { + const policy = new TypedColumnarDrainPolicy(); + const records: LogRecord[] = []; + for (let i = 0; i < 80; i++) { + records.push({ + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body: `block blk_${1_000_000 + i} replicated to node 7`, + attributes: [], + }); + } + const chunk = freezeWith(policy, records); + const decoded = readRecords(chunk, registry, policy); + expect(decoded.length).toBe(records.length); + for (let i = 0; i < records.length; i++) { + expect(decoded[i]?.body).toBe(records[i]?.body); + } + }); + + it("round-trips UUID slots", () => { + const policy = new TypedColumnarDrainPolicy(); + const records: LogRecord[] = []; + const uuids = [ + "550e8400-e29b-41d4-a716-446655440000", + "550e8400-e29b-41d4-a716-446655440001", + "550e8400-e29b-41d4-a716-446655440002", + ]; + for (let i = 0; i < 80; i++) { + records.push({ + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body: `request ${uuids[i % uuids.length]} accepted`, + attributes: [], + }); + } + const chunk = freezeWith(policy, records); + const decoded = readRecords(chunk, registry, policy); + for (let i = 0; i < records.length; i++) { + expect(decoded[i]?.body).toBe(records[i]?.body); + } + }); + + it("round-trips SIGNED_INT slots", () => { + const policy = new TypedColumnarDrainPolicy(); + const records: LogRecord[] = []; + for (let i = 0; i < 80; i++) { + records.push({ + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body: `processed ${i * 7 - 13} items in pool`, + attributes: [], + }); + } + const chunk = freezeWith(policy, records); + const decoded = readRecords(chunk, registry, policy); + for (let i = 0; i < records.length; i++) { + expect(decoded[i]?.body).toBe(records[i]?.body); + } + }); + + it("falls back to STRING for slots below the typed threshold", () => { + // Below TYPED_SLOT_MIN_RECORDS=50; should still round-trip. + const policy = new TypedColumnarDrainPolicy(); + const records: LogRecord[] = []; + for (let i = 0; i < 10; i++) { + records.push({ + timeUnixNano: BigInt(i), + severityNumber: 9, + severityText: "INFO", + body: `block blk_${1_000_000 + i} replicated to node 7`, + attributes: [], + }); + } + const chunk = freezeWith(policy, records); + const decoded = readRecords(chunk, registry, policy); + for (let i = 0; i < records.length; i++) { + expect(decoded[i]?.body).toBe(records[i]?.body); + } + }); +}); diff --git a/packages/o11ylogsdb/test/public-api.test.ts b/packages/o11ylogsdb/test/public-api.test.ts new file mode 100644 index 00000000..0125af3f --- /dev/null +++ b/packages/o11ylogsdb/test/public-api.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it } from "vitest"; + +import * as o11ylogsdb from "../src/index.js"; + +// Smoke test: the public API exports the symbols downstream code expects. +// If a refactor accidentally drops or renames an export, this fails fast. +describe("public API surface", () => { + it("exports the engine, codec stack, and query primitives", () => { + const expected = [ + // Engine + "LogStore", + "StreamRegistry", + // Chunk + "CHUNK_VERSION", + "ChunkBuilder", + "DefaultChunkPolicy", + "deserializeChunk", + "readRecords", + "serializeChunk", + // Codec registry + baseline + "CodecRegistry", + "defaultRegistry", + "GzipCodec", + "ZstdCodec", + "lengthPrefixStringCodec", + "rawCodec", + "rawInt64Codec", + // Policies + "ColumnarDrainPolicy", + "ColumnarRawPolicy", + "DrainChunkPolicy", + "TypedColumnarDrainPolicy", + // Compaction + "compactChunk", + // Drain + "Drain", + "DRAIN_DEFAULT_CONFIG", + "PARAM_STR", + "mergeTemplate", + "similarity", + "tokenize", + // Classify + "TemplatedClassifier", + "defaultClassifier", + // Query + "query", + "queryStream", + // Version + "VERSION", + ] as const; + for (const name of expected) { + expect(o11ylogsdb).toHaveProperty(name); + } + }); + + it("VERSION is a string (the package version constant)", () => { + expect(typeof o11ylogsdb.VERSION).toBe("string"); + }); +}); diff --git a/packages/o11ylogsdb/test/query.test.ts b/packages/o11ylogsdb/test/query.test.ts new file mode 100644 index 00000000..74711349 --- /dev/null +++ b/packages/o11ylogsdb/test/query.test.ts @@ -0,0 +1,189 @@ +import { describe, expect, it } from "vitest"; + +import { LogStore } from "../src/engine.js"; +import { query } from "../src/query.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const scope: InstrumentationScope = { name: "test-scope" }; + +function rec(partial: Partial & { timeUnixNano: bigint }): LogRecord { + return { + severityNumber: 9, + severityText: "INFO", + body: "hello", + attributes: [], + ...partial, + }; +} + +function buildStore(): { store: LogStore; resA: Resource; resB: Resource } { + const resA: Resource = { attributes: [{ key: "service.name", value: "checkout" }] }; + const resB: Resource = { attributes: [{ key: "service.name", value: "payments" }] }; + const store = new LogStore({ rowsPerChunk: 4 }); + // checkout: 7 records spanning t=1000-7000, mixed severities + store.append( + resA, + scope, + rec({ timeUnixNano: 1000n, body: "request accepted", severityNumber: 9 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 2000n, body: "validation failed", severityNumber: 13 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 3000n, body: "request accepted", severityNumber: 9 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 4000n, body: "internal error", severityNumber: 17 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 5000n, body: "request accepted", severityNumber: 9 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 6000n, body: "request accepted", severityNumber: 9 }) + ); + store.append( + resA, + scope, + rec({ timeUnixNano: 7000n, body: "request accepted", severityNumber: 9 }) + ); + // payments: 4 records spanning t=10000-13000 + store.append(resB, scope, rec({ timeUnixNano: 10_000n, body: "charge ok", severityNumber: 9 })); + store.append( + resB, + scope, + rec({ timeUnixNano: 11_000n, body: "charge declined", severityNumber: 13 }) + ); + store.append(resB, scope, rec({ timeUnixNano: 12_000n, body: "charge ok", severityNumber: 9 })); + store.append(resB, scope, rec({ timeUnixNano: 13_000n, body: "charge ok", severityNumber: 9 })); + store.flush(); + return { store, resA, resB }; +} + +describe("query: time-range pruning", () => { + it("returns only records whose timestamps fall in [from, to)", () => { + const { store } = buildStore(); + const result = query(store, { range: { from: 2000n, to: 5000n } }); + expect(result.records.map((r) => r.timeUnixNano)).toEqual([2000n, 3000n, 4000n]); + }); + + it("prunes chunks that cannot contain any matching record", () => { + const { store } = buildStore(); + // Range entirely within payments — checkout chunks get pruned. + const result = query(store, { range: { from: 10_000n, to: 13_500n } }); + expect(result.records.length).toBe(4); + // checkout has 2 chunks, payments has 1 — at least 2 should prune. + expect(result.stats.chunksPruned).toBeGreaterThanOrEqual(2); + }); +}); + +describe("query: severity zone-map pruning", () => { + it("returns only records at or above the threshold", () => { + const { store } = buildStore(); + const result = query(store, { severityGte: 13 }); + expect(result.records.map((r) => r.severityNumber)).toEqual([13, 17, 13]); + }); + + it("prunes chunks whose maxSeverity is below the threshold", () => { + const { store } = buildStore(); + // severityGte=20 — no chunk's max is that high, all should prune. + const result = query(store, { severityGte: 20 }); + expect(result.records.length).toBe(0); + expect(result.stats.chunksPruned).toBeGreaterThan(0); + }); +}); + +describe("query: resource attribute equality", () => { + it("prunes streams whose resource attributes don't match", () => { + const { store } = buildStore(); + const result = query(store, { resourceEquals: { "service.name": "payments" } }); + expect(result.records.length).toBe(4); + expect(result.stats.streamsPruned).toBe(1); + }); + + it("returns nothing if no stream matches", () => { + const { store } = buildStore(); + const result = query(store, { resourceEquals: { "service.name": "ghost" } }); + expect(result.records.length).toBe(0); + expect(result.stats.streamsPruned).toBe(2); + }); +}); + +describe("query: body substring", () => { + it("matches case-sensitively against string bodies", () => { + const { store } = buildStore(); + const result = query(store, { bodyContains: "declined" }); + expect(result.records.length).toBe(1); + expect(result.records[0]?.body).toBe("charge declined"); + }); + + it("returns no records when a non-string body is asked to match", () => { + const store = new LogStore({ rowsPerChunk: 4 }); + const r: Resource = { attributes: [{ key: "service.name", value: "x" }] }; + store.append(r, scope, rec({ timeUnixNano: 1n, body: { kind: "map" } })); + store.flush(); + const result = query(store, { bodyContains: "map" }); + expect(result.records.length).toBe(0); + }); +}); + +describe("query: bodyLeafEquals", () => { + it("matches dot-path leaves on KVList bodies", () => { + const store = new LogStore({ rowsPerChunk: 4 }); + const r: Resource = { attributes: [{ key: "service.name", value: "api" }] }; + store.append( + r, + scope, + rec({ timeUnixNano: 1n, body: { req: { method: "GET", status: 200 } } }) + ); + store.append( + r, + scope, + rec({ timeUnixNano: 2n, body: { req: { method: "POST", status: 201 } } }) + ); + store.append( + r, + scope, + rec({ timeUnixNano: 3n, body: { req: { method: "GET", status: 500 } } }) + ); + store.flush(); + const result = query(store, { bodyLeafEquals: { "body.req.method": "GET" } }); + expect(result.records.length).toBe(2); + expect(result.records.map((rec) => rec.timeUnixNano)).toEqual([1n, 3n]); + }); + + it("rejects records with non-KVList bodies", () => { + const { store } = buildStore(); + // bodies are strings — bodyLeafEquals should match nothing. + const result = query(store, { bodyLeafEquals: { "body.x": "y" } }); + expect(result.records.length).toBe(0); + }); +}); + +describe("query: limit", () => { + it("short-circuits after N records", () => { + const { store } = buildStore(); + const result = query(store, { limit: 3 }); + expect(result.records.length).toBe(3); + }); +}); + +describe("query stats", () => { + it("reports streamsScanned, chunksScanned, recordsScanned/Emitted", () => { + const { store } = buildStore(); + const result = query(store, {}); + expect(result.stats.streamsScanned).toBe(2); + expect(result.stats.chunksScanned).toBe(3); // checkout=2, payments=1 + expect(result.stats.recordsScanned).toBe(11); + expect(result.stats.recordsEmitted).toBe(11); + }); +}); diff --git a/packages/o11ylogsdb/test/stream.test.ts b/packages/o11ylogsdb/test/stream.test.ts new file mode 100644 index 00000000..01e32433 --- /dev/null +++ b/packages/o11ylogsdb/test/stream.test.ts @@ -0,0 +1,98 @@ +import { describe, expect, it } from "vitest"; + +import { ChunkBuilder, DefaultChunkPolicy } from "../src/chunk.js"; +import { defaultRegistry } from "../src/codec-baseline.js"; +import { StreamRegistry } from "../src/stream.js"; +import type { InstrumentationScope, LogRecord, Resource } from "../src/types.js"; + +const registry = defaultRegistry(); + +function makeResource(serviceName: string): Resource { + return { attributes: [{ key: "service.name", value: serviceName }] }; +} + +function makeScope(name: string): InstrumentationScope { + return { name }; +} + +function makeChunk(resource: Resource, scope: InstrumentationScope, ts: bigint) { + const builder = new ChunkBuilder(resource, scope, new DefaultChunkPolicy(), registry); + const record: LogRecord = { + timeUnixNano: ts, + severityNumber: 9, + severityText: "INFO", + body: "hello", + attributes: [], + }; + builder.append(record); + return builder.freeze(); +} + +describe("StreamRegistry.intern", () => { + it("returns the same id for the same (resource, scope) reference", () => { + const reg = new StreamRegistry(); + const resource = makeResource("a"); + const scope = makeScope("s"); + expect(reg.intern(resource, scope)).toBe(reg.intern(resource, scope)); + }); + + it("returns the same id for structurally equal resources/scopes", () => { + const reg = new StreamRegistry(); + const id1 = reg.intern(makeResource("a"), makeScope("s")); + const id2 = reg.intern(makeResource("a"), makeScope("s")); + expect(id1).toBe(id2); + }); + + it("returns different ids for different services", () => { + const reg = new StreamRegistry(); + const id1 = reg.intern(makeResource("a"), makeScope("s")); + const id2 = reg.intern(makeResource("b"), makeScope("s")); + expect(id1).not.toBe(id2); + }); + + it("returns different ids for different scopes", () => { + const reg = new StreamRegistry(); + const resource = makeResource("a"); + const id1 = reg.intern(resource, makeScope("s1")); + const id2 = reg.intern(resource, makeScope("s2")); + expect(id1).not.toBe(id2); + }); +}); + +describe("StreamRegistry chunk lists", () => { + it("appends chunks in insertion order", () => { + const reg = new StreamRegistry(); + const resource = makeResource("a"); + const scope = makeScope("s"); + const id = reg.intern(resource, scope); + reg.appendChunk(id, makeChunk(resource, scope, 1n)); + reg.appendChunk(id, makeChunk(resource, scope, 2n)); + reg.appendChunk(id, makeChunk(resource, scope, 3n)); + const chunks = reg.chunksOf(id); + expect(chunks.length).toBe(3); + expect(chunks.map((c) => c.header.timeRange.minNano)).toEqual(["1", "2", "3"]); + }); + + it("chunksOf throws for an unknown stream id", () => { + const reg = new StreamRegistry(); + expect(() => reg.chunksOf(999)).toThrow(/unknown id/); + }); + + it("ids() lists every interned stream and size() agrees", () => { + const reg = new StreamRegistry(); + reg.intern(makeResource("a"), makeScope("s")); + reg.intern(makeResource("b"), makeScope("s")); + reg.intern(makeResource("c"), makeScope("s")); + expect(reg.ids().length).toBe(3); + expect(reg.size()).toBe(3); + }); + + it("resourceOf and scopeOf return the originals", () => { + const reg = new StreamRegistry(); + const resource = makeResource("checkout"); + const scope = makeScope("server"); + const id = reg.intern(resource, scope); + expect(reg.resourceOf(id)).toEqual(resource); + expect(reg.scopeOf(id)).toEqual(scope); + }); +});