Skip to content

Pulsar: JSON schema support with broker-side registration (#3183)#3210

Merged
jeremydmiller merged 1 commit into
mainfrom
feat/pulsar-json-schema-3183
Jun 23, 2026
Merged

Pulsar: JSON schema support with broker-side registration (#3183)#3210
jeremydmiller merged 1 commit into
mainfrom
feat/pulsar-json-schema-3183

Conversation

@jeremydmiller

Copy link
Copy Markdown
Member

Part of the Re-Evaluate Pulsar Integration epic #3176. Addresses #3183 (the headline differentiator) — the JSON-first slice of the revised scope.

The Pulsar transport previously sent/received raw bytes only, with no schema registered on the broker — the single biggest divergence from idiomatic Pulsar. This adds JSON schema support so a topic gets broker-side schema registration and Pulsar's schema compatibility / evolution checks, while Wolverine keeps owning the message body (its normal System.Text.Json serialization).

opts.PublishMessage<OrderPlaced>().ToPulsarTopic(topic).UseJsonSchema<OrderPlaced>();
opts.ListenToPulsarTopic(topic).UseJsonSchema<OrderPlaced>();

Design — pass-through schema, zero pipeline rewrite

  • PulsarSchema : ISchema<ReadOnlySequence<byte>> is a pass-through over Wolverine's bytes whose SchemaInfo declares the schema the broker stores. Because it stays typed as ReadOnlySequence<byte>, the entire existing sender/listener/PulsarEnvelopeMapper and CloudEvents pipeline is unchanged — the producer/consumer are simply created with the schema (NewProducer(schema) / NewConsumer(schema)) so the broker registers it.
  • AvroSchemaGenerator builds the Avro-format JSON schema Pulsar uses for SchemaType.Json from a CLR type's public properties (primitives, strings, enums/Guid/DateTime as strings, nullable value types as ["null", T] unions, arrays, nested records), falling back to string for anything unmappable so registration never fails on an exotic property.
  • DSL: UseJsonSchema<T>() and UsePulsarSchema(ISchema<…>) on both listener and subscriber configs.

Scope notes

  • AUTO_CONSUME is dropped (not available in DotPulsar 5.1.2, per the API verification).
  • No regression: no schema is registered unless you opt in, so raw-bytes and CloudEvents endpoints are unaffected.
  • The UsePulsarSchema(ISchema<ReadOnlySequence<byte>>) seam accepts a custom schema for advanced cases (custom definition / SchemaType / codec). Built-in typed Avro (Schema.AvroISpecificRecord<T>(), which is ISchema<T> and needs the typed producer/consumer path) is a natural follow-up on top of this seam.

Acceptance criteria

  • ✅ A typed message produced and consumed under a JSON schema with broker-side schema registration, proven by integration test (verified via the Pulsar admin REST …/schemas/…/schema endpoint returning type: JSON).
  • ✅ Existing raw-bytes / CloudEvents paths remain working (opt-in only).

Tests / build

  • pulsar_json_schema: JSON round-trip + broker registration (integration) and Avro-format schema generator (unit) — green against a Testcontainers broker.
  • Full wolverine.slnx builds clean in Release.

🤖 Generated with Claude Code

The Pulsar transport previously sent/received raw bytes only, with no schema
registered on the broker — the single biggest divergence from idiomatic Pulsar.
This adds JSON schema support so a topic gets broker-side schema registration and
Pulsar's schema compatibility / evolution checks, while Wolverine keeps owning the
message body bytes (its normal System.Text.Json serialization).

    opts.PublishMessage<OrderPlaced>().ToPulsarTopic(topic).UseJsonSchema<OrderPlaced>();
    opts.ListenToPulsarTopic(topic).UseJsonSchema<OrderPlaced>();

- PulsarSchema : ISchema<ReadOnlySequence<byte>> is a pass-through over Wolverine's
  bytes whose SchemaInfo declares the schema the broker stores. Because it stays
  typed as ReadOnlySequence<byte>, the entire existing sender/listener/mapper and
  CloudEvents pipeline is unchanged — the producer/consumer are simply created with
  the schema (NewProducer(schema)/NewConsumer(schema)) so the broker registers it.
- AvroSchemaGenerator builds the Avro-format JSON schema Pulsar uses for
  SchemaType.Json from a CLR type's public properties (primitives, strings,
  enums/Guid/DateTime as strings, nullable value types as ["null", T] unions, arrays,
  nested records), falling back to string for anything unmappable so registration
  never fails on an exotic property.
- DSL: UseJsonSchema<T>() and UsePulsarSchema(ISchema<...>) on both listener and
  subscriber configs. No schema is registered unless opted in, so raw-bytes and
  CloudEvents endpoints are unaffected.

AUTO_CONSUME is dropped (not available in DotPulsar 5.1.2). The UsePulsarSchema seam
accepts a custom ISchema for advanced cases (e.g. Avro/Protobuf codecs).

Tests: JSON round-trip with broker-side schema registration verified via the Pulsar
admin REST API; Avro-format schema generator unit tests. Docs: Schema Support section.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jeremydmiller jeremydmiller merged commit 72913ef into main Jun 23, 2026
25 checks passed
This was referenced Jun 23, 2026
This was referenced Jun 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant