Skip to content

Commit

Permalink
feat: add implementation of ChannelPrimer to establish connection to …
Browse files Browse the repository at this point in the history
…GFE and integrate into bigtable client
  • Loading branch information
tonytanger committed Dec 17, 2019
1 parent 4e77ace commit f7aede6
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,25 @@ public CredentialsProvider getCredentialsProvider() {
return stubSettings.getCredentialsProvider();
}

/**
* Configure periodic gRPC channel refreshes.
*
* <p>This feature will gracefully refresh connections to the Cloud Bigtable service. This is an
* experimental feature to address tail latency caused by the service dropping long lived gRPC
* connections, which causes the client to renegotiate the gRPC connection in the request path,
* which causes periodic spikes in latency
*/
@BetaApi("This API depends on experimental gRPC APIs")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
stubSettings.setRefreshingChannel(isRefreshingChannel);
return this;
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}

/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelPrimer;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.TimeUnit;

/**
* Establish a connection to the Cloud Bigtable service on managedChannel
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@BetaApi("This API depends on gRPC experimental API")
@InternalApi
public final class RefreshChannel implements ChannelPrimer {
@Override
public void primeChannel(ManagedChannel managedChannel) {
for (int i = 0; i < 10; i++) {
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand All @@ -29,6 +30,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.RefreshChannel;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
Expand Down Expand Up @@ -149,6 +151,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String projectId;
private final String instanceId;
private final String appProfileId;
private final boolean isRefreshingChannel;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -179,6 +182,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
projectId = builder.projectId;
instanceId = builder.instanceId;
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -210,6 +214,11 @@ public String getAppProfileId() {
return appProfileId;
}

/** Returns if channels will gracefully refresh connections to Cloud Bigtable service */
public boolean isRefreshingChannel() {
return isRefreshingChannel;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -413,6 +422,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String projectId;
private String instanceId;
private String appProfileId;
private boolean isRefreshingChannel;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -433,6 +443,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
*/
private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -515,6 +526,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
projectId = settings.projectId;
instanceId = settings.instanceId;
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -602,6 +614,22 @@ public String getAppProfileId() {
return appProfileId;
}

/**
* Sets if channels will gracefully refresh connections to Cloud Bigtable service
*
* @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel
*/
@BetaApi("This API depends on experimental gRPC APIs")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
this.isRefreshingChannel = isRefreshingChannel;
return this;
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
public boolean isRefreshingChannel() {
return isRefreshingChannel;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -642,6 +670,16 @@ public EnhancedBigtableStubSettings build() {
Preconditions.checkState(projectId != null, "Project id must be set");
Preconditions.checkState(instanceId != null, "Instance id must be set");

// Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh
// connections to Cloud Bigtable service
if (isRefreshingChannel
&& getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
InstantiatingGrpcChannelProvider.Builder channelBuilder =
((InstantiatingGrpcChannelProvider) getTransportChannelProvider())
.toBuilder()
.setChannelPrimer(new RefreshChannel());
setTransportChannelProvider(channelBuilder.build());
}
return new EnhancedBigtableStubSettings(this);
}
// </editor-fold>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class RefreshChannelTest {
// RefreshChannel should establish connection to the server through managedChannel.getState(true)
@Test
public void testGetStateIsCalled() {
RefreshChannel refreshChannel = new RefreshChannel();
ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);

Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true);

refreshChannel.primeChannel(managedChannel);
Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void settingsAreNotLostTest() {
String projectId = "my-project";
String instanceId = "my-instance";
String appProfileId = "my-app-profile-id";
boolean isRefreshingChannel = true;
String endpoint = "some.other.host:123";
CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class);
WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class);
Expand All @@ -71,6 +72,7 @@ public void settingsAreNotLostTest() {
.setProjectId(projectId)
.setInstanceId(instanceId)
.setAppProfileId(appProfileId)
.setRefreshingChannel(isRefreshingChannel)
.setEndpoint(endpoint)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
Expand All @@ -81,6 +83,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -90,6 +93,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -99,6 +103,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -110,13 +115,15 @@ private void verifyBuilder(
String projectId,
String instanceId,
String appProfileId,
boolean isRefreshingChannel,
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(builder.getProjectId()).isEqualTo(projectId);
assertThat(builder.getInstanceId()).isEqualTo(instanceId);
assertThat(builder.getAppProfileId()).isEqualTo(appProfileId);
assertThat(builder.isRefreshingChannel()).isEqualTo(isRefreshingChannel);
assertThat(builder.getEndpoint()).isEqualTo(endpoint);
assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
Expand All @@ -128,13 +135,15 @@ private void verifySettings(
String projectId,
String instanceId,
String appProfileId,
boolean isRefreshingChannel,
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(settings.getProjectId()).isEqualTo(projectId);
assertThat(settings.getInstanceId()).isEqualTo(instanceId);
assertThat(settings.getAppProfileId()).isEqualTo(appProfileId);
assertThat(settings.isRefreshingChannel()).isEqualTo(isRefreshingChannel);
assertThat(settings.getEndpoint()).isEqualTo(endpoint);
assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
Expand Down

0 comments on commit f7aede6

Please sign in to comment.