[EPH] adds support for processing multiple partitions#4535
[EPH] adds support for processing multiple partitions#4535chradek merged 4 commits intoAzure:masterfrom
Conversation
| import { cancellableDelay } from "./util/cancellableDelay"; | ||
|
|
||
| /** | ||
| * Reason for closing an EventProcessor. |
There was a problem hiding this comment.
| * Reason for closing an EventProcessor. | |
| * Reason for closing an PartitionProcessor. |
| }); | ||
|
|
||
| try { | ||
| await pump.start(); |
There was a problem hiding this comment.
If I have 3 partitions then it will call my initialize() method 3 times?
There was a problem hiding this comment.
Yes, there is one PartitionProcessor per partition :)
| try { | ||
| await this._partitionProcessor.initialize(); | ||
| } catch { | ||
| this._isReceiving = false; |
There was a problem hiding this comment.
What is the expectation for when initialize fails?
Here we swallow the error and move on.
If so, then why is _isReceiving set to false if we call recieveEvents in the next line
There was a problem hiding this comment.
Ah, I think I meant to close the partitionPump, but I'm not sure what the right thing to do here is...expect the user to handle all errors in their own code and keep going or treat a thrown error as meaning STOP.
Either way what I have now isn't right, I'll reach out to other teams to see how they plan to handle errors thrown by PartitionProcessor.
There was a problem hiding this comment.
For now I decided to swallow the error but we can revisit.
Fixes #4464
This change does a couple things:
Adds a
PumpManagerclass that can create and removePartitionPumps.Updates the
EventProcessorto have a loop. The loop is started when callingeventProcessor.start().Within the loop, the EventProcessor checks which partitions it is processing and starts processing any partitions it isn't already. Right now it does this check every 30 seconds but this could be configurable/based on some partition ownership rule in the future.
Updates
startto no longer return a promise, since it just kicks off the loop.I also temporarily removed the type validation that was being done on the PartitionProcessor because there isn't a way for the user to catch the error today. #4534 will address this.