diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index e344baea21a..67134f24a6e 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -13,6 +13,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 ### :rocket: Features * feat(otlp-transformer): add custom protobuf logs serializer [#6228](https://github.com/open-telemetry/opentelemetry-js/pull/6228) @pichlermarc +* feat(otlp-transformer): add custom protobuf logs export response deserializer [#6530](https://github.com/open-telemetry/opentelemetry-js/pull/6530) @pichlermarc ### :bug: Bug Fixes diff --git a/experimental/packages/otlp-transformer/package.json b/experimental/packages/otlp-transformer/package.json index 989d86f69bd..7aa2b48d788 100644 --- a/experimental/packages/otlp-transformer/package.json +++ b/experimental/packages/otlp-transformer/package.json @@ -17,8 +17,11 @@ "compile": "tsc --build tsconfig.json tsconfig.esm.json tsconfig.esnext.json", "clean": "tsc --build --clean tsconfig.json tsconfig.esm.json tsconfig.esnext.json", "protos": "npm run submodule && npm run protos:generate", - "protos:generate:js": "pbjs -t static-module -p ./protos -w commonjs --null-defaults -o ./src/generated/root.js ./protos/opentelemetry/proto/common/v1/common.proto ./protos/opentelemetry/proto/resource/v1/resource.proto ./protos/opentelemetry/proto/trace/v1/trace.proto ./protos/opentelemetry/proto/collector/trace/v1/trace_service.proto ./protos/opentelemetry/proto/metrics/v1/metrics.proto ./protos/opentelemetry/proto/collector/metrics/v1/metrics_service.proto ./protos/opentelemetry/proto/logs/v1/logs.proto ./protos/opentelemetry/proto/collector/logs/v1/logs_service.proto && pbjs -t static-module -p ./test/fixtures -w commonjs --null-defaults -o ./test/generated/testbed.js ./test/fixtures/testbed.proto", - "protos:generate:ts": "pbts -o ./src/generated/root.d.ts ./src/generated/root.js && pbts -o ./test/generated/testbed.d.ts 
./test/generated/testbed.js", + "protos:generate:js": "pbjs -t static-module -p ./protos -w commonjs --null-defaults -o ./src/generated/root.js ./protos/opentelemetry/proto/common/v1/common.proto ./protos/opentelemetry/proto/resource/v1/resource.proto ./protos/opentelemetry/proto/trace/v1/trace.proto ./protos/opentelemetry/proto/collector/trace/v1/trace_service.proto ./protos/opentelemetry/proto/metrics/v1/metrics.proto ./protos/opentelemetry/proto/collector/metrics/v1/metrics_service.proto && npm run protos:generate:js:tests", + "protos:generate:js:tests": "npm run protos:generate:js:testbed && npm run protos:generate:js:test-signals", + "protos:generate:js:testbed": "pbjs -t static-module -p ./test/fixtures -w commonjs --null-defaults -o ./test/generated/testbed.js ./test/fixtures/testbed.proto", + "protos:generate:js:test-signals": "pbjs -r signals -t static-module -p ./protos -w commonjs --null-defaults -o ./test/generated/signals.js ./protos/opentelemetry/proto/common/v1/common.proto ./protos/opentelemetry/proto/resource/v1/resource.proto ./protos/opentelemetry/proto/logs/v1/logs.proto ./protos/opentelemetry/proto/collector/logs/v1/logs_service.proto", + "protos:generate:ts": "pbts -o ./src/generated/root.d.ts ./src/generated/root.js && pbts -o ./test/generated/testbed.d.ts ./test/generated/testbed.js && pbts -o ./test/generated/signals.d.ts ./test/generated/signals.js", "protos:generate": "npm run protos:generate:js && npm run protos:generate:ts", "lint": "eslint . --ext .ts", "lint:fix": "eslint . 
/**
 * Minimal binary protobuf reader.
 * Only implements the wire-types that we currently need; this is not intended
 * to be a general-purpose protobuf reader.
 *
 * Since the values we parse are generally small and not very nested, its public
 * interface does not enforce the same low-allocation philosophy that ProtobufWriter does.
 * If this is needed in the future, we should refactor this to fit the use-case.
 *
 * Malformed input (over-long or truncated varints, length prefixes or
 * fixed-width fields that run past the end of the buffer) is rejected with an
 * `Error` rather than being read leniently. This keeps `pos` a finite offset
 * in `[0, buf.length]` after every read, which guarantees that parsing loops of
 * the form `while (!reader.isAtEnd())` terminate.
 */
export class ProtobufReader {
  /**
   * A 64-bit value needs at most ceil(64 / 7) = 10 varint bytes; anything
   * longer is malformed.
   */
  private static readonly _MAX_VARINT_BYTES = 10;

