Skip to content

Commit

Permalink
[grid] delay the newsessionqueue response (#14764)
Browse files Browse the repository at this point in the history
* [grid] delay the newsessionqueue response

* Update variable naming with prefix `DEFAULT_`

Signed-off-by: Viet Nguyen Duc <[email protected]>

---------

Signed-off-by: Viet Nguyen Duc <[email protected]>
Co-authored-by: Viet Nguyen Duc <[email protected]>
  • Loading branch information
joerg1985 and VietND96 authored Dec 25, 2024
1 parent 8a04527 commit 359ac9a
Show file tree
Hide file tree
Showing 19 changed files with 99 additions and 7 deletions.
1 change: 1 addition & 0 deletions java/src/org/openqa/selenium/grid/commands/Hub.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ protected Handlers createHandlers(Config config) {
distributorOptions.getSlotMatcher(),
newSessionRequestOptions.getSessionRequestTimeoutPeriod(),
newSessionRequestOptions.getSessionRequestTimeout(),
newSessionRequestOptions.getMaximumResponseDelay(),
secret,
newSessionRequestOptions.getBatchSize());
handler.addHandler(queue);
Expand Down
1 change: 1 addition & 0 deletions java/src/org/openqa/selenium/grid/commands/Standalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ protected Handlers createHandlers(Config config) {
distributorOptions.getSlotMatcher(),
newSessionRequestOptions.getSessionRequestTimeoutPeriod(),
newSessionRequestOptions.getSessionRequestTimeout(),
newSessionRequestOptions.getMaximumResponseDelay(),
registrationSecret,
newSessionRequestOptions.getBatchSize());
combinedHandler.addHandler(queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
public class NewSessionQueueOptions {

static final String SESSION_QUEUE_SECTION = "sessionqueue";
static final int DEFAULT_MAXIMUM_RESPONSE_DELAY = 8;
static final int DEFAULT_REQUEST_TIMEOUT = 300;
static final int DEFAULT_REQUEST_TIMEOUT_PERIOD = 10;
static final int DEFAULT_RETRY_INTERVAL = 15;
Expand Down Expand Up @@ -89,6 +90,15 @@ public URI getSessionQueueUri() {
}
}

public Duration getMaximumResponseDelay() {
int timeout =
config
.getInt(SESSION_QUEUE_SECTION, "maximum-response-delay")
.orElse(DEFAULT_MAXIMUM_RESPONSE_DELAY);

return Duration.ofSeconds(timeout);
}

public Duration getSessionRequestTimeout() {
// If the user sets 0 or less, we default to 1s.
int timeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class LocalNewSessionQueue extends NewSessionQueue implements Closeable {
private static final String NAME = "Local New Session Queue";
private final SlotMatcher slotMatcher;
private final Duration requestTimeout;
private final Duration maximumResponseDelay;
private final int batchSize;
private final Map<RequestId, Data> requests;
private final Map<RequestId, TraceContext> contexts;
Expand All @@ -115,6 +116,7 @@ public LocalNewSessionQueue(
SlotMatcher slotMatcher,
Duration requestTimeoutCheck,
Duration requestTimeout,
Duration maximumResponseDelay,
Secret registrationSecret,
int batchSize) {
super(tracer, registrationSecret);
Expand All @@ -123,6 +125,7 @@ public LocalNewSessionQueue(
Require.nonNegative("Retry period", requestTimeoutCheck);

this.requestTimeout = Require.positive("Request timeout", requestTimeout);
this.maximumResponseDelay = Require.positive("Maximum response delay", maximumResponseDelay);

this.requests = new ConcurrentHashMap<>();
this.queue = new ConcurrentLinkedDeque<>();
Expand Down Expand Up @@ -152,6 +155,7 @@ public static NewSessionQueue create(Config config) {
slotMatcher,
newSessionQueueOptions.getSessionRequestTimeoutPeriod(),
newSessionQueueOptions.getSessionRequestTimeout(),
newSessionQueueOptions.getMaximumResponseDelay(),
secretOptions.getRegistrationSecret(),
newSessionQueueOptions.getBatchSize());
}
Expand Down Expand Up @@ -234,7 +238,9 @@ public HttpResponse addToQueue(SessionRequest request) {
}

Lock writeLock = this.lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
requests.remove(request.getRequestId());
queue.remove(request);
Expand Down Expand Up @@ -268,7 +274,9 @@ Data injectIntoQueue(SessionRequest request) {
Data data = new Data(request.getEnqueued());

Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
requests.put(request.getRequestId(), data);
queue.addLast(request);
Expand All @@ -288,7 +296,9 @@ public boolean retryAddToQueue(SessionRequest request) {
contexts.getOrDefault(request.getRequestId(), tracer.getCurrentContext());
try (Span ignored = context.createSpan("sessionqueue.retry")) {
Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
if (!requests.containsKey(request.getRequestId())) {
return false;
Expand Down Expand Up @@ -324,7 +334,9 @@ public Optional<SessionRequest> remove(RequestId reqId) {
Require.nonNull("Request ID", reqId);

Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
Iterator<SessionRequest> iterator = queue.iterator();
while (iterator.hasNext()) {
Expand All @@ -345,6 +357,29 @@ public Optional<SessionRequest> remove(RequestId reqId) {
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
Require.nonNull("Stereotypes", stereotypes);

// use nano time to avoid issues with a jumping clock e.g. on WSL2 or due to time-sync
long started = System.nanoTime();
// delay the response to avoid heavy polling via http
while (maximumResponseDelay.toNanos() > System.nanoTime() - started) {
Lock readLock = lock.readLock();
readLock.lock();

try {
if (!queue.isEmpty()) {
break;
}
} finally {
readLock.unlock();
}

try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
break;
}
}

Predicate<Capabilities> matchesStereotype =
caps ->
stereotypes.entrySet().stream()
Expand All @@ -360,7 +395,9 @@ public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes
});

Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
List<SessionRequest> availableRequests =
queue.stream()
Expand Down Expand Up @@ -397,7 +434,9 @@ public boolean complete(
try (Span ignored = context.createSpan("sessionqueue.completed")) {
Data data;
Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}
try {
data = requests.remove(reqId);
queue.removeIf(req -> reqId.equals(req.getRequestId()));
Expand All @@ -417,7 +456,9 @@ public boolean complete(
@Override
public int clearQueue() {
Lock writeLock = lock.writeLock();
writeLock.lock();
if (!writeLock.tryLock()) {
writeLock.lock();
}

try {
int size = queue.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void setUpDistributor() throws MalformedURLException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void drainedNodeDoesNotShutDownIfNotEmpty() throws InterruptedException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalNode node =
Expand Down Expand Up @@ -106,6 +107,7 @@ void drainedNodeShutsDownAfterSessionsFinish() throws InterruptedException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalNode node =
Expand Down Expand Up @@ -182,6 +184,7 @@ void testDrainedNodeShutsDownOnceEmpty() throws InterruptedException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalNode node =
Expand Down Expand Up @@ -234,6 +237,7 @@ void drainingNodeDoesNotAcceptNewSessions() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalNode node =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void shouldBeAbleToRemoveANode() throws MalformedURLException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -147,6 +148,7 @@ void shouldIncludeHostsThatAreUpInHostList() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
handler.addHandler(sessions);
Expand Down Expand Up @@ -219,6 +221,7 @@ void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold() throws Interrupt
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -281,6 +284,7 @@ void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthCheckPasse
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -338,6 +342,7 @@ void shouldBeAbleToAddANodeAndCreateASession() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
LocalNode node =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void creatingASessionAddsItToTheSessionMap() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -134,6 +135,7 @@ void shouldReleaseSlotOnceSessionEnds() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -203,6 +205,7 @@ void shouldNotStartASessionIfTheCapabilitiesAreNotSupported() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
handler.addHandler(sessions);
Expand Down Expand Up @@ -243,6 +246,7 @@ void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void setUp() throws URISyntaxException {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ void theMostLightlyLoadedNodeIsSelectedFirst() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -121,6 +122,7 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Node leastRecent = createNode(caps, 5, 0);
Expand Down Expand Up @@ -192,6 +194,7 @@ void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -247,6 +250,7 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ void testAddNodeToDistributor() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Distributor distributor =
Expand Down Expand Up @@ -157,6 +158,7 @@ void testRemoveNodeFromDistributor() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Distributor distributor =
Expand Down Expand Up @@ -195,6 +197,7 @@ void testAddSameNodeTwice() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Distributor distributor =
Expand Down Expand Up @@ -228,6 +231,7 @@ void shouldBeAbleToAddMultipleSessionsConcurrently() throws Exception {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down Expand Up @@ -321,6 +325,7 @@ void testDrainNodeFromDistributor() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Distributor distributor =
Expand Down Expand Up @@ -366,6 +371,7 @@ void testDrainNodeFromNode() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);
Distributor distributor =
Expand Down Expand Up @@ -396,6 +402,7 @@ void slowStartingNodesShouldNotCauseReservationsToBeSerialized() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void setupGrid() {
new DefaultSlotMatcher(),
Duration.ofSeconds(2),
Duration.ofSeconds(2),
Duration.ofSeconds(1),
registrationSecret,
5);

Expand Down
Loading

0 comments on commit 359ac9a

Please sign in to comment.