Skip to content

Commit

Permalink
[fix][broker] Fix the order of resource close in the InMemoryDelayedD…
Browse files Browse the repository at this point in the history
…eliveryTracker (apache#18000)

(cherry picked from commit 44ae348)
(cherry picked from commit a9c67fa)
  • Loading branch information
coderzc authored and nicoloboschi committed Dec 6, 2022
1 parent 81bee7f commit 2a82a1f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
@Slf4j
public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {

private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();

private final PersistentDispatcherMultipleConsumers dispatcher;

// Reference to the shared (per-broker) timer for delayed delivery
private final Timer timer;

// Current timeout or null if not set
private Timeout timeout;
protected Timeout timeout;

// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;
Expand Down Expand Up @@ -260,7 +260,7 @@ public void run(Timeout timeout) throws Exception {
if (log.isDebugEnabled()) {
log.debug("[{}] Timer triggered", dispatcher.getName());
}
if (timeout.isCancelled()) {
if (timeout == null || timeout.isCancelled()) {
return;
}

Expand All @@ -274,10 +274,11 @@ public void run(Timeout timeout) throws Exception {

@Override
public void close() {
priorityQueue.close();
if (timeout != null) {
timeout.cancel();
timeout = null;
}
priorityQueue.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,21 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Cleanup;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -433,4 +430,46 @@ public void testWithNoDelays() throws Exception {
assertFalse(tracker.shouldPauseAllDeliveries());
}

@Test
public void testClose() throws Exception {
Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
1, TimeUnit.MILLISECONDS);

PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class);

AtomicLong clockTime = new AtomicLong();
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

final Exception[] exceptions = new Exception[1];

InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, 0) {
@Override
public void run(Timeout timeout) throws Exception {
super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS);
if (timeout == null || timeout.isCancelled()) {
return;
}
try {
this.priorityQueue.peekN1();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
}
}
};

tracker.addMessage(1, 1, 10);
clockTime.set(10);

Thread.sleep(300);

tracker.close();

assertNull(exceptions[0]);

timer.stop();
}

}

0 comments on commit 2a82a1f

Please sign in to comment.