Skip to content

[event-hubs] fixes issue where processEvents ignores maxWaitTime after retryable disconnect event#12280

Merged
5 commits merged into
Azure:masterfrom
chradek:eh-fix-max-wait-time
Nov 5, 2020
Merged

[event-hubs] fixes issue where processEvents ignores maxWaitTime after retryable disconnect event#12280
5 commits merged into
Azure:masterfrom
chradek:eh-fix-max-wait-time

Conversation

@chradek
Copy link
Copy Markdown
Contributor

@chradek chradek commented Nov 4, 2020

Fixes #12278

Description

This PR adds a check within the PartitionPump's receive events loop to determine whether the receiver has been closed. If the receiver has been closed, it will create a new receiver using an event start position that matches the last event seen by the pump.

The receiver is explicitly closed when a disconnected event is received on the underlying AMQP connection, which causes calls to receiveBatch to immediately return any events it had collected up to this point. Note that once the receiver is closed, the receiver's onAmqpMessage handler is removed so it won't receive any additional events.

Updates to testing

I manually tested the changes in this PR against the sample code in the linked issue.

I also updated the existing disconnect test to confirm that

  • the maxWaitTimeInSeconds is honoured after a disconnected event is encountered.
  • new events can be received on subsequent processEvents invocations.

@chradek chradek added Client This issue points to a problem in the data-plane of the library. Event Hubs labels Nov 4, 2020
@chradek
Copy link
Copy Markdown
Contributor Author

chradek commented Nov 4, 2020

/azp run js - eventhubs-client - tests

@azure-pipelines
Copy link
Copy Markdown

Azure Pipelines successfully started running 1 pipeline(s).

@chradek
Copy link
Copy Markdown
Contributor Author

chradek commented Nov 4, 2020

/azp run js - eventhubs-client - tests

@azure-pipelines
Copy link
Copy Markdown

Azure Pipelines successfully started running 1 pipeline(s).

@chradek
Copy link
Copy Markdown
Contributor Author

chradek commented Nov 4, 2020

/azp run js - eventhubs-client - tests

@azure-pipelines
Copy link
Copy Markdown

Azure Pipelines successfully started running 1 pipeline(s).

0,
"Expected to receive 0 events in second processEvents invocation."
);
// The elapsed time since the last processEvents invocation should be >= maxWaitTimeInSeconds
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Shouldn't the subscribe end with the maxWaitTime?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maxWaitTimeInSeconds is the maximum time you should wait between processEvents invocations. The subscription runs forever unless explicitly closed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
Reading this makes me think something else.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you need to take it within the context of a subscribe call.

/**
* Subscribe to events from all partitions.
*
* If checkpoint store is provided to the `EventHubConsumerClient` and there are multiple
* instances of your application, then each instance will subscribe to a subset of the
* partitions such that the load is balanced amongst them.
*
* Call close() on the returned object to stop receiving events.
*
* Example usage:
* ```ts
* const client = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
* const subscription = client.subscribe(
* {
* processEvents: (events, context) => { console.log("Received event count: ", events.length) },
* processError: (err, context) => { console.log("Error: ", err) }
* },
* { startPosition: earliestEventPosition }
* );
* ```
*
* @param handlers Handlers for the lifecycle of the subscription - subscription initialization
* per partition, receiving events, handling errors and the closing
* of a subscription per partition.
* @param options Configures the way events are received.
* Most common are `maxBatchSize` and `maxWaitTimeInSeconds` that control the flow of
* events to the handler provided to receive events as well as the start position. For example,
* `{ maxBatchSize: 20, maxWaitTimeInSeconds: 120, startPosition: { sequenceNumber: 123 } }
*/

If you understand that subscribe() receives events until it is closed, then 'passing the data to user code for processing' is a little less cryptic (though it could be even more clear if we called out processEvents explicitly here)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand now.
The elapsed time since the last processEvents invocation should be >= maxWaitTimeInSeconds since there are no events to receive.
Previously, with the bug, the same test would go back to the start and would have received events.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, previously with the bug it was worse in that both maxWaitTimeInSeconds would be ignored, and no events would be received (so it just invoked processEvents repeatedly as fast as it could).

Copy link
Copy Markdown
Member

@richardpark-msft richardpark-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some nits. I agree with @HarshaNalluru's comment and I think the test is a bit unclear.

The actual code change is pretty great though.

Comment thread sdk/eventhub/event-hubs/test/node/disconnects.spec.ts Outdated
Comment thread sdk/eventhub/event-hubs/test/node/disconnects.spec.ts
Comment thread sdk/eventhub/event-hubs/test/node/disconnects.spec.ts Outdated
Comment thread sdk/eventhub/event-hubs/test/node/disconnects.spec.ts Outdated
Comment thread sdk/eventhub/event-hubs/test/node/disconnects.spec.ts
@chradek
Copy link
Copy Markdown
Contributor Author

chradek commented Nov 5, 2020

/azp run js - eventhubs-client - tests

@azure-pipelines
Copy link
Copy Markdown

Azure Pipelines successfully started running 1 pipeline(s).

@ghost
Copy link
Copy Markdown

ghost commented Nov 5, 2020

Hello @chradek!

Because this pull request has the auto-merge label, I will be glad to assist with helping to merge this pull request once all check-in policies pass.

p.s. you can customize the way I help with merging this pull request, such as holding this pull request until a specific person approves. Simply @mention me (@msftbot) and give me an instruction to get started! Learn more here.

This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Client This issue points to a problem in the data-plane of the library. Event Hubs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[event-hubs] processEvents ignoring maxWaitTime after retryable disconnect event

3 participants