Skip to content
Merged
5 changes: 3 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { WebSocketImpl } from 'rhea-promise';

// @public
export interface BatchOptions {
maxMessageSizeInBytes?: number;
maxSizeInBytes?: number;
partitionKey?: string;
}

Expand All @@ -48,8 +48,9 @@ export class EventDataBatch {
// @internal
constructor(context: ConnectionContext, maxSizeInBytes: number, partitionKey?: string);
readonly batchMessage: Buffer | undefined;
readonly count: number;
readonly partitionKey: string | undefined;
readonly size: number;
readonly sizeInBytes: number;
tryAdd(eventData: EventData): boolean;
}

Expand Down
26 changes: 21 additions & 5 deletions sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { EventData, toAmqpMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { AmqpMessage } from "@azure/core-amqp";
import { message } from "rhea-promise";
import { throwTypeErrorIfParameterMissing } from "./util/error";

/**
* A class representing a batch of events which can be passed to the `send` method of a `EventConsumer` instance.
Expand All @@ -31,11 +32,15 @@ export class EventDataBatch {
/**
* @property Current size of the batch in bytes.
*/
private _size: number;
private _sizeInBytes: number;
/**
* @property Encoded amqp messages.
*/
private _encodedMessages: Buffer[] = [];
/**
* @property Number of events in the batch.
*/
private _count: number;
/**
* @property Encoded batch message.
*/
Expand All @@ -50,7 +55,8 @@ export class EventDataBatch {
this._context = context;
this._maxSizeInBytes = maxSizeInBytes;
this._partitionKey = partitionKey;
this._size = 0;
this._sizeInBytes = 0;
this._count = 0;
}

/**
Expand All @@ -65,8 +71,16 @@ export class EventDataBatch {
* @property Size of a batch of events.
* @readonly
*/
get size(): number {
return this._size;
get sizeInBytes(): number {
return this._sizeInBytes;
}

/**
* @property Number of events in the batch.
* @readonly
*/
get count(): number {
return this._count;
Comment thread
ramya-rao-a marked this conversation as resolved.
}

/**
Expand All @@ -83,6 +97,7 @@ export class EventDataBatch {
* @returns A boolean value indicating if the event data has been added to the batch or not.
*/
public tryAdd(eventData: EventData): boolean {
throwTypeErrorIfParameterMissing(this._context.connectionId, "eventData", eventData);
Comment thread
ramya-rao-a marked this conversation as resolved.
// Convert EventData to AmqpMessage.
const amqpMessage = toAmqpMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
Expand All @@ -107,7 +122,8 @@ export class EventDataBatch {
return false;
}
this._batchMessage = encodedBatchMessage;
this._size = currentSize;
this._sizeInBytes = currentSize;
this._count++;
return true;
}
}
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export interface BatchOptions {
* @property
* The maximum size allowed for the batch.
*/
maxMessageSizeInBytes?: number;
maxSizeInBytes?: number;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,13 @@ export class EventHubSender extends LinkEntity {
throw error;
}

// throw an error if partition key is different than the one provided in the options.
if (events instanceof EventDataBatch && options && options.partitionKey) {
// throw an error if partition key is different than the one provided in the options.
const error = new Error(
"Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."
);
log.error(
"[%s] Partition key is not supported when using createBatch(). %O",
"[%s] Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead. %O",
this._context.connectionId,
error
);
Expand Down
34 changes: 27 additions & 7 deletions sdk/eventhub/event-hubs/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,34 @@ export class EventHubProducer {
if (!options) {
options = {};
}
// throw an error if partition key and partition id are both defined
if (
typeof options.partitionKey === "string" &&
typeof this._senderOptions.partitionId === "string"
) {
const error = new Error(
"Creating a batch with partition key is not supported when using producers that were created using a partition id."
);
log.error(
"[%s] Creating a batch with partition key is not supported when using producers that were created using a partition id. %O",
this._context.connectionId,
error
);
throw error;
}

let maxMessageSize = await this._eventHubSender!.getMaxMessageSize();
if (options.maxMessageSizeInBytes) {
if (options.maxMessageSizeInBytes > maxMessageSize) {
if (options.maxSizeInBytes) {
if (options.maxSizeInBytes > maxMessageSize) {
const error = new Error(
`Max message size (${options.maxMessageSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.`
`Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link.`
);
log.error(
`[${this._context.connectionId}] Max message size (${options.maxMessageSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}`
`[${this._context.connectionId}] Max message size (${options.maxSizeInBytes} bytes) is greater than maximum message size (${maxMessageSize} bytes) on the AMQP sender link. ${error}`
);
throw error;
}
maxMessageSize = options.maxMessageSizeInBytes;
maxMessageSize = options.maxSizeInBytes;
}
return new EventDataBatch(this._context, maxMessageSize, options.partitionKey);
}
Expand All @@ -108,9 +124,13 @@ export class EventHubProducer {
): Promise<void> {
this._throwIfSenderOrConnectionClosed();
throwTypeErrorIfParameterMissing(this._context.connectionId, "eventData", eventData);
if (eventData instanceof EventDataBatch && !eventData.batchMessage) {
if (Array.isArray(eventData) && eventData.length === 0) {
log.error(`[${this._context.connectionId}] Empty array was passed. No events to send.`);
return;
}
if (eventData instanceof EventDataBatch && eventData.count === 0) {
log.error(
`[${this._context.connectionId}] No events to send, use tryAdd() function on the EventDataBatch to add events in a batch.`
`[${this._context.connectionId}] Empty batch was passsed. No events to send.`
);
return;
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/test/sender.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ describe("EventHub Sender #RunnableInBrowser", function(): void {
"0",
EventPosition.fromSequenceNumber(partitionInfo.lastEnqueuedSequenceNumber)
);
const eventDataBatch = await producer.createBatch({ maxMessageSizeInBytes: 5000 });
const eventDataBatch = await producer.createBatch({ maxSizeInBytes: 5000 });
const message = { body: `${Buffer.from("Z".repeat(4096))}` };
for (let i = 1; i <= 3; i++) {
const isAdded = eventDataBatch.tryAdd(message);
Expand All @@ -265,7 +265,7 @@ describe("EventHub Sender #RunnableInBrowser", function(): void {
> {
try {
const producer = client.createProducer({ partitionId: "0" });
await producer.createBatch({ maxMessageSizeInBytes: 2046528 });
await producer.createBatch({ maxSizeInBytes: 2046528 });
throw new Error("Test Failure");
} catch (err) {
// \(delivery-id:(\d+), size:(\d+) bytes\) exceeds the limit \((\d+) bytes\)
Expand Down