Skip to content

Commit 9f2744f

Browse files
committed
Add XDS retry and circuit-breaking functionality to the flow control plugin
Signed-off-by: hanbingleixue <[email protected]>
1 parent 723b3e4 commit 9f2744f

31 files changed

+2135
-109
lines changed
+17-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
# FlowControl configuration
22
flow.control.plugin:
3-
useCseRule: true # whether to configure cse rules
4-
enable-start-monitor: false # whether to enable indicator monitoring
5-
enable-system-adaptive: false # whether to enable system adaptive flow control
6-
enable-system-rule: false # whether to enable system rule flow control
3+
# whether to configure cse rules
4+
useCseRule: true
5+
# whether to enable indicator monitoring
6+
enable-start-monitor: false
7+
# whether to enable system adaptive flow control
8+
enable-system-adaptive: false
9+
# whether to enable system rule flow control
10+
enable-system-rule: false
11+
xds.flow.control.config:
12+
# Whether to enable Xds flow control
13+
enable: false
14+
# whether to use a secure protocol (https rather than http) when invoking Spring Cloud downstream services through an xds route
15+
enabled-springcloud-xds-route-secure: false
16+
retry:
17+
retryHostPredicate: PreviousHostsPredicate
18+
x-sermant-retriable-status-codes:
19+
x-sermant-retriable-header-names:

Diff for: sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/pom.xml

