Skip to content

Commit

Permalink
[fix] Make operations on individualDeletedMessages in lock scope (#…
Browse files Browse the repository at this point in the history
…22966)

(cherry picked from commit dbbb6b6)
  • Loading branch information
dao-jun authored and lhotari committed Jul 5, 2024
1 parent 589f283 commit 2e99b70
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;

/**
* Configuration class for a ManagedLedger.
Expand Down Expand Up @@ -281,7 +281,7 @@ public ManagedLedgerConfig setPassword(String password) {
}

/**
* should use {@link ConcurrentOpenLongPairRangeSet} to store unacked ranges.
* should use {@link OpenLongPairRangeSet} to store unacked ranges.
* @return
*/
public boolean isUnackedRangesOpenCacheSetEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ public Map<String, Long> getProperties() {

@Override
public boolean isCursorDataFullyPersistable() {
return individualDeletedMessages.size() <= getConfig().getMaxUnackedRangesToPersist();
lock.readLock().lock();
try {
return individualDeletedMessages.size() <= getConfig().getMaxUnackedRangesToPersist();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -1102,7 +1107,12 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() {

@Override
public int getTotalNonContiguousDeletedMessagesRange() {
return individualDeletedMessages.size();
lock.readLock().lock();
try {
return individualDeletedMessages.size();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -2380,8 +2390,9 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
boolean empty = individualDeletedMessages.isEmpty();
lock.writeLock().unlock();
if (individualDeletedMessages.isEmpty()) {
if (empty) {
callback.deleteComplete(ctx);
}
}
Expand Down Expand Up @@ -2659,10 +2670,15 @@ public void operationFailed(MetaStoreException e) {
}

private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& getConfig().getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore();
lock.readLock().lock();
try {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& getConfig().getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore();
} finally {
lock.readLock().unlock();
}
}

private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
Expand Down Expand Up @@ -3022,7 +3038,7 @@ private static List<StringProperty> buildStringPropertiesMap(Map<String, String>
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
lock.writeLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
this.individualDeletedMessagesSerializedSize = 0;
Expand Down Expand Up @@ -3064,7 +3080,7 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
individualDeletedMessages.resetDirtyKeys();
return rangeList;
} finally {
lock.readLock().unlock();
lock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -3450,8 +3466,13 @@ public LongPairRangeSet<PositionImpl> getIndividuallyDeletedMessagesSet() {

public boolean isMessageDeleted(Position position) {
checkArgument(position instanceof PositionImpl);
return ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
lock.readLock().lock();
try {
return ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
} finally {
lock.readLock().unlock();
}
}

//this method will return a copy of the position's ack set
Expand Down Expand Up @@ -3480,13 +3501,19 @@ public long[] getBatchPositionAckSet(Position position) {
* @return next available position
*/
public PositionImpl getNextAvailablePosition(PositionImpl position) {
Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
PositionImpl nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext();
lock.readLock().lock();
try {
Range<PositionImpl> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
PositionImpl nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0)
? nextPosition : position.getNext();
}
return position.getNext();
} finally {
lock.readLock().unlock();
}
return position.getNext();
}

public Position getNextLedgerPosition(long currentLedgerId) {
Expand Down Expand Up @@ -3537,7 +3564,12 @@ public ManagedLedger getManagedLedger() {

@Override
public Range<PositionImpl> getLastIndividualDeletedRange() {
return individualDeletedMessages.lastRange();
lock.readLock().lock();
try {
return individualDeletedMessages.lastRange();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -3667,15 +3699,20 @@ public ManagedLedgerConfig getConfig() {
public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException {
NonDurableCursorImpl newNonDurableCursor =
(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
lock.readLock().lock();
try {
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
}
} finally {
lock.readLock().unlock();
}
if (batchDeletedIndexes != null) {
for (Map.Entry<PositionImpl, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Collection;
import java.util.List;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;

/**
* Wraps other Range classes, and adds LRU, marking dirty data and other features on this basis.
Expand Down Expand Up @@ -55,7 +55,7 @@ public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
this.config = managedCursor.getManagedLedger().getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
? new OpenLongPairRangeSet<>(4096, rangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
}
Expand Down Expand Up @@ -148,16 +148,16 @@ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upper

@VisibleForTesting
void add(Range<LongPair> range) {
if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) {
if (!(rangeSet instanceof OpenLongPairRangeSet)) {
throw new UnsupportedOperationException("Only ConcurrentOpenLongPairRangeSet support this method");
}
((ConcurrentOpenLongPairRangeSet<T>) rangeSet).add(range);
((OpenLongPairRangeSet<T>) rangeSet).add(range);
}

@VisibleForTesting
void remove(Range<T> range) {
if (rangeSet instanceof ConcurrentOpenLongPairRangeSet) {
((ConcurrentOpenLongPairRangeSet<T>) rangeSet).remove((Range<LongPair>) range);
if (rangeSet instanceof OpenLongPairRangeSet) {
((OpenLongPairRangeSet<T>) rangeSet).remove((Range<LongPair>) range);
} else {
((DefaultRangeSet<T>) rangeSet).remove(range);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang.mutable.MutableInt;

/**
Expand All @@ -41,7 +42,8 @@
* So, this rangeSet is not suitable for large number of unique keys.
* </pre>
*/
public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
@NotThreadSafe
public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {

protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
private boolean threadSafe = true;
Expand All @@ -54,15 +56,15 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
private volatile boolean updatedAfterCachedForSize = true;
private volatile boolean updatedAfterCachedForToString = true;

public ConcurrentOpenLongPairRangeSet(LongPairConsumer<T> consumer) {
public OpenLongPairRangeSet(LongPairConsumer<T> consumer) {
this(1024, true, consumer);
}

public ConcurrentOpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
public OpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
this(size, true, consumer);
}

public ConcurrentOpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
public OpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
this.threadSafe = threadSafe;
this.bitSetSize = size;
this.consumer = consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class DefaultRangeSetTest {
public void testBehavior() {
LongPairRangeSet.DefaultRangeSet<LongPairRangeSet.LongPair> set =
new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);
ConcurrentOpenLongPairRangeSet<LongPairRangeSet.LongPair> rangeSet =
new ConcurrentOpenLongPairRangeSet<>(consumer);
OpenLongPairRangeSet<LongPairRangeSet.LongPair> rangeSet =
new OpenLongPairRangeSet<>(consumer);

assertNull(set.firstRange());
assertNull(set.lastRange());
Expand Down
Loading

0 comments on commit 2e99b70

Please sign in to comment.