Skip to content

feat(new source/sink) Added Azure Event Hub source/sink#24659

Draft
zapdos26 wants to merge 24 commits intovectordotdev:masterfrom
zapdos26:feature/azure-event-hubs
Draft

feat(new source/sink) Added Azure Event Hub source/sink#24659
zapdos26 wants to merge 24 commits intovectordotdev:masterfrom
zapdos26:feature/azure-event-hubs

Conversation

@zapdos26
Copy link
Contributor

@zapdos26 zapdos26 commented Feb 16, 2026

Summary

Adds Azure Event Hubs as both a source and sink to Vector using the native azure_messaging_eventhubs SDK over AMQP. Allows for usage of all the Azure identity mechanisms and works with the Basic Event Hubs.
Updates azure_storage_blob from 0.7 to 0.9, to resolve dependency mismatch.

Realize this is the example in the contributing docs, but now since there is an actual supported package by Azure it seems that this is the way to go.

Vector configuration

Source

[sources.my_event_hubs]
   type = "azure_event_hubs"
   connection_string = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=abc123==;EntityPath=my-hub"
   consumer_group = "$Default"
   start_position = "latest"
   decoding.codec = "json"

Sink — connection string auth with batching and partition routing

[sinks.my_event_hubs_sink]
type = "azure_event_hubs"
inputs = ["my_source"]
connection_string = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=abc123==;EntityPath=my-hub"
encoding.codec = "json"
batch_enabled = true
batch_max_events = 100
batch_timeout_secs = 1
partition_id_field = ".partition_id"

Source — Azure Identity auth

[sources.my_event_hubs]
type = "azure_event_hubs"
namespace = "mynamespace.servicebus.windows.net"
event_hub_name = "my-hub"
decoding.codec = "json"

How did you test this PR?

  • 39 unit tests covering config parsing/defaults, connection string parsing, SAS credential generation, output schema definitions (Vector + Legacy namespace), and
    deny_unknown_fields enforcement
  • 3 integration tests against the Azure Event Hubs emulator via Docker Compose: healthcheck, batch-mode round-trip, non-batch-mode round-trip

Unit tests

cargo test --no-default-features
--features sources-azure_event_hubs,sinks-azure_event_hubs
-- azure_event_hubs

Integration tests (requires emulator via Docker Compose)

EVENTHUBS_ADDRESS=localhost cargo test --no-default-features
--features azure-event-hubs-integration-tests
-- azure_event_hubs::integration_tests

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

Add Azure Event Hubs as a source and sink to Vector using the
azure_messaging_eventhubs crate (v0.11).

Source (sources-azure_event_hubs):
- ConsumerClient-based event consuming with configurable partition and
  consumer group
- Supports connection string (SAS) and Azure Identity (Managed Identity)
  authentication
- Enriches events with partition_id, sequence_number, and offset metadata
- Configurable start position (latest/earliest)
- Standard framing and decoding support

Sink (sinks-azure_event_hubs):
- ProducerClient-based event sending
- Supports connection string (SAS) and Azure Identity authentication
- Healthcheck via get_eventhub_properties
- Standard encoding support via StreamSink pattern

Dependency updates:
- azure_core: 0.30 -> 0.32
- azure_storage_blob: 0.7 -> 0.9
- Added: azure_messaging_eventhubs 0.11, azure_core_amqp 0.11,
  azure_identity 0.32
- Updated azure_common reqwest usage from v0.12 to v0.13 for
  azure_core 0.32 compatibility
- Updated rustls 0.23.23 -> 0.23.36
Refactor the Event Hubs sink to use Vector's standard driver pattern:
- Add AzureEventHubsRequestBuilder (RequestBuilder<Event>) for encoding
- Add AzureEventHubsService (Tower Service) with atomic in-flight
  counter for backpressure via poll_ready
- Rewrite sink to use request_builder() -> into_driver(service) pipeline
- Add TowerRequestConfig for configurable concurrency/rate limiting
- DriverResponse impl provides EventsSent/BytesSent telemetry
Source tests (16):
- Connection string parsing: full, without entity path, base64 padding,
  missing fields, empty segments
