Skip to content

Commit c39d7d2

Browse files
authored
Fixes issue where lastEnqueuedEventProperties is null for empty window. (Azure#27363)
* Adding test for PartitionPump * Adding startPartitionPump tests for PartitionPumpManager. * Adding test for stopping. * Update PartitionPump to keep track of last received event properties. * Update PartitionPumpManager to keep track of lastEnqueuedProperties and use those when a batch is empty. * Adding test case to ensure enqueued properties updated. * Updating CHANGELOG
1 parent 70e5280 commit c39d7d2

File tree

5 files changed

+542
-5
lines changed

5 files changed

+542
-5
lines changed

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
### Bugs Fixed
1010

1111
- Removed the incorrect lock from `EventDataBatch.tryAdd()` implementation and documented that this API is not thread-safe. ([#25910](https://github.com/Azure/azure-sdk-for-java/issues/25910))
12+
- Fixed a bug where users get a NullPointerException when getting `LastEnqueuedEventProperties` for an empty window. ([#27121](https://github.com/Azure/azure-sdk-for-java/issues/27121))
1213

1314
### Other Changes
1415

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPump.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.messaging.eventhubs;
55

66
import com.azure.core.util.logging.ClientLogger;
7+
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
78
import reactor.core.scheduler.Scheduler;
89

910
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
@@ -15,6 +16,7 @@ class PartitionPump implements AutoCloseable {
1516
private final String partitionId;
1617
private final EventHubConsumerAsyncClient client;
1718
private final Scheduler scheduler;
19+
private LastEnqueuedEventProperties lastEnqueuedEventProperties;
1820
private final ClientLogger logger = new ClientLogger(PartitionPump.class);
1921

2022
/**
@@ -34,6 +36,25 @@ EventHubConsumerAsyncClient getClient() {
3436
return client;
3537
}
3638

39+
/**
40+
* Gets the last enqueued event properties.
41+
*
42+
* @return the last enqueued event properties or null if there has been no events received or {@link
43+
* EventProcessorClientBuilder#trackLastEnqueuedEventProperties(boolean)} is false.
44+
*/
45+
LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
46+
return lastEnqueuedEventProperties;
47+
}
48+
49+
/**
50+
* Sets the last enqueued event properties seen.
51+
*
52+
* @param lastEnqueuedEventProperties the last enqueued event properties.
53+
*/
54+
void setLastEnqueuedEventProperties(LastEnqueuedEventProperties lastEnqueuedEventProperties) {
55+
this.lastEnqueuedEventProperties = lastEnqueuedEventProperties;
56+
}
57+
3758
/**
3859
* Disposes of the scheduler and the consumer.
3960
*/

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
254254
.concatMap(Flux::collectList)
255255
.publishOn(scheduler, false, prefetch)
256256
.subscribe(partitionEventBatch -> {
257-
processEvents(partitionContext, partitionProcessor,
258-
eventHubConsumer, partitionEventBatch);
257+
processEvents(partitionContext, partitionProcessor, partitionPump, eventHubConsumer,
258+
partitionEventBatch);
259259
},
260260
/* EventHubConsumer receive() returned an error */
261261
ex -> handleError(claimedOwnership, partitionPump, partitionProcessor, ex, partitionContext),
@@ -316,7 +316,8 @@ private void processEvent(PartitionContext partitionContext, PartitionProcessor
316316
}
317317

318318
private void processEvents(PartitionContext partitionContext, PartitionProcessor partitionProcessor,
319-
EventHubConsumerAsyncClient eventHubConsumer, List<PartitionEvent> partitionEventBatch) {
319+
PartitionPump partitionPump, EventHubConsumerAsyncClient eventHubConsumer,
320+
List<PartitionEvent> partitionEventBatch) {
320321
try {
321322
if (batchReceiveMode) {
322323
LastEnqueuedEventProperties[] lastEnqueuedEventProperties = new LastEnqueuedEventProperties[1];
@@ -326,15 +327,23 @@ private void processEvents(PartitionContext partitionContext, PartitionProcessor
326327
return partitionEvent.getData();
327328
})
328329
.collect(Collectors.toList());
330+
331+
// It's possible when using windowTimeout that in the timeframe, there weren't any events received.
332+
LastEnqueuedEventProperties enqueuedEventProperties =
333+
updateOrGetLastEnqueuedEventProperties(partitionPump, lastEnqueuedEventProperties[0]);
334+
329335
EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, eventDataList,
330-
checkpointStore, lastEnqueuedEventProperties[0]);
336+
checkpointStore, enqueuedEventProperties);
337+
331338
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
332339
logger.atVerbose()
333340
.addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId())
334341
.addKeyValue(ENTITY_PATH_KEY, partitionContext.getEventHubName())
335342
.log("Processing event batch.");
336343
}
344+
337345
partitionProcessor.processEventBatch(eventBatchContext);
346+
338347
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
339348
logger.atVerbose()
340349
.addKeyValue(PARTITION_ID_KEY, partitionContext.getPartitionId())
@@ -346,8 +355,14 @@ private void processEvents(PartitionContext partitionContext, PartitionProcessor
346355
? partitionEventBatch.get(0).getData() : null);
347356
LastEnqueuedEventProperties lastEnqueuedEventProperties = (partitionEventBatch.size() == 1
348357
? partitionEventBatch.get(0).getLastEnqueuedEventProperties() : null);
358+
359+
// Get the last value we've seen if the current value from this is null.
360+
LastEnqueuedEventProperties enqueuedEventProperties =
361+
updateOrGetLastEnqueuedEventProperties(partitionPump, lastEnqueuedEventProperties);
362+
349363
EventContext eventContext = new EventContext(partitionContext, eventData, checkpointStore,
350-
lastEnqueuedEventProperties);
364+
enqueuedEventProperties);
365+
351366
processEvent(partitionContext, partitionProcessor, eventHubConsumer, eventContext);
352367
}
353368
} catch (Throwable throwable) {
@@ -449,4 +464,22 @@ private void endProcessTracingSpan(Context processSpanContext, Signal<Void> sign
449464
}
450465
tracerProvider.endSpan(processSpanContext, signal);
451466
}
467+
468+
/**
469+
* Updates the last enqueued event if it was seen and gets the most up-to-date value.
470+
*
471+
* @param partitionPump The partition pump.
472+
* @param last The last enqueued event properties. Could be null if there were no events seen.
473+
*
474+
* @return Updates the partition pump properties if there is a latest value and returns it.
475+
*/
476+
private LastEnqueuedEventProperties updateOrGetLastEnqueuedEventProperties(PartitionPump partitionPump,
477+
LastEnqueuedEventProperties last) {
478+
479+
if (last != null) {
480+
partitionPump.setLastEnqueuedEventProperties(last);
481+
}
482+
483+
return partitionPump.getLastEnqueuedEventProperties();
484+
}
452485
}

0 commit comments

Comments
 (0)