Skip to content

Commit e988593

Browse files
committed
[pinpoint-apm#9666] Applied maxAge for collector stream
1 parent b5ade4c commit e988593

File tree

3 files changed

+59
-22
lines changed

3 files changed

+59
-22
lines changed

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/AbstractRouteHandler.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@
2222
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransfer;
2323
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransferResponse;
2424
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;
25-
26-
import org.apache.logging.log4j.Logger;
2725
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
2827

2928
import java.util.ArrayList;
3029
import java.util.List;
@@ -47,7 +46,10 @@ protected ClusterPoint<?> findClusterPoint(TCommandTransfer deliveryCommand) {
4746
String agentId = deliveryCommand.getAgentId();
4847
long startTimeStamp = deliveryCommand.getStartTime();
4948
final ClusterKey sourceKey = new ClusterKey(applicationName, agentId, startTimeStamp);
49+
return findClusterPoint(sourceKey);
50+
}
5051

52+
public ClusterPoint<?> findClusterPoint(ClusterKey sourceKey) {
5153
List<ClusterPoint<?>> result = new ArrayList<>();
5254

5355
for (ClusterPoint<?> targetClusterPoint : targetClusterPointLocator.getClusterPointList()) {
@@ -62,7 +64,7 @@ protected ClusterPoint<?> findClusterPoint(TCommandTransfer deliveryCommand) {
6264
}
6365

6466
if (result.size() > 1) {
65-
logger.warn("Ambiguous ClusterPoint {}, {}, {} (Valid Agent list={}).", applicationName, agentId, startTimeStamp, result);
67+
logger.warn("Ambiguous ClusterPoint {} (Valid Agent list={}).", sourceKey, result);
6668
return null;
6769
}
6870

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamEvent.java

+27-14
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannel;
2020
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransfer;
21-
2221
import org.apache.thrift.TBase;
2322

2423
import java.util.Objects;
@@ -30,16 +29,27 @@ public class StreamEvent extends DefaultRouteEvent {
3029

3130
private final ServerStreamChannel serverStreamChannel;
3231
private final TBase<?, ?> requestObject;
32+
private final long maxAgeMs;
3333

3434
public StreamEvent(RouteEvent routeEvent, ServerStreamChannel serverStreamChannel, TBase<?, ?> requestObject) {
3535
this(routeEvent.getDeliveryCommand(), serverStreamChannel, requestObject);
3636
}
3737

3838
public StreamEvent(TCommandTransfer deliveryCommand, ServerStreamChannel serverStreamChannel, TBase<?, ?> requestObject) {
39+
this(deliveryCommand, serverStreamChannel, requestObject, Long.MAX_VALUE);
40+
}
41+
42+
public StreamEvent(
43+
TCommandTransfer deliveryCommand,
44+
ServerStreamChannel serverStreamChannel,
45+
TBase<?, ?> requestObject,
46+
long maxAgeMs
47+
) {
3948
super(deliveryCommand, serverStreamChannel.getRemoteAddress());
4049

4150
this.serverStreamChannel = Objects.requireNonNull(serverStreamChannel, "serverStreamChannel");
4251
this.requestObject = Objects.requireNonNull(requestObject, "requestObject");
52+
this.maxAgeMs = maxAgeMs;
4353
}
4454

4555
public ServerStreamChannel getStreamChannel() {
@@ -53,21 +63,24 @@ public int getStreamChannelId() {
5363
public TBase<?, ?> getRequestObject() {
5464
return requestObject;
5565
}
56-
66+
67+
public long getMaxAgeMs() {
68+
return maxAgeMs;
69+
}
70+
5771
@Override
5872
public String toString() {
59-
final StringBuilder sb = new StringBuilder();
60-
sb.append(this.getClass().getSimpleName());
61-
sb.append("{");
62-
sb.append("{remoteAddress=").append(getRemoteAddress()).append(", ");
63-
sb.append("applicationName=").append(getDeliveryCommand().getApplicationName()).append(", ");
64-
sb.append("agentId=").append(getDeliveryCommand().getAgentId()).append(", ");
65-
sb.append("startTimeStamp=").append(getDeliveryCommand().getStartTime()).append(", ");
66-
sb.append("serverStreamChannel=").append(getStreamChannel()).append(", ");
67-
sb.append("streamChannelId=").append(getStreamChannelId()).append(", ");
68-
sb.append("requestObject=").append(requestObject);
69-
sb.append('}');
70-
return sb.toString();
73+
return this.getClass().getSimpleName() +
74+
"{" +
75+
"{remoteAddress=" + getRemoteAddress() + ", " +
76+
"applicationName=" + getDeliveryCommand().getApplicationName() + ", " +
77+
"agentId=" + getDeliveryCommand().getAgentId() + ", " +
78+
"startTimeStamp=" + getDeliveryCommand().getStartTime() + ", " +
79+
"serverStreamChannel=" + getStreamChannel() + ", " +
80+
"streamChannelId=" + getStreamChannelId() + ", " +
81+
"requestObject=" + requestObject +
82+
"maxAgeMs=" + maxAgeMs +
83+
'}';
7184
}
7285

7386
}

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/route/StreamRouteHandler.java

+27-5
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@
3535
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
3636
import com.navercorp.pinpoint.thrift.io.SerializerFactory;
3737
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
38-
39-
import org.apache.thrift.TBase;
40-
import org.apache.logging.log4j.Logger;
4138
import org.apache.logging.log4j.LogManager;
39+
import org.apache.logging.log4j.Logger;
40+
import org.apache.thrift.TBase;
4241
import org.springframework.beans.factory.annotation.Qualifier;
42+
import org.springframework.lang.NonNull;
4343

4444
import java.util.Objects;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.ScheduledExecutorService;
47+
import java.util.concurrent.ThreadFactory;
48+
import java.util.concurrent.TimeUnit;
4549

4650
/**
4751
* @author koo.taejin
@@ -51,6 +55,7 @@ public class StreamRouteHandler extends AbstractRouteHandler<StreamEvent> {
5155
public static final String ATTACHMENT_KEY = StreamRouteManager.class.getSimpleName();
5256

5357
private final Logger logger = LogManager.getLogger(this.getClass());
58+
private final ScheduledExecutorService streamExpireExecutor = createExecutor("Stream-Expire");
5459

5560
private final RouteFilterChain<StreamEvent> streamCreateFilterChain;
5661
private final RouteFilterChain<ResponseEvent> responseFilterChain;
@@ -71,6 +76,15 @@ public StreamRouteHandler(ClusterPointLocator<ClusterPoint<?>> targetClusterPoin
7176
this.commandSerializerFactory = Objects.requireNonNull(commandSerializerFactory, "commandSerializerFactory");
7277
}
7378

79+
private static ScheduledExecutorService createExecutor(String name) {
80+
return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
81+
@Override
82+
public Thread newThread(@NonNull Runnable r) {
83+
return new Thread(r, name);
84+
}
85+
});
86+
}
87+
7488
@Override
7589
public void addRequestFilter(RouteFilter<StreamEvent> filter) {
7690
this.streamCreateFilterChain.addLast(filter);
@@ -89,8 +103,7 @@ public void addCloseFilter(RouteFilter<StreamRouteCloseEvent> filter) {
89103
public TCommandTransferResponse onRoute(StreamEvent event) {
90104
streamCreateFilterChain.doEvent(event);
91105

92-
TCommandTransferResponse routeResult = onRoute0(event);
93-
return routeResult;
106+
return onRoute0(event);
94107
}
95108

96109
private TCommandTransferResponse onRoute0(StreamEvent event) {
@@ -118,6 +131,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {
118131

119132
ClientStreamChannel producerStreamChannel = createStreamChannel((ThriftAgentConnection) clusterPoint, event.getDeliveryCommand().getPayload(), routeManager);
120133
routeManager.setProducer(producerStreamChannel);
134+
scheduleExpire(producerStreamChannel, event.getMaxAgeMs());
121135
return createResponse(TRouteResult.OK);
122136
} else if (clusterPoint instanceof GrpcAgentConnection) {
123137
StreamRouteManager routeManager = new StreamRouteManager(event);
@@ -127,6 +141,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {
127141

128142
ClientStreamChannel producerStreamChannel = ((GrpcAgentConnection) clusterPoint).openStream(event.getRequestObject(), routeManager);
129143
routeManager.setProducer(producerStreamChannel);
144+
scheduleExpire(producerStreamChannel, event.getMaxAgeMs());
130145
return createResponse(TRouteResult.OK);
131146
} else {
132147
return createResponse(TRouteResult.NOT_SUPPORTED_SERVICE);
@@ -143,6 +158,13 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {
143158
return createResponse(TRouteResult.UNKNOWN);
144159
}
145160

161+
private void scheduleExpire(ClientStreamChannel channel, long maxAgeMs) {
162+
if (maxAgeMs < Long.MAX_VALUE) {
163+
final Runnable expireTask = () -> channel.close();
164+
streamExpireExecutor.schedule(expireTask, maxAgeMs, TimeUnit.MILLISECONDS);
165+
}
166+
}
167+
146168
private ClientStreamChannel createStreamChannel(ThriftAgentConnection clusterPoint, byte[] payload, ClientStreamChannelEventHandler streamChannelEventHandler) throws StreamException {
147169
PinpointServer pinpointServer = clusterPoint.getPinpointServer();
148170
return pinpointServer.openStreamAndAwait(payload, streamChannelEventHandler, 3000);

0 commit comments

Comments
 (0)