diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index fa6181d6ace0..0e5256134867 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -34,15 +34,15 @@ export interface Checkpoint { consumerGroupName: string; eTag: string; eventHubName: string; - instanceId: string; offset: number; + ownerId: string; partitionId: string; sequenceNumber: number; } // @public export class CheckpointManager { - constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string); + constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, eventProcessorId: string); updateCheckpoint(eventData: ReceivedEventData): Promise; updateCheckpoint(sequenceNumber: number, offset: number): Promise; } @@ -175,6 +175,7 @@ export class EventPosition { // @public export class EventProcessor { constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions); + readonly id: string; start(): void; stop(): Promise; } @@ -223,9 +224,9 @@ export interface PartitionOwnership { consumerGroupName: string; eTag?: string; eventHubName: string; - instanceId: string; lastModifiedTimeInMS?: number; offset?: number; + ownerId: string; ownerLevel: number; partitionId: string; sequenceNumber?: number; diff --git a/sdk/eventhub/event-hubs/src/checkpointManager.ts b/sdk/eventhub/event-hubs/src/checkpointManager.ts index 24c0e1b821fa..4e04c6d1833f 100644 --- a/sdk/eventhub/event-hubs/src/checkpointManager.ts +++ b/sdk/eventhub/event-hubs/src/checkpointManager.ts @@ -18,9 +18,9 @@ export interface Checkpoint { */ consumerGroupName: string; /** - * @property The unique instance identifier + * @property The unique identifier of the event processor. */ - instanceId: string; + ownerId: string; /** * @property The identifier of the Event Hub partition */ @@ -43,15 +43,23 @@ export interface Checkpoint { * CheckPointManager is created by the library & passed to user's code to let them create a checkpoint */ export class CheckpointManager { - private _partitionContext: PartitionContext; + private _partitionContext: PartitionContext; private _partitionManager: PartitionManager; - private _instanceId: string; + private _eventProcessorId: string; private _eTag: string; - constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string) { + /** + * @ignore + * @interal + */ + constructor( + partitionContext: PartitionContext, + partitionManager: PartitionManager, + eventProcessorId: string + ) { this._partitionContext = partitionContext; this._partitionManager = partitionManager; - this._instanceId = instanceId; + this._eventProcessorId = eventProcessorId; this._eTag = ""; } /** @@ -77,7 +85,7 @@ export class CheckpointManager { const checkpoint: Checkpoint = { eventHubName: this._partitionContext.eventHubName, consumerGroupName: this._partitionContext.consumerGroupName, - instanceId: this._instanceId, + ownerId: this._eventProcessorId, partitionId: this._partitionContext.partitionId, sequenceNumber: typeof eventDataOrSequenceNumber === "number" diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 92973113749f..b04cdc402630 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -10,7 +10,7 @@ import { ReceivedEventData } from "./eventData"; import { PumpManager } from "./pumpManager"; import { AbortSignalLike, AbortController } from "@azure/abort-controller"; import * as log from "./log"; -import { cancellableDelay } from "./util/cancellableDelay"; +import { delay } from "@azure/core-amqp"; /** * Reason for closing a PartitionProcessor. @@ -65,9 +65,9 @@ export interface PartitionOwnership { */ consumerGroupName: string; /** - * @property The unique instance identifier + * @property The unique identifier of the event processor. */ - instanceId: string; + ownerId: string; /** * @property The identifier of the Event Hub partition */ @@ -221,7 +221,7 @@ export class EventProcessor { const partitionOwnership: PartitionOwnership = { eventHubName: this._eventHubClient.eventHubName, consumerGroupName: this._consumerGroupName, - instanceId: this._id, + ownerId: this._id, partitionId: partitionId, ownerLevel: 0 }; @@ -274,7 +274,7 @@ export class EventProcessor { log.eventProcessor( `[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.` ); - await cancellableDelay(waitIntervalInMs, abortSignal); + await delay(waitIntervalInMs, abortSignal); } catch (err) { log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`); } @@ -284,6 +284,15 @@ export class EventProcessor { return this._pumpManager.removeAllPumps(CloseReason.Shutdown); } + /** + * The unique identifier for the EventProcessor. + * + * @return {string} + */ + get id(): string { + return this._id; + } + /** * Starts the event processor, fetching the list of partitions, and attempting to grab leases * For each successful lease, it will get the details from the blob and start a receiver at the diff --git a/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts b/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts deleted file mode 100644 index 45a037e9f36a..000000000000 --- a/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { AbortSignalLike, AbortError } from "@azure/abort-controller"; - -export function cancellableDelay(delayInMs: number, abortSignal?: AbortSignalLike): Promise { - return new Promise((resolve, reject) => { - if (abortSignal && abortSignal.aborted) { - return reject(new AbortError(`The delay was cancelled by the user.`)); - } - - const timer = setTimeout(resolve, delayInMs); - if (abortSignal) { - abortSignal.addEventListener("abort", () => { - clearTimeout(timer); - reject(new AbortError(`The delay was cancelled by the user.`)); - }); - } - }); -} diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index c3f703bbb511..d53b52e9fb44 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -46,6 +46,28 @@ describe("Event Processor", function(): void { await client.close(); }); + it("should expose an id", async function(): Promise { + const factory: PartitionProcessorFactory = (context) => { + return { + async processEvents() {}, + async processError() {} + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + new InMemoryPartitionManager(), + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + const id = processor.id; + id.length.should.be.gt(1); + }); + it("should treat consecutive start invocations as idempotent", async function(): Promise { const partitionIds = await client.getPartitionIds(); @@ -413,7 +435,7 @@ describe("Event Processor", function(): void { eventProcessorFactory, new InMemoryPartitionManager(), { - initialEventPosition:EventPosition.fromEnqueuedTime(new Date()), + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()), maxBatchSize: 1, maxWaitTimeInSeconds: 5 } @@ -441,14 +463,14 @@ describe("Event Processor", function(): void { const partitionOwnership1: PartitionOwnership = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, - instanceId: generate_uuid(), + ownerId: generate_uuid(), partitionId: "0", ownerLevel: 10 }; const partitionOwnership2: PartitionOwnership = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, - instanceId: generate_uuid(), + ownerId: generate_uuid(), partitionId: "1", ownerLevel: 10 }; @@ -466,7 +488,7 @@ describe("Event Processor", function(): void { const checkpoint: Checkpoint = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, - instanceId: generate_uuid(), + ownerId: generate_uuid(), partitionId: "0", sequenceNumber: 10, offset: 50,