diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/ExecutingStatementResource.java b/core/trino-main/src/main/java/io/trino/server/protocol/ExecutingStatementResource.java index d7d5d250888b..d80c9f57edbb 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/ExecutingStatementResource.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/ExecutingStatementResource.java @@ -23,7 +23,6 @@ import io.airlift.units.Duration; import io.trino.Session; import io.trino.client.ProtocolHeaders; -import io.trino.client.QueryResults; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.QueryManager; import io.trino.operator.DirectExchangeClientSupplier; @@ -220,52 +219,52 @@ private void asyncQueryResults( else { targetResultSize = Ordering.natural().min(targetResultSize, MAX_TARGET_RESULT_SIZE); } - ListenableFuture queryResultsFuture = query.waitForResults(token, uriInfo, wait, targetResultSize); + ListenableFuture queryResultsFuture = query.waitForResults(token, uriInfo, wait, targetResultSize); - ListenableFuture response = Futures.transform(queryResultsFuture, queryResults -> toResponse(query, queryResults), directExecutor()); + ListenableFuture response = Futures.transform(queryResultsFuture, this::toResponse, directExecutor()); bindAsyncResponse(asyncResponse, response, responseExecutor); } - private Response toResponse(Query query, QueryResults queryResults) + private Response toResponse(QueryResultsResponse resultsResponse) { - ResponseBuilder response = Response.ok(queryResults); + ResponseBuilder response = Response.ok(resultsResponse.queryResults()); - ProtocolHeaders protocolHeaders = query.getProtocolHeaders(); - query.getSetCatalog().ifPresent(catalog -> response.header(protocolHeaders.responseSetCatalog(), catalog)); - query.getSetSchema().ifPresent(schema -> response.header(protocolHeaders.responseSetSchema(), schema)); - query.getSetPath().ifPresent(path -> response.header(protocolHeaders.responseSetPath(), path)); + ProtocolHeaders protocolHeaders = resultsResponse.protocolHeaders(); + resultsResponse.setCatalog().ifPresent(catalog -> response.header(protocolHeaders.responseSetCatalog(), catalog)); + resultsResponse.setSchema().ifPresent(schema -> response.header(protocolHeaders.responseSetSchema(), schema)); + resultsResponse.setPath().ifPresent(path -> response.header(protocolHeaders.responseSetPath(), path)); // add set session properties - query.getSetSessionProperties() + resultsResponse.setSessionProperties() .forEach((key, value) -> response.header(protocolHeaders.responseSetSession(), key + '=' + urlEncode(value))); // add clear session properties - query.getResetSessionProperties() + resultsResponse.resetSessionProperties() .forEach(name -> response.header(protocolHeaders.responseClearSession(), name)); // add set roles - query.getSetRoles() + resultsResponse.setRoles() .forEach((key, value) -> response.header(protocolHeaders.responseSetRole(), key + '=' + urlEncode(value.toString()))); // add added prepare statements - for (Entry entry : query.getAddedPreparedStatements().entrySet()) { + for (Entry entry : resultsResponse.addedPreparedStatements().entrySet()) { String encodedKey = urlEncode(entry.getKey()); String encodedValue = urlEncode(preparedStatementEncoder.encodePreparedStatementForHeader(entry.getValue())); response.header(protocolHeaders.responseAddedPrepare(), encodedKey + '=' + encodedValue); } // add deallocated prepare statements - for (String name : query.getDeallocatedPreparedStatements()) { + for (String name : resultsResponse.deallocatedPreparedStatements()) { response.header(protocolHeaders.responseDeallocatedPrepare(), urlEncode(name)); } // add new transaction ID - query.getStartedTransactionId() + resultsResponse.startedTransactionId() .ifPresent(transactionId -> response.header(protocolHeaders.responseStartedTransactionId(), transactionId)); // add clear transaction ID directive - if (query.isClearTransactionId()) { + if (resultsResponse.clearTransactionId()) { response.header(protocolHeaders.responseClearTransactionId(), true); } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java index ec4365913f8e..72d025f58a5d 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java @@ -28,7 +28,6 @@ import io.trino.client.ClientCapabilities; import io.trino.client.Column; import io.trino.client.FailureInfo; -import io.trino.client.ProtocolHeaders; import io.trino.client.QueryError; import io.trino.client.QueryResults; import io.trino.exchange.ExchangeDataSource; @@ -273,76 +272,23 @@ public QueryInfo getQueryInfo() return queryManager.getFullQueryInfo(queryId); } - public ProtocolHeaders getProtocolHeaders() + public ListenableFuture waitForResults(long token, UriInfo uriInfo, Duration wait, DataSize targetResultSize) { - return session.getProtocolHeaders(); - } - - public synchronized Optional getSetCatalog() - { - return setCatalog; - } - - public synchronized Optional getSetSchema() - { - return setSchema; - } - - public synchronized Optional getSetPath() - { - return setPath; - } - - public synchronized Map getSetSessionProperties() - { - return setSessionProperties; - } - - public synchronized Set getResetSessionProperties() - { - return resetSessionProperties; - } - - public synchronized Map getSetRoles() - { - return setRoles; - } - - public synchronized Map getAddedPreparedStatements() - { - return addedPreparedStatements; - } - - public synchronized Set getDeallocatedPreparedStatements() - { - return deallocatedPreparedStatements; - } - - public synchronized Optional getStartedTransactionId() - { - return startedTransactionId; - } - - public synchronized boolean isClearTransactionId() - { - return clearTransactionId; - } - - public synchronized ListenableFuture waitForResults(long token, UriInfo uriInfo, Duration wait, DataSize targetResultSize) - { - // before waiting, check if this request has already been processed and cached - Optional cachedResult = getCachedResult(token); - if (cachedResult.isPresent()) { - return immediateFuture(cachedResult.get()); + ListenableFuture futureStateChange; + synchronized (this) { + // before waiting, check if this request has already been processed and cached + Optional cachedResult = getCachedResult(token); + if (cachedResult.isPresent()) { + return immediateFuture(toResultsResponse(cachedResult.get())); + } + // release the lock eagerly after acquiring the future to avoid contending with callback threads + futureStateChange = getFutureStateChange(); } // wait for a results data or query to finish, up to the wait timeout - ListenableFuture futureStateChange = addTimeout( - getFutureStateChange(), - () -> null, - wait, - timeoutExecutor); - + if (!futureStateChange.isDone()) { + futureStateChange = addTimeout(futureStateChange, () -> null, wait, timeoutExecutor); + } // when state changes, fetch the next result return Futures.transform(futureStateChange, ignored -> getNextResult(token, uriInfo, targetResultSize), resultsProcessorExecutor); } @@ -447,12 +393,12 @@ private synchronized Optional getCachedResult(long token) return Optional.empty(); } - private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, DataSize targetResultSize) + private synchronized QueryResultsResponse getNextResult(long token, UriInfo uriInfo, DataSize targetResultSize) { // check if the result for the token have already been created Optional cachedResult = getCachedResult(token); if (cachedResult.isPresent()) { - return cachedResult.get(); + return toResultsResponse(cachedResult.get()); } verify(nextToken.isPresent(), "Cannot generate next result when next token is not present"); @@ -551,7 +497,24 @@ private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, Dat lastToken = token; lastResult = queryResults; - return queryResults; + return toResultsResponse(queryResults); + } + + private synchronized QueryResultsResponse toResultsResponse(QueryResults queryResults) + { + return new QueryResultsResponse( + setCatalog, + setSchema, + setPath, + setSessionProperties, + resetSessionProperties, + setRoles, + addedPreparedStatements, + deallocatedPreparedStatements, + startedTransactionId, + clearTransactionId, + session.getProtocolHeaders(), + queryResults); } private synchronized QueryResultRows removePagesFromExchange(QueryInfo queryInfo, long targetResultBytes) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/QueryResultsResponse.java b/core/trino-main/src/main/java/io/trino/server/protocol/QueryResultsResponse.java new file mode 100644 index 000000000000..0bce541cefe5 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/protocol/QueryResultsResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.server.protocol; + +import io.trino.client.ProtocolHeaders; +import io.trino.client.QueryResults; +import io.trino.spi.security.SelectedRole; +import io.trino.transaction.TransactionId; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +record QueryResultsResponse( + Optional setCatalog, + Optional setSchema, + Optional setPath, + Map setSessionProperties, + Set resetSessionProperties, + Map setRoles, + Map addedPreparedStatements, + Set deallocatedPreparedStatements, + Optional startedTransactionId, + boolean clearTransactionId, + ProtocolHeaders protocolHeaders, + QueryResults queryResults) +{ + QueryResultsResponse { + requireNonNull(setCatalog, "setCatalog is null"); + requireNonNull(setSchema, "setSchema is null"); + requireNonNull(setPath, "setPath is null"); + requireNonNull(setSessionProperties, "setSessionProperties is null"); + requireNonNull(resetSessionProperties, "resetSessionProperties is null"); + requireNonNull(setRoles, "setRoles is null"); + requireNonNull(addedPreparedStatements, "addedPreparedStatements is null"); + requireNonNull(deallocatedPreparedStatements, "deallocatedPreparedStatements is null"); + requireNonNull(startedTransactionId, "startedTransactionId is null"); + requireNonNull(protocolHeaders, "protocolHeaders is null"); + requireNonNull(queryResults, "queryResults is null"); + } +}