From fd387a561662f35d0202027060a3d4b826e983ea Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 16 May 2016 20:26:09 +0200 Subject: [PATCH] Add base service option classes for gRPC and HTTP services (#1011) * Add base service option classes for gRPC and HTTP services * Rename ExecutorProvider to ExecutorFactory, refactor shutdown and serialization --- .../cloud/bigquery/BigQueryOptions.java | 8 +- .../com/google/cloud/GrpcServiceOptions.java | 256 ++++++++++++++++++ .../com/google/cloud/HttpServiceOptions.java | 227 ++++++++++++++++ .../java/com/google/cloud/ServiceOptions.java | 146 +--------- .../google/cloud/GrpcServiceOptionsTest.java | 218 +++++++++++++++ .../google/cloud/HttpServiceOptionsTest.java | 163 +++++++++++ .../com/google/cloud/ServiceOptionsTest.java | 18 -- .../cloud/datastore/DatastoreOptions.java | 9 +- .../java/com/google/cloud/dns/DnsOptions.java | 8 +- .../java/com/google/cloud/pubsub/PubSub.java | 2 +- .../com/google/cloud/pubsub/PubSubImpl.java | 5 + .../google/cloud/pubsub/PubSubOptions.java | 10 +- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 79 ++++-- .../google/cloud/pubsub/spi/PubSubRpc.java | 2 +- .../ResourceManagerOptions.java | 10 +- .../google/cloud/storage/StorageOptions.java | 8 +- 16 files changed, 964 insertions(+), 205 deletions(-) create mode 100644 gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java create mode 100644 gcloud-java-core/src/main/java/com/google/cloud/HttpServiceOptions.java create mode 100644 gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java create mode 100644 gcloud-java-core/src/test/java/com/google/cloud/HttpServiceOptionsTest.java diff --git a/gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryOptions.java b/gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryOptions.java index b0cc56fa0298..bf7fa65bcf0f 100644 --- a/gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryOptions.java +++ b/gcloud-java-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryOptions.java @@ -16,19 +16,19 @@ package com.google.cloud.bigquery; +import com.google.cloud.HttpServiceOptions; import com.google.common.collect.ImmutableSet; -import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.spi.BigQueryRpc; import com.google.cloud.bigquery.spi.BigQueryRpcFactory; import com.google.cloud.bigquery.spi.DefaultBigQueryRpc; import java.util.Set; -public class BigQueryOptions extends ServiceOptions { +public class BigQueryOptions extends HttpServiceOptions { private static final String BIGQUERY_SCOPE = "https://www.googleapis.com/auth/bigquery"; private static final Set SCOPES = ImmutableSet.of(BIGQUERY_SCOPE); - private static final long serialVersionUID = -215981591481708043L; + private static final long serialVersionUID = -8592198255032667206L; public static class DefaultBigqueryFactory implements BigQueryFactory { @@ -51,7 +51,7 @@ public BigQueryRpc create(BigQueryOptions options) { } public static class Builder extends - ServiceOptions.Builder { + HttpServiceOptions.Builder { private Builder() { } diff --git a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java new file mode 100644 index 000000000000..2a1110c9362e --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java @@ -0,0 +1,256 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import static com.google.common.base.MoreObjects.firstNonNull; + +import com.google.cloud.spi.ServiceRpcFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import io.grpc.internal.SharedResourceHolder; +import io.grpc.internal.SharedResourceHolder.Resource; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Abstract class representing service options for those services that use gRPC as the transport + * layer. + * + * @param the service subclass + * @param the spi-layer class corresponding to the service + * @param the {@code ServiceOptions} subclass corresponding to the service + */ +public abstract class GrpcServiceOptions, ServiceRpcT, + OptionsT extends GrpcServiceOptions> + extends ServiceOptions { + + private static final long serialVersionUID = 6415982522610509549L; + private final String executorFactoryClassName; + private final int initialTimeout; + private final double timeoutMultiplier; + private final int maxTimeout; + + private transient ExecutorFactory executorFactory; + + /** + * Shared thread pool executor. + */ + private static final Resource EXECUTOR = + new Resource() { + @Override + public ScheduledExecutorService create() { + ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(8); + service.setKeepAliveTime(5, TimeUnit.SECONDS); + service.allowCoreThreadTimeOut(true); + service.setRemoveOnCancelPolicy(true); + return service; + } + + @Override + public void close(ScheduledExecutorService instance) { + instance.shutdown(); + } + }; + + /** + * An interface for {@link ScheduledExecutorService} factories. Implementations of this interface + * can be used to provide an user-defined scheduled executor to execute requests. Any + * implementation of this interface must override the {@code get()} method to return the desired + * executor. The {@code release(executor)} method should be overriden to free resources used by + * the executor (if needed) according to application's logic. + * + *

Implementation must provide a public no-arg constructor. Loading of a factory implementation + * is done via {@link java.util.ServiceLoader}. + */ + public interface ExecutorFactory { + + /** + * Gets a scheduled executor service instance. + */ + ScheduledExecutorService get(); + + /** + * Releases resources used by the executor and possibly shuts it down. + */ + void release(ScheduledExecutorService executor); + } + + @VisibleForTesting + static class DefaultExecutorFactory implements ExecutorFactory { + + private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory(); + + @Override + public ScheduledExecutorService get() { + return SharedResourceHolder.get(EXECUTOR); + } + + @Override + public synchronized void release(ScheduledExecutorService executor) { + SharedResourceHolder.release(EXECUTOR, executor); + } + } + + /** + * Builder for {@code GrpcServiceOptions}. + * + * @param the service subclass + * @param the spi-layer class corresponding to the service + * @param the {@code GrpcServiceOptions} subclass corresponding to the service + * @param the {@code ServiceOptions} builder + */ + protected abstract static class Builder, ServiceRpcT, + OptionsT extends GrpcServiceOptions, + B extends Builder> + extends ServiceOptions.Builder { + + private ExecutorFactory executorFactory; + private int initialTimeout = 20_000; + private double timeoutMultiplier = 1.5; + private int maxTimeout = 100_000; + + protected Builder() {} + + protected Builder(GrpcServiceOptions options) { + super(options); + executorFactory = options.executorFactory; + initialTimeout = options.initialTimeout; + timeoutMultiplier = options.timeoutMultiplier; + maxTimeout = options.maxTimeout; + } + + @Override + protected abstract GrpcServiceOptions build(); + + /** + * Sets the scheduled executor factory. This method can be used to provide an user-defined + * scheduled executor to execute requests. + * + * @return the builder + */ + public B executorFactory(ExecutorFactory executorFactory) { + this.executorFactory = executorFactory; + return self(); + } + + /** + * Sets the timeout for the initial RPC, in milliseconds. Subsequent calls will use this value + * adjusted according to {@link #timeoutMultiplier(double)}. Default value is 20000. + * + * @throws IllegalArgumentException if the provided timeout is < 0 + * @return the builder + */ + public B initialTimeout(int initialTimeout) { + Preconditions.checkArgument(initialTimeout > 0, "Initial timeout must be > 0"); + this.initialTimeout = initialTimeout; + return self(); + } + + /** + * Sets the timeout multiplier. This value is used to compute the timeout for a retried RPC. + * Timeout is computed as {@code timeoutMultiplier * previousTimeout}. Default value is 1.5. + * + * @throws IllegalArgumentException if the provided timeout multiplier is < 0 + * @return the builder + */ + public B timeoutMultiplier(double timeoutMultiplier) { + Preconditions.checkArgument(timeoutMultiplier >= 1.0, "Timeout multiplier must be >= 1"); + this.timeoutMultiplier = timeoutMultiplier; + return self(); + } + + /** + * Sets the maximum timeout for a RPC call, in milliseconds. Default value is 100000. If + * {@code maxTimeout} is lower than the initial timeout the {@link #initialTimeout(int)} value + * is used instead. + * + * @return the builder + */ + public B maxTimeout(int maxTimeout) { + this.maxTimeout = maxTimeout; + return self(); + } + } + + protected GrpcServiceOptions( + Class> serviceFactoryClass, + Class> rpcFactoryClass, Builder builder) { + super(serviceFactoryClass, rpcFactoryClass, builder); + executorFactory = firstNonNull(builder.executorFactory, + getFromServiceLoader(ExecutorFactory.class, DefaultExecutorFactory.INSTANCE)); + executorFactoryClassName = executorFactory.getClass().getName(); + initialTimeout = builder.initialTimeout; + timeoutMultiplier = builder.timeoutMultiplier; + maxTimeout = builder.maxTimeout <= initialTimeout ? initialTimeout : builder.maxTimeout; + } + + /** + * Returns a scheduled executor service provider. + */ + protected ExecutorFactory executorFactory() { + return executorFactory; + } + + /** + * Returns the timeout for the initial RPC, in milliseconds. Subsequent calls will use this value + * adjusted according to {@link #timeoutMultiplier()}. Default value is 20000. + */ + public int initialTimeout() { + return initialTimeout; + } + + /** + * Returns the timeout multiplier. This values is used to compute the timeout for a RPC. Timeout + * is computed as {@code timeoutMultiplier * previousTimeout}. Default value is 1.5. + */ + public double timeoutMultiplier() { + return timeoutMultiplier; + } + + /** + * Returns the maximum timeout for a RPC call, in milliseconds. Default value is 100000. + */ + public int maxTimeout() { + return maxTimeout; + } + + @Override + protected int baseHashCode() { + return Objects.hash(super.baseHashCode(), executorFactoryClassName, initialTimeout, + timeoutMultiplier, maxTimeout); + } + + protected boolean baseEquals(GrpcServiceOptions other) { + return super.baseEquals(other) + && Objects.equals(executorFactoryClassName, other.executorFactoryClassName) + && Objects.equals(initialTimeout, other.initialTimeout) + && Objects.equals(timeoutMultiplier, other.timeoutMultiplier) + && Objects.equals(maxTimeout, other.maxTimeout); + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { + input.defaultReadObject(); + executorFactory = newInstance(executorFactoryClassName); + } +} diff --git a/gcloud-java-core/src/main/java/com/google/cloud/HttpServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/HttpServiceOptions.java new file mode 100644 index 000000000000..eca411849eaf --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/cloud/HttpServiceOptions.java @@ -0,0 +1,227 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import static com.google.common.base.MoreObjects.firstNonNull; + +import com.google.api.client.extensions.appengine.http.UrlFetchTransport; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.spi.ServiceRpcFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Objects; + +/** + * Abstract class representing service options for those services that use HTTP as the transport + * layer. + * + * @param the service subclass + * @param the spi-layer class corresponding to the service + * @param the {@code ServiceOptions} subclass corresponding to the service + */ +public abstract class HttpServiceOptions, ServiceRpcT, + OptionsT extends HttpServiceOptions> + extends ServiceOptions { + + private static final long serialVersionUID = 3652819407083815771L; + private final int connectTimeout; + private final int readTimeout; + private final String httpTransportFactoryClassName; + + private transient HttpTransportFactory httpTransportFactory; + + /** + * A base interface for all {@link HttpTransport} factories. + * + *

Implementation must provide a public no-arg constructor. Loading of a factory implementation + * is done via {@link java.util.ServiceLoader}. + */ + public interface HttpTransportFactory { + HttpTransport create(); + } + + public static class DefaultHttpTransportFactory implements HttpTransportFactory { + + private static final HttpTransportFactory INSTANCE = new DefaultHttpTransportFactory(); + + @Override + public HttpTransport create() { + // Consider App Engine + if (appEngineAppId() != null) { + try { + return new UrlFetchTransport(); + } catch (Exception ignore) { + // Maybe not on App Engine + } + } + return new NetHttpTransport(); + } + } + + /** + * Builder for {@code HttpServiceOptions}. + * + * @param the service subclass + * @param the spi-layer class corresponding to the service + * @param the {@code HttpServiceOptions} subclass corresponding to the service + * @param the {@code ServiceOptions} builder + */ + protected abstract static class Builder, ServiceRpcT, + OptionsT extends HttpServiceOptions, + B extends Builder> + extends ServiceOptions.Builder { + + private HttpTransportFactory httpTransportFactory; + private int connectTimeout = -1; + private int readTimeout = -1; + + protected Builder() {} + + protected Builder(HttpServiceOptions options) { + super(options); + httpTransportFactory = options.httpTransportFactory; + connectTimeout = options.connectTimeout; + readTimeout = options.readTimeout; + } + + @Override + protected abstract HttpServiceOptions build(); + + @SuppressWarnings("unchecked") + protected B self() { + return (B) this; + } + + /** + * Sets the HTTP transport factory. + * + * @return the builder + */ + public B httpTransportFactory(HttpTransportFactory httpTransportFactory) { + this.httpTransportFactory = httpTransportFactory; + return self(); + } + + /** + * Sets the timeout in milliseconds to establish a connection. + * + * @param connectTimeout connection timeout in milliseconds. 0 for an infinite timeout, a + * negative number for the default value (20000). + * @return the builder + */ + public B connectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + return self(); + } + + /** + * Sets the timeout in milliseconds to read data from an established connection. + * + * @param readTimeout read timeout in milliseconds. 0 for an infinite timeout, a negative number + * for the default value (20000). + * @return the builder + */ + public B readTimeout(int readTimeout) { + this.readTimeout = readTimeout; + return self(); + } + } + + protected HttpServiceOptions( + Class> serviceFactoryClass, + Class> rpcFactoryClass, Builder builder) { + super(serviceFactoryClass, rpcFactoryClass, builder); + httpTransportFactory = firstNonNull(builder.httpTransportFactory, + getFromServiceLoader(HttpTransportFactory.class, DefaultHttpTransportFactory.INSTANCE)); + httpTransportFactoryClassName = httpTransportFactory.getClass().getName(); + connectTimeout = builder.connectTimeout; + readTimeout = builder.readTimeout; + } + + /** + * Returns the HTTP transport factory. + */ + public HttpTransportFactory httpTransportFactory() { + return httpTransportFactory; + } + + + /** + * Returns a request initializer responsible for initializing requests according to service + * options. + */ + public HttpRequestInitializer httpRequestInitializer() { + final HttpRequestInitializer delegate = + authCredentials() != null && authCredentials().credentials() != null + ? new HttpCredentialsAdapter(authCredentials().credentials().createScoped(scopes())) + : null; + return new HttpRequestInitializer() { + @Override + public void initialize(HttpRequest httpRequest) throws IOException { + if (delegate != null) { + delegate.initialize(httpRequest); + } + if (connectTimeout >= 0) { + httpRequest.setConnectTimeout(connectTimeout); + } + if (readTimeout >= 0) { + httpRequest.setReadTimeout(readTimeout); + } + } + }; + } + + /** + * Returns the timeout in milliseconds to establish a connection. 0 is an infinite timeout, a + * negative number is the default value (20000). + */ + public int connectTimeout() { + return connectTimeout; + } + + /** + * Returns the timeout in milliseconds to read from an established connection. 0 is an infinite + * timeout, a negative number is the default value (20000). + */ + public int readTimeout() { + return readTimeout; + } + + @Override + protected int baseHashCode() { + return Objects.hash(super.baseHashCode(), httpTransportFactoryClassName, connectTimeout, + readTimeout); + } + + protected boolean baseEquals(HttpServiceOptions other) { + return super.baseEquals(other) + && Objects.equals(httpTransportFactoryClassName, other.httpTransportFactoryClassName) + && Objects.equals(connectTimeout, other.connectTimeout) + && Objects.equals(readTimeout, other.readTimeout); + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { + input.defaultReadObject(); + httpTransportFactory = newInstance(httpTransportFactoryClassName); + } +} diff --git a/gcloud-java-core/src/main/java/com/google/cloud/ServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/ServiceOptions.java index 194f61b45c42..a4f27994a39f 100644 --- a/gcloud-java-core/src/main/java/com/google/cloud/ServiceOptions.java +++ b/gcloud-java-core/src/main/java/com/google/cloud/ServiceOptions.java @@ -20,12 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.api.client.extensions.appengine.http.UrlFetchTransport; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.spi.ServiceRpcFactory; import com.google.common.collect.Iterables; import com.google.common.io.Files; @@ -71,60 +65,28 @@ public abstract class ServiceOptions, Service OptionsT extends ServiceOptions> implements Serializable { private static final String DEFAULT_HOST = "https://www.googleapis.com"; - private static final long serialVersionUID = 1203687993961393350L; private static final String PROJECT_ENV_NAME = "GCLOUD_PROJECT"; private static final String MANIFEST_ARTIFACT_ID_KEY = "artifactId"; private static final String MANIFEST_VERSION_KEY = "Implementation-Version"; private static final String ARTIFACT_ID = "gcloud-java-core"; private static final String APPLICATION_BASE_NAME = "gcloud-java"; private static final String APPLICATION_NAME = getApplicationName(); + private static final long serialVersionUID = -6410263550484023006L; private final String projectId; private final String host; - private final String httpTransportFactoryClassName; private final RestorableState authCredentialsState; private final RetryParams retryParams; private final String serviceRpcFactoryClassName; private final String serviceFactoryClassName; - private final int connectTimeout; - private final int readTimeout; private final Clock clock; - private transient HttpTransportFactory httpTransportFactory; private transient AuthCredentials authCredentials; private transient ServiceRpcFactory serviceRpcFactory; private transient ServiceFactory serviceFactory; private transient ServiceT service; private transient ServiceRpcT rpc; - /** - * A base interface for all {@link HttpTransport} factories. - * - *

Implementation must provide a public no-arg constructor. Loading of a factory implementation - * is done via {@link java.util.ServiceLoader}. - */ - public interface HttpTransportFactory { - HttpTransport create(); - } - - public static class DefaultHttpTransportFactory implements HttpTransportFactory { - - private static final HttpTransportFactory INSTANCE = new DefaultHttpTransportFactory(); - - @Override - public HttpTransport create() { - // Consider App Engine - if (appEngineAppId() != null) { - try { - return new UrlFetchTransport(); - } catch (Exception ignore) { - // Maybe not on App Engine - } - } - return new NetHttpTransport(); - } - } - /** * A class providing access to the current time in milliseconds. This class is mainly used for * testing and will be replaced by Java8's {@code java.time.Clock}. @@ -178,13 +140,10 @@ protected abstract static class Builder, Serv private String projectId; private String host; - private HttpTransportFactory httpTransportFactory; private AuthCredentials authCredentials; private RetryParams retryParams; private ServiceFactory serviceFactory; private ServiceRpcFactory serviceRpcFactory; - private int connectTimeout = -1; - private int readTimeout = -1; private Clock clock; protected Builder() {} @@ -192,13 +151,10 @@ protected Builder() {} protected Builder(ServiceOptions options) { projectId = options.projectId; host = options.host; - httpTransportFactory = options.httpTransportFactory; authCredentials = options.authCredentials; retryParams = options.retryParams; serviceFactory = options.serviceFactory; serviceRpcFactory = options.serviceRpcFactory; - connectTimeout = options.connectTimeout; - readTimeout = options.readTimeout; clock = options.clock; } @@ -249,16 +205,6 @@ public B host(String host) { return self(); } - /** - * Sets the transport factory. - * - * @return the builder - */ - public B httpTransportFactory(HttpTransportFactory httpTransportFactory) { - this.httpTransportFactory = httpTransportFactory; - return self(); - } - /** * Sets the service authentication credentials. * @@ -290,30 +236,6 @@ public B serviceRpcFactory(ServiceRpcFactory serviceRpcFa this.serviceRpcFactory = serviceRpcFactory; return self(); } - - /** - * Sets the timeout in milliseconds to establish a connection. - * - * @param connectTimeout connection timeout in milliseconds. 0 for an infinite timeout, a - * negative number for the default value (20000). - * @return the builder - */ - public B connectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - return self(); - } - - /** - * Sets the timeout in milliseconds to read data from an established connection. - * - * @param readTimeout read timeout in milliseconds. 0 for an infinite timeout, a negative number - * for the default value (20000). - * @return the builder - */ - public B readTimeout(int readTimeout) { - this.readTimeout = readTimeout; - return self(); - } } protected ServiceOptions(Class> serviceFactoryClass, @@ -327,9 +249,6 @@ protected ServiceOptions(Class> ser + "or the environment. Please set a project ID using the builder."); } host = firstNonNull(builder.host, defaultHost()); - httpTransportFactory = firstNonNull(builder.httpTransportFactory, - getFromServiceLoader(HttpTransportFactory.class, DefaultHttpTransportFactory.INSTANCE)); - httpTransportFactoryClassName = httpTransportFactory.getClass().getName(); authCredentials = builder.authCredentials != null ? builder.authCredentials : defaultAuthCredentials(); authCredentialsState = authCredentials != null ? authCredentials.capture() : null; @@ -340,8 +259,6 @@ protected ServiceOptions(Class> ser serviceRpcFactory = firstNonNull(builder.serviceRpcFactory, getFromServiceLoader(rpcFactoryClass, defaultRpcFactory())); serviceRpcFactoryClassName = serviceRpcFactory.getClass().getName(); - connectTimeout = builder.connectTimeout; - readTimeout = builder.readTimeout; clock = firstNonNull(builder.clock, Clock.defaultClock()); } @@ -532,13 +449,6 @@ public String host() { return host; } - /** - * Returns the transport factory. - */ - public HttpTransportFactory httpTransportFactory() { - return httpTransportFactory; - } - /** * Returns the authentication credentials. */ @@ -554,47 +464,6 @@ public RetryParams retryParams() { return retryParams; } - /** - * Returns a request initializer responsible for initializing requests according to service - * options. - */ - public HttpRequestInitializer httpRequestInitializer() { - final HttpRequestInitializer delegate = - authCredentials() != null && authCredentials.credentials() != null - ? new HttpCredentialsAdapter(authCredentials().credentials().createScoped(scopes())) - : null; - return new HttpRequestInitializer() { - @Override - public void initialize(HttpRequest httpRequest) throws IOException { - if (delegate != null) { - delegate.initialize(httpRequest); - } - if (connectTimeout >= 0) { - httpRequest.setConnectTimeout(connectTimeout); - } - if (readTimeout >= 0) { - httpRequest.setReadTimeout(readTimeout); - } - } - }; - } - - /** - * Returns the timeout in milliseconds to establish a connection. 0 is an infinite timeout, a - * negative number is the default value (20000). - */ - public int connectTimeout() { - return connectTimeout; - } - - /** - * Returns the timeout in milliseconds to read from an established connection. 0 is an infinite - * timeout, a negative number is the default value (20000). - */ - public int readTimeout() { - return readTimeout; - } - /** * Returns the service's clock. Default time source uses {@link System#currentTimeMillis()} to get * current time. @@ -611,34 +480,29 @@ public String applicationName() { } protected int baseHashCode() { - return Objects.hash(projectId, host, httpTransportFactoryClassName, authCredentialsState, - retryParams, serviceFactoryClassName, serviceRpcFactoryClassName, connectTimeout, - readTimeout, clock); + return Objects.hash(projectId, host, authCredentialsState, retryParams, serviceFactoryClassName, + serviceRpcFactoryClassName, clock); } protected boolean baseEquals(ServiceOptions other) { return Objects.equals(projectId, other.projectId) && Objects.equals(host, other.host) - && Objects.equals(httpTransportFactoryClassName, other.httpTransportFactoryClassName) && Objects.equals(authCredentialsState, other.authCredentialsState) && Objects.equals(retryParams, other.retryParams) && Objects.equals(serviceFactoryClassName, other.serviceFactoryClassName) && Objects.equals(serviceRpcFactoryClassName, other.serviceRpcFactoryClassName) - && Objects.equals(connectTimeout, other.connectTimeout) - && Objects.equals(readTimeout, other.readTimeout) && Objects.equals(clock, clock); } private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { input.defaultReadObject(); - httpTransportFactory = newInstance(httpTransportFactoryClassName); serviceFactory = newInstance(serviceFactoryClassName); serviceRpcFactory = newInstance(serviceRpcFactoryClassName); authCredentials = authCredentialsState != null ? authCredentialsState.restore() : null; } @SuppressWarnings("unchecked") - private static T newInstance(String className) throws IOException, ClassNotFoundException { + static T newInstance(String className) throws IOException, ClassNotFoundException { try { return (T) Class.forName(className).newInstance(); } catch (InstantiationException | IllegalAccessException e) { @@ -663,7 +527,7 @@ protected RetryParams defaultRetryParams() { return RetryParams.defaultInstance(); } - private static T getFromServiceLoader(Class clazz, T defaultInstance) { + static T getFromServiceLoader(Class clazz, T defaultInstance) { return Iterables.getFirst(ServiceLoader.load(clazz), defaultInstance); } diff --git a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java new file mode 100644 index 000000000000..0a3c34f87916 --- /dev/null +++ b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java @@ -0,0 +1,218 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.cloud.GrpcServiceOptions.DefaultExecutorFactory; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.spi.ServiceRpcFactory; + +import org.easymock.EasyMock; +import org.junit.Test; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; + +public class GrpcServiceOptionsTest { + + private static final ExecutorFactory MOCK_EXECUTOR_FACTORY = + EasyMock.createMock(ExecutorFactory.class); + private static final TestGrpcServiceOptions OPTIONS = TestGrpcServiceOptions.builder() + .projectId("project-id") + .initialTimeout(1234) + .timeoutMultiplier(1.6) + .maxTimeout(5678) + .executorFactory(MOCK_EXECUTOR_FACTORY) + .build(); + private static final TestGrpcServiceOptions DEFAULT_OPTIONS = + TestGrpcServiceOptions.builder().projectId("project-id").build(); + private static final TestGrpcServiceOptions OPTIONS_COPY = OPTIONS.toBuilder().build(); + + private interface TestService extends Service {} + + private static class TestServiceImpl + extends BaseService implements TestService { + private TestServiceImpl(TestGrpcServiceOptions options) { + super(options); + } + } + + private interface TestServiceFactory extends ServiceFactory { + } + + private static class DefaultTestServiceFactory implements TestServiceFactory { + private static final TestServiceFactory INSTANCE = new DefaultTestServiceFactory(); + + @Override + public TestService create(TestGrpcServiceOptions options) { + return new TestServiceImpl(options); + } + } + + private interface TestServiceRpcFactory + extends ServiceRpcFactory {} + + private static class DefaultTestServiceRpcFactory implements TestServiceRpcFactory { + private static final TestServiceRpcFactory INSTANCE = new DefaultTestServiceRpcFactory(); + + @Override + public TestServiceRpc create(TestGrpcServiceOptions options) { + return new DefaultTestServiceRpc(options); + } + } + + private interface TestServiceRpc {} + + private static class DefaultTestServiceRpc implements TestServiceRpc { + DefaultTestServiceRpc(TestGrpcServiceOptions options) {} + } + + private static class TestGrpcServiceOptions + extends GrpcServiceOptions { + private static class Builder + extends GrpcServiceOptions.Builder { + private Builder() {} + + private Builder(TestGrpcServiceOptions options) { + super(options); + } + + @Override + protected TestGrpcServiceOptions build() { + return new TestGrpcServiceOptions(this); + } + } + + private TestGrpcServiceOptions(Builder builder) { + super(TestServiceFactory.class, TestServiceRpcFactory.class, builder); + } + + @Override + protected TestServiceFactory defaultServiceFactory() { + return DefaultTestServiceFactory.INSTANCE; + } + + @Override + protected TestServiceRpcFactory defaultRpcFactory() { + return DefaultTestServiceRpcFactory.INSTANCE; + } + + @Override + protected Set scopes() { + return null; + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + private static Builder builder() { + return new Builder(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestGrpcServiceOptions && baseEquals((TestGrpcServiceOptions) obj); + } + + @Override + public int hashCode() { + return baseHashCode(); + } + } + + @Test + public void testBuilder() { + assertEquals(1234, OPTIONS.initialTimeout()); + assertEquals(1.6, OPTIONS.timeoutMultiplier(), 0.0); + assertEquals(5678, OPTIONS.maxTimeout()); + assertSame(MOCK_EXECUTOR_FACTORY, OPTIONS.executorFactory()); + assertEquals(20000, DEFAULT_OPTIONS.initialTimeout()); + assertEquals(1.5, DEFAULT_OPTIONS.timeoutMultiplier(), 0.0); + assertEquals(100000, DEFAULT_OPTIONS.maxTimeout()); + assertTrue(DEFAULT_OPTIONS.executorFactory() instanceof DefaultExecutorFactory); + } + + @Test + public void testBuilderError() { + try { + TestGrpcServiceOptions.builder().initialTimeout(0); + fail("IllegalArgumentException expected"); + } catch (IllegalArgumentException ex) { + assertEquals("Initial timeout must be > 0", ex.getMessage()); + } + try { + TestGrpcServiceOptions.builder().initialTimeout(-1); + fail("IllegalArgumentException expected"); + } catch (IllegalArgumentException ex) { + assertEquals("Initial timeout must be > 0", ex.getMessage()); + } + try { + TestGrpcServiceOptions.builder().timeoutMultiplier(0.9); + fail("IllegalArgumentException expected"); + } catch (IllegalArgumentException ex) { + assertEquals("Timeout multiplier must be >= 1", ex.getMessage()); + } + } + + @Test + public void testBuilderInvalidMaxTimeout() { + TestGrpcServiceOptions options = TestGrpcServiceOptions.builder() + .projectId("project-id") + .initialTimeout(1234) + .timeoutMultiplier(1.6) + .maxTimeout(123) + .build(); + assertEquals(1234, options.initialTimeout()); + assertEquals(1.6, options.timeoutMultiplier(), 0.0); + assertEquals(1234, options.maxTimeout()); + } + + @Test + public void testBaseEquals() { + assertEquals(OPTIONS, OPTIONS_COPY); + assertNotEquals(DEFAULT_OPTIONS, OPTIONS); + TestGrpcServiceOptions options = OPTIONS.toBuilder() + .executorFactory(new DefaultExecutorFactory()) + .build(); + assertNotEquals(OPTIONS, options); + } + + @Test + public void testBaseHashCode() { + assertEquals(OPTIONS.hashCode(), OPTIONS_COPY.hashCode()); + assertNotEquals(DEFAULT_OPTIONS.hashCode(), OPTIONS.hashCode()); + TestGrpcServiceOptions options = OPTIONS.toBuilder() + .executorFactory(new DefaultExecutorFactory()) + .build(); + assertNotEquals(OPTIONS.hashCode(), options.hashCode()); + } + + @Test + public void testDefaultExecutorFactory() { + ExecutorFactory executorFactory = new DefaultExecutorFactory(); + ScheduledExecutorService executorService = executorFactory.get(); + assertSame(executorService, executorFactory.get()); + } +} diff --git a/gcloud-java-core/src/test/java/com/google/cloud/HttpServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/HttpServiceOptionsTest.java new file mode 100644 index 000000000000..de8dc8592eeb --- /dev/null +++ b/gcloud-java-core/src/test/java/com/google/cloud/HttpServiceOptionsTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.HttpServiceOptions.DefaultHttpTransportFactory; +import com.google.cloud.HttpServiceOptions.HttpTransportFactory; +import com.google.cloud.spi.ServiceRpcFactory; + +import org.easymock.EasyMock; +import org.junit.Test; + +import java.util.Set; + +public class HttpServiceOptionsTest { + + private static final HttpTransportFactory MOCK_HTTP_TRANSPORT_FACTORY = + EasyMock.createMock(HttpTransportFactory.class); + private static final TestHttpServiceOptions OPTIONS = TestHttpServiceOptions.builder() + .projectId("project-id") + .connectTimeout(1234) + .httpTransportFactory(MOCK_HTTP_TRANSPORT_FACTORY) + .readTimeout(5678) + .build(); + private static final TestHttpServiceOptions DEFAULT_OPTIONS = + TestHttpServiceOptions.builder().projectId("project-id").build(); + private static final TestHttpServiceOptions OPTIONS_COPY = OPTIONS.toBuilder().build(); + + private interface TestService extends Service {} + + private static class TestServiceImpl + extends BaseService implements TestService { + private TestServiceImpl(TestHttpServiceOptions options) { + super(options); + } + } + + private interface TestServiceFactory extends ServiceFactory { + } + + private static class DefaultTestServiceFactory implements TestServiceFactory { + private static final TestServiceFactory INSTANCE = new DefaultTestServiceFactory(); + + @Override + public TestService create(TestHttpServiceOptions options) { + return new TestServiceImpl(options); + } + } + + private interface TestServiceRpcFactory + extends ServiceRpcFactory {} + + private static class DefaultTestServiceRpcFactory implements TestServiceRpcFactory { + private static final TestServiceRpcFactory INSTANCE = new DefaultTestServiceRpcFactory(); + + @Override + public TestServiceRpc create(TestHttpServiceOptions options) { + return new DefaultTestServiceRpc(options); + } + } + + private interface TestServiceRpc {} + + private static class DefaultTestServiceRpc implements TestServiceRpc { + DefaultTestServiceRpc(TestHttpServiceOptions options) {} + } + + private static class TestHttpServiceOptions + extends HttpServiceOptions { + private static class Builder + extends HttpServiceOptions.Builder { + private Builder() {} + + private Builder(TestHttpServiceOptions options) { + super(options); + } + + @Override + protected TestHttpServiceOptions build() { + return new TestHttpServiceOptions(this); + } + } + + private TestHttpServiceOptions(Builder builder) { + super(TestServiceFactory.class, TestServiceRpcFactory.class, builder); + } + + @Override + protected TestServiceFactory defaultServiceFactory() { + return DefaultTestServiceFactory.INSTANCE; + } + + @Override + protected TestServiceRpcFactory defaultRpcFactory() { + return DefaultTestServiceRpcFactory.INSTANCE; + } + + @Override + protected Set scopes() { + return null; + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + private static Builder builder() { + return new Builder(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHttpServiceOptions && baseEquals((TestHttpServiceOptions) obj); + } + + @Override + public int hashCode() { + return baseHashCode(); + } + } + + @Test + public void testBuilder() { + assertEquals(1234, OPTIONS.connectTimeout()); + assertSame(MOCK_HTTP_TRANSPORT_FACTORY, OPTIONS.httpTransportFactory()); + assertEquals(5678, OPTIONS.readTimeout()); + assertEquals(-1, DEFAULT_OPTIONS.connectTimeout()); + assertTrue(DEFAULT_OPTIONS.httpTransportFactory() instanceof DefaultHttpTransportFactory); + assertEquals(-1, DEFAULT_OPTIONS.readTimeout()); + } + + @Test + public void testBaseEquals() { + assertEquals(OPTIONS, OPTIONS_COPY); + assertNotEquals(DEFAULT_OPTIONS, OPTIONS); + } + + @Test + public void testBaseHashCode() { + assertEquals(OPTIONS.hashCode(), OPTIONS_COPY.hashCode()); + assertNotEquals(DEFAULT_OPTIONS.hashCode(), OPTIONS.hashCode()); + } +} diff --git a/gcloud-java-core/src/test/java/com/google/cloud/ServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/ServiceOptionsTest.java index b6f7a5453ddc..1f0abc0ece11 100644 --- a/gcloud-java-core/src/test/java/com/google/cloud/ServiceOptionsTest.java +++ b/gcloud-java-core/src/test/java/com/google/cloud/ServiceOptionsTest.java @@ -23,21 +23,15 @@ import static org.junit.Assert.fail; import com.google.cloud.ServiceOptions.Clock; -import com.google.cloud.ServiceOptions.DefaultHttpTransportFactory; -import com.google.cloud.ServiceOptions.HttpTransportFactory; import com.google.cloud.spi.ServiceRpcFactory; -import org.easymock.EasyMock; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Set; -@RunWith(JUnit4.class) public class ServiceOptionsTest { private static final String JSON_KEY = "{\n" @@ -75,18 +69,13 @@ public class ServiceOptionsTest { fail("Couldn't create fake JSON credentials."); } } - private static final HttpTransportFactory MOCK_HTTP_TRANSPORT_FACTORY = - EasyMock.createMock(HttpTransportFactory.class); private static final Clock TEST_CLOCK = new TestClock(); private static final TestServiceOptions OPTIONS = TestServiceOptions.builder() .authCredentials(authCredentials) .clock(TEST_CLOCK) - .connectTimeout(1234) .host("host") - .httpTransportFactory(MOCK_HTTP_TRANSPORT_FACTORY) .projectId("project-id") - .readTimeout(5678) .retryParams(RetryParams.noRetries()) .build(); private static final TestServiceOptions DEFAULT_OPTIONS = @@ -197,18 +186,11 @@ public int hashCode() { public void testBuilder() { assertSame(authCredentials, OPTIONS.authCredentials()); assertSame(TEST_CLOCK, OPTIONS.clock()); - assertEquals(1234, OPTIONS.connectTimeout()); assertEquals("host", OPTIONS.host()); - assertSame(MOCK_HTTP_TRANSPORT_FACTORY, OPTIONS.httpTransportFactory()); assertEquals("project-id", OPTIONS.projectId()); - assertEquals(5678, OPTIONS.readTimeout()); assertSame(RetryParams.noRetries(), OPTIONS.retryParams()); - assertSame(Clock.defaultClock(), DEFAULT_OPTIONS.clock()); - assertEquals(-1, DEFAULT_OPTIONS.connectTimeout()); assertEquals("https://www.googleapis.com", DEFAULT_OPTIONS.host()); - assertTrue(DEFAULT_OPTIONS.httpTransportFactory() instanceof DefaultHttpTransportFactory); - assertEquals(-1, DEFAULT_OPTIONS.readTimeout()); assertSame(RetryParams.defaultInstance(), DEFAULT_OPTIONS.retryParams()); } diff --git a/gcloud-java-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java b/gcloud-java-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java index a9466939060a..0f29fefd9815 100644 --- a/gcloud-java-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java +++ b/gcloud-java-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java @@ -18,9 +18,9 @@ import static com.google.cloud.datastore.Validator.validateNamespace; +import com.google.cloud.HttpServiceOptions; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; -import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.spi.DatastoreRpc; import com.google.cloud.datastore.spi.DatastoreRpcFactory; import com.google.cloud.datastore.spi.DefaultDatastoreRpc; @@ -29,9 +29,10 @@ import java.util.Objects; import java.util.Set; -public class DatastoreOptions extends ServiceOptions { +public class DatastoreOptions + extends HttpServiceOptions { - private static final long serialVersionUID = 5056049000758143852L; + private static final long serialVersionUID = -7859275434360052450L; private static final String DATASTORE_SCOPE = "https://www.googleapis.com/auth/datastore"; private static final Set SCOPES = ImmutableSet.of(DATASTORE_SCOPE); @@ -58,7 +59,7 @@ public DatastoreRpc create(DatastoreOptions options) { } public static class Builder extends - ServiceOptions.Builder { + HttpServiceOptions.Builder { private String namespace; diff --git a/gcloud-java-dns/src/main/java/com/google/cloud/dns/DnsOptions.java b/gcloud-java-dns/src/main/java/com/google/cloud/dns/DnsOptions.java index 059f7b212044..a213a855fff2 100644 --- a/gcloud-java-dns/src/main/java/com/google/cloud/dns/DnsOptions.java +++ b/gcloud-java-dns/src/main/java/com/google/cloud/dns/DnsOptions.java @@ -16,17 +16,17 @@ package com.google.cloud.dns; +import com.google.cloud.HttpServiceOptions; import com.google.common.collect.ImmutableSet; -import com.google.cloud.ServiceOptions; import com.google.cloud.dns.spi.DefaultDnsRpc; import com.google.cloud.dns.spi.DnsRpc; import com.google.cloud.dns.spi.DnsRpcFactory; import java.util.Set; -public class DnsOptions extends ServiceOptions { +public class DnsOptions extends HttpServiceOptions { - private static final long serialVersionUID = -519128051411747771L; + private static final long serialVersionUID = -8639966476950724880L; private static final String GC_DNS_RW = "https://www.googleapis.com/auth/ndev.clouddns.readwrite"; private static final Set SCOPES = ImmutableSet.of(GC_DNS_RW); @@ -49,7 +49,7 @@ public DnsRpc create(DnsOptions options) { } } - public static class Builder extends ServiceOptions.Builder { private Builder() { diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 6fde6f4425df..48de7002d54f 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -31,7 +31,7 @@ * * @see Google Cloud Pub/Sub */ -public interface PubSub extends Service { +public interface PubSub extends AutoCloseable, Service { /** * Class for specifying options for listing topics and subscriptions. diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 19b2e5a35fec..bd69103b9819 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -278,4 +278,9 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti Iterable ackIds) { return null; } + + @Override + public void close() throws Exception { + rpc.close(); + } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 73482ccef25f..420b0d50148b 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -16,7 +16,7 @@ package com.google.cloud.pubsub; -import com.google.cloud.ServiceOptions; +import com.google.cloud.GrpcServiceOptions; import com.google.cloud.pubsub.spi.DefaultPubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpcFactory; @@ -25,9 +25,9 @@ import java.io.IOException; import java.util.Set; -public class PubSubOptions extends ServiceOptions { +public class PubSubOptions extends GrpcServiceOptions { - private static final long serialVersionUID = 6740347843343421456L; + private static final long serialVersionUID = 5640180400046623305L; private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub"; private static final Set SCOPES = ImmutableSet.of(PUBSUB_SCOPE); private static final String DEFAULT_HOST = "https://pubsub.googleapis.com"; @@ -67,7 +67,7 @@ protected String defaultHost() { } public static class Builder extends - ServiceOptions.Builder { + GrpcServiceOptions.Builder { private Builder() {} @@ -81,7 +81,7 @@ public PubSubOptions build() { } } - private PubSubOptions(Builder builder) { + protected PubSubOptions(Builder builder) { super(PubSubFactory.class, PubSubRpcFactory.class, builder); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index d4f00fd7cf37..7878a6d83835 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -19,6 +19,9 @@ import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.ApiCallSettings; import com.google.api.gax.grpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.AuthCredentials; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.RetryParams; import com.google.cloud.pubsub.PubSubException; import com.google.cloud.pubsub.PubSubOptions; @@ -27,6 +30,7 @@ import com.google.cloud.pubsub.spi.v1.SubscriberApi; import com.google.cloud.pubsub.spi.v1.SubscriberSettings; import com.google.common.base.Function; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; @@ -52,32 +56,64 @@ import org.joda.time.Duration; +import io.grpc.ManagedChannel; +import io.grpc.Status.Code; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + import java.io.IOException; import java.util.Set; import java.util.concurrent.Future; - -import autovalue.shaded.com.google.common.common.collect.Sets; -import io.grpc.Status.Code; +import java.util.concurrent.ScheduledExecutorService; public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; + private final ScheduledExecutorService executor; + private final ExecutorFactory executorFactory; + + private static final class InternalPubSubOptions extends PubSubOptions { + + private static final long serialVersionUID = -7997372049256706185L; + + private InternalPubSubOptions(PubSubOptions options) { + super(options.toBuilder()); + } + + @Override + protected ExecutorFactory executorFactory() { + return super.executorFactory(); + } + } public DefaultPubSubRpc(PubSubOptions options) throws IOException { + executorFactory = new InternalPubSubOptions(options).executorFactory(); + executor = executorFactory.get(); try { - // Provide (and use a common thread-pool). - // This depends on https://github.com/googleapis/gax-java/issues/73 - PublisherSettings.Builder pbuilder = - PublisherSettings.defaultBuilder() - .provideChannelWith(options.authCredentials().credentials()) - .applyToAllApiMethods(apiCallSettings(options)); - publisherApi = PublisherApi.create(pbuilder.build()); - SubscriberSettings.Builder sBuilder = - SubscriberSettings.defaultBuilder() - .provideChannelWith(options.authCredentials().credentials()) - .applyToAllApiMethods(apiCallSettings(options)); - subscriberApi = SubscriberApi.create(sBuilder.build()); + PublisherSettings.Builder pubBuilder = + PublisherSettings.defaultBuilder().provideExecutorWith(executor, false); + SubscriberSettings.Builder subBuilder = + SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false); + // todo(mziccard): PublisherSettings should support null/absent credentials for testing + if (options.host().contains("localhost") + || options.authCredentials().equals(AuthCredentials.noAuth())) { + ManagedChannel channel = NettyChannelBuilder.forTarget(options.host()) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + pubBuilder.provideChannelWith(channel, true); + subBuilder.provideChannelWith(channel, true); + } else { + GoogleCredentials credentials = options.authCredentials().credentials(); + pubBuilder.provideChannelWith( + credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES)); + subBuilder.provideChannelWith( + credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES)); + } + pubBuilder.applyToAllApiMethods(apiCallSettings(options)); + subBuilder.applyToAllApiMethods(apiCallSettings(options)); + publisherApi = PublisherApi.create(pubBuilder.build()); + subscriberApi = SubscriberApi.create(subBuilder.build()); } catch (Exception ex) { throw new IOException(ex); } @@ -89,9 +125,9 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) { RetryParams retryParams = options.retryParams(); final RetrySettings.Builder builder = RetrySettings.newBuilder() .setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis())) - .setInitialRpcTimeout(Duration.millis(options.connectTimeout())) - .setRpcTimeoutMultiplier(1.5) - .setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout())) + .setInitialRpcTimeout(Duration.millis(options.initialTimeout())) + .setRpcTimeoutMultiplier(options.timeoutMultiplier()) + .setMaxRpcTimeout(Duration.millis(options.maxTimeout())) .setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis())) .setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor()) .setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis())); @@ -195,4 +231,11 @@ public Future pull(PullRequest request) { public Future modify(ModifyPushConfigRequest request) { return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false); } + + @Override + public void close() throws Exception { + subscriberApi.close(); + publisherApi.close(); + executorFactory.release(executor); + } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java index 8474ba042234..6d738cd554c4 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -39,7 +39,7 @@ import java.util.concurrent.Future; -public interface PubSubRpc { +public interface PubSubRpc extends AutoCloseable { // in all cases root cause of ExecutionException is PubSubException Future create(Topic topic); diff --git a/gcloud-java-resourcemanager/src/main/java/com/google/cloud/resourcemanager/ResourceManagerOptions.java b/gcloud-java-resourcemanager/src/main/java/com/google/cloud/resourcemanager/ResourceManagerOptions.java index 8f5c79e8bc3f..afc0514ea643 100644 --- a/gcloud-java-resourcemanager/src/main/java/com/google/cloud/resourcemanager/ResourceManagerOptions.java +++ b/gcloud-java-resourcemanager/src/main/java/com/google/cloud/resourcemanager/ResourceManagerOptions.java @@ -16,8 +16,8 @@ package com.google.cloud.resourcemanager; +import com.google.cloud.HttpServiceOptions; import com.google.common.collect.ImmutableSet; -import com.google.cloud.ServiceOptions; import com.google.cloud.resourcemanager.spi.DefaultResourceManagerRpc; import com.google.cloud.resourcemanager.spi.ResourceManagerRpc; import com.google.cloud.resourcemanager.spi.ResourceManagerRpcFactory; @@ -25,9 +25,9 @@ import java.util.Set; public class ResourceManagerOptions - extends ServiceOptions { + extends HttpServiceOptions { - private static final long serialVersionUID = 538303101192527452L; + private static final long serialVersionUID = -109855112863688882L; private static final String GCRM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; private static final Set SCOPES = ImmutableSet.of(GCRM_SCOPE); private static final String DEFAULT_HOST = "https://cloudresourcemanager.googleapis.com"; @@ -63,8 +63,8 @@ protected String defaultHost() { return DEFAULT_HOST; } - public static class Builder extends ServiceOptions.Builder { + public static class Builder extends HttpServiceOptions.Builder { private Builder() {} diff --git a/gcloud-java-storage/src/main/java/com/google/cloud/storage/StorageOptions.java b/gcloud-java-storage/src/main/java/com/google/cloud/storage/StorageOptions.java index 15e5791a6b91..45b393c5e171 100644 --- a/gcloud-java-storage/src/main/java/com/google/cloud/storage/StorageOptions.java +++ b/gcloud-java-storage/src/main/java/com/google/cloud/storage/StorageOptions.java @@ -16,17 +16,17 @@ package com.google.cloud.storage; +import com.google.cloud.HttpServiceOptions; import com.google.common.collect.ImmutableSet; -import com.google.cloud.ServiceOptions; import com.google.cloud.storage.spi.DefaultStorageRpc; import com.google.cloud.storage.spi.StorageRpc; import com.google.cloud.storage.spi.StorageRpcFactory; import java.util.Set; -public class StorageOptions extends ServiceOptions { +public class StorageOptions extends HttpServiceOptions { - private static final long serialVersionUID = -7804860602287801084L; + private static final long serialVersionUID = -7456495262640805964L; private static final String GCS_SCOPE = "https://www.googleapis.com/auth/devstorage.full_control"; private static final Set SCOPES = ImmutableSet.of(GCS_SCOPE); @@ -51,7 +51,7 @@ public StorageRpc create(StorageOptions options) { } public static class Builder extends - ServiceOptions.Builder { + HttpServiceOptions.Builder { private Builder() {}