Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ export interface Checkpoint {
consumerGroupName: string;
eTag: string;
eventHubName: string;
instanceId: string;
offset: number;
ownerId: string;
partitionId: string;
sequenceNumber: number;
}

// @public
export class CheckpointManager {
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string);
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, eventProcessorId: string);
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}
Expand Down Expand Up @@ -175,6 +175,7 @@ export class EventPosition {
// @public
export class EventProcessor {
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
readonly id: string;
start(): void;
stop(): Promise<void>;
}
Expand Down Expand Up @@ -223,9 +224,9 @@ export interface PartitionOwnership {
consumerGroupName: string;
eTag?: string;
eventHubName: string;
instanceId: string;
lastModifiedTimeInMS?: number;
offset?: number;
ownerId: string;
ownerLevel: number;
partitionId: string;
sequenceNumber?: number;
Expand Down
22 changes: 15 additions & 7 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ export interface Checkpoint {
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
* @property The unique identifier of the event processor.
*/
instanceId: string;
ownerId: string;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update event-hubs.api.md file with this change

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

/**
* @property The identifier of the Event Hub partition
*/
Expand All @@ -43,15 +43,23 @@ export interface Checkpoint {
* CheckPointManager is created by the library & passed to user's code to let them create a checkpoint
*/
export class CheckpointManager {
private _partitionContext: PartitionContext;
private _partitionContext: PartitionContext;
private _partitionManager: PartitionManager;
private _instanceId: string;
private _eventProcessorId: string;
private _eTag: string;

constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string) {
/**
* @ignore
* @interal
*/
constructor(
partitionContext: PartitionContext,
partitionManager: PartitionManager,
eventProcessorId: string
) {
this._partitionContext = partitionContext;
this._partitionManager = partitionManager;
this._instanceId = instanceId;
this._eventProcessorId = eventProcessorId;
this._eTag = "";
}
/**
Expand All @@ -77,7 +85,7 @@ export class CheckpointManager {
const checkpoint: Checkpoint = {
eventHubName: this._partitionContext.eventHubName,
consumerGroupName: this._partitionContext.consumerGroupName,
instanceId: this._instanceId,
ownerId: this._eventProcessorId,
partitionId: this._partitionContext.partitionId,
sequenceNumber:
typeof eventDataOrSequenceNumber === "number"
Expand Down
19 changes: 14 additions & 5 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ReceivedEventData } from "./eventData";
import { PumpManager } from "./pumpManager";
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
import * as log from "./log";
import { cancellableDelay } from "./util/cancellableDelay";
import { delay } from "@azure/core-amqp";

/**
* Reason for closing a PartitionProcessor.
Expand Down Expand Up @@ -65,9 +65,9 @@ export interface PartitionOwnership {
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
* @property The unique identifier of the event processor.
*/
instanceId: string;
ownerId: string;
/**
* @property The identifier of the Event Hub partition
*/
Expand Down Expand Up @@ -221,7 +221,7 @@ export class EventProcessor {
const partitionOwnership: PartitionOwnership = {
eventHubName: this._eventHubClient.eventHubName,
consumerGroupName: this._consumerGroupName,
instanceId: this._id,
ownerId: this._id,
partitionId: partitionId,
ownerLevel: 0
};
Expand Down Expand Up @@ -274,7 +274,7 @@ export class EventProcessor {
log.eventProcessor(
`[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.`
);
await cancellableDelay(waitIntervalInMs, abortSignal);
await delay(waitIntervalInMs, abortSignal);
} catch (err) {
log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
}
Expand All @@ -284,6 +284,15 @@ export class EventProcessor {
return this._pumpManager.removeAllPumps(CloseReason.Shutdown);
}

/**
* The unique identifier for the EventProcessor.
*
* @return {string}
*/
get id(): string {
return this._id;
}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
* For each successful lease, it will get the details from the blob and start a receiver at the
Expand Down
17 changes: 0 additions & 17 deletions sdk/eventhub/event-hubs/src/util/cancellableDelay.ts

This file was deleted.

30 changes: 26 additions & 4 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ describe("Event Processor", function(): void {
await client.close();
});

it("should expose an id", async function(): Promise<void> {
const factory: PartitionProcessorFactory = (context) => {
return {
async processEvents() {},
async processError() {}
};
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
factory,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
);

const id = processor.id;
id.length.should.be.gt(1);
});

it("should treat consecutive start invocations as idempotent", async function(): Promise<void> {
const partitionIds = await client.getPartitionIds();

Expand Down Expand Up @@ -413,7 +435,7 @@ describe("Event Processor", function(): void {
eventProcessorFactory,
new InMemoryPartitionManager(),
{
initialEventPosition:EventPosition.fromEnqueuedTime(new Date()),
initialEventPosition: EventPosition.fromEnqueuedTime(new Date()),
maxBatchSize: 1,
maxWaitTimeInSeconds: 5
}
Expand Down Expand Up @@ -441,14 +463,14 @@ describe("Event Processor", function(): void {
const partitionOwnership1: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
ownerId: generate_uuid(),
partitionId: "0",
ownerLevel: 10
};
const partitionOwnership2: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
ownerId: generate_uuid(),
partitionId: "1",
ownerLevel: 10
};
Expand All @@ -466,7 +488,7 @@ describe("Event Processor", function(): void {
const checkpoint: Checkpoint = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
ownerId: generate_uuid(),
partitionId: "0",
sequenceNumber: 10,
offset: 50,
Expand Down