Skip to content

Commit

Permalink
Minor improvements to CompetingConsumerSubscriptionModel and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johanhaleby committed Feb 15, 2025
1 parent a04e744 commit cccec2e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ private CompetingConsumerSubscription startCompetingConsumerSubscription(String
if (!delegate.isRunning()) {
delegate.start();
}
return delegate.subscribe(subscriptionId, filter, startAt, action);
if (delegate.isPaused(subscriptionId)) {
return delegate.resumeSubscription(subscriptionId);
} else {
return delegate.subscribe(subscriptionId, filter, startAt, action);
}
})));
competingConsumerSubscription = new CompetingConsumerSubscription(subscriptionId, subscriberId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ void chaos_competing_consumers() throws InterruptedException {
int millis = ThreadLocalRandom.current().nextInt(1500, 5000);
sleep(millis);
if (millis % 2 == 0) {
if (competingConsumerSubscriptionModel1.isRunning()) {
if (competingConsumerSubscriptionModel1.isRunning(subscriptionId)) {
pause(subscriptionId, competingConsumerSubscriptionModel1);
}
if (competingConsumerSubscriptionModel2.isPaused(subscriptionId)) {
resume(subscriptionId, competingConsumerSubscriptionModel2);
}
} else {
if (competingConsumerSubscriptionModel2.isRunning()) {
if (competingConsumerSubscriptionModel2.isRunning(subscriptionId)) {
pause(subscriptionId, competingConsumerSubscriptionModel2);
}
if (competingConsumerSubscriptionModel1.isPaused(subscriptionId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,43 @@ void it_is_possible_to_stop_and_start_a_CompetingConsumerSubscriptionModel_when_
await().failFast("cloudEventsSubscription2 should never have more than 1 event", () -> cloudEventsSubscription2.size() > 1)
.untilAsserted(() -> assertThat(cloudEventsSubscription2).hasSize(1));
}

@Test
void consumption_is_resumed_after_prohibited_when_lease_time_is_low() {
// Given
CopyOnWriteArrayList<CloudEvent> cloudEventsSubscription1 = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<CloudEvent> cloudEventsSubscription2 = new CopyOnWriteArrayList<>();

competingConsumerSubscriptionModel1 = new CompetingConsumerSubscriptionModel(springSubscriptionModel1, loggingStrategy("1", mongoTemplate, Duration.ofMillis(500)));
competingConsumerSubscriptionModel2 = new CompetingConsumerSubscriptionModel(springSubscriptionModel2, loggingStrategy("2", mongoTemplate, Duration.ofMillis(500)));

String subscriberId1 = "SubscriberId1";
String subscriberId2 = "SubscriberId2";
String subscriptionId1 = "SubscriptionId1";
String subscriptionId2 = "SubscriptionId2";
competingConsumerSubscriptionModel1.subscribe(subscriberId1, subscriptionId1, null, StartAt.subscriptionModelDefault(), cloudEventsSubscription1::add).waitUntilStarted();
competingConsumerSubscriptionModel2.subscribe(subscriberId2, subscriptionId1, null, StartAt.subscriptionModelDefault(), cloudEventsSubscription1::add).waitUntilStarted();

competingConsumerSubscriptionModel1.subscribe(subscriberId1, subscriptionId2, null, StartAt.subscriptionModelDefault(), cloudEventsSubscription2::add).waitUntilStarted();
competingConsumerSubscriptionModel2.subscribe(subscriberId2, subscriptionId2, null, StartAt.subscriptionModelDefault(), cloudEventsSubscription2::add).waitUntilStarted();

// We prohibit all
competingConsumerSubscriptionModel1.onConsumeProhibited(subscriptionId1, subscriberId1);
competingConsumerSubscriptionModel1.onConsumeProhibited(subscriptionId1, subscriberId2);
competingConsumerSubscriptionModel2.onConsumeProhibited(subscriptionId2, subscriberId1);
competingConsumerSubscriptionModel2.onConsumeProhibited(subscriptionId2, subscriberId2);

NameDefined nameDefined = new NameDefined("eventId", LocalDateTime.of(2021, 2, 26, 14, 15, 16), "name", "my name");

// When
eventStore.write("streamId", serialize(nameDefined));

// Then
await().failFast("cloudEventsSubscription1 should never have more than 1 event", () -> cloudEventsSubscription1.size() > 1)
.untilAsserted(() -> assertThat(cloudEventsSubscription1).hasSize(1));
await().failFast("cloudEventsSubscription2 should never have more than 1 event", () -> cloudEventsSubscription2.size() > 1)
.untilAsserted(() -> assertThat(cloudEventsSubscription2).hasSize(1));
}

@Test
void it_is_possible_to_pause_and_resume_a_CompetingConsumerSubscriptionModel_when_some_subscriptions_are_blocked() {
Expand Down Expand Up @@ -684,7 +721,6 @@ void can_resume_after_lock_document_removed() {
await().untilAsserted(() -> assertThat(cloudEvents).hasSize(1));
}


@Test
void on_consume_granted_is_idempotent_when_already_running() {
// Given
Expand Down

0 comments on commit cccec2e

Please sign in to comment.