  /** Offset of the next unread byte. */
  pos: number = 0;
  private readonly _buf: Uint8Array;
  private readonly _textDecoder: {
    decode: (input?: Uint8Array | null) => string;
  };

  /**
   * @param buf the encoded message; it is read in place and never copied.
   */
  constructor(buf: Uint8Array) {
    this._buf = buf;
    this._textDecoder = new TextDecoder();
  }

  /** Returns `true` once every byte of the buffer has been consumed. */
  isAtEnd(): boolean {
    return this.pos >= this._buf.length;
  }

  /**
   * Read a varint and decode it as a tag, returning field number and wire type.
   *
   * @throws Error if the tag varint is truncated or longer than 10 bytes.
   */
  readTag(): { fieldNumber: number; wireType: number } {
    const raw = this.readVarint();
    return { fieldNumber: raw >>> 3, wireType: raw & 0x7 };
  }

  /**
   * Read a base-128 varint.
   * Returns a JS `number`; precision above 2^53 is silently lost.
   *
   * @throws Error if the buffer ends before the varint terminates, or if the
   * varint is longer than 10 bytes.
   */
  readVarint(): number {
    let result = 0;
    for (let i = 0; i < ProtobufReader._MAX_VARINT_BYTES; i++) {
      if (this.pos >= this._buf.length) {
        throw new Error(
          `Truncated varint: buffer ended at offset ${this.pos}`
        );
      }
      const b = this._buf[this.pos++];
      // Multiplication instead of `<<` so that groups above bit 31 are kept.
      result += (b & 0x7f) * Math.pow(2, 7 * i);
      if ((b & 0x80) === 0) {
        return result;
      }
    }
    // Without this cap an attacker-controlled run of continuation bytes would
    // push the multiplier to Infinity and the result to NaN, which in turn
    // would poison `pos` and make `isAtEnd()` return false forever.
    throw new Error(
      `Malformed varint: longer than ${ProtobufReader._MAX_VARINT_BYTES} bytes`
    );
  }

  /**
   * Read a length-delimited byte sequence (bytes field or embedded message).
   * The returned array is a view into the underlying buffer, not a copy.
   *
   * @throws Error if the length prefix is malformed or the declared length
   * runs past the end of the buffer.
   */
  readBytes(): Uint8Array {
    const len = this.readVarint();
    const start = this.pos;
    this._advance(len);
    return this._buf.subarray(start, this.pos);
  }

  /**
   * Read a length-delimited UTF-8 string.
   *
   * @throws Error under the same conditions as {@link readBytes}.
   */
  readString(): string {
    return this._textDecoder.decode(this.readBytes());
  }

  /**
   * Skip an unknown field.
   * Handles wire types 0 (varint), 1 (64-bit), 2 (length-delimited),
   * 3 (start-group), 4 (end-group), and 5 (32-bit).
   *
   * @param wireType wire type taken from the tag that precedes the field.
   * @throws Error if the wire type is unknown (nothing is consumed in that
   * case) or if the field's payload runs past the end of the buffer.
   */
  skip(wireType: number): void {
    switch (wireType) {
      case 0: // varint
        this.readVarint();
        break;
      case 1: // 64-bit fixed
        this._advance(8);
        break;
      case 2: // length-delimited
        this.readBytes();
        break;
      case 3: // start group (deprecated)
        // We should never encounter this, but let's handle it gracefully in case we do:
        // Read nested tags until matching end-group (wire type 4) is found.
        // Groups can be nested, so continue until the end-group for this
        // start-group is encountered.
        while (!this.isAtEnd()) {
          const { wireType: nestedWireType } = this.readTag();
          if (nestedWireType === 4) {
            // matched end-group for this start-group
            break;
          }
          // recursive skip also handles nested groups
          this.skip(nestedWireType);
        }
        break;
      case 4: // end group
        // End-group should be handled by the start-group logic above.
        // When encountered directly in skip, treat it as a no-op (it signals
        // termination of the enclosing group).
        break;
      case 5: // 32-bit fixed
        this._advance(4);
        break;
      default:
        throw new Error(`Unknown wire type ${wireType}, cannot safely skip`);
    }
  }

  /**
   * Move `pos` forward by `byteCount`, refusing to step past the end of the
   * buffer so that truncated payloads are reported instead of silently
   * shortened.
   */
  private _advance(byteCount: number): void {
    const next = this.pos + byteCount;
    if (next > this._buf.length) {
      throw new Error(
        `Truncated message: need ${byteCount} bytes at offset ${this.pos}, but buffer has ${this._buf.length} bytes`
      );
    }
    this.pos = next;
  }
}
/**
 * Decode the bytes of an embedded ExportLogsPartialSuccess message.
 *
 * Fields understood (opentelemetry/proto/collector/logs/v1/logs_service.proto):
 *   1  rejected_log_records  int64   (wire type 0, varint)
 *   2  error_message         string  (wire type 2, length-delimited)
 *
 * Any other field number, and any known field that arrives with a wire type
 * other than the one listed above, is skipped. When a field occurs more than
 * once the last occurrence wins.
 *
 * @param data raw bytes of the embedded message (without its length prefix)
 * @returns an object holding only the fields that were present in `data`
 */
