Skip to content

Commit

Permalink
[fix][broker] fix pulsar-admin topics stats-internal caused a BK clie…
Browse files Browse the repository at this point in the history
…nt thread a deadlock (apache#23258)
  • Loading branch information
poorbarcode authored and michalcukierman committed Sep 11, 2024
1 parent 2d6c5c6 commit 6acd417
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -2819,13 +2820,13 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = -1;
info.size = -1;

Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
futures.add(getCompactedTopicContextAsync().thenAccept(v -> {
if (v != null) {
info.ledgerId = v.getLedger().getId();
info.entries = v.getLedger().getLastAddConfirmed() + 1;
info.size = v.getLedger().getLength();
}
}));

stats.compactedLedger = info;

Expand Down Expand Up @@ -2951,12 +2952,24 @@ public Optional<CompactedTopicContext> getCompactedTopicContext() {
if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
}
} catch (ExecutionException | InterruptedException e) {
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
return Optional.empty();
}

public CompletableFuture<CompactedTopicContext> getCompactedTopicContextAsync() {
if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
CompletableFuture<CompactedTopicContext> res =
pulsarCompactedService.getCompactedTopic().getCompactedTopicContextFuture();
if (res == null) {
return CompletableFuture.completedFuture(null);
}
return res;
}
return CompletableFuture.completedFuture(null);
}

public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -304,8 +306,10 @@ static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, lo
* Getter for CompactedTopicContext.
* @return CompactedTopicContext
*/
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException,
TimeoutException {
return compactedTopicContext == null ? Optional.empty() :
Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS));
}

@Override
Expand Down

0 comments on commit 6acd417

Please sign in to comment.