diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index 70c429f71bbc..14adbe5a2869 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,5 +1,5 @@ # Release History -## 0.1.0 (TBD) +## 0.1.0 (2022-08-11) - Initial preview for the new version of the Azure Event Hubs Go SDK. diff --git a/sdk/messaging/azeventhubs/README.md b/sdk/messaging/azeventhubs/README.md new file mode 100644 index 000000000000..519220017ffd --- /dev/null +++ b/sdk/messaging/azeventhubs/README.md @@ -0,0 +1,133 @@ +# Azure Event Hubs Client Module for Go + +[Azure Event Hubs](https://azure.microsoft.com/services/event-hubs/) is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: [link](https://docs.microsoft.com/azure/event-hubs/event-hubs-about). + +Use the client library `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs` in your application to: + +- Send messages to an event hub. +- Consume messages from an event hub. + +**NOTE**: This library is currently a beta. There may be breaking changes until it reaches semantic version `v1.0.0`. + +Key links: +- [Source code][source] +- [API Reference Documentation][godoc] +- [Product documentation](https://azure.microsoft.com/services/event-hubs/) +- [Samples][godoc_examples] + +## Getting started + +### Install the package + +Install the Azure Event Hubs client module for Go with `go get`: + +```bash +go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs +``` + +### Prerequisites + +- Go, version 1.18 or higher +- An [Azure subscription](https://azure.microsoft.com/free/) +- An [Event Hub namespace](https://docs.microsoft.com/azure/event-hubs/). +- An Event Hub. You can create an event hub in your Event Hubs Namespace using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create), or the [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli). + +### Authenticate the client + +Event Hub clients are created using an Event Hub a credential from the [Azure Identity package][azure_identity_pkg], like [DefaultAzureCredential][default_azure_credential]. +You can also create a client using a connection string. + +#### Using a service principal + - ConsumerClient: [link](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-NewConsumerClient) + - ProducerClient: [link](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-NewProducerClient) + +#### Using a connection string + - ConsumerClient: [link](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-NewConsumerClientFromConnectionString) + - ProducerClient: [link](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-NewProducerClientFromConnectionString) + +# Key concepts + +An Event Hub [**namespace**](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#namespace) can have multiple event hubs. Each event hub, in turn, contains [**partitions**](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#partitions) which store events. + +Events are published to an event hub using an [event publisher](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-publishers). In this package, the event publisher is the [ProducerClient](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#ProducerClient) + +Events can be consumed from an event hub using an [event consumer](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-consumers). In this package, the event consumer is the [ConsumerClient](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#ConsumerClient). + +For more information about Event Hubs features and terminology can be found here: [link](https://docs.microsoft.com/azure/event-hubs/event-hubs-features) + +# Examples + +Examples for various scenarios can be found on [pkg.go.dev](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#pkg-examples) or in the example*_test.go files in our GitHub repo for [azeventhubs](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs). + +# Troubleshooting + +### Logging + +This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`. + +Use the `azcore/log` package to control log event output or to enable logs for `azservicebus` only. For example: + +```go +import ( + "fmt" + azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" +) + +// print log output to stdout +azlog.SetListener(func(event azlog.Event, s string) { + fmt.Printf("[%s] %s\n", event, s) +}) + +// pick the set of events to log +azlog.SetEvents( + azeventhubs.EventConn, + azeventhubs.EventAuth, + azeventhubs.EventProducer, + azeventhubs.EventConsumer, +) +``` + +## Contributing +For details on contributing to this repository, see the [contributing guide][azure_sdk_for_go_contributing]. + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + +### Additional Helpful Links for Contributors +Many people all over the world have helped make this project better. You'll want to check out: + +* [What are some good first issues for new contributors to the repo?](https://github.com/azure/azure-sdk-for-go/issues?q=is%3Aopen+is%3Aissue+label%3A%22up+for+grabs%22) +* [How to build and test your change][azure_sdk_for_go_contributing_developer_guide] +* [How you can make a change happen!][azure_sdk_for_go_contributing_pull_requests] +* Frequently Asked Questions (FAQ) and Conceptual Topics in the detailed [Azure SDK for Go wiki](https://github.com/azure/azure-sdk-for-go/wiki). + + +### Reporting security issues and security bugs + +Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) . You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the [Security TechCenter](https://www.microsoft.com/msrc/faqs-report-an-issue). + +### License + +Azure SDK for Go is licensed under the [MIT](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/LICENSE.txt) license. + + +[azure_sdk_for_go_contributing]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md +[azure_sdk_for_go_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md#developer-guide +[azure_sdk_for_go_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md#pull-requests + +[azure_identity_pkg]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity +[default_azure_credential]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential +[source]: https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs +[godoc]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs +[godoc_examples]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#pkg-examples + +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-go%2Fsdk%2Fmessaging%2Fazeventhubs%2FREADME.png) \ No newline at end of file diff --git a/sdk/messaging/azeventhubs/event_data_batch.go b/sdk/messaging/azeventhubs/event_data_batch.go index efc462cf2a8b..c6bfcfeeb791 100644 --- a/sdk/messaging/azeventhubs/event_data_batch.go +++ b/sdk/messaging/azeventhubs/event_data_batch.go @@ -5,14 +5,17 @@ package azeventhubs import ( "errors" + "fmt" "sync" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" ) -// ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add() -var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch") +// ErrEventDataTooLarge is returned when a message cannot fit into a batch when using EventDataBatch.AddEventData() +var ErrEventDataTooLarge = errors.New("the EventData could not be added because it is too large for the batch") type ( // EventDataBatch represents a batch of messages to send to Event Hubs in a single message @@ -126,7 +129,7 @@ func (mb *EventDataBatch) addAMQPMessage(msg *amqp.Message) error { mb.batchEnvelope = nil } - return ErrMessageTooLarge + return ErrEventDataTooLarge } mb.currentSize += actualPayloadSize @@ -166,3 +169,35 @@ func calcActualSizeForPayload(payload []byte) uint64 { return uint64(vbin32Overhead + len(payload)) } + +func newEventDataBatch(sender amqpwrap.AMQPSenderCloser, options *NewEventDataBatchOptions) (*EventDataBatch, error) { + if options == nil { + options = &NewEventDataBatchOptions{} + } + + if options.PartitionID != nil && options.PartitionKey != nil { + return nil, errors.New("either PartitionID or PartitionKey can be set, but not both") + } + + var batch EventDataBatch + + if options.PartitionID != nil { + // they want to send to a particular partition. The batch size should be the same for any + // link but we might as well use the one they're going to send to. + batch.partitionID = options.PartitionID + } else if options.PartitionKey != nil { + batch.partitionKey = options.PartitionKey + } + + if options.MaxBytes == 0 { + batch.maxBytes = sender.MaxMessageSize() + return &batch, nil + } + + if options.MaxBytes > sender.MaxMessageSize() { + return nil, internal.NewErrNonRetriable(fmt.Sprintf("maximum message size for batch was set to %d bytes, which is larger than the maximum size allowed by link (%d)", options.MaxBytes, sender.MaxMessageSize())) + } + + batch.maxBytes = options.MaxBytes + return &batch, nil +} diff --git a/sdk/messaging/azeventhubs/event_data_batch_unit_test.go b/sdk/messaging/azeventhubs/event_data_batch_unit_test.go new file mode 100644 index 000000000000..be077bd55507 --- /dev/null +++ b/sdk/messaging/azeventhubs/event_data_batch_unit_test.go @@ -0,0 +1,211 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azeventhubs + +import ( + "fmt" + "sync" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" + "github.com/stretchr/testify/require" +) + +func TestUnitEventDataBatchConstants(t *testing.T) { + smallBytes := [255]byte{0} // 'vbin8' + largeBytes := [256]byte{0} // 'vbin32' + + require.Greater(t, calcActualSizeForPayload(largeBytes[:]), calcActualSizeForPayload(smallBytes[:])) + + require.EqualValues(t, calcActualSizeForPayload(smallBytes[:]), mustEncode(t, &amqp.Message{Data: [][]byte{smallBytes[:]}})) + require.EqualValues(t, calcActualSizeForPayload(smallBytes[:])*2, mustEncode(t, &amqp.Message{Data: [][]byte{smallBytes[:], smallBytes[:]}})) + + require.EqualValues(t, calcActualSizeForPayload(largeBytes[:]), mustEncode(t, &amqp.Message{Data: [][]byte{largeBytes[:]}})) + require.EqualValues(t, calcActualSizeForPayload(largeBytes[:])*2, mustEncode(t, &amqp.Message{Data: [][]byte{largeBytes[:], largeBytes[:]}})) + + require.EqualValues(t, calcActualSizeForPayload(largeBytes[:])+calcActualSizeForPayload(smallBytes[:]), mustEncode(t, &amqp.Message{Data: [][]byte{smallBytes[:], largeBytes[:]}})) +} + +type eventBatchLinkForTest struct { + amqpwrap.AMQPSenderCloser + maxMessageSize uint64 +} + +func (l eventBatchLinkForTest) MaxMessageSize() uint64 { + return l.maxMessageSize +} + +func TestUnitEventDataBatchUnitTests(t *testing.T) { + link := eventBatchLinkForTest{maxMessageSize: 10000} + + t.Run("default: uses link size", func(t *testing.T) { + batch, err := newEventDataBatch(link, &NewEventDataBatchOptions{}) + require.NoError(t, err) + require.NotNil(t, batch) + require.Equal(t, link.MaxMessageSize(), batch.maxBytes) + require.Nil(t, batch.partitionID) + require.Nil(t, batch.partitionKey) + + batch, err = newEventDataBatch(link, nil) + require.NoError(t, err) + require.NotNil(t, batch) + require.Equal(t, link.MaxMessageSize(), batch.maxBytes) + require.Nil(t, batch.partitionID) + require.Nil(t, batch.partitionKey) + }) + + t.Run("custom size", func(t *testing.T) { + batch, err := newEventDataBatch(link, &NewEventDataBatchOptions{ + MaxBytes: 9, + }) + require.NoError(t, err) + require.NotNil(t, batch) + require.Equal(t, uint64(9), batch.maxBytes) + }) + + t.Run("requested size is bigger than allowed size", func(t *testing.T) { + batch, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: link.maxMessageSize + 1}) + require.EqualError(t, err, fmt.Sprintf("maximum message size for batch was set to %d bytes, which is larger than the maximum size allowed by link (%d)", link.maxMessageSize+1, link.MaxMessageSize())) + require.Nil(t, batch) + }) + + t.Run("partition key", func(t *testing.T) { + batch, err := newEventDataBatch(link, &NewEventDataBatchOptions{ + PartitionKey: to.Ptr("hello-partition-key"), + }) + require.NoError(t, err) + require.NotNil(t, batch) + require.Equal(t, link.MaxMessageSize(), batch.maxBytes) + require.Equal(t, "hello-partition-key", *batch.partitionKey) + require.Nil(t, batch.partitionID) + }) + + t.Run("partition ID", func(t *testing.T) { + batch, err := newEventDataBatch(link, &NewEventDataBatchOptions{ + PartitionID: to.Ptr("101"), + }) + require.NoError(t, err) + require.NotNil(t, batch) + require.Equal(t, link.MaxMessageSize(), batch.maxBytes) + require.Equal(t, "101", *batch.partitionID) + require.Nil(t, batch.partitionKey) + }) + + as2k := [2048]byte{'A'} + + t.Run("sizeCalculationsAreCorrectVBin8", func(t *testing.T) { + mb, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: 8000}) + require.NoError(t, err) + + err = mb.AddEventData(&EventData{ + Body: []byte("small body"), + ApplicationProperties: map[string]interface{}{ + "small": "value", + }, + }, nil) + + require.NoError(t, err) + require.EqualValues(t, 1, mb.NumMessages()) + require.EqualValues(t, 196, mb.NumBytes()) + + actualBytes, err := mb.toAMQPMessage().MarshalBinary() + require.NoError(t, err) + + require.Equal(t, 196, len(actualBytes)) + }) + + t.Run("sizeCalculationsAreCorrectVBin32", func(t *testing.T) { + mb, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: 8000}) + require.NoError(t, err) + + err = mb.AddEventData(&EventData{ + Body: []byte("small body"), + ApplicationProperties: map[string]interface{}{ + "hello": "world", + "anInt": 100, + "aFLoat": 100.1, + "lotsOfData": string(as2k[:]), + }, + }, nil) + + require.NoError(t, err) + require.EqualValues(t, 1, mb.NumMessages()) + require.EqualValues(t, 4381, mb.NumBytes()) + + actualBytes, err := mb.toAMQPMessage().MarshalBinary() + require.NoError(t, err) + + require.Equal(t, 4381, len(actualBytes)) + }) + + // the first message gets special treatment since it gets used as the actual + // batch message's envelope. + t.Run("firstMessageTooLarge", func(t *testing.T) { + mb, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: 1}) + require.NoError(t, err) + + err = mb.AddEventData(&EventData{ + Body: []byte("hello world"), + }, nil) + + require.EqualError(t, err, ErrEventDataTooLarge.Error()) + + require.EqualValues(t, 0, mb.NumBytes()) + require.EqualValues(t, 0, len(mb.marshaledMessages)) + }) + + t.Run("addTooManyMessages", func(t *testing.T) { + mb, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: 200}) + require.NoError(t, err) + + require.EqualValues(t, 0, mb.currentSize) + err = mb.AddEventData(&EventData{ + Body: []byte("hello world"), + }, nil) + require.NoError(t, err) + require.EqualValues(t, 145, mb.currentSize) + + sizeBefore := mb.NumBytes() + countBefore := mb.NumMessages() + + err = mb.AddEventData(&EventData{ + Body: as2k[:], + }, nil) + require.EqualError(t, err, ErrEventDataTooLarge.Error()) + + require.Equal(t, sizeBefore, mb.NumBytes(), "size is unchanged when a message fails to get added") + require.Equal(t, countBefore, mb.NumMessages(), "count is unchanged when a message fails to get added") + }) + + t.Run("addConcurrently", func(t *testing.T) { + mb, err := newEventDataBatch(link, &NewEventDataBatchOptions{MaxBytes: 10000}) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + for i := byte(0); i < 100; i++ { + wg.Add(1) + go func(i byte) { + defer wg.Done() + + err := mb.AddEventData(&EventData{ + Body: []byte{i}, + }, nil) + + require.NoError(t, err) + }(i) + } + + wg.Wait() + require.EqualValues(t, 100, mb.NumMessages()) + }) +} + +func mustEncode(t *testing.T, msg *amqp.Message) int { + bytes, err := msg.MarshalBinary() + require.NoError(t, err) + return len(bytes) +} diff --git a/sdk/messaging/azeventhubs/example_consuming_events_test.go b/sdk/messaging/azeventhubs/example_consuming_events_test.go new file mode 100644 index 000000000000..11d5883cfb85 --- /dev/null +++ b/sdk/messaging/azeventhubs/example_consuming_events_test.go @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package azeventhubs_test + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +func Example_consuming() { + eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // + eventHubName := os.Getenv("EVENTHUB_NAME") + eventHubPartitionID := os.Getenv("EVENTHUB_PARTITION") + + defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil) + + if err != nil { + panic(err) + } + + // Can also use a connection string: + // + // consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", "partition id", consumerGroup, nil) + // + consumerClient, err = azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, eventHubPartitionID, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil) + + if err != nil { + panic(err) + } + + defer consumerClient.Close(context.TODO()) + + for { + // ReceiveEvents will wait until it either receives the # of events requested (100, in this call) + // or if the context is cancelled, in which case it'll return any messages it has received. + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) + + events, err := consumerClient.ReceiveEvents(ctx, 100, nil) + cancel() + + if err != nil { + panic(err) + } + + for _, event := range events { + // process the event in some way + fmt.Printf("Event received with body %v\n", event.Body) + } + } +} diff --git a/sdk/messaging/azeventhubs/example_enabling_logging_test.go b/sdk/messaging/azeventhubs/example_enabling_logging_test.go new file mode 100644 index 000000000000..34dd17cf3368 --- /dev/null +++ b/sdk/messaging/azeventhubs/example_enabling_logging_test.go @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package azeventhubs_test + +import ( + "fmt" + + azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +func Example_enableLogging() { + // print log output to stdout + azlog.SetListener(func(event azlog.Event, s string) { + fmt.Printf("[%s] %s\n", event, s) + }) + + // pick the set of events to log + azlog.SetEvents( + azeventhubs.EventConn, + azeventhubs.EventAuth, + azeventhubs.EventProducer, + azeventhubs.EventConsumer, + ) + + fmt.Printf("Logging enabled\n") + + // Output: + // Logging enabled + // +} diff --git a/sdk/messaging/azeventhubs/example_producerclient_test.go b/sdk/messaging/azeventhubs/example_producerclient_test.go index 9e37b1827888..e7301649ddf1 100644 --- a/sdk/messaging/azeventhubs/example_producerclient_test.go +++ b/sdk/messaging/azeventhubs/example_producerclient_test.go @@ -78,7 +78,7 @@ func ExampleEventDataBatch_AddEventData() { Body: []byte("hello"), }, nil) - if errors.Is(err, azeventhubs.ErrMessageTooLarge) { + if errors.Is(err, azeventhubs.ErrEventDataTooLarge) { // Message was too large to fit into this batch. // // At this point you'd usually just send the batch (using ProducerClient.SendEventBatch), diff --git a/sdk/messaging/azeventhubs/example_producing_events_test.go b/sdk/messaging/azeventhubs/example_producing_events_test.go new file mode 100644 index 000000000000..6ca265bb9c1c --- /dev/null +++ b/sdk/messaging/azeventhubs/example_producing_events_test.go @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package azeventhubs_test + +import ( + "context" + "errors" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +func Example_producing() { + eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // + eventHubName := os.Getenv("EVENTHUB_NAME") + + defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil) + + if err != nil { + panic(err) + } + + // Can also use a connection string: + // + // producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, eventHubName, nil) + // + producerClient, err = azeventhubs.NewProducerClient(eventHubNamespace, eventHubName, defaultAzureCred, nil) + + if err != nil { + panic(err) + } + + defer producerClient.Close(context.TODO()) + + // Other examples: + // + // sending a batch to a specific partition: + // batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.NewEventDataBatchOptions{ + // PartitionID: to.Ptr("0"), + // }) + // + // targeting a batch using a partition key + // batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.NewEventDataBatchOptions{ + // PartitionKey: to.Ptr("partition key"), + // }) + batch, err := producerClient.NewEventDataBatch(context.TODO(), nil) + + if err != nil { + panic(err) + } + + eventData := &azeventhubs.EventData{ + Body: []byte("hello"), + } + + err = batch.AddEventData(eventData, nil) + + if errors.Is(err, azeventhubs.ErrEventDataTooLarge) { + // EventData is too large for this batch. + // + // If the batch is empty and this happens then the event will never be sendable at it's current + // size as it exceeds what the link allows. + // + // Otherwise, it's simplest to send this batch and create a new one, starting with this event. + panic(err) + } else if err != nil { + panic(err) + } + + err = producerClient.SendEventBatch(context.TODO(), batch, nil) + + if err != nil { + panic(err) + } +} diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go index c2bb98bea5fa..7674a88ac99b 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go @@ -124,7 +124,7 @@ func sendEventsToPartition(cs string, hubName string, partitionID string, messag err := batch.AddEventData(ed, nil) - if errors.Is(err, azeventhubs.ErrMessageTooLarge) { + if errors.Is(err, azeventhubs.ErrEventDataTooLarge) { if batch.NumMessages() == 0 { return errors.New("single event was too large to fit into batch") } diff --git a/sdk/messaging/azeventhubs/migrationguide.md b/sdk/messaging/azeventhubs/migrationguide.md new file mode 100644 index 000000000000..d84737bd04d0 --- /dev/null +++ b/sdk/messaging/azeventhubs/migrationguide.md @@ -0,0 +1,4 @@ +# Guide to migrate from `azure-event-hubs-go` to `azeventhubs` + +This guide is intended to assist in the migration from the `azure-event-hubs-go` package to the latest beta releases (and eventual GA) of the `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs`. + diff --git a/sdk/messaging/azeventhubs/producer_client.go b/sdk/messaging/azeventhubs/producer_client.go index d62c674f1f92..e29e367ffae6 100644 --- a/sdk/messaging/azeventhubs/producer_client.go +++ b/sdk/messaging/azeventhubs/producer_client.go @@ -109,35 +109,22 @@ type NewEventDataBatchOptions struct { // NewEventDataBatch can be used to create a batch that contain multiple events. // If the operation fails it can return an *azeventhubs.Error type if the failure is actionable. func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *NewEventDataBatchOptions) (*EventDataBatch, error) { - if options == nil { - options = &NewEventDataBatchOptions{} - } - - if options.PartitionID != nil && options.PartitionKey != nil { - return nil, errors.New("either PartitionID or PartitionKey can be set, but not both") - } + var batch *EventDataBatch - var batch EventDataBatch + partitionID := anyPartitionID - if options.PartitionID != nil { - // they want to send to a particular partition. The batch size should be the same for any - // link but we might as well use the one they're going to send to. - batch.partitionID = options.PartitionID - } else if options.PartitionKey != nil { - batch.partitionKey = options.PartitionKey + if options != nil && options.PartitionID != nil { + partitionID = *options.PartitionID } - err := pc.links.Retry(ctx, exported.EventProducer, "NewEventDataBatch", getPartitionID(batch.partitionID), pc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPSenderCloser]) error { - if options.MaxBytes == 0 { - batch.maxBytes = lwid.Link.MaxMessageSize() - return nil - } + err := pc.links.Retry(ctx, exported.EventProducer, "NewEventDataBatch", partitionID, pc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPSenderCloser]) error { + tmpBatch, err := newEventDataBatch(lwid.Link, options) - if options.MaxBytes > lwid.Link.MaxMessageSize() { - return internal.NewErrNonRetriable(fmt.Sprintf("maximum message size for batch was set to %d bytes, which is larger than the maximum size allowed by link (%d)", options.MaxBytes, lwid.Link.MaxMessageSize())) + if err != nil { + return err } - batch.maxBytes = options.MaxBytes + batch = tmpBatch return nil }) @@ -145,7 +132,7 @@ func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *NewEve return nil, internal.TransformError(err) } - return &batch, nil + return batch, nil } // SendEventBatchOptions contains optional parameters for the SendEventBatch function