diff --git a/client/trino-cli/src/main/java/io/trino/cli/ClientOptions.java b/client/trino-cli/src/main/java/io/trino/cli/ClientOptions.java index 7b0563231b48..f7c10aaba44b 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/ClientOptions.java +++ b/client/trino-cli/src/main/java/io/trino/cli/ClientOptions.java @@ -300,6 +300,12 @@ public class ClientOptions @Option(names = "--decimal-data-size", description = "Show data size and rate in base 10 rather than base 2") public boolean decimalDataSize; + @Option(names = "--max-buffered-rows", paramLabel = "", description = "Maximum number of rows to buffer in memory before writing to output (default: ${DEFAULT-VALUE})") + public int maxBufferedRows = 10_000; + + @Option(names = "--max-queued-rows", paramLabel = "", description = "Maximum number of rows to queue before blocking the query (default: ${DEFAULT-VALUE})") + public int maxQueuedRows = 50_000; + public enum OutputFormat { AUTO, diff --git a/client/trino-cli/src/main/java/io/trino/cli/Console.java b/client/trino-cli/src/main/java/io/trino/cli/Console.java index e5e82c932ce7..9c26f5793afd 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/Console.java +++ b/client/trino-cli/src/main/java/io/trino/cli/Console.java @@ -177,7 +177,9 @@ public boolean run() try (QueryRunner queryRunner = new QueryRunner( uri, session, - clientOptions.debug)) { + clientOptions.debug, + clientOptions.maxQueuedRows, + clientOptions.maxBufferedRows)) { if (hasQuery) { return executeCommand( queryRunner, diff --git a/client/trino-cli/src/main/java/io/trino/cli/OutputHandler.java b/client/trino-cli/src/main/java/io/trino/cli/OutputHandler.java index f1c8b76a5eff..56c676171599 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/OutputHandler.java +++ b/client/trino-cli/src/main/java/io/trino/cli/OutputHandler.java @@ -37,17 +37,19 @@ public final class OutputHandler implements Closeable { - private static final int MAX_QUEUED_ROWS = 50_000; - private static final int MAX_BUFFERED_ROWS = 10_000; private static final Duration MAX_BUFFER_TIME = new Duration(3, SECONDS); private static final List END_TOKEN = new ArrayList<>(0); private final AtomicBoolean closed = new AtomicBoolean(); private final OutputPrinter printer; + private final int maxQueuedRows; + private final int maxBufferedRows; - public OutputHandler(OutputPrinter printer) + public OutputHandler(OutputPrinter printer, int maxQueuedRows, int maxBufferedRows) { this.printer = requireNonNull(printer, "printer is null"); + this.maxQueuedRows = maxQueuedRows; + this.maxBufferedRows = maxBufferedRows; } @Override @@ -62,7 +64,7 @@ public void close() public void processRows(StatementClient client) throws IOException { - BlockingQueue> rowQueue = new ArrayBlockingQueue<>(MAX_QUEUED_ROWS); + BlockingQueue> rowQueue = new ArrayBlockingQueue<>(maxQueuedRows); CompletableFuture readerFuture = CompletableFuture.runAsync(() -> { while (client.isRunning()) { for (List row : client.currentRows()) { @@ -72,17 +74,17 @@ public void processRows(StatementClient client) } }).whenComplete((result, ex) -> putOrThrow(rowQueue, END_TOKEN)); - List> rowBuffer = new ArrayList<>(MAX_BUFFERED_ROWS); + List> rowBuffer = new ArrayList<>(maxBufferedRows); long bufferStart = System.nanoTime(); try { while (!readerFuture.isDone()) { - boolean atEnd = drainDetectingEnd(rowQueue, rowBuffer, MAX_BUFFERED_ROWS, END_TOKEN); + boolean atEnd = drainDetectingEnd(rowQueue, rowBuffer, maxBufferedRows, END_TOKEN); if (atEnd) { break; } // Flush if needed - if (rowBuffer.size() >= MAX_BUFFERED_ROWS || nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) { + if (rowBuffer.size() >= maxBufferedRows || nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) { printer.printRows(unmodifiableList(rowBuffer), false); rowBuffer.clear(); bufferStart = System.nanoTime(); diff --git a/client/trino-cli/src/main/java/io/trino/cli/Query.java b/client/trino-cli/src/main/java/io/trino/cli/Query.java index b46dab2daaa5..16cfdec6c22b 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/Query.java +++ b/client/trino-cli/src/main/java/io/trino/cli/Query.java @@ -68,11 +68,15 @@ public class Query private final AtomicBoolean ignoreUserInterrupt = new AtomicBoolean(); private final StatementClient client; private final boolean debug; + private final int maxQueuedRows; + private final int maxBufferedRows; - public Query(StatementClient client, boolean debug) + public Query(StatementClient client, boolean debug, int maxQueuedRows, int maxBufferedRows) { this.client = requireNonNull(client, "client is null"); this.debug = debug; + this.maxQueuedRows = maxQueuedRows; + this.maxBufferedRows = maxBufferedRows; } public Optional getSetCatalog() @@ -263,7 +267,7 @@ else if (results.getColumns() != null && !results.getColumns().isEmpty()) { private void discardResults() { - try (OutputHandler handler = new OutputHandler(new NullPrinter())) { + try (OutputHandler handler = new OutputHandler(new NullPrinter(), 100, 100)) { handler.processRows(client); } catch (IOException e) { @@ -302,7 +306,7 @@ private void pageOutput(Optional pagerName, OutputFormat format, int max try (Pager pager = Pager.create(pagerName); ThreadInterruptor clientThread = new ThreadInterruptor(); Writer writer = createWriter(pager); - OutputHandler handler = createOutputHandler(format, maxWidth, writer, columns)) { + OutputHandler handler = createOutputHandler(format, maxWidth, writer, columns, maxQueuedRows, maxBufferedRows)) { if (!pager.isNullPager()) { // ignore the user pressing ctrl-C while in the pager ignoreUserInterrupt.set(true); @@ -325,14 +329,14 @@ private void pageOutput(Optional pagerName, OutputFormat format, int max private void sendOutput(PrintStream out, OutputFormat format, int maxWidth, List fieldNames) throws IOException { - try (OutputHandler handler = createOutputHandler(format, maxWidth, createWriter(out), fieldNames)) { + try (OutputHandler handler = createOutputHandler(format, maxWidth, createWriter(out), fieldNames, maxQueuedRows, maxBufferedRows)) { handler.processRows(client); } } - private static OutputHandler createOutputHandler(OutputFormat format, int maxWidth, Writer writer, List columns) + private static OutputHandler createOutputHandler(OutputFormat format, int maxWidth, Writer writer, List columns, int maxQueuedRows, int maxBufferedRows) { - return new OutputHandler(createOutputPrinter(format, maxWidth, writer, columns)); + return new OutputHandler(createOutputPrinter(format, maxWidth, writer, columns), maxQueuedRows, maxBufferedRows); } private static OutputPrinter createOutputPrinter(OutputFormat format, int maxWidth, Writer writer, List columns) diff --git a/client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java b/client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java index 2c3ad201b7ff..f2726570172e 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java +++ b/client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java @@ -36,8 +36,10 @@ public class QueryRunner private final boolean debug; private final OkHttpClient httpClient; private final OkHttpClient segmentHttpClient; + private final int maxQueuedRows; + private final int maxBufferedRows; - public QueryRunner(TrinoUri uri, ClientSession session, boolean debug) + public QueryRunner(TrinoUri uri, ClientSession session, boolean debug, int maxQueuedRows, int maxBufferedRows) { this.session = new AtomicReference<>(requireNonNull(session, "session is null")); this.httpClient = HttpClientFactory.toHttpClientBuilder(uri, USER_AGENT).build(); @@ -45,6 +47,8 @@ public QueryRunner(TrinoUri uri, ClientSession session, boolean debug) .unauthenticatedClientBuilder(uri, USER_AGENT) .build(); this.debug = debug; + this.maxQueuedRows = maxQueuedRows; + this.maxBufferedRows = maxBufferedRows; } public ClientSession getSession() @@ -64,7 +68,7 @@ public boolean isDebug() public Query startQuery(String query) { - return new Query(startInternalQuery(session.get(), query), debug); + return new Query(startInternalQuery(session.get(), query), debug, maxQueuedRows, maxBufferedRows); } public StatementClient startInternalQuery(String query) diff --git a/client/trino-cli/src/test/java/io/trino/cli/TestClientOptions.java b/client/trino-cli/src/test/java/io/trino/cli/TestClientOptions.java index e9be6d2a897e..6ebeaf2c91a7 100644 --- a/client/trino-cli/src/test/java/io/trino/cli/TestClientOptions.java +++ b/client/trino-cli/src/test/java/io/trino/cli/TestClientOptions.java @@ -383,6 +383,8 @@ private boolean isCliSpecificOptions(String name) case "editingMode": case "disableAutoSuggestion": case "decimalDataSize": + case "maxBufferedRows": + case "maxQueuedRows": return true; } diff --git a/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java b/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java index 5f5b7eedd144..d2071cb5e977 100644 --- a/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java +++ b/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java @@ -156,7 +156,9 @@ static QueryRunner createQueryRunner(TrinoUri uri, ClientSession clientSession) return new QueryRunner( uri, clientSession, - false); + false, + 1000, + 500); } static PrintStream nullPrintStream()