Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,18 @@ public static void run(Class<?> testClass, PerfStressOptions options) {

if (options.getTestProxy() != null) {
Disposable recordStatus = printStatus("=== Record and Start Playback ===", () -> ".", false, false);
Flux.just(tests).flatMap(PerfStressTest::recordAndStartPlaybackAsync).blockLast();

try {
ForkJoinPool forkJoinPool = new ForkJoinPool(tests.length);
forkJoinPool.submit(() -> {
IntStream.range(0, tests.length).parallel().forEach(i -> tests[i].recordAndStartPlayback());
}).get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
e.printStackTrace(System.err);
throw new RuntimeException(e);
}

startedPlayback = true;
recordStatus.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,32 +128,30 @@ public Mono<Void> setupAsync() {
}

/**
* Records responses and starts async tests in playback mode.
* @return An empty {@link Mono}.
* Records responses and starts tests in playback mode.
*/
public Mono<Void> recordAndStartPlaybackAsync() {
public void recordAndStartPlayback() {
// Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests.
return runSyncOrAsync()
.then(startRecordingAsync())
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("record");
})
// Must use Mono.defer() to ensure fields are set from prior requests
.then(Mono.defer(() -> runSyncOrAsync()))
.then(Mono.defer(() -> stopRecordingAsync()))
.then(Mono.defer(() -> startPlaybackAsync()))
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("playback");
});
runSyncOrAsync();

startRecordingAsync().block();

testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("record");

runSyncOrAsync();
stopRecordingAsync().block();
startPlaybackAsync().block();

testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("playback");
}

private Mono<Void> runSyncOrAsync() {
private void runSyncOrAsync() {
if (options.isSync()) {
return Mono.empty().then().doOnSuccess(x -> run());
run();
} else {
return runAsync();
runAsync().block();
}
}

Expand Down