+35
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
<netflix-core.version>1.4.7.RELEASE</netflix-core.version>
2828
<spring.cloud.context.version>2.2.0.RELEASE</spring.cloud.context.version>
2929
<google.guava>31.1-jre</google.guava>
30+
<ribbon.version>2.2.5</ribbon.version>
31+
<apache-httpclient.version>4.5.13</apache-httpclient.version>
32+
<http.client.async.verion>4.1.5</http.client.async.verion>
33+
<okhttp.version>4.11.0</okhttp.version>
34+
<okhttp.sq.version>2.7.5</okhttp.sq.version>
3035
</properties>
3136
<dependencies>
3237
<!--compile-->
@@ -99,6 +104,12 @@
99104
<version>${netflix-core.version}</version>
100105
<scope>provided</scope>
101106
</dependency>
107+
<dependency>
108+
<groupId>com.netflix.ribbon</groupId>
109+
<artifactId>ribbon-loadbalancer</artifactId>
110+
<version>${ribbon.version}</version>
111+
<scope>provided</scope>
112+
</dependency>
102113
<dependency>
103114
<groupId>jakarta.annotation</groupId>
104115
<artifactId>jakarta.annotation-api</artifactId>
@@ -111,6 +122,24 @@
111122
<version>${google.guava}</version>
112123
<scope>provided</scope>
113124
</dependency>
125+
<dependency>
126+
<groupId>org.apache.httpcomponents</groupId>
127+
<artifactId>httpclient</artifactId>
128+
<version>${apache-httpclient.version}</version>
129+
<scope>provided</scope>
130+
</dependency>
131+
<dependency>
132+
<groupId>com.squareup.okhttp3</groupId>
133+
<artifactId>okhttp</artifactId>
134+
<version>${okhttp.version}</version>
135+
<scope>provided</scope>
136+
</dependency>
137+
<dependency>
138+
<groupId>com.squareup.okhttp</groupId>
139+
<artifactId>okhttp</artifactId>
140+
<version>${okhttp.sq.version}</version>
141+
<scope>provided</scope>
142+
</dependency>
114143
<!--test-->
115144
<dependency>
116145
<groupId>junit</groupId>
@@ -136,6 +165,12 @@
136165
<version>${project.version}</version>
137166
<scope>test</scope>
138167
</dependency>
168+
<dependency>
169+
<groupId>org.apache.httpcomponents</groupId>
170+
<artifactId>httpasyncclient</artifactId>
171+
<version>${http.client.async.verion}</version>
172+
<scope>provided</scope>
173+
</dependency>
139174
</dependencies>
140175
<build>
141176
<plugins>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.sermant.flowcontrol;
18+
19+
import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
20+
import io.sermant.core.plugin.config.PluginConfigManager;
21+
import io.sermant.flowcontrol.common.config.XdsFlowControlConfig;
22+
23+
/**
24+
* okhttp request declarer
25+
*
26+
* @author zhp
27+
* @since 2024-11-30
28+
*/
29+
public abstract class AbstractXdsDeclarer extends AbstractPluginDeclarer {
30+
private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class);
31+
32+
@Override
33+
public boolean isEnabled() {
34+
return config.isEnable();
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.sermant.flowcontrol;
18+
19+
import io.sermant.core.plugin.agent.entity.ExecuteContext;
20+
import io.sermant.core.service.xds.entity.ServiceInstance;
21+
import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers;
22+
import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers;
23+
import io.sermant.core.utils.CollectionUtils;
24+
import io.sermant.flowcontrol.common.config.CommonConst;
25+
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
26+
import io.sermant.flowcontrol.common.handler.retry.RetryContext;
27+
import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy;
28+
import io.sermant.flowcontrol.common.util.StringUtils;
29+
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
30+
import io.sermant.flowcontrol.common.xds.circuit.CircuitBreakerManager;
31+
import io.sermant.flowcontrol.common.xds.handler.XdsFlowControlHandler;
32+
import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancer;
33+
import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancerFactory;
34+
import io.sermant.flowcontrol.service.InterceptorSupporter;
35+
36+
import java.util.ArrayList;
37+
import java.util.Collections;
38+
import java.util.HashSet;
39+
import java.util.List;
40+
import java.util.Optional;
41+
import java.util.Set;
42+
43+
/**
44+
* Enhance the client request sending functionality by performing Xds service instance discovery and circuit breaking
45+
* during the request sending process
46+
*
47+
* @author zhp
48+
* @since 2024-11-30
49+
*/
50+
public abstract class AbstractXdsHttpClientInterceptor extends InterceptorSupporter {
51+
protected static final String MESSAGE = "CircuitBreaker has forced open and deny any requests";
52+
53+
private static final int MIN_SUCCESS_CODE = 200;
54+
55+
private static final int MAX_SUCCESS_CODE = 399;
56+
57+
private static final int HUNDRED = 100;
58+
59+
/**
60+
* Perform circuit breaker judgment and handling
61+
*
62+
* @return The result of whether circuit breaking is needed
63+
*/
64+
public boolean needsCircuitBreak() {
65+
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
66+
if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName())
67+
|| StringUtils.isEmpty(scenarioInfo.getServiceName())
68+
|| StringUtils.isEmpty(scenarioInfo.getAddress())) {
69+
return false;
70+
}
71+
Optional<XdsRequestCircuitBreakers> circuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
72+
getRequestCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
73+
if (!circuitBreakersOptional.isPresent()) {
74+
return false;
75+
}
76+
int activeRequestNum = CircuitBreakerManager.incrementActiveRequests(scenarioInfo.getServiceName(),
77+
scenarioInfo.getClusterName(), scenarioInfo.getAddress());
78+
int maxRequest = circuitBreakersOptional.get().getMaxRequests();
79+
return maxRequest != 0 && activeRequestNum > maxRequest;
80+
}
81+
82+
@Override
83+
public ExecuteContext doAfter(ExecuteContext context) {
84+
XdsThreadLocalUtil.removeSendByteFlag();
85+
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
86+
if (context.getThrowable() != null || scenarioInfo == null) {
87+
return context;
88+
}
89+
decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo);
90+
return context;
91+
}
92+
93+
private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext context,
94+
FlowControlScenario scenarioInfo) {
95+
CircuitBreakerManager.decrementActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getServiceName(),
96+
scenarioInfo.getAddress());
97+
int statusCode = getStatusCode(context);
98+
if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) {
99+
return;
100+
}
101+
countFailedRequests(scenarioInfo, statusCode);
102+
}
103+
104+
@Override
105+
public ExecuteContext doThrow(ExecuteContext context) {
106+
XdsThreadLocalUtil.removeSendByteFlag();
107+
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
108+
if (scenarioInfo == null) {
109+
return context;
110+
}
111+
decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo);
112+
return context;
113+
}
114+
115+
/**
116+
* handler failure request
117+
*
118+
* @param statusCode response code
119+
* @param scenario scenario information
120+
*/
121+
private void countFailedRequests(FlowControlScenario scenario, int statusCode) {
122+
CircuitBreakerManager.decrementActiveRequests(scenario.getServiceName(), scenario.getClusterName(),
123+
scenario.getAddress());
124+
Optional<XdsInstanceCircuitBreakers> instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
125+
getInstanceCircuitBreakers(scenario.getServiceName(), scenario.getClusterName());
126+
if (!instanceCircuitBreakersOptional.isPresent()) {
127+
return;
128+
}
129+
XdsInstanceCircuitBreakers circuitBreakers = instanceCircuitBreakersOptional.get();
130+
CircuitBreakerManager.countFailureRequest(scenario, scenario.getAddress(), statusCode, circuitBreakers);
131+
}
132+
133+
/**
134+
* Get status code
135+
*
136+
* @param context The execution context of the Interceptor
137+
* @return response code
138+
*/
139+
protected abstract int getStatusCode(ExecuteContext context);
140+
141+
/**
142+
* choose serviceInstance by xds rule
143+
*
144+
* @return result
145+
*/
146+
protected Optional<ServiceInstance> chooseServiceInstanceForXds() {
147+
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
148+
if (scenarioInfo == null || io.sermant.core.utils.StringUtils.isBlank(scenarioInfo.getServiceName())
149+
|| io.sermant.core.utils.StringUtils.isEmpty(scenarioInfo.getClusterName())) {
150+
return Optional.empty();
151+
}
152+
Set<ServiceInstance> serviceInstanceSet = XdsFlowControlHandler.INSTANCE.
153+
getAllServerInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
154+
if (serviceInstanceSet.isEmpty()) {
155+
return Optional.empty();
156+
}
157+
boolean needRetry = RetryContext.INSTANCE.isPolicyNeedRetry();
158+
if (needRetry) {
159+
removeRetriedServiceInstance(serviceInstanceSet);
160+
}
161+
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
162+
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo));
163+
}
164+
165+
private void removeRetriedServiceInstance(Set<ServiceInstance> serviceInstanceSet) {
166+
RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy();
167+
retryPolicy.retryMark();
168+
List<Object> retriedInstance = retryPolicy.getAllRetriedInstance();
169+
Set<ServiceInstance> allInstance = new HashSet<>(serviceInstanceSet);
170+
for (Object retryInstance : retriedInstance) {
171+
if (retryInstance instanceof ServiceInstance) {
172+
serviceInstanceSet.remove(retryInstance);
173+
}
174+
}
175+
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
176+
serviceInstanceSet.addAll(allInstance);
177+
}
178+
}
179+
180+
private ServiceInstance chooseServiceInstanceByLoadBalancer(Set<ServiceInstance> instanceSet,
181+
FlowControlScenario scenarioInfo) {
182+
XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(),
183+
scenarioInfo.getClusterName());
184+
return loadBalancer.selectInstance(new ArrayList<>(instanceSet));
185+
}
186+
187+
private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set<ServiceInstance> instanceSet) {
188+
Optional<XdsInstanceCircuitBreakers> instanceCircuitBreakersOptional = XdsFlowControlHandler.INSTANCE.
189+
getInstanceCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName());
190+
if (!instanceCircuitBreakersOptional.isPresent()) {
191+
return;
192+
}
193+
XdsInstanceCircuitBreakers outlierDetection = instanceCircuitBreakersOptional.get();
194+
int count = instanceSet.size();
195+
if (checkMinInstanceNum(outlierDetection, count)) {
196+
return;
197+
}
198+
List<ServiceInstance> circuitBreakerInstances = new ArrayList<>();
199+
float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED;
200+
int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent);
201+
for (ServiceInstance serviceInstance : instanceSet) {
202+
if (hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) {
203+
break;
204+
}
205+
String address = serviceInstance.getHost() + CommonConst.CONNECT + serviceInstance.getPort();
206+
if (CircuitBreakerManager.needCircuitBreaker(scenarioInfo, address, outlierDetection)) {
207+
circuitBreakerInstances.add(serviceInstance);
208+
}
209+
}
210+
if (checkHealthInstanceNum(count, outlierDetection, circuitBreakerInstances.size())) {
211+
return;
212+
}
213+
circuitBreakerInstances.forEach(instanceSet::remove);
214+
}
215+
216+
private boolean hasReachedCircuitBreakerThreshold(List<ServiceInstance> circuitBreakerInstances,
217+
int maxCircuitBreakerInstances) {
218+
return circuitBreakerInstances.size() >= maxCircuitBreakerInstances;
219+
}
220+
221+
private boolean checkHealthInstanceNum(int count, XdsInstanceCircuitBreakers outlierDetection, int size) {
222+
return count * outlierDetection.getMinHealthPercent() >= (count - size);
223+
}
224+
225+
private boolean checkMinInstanceNum(XdsInstanceCircuitBreakers outlierDetection, int count) {
226+
return outlierDetection.getFailurePercentageMinimumHosts() > count;
227+
}
228+
229+
@Override
230+
protected boolean canInvoke(ExecuteContext context) {
231+
return true;
232+
}
233+
234+
/**
235+
* Get Retry Handler
236+
*
237+
* @return Retry Handlers
238+
*/
239+
protected List<io.github.resilience4j.retry.Retry> getRetryHandlers() {
240+
if (XdsThreadLocalUtil.getScenarioInfo() != null) {
241+
FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo();
242+
RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo);
243+
return getRetryHandler().getXdsHandlers(scenarioInfo);
244+
}
245+
return Collections.emptyList();
246+
}
247+
}

0 commit comments

Comments
 (0)