Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ public class ClientOptions
@Option(names = "--decimal-data-size", description = "Show data size and rate in base 10 rather than base 2")
public boolean decimalDataSize;

@Option(names = "--max-buffered-rows", paramLabel = "<maxBufferedRows>", description = "Maximum number of rows to buffer in memory before writing to output (default: ${DEFAULT-VALUE})")
public int maxBufferedRows = 10_000;

@Option(names = "--max-queued-rows", paramLabel = "<maxQueuedRows>", description = "Maximum number of rows to queue before blocking the query (default: ${DEFAULT-VALUE})")
public int maxQueuedRows = 50_000;

public enum OutputFormat
{
AUTO,
Expand Down
4 changes: 3 additions & 1 deletion client/trino-cli/src/main/java/io/trino/cli/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ public boolean run()
try (QueryRunner queryRunner = new QueryRunner(
uri,
session,
clientOptions.debug)) {
clientOptions.debug,
clientOptions.maxQueuedRows,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a check for those two configs? i,e at least should greater than 0.

clientOptions.maxBufferedRows)) {
if (hasQuery) {
return executeCommand(
queryRunner,
Expand Down
16 changes: 9 additions & 7 deletions client/trino-cli/src/main/java/io/trino/cli/OutputHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@
public final class OutputHandler
implements Closeable
{
private static final int MAX_QUEUED_ROWS = 50_000;
private static final int MAX_BUFFERED_ROWS = 10_000;
private static final Duration MAX_BUFFER_TIME = new Duration(3, SECONDS);
private static final List<?> END_TOKEN = new ArrayList<>(0);

private final AtomicBoolean closed = new AtomicBoolean();
private final OutputPrinter printer;
private final int maxQueuedRows;
private final int maxBufferedRows;

public OutputHandler(OutputPrinter printer)
public OutputHandler(OutputPrinter printer, int maxQueuedRows, int maxBufferedRows)
{
this.printer = requireNonNull(printer, "printer is null");
this.maxQueuedRows = maxQueuedRows;
this.maxBufferedRows = maxBufferedRows;
}

@Override
Expand All @@ -62,7 +64,7 @@ public void close()
public void processRows(StatementClient client)
throws IOException
{
BlockingQueue<List<?>> rowQueue = new ArrayBlockingQueue<>(MAX_QUEUED_ROWS);
BlockingQueue<List<?>> rowQueue = new ArrayBlockingQueue<>(maxQueuedRows);
CompletableFuture<Void> readerFuture = CompletableFuture.runAsync(() -> {
while (client.isRunning()) {
for (List<Object> row : client.currentRows()) {
Expand All @@ -72,17 +74,17 @@ public void processRows(StatementClient client)
}
}).whenComplete((result, ex) -> putOrThrow(rowQueue, END_TOKEN));

List<List<?>> rowBuffer = new ArrayList<>(MAX_BUFFERED_ROWS);
List<List<?>> rowBuffer = new ArrayList<>(maxBufferedRows);
long bufferStart = System.nanoTime();
try {
while (!readerFuture.isDone()) {
boolean atEnd = drainDetectingEnd(rowQueue, rowBuffer, MAX_BUFFERED_ROWS, END_TOKEN);
boolean atEnd = drainDetectingEnd(rowQueue, rowBuffer, maxBufferedRows, END_TOKEN);
if (atEnd) {
break;
}

// Flush if needed
if (rowBuffer.size() >= MAX_BUFFERED_ROWS || nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) {
if (rowBuffer.size() >= maxBufferedRows || nanosSince(bufferStart).compareTo(MAX_BUFFER_TIME) >= 0) {
printer.printRows(unmodifiableList(rowBuffer), false);
rowBuffer.clear();
bufferStart = System.nanoTime();
Expand Down
16 changes: 10 additions & 6 deletions client/trino-cli/src/main/java/io/trino/cli/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ public class Query
private final AtomicBoolean ignoreUserInterrupt = new AtomicBoolean();
private final StatementClient client;
private final boolean debug;
private final int maxQueuedRows;
private final int maxBufferedRows;

public Query(StatementClient client, boolean debug)
public Query(StatementClient client, boolean debug, int maxQueuedRows, int maxBufferedRows)
{
this.client = requireNonNull(client, "client is null");
this.debug = debug;
this.maxQueuedRows = maxQueuedRows;
this.maxBufferedRows = maxBufferedRows;
}

public Optional<String> getSetCatalog()
Expand Down Expand Up @@ -263,7 +267,7 @@ else if (results.getColumns() != null && !results.getColumns().isEmpty()) {

private void discardResults()
{
try (OutputHandler handler = new OutputHandler(new NullPrinter())) {
try (OutputHandler handler = new OutputHandler(new NullPrinter(), 100, 100)) {
Copy link

Copilot AI Jun 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In discardResults(), the hard-coded limits (100, 100) do not use the configured maxQueuedRows and maxBufferedRows. This will lead to inconsistent behavior compared to other paths—pass the instance fields instead of constants.

Suggested change
try (OutputHandler handler = new OutputHandler(new NullPrinter(), 100, 100)) {
try (OutputHandler handler = new OutputHandler(new NullPrinter(), maxQueuedRows, maxBufferedRows)) {

Copilot uses AI. Check for mistakes.
handler.processRows(client);
}
catch (IOException e) {
Expand Down Expand Up @@ -302,7 +306,7 @@ private void pageOutput(Optional<String> pagerName, OutputFormat format, int max
try (Pager pager = Pager.create(pagerName);
ThreadInterruptor clientThread = new ThreadInterruptor();
Writer writer = createWriter(pager);
OutputHandler handler = createOutputHandler(format, maxWidth, writer, columns)) {
OutputHandler handler = createOutputHandler(format, maxWidth, writer, columns, maxQueuedRows, maxBufferedRows)) {
if (!pager.isNullPager()) {
// ignore the user pressing ctrl-C while in the pager
ignoreUserInterrupt.set(true);
Expand All @@ -325,14 +329,14 @@ private void pageOutput(Optional<String> pagerName, OutputFormat format, int max
private void sendOutput(PrintStream out, OutputFormat format, int maxWidth, List<Column> fieldNames)
throws IOException
{
try (OutputHandler handler = createOutputHandler(format, maxWidth, createWriter(out), fieldNames)) {
try (OutputHandler handler = createOutputHandler(format, maxWidth, createWriter(out), fieldNames, maxQueuedRows, maxBufferedRows)) {
handler.processRows(client);
}
}

private static OutputHandler createOutputHandler(OutputFormat format, int maxWidth, Writer writer, List<Column> columns)
private static OutputHandler createOutputHandler(OutputFormat format, int maxWidth, Writer writer, List<Column> columns, int maxQueuedRows, int maxBufferedRows)
{
return new OutputHandler(createOutputPrinter(format, maxWidth, writer, columns));
return new OutputHandler(createOutputPrinter(format, maxWidth, writer, columns), maxQueuedRows, maxBufferedRows);
}

private static OutputPrinter createOutputPrinter(OutputFormat format, int maxWidth, Writer writer, List<Column> columns)
Expand Down
8 changes: 6 additions & 2 deletions client/trino-cli/src/main/java/io/trino/cli/QueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@ public class QueryRunner
private final boolean debug;
private final OkHttpClient httpClient;
private final OkHttpClient segmentHttpClient;
private final int maxQueuedRows;
private final int maxBufferedRows;

public QueryRunner(TrinoUri uri, ClientSession session, boolean debug)
public QueryRunner(TrinoUri uri, ClientSession session, boolean debug, int maxQueuedRows, int maxBufferedRows)
{
this.session = new AtomicReference<>(requireNonNull(session, "session is null"));
this.httpClient = HttpClientFactory.toHttpClientBuilder(uri, USER_AGENT).build();
this.segmentHttpClient = HttpClientFactory
.unauthenticatedClientBuilder(uri, USER_AGENT)
.build();
this.debug = debug;
this.maxQueuedRows = maxQueuedRows;
this.maxBufferedRows = maxBufferedRows;
}

public ClientSession getSession()
Expand All @@ -64,7 +68,7 @@ public boolean isDebug()

public Query startQuery(String query)
{
return new Query(startInternalQuery(session.get(), query), debug);
return new Query(startInternalQuery(session.get(), query), debug, maxQueuedRows, maxBufferedRows);
}

public StatementClient startInternalQuery(String query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ private boolean isCliSpecificOptions(String name)
case "editingMode":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, can we have a unsupported test

case "disableAutoSuggestion":
case "decimalDataSize":
case "maxBufferedRows":
case "maxQueuedRows":
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ static QueryRunner createQueryRunner(TrinoUri uri, ClientSession clientSession)
return new QueryRunner(
uri,
clientSession,
false);
false,
1000,
Copy link

Copilot AI Jun 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The magic numbers (1000, 500) in this test could drift from production defaults. Consider using named constants or referencing ClientOptions defaults to keep the test in sync when defaults change.

Copilot uses AI. Check for mistakes.
500);
}

static PrintStream nullPrintStream()
Expand Down