function deserializePartialSuccess(
  data: Uint8Array
): IExportLogsPartialSuccess {
  const partialSuccess: IExportLogsPartialSuccess = {};
  const reader = new ProtobufReader(data);

  while (!reader.isAtEnd()) {
    const tag = reader.readTag();

    // rejected_log_records: only accepted as a varint
    if (tag.fieldNumber === 1 && tag.wireType === 0) {
      partialSuccess.rejectedLogRecords = reader.readVarint();
      continue;
    }

    // error_message: only accepted as a length-delimited string
    if (tag.fieldNumber === 2 && tag.wireType === 2) {
      partialSuccess.errorMessage = reader.readString();
      continue;
    }

    // unknown field, or known field with an unexpected wire type
    reader.skip(tag.wireType);
  }

  return partialSuccess;
}
/**
 * Decode a binary-protobuf ExportLogsServiceResponse.
 *
 * Fields understood (opentelemetry/proto/collector/logs/v1/logs_service.proto):
 *   1  partial_success  ExportLogsPartialSuccess  (wire type 2, length-delimited)
 *
 * Every other field, as well as field 1 carried with a wire type other than 2,
 * is skipped. When field 1 occurs more than once the last occurrence wins.
 *
 * @param data raw bytes of the response message
 * @returns the decoded response; `partialSuccess` is only set when a
 * length-delimited field 1 was present
 */
