From 90837a081754f72089fb1a6410c05ba98c67a6f7 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Thu, 1 Aug 2019 16:21:36 -0700 Subject: [PATCH 1/4] [Event Hubs] Add test to receive events from the checkpoint and update EPH sample --- .../event-hubs/samples/eventProcessor.ts | 35 +++-- sdk/eventhub/event-hubs/src/eventProcessor.ts | 9 +- .../event-hubs/test/eventProcessor.spec.ts | 126 ++++++++++++++++-- 3 files changed, 152 insertions(+), 18 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 9015fe7cf592..07792084d2e1 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -1,18 +1,22 @@ import { EventHubClient, - EventData, + ReceivedEventData, EventPosition, delay, EventProcessor, - PartitionContext + PartitionContext, + InMemoryPartitionManager, + CheckpointManager } from "@azure/event-hubs"; class SimplePartitionProcessor { private _context: PartitionContext; - constructor(context: PartitionContext) { + private _checkpointManager: CheckpointManager; + constructor(context: PartitionContext, checkpointManager: CheckpointManager) { this._context = context; + this._checkpointManager = checkpointManager; } - async processEvents(events: EventData[]) { + async processEvents(events: ReceivedEventData[]) { for (const event of events) { console.log( "Received event: '%s' from partition: '%s' and consumer group: '%s'", @@ -20,6 +24,19 @@ class SimplePartitionProcessor { this._context.partitionId, this._context.consumerGroupName ); + try { + // checkpoint using the last event in the batch + await this._checkpointManager.updateCheckpoint(events[events.length - 1]); + console.log( + "Successfully checkpointed event: '%s' from partition: '%s'", + events[events.length - 1].body, + this._context.partitionId + ); + } catch (err) { + console.log( + `Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}` + ); + } } } @@ -43,15 +60,15 @@ const eventHubName = ""; async function main() { const client = new EventHubClient(connectionString, eventHubName); - const eventProcessorFactory = (context: PartitionContext) => { - return new SimplePartitionProcessor(context); + const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => { + return new SimplePartitionProcessor(context, checkpoint); }; const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, - "partitionManager" as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.earliest(), maxBatchSize: 10, @@ -59,8 +76,8 @@ async function main() { } ); await processor.start(); - // after 2 seconds, stop processing - await delay(2000); + // after 5 seconds, stop processing + await delay(5000); await processor.stop(); await client.close(); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 558f78bbfe0a..4cb43a6c35cc 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -242,9 +242,16 @@ export class EventProcessor { ); // eventually this will 1st check if the existing PartitionOwnership has a position - const eventPosition = + let eventPosition = this._processorOptions.initialEventPosition || EventPosition.earliest(); + const partitionOwnerships = await this._partitionManager.listOwnership(this._eventHubClient.eventHubName, this._consumerGroupName); + for (const ownership of partitionOwnerships) { + if(ownership.partitionId === partitionId && ownership.sequenceNumber){ + eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); + } + } + tasks.push( this._pumpManager.createPump( this._eventHubClient, diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 390b5a698e9d..238f03446a7a 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -18,7 +18,8 @@ import { PartitionOwnership, Checkpoint, PartitionProcessorFactory, - CloseReason + CloseReason, + ReceivedEventData } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { generate_uuid } from "rhea-promise"; @@ -81,7 +82,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -139,7 +140,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -187,7 +188,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -283,7 +284,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -346,7 +347,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -410,7 +411,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, - "partitionManager" as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromSequenceNumber( partitionInfo.lastEnqueuedSequenceNumber @@ -458,7 +459,7 @@ describe("Event Processor", function(): void { partitionOwnership2 ]); partitionOwnership.length.should.equals(2); - + const ownershiplist = await inMemoryPartitionManager.listOwnership( "myEventHub", EventHubClient.defaultConsumerGroupName @@ -484,5 +485,114 @@ describe("Event Processor", function(): void { partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber); partitionOwnershipList[0].offset!.should.equals(checkpoint.offset); }); + + it("should receive events from the checkpoint", async function(): Promise { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + let partitionResultsMap = new Map(); + partitionIds.forEach((id) => partitionResultsMap.set(id, [])); + let didError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context, checkpointManager) => { + return { + async processEvents(events: ReceivedEventData[]) { + const existingEvents = partitionResultsMap.get(context.partitionId)!; + events.forEach((event: ReceivedEventData) => { + existingEvents.push(event); + debug( + "Received event: '%s' from partition: '%s' and consumer group: '%s'", + event.body, + context.partitionId + ); + checkpointManager.updateCheckpoint(event); + }); + }, + async processError() { + didError = true; + } + }; + }; + + const inMemoryPartitionManager = new InMemoryPartitionManager(); + const processor1 = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + inMemoryPartitionManager, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + // start first processor + processor1.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - checkpoint - "; + const events: EventData[] = []; + + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + for (let index = 1; index <= 100; index++) { + events.push({ body: `${expectedMessagePrefix} ${index} ${partitionId}` }); + } + await producer.send(events); + await producer.close(); + } + + // set a delay to give a consumers a chance to receive a message + await delay(500); + + // shutdown the first processor + await processor1.stop(); + + let lastEventsReceivedFromProcessor1: ReceivedEventData[] = []; + let index = 0; + + for (const partitionId of partitionIds) { + const receivedEvents = partitionResultsMap.get(partitionId)!; + lastEventsReceivedFromProcessor1[index++] = receivedEvents[receivedEvents.length - 1]; + } + + partitionResultsMap = new Map(); + partitionIds.forEach((id) => partitionResultsMap.set(id, [])); + + const processor2 = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + inMemoryPartitionManager + ); + // start second processor + processor2.start(); + + // set a delay to give a consumers a chance to receive a message + await delay(2000); + + // shutdown the second processor + await processor2.stop(); + + index = 0; + let firstEventsReceivedFromProcessor2: ReceivedEventData[] = []; + for (const partitionId of partitionIds) { + const receivedEvents = partitionResultsMap.get(partitionId)!; + firstEventsReceivedFromProcessor2[index++] = receivedEvents[0]; + } + + didError.should.be.false; + index = 0; + // validate correct events captured for each partition using checkpoint + for (const partitionId of partitionIds) { + debug(`working on partition ${partitionId}`); + lastEventsReceivedFromProcessor1[index].sequenceNumber.should.equal( + firstEventsReceivedFromProcessor2[index].sequenceNumber - 1 + ); + index++; + } + }); }); }).timeout(90000); From 2562f72af77a316e9f75b91866adf5a65c3069fb Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Thu, 1 Aug 2019 16:33:12 -0700 Subject: [PATCH 2/4] Format files --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 13 ++++++++----- sdk/eventhub/event-hubs/test/eventProcessor.spec.ts | 3 +-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 4cb43a6c35cc..824d76ffd0f3 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -245,12 +245,15 @@ export class EventProcessor { let eventPosition = this._processorOptions.initialEventPosition || EventPosition.earliest(); - const partitionOwnerships = await this._partitionManager.listOwnership(this._eventHubClient.eventHubName, this._consumerGroupName); - for (const ownership of partitionOwnerships) { - if(ownership.partitionId === partitionId && ownership.sequenceNumber){ - eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); - } + const partitionOwnerships = await this._partitionManager.listOwnership( + this._eventHubClient.eventHubName, + this._consumerGroupName + ); + for (const ownership of partitionOwnerships) { + if (ownership.partitionId === partitionId && ownership.sequenceNumber) { + eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); } + } tasks.push( this._pumpManager.createPump( diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 238f03446a7a..ea0284af4caf 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -459,7 +459,6 @@ describe("Event Processor", function(): void { partitionOwnership2 ]); partitionOwnership.length.should.equals(2); - const ownershiplist = await inMemoryPartitionManager.listOwnership( "myEventHub", EventHubClient.defaultConsumerGroupName @@ -587,7 +586,7 @@ describe("Event Processor", function(): void { index = 0; // validate correct events captured for each partition using checkpoint for (const partitionId of partitionIds) { - debug(`working on partition ${partitionId}`); + debug(`Validate events for partition: ${partitionId}`); lastEventsReceivedFromProcessor1[index].sequenceNumber.should.equal( firstEventsReceivedFromProcessor2[index].sequenceNumber - 1 ); From 352a72bebf1c1ff1a230b0d9a1d05b80c008de33 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Fri, 2 Aug 2019 11:23:40 -0700 Subject: [PATCH 3/4] Address comments --- .../event-hubs/samples/eventProcessor.ts | 3 ++ sdk/eventhub/event-hubs/src/eventProcessor.ts | 1 + .../event-hubs/test/eventProcessor.spec.ts | 44 ++++++++++--------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 07792084d2e1..5b1ae4c4c328 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -17,6 +17,9 @@ class SimplePartitionProcessor { this._checkpointManager = checkpointManager; } async processEvents(events: ReceivedEventData[]) { + if(events.length === 0){ + return; + } for (const event of events) { console.log( "Received event: '%s' from partition: '%s' and consumer group: '%s'", diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 824d76ffd0f3..92973113749f 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -252,6 +252,7 @@ export class EventProcessor { for (const ownership of partitionOwnerships) { if (ownership.partitionId === partitionId && ownership.sequenceNumber) { eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); + break; } } diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index ea0284af4caf..cb94dfab0f49 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -491,24 +491,25 @@ describe("Event Processor", function(): void { // ensure we have at least 2 partitions partitionIds.length.should.gte(2); - let partitionResultsMap = new Map(); - partitionIds.forEach((id) => partitionResultsMap.set(id, [])); + let checkpointMap = new Map(); + partitionIds.forEach((id) => checkpointMap.set(id, [])); let didError = false; - // The partitionProcess will need to add events to the partitionResultsMap as they are received + let partionCount: { [x: string]: number } = {}; const factory: PartitionProcessorFactory = (context, checkpointManager) => { return { async processEvents(events: ReceivedEventData[]) { - const existingEvents = partitionResultsMap.get(context.partitionId)!; - events.forEach((event: ReceivedEventData) => { - existingEvents.push(event); - debug( - "Received event: '%s' from partition: '%s' and consumer group: '%s'", - event.body, - context.partitionId - ); - checkpointManager.updateCheckpoint(event); - }); + !partionCount[context.partitionId] + ? (partionCount[context.partitionId] = 1) + : partionCount[context.partitionId]++; + const existingEvents = checkpointMap.get(context.partitionId)!; + for (const event of events) { + debug("Received event: '%s' from partition: '%s'", event.body, context.partitionId); + if (partionCount[context.partitionId] <= 50) { + await checkpointManager.updateCheckpoint(event); + existingEvents.push(event); + } + } }, async processError() { didError = true; @@ -544,21 +545,22 @@ describe("Event Processor", function(): void { } // set a delay to give a consumers a chance to receive a message - await delay(500); + await delay(5000); // shutdown the first processor await processor1.stop(); - let lastEventsReceivedFromProcessor1: ReceivedEventData[] = []; + const lastEventsReceivedFromProcessor1: ReceivedEventData[] = []; let index = 0; for (const partitionId of partitionIds) { - const receivedEvents = partitionResultsMap.get(partitionId)!; + const receivedEvents = checkpointMap.get(partitionId)!; lastEventsReceivedFromProcessor1[index++] = receivedEvents[receivedEvents.length - 1]; } - partitionResultsMap = new Map(); - partitionIds.forEach((id) => partitionResultsMap.set(id, [])); + checkpointMap = new Map(); + partitionIds.forEach((id) => checkpointMap.set(id, [])); + partionCount = {}; const processor2 = new EventProcessor( EventHubClient.defaultConsumerGroupName, @@ -570,15 +572,15 @@ describe("Event Processor", function(): void { processor2.start(); // set a delay to give a consumers a chance to receive a message - await delay(2000); + await delay(5000); // shutdown the second processor await processor2.stop(); index = 0; - let firstEventsReceivedFromProcessor2: ReceivedEventData[] = []; + const firstEventsReceivedFromProcessor2: ReceivedEventData[] = []; for (const partitionId of partitionIds) { - const receivedEvents = partitionResultsMap.get(partitionId)!; + const receivedEvents = checkpointMap.get(partitionId)!; firstEventsReceivedFromProcessor2[index++] = receivedEvents[0]; } From e57b64d9912b78a7cee0a35844bc476ea96d22eb Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Fri, 2 Aug 2019 11:38:23 -0700 Subject: [PATCH 4/4] Fix test --- sdk/eventhub/event-hubs/test/eventProcessor.spec.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index cb94dfab0f49..c3f703bbb511 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -406,28 +406,26 @@ describe("Event Processor", function(): void { const eventProcessorFactory = (context: PartitionContext) => { return new SimpleEventProcessor(); }; - const partitionInfo = await client.getPartitionProperties("0"); + const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, new InMemoryPartitionManager(), { - initialEventPosition: EventPosition.fromSequenceNumber( - partitionInfo.lastEnqueuedSequenceNumber - ), + initialEventPosition:EventPosition.fromEnqueuedTime(new Date()), maxBatchSize: 1, maxWaitTimeInSeconds: 5 } ); const producer = client.createProducer({ partitionId: "0" }); await producer.send({ body: "Hello world!!!" }); + await producer.close(); await processor.start(); // after 2 seconds, stop processing await delay(2000); await processor.stop(); - await producer.close(); isinitializeCalled.should.equal(true); receivedEvents.length.should.equal(1); receivedEvents[0].body.should.equal("Hello world!!!");