Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,25 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
}
}

/**
* Checks if the given sequence number was marked as completed in this tracker.
*/
public boolean contains(final long seqNo) {
assert seqNo >= 0 : "invalid seq_no=" + seqNo;
if (seqNo >= nextSeqNo) {
return false;
}
if (seqNo <= checkpoint) {
return true;
}
final long bitSetKey = getBitSetKey(seqNo);
final CountedBitSet bitSet;
synchronized (this) {
bitSet = processedSeqNo.get(bitSetKey);
}
return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo));
}

/**
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* current checkpoint is processed.
Expand Down Expand Up @@ -206,7 +225,6 @@ private long lastSeqNoInBitSet(final long bitSetKey) {
* @return the bit set corresponding to the provided sequence number
*/
private long getBitSetKey(final long seqNo) {
assert Thread.holdsLock(this);
return seqNo / BIT_SET_SIZE;
}

Expand All @@ -232,7 +250,6 @@ private CountedBitSet getBitSetForSeqNo(final long seqNo) {
* @return the position in the bit set corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this);
return Math.toIntExact(seqNo % BIT_SET_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,52 +65,71 @@ public void testSimplePrimary() {
assertThat(seqNo1, equalTo(0L));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0L), equalTo(true));
assertThat(tracker.contains(atLeast(1)), equalTo(false));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
tracker.markSeqNoAsCompleted(seqNo2);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(seqNo1), equalTo(false));
assertThat(tracker.contains(seqNo2), equalTo(true));
tracker.markSeqNoAsCompleted(seqNo1);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.contains(between(0, 2)), equalTo(true));
assertThat(tracker.contains(atLeast(3)), equalTo(false));
}

public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false));
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(0), equalTo(true));
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.contains(1L), equalTo(false));
assertThat(tracker.contains(2L), equalTo(true));
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.contains(between(0, 2)), equalTo(true));
assertThat(tracker.contains(atLeast(3)), equalTo(false));
}

public void testLazyInitialization() {
/*
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
long seqNo = randomNonNegativeLong();
tracker.markSeqNoAsCompleted(seqNo);
assertThat(tracker.processedSeqNo.size(), equalTo(1));
assertThat(tracker.contains(seqNo), equalTo(true));
assertThat(tracker.contains(randomValueOtherThan(seqNo, ESTestCase::randomNonNegativeLong)), equalTo(false));
assertThat(tracker.processedSeqNo.size(), equalTo(1));
}

public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>();
List<Long> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean();
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));

for (int i = 0; i < maxOps; i++) {
for (long i = 0; i < maxOps; i++) {
seqNoList.add(i);
}
Collections.shuffle(seqNoList, random());
for (Integer seqNo : seqNoList) {
for (Long seqNo : seqNoList) {
tracker.markSeqNoAsCompleted(seqNo);
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
assertThat(tracker.contains(randomFrom(seqNoList)), equalTo(true));
final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong);
assertThat(tracker.contains(notCompletedSeqNo), equalTo(false));
}

public void testConcurrentPrimary() throws InterruptedException {
Expand Down Expand Up @@ -199,8 +218,12 @@ protected void doRun() throws Exception {
}
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
assertThat(tracker.contains(randomValueOtherThan(unFinishedSeq, () -> (long) randomFrom(seqNos))), equalTo(true));
assertThat(tracker.contains(unFinishedSeq), equalTo(false));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.contains(unFinishedSeq), equalTo(true));
assertThat(tracker.contains(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
Expand Down Expand Up @@ -272,4 +295,23 @@ public void describeTo(Description description) {
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}

public void testContains() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
if (localCheckpoint >= 0) {
assertThat(tracker.contains(randomLongBetween(0, localCheckpoint)), equalTo(true));
}
assertThat(tracker.contains(randomLongBetween(localCheckpoint + 1, Long.MAX_VALUE)), equalTo(false));
final int numOps = between(1, 100);
final List<Long> seqNos = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
long seqNo = randomLongBetween(0, 1000);
seqNos.add(seqNo);
tracker.markSeqNoAsCompleted(seqNo);
}
final long seqNo = randomNonNegativeLong();
assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
}
}