export function deserializeExportLogsServiceResponse(
  data: Uint8Array
): IExportLogsServiceResponse {
  const response: IExportLogsServiceResponse = {};
  const reader = new ProtobufReader(data);

  while (!reader.isAtEnd()) {
    const tag = reader.readTag();
    const isPartialSuccess = tag.fieldNumber === 1 && tag.wireType === 2;

    if (!isPartialSuccess) {
      // unknown field, or partial_success with an unexpected wire type
      reader.skip(tag.wireType);
      continue;
    }

    // embedded message: hand its bytes to the nested decoder
    response.partialSuccess = deserializePartialSuccess(reader.readBytes());
  }

  return response;
}
'../src/common/protobuf/protobuf-writer'; function createExpectedLogJson(encoder: Encoder): IExportLogsServiceRequest { const timeUnixNano = encoder.encodeHrTime([1680253513, 123241635]); @@ -360,13 +363,13 @@ describe('Logs', () => { const serialized = ProtobufLogsSerializer.serializeRequest([log_1_1_1]); assert.ok(serialized, 'serialized response is undefined'); const decoded = - root.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.decode( + signals.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.decode( serialized ); const expected = createExpectedLogProtobuf(); const decodedObj = - root.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.toObject( + signals.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest.toObject( decoded, { // This incurs some precision loss that's taken into account in createExpectedLogsProtobuf() @@ -385,7 +388,7 @@ describe('Logs', () => { it('deserializes a response', () => { const protobufSerializedResponse = - root.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse.encode( + signals.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse.encode( { partialSuccess: { errorMessage: 'foo', @@ -415,6 +418,51 @@ describe('Logs', () => { ); sinon.assert.neverCalledWith(diagStub, GROWING_BUFFER_DEBUG_MESSAGE); }); + + it('does not throw when encountering unexpected wiretypes during deserialization', function () { + const writer = new ProtobufWriter(50); + // Construct an ExportLogsServiceResponse where the embedded + // ExportLogsPartialSuccess has fields encoded with incorrect wire types. 
+ // ExportLogsServiceResponse { 1: partial_success (length-delimited) } + // ExportLogsPartialSuccess expects: + // 1: rejected_log_records (varint) + // 2: error_message (length-delimited string) + + // first pretend the field number 1 is a varint (type 0, correct format expects a length delimited field) + writer.writeTag(1, 0); + writer.writeVarint(3); + + // also pretend we have an extra field that we don't know yet what to do with + writer.writeTag(99, 0); + writer.writeVarint(42); + + // now write field 1 again, but this time as length-delimited, as expected. + writer.writeTag(1, 2); + const lengthVarintPosition = writer.startLengthDelimited(); + const innerStartPos = writer.pos; + // instead of putting the correct data here, we put unexpected wire-types for each field, ensuring it's handled gracefully. + // Write field 1 but with wire type 2 (length-delimited) and a string (correct format expects a varint) + writer.writeTag(1, 2); + writer.writeString('not-a-number'); + // Write field 2 but with wire type 0 (varint) instead of length-delimited (corrent format expects a string, which is length delimited) + writer.writeTag(2, 0); + writer.writeVarint(12345); + // Write field 99, which is completely unknown to us; pretend it's a varint (type 0) + writer.writeTag(99, 0); + writer.writeVarint(42); + + // finish up + writer.finishLengthDelimited( + lengthVarintPosition, + writer.pos - innerStartPos + ); + + // Ensure deserialization does not throw when encountering these + // unexpected wire types. 
+ assert.doesNotThrow(() => + ProtobufLogsSerializer.deserializeResponse(writer.finish()) + ); + }); }); describe('JsonLogsSerializer', function () { diff --git a/experimental/packages/otlp-transformer/test/protobuf/protobuf-reader.test.ts b/experimental/packages/otlp-transformer/test/protobuf/protobuf-reader.test.ts new file mode 100644 index 00000000000..72fdcc5fd58 --- /dev/null +++ b/experimental/packages/otlp-transformer/test/protobuf/protobuf-reader.test.ts @@ -0,0 +1,605 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import * as assert from 'assert'; +import { ProtobufReader } from '../../src/common/protobuf/protobuf-reader'; +import { ProtobufWriter } from '../../src/common/protobuf/protobuf-writer'; + +import * as testbed from '../generated/testbed'; + +describe('ProtobufReader', function () { + describe('isAtEnd()', function () { + it('returns true immediately for an empty buffer', function () { + assert.ok(new ProtobufReader(new Uint8Array(0)).isAtEnd()); + }); + + it('returns false when bytes remain', function () { + assert.ok(!new ProtobufReader(new Uint8Array([0x01])).isAtEnd()); + }); + + it('returns true after all bytes are consumed', function () { + const reader = new ProtobufReader(new Uint8Array([0x01, 0x7f])); + reader.readVarint(); // consume first byte + assert.ok(!reader.isAtEnd()); + reader.readVarint(); // consume second byte + assert.ok(reader.isAtEnd()); + }); + }); + + describe('readVarint()', function () { + function decodeVarint(value: number): number { + const writer = new ProtobufWriter(16); + writer.writeVarint(value); + const bytes = writer.finish(); + return new ProtobufReader(new Uint8Array(bytes)).readVarint(); + } + + it('decodes 0', function () { + assert.strictEqual(decodeVarint(0), 0); + }); + + it('decodes 1', function () { + assert.strictEqual(decodeVarint(1), 1); + }); + + it('decodes 127 (max single-byte varint)', function () { + assert.strictEqual(decodeVarint(127), 127); 
+ }); + + it('decodes 128 (first two-byte varint)', function () { + // 128 = 0b10000000 → low 7 bits = 0, continuation, next group = 1 + assert.strictEqual(decodeVarint(128), 128); + }); + + it('decodes 300 (two-byte mid-range)', function () { + // 300 = 0x12c → low 7 bits = 0x2c | 0x80 = 0xac, next group = 0x02 + assert.strictEqual(decodeVarint(300), 300); + }); + + it('decodes 2^14 = 16384 (three-byte boundary)', function () { + assert.strictEqual(decodeVarint(16384), 16384); + }); + + it('decodes 2^21 = 2097152 (four-byte boundary)', function () { + assert.strictEqual(decodeVarint(2097152), 2097152); + }); + + it('decodes 2^28 = 268435456 (five-byte boundary)', function () { + assert.strictEqual(decodeVarint(268435456), 268435456); + }); + + it('decodes 2^32 = 4294967296 (exceeds 32-bit unsigned range)', function () { + // Encoding: groups of 7 bits LSB-first. + // 2^32 sits entirely in group 4 (bits 28-34): value=16, byte=0x10 (no continuation) + assert.strictEqual(decodeVarint(4294967296), 4294967296); + }); + + it('decodes 2^35 = 34359738368 (six-byte varint)', function () { + // Bit 35 is set, falls into group 5 (bits 35-41): value=1, byte=0x01 + assert.strictEqual(decodeVarint(34359738368), 34359738368); + }); + + it('decodes 2^53 = 9007199254740992 (Number.MAX_SAFE_INTEGER boundary)', function () { + // Bit 53 is set, falls into group 7 (bits 49-55): value=16, byte=0x10 + assert.strictEqual(decodeVarint(9007199254740992), 9007199254740992); + }); + + it('advances pos by the number of bytes consumed', function () { + // [0x80, 0x01] = 128 (2 bytes), [0x7f] = 127 (1 byte) + const writer = new ProtobufWriter(16); + writer.writeVarint(128); + writer.writeVarint(127); + const reader = new ProtobufReader(writer.finish()); + assert.strictEqual(reader.pos, 0); + reader.readVarint(); + assert.strictEqual(reader.pos, 2); + reader.readVarint(); + assert.strictEqual(reader.pos, 3); + }); + }); + + describe('readTag()', function () { + function decodeTag(input: { 
fieldNumber: number; wireType: number }): { + fieldNumber: number; + wireType: number; + } { + const writer = new ProtobufWriter(8); + writer.writeTag(input.fieldNumber, input.wireType); + return new ProtobufReader(writer.finish()).readTag(); + } + + it('decodes field 1, wire type 0 (varint)', function () { + const { fieldNumber, wireType } = decodeTag({ + fieldNumber: 1, + wireType: 0, + }); + assert.strictEqual(fieldNumber, 1); + assert.strictEqual(wireType, 0); + }); + + it('decodes field 2, wire type 2 (length-delimited)', function () { + const { fieldNumber, wireType } = decodeTag({ + fieldNumber: 2, + wireType: 2, + }); + assert.strictEqual(fieldNumber, 2); + assert.strictEqual(wireType, 2); + }); + + it('decodes field 8, wire type 5 (fixed32)', function () { + const { fieldNumber, wireType } = decodeTag({ + fieldNumber: 8, + wireType: 5, + }); + assert.strictEqual(fieldNumber, 8); + assert.strictEqual(wireType, 5); + }); + + it('decodes field 9, wire type 1 (fixed64)', function () { + const { fieldNumber, wireType } = decodeTag({ + fieldNumber: 9, + wireType: 1, + }); + assert.strictEqual(fieldNumber, 9); + assert.strictEqual(wireType, 1); + }); + + it('decodes field 16, wire type 0 (two-byte tag varint)', function () { + const { fieldNumber, wireType } = decodeTag({ + fieldNumber: 16, + wireType: 0, + }); + assert.strictEqual(fieldNumber, 16); + assert.strictEqual(wireType, 0); + }); + + it('advances pos by the size of the tag varint', function () { + const writer = new ProtobufWriter(16); + writer.writeTag(1, 0); // field 1 wire 0 + writer.writeTag(16, 0); // field 16 wire 0 (2-byte tag) + const reader = new ProtobufReader(writer.finish()); + reader.readTag(); + assert.strictEqual(reader.pos, 1); + reader.readTag(); + assert.strictEqual(reader.pos, 3); + }); + }); + + describe('readString()', function () { + it('reads an empty string (length prefix 0x00)', function () { + const writer = new ProtobufWriter(8); + writer.writeString(''); + const reader = new 
ProtobufReader(writer.finish()); + assert.strictEqual(reader.readString(), ''); + assert.ok(reader.isAtEnd()); + }); + + it('reads a short ASCII string', function () { + // 'test' → length 4, bytes 0x74 0x65 0x73 0x74 + const writer = new ProtobufWriter(16); + writer.writeString('test'); + const reader = new ProtobufReader(writer.finish()); + assert.strictEqual(reader.readString(), 'test'); + assert.ok(reader.isAtEnd()); + }); + + it('reads a multi-byte UTF-8 string (emoji)', function () { + const text = '😀'; // U+1F600, 4 bytes in UTF-8 + const writer = new ProtobufWriter(16); + writer.writeString(text); + const reader = new ProtobufReader(writer.finish()); + assert.strictEqual(reader.readString(), text); + }); + + it('advances pos past the length prefix and string data', function () { + // [length=4, 't','e','s','t', sentinel=0x01] + const writer = new ProtobufWriter(16); + writer.writeString('test'); + writer.writeVarint(1); + const reader = new ProtobufReader(writer.finish()); + reader.readString(); + assert.strictEqual(reader.pos, 5); + assert.strictEqual(reader.readVarint(), 1); + }); + }); + + describe('readBytes()', function () { + it('reads an empty byte slice (length prefix 0x00)', function () { + const reader = new ProtobufReader(new Uint8Array([0x00])); + assert.deepStrictEqual(reader.readBytes(), new Uint8Array([])); + assert.ok(reader.isAtEnd()); + }); + + it('reads a non-empty byte slice', function () { + const reader = new ProtobufReader( + new Uint8Array([0x03, 0x01, 0x02, 0x03]) + ); + assert.deepStrictEqual( + reader.readBytes(), + new Uint8Array([0x01, 0x02, 0x03]) + ); + assert.ok(reader.isAtEnd()); + }); + + it('returns a view into the same buffer (no copy necessary)', function () { + const original = new Uint8Array([0x02, 0xaa, 0xbb]); + const result = new ProtobufReader(original).readBytes(); + // The subarray should share the underlying ArrayBuffer + assert.strictEqual(result.buffer, original.buffer); + }); + + it('advances pos past the 
length prefix and data bytes', function () { + // [length=2, 0xaa, 0xbb, sentinel=0x07] + const reader = new ProtobufReader( + new Uint8Array([0x02, 0xaa, 0xbb, 0x07]) + ); + reader.readBytes(); + assert.strictEqual(reader.pos, 3); + assert.strictEqual(reader.readVarint(), 7); + }); + }); + + describe('skip()', function () { + it('wire type 0 (varint): skips a multi-byte varint', function () { + // [0xac, 0x02] = 300 (2 bytes), followed by sentinel 0x07 + const reader = new ProtobufReader(new Uint8Array([0xac, 0x02, 0x07])); + reader.skip(0); + assert.strictEqual(reader.pos, 2); + assert.strictEqual(reader.readVarint(), 7); + }); + + it('wire type 1 (fixed64): skips exactly 8 bytes', function () { + const buf = new Uint8Array(9); + buf[8] = 0x2a; // sentinel + const reader = new ProtobufReader(buf); + reader.skip(1); + assert.strictEqual(reader.pos, 8); + assert.strictEqual(reader.readVarint(), 0x2a); + }); + + it('wire type 2 (length-delimited): skips length prefix + data', function () { + // [length=3, 0x01, 0x02, 0x03, sentinel=0x2a] + const reader = new ProtobufReader( + new Uint8Array([0x03, 0x01, 0x02, 0x03, 0x2a]) + ); + reader.skip(2); + assert.strictEqual(reader.pos, 4); + assert.strictEqual(reader.readVarint(), 0x2a); + }); + + it('wire type 5 (fixed32): skips exactly 4 bytes', function () { + const buf = new Uint8Array(5); + buf[4] = 0x2a; // sentinel + const reader = new ProtobufReader(buf); + reader.skip(5); + assert.strictEqual(reader.pos, 4); + assert.strictEqual(reader.readVarint(), 0x2a); + }); + + it('wire type 3 and 4: group is handled and does not throw', function () { + // Construct a deprecated start-group for field 1 containing an inner + // varint field 2 with value 3, then the matching end-group for field 1. 
+ // start-group key for field 1: (1 << 3) | 3 = 0x0B + // field 2 varint key: (2 << 3) | 0 = 0x10 + // varint value 3: 0x03 + // end-group key for field 1: (1 << 3) | 4 = 0x0C + // Append a sentinel byte after the group so we can assert the reader + // stopped at the correct position and didn't consume the sentinel. + const bytes = new Uint8Array([0x0b, 0x10, 0x03, 0x0c, 0x2a]); + const reader = new ProtobufReader(bytes); + + // Read the start-group tag, ensure it is field 1 wire type 3 + const { fieldNumber, wireType } = reader.readTag(); + assert.strictEqual(fieldNumber, 1); + assert.strictEqual(wireType, 3); + + // Skip the group and verify we've advanced past the end-group but not + // the sentinel (sentinel should remain to be read by caller). + reader.skip(wireType); + assert.strictEqual(reader.pos, bytes.length - 1); + assert.strictEqual(reader.readVarint(), 0x2a); + }); + + it('unknown tag (invalid wire type): skip throws and does not consume subsequent bytes', function () { + // Construct a tag with field 1 and invalid wire type 7: (1 << 3) | 7 = 0x0f + // Append a sentinel byte after the tag so we can verify it wasn't consumed. 
+ const bytes = new Uint8Array([0x0f, 0x2a]); + const reader = new ProtobufReader(bytes); + + const { fieldNumber, wireType } = reader.readTag(); + assert.strictEqual(fieldNumber, 1); + assert.strictEqual(wireType, 7); + + // skip should throw for unknown wire type + assert.throws(() => reader.skip(wireType)); + + // position should not have advanced past the tag-varint (only readTag consumed it) + assert.strictEqual(reader.pos, 1); + // sentinel should still be present + assert.strictEqual(reader.readVarint(), 0x2a); + }); + + it('skips consecutive fields of different wire types', function () { + // varint 300 (2 bytes) + length-delimited [0x01,0x02] (3 bytes) + fixed32 zeros (4 bytes) = 9 bytes, sentinel + const reader = new ProtobufReader( + new Uint8Array([ + 0xac, + 0x02, // wire 0: varint 300 + 0x02, + 0x01, + 0x02, // wire 2: 2-byte payload + 0x00, + 0x00, + 0x00, + 0x00, // wire 5: fixed32 zeros + 0x05, // sentinel varint + ]) + ); + reader.skip(0); // skip varint + reader.skip(2); // skip length-delimited + reader.skip(5); // skip fixed32 + assert.strictEqual(reader.readVarint(), 5); + }); + }); + + describe('round-trip with testbed.proto', function () { + // Full decoded message for testbed.TestMessage (used in round-trip tests). + // + // Fields 1–7 are fully decoded using ProtobufReader. + // Fields 8 (fixed32, wire type 5), 9 (fixed64, wire type 1), and + // 10 (double, wire type 1) are explicitly skipped via reader.skip() so + // that the round-trip tests also check forward-compatible skipping. 
+ interface DecodedTestMessage { + field1?: number; + field2?: string; + field3?: { field1?: number; field2?: string }; + field4: number[]; + field5: string[]; + field6: Uint8Array[]; + field7: { field1?: number; field2?: string }[]; + /** field numbers that were encountered but skipped (8, 9, 10) */ + skippedFields: number[]; + } + + function decodeNested(bytes: Uint8Array): { + field1?: number; + field2?: string; + } { + const reader = new ProtobufReader(bytes); + const out: { field1?: number; field2?: string } = {}; + while (!reader.isAtEnd()) { + const { fieldNumber, wireType } = reader.readTag(); + if (fieldNumber === 1) out.field1 = reader.readVarint(); + else if (fieldNumber === 2) out.field2 = reader.readString(); + else reader.skip(wireType); + } + return out; + } + + function decodeTestMessage(data: Uint8Array): DecodedTestMessage { + // Wrap in a plain Uint8Array so that readBytes() always returns a plain + // Uint8Array subarray rather than a Node.js Buffer slice. protobuf.js + // finish() may return a Buffer on Node.js, and Buffer !== Uint8Array in + // assert.deepStrictEqual even when the byte contents are identical. 
+ const reader = new ProtobufReader(new Uint8Array(data)); + const out: DecodedTestMessage = { + field4: [], + field5: [], + field6: [], + field7: [], + skippedFields: [], + }; + + while (!reader.isAtEnd()) { + const { fieldNumber, wireType } = reader.readTag(); + switch (fieldNumber) { + case 1: + out.field1 = reader.readVarint(); + break; + case 2: + out.field2 = reader.readString(); + break; + case 3: + out.field3 = decodeNested(reader.readBytes()); + break; + case 4: + if (wireType === 2) { + // packed varint: proto3 default for repeated scalars + const packed = new ProtobufReader(reader.readBytes()); + while (!packed.isAtEnd()) { + out.field4.push(packed.readVarint()); + } + } else { + // non-packed (wire type 0) + out.field4.push(reader.readVarint()); + } + break; + case 5: + out.field5.push(reader.readString()); + break; + case 6: + out.field6.push(reader.readBytes()); + break; + case 7: + out.field7.push(decodeNested(reader.readBytes())); + break; + default: + // fields 8 (wire 5), 9 (wire 1), 10 (wire 1) land here + out.skippedFields.push(fieldNumber); + reader.skip(wireType); + break; + } + } + + return out; + } + + it('decodes scalar int32 (field1) and string (field2)', function () { + const encoded = testbed.TestMessage.encode({ + field1: 42, + field2: 'hello world', + }).finish(); + + const decoded = decodeTestMessage(encoded); + assert.strictEqual(decoded.field1, 42); + assert.strictEqual(decoded.field2, 'hello world'); + }); + + it('decodes a nested message (field3)', function () { + const encoded = testbed.TestMessage.encode({ + field3: { field1: 100, field2: 'nested' }, + }).finish(); + + const { field3 } = decodeTestMessage(encoded); + assert.ok(field3, 'field3 should be present'); + assert.strictEqual(field3.field1, 100); + assert.strictEqual(field3.field2, 'nested'); + }); + + it('decodes a nested message that has only field1 (field2 absent)', function () { + const encoded = testbed.TestMessage.encode({ + field3: { field1: 7 }, + }).finish(); + 
+ const { field3 } = decodeTestMessage(encoded); + assert.ok(field3); + assert.strictEqual(field3.field1, 7); + assert.strictEqual(field3.field2, undefined); + }); + + it('decodes repeated int32 (field4)', function () { + const values = [0, 1, 127, 128, 300, 268435456]; + const encoded = testbed.TestMessage.encode({ field4: values }).finish(); + assert.deepStrictEqual(decodeTestMessage(encoded).field4, values); + }); + + it('decodes repeated string (field5) including empty string and emoji', function () { + const values = ['', 'foo', 'bar baz', '😀']; + const encoded = testbed.TestMessage.encode({ field5: values }).finish(); + assert.deepStrictEqual(decodeTestMessage(encoded).field5, values); + }); + + it('decodes repeated bytes (field6) including an empty slice', function () { + const values = [ + new Uint8Array([0x01, 0x02, 0x03]), + new Uint8Array([]), + new Uint8Array([0xff, 0x00, 0xab]), + ]; + const encoded = testbed.TestMessage.encode({ field6: values }).finish(); + const decoded = decodeTestMessage(encoded).field6; + assert.strictEqual(decoded.length, values.length); + for (let i = 0; i < values.length; i++) { + assert.deepStrictEqual(decoded[i], values[i]); + } + }); + + it('decodes repeated nested messages (field7)', function () { + const encoded = testbed.TestMessage.encode({ + field7: [ + { field1: 10, field2: 'x' }, + { field1: 20 }, // field2 absent + ], + }).finish(); + + const { field7 } = decodeTestMessage(encoded); + assert.strictEqual(field7.length, 2); + assert.deepStrictEqual(field7[0], { field1: 10, field2: 'x' }); + assert.deepStrictEqual(field7[1], { field1: 20 }); + }); + + it('skips fixed32 (field8, packed wire type 2) without corrupting adjacent fields', function () { + const encoded = testbed.TestMessage.encode({ + field1: 7, + field8: [0x11223344, 0x55667788], + }).finish(); + + const decoded = decodeTestMessage(encoded); + assert.strictEqual(decoded.field1, 7); + // proto3 uses packed encoding for repeated scalars → one length-delimited 
+ // tag covers all values, so field8 appears exactly once in skippedFields + assert.deepStrictEqual(decoded.skippedFields, [8]); + }); + + it('skips fixed64 (field9, packed wire type 2) without corrupting adjacent fields', function () { + const encoded = testbed.TestMessage.encode({ + field2: 'after skip', + field9: [42, 0], + }).finish(); + + const decoded = decodeTestMessage(encoded); + assert.strictEqual(decoded.field2, 'after skip'); + // packed: both values share one tag occurrence + assert.deepStrictEqual(decoded.skippedFields, [9]); + }); + + it('skips double (field10, packed wire type 2) without corrupting adjacent fields', function () { + const encoded = testbed.TestMessage.encode({ + field2: 'sentinel', + field10: [1.5, -0.5, 0.0], + }).finish(); + + const decoded = decodeTestMessage(encoded); + assert.strictEqual(decoded.field2, 'sentinel'); + // packed: all three values share one tag occurrence + assert.deepStrictEqual(decoded.skippedFields, [10]); + }); + + it('handles the full TestMessage with all fields populated', function () { + const inputField6 = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([]), + new Uint8Array([0xff]), + ]; + + const encoded = testbed.TestMessage.encode({ + field1: 42, + field2: 'hello world', + field3: { field1: 100, field2: 'nested string' }, + field4: [0, 1, 127, 128, 300], + field5: ['foo', '', '😀'], + field6: inputField6, + field7: [{ field1: 10, field2: 'x' }, { field1: 20 }], + field8: [0x11223344], + field9: [42], + field10: [1.5], + }).finish(); + + const decoded = decodeTestMessage(encoded); + + assert.strictEqual(decoded.field1, 42); + assert.strictEqual(decoded.field2, 'hello world'); + assert.ok(decoded.field3); + assert.strictEqual(decoded.field3.field1, 100); + assert.strictEqual(decoded.field3.field2, 'nested string'); + assert.deepStrictEqual(decoded.field4, [0, 1, 127, 128, 300]); + assert.deepStrictEqual(decoded.field5, ['foo', '', '😀']); + assert.strictEqual(decoded.field6.length, inputField6.length); + 
for (let i = 0; i < inputField6.length; i++) { + assert.deepStrictEqual(decoded.field6[i], inputField6[i]); + } + assert.deepStrictEqual(decoded.field7, [ + { field1: 10, field2: 'x' }, + { field1: 20 }, + ]); + // fields 8, 9, 10 reached and skipped, 1 occurrence each + assert.deepStrictEqual( + decoded.skippedFields.sort((a, b) => a - b), + [8, 9, 10] + ); + }); + + it('returns empty result for an empty buffer (zero-length message)', function () { + const decoded = decodeTestMessage(new Uint8Array(0)); + assert.strictEqual(decoded.field1, undefined); + assert.strictEqual(decoded.field2, undefined); + assert.strictEqual(decoded.field3, undefined); + assert.deepStrictEqual(decoded.field4, []); + assert.deepStrictEqual(decoded.field5, []); + assert.deepStrictEqual(decoded.field6, []); + assert.deepStrictEqual(decoded.field7, []); + assert.deepStrictEqual(decoded.skippedFields, []); + }); + }); +}); diff --git a/experimental/packages/otlp-transformer/tsconfig.json b/experimental/packages/otlp-transformer/tsconfig.json index d68d60fa7e7..5ef953e7597 100644 --- a/experimental/packages/otlp-transformer/tsconfig.json +++ b/experimental/packages/otlp-transformer/tsconfig.json @@ -10,6 +10,8 @@ "src/**/*.ts", "src/generated/*.js", "src/generated/*.ts", + "test/generated/*.js", + "test/generated/*.ts", "test/**/*.ts" ], "references": [