- SAS credential: creation, URI stripping, invalid base64, token
  generation, get_token async
- build_credential: connection string with/without entity path,
  override, missing required fields
- generate_config

Sink tests (16):
- Config: generate_config, TOML parsing (connection string, identity
  auth, request settings), default acknowledgements
- Request builder: split_input finalizers, build_request body,
  compression
- Service: InFlightGuard drop semantics, multiple guards,
  DriverResponse delivered/errored, Finalizable, MetaDescriptive
Add integration tests that run against the Azure Event Hubs emulator
(mcr.microsoft.com/azure-messaging/eventhubs-emulator):

- azure_event_hubs_sink_healthcheck: verifies producer can connect
  and get_eventhub_properties succeeds
- azure_event_hubs_sink_happy_path: sends 10 events through the sink,
  verifies batch acknowledgements, reads events back via ConsumerClient,
  asserts all messages received correctly

Emulator support:
- Parse UseDevelopmentEmulator=true from connection strings
- EmulatorCredential returns dummy token (emulator skips SAS validation)
- Custom AMQP endpoint (amqp://host:5672) for plain AMQP (no TLS)
- build_credential now returns optional custom_endpoint for emulator

Infrastructure:
- Add eventhubs-emulator service to compose.yaml (depends on Azurite)
- Update Azurite to listen on all interfaces (--tableHost/--queueHost)
- Add eventhubs-config.json with eh1 entity (2 partitions, cg1 group)
- Add EVENTHUBS_ADDRESS env var to test.yaml
- Feature-gated with azure-event-hubs-integration-tests
…nection, and changelog

- Source: auto-discover partitions via get_eventhub_properties(), spawn per-partition tasks, reconnection with exponential backoff
- Sink: partition_id_field config routes events to specific partitions via SendEventOptions
- Internal events: typed AzureEventHubsReceiveError, AzureEventHubsConnectError, AzureEventHubsSendError
- Fix internal_events module visibility (pub(crate) mod) to avoid glob import ambiguity with AMQP
- Add changelog entry
Replace generic BytesReceived/EventsReceived handles with labeled
AzureEventHubsBytesReceived and AzureEventHubsEventsReceived internal
events that emit component_received_bytes_total,
component_received_events_total, and component_received_event_bytes_total
counters with event_hub_name and partition_id labels (protocol=amqp).
- Service definition: website/cue/reference/services/azure_event_hubs.cue
- Source docs: component definition, generated config, output schema, telemetry
- Sink docs: component definition, generated config, input types
- Both marked as development: beta
- Includes how_it_works sections for authentication and partition handling
Sink architecture:
- batch_enabled (default: true): EventDataBatch per-partition batching
  with configurable batch_max_events (100) and batch_timeout_secs (1)
- batch_enabled=false: individual send_event() per event
- Rate limiting via flat rate_limit_duration_secs/rate_limit_num (Kafka pattern)
- SDK RetryOptions exposed: retry_max_retries (8), retry_initial_delay_ms (200),
  retry_max_elapsed_secs (60) — passed to ProducerClient builder
- Removed Tower service, request_builder (no longer needed)
- SDK handles retries with exponential backoff + connection/session recovery
- Regenerated sink/source configs via 'vdev build component-docs'
- Generated files now include batch, retry, and rate limit fields
- Add batch feature to sink component CUE definition
- Add AzureEventHubsEventsSent internal event emitting
  component_sent_events_total and component_sent_bytes_total
  with event_hub_name and partition_id labels
- Emit metrics after successful batch sends (flush_batches)
  and individual sends (send_single)
- Store event_hub_name in AzureEventHubsSink for metric labels
- Delete orphaned request_builder.rs and service.rs (removed
  from mod.rs in prior refactor to SDK-native batching)
Sink config tests:
- config_defaults_batch: verify batch_enabled=true, max_events=100, timeout=1s
- config_defaults_retry: verify max_retries=8, initial_delay=200ms, elapsed=60s
- config_custom_batch_settings: batch_enabled=false and custom values
- config_custom_retry_settings: non-default retry values
- config_partition_id_field: partition routing field parsing
- config_all_fields: comprehensive test with every config field
- config_rejects_unknown_fields: deny_unknown_fields enforcement

Source tests:
- config_defaults: verify consumer_group, start_position, empty partition_ids
- config_from_toml_connection_string: connection string auth
- config_from_toml_identity_auth: identity auth fields
- config_custom_consumer_group_and_partitions: custom CG, partition list, earliest
- config_rejects_unknown_fields: deny_unknown_fields enforcement
- output_schema_definition_vector_namespace: Vector namespace schema
- output_schema_definition_legacy_namespace: Legacy namespace schema

Integration tests:
- azure_event_hubs_sink_non_batch_mode: 5-event round-trip without batching
Sink compliance:
- Register standard EventsSent/BytesSent handles (Protocol: amqp)
  to satisfy assert_sink_compliance requirements
- Emit CountByteSize and ByteSize after successful sends in both
  flush_batches (batch mode) and send_single (non-batch mode)
- Track JsonSize through encode_event for accurate event byte size
- Replace AzureEventHubsEventsSent internal event with plain
  emit_eventhubs_sent_metrics() to avoid NamedInternalEvent collision

EH-specific metrics:
- azure_event_hubs_events_sent_total (per event_hub_name + partition_id)
- azure_event_hubs_bytes_sent_total (per event_hub_name + partition_id)

Documentation:
- Define azure_event_hubs_events_sent_total and
  azure_event_hubs_bytes_sent_total in internal_metrics.cue registry
- Add telemetry section to sink CUE doc with standard + EH-specific metrics
- Add component_received_event_bytes_total to source CUE telemetry section
- Add metrics how_it_works section to sink CUE doc

Integration tests:
- Fix assertions to use superset checks (emulator accumulates events
  across runs since receivers read from Earliest)
- Introduce SourceParams struct to bundle azure_event_hubs_source args
- Add #[allow(clippy::too_many_arguments)] on partition_receiver
- Fix orphaned doc comment for EventHubsSasCredential
- Remove stray doc comment line before EmulatorCredential
- Fix azure_blob integration test: create_container(None) -> create(None)
  for azure_storage_blob 0.9 API, add explicit type annotation
- Replace loop+match with while-let in Event Hubs integration tests
@zapdos26 zapdos26 requested review from a team as code owners February 16, 2026 03:13
@github-actions github-actions bot added domain: sources Anything related to the Vector's sources domain: sinks Anything related to the Vector's sinks domain: external docs Anything related to Vector's external, public documentation labels Feb 16, 2026
@github-actions github-actions bot added the domain: ci Anything related to Vector's CI environment label Feb 16, 2026
@zapdos26
Copy link
Contributor Author

Please let me know if I need to split this PR in half (or thirds due to the Azure blob update). There is definitely some Copilot use here, which I reviewed. Please let me know if I need to make any additional updates. 😄

zapdos26 and others added 4 commits February 15, 2026 22:35
Remove artificial json/text restriction from CUE docs since
azure_messaging_eventhubs sends opaque byte payloads and supports
any serialization format.
- Add 10 new tests (1 unit, 9 integration) covering partition routing,
  rate limiting, JSON encoding, batch overflow, acknowledgements,
  oversized events, empty streams, and batch partition grouping
- Fix EventHubsService::call() to update finalizers with Delivered/Errored
  status instead of silently dropping them
- Isolate each integration test with a dedicated Event Hub entity for
  parallel execution
@urseberry urseberry self-assigned this Feb 17, 2026
@zapdos26 zapdos26 marked this pull request as draft February 21, 2026 01:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: ci Anything related to Vector's CI environment domain: external docs Anything related to Vector's external, public documentation domain: sinks Anything related to the Vector's sinks domain: sources Anything related to the Vector's sources

Projects

None yet

Development

Successfully merging this pull request may close these issues.

New azure_event_hubs sink

2 participants