Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit fc8e520

Browse files
authored
fix: fix flaky tests and non blocking semaphore (#1365)
* fix: fix race condition in non blocking semaphore * update comment
1 parent 3c45191 commit fc8e520

File tree

3 files changed

+42
-66
lines changed

3 files changed

+42
-66
lines changed

gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java

+12-13
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
/** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
3737
class NonBlockingSemaphore implements Semaphore64 {
38-
private AtomicLong availablePermits;
38+
private AtomicLong acquiredPermits;
3939
private AtomicLong limit;
4040

4141
private static void checkNotNegative(long l) {
@@ -44,17 +44,18 @@ private static void checkNotNegative(long l) {
4444

4545
NonBlockingSemaphore(long permits) {
4646
checkNotNegative(permits);
47-
this.availablePermits = new AtomicLong(permits);
47+
this.acquiredPermits = new AtomicLong(0);
4848
this.limit = new AtomicLong(permits);
4949
}
5050

5151
@Override
5252
public void release(long permits) {
5353
checkNotNegative(permits);
5454
while (true) {
55-
long old = availablePermits.get();
55+
long old = acquiredPermits.get();
5656
// TODO: throw exceptions when the permits overflow
57-
if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) {
57+
long newAcquired = Math.max(0, old - permits);
58+
if (acquiredPermits.compareAndSet(old, newAcquired)) {
5859
return;
5960
}
6061
}
@@ -64,11 +65,11 @@ public void release(long permits) {
6465
public boolean acquire(long permits) {
6566
checkNotNegative(permits);
6667
while (true) {
67-
long old = availablePermits.get();
68-
if (old < permits) {
68+
long old = acquiredPermits.get();
69+
if (old + permits > limit.get()) {
6970
return false;
7071
}
71-
if (availablePermits.compareAndSet(old, old - permits)) {
72+
if (acquiredPermits.compareAndSet(old, old + permits)) {
7273
return true;
7374
}
7475
}
@@ -79,13 +80,13 @@ public boolean acquirePartial(long permits) {
7980
checkNotNegative(permits);
8081
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
8182
// limit. This will allow individual large requests to be sent. Please note that this behavior
82-
// will result in availablePermits going negative.
83+
// will result in acquiredPermits going over limit.
8384
while (true) {
84-
long old = availablePermits.get();
85-
if (old < Math.min(limit.get(), permits)) {
85+
long old = acquiredPermits.get();
86+
if (old + permits > limit.get() && old > 0) {
8687
return false;
8788
}
88-
if (availablePermits.compareAndSet(old, old - permits)) {
89+
if (acquiredPermits.compareAndSet(old, old + permits)) {
8990
return true;
9091
}
9192
}
@@ -94,7 +95,6 @@ public boolean acquirePartial(long permits) {
9495
@Override
9596
public void increasePermitLimit(long permits) {
9697
checkNotNegative(permits);
97-
availablePermits.addAndGet(permits);
9898
limit.addAndGet(permits);
9999
}
100100

@@ -106,7 +106,6 @@ public void reducePermitLimit(long reduction) {
106106
long oldLimit = limit.get();
107107
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
108108
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
109-
availablePermits.addAndGet(-reduction);
110109
return;
111110
}
112111
}

gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java

+6-21
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636

3737
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
3838
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
39-
import java.util.ArrayList;
40-
import java.util.List;
4139
import java.util.concurrent.TimeUnit;
4240
import org.junit.Test;
4341
import org.junit.runner.RunWith;
@@ -71,27 +69,14 @@ public void testCreateEvent() {
7169
}
7270

7371
@Test
74-
public void testGetLastEvent() throws InterruptedException {
75-
final FlowControlEventStats stats = new FlowControlEventStats();
76-
final long currentTime = System.currentTimeMillis();
72+
public void testGetLastEvent() {
73+
FlowControlEventStats stats = new FlowControlEventStats();
74+
long currentTime = System.currentTimeMillis();
7775

78-
List<Thread> threads = new ArrayList<>();
7976
for (int i = 1; i <= 10; i++) {
80-
final int timeElapsed = i;
81-
Thread t =
82-
new Thread() {
83-
@Override
84-
public void run() {
85-
stats.recordFlowControlEvent(
86-
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
87-
}
88-
};
89-
threads.add(t);
90-
t.start();
91-
}
92-
93-
for (Thread t : threads) {
94-
t.join(10);
77+
int timeElapsed = i;
78+
stats.recordFlowControlEvent(
79+
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
9580
}
9681

9782
assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());

gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java

+24-32
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@
4444
import java.util.ArrayList;
4545
import java.util.List;
4646
import java.util.Random;
47+
import java.util.concurrent.ExecutionException;
4748
import java.util.concurrent.ExecutorService;
4849
import java.util.concurrent.Executors;
4950
import java.util.concurrent.Future;
5051
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.TimeoutException;
5253
import java.util.concurrent.atomic.AtomicInteger;
53-
import org.junit.Ignore;
5454
import org.junit.Test;
5555
import org.junit.runner.RunWith;
5656
import org.junit.runners.JUnit4;
@@ -500,7 +500,6 @@ private void testRejectedReserveRelease(
500500
}
501501

502502
flowController.release(1, 1);
503-
504503
flowController.reserve(maxElementCount, maxNumBytes);
505504
flowController.release(maxElementCount, maxNumBytes);
506505
}
@@ -523,11 +522,11 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
523522
final AtomicInteger totalDecreased = new AtomicInteger(0);
524523
final AtomicInteger releasedCounter = new AtomicInteger(0);
525524

526-
List<Thread> reserveThreads =
525+
List<Future> reserveThreads =
527526
testConcurrentUpdates(
528-
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
529-
for (Thread t : reserveThreads) {
530-
t.join(200);
527+
flowController, 100, 100, 10, totalIncreased, totalDecreased, releasedCounter);
528+
for (Future t : reserveThreads) {
529+
t.get(200, TimeUnit.MILLISECONDS);
531530
}
532531
assertEquals(reserveThreads.size(), releasedCounter.get());
533532
assertTrue(totalIncreased.get() > 0);
@@ -539,9 +538,6 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
539538
testBlockingReserveRelease(flowController, 0, expectedValue);
540539
}
541540

542-
// This test is very flaky. Remove @Ignore once https://github.com/googleapis/gax-java/issues/1359
543-
// is fixed.
544-
@Ignore
545541
@Test
546542
public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
547543
int initialValue = 5000;
@@ -559,11 +555,11 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
559555
AtomicInteger totalIncreased = new AtomicInteger(0);
560556
AtomicInteger totalDecreased = new AtomicInteger(0);
561557
AtomicInteger releasedCounter = new AtomicInteger(0);
562-
List<Thread> reserveThreads =
558+
List<Future> reserveThreads =
563559
testConcurrentUpdates(
564560
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
565-
for (Thread t : reserveThreads) {
566-
t.join(200);
561+
for (Future t : reserveThreads) {
562+
t.get(200, TimeUnit.MILLISECONDS);
567563
}
568564
assertEquals(reserveThreads.size(), releasedCounter.get());
569565
assertTrue(totalIncreased.get() > 0);
@@ -698,8 +694,7 @@ public void run() {
698694
};
699695
// blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
700696
flowController.reserve(5, 1);
701-
ExecutorService executor = Executors.newCachedThreadPool();
702-
Future<?> finished1 = executor.submit(runnable);
697+
Future<?> finished1 = Executors.newSingleThreadExecutor().submit(runnable);
703698
try {
704699
finished1.get(50, TimeUnit.MILLISECONDS);
705700
fail("reserve should block");
@@ -722,7 +717,7 @@ public void run() {
722717

723718
// Similar to blocked by element, test blocking by bytes.
724719
flowController.reserve(1, 5);
725-
Future<?> finished2 = executor.submit(runnable);
720+
Future<?> finished2 = Executors.newSingleThreadExecutor().submit(runnable);
726721
try {
727722
finished2.get(50, TimeUnit.MILLISECONDS);
728723
fail("reserve should block");
@@ -739,15 +734,15 @@ public void run() {
739734
.isAtLeast(currentTime);
740735
}
741736

742-
private List<Thread> testConcurrentUpdates(
737+
private List<Future> testConcurrentUpdates(
743738
final FlowController flowController,
744739
final int increaseStepRange,
745740
final int decreaseStepRange,
746741
final int reserve,
747742
final AtomicInteger totalIncreased,
748743
final AtomicInteger totalDecreased,
749744
final AtomicInteger releasedCounter)
750-
throws InterruptedException {
745+
throws InterruptedException, TimeoutException, ExecutionException {
751746
final Random random = new Random();
752747
Runnable increaseRunnable =
753748
new Runnable() {
@@ -779,22 +774,19 @@ public void run() {
779774
}
780775
}
781776
};
782-
List<Thread> updateThreads = new ArrayList<>();
783-
List<Thread> reserveReleaseThreads = new ArrayList<>();
784-
for (int i = 0; i < 20; i++) {
785-
Thread increase = new Thread(increaseRunnable);
786-
Thread decrease = new Thread(decreaseRunnable);
787-
Thread reserveRelease = new Thread(reserveReleaseRunnable);
788-
updateThreads.add(increase);
789-
updateThreads.add(decrease);
790-
reserveReleaseThreads.add(reserveRelease);
791-
increase.start();
792-
decrease.start();
793-
reserveRelease.start();
777+
List<Future> updateFuture = new ArrayList<>();
778+
List<Future> reserveReleaseFuture = new ArrayList<>();
779+
ExecutorService executors = Executors.newFixedThreadPool(10);
780+
ExecutorService reserveExecutor = Executors.newFixedThreadPool(10);
781+
for (int i = 0; i < 5; i++) {
782+
updateFuture.add(executors.submit(increaseRunnable));
783+
updateFuture.add(executors.submit(decreaseRunnable));
784+
reserveReleaseFuture.add(reserveExecutor.submit(reserveReleaseRunnable));
794785
}
795-
for (Thread t : updateThreads) {
796-
t.join(10);
786+
for (Future t : updateFuture) {
787+
t.get(50, TimeUnit.MILLISECONDS);
797788
}
798-
return reserveReleaseThreads;
789+
executors.shutdown();
790+
return reserveReleaseFuture;
799791
}
800792
}

0 commit comments

Comments
 (0)