Skip to content

Commit

Permalink
feat: Support configuration of service class name aliases (#12)
Browse files Browse the repository at this point in the history
feat: Support configuration of service class name aliases

Motivation

Litelinks performs validation that the thrift service class name matches when a client discovers a given service and when servers join an existing cluster. This is to ensure that the service interface matches. However, it also means there is no way to change the name of the service interface definition and upgrade existing clusters in-place because the new servers will refuse to join the cluster.

Modifications

- Introduce a new environment variable that can be used to define aliases for the service class names, so that the old name will be interpreted as the new one. This can then be added to existing clients and new server builds.
- Rename the ThriftConnProp class to more appropriate "ServiceProperties"
- New unit test which makes use of a new dummy thrift service definition

Result

It is possible to change litelinks service class names and upgrade existing clusters in a non-disruptive manner.
  • Loading branch information
njhill authored Feb 8, 2022
1 parent 1966f15 commit 8e3c6c7
Show file tree
Hide file tree
Showing 12 changed files with 2,865 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@
public static final String ZOOKEEPER_CONNECT_STRING = ZookeeperClient.ZK_CONN_STRING_ENV_VAR;

public static final String PRIVATE_DOMAIN_ID = "LL_PRIVATE_DOMAIN_ID"; //beta

/**
* @see LitelinksSystemPropNames#SERVICE_CLASS_ALIASES
*/
public static final String SERVICE_CLASS_ALIASES = "LL_SERVICE_CLASS_ALIASES";
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,11 @@
* returns.
*/
public static final String POOLED_BYTEBUFFERS = "litelinks.produce_pooled_bytebufs";

/**
* Can be used to declare aliases of service class names, to use when validating
* that the server interface is compatible by clients and other servers joining a cluster.
* Should be of the form "f.q.ClassNameA=f.q.ClassNameB,f.q.ClassNameC=f.q.ClassNameD,...".
*/
public static final String SERVICE_CLASS_ALIASES = "litelinks.svc_class_aliases";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
*/
package com.ibm.watson.litelinks;

import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;

public abstract class ThriftConnProp {
public final class ServiceProperties {
private static final Logger logger = LoggerFactory.getLogger(ServiceProperties.class);

private ThriftConnProp() {}
private ServiceProperties() {}

// Thrift-specific
public static final String TR_PROTO_FACTORY = "transport.tprotocol.factory";
public static final String TR_FRAMED = "transport.framed";
// Other connection properties
public static final String TR_SSL = "transport.ssl.enabled";
public static final String TR_SSL_PROTOCOL = "transport.ssl.protocol";
public static final String TR_EXTRA_INFO = "transport.extrainfo_supported";
Expand All @@ -39,6 +45,35 @@ private ThriftConnProp() {}
public static final String METH_INFO_PREFIX = "methodinfo.";
public static final String APP_METADATA_PREFIX = "app.";

public static final Map<String, String> SERVICE_CLASS_ALIASES;

static {
String serviceClassAliasConfig = System.getProperty(LitelinksSystemPropNames.SERVICE_CLASS_ALIASES);
if (serviceClassAliasConfig == null) {
serviceClassAliasConfig = System.getenv(LitelinksEnvVariableNames.SERVICE_CLASS_ALIASES);
}
ImmutableMap.Builder<String, String> builder = null;
if (serviceClassAliasConfig != null) {
for (String entry : serviceClassAliasConfig.split(",")) {
String[] values = entry.split("=");
if (values.length != 2) {
throw new RuntimeException("Invalid value for " + LitelinksEnvVariableNames.SERVICE_CLASS_ALIASES
+ " env var: " + serviceClassAliasConfig);
}
if (builder == null) {
builder = ImmutableMap.builder();
}
builder.put(values[0], values[1]);
}
}
SERVICE_CLASS_ALIASES = builder != null ? builder.build() : Collections.emptyMap();
if (SERVICE_CLASS_ALIASES != null) {
for (Entry<String, String> ent : SERVICE_CLASS_ALIASES.entrySet()) {
logger.info("Configured litelinks service class name alias: " + ent.getKey() + " -> " + ent.getValue());
}
}
}

public static void log(Logger logger, Map<Object, Object> props) {
if (props != null && !props.isEmpty()) {
for (Entry<Object, Object> ent : props.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.ibm.watson.litelinks.ThriftConnProp;
import com.ibm.watson.litelinks.ServiceProperties;
import org.apache.thrift.protocol.TProtocolFactory;

import java.util.HashMap;
Expand Down Expand Up @@ -115,17 +115,17 @@ protected ServiceRegistryClient getRegistryClient() {
public void start(long timeoutMillis) throws Exception {
Map<Object, Object> props = new HashMap<>();
if (protoFactory != null) {
props.put(ThriftConnProp.TR_PROTO_FACTORY,
props.put(ServiceProperties.TR_PROTO_FACTORY,
protoFactory.getName());
}
if (ssl != null) {
props.put(ThriftConnProp.TR_SSL, ssl.toString());
props.put(ServiceProperties.TR_SSL, ssl.toString());
}
if (framed != null) {
props.put(ThriftConnProp.TR_FRAMED, framed.toString());
props.put(ServiceProperties.TR_FRAMED, framed.toString());
}
if (extraInfo != null) {
props.put(ThriftConnProp.TR_EXTRA_INFO, extraInfo.toString());
props.put(ServiceProperties.TR_EXTRA_INFO, extraInfo.toString());
}
String id = hostname + ":" + port + "/" + serviceName;
getListener().serverAdded(hostname, port, System.currentTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.ibm.watson.litelinks.NettyTTransport;
import com.ibm.watson.litelinks.SSLHelper;
import com.ibm.watson.litelinks.SSLHelper.SSLParams;
import com.ibm.watson.litelinks.ThriftConnProp;
import com.ibm.watson.litelinks.ServiceProperties;
import com.ibm.watson.litelinks.client.LitelinksServiceClient.ServiceInstanceInfo;
import com.ibm.watson.litelinks.client.ServiceInstance.ServiceInstanceConfig;
import com.ibm.watson.litelinks.client.ServiceInstanceCache.Balancers;
Expand Down Expand Up @@ -147,7 +147,7 @@ public TServiceClientManager<?> load(ServiceKey key) throws Exception {
String serviceClassName, serviceName;
if (key.mplexerName != null) {
serviceName = key.mplexerName;
serviceClassName = ThriftConnProp.MULTIPLEX_CLASS;
serviceClassName = ServiceProperties.MULTIPLEX_CLASS;
factory = new TMultiplexClientFactory<>(factory, key.name);
} else {
serviceName = key.name;
Expand Down Expand Up @@ -222,7 +222,7 @@ public static <C extends TServiceClient> TServiceClientManager<C> get(final Serv
this.serviceKey = svcKey;
this.serviceName = serviceName;
this.serviceClassName = serviceClassName;
this.serviceIface = serviceClassName != null && !ThriftConnProp.MULTIPLEX_CLASS.equals(serviceClassName)
this.serviceIface = serviceClassName != null && !ServiceProperties.MULTIPLEX_CLASS.equals(serviceClassName)
? getIfaceFromSvcClass(Class.forName(serviceClassName)) : null;
this.ServiceUnavailableException = LitelinksExceptions.eraseStackTrace(
new ServiceUnavailableException(serviceName));
Expand Down Expand Up @@ -440,23 +440,34 @@ private static long cutoff(long size) {
return System.currentTimeMillis() - 691200000l / size;
}

private static final int MD_PREFIX_LEN = ThriftConnProp.APP_METADATA_PREFIX.length();
private static final int MI_PREFIX_LEN = ThriftConnProp.METH_INFO_PREFIX.length();
private static final int MD_PREFIX_LEN = ServiceProperties.APP_METADATA_PREFIX.length();
private static final int MI_PREFIX_LEN = ServiceProperties.METH_INFO_PREFIX.length();

private boolean verifyServiceClass(String otherScName) {
if (serviceClassName == null || serviceClassName.equals(otherScName)) {
return true;
}
Class<?> otherIface = null;
if (serviceIface != null) {
try {
otherIface = getIfaceFromSvcClass(Class.forName(otherScName));
} catch (ClassNotFoundException cnfe) {}
}
return otherIface != null && serviceIface.isAssignableFrom(otherIface);
}

@Override
public ServiceInstanceConfig<C> getInstanceConfig(String hostname, int port,
long registrationTime, String version, Map<Object, Object> connConfig) throws Exception {
final String sc = (String) connConfig.get(ThriftConnProp.SERVICE_CLASS);
if (sc != null && serviceClassName != null && !sc.equals(serviceClassName)) {
Class<?> otherIface = null;
if (serviceIface != null) {
try {
otherIface = getIfaceFromSvcClass(Class.forName(sc));
} catch (ClassNotFoundException cnfe) {}
}
if (otherIface == null || !serviceIface.isAssignableFrom(otherIface)) {
throw new Exception("service class mismatch: expecting " + serviceClassName
+ " but server is " + sc); //TODO maybe custom exception type
String sc = (String) connConfig.get(ServiceProperties.SERVICE_CLASS);
if (sc != null) {
if (!verifyServiceClass(sc)) {
// Also check for service class name alias if configured
String aliasScName = ServiceProperties.SERVICE_CLASS_ALIASES.get(sc);
if (aliasScName == null || !verifyServiceClass(aliasScName)) {
throw new Exception("service class mismatch: expecting " + serviceClassName
+ " but server is " + sc); //TODO maybe custom exception type
}
}
}
if (hostname == null) {
Expand All @@ -469,7 +480,7 @@ public ServiceInstanceConfig<C> getInstanceConfig(String hostname, int port,
ImmutableMap.Builder<String, String> metaImb = null;
ImmutableMap.Builder<String, MethodInfo> miImb = null;
if (connConfig != null) {
String val = (String) connConfig.remove(ThriftConnProp.METH_INFO_PREFIX + MethodInfo.DEFAULT);
String val = (String) connConfig.remove(ServiceProperties.METH_INFO_PREFIX + MethodInfo.DEFAULT);
if (serviceIface != null && val != null) {
try {
(miImb = ImmutableMap.builder()).put(MethodInfo.DEFAULT, MethodInfo.deserialize(val));
Expand All @@ -484,13 +495,13 @@ public ServiceInstanceConfig<C> getInstanceConfig(String hostname, int port,
}
String key = ent.getKey().toString();
val = ent.getValue().toString();
if (key.startsWith(ThriftConnProp.APP_METADATA_PREFIX)) {
if (key.startsWith(ServiceProperties.APP_METADATA_PREFIX)) {
it.remove(); // remove from connConfig
if (metaImb == null) {
metaImb = ImmutableMap.builder();
}
metaImb.put(key.substring(MD_PREFIX_LEN), val);
} else if (key.startsWith(ThriftConnProp.METH_INFO_PREFIX)) {
} else if (key.startsWith(ServiceProperties.METH_INFO_PREFIX)) {
it.remove();
if (serviceIface != null) {
String methName = key.substring(MI_PREFIX_LEN);
Expand Down Expand Up @@ -657,14 +668,14 @@ public ThriftInstanceConfig(String host, int port, String version,
Map<String, String> metadata, Map<String, MethodInfo> methodInfos) throws Exception {
super(version, registrationTime, metadata, methodInfos);
// default is framed=false
this.framed = "true".equals(connConfig.get(ThriftConnProp.TR_FRAMED));
this.framed = "true".equals(connConfig.get(ServiceProperties.TR_FRAMED));
this.protoFactory = getServiceProtocolFactory(connConfig);

this.extraInfoSupported = "true".equals(connConfig.get(ThriftConnProp.TR_EXTRA_INFO));
this.extraInfoSupported = "true".equals(connConfig.get(ServiceProperties.TR_EXTRA_INFO));

final boolean ssl = "true".equals(connConfig.get(ThriftConnProp.TR_SSL));
final boolean ssl = "true".equals(connConfig.get(ServiceProperties.TR_SSL));
if (ssl) {
String protocol = (String) connConfig.get(ThriftConnProp.TR_SSL_PROTOCOL);
String protocol = (String) connConfig.get(ServiceProperties.TR_SSL_PROTOCOL);
this.sslProtocol = protocol != null ? protocol : SSLParams.getDefault().protocol;
this.sslContext = SSLHelper.getSslContext(sslProtocol, false, false);
}
Expand All @@ -674,7 +685,7 @@ public ThriftInstanceConfig(String host, int port, String version,
}

if (usePrivateEndpoints()) {
String privateEndpoint = (String) connConfig.get(ThriftConnProp.PRIVATE_ENDPOINT);
String privateEndpoint = (String) connConfig.get(ServiceProperties.PRIVATE_ENDPOINT);
if (privateEndpoint != null) {
Matcher m = PRIV_ENDPOINT_PATT.matcher(privateEndpoint);
if (!m.matches()) {
Expand Down Expand Up @@ -750,7 +761,7 @@ public boolean equals(Object obj) {

static TProtocolFactory getServiceProtocolFactory(Map<Object, Object> props) {
TProtocolFactory tpf = DEFAULT_TPROTOFAC;
String tp = (String) props.get(ThriftConnProp.TR_PROTO_FACTORY);
String tp = (String) props.get(ServiceProperties.TR_PROTO_FACTORY);
if (tp != null) {
try {
Class<?> tpc = Class.forName(tp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.ibm.watson.litelinks.ThreadPoolHelper;
import com.ibm.watson.litelinks.ThriftConnProp;
import com.ibm.watson.litelinks.ServiceProperties;
import com.ibm.watson.litelinks.server.ZookeeperWatchedService;
import com.ibm.watson.zk.ZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -189,7 +189,7 @@ private void processDataChange(ChildData cdata) throws Exception {
currentConfigMxid = newStat.getMzxid();
if (logger.isDebugEnabled()) {
logger.debug("New client config from ZK for service " + serviceName + ":");
ThriftConnProp.log(logger, props);
ServiceProperties.log(logger, props);
}
}
if (!childCacheStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.ibm.etcd.client.EtcdClient;
import com.ibm.etcd.client.config.EtcdClusterConfig;
import com.ibm.etcd.client.utils.PersistentLeaseKey;
import com.ibm.watson.litelinks.ThriftConnProp;
import com.ibm.watson.litelinks.ServiceProperties;
import com.ibm.watson.litelinks.server.ConfiguredService;
import com.ibm.watson.litelinks.server.WatchedService;
import org.slf4j.Logger;
Expand Down Expand Up @@ -147,7 +147,7 @@ protected void registerAsync() throws Exception {
}
if (privateEndpoint != null) {
config = new HashMap<>(config);
config.put(ThriftConnProp.PRIVATE_ENDPOINT, privateEndpoint);
config.put(ServiceProperties.PRIVATE_ENDPOINT, privateEndpoint);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static com.ibm.watson.litelinks.ThriftConnProp.*;
import static com.ibm.watson.litelinks.ServiceProperties.*;
import static com.ibm.watson.litelinks.server.AdapterThriftService.getIfaceFromSvcClass;

public class DefaultThriftServer extends AbstractService
Expand Down Expand Up @@ -306,21 +306,30 @@ else if (implClass != null) {
}
}

private boolean verifyServiceClass(String otherScName) {
final Class<?> sc = tp.getClass().getDeclaringClass();
if (sc == null || otherScName.equals(sc.getName())) {
return true;
}
Class<?> serviceIface = getIfaceFromSvcClass(sc), otherIface = null;
if (serviceIface != null) {
try {
otherIface = getIfaceFromSvcClass(Class.forName(otherScName));
} catch (ClassNotFoundException cnfe) {}
}
return otherIface != null && otherIface.isAssignableFrom(serviceIface);
}

@Override
public void verifyConfig(Map<String, String> other) throws ConfigMismatchException {
TProcessor tp = this.tp;
if (tp != null) {
Object otherScName = other.get(SERVICE_CLASS);
String otherScName = other.get(SERVICE_CLASS);
if (otherScName != null) {
Class<?> sc = tp.getClass().getDeclaringClass();
if (otherScName != null && sc != null && !otherScName.equals(sc.getName())) {
Class<?> serviceIface = getIfaceFromSvcClass(sc), otherIface = null;
if (serviceIface != null) {
try {
otherIface = getIfaceFromSvcClass(Class.forName((String) otherScName));
} catch (ClassNotFoundException cnfe) {}
}
if (otherIface == null || !otherIface.isAssignableFrom(serviceIface)) {
if (!verifyServiceClass(otherScName)) {
// Also check for service class name alias if configured
String aliasScName = SERVICE_CLASS_ALIASES.get(otherScName);
if (aliasScName == null || !verifyServiceClass(aliasScName)) {
throw new ConfigMismatchException("service interface mismatch");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.common.primitives.Bytes;
import com.google.common.util.concurrent.Service;
import com.ibm.watson.litelinks.ThriftConnProp;
import com.ibm.watson.litelinks.ServiceProperties;
import com.ibm.watson.litelinks.server.ConfiguredService.ConfigMismatchException;
import com.ibm.watson.zk.ZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -221,7 +221,7 @@ protected void registerAsync() throws Exception {
byte[] versData = version == null ? EMPTY
: (SERVICE_VERSION + "=" + version + "\n").getBytes(StandardCharsets.ISO_8859_1);
byte[] privEndpointData = privateEndpoint == null ? EMPTY
: (ThriftConnProp.PRIVATE_ENDPOINT + "=" + privateEndpoint + "\n").getBytes(StandardCharsets.ISO_8859_1);
: (ServiceProperties.PRIVATE_ENDPOINT + "=" + privateEndpoint + "\n").getBytes(StandardCharsets.ISO_8859_1);
byte[] instanceData = Bytes.concat(headerData, DELIM_BYTES, iidData, versData, privEndpointData, config);

logger.info("creating service ephemeral znode...");
Expand Down
Loading

0 comments on commit 8e3c6c7

Please sign in to comment.