diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 6f38394799bc..1c0de9d988e8 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -23,7 +23,7 @@ import { WebSocketImpl } from 'rhea-promise'; // @public export interface BatchOptions { - maxMessageSizeInBytes?: number; + maxSizeInBytes?: number; partitionKey?: string; } @@ -48,8 +48,9 @@ export class EventDataBatch { // @internal constructor(context: ConnectionContext, maxSizeInBytes: number, partitionKey?: string); readonly batchMessage: Buffer | undefined; + readonly count: number; readonly partitionKey: string | undefined; - readonly size: number; + readonly sizeInBytes: number; tryAdd(eventData: EventData): boolean; } diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index ff45014b4cb9..adb73780ffdc 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -5,6 +5,7 @@ import { EventData, toAmqpMessage } from "./eventData"; import { ConnectionContext } from "./connectionContext"; import { AmqpMessage } from "@azure/core-amqp"; import { message } from "rhea-promise"; +import { throwTypeErrorIfParameterMissing } from "./util/error"; /** * A class representing a batch of events which can be passed to the `send` method of a `EventConsumer` instance. @@ -31,11 +32,15 @@ export class EventDataBatch { /** * @property Current size of the batch in bytes. */ - private _size: number; + private _sizeInBytes: number; /** * @property Encoded amqp messages. */ private _encodedMessages: Buffer[] = []; + /** + * @property Number of events in the batch. + */ + private _count: number; /** * @property Encoded batch message. */ @@ -50,7 +55,8 @@ export class EventDataBatch { this._context = context; this._maxSizeInBytes = maxSizeInBytes; this._partitionKey = partitionKey; - this._size = 0; + this._sizeInBytes = 0; + this._count = 0; } /** @@ -65,8 +71,16 @@ export class EventDataBatch { * @property Size of a batch of events. * @readonly */ - get size(): number { - return this._size; + get sizeInBytes(): number { + return this._sizeInBytes; + } + + /** + * @property Number of events in the batch. + * @readonly + */ + get count(): number { + return this._count; } /** @@ -83,6 +97,7 @@ export class EventDataBatch { * @returns A boolean value indicating if the event data has been added to the batch or not. */ public tryAdd(eventData: EventData): boolean { + throwTypeErrorIfParameterMissing(this._context.connectionId, "eventData", eventData); // Convert EventData to AmqpMessage. const amqpMessage = toAmqpMessage(eventData, this._partitionKey); amqpMessage.body = this._context.dataTransformer.encode(eventData.body); @@ -107,7 +122,8 @@ export class EventDataBatch { return false; } this._batchMessage = encodedBatchMessage; - this._size = currentSize; + this._sizeInBytes = currentSize; + this._count++; return true; } } diff --git a/sdk/eventhub/event-hubs/src/eventHubClient.ts b/sdk/eventhub/event-hubs/src/eventHubClient.ts index 8b8e4188523f..cbb281cbe86f 100644 --- a/sdk/eventhub/event-hubs/src/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubClient.ts @@ -117,7 +117,7 @@ export interface BatchOptions { * @property * The maximum size allowed for the batch. */ - maxMessageSizeInBytes?: number; + maxSizeInBytes?: number; } /** diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 49306a29e0ef..6523e7533539 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -401,13 +401,13 @@ export class EventHubSender extends LinkEntity { throw error; } + // throw an error if partition key is different than the one provided in the options. if (events instanceof EventDataBatch && options && options.partitionKey) { - // throw an error if partition key is different than the one provided in the options. const error = new Error( "Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead." ); log.error( - "[%s] Partition key is not supported when using createBatch(). %O", + "[%s] Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead. %O", this._context.connectionId, error ); diff --git a/sdk/eventhub/event-hubs/src/sender.ts b/sdk/eventhub/event-hubs/src/sender.ts index e92baf3e8710..8ba730ed5cbb 100644 --- a/sdk/eventhub/event-hubs/src/sender.ts +++ b/sdk/eventhub/event-hubs/src/sender.ts @@ -70,18 +70,34 @@ export class EventHubProducer { if (!options) { options = {}; } + // throw an error if partition key and partition id are both defined + if ( + typeof options.partitionKey === "string" && + typeof this._senderOptions.partitionId === "string" + ) { + const error = new Error( + "Creating a batch with partition key is not supported when using producers that were created using a partition id." + ); + log.error( + "[%s] Creating a batch with partition key is not supported when using producers that were created using a partition id. %O", + this._context.connectionId, + error + ); + throw error; + } + let maxMessageSize = await this._eventHubSender!.getMaxMessageSize(); - if (options.maxMessageSizeInBytes) { - if (options.maxMessageSizeInBytes > maxMessageSize) { + if (options.maxSizeInBytes) { + if (options.maxSizeInBytes > maxMessageSize) { const error = new Error( - `Max message size (${options.maxMessageSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.` + `Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.` ); log.error( - `[${this._context.connectionId}] Max message size (${options.maxMessageSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}` + `[${this._context.connectionId}] Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}` ); throw error; } - maxMessageSize = options.maxMessageSizeInBytes; + maxMessageSize = options.maxSizeInBytes; } return new EventDataBatch(this._context, maxMessageSize, options.partitionKey); } @@ -108,9 +124,13 @@ export class EventHubProducer { ): Promise { this._throwIfSenderOrConnectionClosed(); throwTypeErrorIfParameterMissing(this._context.connectionId, "eventData", eventData); - if (eventData instanceof EventDataBatch && !eventData.batchMessage) { + if (Array.isArray(eventData) && eventData.length === 0) { + log.error(`[${this._context.connectionId}] Empty array was passed. No events to send.`); + return; + } + if (eventData instanceof EventDataBatch && eventData.count === 0) { log.error( - `[${this._context.connectionId}] No events to send, use tryAdd() function on the EventDataBatch to add events in a batch.` + `[${this._context.connectionId}] Empty batch was passsed. No events to send.` ); return; } diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 2630d0d55f4c..6dd9a0dfdd7c 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -243,7 +243,7 @@ describe("EventHub Sender #RunnableInBrowser", function(): void { "0", EventPosition.fromSequenceNumber(partitionInfo.lastEnqueuedSequenceNumber) ); - const eventDataBatch = await producer.createBatch({ maxMessageSizeInBytes: 5000 }); + const eventDataBatch = await producer.createBatch({ maxSizeInBytes: 5000 }); const message = { body: `${Buffer.from("Z".repeat(4096))}` }; for (let i = 1; i <= 3; i++) { const isAdded = eventDataBatch.tryAdd(message); @@ -265,7 +265,7 @@ describe("EventHub Sender #RunnableInBrowser", function(): void { > { try { const producer = client.createProducer({ partitionId: "0" }); - await producer.createBatch({ maxMessageSizeInBytes: 2046528 }); + await producer.createBatch({ maxSizeInBytes: 2046528 }); throw new Error("Test Failure"); } catch (err) { // \(delivery-id:(\d+), size:(\d+) bytes\) exceeds the limit \((\d+) bytes\)