diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java index 40088937ca68..155c96c0a963 100644 --- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java +++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java @@ -143,7 +143,18 @@ public static void run(Class testClass, PerfStressOptions options) { if (options.getTestProxies() != null && !options.getTestProxies().isEmpty()) { Disposable recordStatus = printStatus("=== Record and Start Playback ===", () -> ".", false, false); - Flux.just(tests).flatMap(PerfStressTest::recordAndStartPlaybackAsync).blockLast(); + + try { + ForkJoinPool forkJoinPool = new ForkJoinPool(tests.length); + forkJoinPool.submit(() -> { + IntStream.range(0, tests.length).parallel().forEach(i -> tests[i].recordAndStartPlayback()); + }).get(); + } catch (InterruptedException | ExecutionException e) { + System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e); + e.printStackTrace(System.err); + throw new RuntimeException(e); + } + startedPlayback = true; recordStatus.dispose(); } diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressTest.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressTest.java index 28902219e68e..6c5e4db83652 100644 --- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressTest.java +++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressTest.java @@ -137,32 +137,30 @@ public Mono setupAsync() { } /** - * Records responses and starts async tests in playback mode. - * @return An empty {@link Mono}. + * Records responses and starts tests in playback mode. */ - public Mono recordAndStartPlaybackAsync() { + public void recordAndStartPlayback() { // Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests. - return runSyncOrAsync() - .then(startRecordingAsync()) - .doOnSuccess(x -> { - testProxyPolicy.setRecordingId(recordingId); - testProxyPolicy.setMode("record"); - }) - // Must use Mono.defer() to ensure fields are set from prior requests - .then(Mono.defer(() -> runSyncOrAsync())) - .then(Mono.defer(() -> stopRecordingAsync())) - .then(Mono.defer(() -> startPlaybackAsync())) - .doOnSuccess(x -> { - testProxyPolicy.setRecordingId(recordingId); - testProxyPolicy.setMode("playback"); - }); + runSyncOrAsync(); + + startRecordingAsync().block(); + + testProxyPolicy.setRecordingId(recordingId); + testProxyPolicy.setMode("record"); + + runSyncOrAsync(); + stopRecordingAsync().block(); + startPlaybackAsync().block(); + + testProxyPolicy.setRecordingId(recordingId); + testProxyPolicy.setMode("playback"); } - private Mono runSyncOrAsync() { + private void runSyncOrAsync() { if (options.isSync()) { - return Mono.empty().then().doOnSuccess(x -> run()); + run(); } else { - return runAsync(); + runAsync().block(); } }