Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@

import okhttp3.OkHttpClient;

import java.util.Optional;
import java.util.Set;

public final class StatementClientFactory
{
private StatementClientFactory() {}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
{
return new StatementClientV1(httpClient, session, query);
return new StatementClientV1(httpClient, session, query, Optional.empty());
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
return new StatementClientV1(httpClient, session, query, clientCapabilities);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING;
import static com.google.common.net.HttpHeaders.USER_AGENT;
import static io.trino.client.JsonCodec.jsonCodec;
Expand All @@ -56,6 +57,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static java.util.Arrays.stream;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -93,7 +95,7 @@ class StatementClientV1

private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
requireNonNull(httpClient, "httpClient is null");
requireNonNull(session, "session is null");
Expand All @@ -107,7 +109,9 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.clientCapabilities = Joiner.on(",").join(clientCapabilities.orElseGet(() -> stream(ClientCapabilities.values())
.map(Enum::name)
.collect(toImmutableSet())));
this.compressionDisabled = session.isCompressionDisabled();

Request request = buildQueryRequest(session, query);
Expand Down
1 change: 1 addition & 0 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ private SessionBuilder(Session session)
this.remoteUserAddress = session.remoteUserAddress.orElse(null);
this.userAgent = session.userAgent.orElse(null);
this.clientInfo = session.clientInfo.orElse(null);
this.clientCapabilities = ImmutableSet.copyOf(session.clientCapabilities);
this.clientTags = ImmutableSet.copyOf(session.clientTags);
this.start = session.start;
this.systemProperties.putAll(session.systemProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.Page;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
Expand Down Expand Up @@ -58,18 +57,7 @@ public PartitioningExchanger(
@Override
public void accept(Page page)
{
Consumer<Page> wholePagePartition = partitionPageOrFindWholePagePartition(page, partitionedPagePreparer.apply(page));
if (wholePagePartition != null) {
// whole input page will go to this partition, compact the input page avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
sendPageToPartition(wholePagePartition, page);
}
}

@Nullable
private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
{
Page partitionPage = partitionedPagePreparer.apply(page);
// assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations
for (int position = 0; position < partitionPage.getPositionCount(); position++) {
int partition = partitionFunction.getPartition(partitionPage, position);
Expand All @@ -89,22 +77,19 @@ private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page par
int[] positions = positionsList.elements();
positionsList.clear();

Page pageSplit;
if (partitionSize == page.getPositionCount()) {
// entire page will be sent to this partition, compact and send the page after releasing the lock
return buffers.get(partition);
// whole input page will go to this partition, compact the input page avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
pageSplit = page;
}
Page pageSplit = page.copyPositions(positions, 0, partitionSize);
sendPageToPartition(buffers.get(partition), pageSplit);
else {
pageSplit = page.copyPositions(positions, 0, partitionSize);
}
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(pageSplit);
}
// No single partition receives the entire input page
return null;
}

// This is safe to call without synchronizing because the partition buffers are themselves threadsafe
private void sendPageToPartition(Consumer<Page> buffer, Page pageSplit)
{
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffer.accept(pageSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,15 @@ private ProcessBatchResult processBatch(int batchSize)
}
else {
if (pageProjectWork == null) {
Page inputPage = projection.getInputChannels().getInputChannels(page);
expressionProfiler.start();
pageProjectWork = projection.project(session, yieldSignal, inputPage, positionsBatch);
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);
pageProjectWork = projection.project(session, yieldSignal, projection.getInputChannels().getInputChannels(page), positionsBatch);
}
if (!pageProjectWork.process()) {

expressionProfiler.start();
boolean finished = pageProjectWork.process();
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);

if (!finished) {
return ProcessBatchResult.processBatchYield();
}
previouslyComputedResults[i] = pageProjectWork.getResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void redirectForNewToken(ContainerRequestContext request, String refresh
{
OAuth2Client.Response response = client.refreshTokens(refreshToken);
String serializedToken = tokenPairSerializer.serialize(TokenPair.fromOAuth2Response(response));
request.abortWith(Response.seeOther(request.getUriInfo().getRequestUri())
request.abortWith(Response.temporaryRedirect(request.getUriInfo().getRequestUri())
.cookie(OAuthWebUiCookie.create(serializedToken, tokenExpiration.map(expiration -> Instant.now().plus(expiration)).orElse(response.getExpiration())))
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

import io.trino.Session;
import io.trino.Session.SessionBuilder;
import io.trino.client.ClientCapabilities;
import io.trino.execution.QueryIdGenerator;
import io.trino.metadata.SessionPropertyManager;
import io.trino.spi.security.Identity;
import io.trino.spi.type.TimeZoneKey;

import java.util.Arrays;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Locale.ENGLISH;

public final class TestingSession
Expand Down Expand Up @@ -54,6 +58,8 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setSchema("schema")
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setClientCapabilities(Arrays.stream(ClientCapabilities.values()).map(Enum::name)
.collect(toImmutableSet()))
.setRemoteUserAddress("address")
.setUserAgent("agent");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TestDropTableTask
@Test
public void testDropExistingTable()
{
QualifiedObjectName tableName = qualifiedObjectName("not_existing_table");
QualifiedObjectName tableName = qualifiedObjectName("existing_table");
metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false);
assertThat(metadata.getTableHandle(testSession, tableName)).isPresent();

Expand Down
Loading