Skip to content

Commit

Permalink
[pinpoint-apm#9666] Realtime activeThreadCount with Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Feb 13, 2023
1 parent f52be14 commit 38dcfde
Show file tree
Hide file tree
Showing 55 changed files with 2,877 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransfer;
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransferResponse;
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -47,7 +46,10 @@ protected ClusterPoint<?> findClusterPoint(TCommandTransfer deliveryCommand) {
String agentId = deliveryCommand.getAgentId();
long startTimeStamp = deliveryCommand.getStartTime();
final ClusterKey sourceKey = new ClusterKey(applicationName, agentId, startTimeStamp);
return findClusterPoint(sourceKey);
}

public ClusterPoint<?> findClusterPoint(ClusterKey sourceKey) {
List<ClusterPoint<?>> result = new ArrayList<>();

for (ClusterPoint<?> targetClusterPoint : targetClusterPointLocator.getClusterPointList()) {
Expand All @@ -62,7 +64,7 @@ protected ClusterPoint<?> findClusterPoint(TCommandTransfer deliveryCommand) {
}

if (result.size() > 1) {
logger.warn("Ambiguous ClusterPoint {}, {}, {} (Valid Agent list={}).", applicationName, agentId, startTimeStamp, result);
logger.warn("Ambiguous ClusterPoint {} (Valid Agent list={}).", sourceKey, result);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.navercorp.pinpoint.rpc.stream.ServerStreamChannel;
import com.navercorp.pinpoint.thrift.dto.command.TCommandTransfer;

import org.apache.thrift.TBase;

import java.util.Objects;
Expand All @@ -30,16 +29,27 @@ public class StreamEvent extends DefaultRouteEvent {

private final ServerStreamChannel serverStreamChannel;
private final TBase<?, ?> requestObject;
private final long maxAgeMs;

public StreamEvent(RouteEvent routeEvent, ServerStreamChannel serverStreamChannel, TBase<?, ?> requestObject) {
this(routeEvent.getDeliveryCommand(), serverStreamChannel, requestObject);
}

public StreamEvent(TCommandTransfer deliveryCommand, ServerStreamChannel serverStreamChannel, TBase<?, ?> requestObject) {
this(deliveryCommand, serverStreamChannel, requestObject, Long.MAX_VALUE);
}

public StreamEvent(
TCommandTransfer deliveryCommand,
ServerStreamChannel serverStreamChannel,
TBase<?, ?> requestObject,
long maxAgeMs
) {
super(deliveryCommand, serverStreamChannel.getRemoteAddress());

this.serverStreamChannel = Objects.requireNonNull(serverStreamChannel, "serverStreamChannel");
this.requestObject = Objects.requireNonNull(requestObject, "requestObject");
this.maxAgeMs = maxAgeMs;
}

public ServerStreamChannel getStreamChannel() {
Expand All @@ -53,21 +63,24 @@ public int getStreamChannelId() {
public TBase<?, ?> getRequestObject() {
return requestObject;
}


public long getMaxAgeMs() {
return maxAgeMs;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName());
sb.append("{");
sb.append("{remoteAddress=").append(getRemoteAddress()).append(", ");
sb.append("applicationName=").append(getDeliveryCommand().getApplicationName()).append(", ");
sb.append("agentId=").append(getDeliveryCommand().getAgentId()).append(", ");
sb.append("startTimeStamp=").append(getDeliveryCommand().getStartTime()).append(", ");
sb.append("serverStreamChannel=").append(getStreamChannel()).append(", ");
sb.append("streamChannelId=").append(getStreamChannelId()).append(", ");
sb.append("requestObject=").append(requestObject);
sb.append('}');
return sb.toString();
return this.getClass().getSimpleName() +
"{" +
"{remoteAddress=" + getRemoteAddress() + ", " +
"applicationName=" + getDeliveryCommand().getApplicationName() + ", " +
"agentId=" + getDeliveryCommand().getAgentId() + ", " +
"startTimeStamp=" + getDeliveryCommand().getStartTime() + ", " +
"serverStreamChannel=" + getStreamChannel() + ", " +
"streamChannelId=" + getStreamChannelId() + ", " +
"requestObject=" + requestObject +
"maxAgeMs=" + maxAgeMs +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.SerializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;

import org.apache.thrift.TBase;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TBase;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.lang.NonNull;

import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* @author koo.taejin
Expand All @@ -51,6 +55,7 @@ public class StreamRouteHandler extends AbstractRouteHandler<StreamEvent> {
public static final String ATTACHMENT_KEY = StreamRouteManager.class.getSimpleName();

private final Logger logger = LogManager.getLogger(this.getClass());
private final ScheduledExecutorService streamExpireExecutor = createExecutor("Stream-Expire");

private final RouteFilterChain<StreamEvent> streamCreateFilterChain;
private final RouteFilterChain<ResponseEvent> responseFilterChain;
Expand All @@ -71,6 +76,15 @@ public StreamRouteHandler(ClusterPointLocator<ClusterPoint<?>> targetClusterPoin
this.commandSerializerFactory = Objects.requireNonNull(commandSerializerFactory, "commandSerializerFactory");
}

private static ScheduledExecutorService createExecutor(String name) {
return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable r) {
return new Thread(r, name);
}
});
}

@Override
public void addRequestFilter(RouteFilter<StreamEvent> filter) {
this.streamCreateFilterChain.addLast(filter);
Expand All @@ -89,8 +103,7 @@ public void addCloseFilter(RouteFilter<StreamRouteCloseEvent> filter) {
public TCommandTransferResponse onRoute(StreamEvent event) {
streamCreateFilterChain.doEvent(event);

TCommandTransferResponse routeResult = onRoute0(event);
return routeResult;
return onRoute0(event);
}

private TCommandTransferResponse onRoute0(StreamEvent event) {
Expand Down Expand Up @@ -118,6 +131,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {

ClientStreamChannel producerStreamChannel = createStreamChannel((ThriftAgentConnection) clusterPoint, event.getDeliveryCommand().getPayload(), routeManager);
routeManager.setProducer(producerStreamChannel);
scheduleExpire(producerStreamChannel, event.getMaxAgeMs());
return createResponse(TRouteResult.OK);
} else if (clusterPoint instanceof GrpcAgentConnection) {
StreamRouteManager routeManager = new StreamRouteManager(event);
Expand All @@ -127,6 +141,7 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {

ClientStreamChannel producerStreamChannel = ((GrpcAgentConnection) clusterPoint).openStream(event.getRequestObject(), routeManager);
routeManager.setProducer(producerStreamChannel);
scheduleExpire(producerStreamChannel, event.getMaxAgeMs());
return createResponse(TRouteResult.OK);
} else {
return createResponse(TRouteResult.NOT_SUPPORTED_SERVICE);
Expand All @@ -143,6 +158,13 @@ private TCommandTransferResponse onRoute0(StreamEvent event) {
return createResponse(TRouteResult.UNKNOWN);
}

private void scheduleExpire(ClientStreamChannel channel, long maxAgeMs) {
if (maxAgeMs < Long.MAX_VALUE) {
final Runnable expireTask = () -> channel.close();
streamExpireExecutor.schedule(expireTask, maxAgeMs, TimeUnit.MILLISECONDS);
}
}

private ClientStreamChannel createStreamChannel(ThriftAgentConnection clusterPoint, byte[] payload, ClientStreamChannelEventHandler streamChannelEventHandler) throws StreamException {
PinpointServer pinpointServer = clusterPoint.getPinpointServer();
return pinpointServer.openStreamAndAwait(payload, streamChannelEventHandler, 3000);
Expand Down
4 changes: 4 additions & 0 deletions metric-module/collector-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-uristat-collector</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-realtime-collector</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.navercorp.pinpoint.collector.config.FlinkContextConfiguration;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.data.web.SpringDataWebAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.sql.init.SqlInitializationAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
Expand All @@ -12,7 +16,15 @@
import org.springframework.context.annotation.ImportResource;

@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class, SqlInitializationAutoConfiguration.class})
@EnableAutoConfiguration(exclude = {
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
SqlInitializationAutoConfiguration.class,
SpringDataWebAutoConfiguration.class,
RedisAutoConfiguration.class,
RedisRepositoriesAutoConfiguration.class,
RedisReactiveAutoConfiguration.class
})
@ImportResource({"classpath:applicationContext-collector.xml", "classpath:servlet-context-collector.xml"})
@Import({CollectorAppPropertySources.class, FlinkContextConfiguration.class})
@ComponentScan({})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,31 @@
import com.navercorp.pinpoint.metric.collector.CollectorTypeParser;
import com.navercorp.pinpoint.metric.collector.MetricCollectorApp;
import com.navercorp.pinpoint.metric.collector.TypeSet;
import com.navercorp.pinpoint.realtime.collector.atc.config.RealtimeCollectorConfig;
import com.navercorp.pinpoint.uristat.collector.UriStatCollectorConfig;
import org.springframework.boot.Banner;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.data.web.SpringDataWebAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;

import java.util.Arrays;

@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
@EnableAutoConfiguration(exclude = {
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
SpringDataWebAutoConfiguration.class,
RedisAutoConfiguration.class,
RedisRepositoriesAutoConfiguration.class,
RedisReactiveAutoConfiguration.class
})
public class MultiApplication {
private static final ServerBootLogger logger = ServerBootLogger.getLogger(MultiApplication.class);

Expand All @@ -44,7 +56,11 @@ public static void main(String[] args) {

if (types.hasType(CollectorType.BASIC)) {
logger.info(String.format("Start %s collector", CollectorType.BASIC));
SpringApplicationBuilder collectorAppBuilder = createAppBuilder(builder, 15400, BasicCollectorApp.class, UriStatCollectorConfig.class);
SpringApplicationBuilder collectorAppBuilder = createAppBuilder(builder, 15400,
BasicCollectorApp.class,
UriStatCollectorConfig.class,
RealtimeCollectorConfig.class
);
collectorAppBuilder.build().run(args);
}

Expand Down
4 changes: 4 additions & 0 deletions metric-module/web-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-uristat-web</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-realtime-web</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@

import com.navercorp.pinpoint.common.server.util.ServerBootLogger;
import com.navercorp.pinpoint.metric.web.MetricWebApp;
import com.navercorp.pinpoint.realtime.web.atc.config.RealtimeWebConfig;
import com.navercorp.pinpoint.uristat.web.UriStatWebConfig;
import com.navercorp.pinpoint.web.AuthorizationConfig;
import com.navercorp.pinpoint.web.PinpointBasicLoginConfig;
import com.navercorp.pinpoint.web.WebApp;
import com.navercorp.pinpoint.web.WebAppPropertySources;
import com.navercorp.pinpoint.web.WebMvcConfig;
import com.navercorp.pinpoint.web.WebServerConfig;
import com.navercorp.pinpoint.web.WebStarter;
import com.navercorp.pinpoint.web.AuthorizationConfig;
import com.navercorp.pinpoint.web.cache.CacheConfiguration;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.data.web.SpringDataWebAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
Expand All @@ -38,16 +43,30 @@
* @author minwoo.jung
*/
@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class,
SecurityAutoConfiguration.class})
@EnableAutoConfiguration(exclude = {
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
SecurityAutoConfiguration.class,
SpringDataWebAutoConfiguration.class,
RedisAutoConfiguration.class,
RedisRepositoriesAutoConfiguration.class,
RedisReactiveAutoConfiguration.class
})
@ImportResource({"classpath:applicationContext-web.xml", "classpath:servlet-context-web.xml"})
@Import({WebAppPropertySources.class, WebServerConfig.class, WebMvcConfig.class, CacheConfiguration.class})
public class MetricAndWebApp {
private static final ServerBootLogger logger = ServerBootLogger.getLogger(WebApp.class);

public static void main(String[] args) {
try {
WebStarter starter = new WebStarter(MetricAndWebApp.class, PinpointBasicLoginConfig.class, AuthorizationConfig.class, MetricWebApp.class, UriStatWebConfig.class);
WebStarter starter = new WebStarter(
MetricAndWebApp.class,
PinpointBasicLoginConfig.class,
AuthorizationConfig.class,
MetricWebApp.class,
UriStatWebConfig.class,
RealtimeWebConfig.class
);
starter.start(args);
} catch (Exception exception) {
logger.error("[WebApp] could not launch app.", exception);
Expand Down
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
<module>hbase2-module</module>
<module>metric-module</module>
<module>uristat</module>
<module>realtime</module>

<!-- <module>agent-testweb</module> -->
<!-- <module>plugins-it</module> -->
Expand Down Expand Up @@ -439,6 +440,21 @@
<artifactId>pinpoint-uristat-collector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-realtime-web</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-realtime-collector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-realtime-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-metric</artifactId>
Expand Down
21 changes: 21 additions & 0 deletions realtime/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pinpoint</artifactId>
<groupId>com.navercorp.pinpoint</groupId>
<version>2.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>pinpoint-realtime</artifactId>
<packaging>pom</packaging>

<modules>
<module>realtime-web</module>
<module>realtime-common</module>
<module>realtime-collector</module>
</modules>

</project>
Loading

0 comments on commit 38dcfde

Please sign in to comment.