Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class BenchmarkDriverOptions
@Option(name = "--client-request-timeout", title = "client request timeout", description = "Client request timeout (default: 2m)")
public Duration clientRequestTimeout = new Duration(2, MINUTES);

@Option(name = "--disable-compression", title = "disable response compression", description = "Disable compression of query results")
public boolean disableCompression;

public ClientSession getClientSession()
{
return new ClientSession(
Expand All @@ -104,7 +107,8 @@ public ClientSession getClientSession()
ImmutableMap.of(),
ImmutableMap.of(),
null,
clientRequestTimeout);
clientRequestTimeout,
disableCompression);
}

private static URI parseServer(String server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ public class ClientOptions
@Option(name = "--ignore-errors", title = "ignore errors", description = "Continue processing in batch mode when an error occurs (default is to exit immediately)")
public boolean ignoreErrors;

@Option(name = "--disable-compression", title = "disable response compression", description = "Disable compression of query results")
public boolean disableCompression;

public enum OutputFormat
{
ALIGNED,
Expand Down Expand Up @@ -170,7 +173,8 @@ public ClientSession toClientSession()
emptyMap(),
toExtraCredentials(extraCredentials),
null,
clientRequestTimeout);
clientRequestTimeout,
disableCompression);
}

public static URI parseServer(String server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import static io.airlift.airline.SingleCommand.singleCommand;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestClientOptions
{
Expand Down Expand Up @@ -125,6 +126,14 @@ public void testSessionProperties()
assertEquals(new ClientSessionProperty("foo="), new ClientSessionProperty(Optional.empty(), "foo", ""));
}

@Test
public void testDisableCompression()
{
Console console = singleCommand(Console.class).parse("--disable-compression");
assertTrue(console.clientOptions.disableCompression);
assertTrue(console.clientOptions.toClientSession().isCompressionDisabled());
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testThreePartPropertyName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void testCookie()
ImmutableMap.of(),
ImmutableMap.of(),
null,
new Duration(2, MINUTES)));
new Duration(2, MINUTES),
true));
try (Query query = queryRunner.startQuery("first query will introduce a cookie")) {
query.renderOutput(new PrintStream(nullOutputStream()), CSV, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ClientSession
private final Map<String, String> extraCredentials;
private final String transactionId;
private final Duration clientRequestTimeout;
private final boolean compressionDisabled;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead use compressionEnabled? Disabled would incur one more flip in mind :)

Copy link
Copy Markdown
Contributor Author

@pettyjamesm pettyjamesm Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The phrasing is a little weird, but the mechanism here doesn't actually let you force compression- only disable compression. Same thing with the server side logic. Default negotiation happens inside of jetty around the client Accept-Encoding header and server's chosen mime-type and Content-Encoding headers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. But we could probably just flip the boolean when it is used? I'm just worried this might be error-prone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I’m worried about the impression that “enabled” will give. To me, that sounds like the client has ultimate control of the decision when in fact, they do not. The client only as the option to opt-out of compression.


public static Builder builder(ClientSession clientSession)
{
Expand Down Expand Up @@ -81,7 +82,8 @@ public ClientSession(
Map<String, SelectedRole> roles,
Map<String, String> extraCredentials,
String transactionId,
Duration clientRequestTimeout)
Duration clientRequestTimeout,
boolean compressionDisabled)
{
this.server = requireNonNull(server, "server is null");
this.user = user;
Expand All @@ -100,6 +102,7 @@ public ClientSession(
this.roles = ImmutableMap.copyOf(requireNonNull(roles, "roles is null"));
this.extraCredentials = ImmutableMap.copyOf(requireNonNull(extraCredentials, "extraCredentials is null"));
this.clientRequestTimeout = clientRequestTimeout;
this.compressionDisabled = compressionDisabled;

for (String clientTag : clientTags) {
checkArgument(!clientTag.contains(","), "client tag cannot contain ','");
Expand Down Expand Up @@ -223,6 +226,11 @@ public Duration getClientRequestTimeout()
return clientRequestTimeout;
}

public boolean isCompressionDisabled()
{
return compressionDisabled;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -261,6 +269,7 @@ public static final class Builder
private Map<String, String> credentials;
private String transactionId;
private Duration clientRequestTimeout;
private boolean compressionDisabled;

private Builder(ClientSession clientSession)
{
Expand All @@ -282,6 +291,7 @@ private Builder(ClientSession clientSession)
credentials = clientSession.getExtraCredentials();
transactionId = clientSession.getTransactionId();
clientRequestTimeout = clientSession.getClientRequestTimeout();
compressionDisabled = clientSession.isCompressionDisabled();
}

public Builder withCatalog(String catalog)
Expand Down Expand Up @@ -332,6 +342,12 @@ public Builder withoutTransactionId()
return this;
}

public Builder withCompressionDisabled(boolean compressionDisabled)
{
this.compressionDisabled = compressionDisabled;
return this;
}

public ClientSession build()
{
return new ClientSession(
Expand All @@ -351,7 +367,8 @@ public ClientSession build()
roles,
credentials,
transactionId,
clientRequestTimeout);
clientRequestTimeout,
compressionDisabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING;
import static com.google.common.net.HttpHeaders.USER_AGENT;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
Expand Down Expand Up @@ -107,6 +108,7 @@ class StatementClientV1
private final TimeZoneKey timeZone;
private final Duration requestTimeoutNanos;
private final String user;
private final boolean compressionDisabled;

private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

Expand All @@ -121,6 +123,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
this.query = query;
this.requestTimeoutNanos = session.getClientRequestTimeout();
this.user = session.getUser();
this.compressionDisabled = session.isCompressionDisabled();

Request request = buildQueryRequest(session, query);

Expand Down Expand Up @@ -313,10 +316,14 @@ public boolean isClearTransactionId()

private Request.Builder prepareRequest(HttpUrl url)
{
return new Request.Builder()
Request.Builder builder = new Request.Builder()
.addHeader(PRESTO_USER, user)
.addHeader(USER_AGENT, USER_AGENT_VALUE)
.url(url);
if (compressionDisabled) {
builder.header(ACCEPT_ENCODING, "identity");
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ final class ConnectionProperties
public static final ConnectionProperty<HostAndPort> SOCKS_PROXY = new SocksProxy();
public static final ConnectionProperty<HostAndPort> HTTP_PROXY = new HttpProxy();
public static final ConnectionProperty<String> APPLICATION_NAME_PREFIX = new ApplicationNamePrefix();
public static final ConnectionProperty<Boolean> DISABLE_COMPRESSION = new DisableCompression();
public static final ConnectionProperty<Boolean> SSL = new Ssl();
public static final ConnectionProperty<String> SSL_KEY_STORE_PATH = new SslKeyStorePath();
public static final ConnectionProperty<String> SSL_KEY_STORE_PASSWORD = new SslKeyStorePassword();
Expand All @@ -58,6 +59,7 @@ final class ConnectionProperties
.add(SOCKS_PROXY)
.add(HTTP_PROXY)
.add(APPLICATION_NAME_PREFIX)
.add(DISABLE_COMPRESSION)
.add(SSL)
.add(SSL_KEY_STORE_PATH)
.add(SSL_KEY_STORE_PASSWORD)
Expand Down Expand Up @@ -155,6 +157,15 @@ public ApplicationNamePrefix()
}
}

private static class DisableCompression
extends AbstractConnectionProperty<Boolean>
{
public DisableCompression()
{
super("disableCompression", NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class Ssl
extends AbstractConnectionProperty<Boolean>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class PrestoConnection
private final URI jdbcUri;
private final URI httpUri;
private final String user;
private final boolean compressionDisabled;
private final Map<String, String> extraCredentials;
private final Map<String, String> sessionProperties;
private final Optional<String> applicationNamePrefix;
Expand All @@ -104,6 +105,7 @@ public class PrestoConnection
this.catalog.set(uri.getCatalog());
this.user = uri.getUser();
this.applicationNamePrefix = uri.getApplicationNamePrefix();
this.compressionDisabled = uri.isCompressionDisabled();

this.extraCredentials = uri.getExtraCredentials();
this.sessionProperties = new ConcurrentHashMap<>(uri.getSessionProperties());
Expand Down Expand Up @@ -709,7 +711,8 @@ else if (applicationName != null) {
ImmutableMap.copyOf(roles),
extraCredentials,
transactionId.get(),
timeout);
timeout,
compressionDisabled);

return queryExecutor.startQuery(session, sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.facebook.presto.client.OkHttpUtil.tokenAuth;
import static com.facebook.presto.jdbc.ConnectionProperties.ACCESS_TOKEN;
import static com.facebook.presto.jdbc.ConnectionProperties.APPLICATION_NAME_PREFIX;
import static com.facebook.presto.jdbc.ConnectionProperties.DISABLE_COMPRESSION;
import static com.facebook.presto.jdbc.ConnectionProperties.EXTRA_CREDENTIALS;
import static com.facebook.presto.jdbc.ConnectionProperties.HTTP_PROXY;
import static com.facebook.presto.jdbc.ConnectionProperties.KERBEROS_CONFIG_PATH;
Expand Down Expand Up @@ -155,6 +156,12 @@ public Map<String, String> getSessionProperties()
return SESSION_PROPERTIES.getValue(properties).orElse(ImmutableMap.of());
}

public boolean isCompressionDisabled()
throws SQLException
{
return DISABLE_COMPRESSION.getValue(properties).orElse(false);
}

public void setupClient(OkHttpClient.Builder builder)
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.SQLException;
import java.util.Properties;

import static com.facebook.presto.jdbc.ConnectionProperties.DISABLE_COMPRESSION;
import static com.facebook.presto.jdbc.ConnectionProperties.EXTRA_CREDENTIALS;
import static com.facebook.presto.jdbc.ConnectionProperties.HTTP_PROXY;
import static com.facebook.presto.jdbc.ConnectionProperties.SESSION_PROPERTIES;
Expand All @@ -32,6 +33,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestPrestoDriverUri
Expand Down Expand Up @@ -164,6 +166,15 @@ void testUriWithHttpProxy()
assertEquals(properties.getProperty(HTTP_PROXY.getKey()), "localhost:5678");
}

@Test
public void testUriWithoutCompression()
throws SQLException
{
PrestoDriverUri parameters = createDriverUri("presto://localhost:8080/blackhole?disableCompression=true");
assertTrue(parameters.isCompressionDisabled());
assertEquals(parameters.getProperties().getProperty(DISABLE_COMPRESSION.getKey()), "true");
}

@Test
public void testUriWithoutSsl()
throws SQLException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ServerConfig
private boolean includeExceptionInResponse = true;
private Duration gracePeriod = new Duration(2, MINUTES);
private boolean enhancedErrorReporting = true;
private boolean queryResultsCompressionEnabled = true;

public boolean isCoordinator()
{
Expand Down Expand Up @@ -106,4 +107,16 @@ public ServerConfig setEnhancedErrorReporting(boolean value)
this.enhancedErrorReporting = value;
return this;
}

public boolean isQueryResultsCompressionEnabled()
{
return queryResultsCompressionEnabled;
}

@Config("query-results.compression-enabled")
public ServerConfig setQueryResultsCompressionEnabled(boolean queryResultsCompressionEnabled)
{
this.queryResultsCompressionEnabled = queryResultsCompressionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.presto.server.ForStatementResource;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.QueryId;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -59,14 +60,17 @@ public class ExecutingStatementResource

private final BoundedExecutor responseExecutor;
private final LocalQueryProvider queryProvider;
private final boolean compressionEnabled;

@Inject
public ExecutingStatementResource(
@ForStatementResource BoundedExecutor responseExecutor,
LocalQueryProvider queryProvider)
LocalQueryProvider queryProvider,
ServerConfig serverConfig)
{
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
}

@GET
Expand Down Expand Up @@ -96,7 +100,7 @@ public void getQueryResults(
Query query = queryProvider.getQuery(queryId, slug);
ListenableFuture<Response> queryResultsFuture = transform(
query.waitForResults(token, uriInfo, proto, wait, targetResultSize),
results -> toResponse(query, results),
results -> toResponse(query, results, compressionEnabled),
directExecutor());
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class QueryResourceUtil
{
private QueryResourceUtil() {}

public static Response toResponse(Query query, QueryResults queryResults)
public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled)
{
Response.ResponseBuilder response = Response.ok(queryResults);

Expand Down Expand Up @@ -90,6 +90,10 @@ public static Response toResponse(Query query, QueryResults queryResults)
response.header(PRESTO_CLEAR_TRANSACTION_ID, true);
}

if (!compressionEnabled) {
response.encoding("identity");
}

return response.build();
}

Expand Down
Loading