Skip to content

Commit

Permalink
#12268 reset _iterate flag when another processing is scheduled
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban committed Sep 16, 2024
1 parent 1a55579 commit c241a14
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ private void processing()
}
case SCHEDULED:
{
_iterate = false;
// we won the race against the callback, so the callback has to process and we can break processing
_state = State.PENDING;
break processing;
Expand All @@ -300,6 +301,7 @@ private void processing()
{
if (action != Action.SCHEDULED)
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
_iterate = false;
// we lost the race, so we have to keep processing
_state = State.PROCESSING;
continue;
Expand Down Expand Up @@ -459,6 +461,14 @@ public void close()
onCompleteFailure(new IOException(failure));
}

boolean isPending()
{
try (AutoLock ignored = _lock.lock())
{
return _state == State.PENDING;
}
}

/**
* @return whether this callback is idle, and {@link #iterate()} needs to be called
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -44,6 +48,45 @@ public void dispose() throws Exception
scheduler.stop();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testIterateWhileProcessingLoopCount(boolean succeededWinsRace)
{
var icb = new IteratingCallback()
{
int counter = 0;

@Override
protected Action process()
{
int counter = this.counter++;
if (counter == 0)
{
iterate();
if (succeededWinsRace)
{
succeeded();
}
else
{
new Thread(() ->
{
await().atMost(5, TimeUnit.SECONDS).until(this::isPending, is(true));
succeeded();
}).start();
}
return Action.SCHEDULED;
}
return Action.IDLE;
}
};

icb.iterate();

await().atMost(10, TimeUnit.SECONDS).until(icb::isIdle, is(true));
assertEquals(2, icb.counter);
}

@Test
public void testNonWaitingProcess() throws Exception
{
Expand Down

0 comments on commit c241a14

Please sign in to comment.