Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public class BenchmarkDriverOptions
@Option(names = "--client-request-timeout", paramLabel = "<timeout>", defaultValue = "2m", description = "Client request timeout " + DEFAULT_VALUE)
public Duration clientRequestTimeout;

@Option(names = "--disable-compression", description = "Disable compression of query results")
public boolean disableCompression;

public ClientSession getClientSession()
{
return new ClientSession(
Expand All @@ -115,7 +118,8 @@ public ClientSession getClientSession()
extraCredentials.stream()
.collect(toImmutableMap(ClientExtraCredential::getName, ClientExtraCredential::getValue)),
null,
clientRequestTimeout);
clientRequestTimeout,
disableCompression);
}

private static URI parseServer(String server)
Expand Down
6 changes: 5 additions & 1 deletion presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public class ClientOptions
@Option(names = "--timezone", paramLabel = "<timezone>", description = "Session time zone " + DEFAULT_VALUE)
public ZoneId timeZone = ZoneId.systemDefault();

@Option(names = "--disable-compression", description = "Disable compression of query results")
public boolean disableCompression;

public enum OutputFormat
{
ALIGNED,
Expand Down Expand Up @@ -195,7 +198,8 @@ public ClientSession toClientSession()
emptyMap(),
toExtraCredentials(extraCredentials),
null,
clientRequestTimeout);
clientRequestTimeout,
disableCompression);
}

public static URI parseServer(String server)
Expand Down
13 changes: 13 additions & 0 deletions presto-cli/src/test/java/io/prestosql/cli/TestClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static io.prestosql.cli.Presto.createCommandLine;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestClientOptions
{
Expand Down Expand Up @@ -171,6 +172,18 @@ public void testTimeZone()
assertEquals(session.getTimeZone(), ZoneId.of("Europe/Vilnius"));
}

@Test
public void testDisableCompression()
{
Console console = createConsole("--disable-compression");

ClientOptions options = console.clientOptions;
assertTrue(options.disableCompression);

ClientSession session = options.toClientSession();
assertTrue(session.isCompressionDisabled());
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "\\QInvalid session property: foo.bar.baz=value\\E")
public void testThreePartPropertyName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ static ClientSession createClientSession(MockWebServer server)
ImmutableMap.of(),
ImmutableMap.of(),
null,
new Duration(2, MINUTES));
new Duration(2, MINUTES),
true);
}

static String createResults(MockWebServer server)
Expand Down
21 changes: 19 additions & 2 deletions presto-client/src/main/java/io/prestosql/client/ClientSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ClientSession
private final Map<String, String> extraCredentials;
private final String transactionId;
private final Duration clientRequestTimeout;
private final boolean compressionDisabled;

