From 29de5984f6fa388fc126dd6e72b9c103036b4e8d Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 11 Nov 2019 10:46:47 +0800 Subject: [PATCH] [ARROW-7106][Java] Fix the problem that flight perf test hangs endlessly --- .../apache/arrow/flight/perf/TestPerf.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java index 49567dda08b..5652e987d51 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java @@ -17,6 +17,7 @@ package org.apache.arrow.flight.perf; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -78,7 +79,11 @@ public static void main(String[] args) throws Exception { @Test public void throughput() throws Exception { - for (int i = 0; i < 10; i++) { + final int numRuns = 10; + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)); + double [] throughPuts = new double[numRuns]; + + for (int i = 0; i < numRuns; i++) { try ( final BufferAllocator a = new RootAllocator(Long.MAX_VALUE); final PerformanceTestServer server = @@ -86,7 +91,6 @@ public void throughput() throws Exception { final FlightClient client = FlightClient.builder(a, server.getLocation()).build(); ) { final FlightInfo info = client.getInfo(getPerfFlightDescriptor(50_000_000L, 4095, 2)); - ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)); List> results = info.getEndpoints() .stream() .map(t -> new Consumer(client, t.getTicket())) @@ -102,16 +106,25 @@ public void throughput() throws Exception { }).get(); double seconds = r.nanos * 1.0d / 1000 / 1000 / 1000; + throughPuts[i] = (r.bytes * 1.0d / 1024 / 1024) / seconds; System.out.println(String.format( "Transferred %d records totaling %s bytes at %f MiB/s. %f record/s. %f batch/s.", r.rows, r.bytes, - (r.bytes * 1.0d / 1024 / 1024) / seconds, + throughPuts[i], (r.rows * 1.0d) / seconds, (r.batches * 1.0d) / seconds )); } } + pool.shutdown(); + + System.out.println("Summary: "); + double average = Arrays.stream(throughPuts).sum() / numRuns; + double sqrSum = Arrays.stream(throughPuts).map(val -> val - average).map(val -> val * val).sum(); + double stddev = Math.sqrt(sqrSum / numRuns); + System.out.println(String.format("Average throughput: %f MiB/s, standard deviation: %f MiB/s", + average, stddev)); } private final class Consumer implements Callable {