Skip to content

Commit

Permalink
core: channelBuilder.defaulServiceConfig() and lookUpServiceConfig()
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 authored Mar 14, 2019
1 parent 185cf3d commit 1735adc
Show file tree
Hide file tree
Showing 8 changed files with 588 additions and 26 deletions.
14 changes: 14 additions & 0 deletions core/src/main/java/io/grpc/ForwardingChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import com.google.common.base.MoreObjects;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* A {@link ManagedChannelBuilder} that delegates all its builder method to another builder by
Expand Down Expand Up @@ -242,6 +244,18 @@ public T proxyDetector(ProxyDetector proxyDetector) {
return thisT();
}

@Override
public T defaultServiceConfig(@Nullable Map<String, ?> serviceConfig) {
delegate().defaultServiceConfig(serviceConfig);
return thisT();
}

@Override
public T lookUpServiceConfig(boolean enable) {
delegate().lookUpServiceConfig(enable);
return thisT();
}

/**
* Returns the {@link ManagedChannel} built by the delegate by default. Overriding method can
* return different value.
Expand Down
56 changes: 56 additions & 0 deletions core/src/main/java/io/grpc/ManagedChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* A builder for {@link ManagedChannel} instances.
Expand Down Expand Up @@ -537,6 +539,60 @@ public T proxyDetector(ProxyDetector proxyDetector) {
throw new UnsupportedOperationException();
}

/**
* Provides a service config to the channel. The channel will use the default service config when
* the name resolver provides no service config or if the channel disables lookup service config
* from name resolver (see {@link #lookUpServiceConfig(boolean)}). The argument
* {@code serviceConfig} is a nested map representing a Json object in the most natural way:
*
* <table border="1">
* <tr>
* <td>Json entry</td><td>Java Type</td>
* </tr>
* <tr>
* <td>object</td><td>{@link Map}</td>
* </tr>
* <tr>
* <td>array</td><td>{@link List}</td>
* </tr>
* <tr>
* <td>string</td><td>{@link String}</td>
* </tr>
* <tr>
* <td>number</td><td>{@link Double}</td>
* </tr>
* <tr>
* <td>boolean</td><td>{@link Boolean}</td>
* </tr>
* <tr>
* <td>null</td><td>{@code null}</td>
* </tr>
* </table>
*
* <p>If null is passed, then there will be no default service config.
*
* @throws IllegalArgumentException When the given serviceConfig is invalid or the current version
* of grpc library can not parse it gracefully. The state of the builder is unchanged if
* an exception is thrown.
* @return this
* @since 1.20.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5189")
public T defaultServiceConfig(@Nullable Map<String, ?> serviceConfig) {
throw new UnsupportedOperationException();
}

/**
* Enables or disables service config look-up from the naming system. Enabled by default.
*
* @return this
* @since 1.20.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5189")
public T lookUpServiceConfig(boolean enable) {
throw new UnsupportedOperationException();
}

/**
* Builds a channel using the given parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -139,6 +141,10 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
InternalChannelz channelz = InternalChannelz.instance();
int maxTraceEvents;

@Nullable
Map<String, ?> defaultServiceConfig;
boolean lookUpServiceConfig = true;

protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();

private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
Expand Down Expand Up @@ -379,6 +385,78 @@ public T proxyDetector(@Nullable ProxyDetector proxyDetector) {
return thisT();
}

@Override
public T defaultServiceConfig(@Nullable Map<String, ?> serviceConfig) {
// TODO(notcarl): use real parsing
defaultServiceConfig = checkMapEntryTypes(serviceConfig);
return thisT();
}

@Nullable
private static Map<String, ?> checkMapEntryTypes(@Nullable Map<?, ?> map) {
if (map == null) {
return null;
}
// Not using ImmutableMap.Builder because of extra guava dependency for Android.
Map<String, Object> parsedMap = new LinkedHashMap<>();
for (Map.Entry<?, ?> entry : map.entrySet()) {
checkArgument(
entry.getKey() instanceof String,
"The key of the entry '%s' is not of String type", entry);

String key = (String) entry.getKey();
Object value = entry.getValue();
if (value == null) {
parsedMap.put(key, null);
} else if (value instanceof Map) {
parsedMap.put(key, checkMapEntryTypes((Map<?, ?>) value));
} else if (value instanceof List) {
parsedMap.put(key, checkListEntryTypes((List<?>) value));
} else if (value instanceof String) {
parsedMap.put(key, value);
} else if (value instanceof Double) {
parsedMap.put(key, value);
} else if (value instanceof Boolean) {
parsedMap.put(key, value);
} else {
throw new IllegalArgumentException(
"The value of the map entry '" + entry + "' is of type '" + value.getClass()
+ "', which is not supported");
}
}
return Collections.unmodifiableMap(parsedMap);
}

private static List<?> checkListEntryTypes(List<?> list) {
List<Object> parsedList = new ArrayList<>(list.size());
for (Object value : list) {
if (value == null) {
parsedList.add(null);
} else if (value instanceof Map) {
parsedList.add(checkMapEntryTypes((Map<?, ?>) value));
} else if (value instanceof List) {
parsedList.add(checkListEntryTypes((List<?>) value));
} else if (value instanceof String) {
parsedList.add(value);
} else if (value instanceof Double) {
parsedList.add(value);
} else if (value instanceof Boolean) {
parsedList.add(value);
} else {
throw new IllegalArgumentException(
"The entry '" + value + "' is of type '" + value.getClass()
+ "', which is not supported");
}
}
return Collections.unmodifiableList(parsedList);
}

@Override
public T lookUpServiceConfig(boolean enable) {
this.lookUpServiceConfig = enable;
return thisT();
}

/**
* Disable or enable stats features. Enabled by default.
*
Expand Down Expand Up @@ -490,7 +568,7 @@ final List<ClientInterceptor> getEffectiveInterceptors() {
/**
* Subclasses can override this method to provide a default port to {@link NameResolver} for use
* in cases where the target string doesn't include a port. The default implementation returns
* {@link GrpcUtil.DEFAULT_PORT_SSL}.
* {@link GrpcUtil#DEFAULT_PORT_SSL}.
*/
protected int getDefaultPort() {
return GrpcUtil.DEFAULT_PORT_SSL;
Expand Down
89 changes: 74 additions & 15 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,17 @@ public void uncaughtException(Thread t, Throwable e) {
// Must be mutated and read from syncContext
@CheckForNull
private Boolean haveBackends; // a flag for doing channel tracing when flipped
// Must be mutated and read from syncContext
// Must be mutated and read from constructor or syncContext
// TODO(notcarl): check this value when error in service config resolution
@Nullable
private Map<String, ?> lastServiceConfig; // used for channel tracing when value changed
@Nullable
private final Map<String, ?> defaultServiceConfig;
// Must be mutated and read from constructor or syncContext
// See service config error handling spec for reference.
// TODO(notcarl): check this value when error in service config resolution
private boolean waitingForServiceConfig = true;
private final boolean lookUpServiceConfig;

// One instance per channel.
private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
Expand Down Expand Up @@ -581,6 +589,9 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new

serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig;
this.lastServiceConfig = defaultServiceConfig;
this.lookUpServiceConfig = builder.lookUpServiceConfig;
Channel channel = new RealChannel(nameResolver.getServiceAuthority());
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
if (builder.binlog != null) {
Expand Down Expand Up @@ -621,6 +632,23 @@ public CallTracer create() {
channelCallTracer = callTracerFactory.create();
this.channelz = checkNotNull(builder.channelz);
channelz.addRootChannel(this);

if (!lookUpServiceConfig) {
if (defaultServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
}
handleServiceConfigUpdate();
}
}

// May only be called in constructor or syncContext
private void handleServiceConfigUpdate() {
waitingForServiceConfig = false;
serviceConfigInterceptor.handleUpdate(lastServiceConfig);
if (retryEnabled) {
throttle = ServiceConfigUtil.getThrottlePolicy(lastServiceConfig);
}
}

@VisibleForTesting
Expand Down Expand Up @@ -1278,32 +1306,57 @@ private class NameResolverListenerImpl implements NameResolver.Listener {
}

@Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes attrs) {
final class NamesResolved implements Runnable {

@SuppressWarnings("ReferenceEquality")
@Override
public void run() {
channelLogger.log(
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, config);
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, attrs);

if (haveBackends == null || !haveBackends) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
haveBackends = true;
}
final Map<String, ?> serviceConfig =
config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null && !serviceConfig.equals(lastServiceConfig)) {
channelLogger.log(ChannelLogLevel.INFO, "Service config changed");
lastServiceConfig = serviceConfig;
}

nameResolverBackoffPolicy = null;

if (serviceConfig != null) {
try {
serviceConfigInterceptor.handleUpdate(serviceConfig);
if (retryEnabled) {
throttle = ServiceConfigUtil.getThrottlePolicy(serviceConfig);
// Assuming no error in config resolution for now.
final Map<String, ?> serviceConfig =
attrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
Map<String, ?> effectiveServiceConfig;
if (!lookUpServiceConfig) {
if (serviceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Service config from name resolver discarded by channel settings");
}
effectiveServiceConfig = defaultServiceConfig;
} else {
// Try to use config if returned from name resolver
// Otherwise, try to use the default config if available
if (serviceConfig != null) {
effectiveServiceConfig = serviceConfig;
} else {
effectiveServiceConfig = defaultServiceConfig;
if (defaultServiceConfig != null) {
channelLogger.log(
ChannelLogLevel.INFO,
"Received no service config, using default service config");
}
}

// FIXME(notcarl): reference equality is not right (although not harmful) right now.
// Name resolver should return the same config if txt record is the same
if (effectiveServiceConfig != lastServiceConfig) {
channelLogger.log(ChannelLogLevel.INFO,
"Service config changed{0}", effectiveServiceConfig == null ? " to null" : "");
lastServiceConfig = effectiveServiceConfig;
}

try {
handleServiceConfigUpdate();
} catch (RuntimeException re) {
logger.log(
Level.WARNING,
Expand All @@ -1318,7 +1371,13 @@ public void run() {
handleErrorInSyncContext(Status.UNAVAILABLE.withDescription(
"Name resolver " + resolver + " returned an empty list"));
} else {
helper.lb.handleResolvedAddressGroups(servers, config);
Attributes effectiveAttrs = attrs;
if (effectiveServiceConfig != serviceConfig) {
effectiveAttrs = attrs.toBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, effectiveServiceConfig)
.build();
}
helper.lb.handleResolvedAddressGroups(servers, effectiveAttrs);
}
}
}
Expand Down
Loading

0 comments on commit 1735adc

Please sign in to comment.