Skip to content

Commit

Permalink
Optimize some code related to consul (#7523)
Browse files Browse the repository at this point in the history
* fix #7518 Optimize some code related to consul

* Optimize some code related to consul

* Optimize some code related to consul

* Optimize some code related to consul

* Optimize some code related to consul
  • Loading branch information
xiaoheng1 authored Apr 12, 2021
1 parent d5b469d commit 82ff056
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,6 @@ public interface CommonConstants {
String ON_CONNECT_KEY = "onconnect";

String ON_DISCONNECT_KEY = "ondisconnect";

String TOKEN = "token";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.common.constants;

/**
* Common configuration for configCenter, metadata, and registry modules
*/
public interface ConsulConstants {

int DEFAULT_PORT = 8500;

int DEFAULT_WATCH_TIMEOUT = 60 * 1000;

String WATCH_TIMEOUT = "consul-watch-timeout";

int INVALID_PORT = 0;


}
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.TOKEN;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
import static org.apache.dubbo.common.constants.ConsulConstants.WATCH_TIMEOUT;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_PORT;
import static org.apache.dubbo.common.constants.ConsulConstants.INVALID_PORT;

/**
* config center implementation for consul
*/
public class ConsulDynamicConfiguration extends TreePathDynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class);

private static final int DEFAULT_PORT = 8500;
private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
private static final String WATCH_TIMEOUT = "consul-watch-timeout";

private final Consul client;

private final KeyValueClient kvClient;
Expand All @@ -68,10 +69,10 @@ public ConsulDynamicConfiguration(URL url) {
super(url);
watchTimeout = url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
int port = INVALID_PORT != url.getPort() ? url.getPort() : DEFAULT_PORT;
Consul.Builder builder = Consul.builder()
.withHostAndPort(HostAndPort.fromParts(host, port));
String token = url.getParameter("token", (String) null);
String token = url.getParameter(TOKEN, (String) null);
if (StringUtils.isNotEmpty(token)) {
builder.withAclToken(token);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@
import java.util.Collections;
import java.util.List;

import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_PORT;
import static org.apache.dubbo.common.constants.ConsulConstants.INVALID_PORT;

/**
* metadata report impl for consul
*/
public class ConsulMetadataReport extends AbstractMetadataReport {
private static final int DEFAULT_PORT = 8500;

private ConsulClient client;

public ConsulMetadataReport(URL url) {
super(url);

String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
int port = INVALID_PORT != url.getPort() ? url.getPort() : DEFAULT_PORT;
client = new ConsulClient(host, port);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,13 @@ public class AbstractConsulRegistry {

static final String SERVICE_TAG = "dubbo";
static final String URL_META_KEY = "url";
static final String WATCH_TIMEOUT = "consul-watch-timeout";
static final String CHECK_PASS_INTERVAL = "consul-check-pass-interval";
static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after";

static final int DEFAULT_PORT = 8500;
// default watch timeout in millisecond
static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000;
// default time-to-live in millisecond
static final long DEFAULT_CHECK_PASS_INTERVAL = 16000L;
// default deregister critical server after
static final String DEFAULT_DEREGISTER_TIME = "20s";


static final int PERIOD_DENOMINATOR = 8;
static final int ONE_THOUSAND = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,22 @@

import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_PORT;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
import static org.apache.dubbo.common.constants.ConsulConstants.INVALID_PORT;
import static org.apache.dubbo.common.constants.ConsulConstants.WATCH_TIMEOUT;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_PORT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_WATCH_TIMEOUT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.SERVICE_TAG;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.URL_META_KEY;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.WATCH_TIMEOUT;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;

/**
Expand All @@ -84,17 +87,22 @@ public class ConsulRegistry extends FailbackRegistry {
*/
private String token;

private static final int CONSUL_CORE_THREAD_SIZE = 1;

private static final int DEFAULT_INDEX = -1;
private static final int DEFAULT_WAIT_TIME = -1;


public ConsulRegistry(URL url) {
super(url);
token = url.getParameter(TOKEN_KEY, (String) null);
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
int port = INVALID_PORT != url.getPort() ? url.getPort() : DEFAULT_PORT;
client = new ConsulClient(host, port);
checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
ttlConsulCheckExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Ttl-Consul-Check-Executor", true));
ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, checkPassInterval / 8,
checkPassInterval / 8, TimeUnit.MILLISECONDS);
ttlConsulCheckExecutor = new ScheduledThreadPoolExecutor(CONSUL_CORE_THREAD_SIZE, new NamedThreadFactory("Ttl-Consul-Check-Executor", true));
ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, checkPassInterval / PERIOD_DENOMINATOR,
checkPassInterval / PERIOD_DENOMINATOR, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -147,13 +155,13 @@ public void doSubscribe(URL url, NotifyListener listener) {
Long index;
List<URL> urls;
if (ANY_VALUE.equals(url.getServiceInterface())) {
Response<Map<String, List<String>>> response = getAllServices(-1, buildWatchTimeout(url));
Response<Map<String, List<String>>> response = getAllServices(DEFAULT_INDEX, buildWatchTimeout(url));
index = response.getConsulIndex();
List<HealthService> services = getHealthServices(response.getValue());
urls = convert(services, url);
} else {
String service = url.getServiceInterface();
Response<List<HealthService>> response = getHealthServices(service, -1, buildWatchTimeout(url));
Response<List<HealthService>> response = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
index = response.getConsulIndex();
urls = convert(response.getValue(), url);
}
Expand Down Expand Up @@ -185,7 +193,7 @@ public List<URL> lookup(URL url) {
}
try {
String service = url.getServiceKey();
Response<List<HealthService>> result = getHealthServices(service, -1, buildWatchTimeout(url));
Response<List<HealthService>> result = getHealthServices(service, DEFAULT_INDEX, buildWatchTimeout(url));
if (result == null || result.getValue() == null || result.getValue().isEmpty()) {
return new ArrayList<>();
} else {
Expand Down Expand Up @@ -247,7 +255,7 @@ private Response<Map<String, List<String>>> getAllServices(long index, int watch
private List<HealthService> getHealthServices(Map<String, List<String>> services) {
return services.entrySet().stream()
.filter(s -> s.getValue().contains(SERVICE_TAG))
.map(s -> getHealthServices(s.getKey(), -1, -1).getValue())
.map(s -> getHealthServices(s.getKey(), DEFAULT_INDEX, DEFAULT_WAIT_TIME).getValue())
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -315,13 +323,13 @@ private String buildId(URL url) {

private NewService.Check buildCheck(URL url) {
NewService.Check check = new NewService.Check();
check.setTtl((checkPassInterval / 1000) + "s");
check.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME));
return check;
}

private int buildWatchTimeout(URL url) {
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000;
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
}

private class ConsulNotifier implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_PORT;
import static org.apache.dubbo.common.constants.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
import static org.apache.dubbo.common.constants.ConsulConstants.WATCH_TIMEOUT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_PORT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_WATCH_TIMEOUT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEREGISTER_AFTER;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.WATCH_TIMEOUT;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.ONE_THOUSAND;
import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.PERIOD_DENOMINATOR;
import static org.apache.dubbo.registry.consul.ConsulParameter.ACL_TOKEN;
import static org.apache.dubbo.registry.consul.ConsulParameter.CONSISTENCY_MODE;
import static org.apache.dubbo.registry.consul.ConsulParameter.DEFAULT_ZONE_METADATA_NAME;
import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_GROUP;
import static org.apache.dubbo.registry.consul.ConsulParameter.INSTANCE_ZONE;
import static org.apache.dubbo.registry.consul.ConsulParameter.TAGS;
import static org.apache.dubbo.common.constants.ConsulConstants.INVALID_PORT;

/**
* 2019-07-31
Expand Down Expand Up @@ -120,7 +123,7 @@ public void onEvent(ServiceInstancesChangedEvent event) {
public void initialize(URL registryURL) throws Exception {
this.url = registryURL;
String host = url.getHost();
int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
int port = INVALID_PORT != url.getPort() ? url.getPort() : DEFAULT_PORT;
checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
client = new ConsulClient(host, port);
ttlScheduler = new TtlScheduler(checkPassInterval, client);
Expand Down Expand Up @@ -195,7 +198,8 @@ public void doRegister(ServiceInstance serviceInstance) {
}

@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
Set<String> serviceNames = listener.getServiceNames();
for (String serviceName : serviceNames) {
ConsulNotifier notifier = notifiers.get(serviceName);
Expand Down Expand Up @@ -366,14 +370,14 @@ private Map<String, String> decodeMetadata(Map<String, String> metadata) {

private NewService.Check buildCheck(ServiceInstance serviceInstance) {
NewService.Check check = new NewService.Check();
check.setTtl((checkPassInterval / 1000) + "s");
check.setTtl((checkPassInterval / ONE_THOUSAND) + "s");
String deregister = serviceInstance.getMetadata().get(DEREGISTER_AFTER);
check.setDeregisterCriticalServiceAfter(deregister == null ? DEFAULT_DEREGISTER_TIME : deregister);
return check;
}

private int buildWatchTimeout() {
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000;
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / ONE_THOUSAND;
}

private class ConsulNotifier implements Runnable {
Expand Down Expand Up @@ -435,8 +439,8 @@ public TtlScheduler(long checkInterval, ConsulClient client) {
public void add(String instanceId) {
ScheduledFuture task = this.scheduler.scheduleAtFixedRate(
new ConsulHeartbeatTask(instanceId),
checkInterval / 8,
checkInterval / 8,
checkInterval / PERIOD_DENOMINATOR,
checkInterval / PERIOD_DENOMINATOR,
TimeUnit.MILLISECONDS);
ScheduledFuture previousTask = this.serviceHeartbeats.put(instanceId, task);
if (previousTask != null) {
Expand Down

0 comments on commit 82ff056

Please sign in to comment.