public static Builder builder(ClientSession clientSession)
{
Expand Down Expand Up @@ -82,7 +83,8 @@ public ClientSession(
Map<String, ClientSelectedRole> roles,
Map<String, String> extraCredentials,
String transactionId,
Duration clientRequestTimeout)
Duration clientRequestTimeout,
boolean compressionDisabled)
{
this.server = requireNonNull(server, "server is null");
this.user = user;
Expand All @@ -102,6 +104,7 @@ public ClientSession(
this.roles = ImmutableMap.copyOf(requireNonNull(roles, "roles is null"));
this.extraCredentials = ImmutableMap.copyOf(requireNonNull(extraCredentials, "extraCredentials is null"));
this.clientRequestTimeout = clientRequestTimeout;
this.compressionDisabled = compressionDisabled;

for (String clientTag : clientTags) {
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
Expand Down Expand Up @@ -230,6 +233,11 @@ public Duration getClientRequestTimeout()
return clientRequestTimeout;
}

public boolean isCompressionDisabled()
{
return compressionDisabled;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -270,6 +278,7 @@ public static final class Builder
private Map<String, String> credentials;
private String transactionId;
private Duration clientRequestTimeout;
private boolean compressionDisabled;

private Builder(ClientSession clientSession)
{
Expand All @@ -292,6 +301,7 @@ private Builder(ClientSession clientSession)
credentials = clientSession.getExtraCredentials();
transactionId = clientSession.getTransactionId();
clientRequestTimeout = clientSession.getClientRequestTimeout();
compressionDisabled = clientSession.isCompressionDisabled();
}

public Builder withCatalog(String catalog)
Expand Down Expand Up @@ -348,6 +358,12 @@ public Builder withoutTransactionId()
return this;
}

public Builder withCompressionDisabled(boolean compressionDisabled)
{
this.compressionDisabled = compressionDisabled;
return this;
}

public ClientSession build()
{
return new ClientSession(
Expand All @@ -368,7 +384,8 @@ public ClientSession build()
roles,
credentials,
transactionId,
clientRequestTimeout);
clientRequestTimeout,
compressionDisabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING;
import static com.google.common.net.HttpHeaders.USER_AGENT;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.prestosql.client.PrestoHeaders.PRESTO_ADDED_PREPARE;
Expand Down Expand Up @@ -111,6 +112,7 @@ class StatementClientV1
private final Duration requestTimeoutNanos;
private final String user;
private final String clientCapabilities;
private final boolean compressionDisabled;

private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

Expand All @@ -126,6 +128,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
this.requestTimeoutNanos = session.getClientRequestTimeout();
this.user = session.getUser();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.compressionDisabled = session.isCompressionDisabled();

Request request = buildQueryRequest(session, query);

Expand Down Expand Up @@ -332,10 +335,14 @@ public boolean isClearTransactionId()

private Request.Builder prepareRequest(HttpUrl url)
{
return new Request.Builder()
Request.Builder builder = new Request.Builder()
.addHeader(PRESTO_USER, user)
.addHeader(USER_AGENT, USER_AGENT_VALUE)
.url(url);
if (compressionDisabled) {
builder.header(ACCEPT_ENCODING, "identity");
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum SslVerificationMode
public static final ConnectionProperty<HostAndPort> SOCKS_PROXY = new SocksProxy();
public static final ConnectionProperty<HostAndPort> HTTP_PROXY = new HttpProxy();
public static final ConnectionProperty<String> APPLICATION_NAME_PREFIX = new ApplicationNamePrefix();
public static final ConnectionProperty<Boolean> DISABLE_COMPRESSION = new DisableCompression();
public static final ConnectionProperty<Boolean> SSL = new Ssl();
public static final ConnectionProperty<SslVerificationMode> SSL_VERIFICATION = new SslVerification();
public static final ConnectionProperty<String> SSL_KEY_STORE_PATH = new SslKeyStorePath();
Expand Down Expand Up @@ -82,6 +83,7 @@ enum SslVerificationMode
.add(SOCKS_PROXY)
.add(HTTP_PROXY)
.add(APPLICATION_NAME_PREFIX)
.add(DISABLE_COMPRESSION)
.add(SSL)
.add(SSL_VERIFICATION)
.add(SSL_KEY_STORE_PATH)
Expand Down Expand Up @@ -246,6 +248,15 @@ public TraceToken()
}
}

private static class DisableCompression
extends AbstractConnectionProperty<Boolean>
{
public DisableCompression()
{
super("disableCompression", NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class Ssl
extends AbstractConnectionProperty<Boolean>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class PrestoConnection
private final URI jdbcUri;
private final URI httpUri;
private final String user;
private final boolean compressionDisabled;
private final Map<String, String> extraCredentials;
private final Optional<String> applicationNamePrefix;
private final Optional<String> source;
Expand All @@ -111,6 +112,7 @@ public class PrestoConnection
this.applicationNamePrefix = uri.getApplicationNamePrefix();
this.source = uri.getSource();
this.extraCredentials = uri.getExtraCredentials();
this.compressionDisabled = uri.isCompressionDisabled();
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
uri.getClientInfo().ifPresent(tags -> clientInfo.put(CLIENT_INFO, tags));
uri.getClientTags().ifPresent(tags -> clientInfo.put(CLIENT_TAGS, tags));
Expand Down Expand Up @@ -720,7 +722,8 @@ StatementClient startQuery(String sql, Map<String, String> sessionPropertiesOver
ImmutableMap.copyOf(roles),
extraCredentials,
transactionId.get(),
timeout);
timeout,
compressionDisabled);

return queryExecutor.startQuery(session, sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static io.prestosql.jdbc.ConnectionProperties.APPLICATION_NAME_PREFIX;
import static io.prestosql.jdbc.ConnectionProperties.CLIENT_INFO;
import static io.prestosql.jdbc.ConnectionProperties.CLIENT_TAGS;
import static io.prestosql.jdbc.ConnectionProperties.DISABLE_COMPRESSION;
import static io.prestosql.jdbc.ConnectionProperties.EXTRA_CREDENTIALS;
import static io.prestosql.jdbc.ConnectionProperties.HTTP_PROXY;
import static io.prestosql.jdbc.ConnectionProperties.KERBEROS_CONFIG_PATH;
Expand Down Expand Up @@ -197,6 +198,12 @@ public Optional<String> getSource()
return SOURCE.getValue(properties);
}

public boolean isCompressionDisabled()
throws SQLException
{
return DISABLE_COMPRESSION.getValue(properties).orElse(false);
}

public void setupClient(OkHttpClient.Builder builder)
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;

import static io.prestosql.jdbc.ConnectionProperties.CLIENT_TAGS;
import static io.prestosql.jdbc.ConnectionProperties.DISABLE_COMPRESSION;
import static io.prestosql.jdbc.ConnectionProperties.EXTRA_CREDENTIALS;
import static io.prestosql.jdbc.ConnectionProperties.HTTP_PROXY;
import static io.prestosql.jdbc.ConnectionProperties.SOCKS_PROXY;
Expand All @@ -33,6 +34,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestPrestoDriverUri
Expand Down Expand Up @@ -205,6 +207,17 @@ public void testUriWithHttpProxy()
assertEquals(properties.getProperty(HTTP_PROXY.getKey()), "localhost:5678");
}

@Test
public void testUriWithoutCompression()
throws SQLException
{
PrestoDriverUri parameters = createDriverUri("presto://localhost:8080?disableCompression=true");
assertTrue(parameters.isCompressionDisabled());

Properties properties = parameters.getProperties();
assertEquals(properties.getProperty(DISABLE_COMPRESSION.getKey()), "true");
}

@Test
public void testUriWithoutSsl()
throws SQLException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.prestosql.execution.ExecutionFailureInfo;
import io.prestosql.execution.QueryState;
import io.prestosql.server.HttpRequestSessionContext;
import io.prestosql.server.ServerConfig;
import io.prestosql.server.SessionContext;
import io.prestosql.server.protocol.Slug;
import io.prestosql.server.security.ResourceSecurity;
Expand Down Expand Up @@ -104,19 +105,22 @@ public class QueuedStatementResource

private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
private final ScheduledExecutorService queryPurger = newSingleThreadScheduledExecutor(threadsNamed("dispatch-query-purger"));
private final boolean compressionEnabled;

@Inject
public QueuedStatementResource(
GroupProvider groupProvider,
DispatchManager dispatchManager,
DispatchExecutor executor)
DispatchExecutor executor,
ServerConfig serverConfig)
{
this.groupProvider = requireNonNull(groupProvider, "groupProvider is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");

requireNonNull(dispatchManager, "dispatchManager is null");
this.responseExecutor = requireNonNull(executor, "responseExecutor is null").getExecutor();
this.timeoutExecutor = requireNonNull(executor, "timeoutExecutor is null").getScheduledExecutor();
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();

queryPurger.scheduleWithFixedDelay(
() -> {
Expand Down Expand Up @@ -181,7 +185,7 @@ public Response postStatement(
// let authentication filter know that identity lifecycle has been handed off
servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);

return Response.ok(query.getQueryResults(query.getLastToken(), uriInfo)).build();
return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled);
}

@ResourceSecurity(PUBLIC)
Expand Down Expand Up @@ -214,7 +218,7 @@ public void getStatus(
// transform to Response
ListenableFuture<Response> response = Futures.transform(
queryResultsFuture,
queryResults -> Response.ok(queryResults).build(),
queryResults -> createQueryResultsResponse(queryResults, compressionEnabled),
directExecutor());
bindAsyncResponse(asyncResponse, response, responseExecutor);
}
Expand Down Expand Up @@ -242,6 +246,15 @@ private Query getQuery(QueryId queryId, String slug, long token)
return query;
}

private static Response createQueryResultsResponse(QueryResults results, boolean compressionEnabled)
{
Response.ResponseBuilder builder = Response.ok(results);
if (!compressionEnabled) {
builder.encoding("identity");
}
return builder.build();
}

private static URI getQueryHtmlUri(QueryId queryId, UriInfo uriInfo)
{
return uriInfo.getRequestUriBuilder()
Expand Down
13 changes: 13 additions & 0 deletions presto-main/src/main/java/io/prestosql/server/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class ServerConfig
private boolean coordinator = true;
private boolean includeExceptionInResponse = true;
private Duration gracePeriod = new Duration(2, MINUTES);
private boolean queryResultsCompressionEnabled = true;

public boolean isCoordinator()
{
Expand Down Expand Up @@ -59,4 +60,16 @@ public ServerConfig setGracePeriod(Duration gracePeriod)
this.gracePeriod = gracePeriod;
return this;
}

public boolean isQueryResultsCompressionEnabled()
{
return queryResultsCompressionEnabled;
}

@Config("query-results.compression-enabled")
public ServerConfig setQueryResultsCompressionEnabled(boolean queryResultsCompressionEnabled)
{
this.queryResultsCompressionEnabled = queryResultsCompressionEnabled;
return this;
}
}
Loading