Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion presto-cli/src/main/java/com/facebook/presto/cli/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class Query
private final AtomicBoolean ignoreUserInterrupt = new AtomicBoolean();
private final StatementClient client;
private final boolean debug;
private Optional<Long> clientStopTimestamp = Optional.empty();

public Query(StatementClient client, boolean debug)
{
Expand Down Expand Up @@ -174,7 +175,7 @@ else if (results.getColumns() == null) {
if (statusPrinter != null) {
// Print all warnings at the end of the query
new PrintStreamWarningsPrinter(System.err).print(client.finalStatusInfo().getWarnings(), true, true);
statusPrinter.printFinalInfo();
statusPrinter.printFinalInfo(clientStopTimestamp);
}
else {
// Print remaining warnings separated
Expand Down Expand Up @@ -234,6 +235,9 @@ private void discardResults()
catch (IOException e) {
throw new UncheckedIOException(e);
}
finally {
recordClientStop();
}
}

private void renderResults(PrintStream out, OutputFormat outputFormat, boolean interactive, List<Column> columns)
Expand Down Expand Up @@ -279,20 +283,34 @@ private void pageOutput(OutputFormat format, List<String> fieldNames)
});
}
handler.processRows(client);
// Record the CLI query end time *before* closing the handler and pager
recordClientStop();
}
catch (RuntimeException | IOException e) {
if (client.isClientAborted() && !(e instanceof QueryAbortedException)) {
recordClientStop();
throw new QueryAbortedException(e);
}
throw e;
}
}

/**
* Records the earliest timestamp that we were finished with the {@link StatementClient}
* This can be either when we're done fetching all results from the server or query
* Or we ran into a failure and decided to stop using the client
*/
private void recordClientStop()
{
clientStopTimestamp = Optional.of(Math.min(System.nanoTime(), clientStopTimestamp.orElse(Long.MAX_VALUE)));
}

private void sendOutput(PrintStream out, OutputFormat format, List<String> fieldNames)
throws IOException
{
try (OutputHandler handler = createOutputHandler(format, createWriter(out), fieldNames)) {
handler.processRows(client);
recordClientStop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.PrintStream;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -42,6 +43,8 @@
import static com.google.common.base.Verify.verify;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.Duration.nanosSince;
import static io.airlift.units.Duration.succinctDuration;
import static io.airlift.units.Duration.succinctNanos;
import static java.lang.Character.toUpperCase;
import static java.lang.Math.max;
import static java.lang.Math.min;
Expand Down Expand Up @@ -154,20 +157,24 @@ private void updateScreen(WarningsPrinter warningsPrinter)
private String autoFormatMetricValue(RuntimeUnit unit, long value)
{
if (unit == RuntimeUnit.NANO) {
return formatTime(Duration.succinctNanos(value));
return formatTime(succinctNanos(value));
}
if (unit == RuntimeUnit.BYTE) {
return formatDataSize(bytes(value), true);
}
return formatCount(value); // NONE
}

public void printFinalInfo()
public void printFinalInfo(Optional<Long> clientStopTimestamp)
{
Duration wallTime = nanosSince(start);
//We don't want to use nanosSince(start) for client side latency since that will include time the user spent
// viewing the results in the pager. Instead, we use the caller provided clientStopTimestamp (if present) for
// accurate client side latency
Duration clientSideWallTime = succinctNanos(clientStopTimestamp.orElse(System.nanoTime()) - start);

QueryStatusInfo results = client.finalStatusInfo();
StatementStats stats = results.getStats();
Duration serverSideWallTime = succinctDuration(stats.getElapsedTimeMillis(), MILLISECONDS);

int nodes = stats.getNodes();
if ((nodes == 0) || (stats.getTotalSplits() == 0)) {
Expand Down Expand Up @@ -206,13 +213,13 @@ public void printFinalInfo()
(int) percentage(stats.getCpuTimeMillis(), stats.getWallTimeMillis()));
out.println(cpuTimeSummary);

double parallelism = cpuTime.getValue(MILLISECONDS) / wallTime.getValue(MILLISECONDS);
double parallelism = cpuTime.getValue(MILLISECONDS) / serverSideWallTime.getValue(MILLISECONDS);

// Per Node: 3.5 parallelism, 83.3K rows/s, 0.7 MB/s
String perNodeSummary = format("Per Node: %.1f parallelism, %5s rows/s, %8s",
parallelism / nodes,
formatCountRate((double) stats.getProcessedRows() / nodes, wallTime, false),
formatDataRate(bytes(stats.getProcessedBytes() / nodes), wallTime, true));
formatCountRate((double) stats.getProcessedRows() / nodes, serverSideWallTime, false),
formatDataRate(bytes(stats.getProcessedBytes() / nodes), serverSideWallTime, true));
reprintLine(perNodeSummary);

// Parallelism: 5.3
Expand Down Expand Up @@ -242,13 +249,14 @@ public void printFinalInfo()
}
}

// 0:32 [2.12GB, 15M rows] [67MB/s, 463K rows/s]
String statsLine = format("%s [%s rows, %s] [%s rows/s, %s]",
formatTime(wallTime),
// [Measured Latency: client-side: 0:33, server-side: 0:32] [2.12GB, 15M rows] [67MB/s, 463K rows/s]
String statsLine = format("[Latency: client-side: %s, server-side: %s] [%s rows, %s] [%s rows/s, %s]",
formatTime(clientSideWallTime),
formatTime(serverSideWallTime),
formatCount(stats.getProcessedRows()),
formatDataSize(bytes(stats.getProcessedBytes()), true),
formatCountRate(stats.getProcessedRows(), wallTime, false),
formatDataRate(bytes(stats.getProcessedBytes()), wallTime, true));
formatCountRate(stats.getProcessedRows(), serverSideWallTime, false),
formatDataRate(bytes(stats.getProcessedBytes()), serverSideWallTime, true));

out.println(statsLine);

Expand Down