Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#noissue]Add recoder HTTP messages #9940

Open
wants to merge 1 commit into
base: 2.0.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion agent/src/main/resources/profiles/release/pinpoint-env.config
Original file line number Diff line number Diff line change
Expand Up @@ -1121,4 +1121,16 @@ profiler.jdk.concurrent.completable-future=true
# which package of runnable(callable) instance can be thread plugin trace
# Set the package name to track
# eg) profiler.thread.match.package=com.company.shopping.cart, com.company.payment
profiler.thread.match.package=
profiler.thread.match.package=
###########################################################
# 控制是否进行报文采集
profiler.spring.web.body=true
# 0 代表全量采集;1 代表正常报文按照采样率,异常报文全量采集;2 代表报文采集按照采样率来走
profiler.spring.web.body.strategy=0
# 是否需要通过响应体的某个字段响应码来判定异常
profiler.spring.web.body.response.judge.enable=true
# 响应字段,严格区分大小写(用来进行异常打标)
profiler.spring.web.body.response.judge.sign=status
# 响应码(用来进行异常打标)
profiler.spring.web.body.response.judge.code=0000
###########################################################
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public class DefaultThriftTransportConfig implements ThriftTransportConfig {
private static final String DEFAULT_DATA_SENDER_PINPOINT_CLIENT_WRITE_BUFFER_LOW_WATER_MAK = "16m";
private String tcpDataSenderPinpointClientWriteBufferLowWaterMark = DEFAULT_DATA_SENDER_PINPOINT_CLIENT_WRITE_BUFFER_LOW_WATER_MAK;

/**
* 报文异常判断配置
*/
private boolean responseJudge = false;
private String responseJudgeSign = "status";
private String responseJudgeCode = "0000";
public DefaultThriftTransportConfig() {
}

Expand Down Expand Up @@ -120,6 +126,10 @@ public void read(DefaultProfilerConfig profilerConfig) {
this.tcpDataSenderPinpointClientHandshakeInterval = profilerConfig.readLong("profiler.tcpdatasender.client.handshake.interval", DEFAULT_DATA_SENDER_PINPOINT_CLIENT_HANDSHAKE_INTERVAL);
this.tcpDataSenderPinpointClientWriteBufferHighWaterMark = profilerConfig.readString("profiler.tcpdatasender.client.write.buffer.highwatermark", DEFAULT_DATA_SENDER_PINPOINT_CLIENT_WRITE_BUFFER_HIGH_WATER_MAK);
this.tcpDataSenderPinpointClientWriteBufferLowWaterMark = profilerConfig.readString("profiler.tcpdatasender.client.write.buffer.lowwatermark", DEFAULT_DATA_SENDER_PINPOINT_CLIENT_WRITE_BUFFER_LOW_WATER_MAK);

this.responseJudge = profilerConfig.readBoolean("profiler.spring.web.body.response.judge.enable", false);
this.responseJudgeSign = profilerConfig.readString("profiler.spring.web.body.response.judge.sign", "status");
this.responseJudgeCode = profilerConfig.readString("profiler.spring.web.body.response.judge.code", "0000");
}

@Override
Expand Down Expand Up @@ -292,6 +302,24 @@ public int getStatDataSenderChunkSize() {
return statDataSenderChunkSize;
}



@Override
public boolean isResponseJudge() {
return responseJudge;
}

@Override
public String getResponseJudgeSign() {
return responseJudgeSign;
}

@Override
public String getResponseJudgeCode() {
return responseJudgeCode;
}


@Override
public String toString() {
return "DefaultThriftTransportConfig{" +
Expand Down Expand Up @@ -327,8 +355,11 @@ public String toString() {
", tcpDataSenderPinpointClientReconnectInterval=" + tcpDataSenderPinpointClientReconnectInterval +
", tcpDataSenderPinpointClientPingInterval=" + tcpDataSenderPinpointClientPingInterval +
", tcpDataSenderPinpointClientHandshakeInterval=" + tcpDataSenderPinpointClientHandshakeInterval +
", tcpDataSenderPinpointClientWriteBufferHighWaterMark=" + tcpDataSenderPinpointClientWriteBufferHighWaterMark +
", tcpDataSenderPinpointClientWriteBufferLowWaterMark=" + tcpDataSenderPinpointClientWriteBufferLowWaterMark +
", tcpDataSenderPinpointClientWriteBufferHighWaterMark='" + tcpDataSenderPinpointClientWriteBufferHighWaterMark + '\'' +
", tcpDataSenderPinpointClientWriteBufferLowWaterMark='" + tcpDataSenderPinpointClientWriteBufferLowWaterMark + '\'' +
", responseJudge=" + responseJudge +
", responseJudgeSign='" + responseJudgeSign + '\'' +
", responseJudgeCode='" + responseJudgeCode + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,12 @@ public interface ThriftTransportConfig {

int getStatDataSenderChunkSize();

/**
* 报文异常判断相关
*/
boolean isResponseJudge();

String getResponseJudgeSign();

String getResponseJudgeCode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,31 @@ public interface SpanRecorder extends FrameAttachment {
void recordLogging(LoggingInfo loggingInfo);

void recordStatusCode(int statusCode);

void recordWebInfoRequestUrl(String requestUrl);

void recordWebInfoRequestBody(Object requestBody);

void recordWebInfoRequestHeader(Object requestHeader);

void recordWebInfoResponseBody(Object responseBody);

void recordWebInfoResponseHeader(Object responseHeader);

void recordWebInfoRequestMethod(String requestMethod);

void recordWebInfoStatusCode(int statusCode);

/**
* 判断是否已经进行了请求体的采集
* @return
*/
boolean requestBodyTraced();

/**
* 记录报文采样策略
* @param strategy
*/
void recordWebInfoStrategy(byte strategy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,15 @@ public interface TraceContext {

JdbcContext getJdbcContext();

/**
* 是否做报文采集
* @return
*/
boolean bodyObtainEnable();

/**
* 报文采样策略
* @return
*/
byte bodyObtainStrategy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.handler.thrift;

import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.collector.sample.Sample;
import com.navercorp.pinpoint.collector.service.TraceService;
import com.navercorp.pinpoint.common.server.bo.SpanWebInfoBo;
import com.navercorp.pinpoint.common.server.bo.thrift.SpanFactory;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.kafka.KafkaProducerGroup;
import com.navercorp.pinpoint.kafka.KafkaProducerManager;
import com.navercorp.pinpoint.thrift.dto.TSpanWebInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Objects;

/**
* @author emeroad
*/
@Service
public class ThriftSpanWebInfoHandler implements SimpleHandler {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final TraceService traceService;

private final SpanFactory spanFactory;

@Autowired
private Sample sampleService;

private KafkaProducerGroup kafkaProducerGroup = KafkaProducerManager.getGroup("spanwebinfo");


public ThriftSpanWebInfoHandler(TraceService traceService, SpanFactory spanFactory) {
this.traceService = Objects.requireNonNull(traceService, "traceService");
this.spanFactory = Objects.requireNonNull(spanFactory, "spanFactory");
}

@Override
public void handleSimple(ServerRequest serverRequest) {
final Object data = serverRequest.getData();
if (logger.isDebugEnabled()) {
logger.debug("Handle simple data={}", data);
}
if (data instanceof TSpanWebInfo) {
handleSpanWebInfo((TSpanWebInfo) data);
} else {
throw new UnsupportedOperationException("data is not support type : " + data);
}
}

private void handleSpanWebInfo(TSpanWebInfo tbase) {
try {
final SpanWebInfoBo spanWebInfoBo = this.spanFactory.buildSpanWebInfoBo(tbase);
if(sampleService.getKafkaFlag()) {
if (sampleService.sample(spanWebInfoBo.getTransactionId().hashCode())) {
if (kafkaProducerGroup == null) {
kafkaProducerGroup = KafkaProducerManager.getGroup("spanwebinfo");
}
kafkaProducerGroup.put(spanWebInfoBo);
}
}
} catch (Exception e) {
logger.warn("Failed to handle SpanWebInfo={}, Caused={}", tbase, e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ public class SpanDispatchHandler implements DispatchHandler {
private final SimpleHandler spanDataHandler;

private final SimpleHandler spanChunkHandler;


private final SimpleHandler spanWebInfoHandler;


public SpanDispatchHandler(SimpleHandler spanDataHandler, SimpleHandler spanChunkHandler, SimpleHandler spanWebInfoHandler) {
this.spanDataHandler = Objects.requireNonNull(spanDataHandler, "spanDataHandler");
this.spanChunkHandler = Objects.requireNonNull(spanChunkHandler, "spanChunkHandler");
this.spanWebInfoHandler = Objects.requireNonNull(spanWebInfoHandler, "spanWebInfoHandler");
}

public SpanDispatchHandler(SimpleHandler spanDataHandler, SimpleHandler spanChunkHandler) {
this.spanDataHandler = Objects.requireNonNull(spanDataHandler, "spanDataHandler");
this.spanChunkHandler = Objects.requireNonNull(spanChunkHandler, "spanChunkHandler");
this.spanWebInfoHandler = null;
}

private SimpleHandler getSimpleHandler(Header header) {
Expand All @@ -46,6 +55,8 @@ private SimpleHandler getSimpleHandler(Header header) {
return spanDataHandler;
case DefaultTBaseLocator.SPANCHUNK:
return spanChunkHandler;
case DefaultTBaseLocator.SPAN_WEB_INFO:
return spanWebInfoHandler;
}
throw new UnsupportedOperationException("unsupported header:" + header);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
<bean id="spanDispatchHandler" class="com.navercorp.pinpoint.collector.receiver.SpanDispatchHandler">
<constructor-arg ref="thriftSpanHandler"/>
<constructor-arg ref="thriftSpanChunkHandler"/>
<constructor-arg ref="thriftSpanWebInfoHandler"/>
</bean>
<bean id="spanDispatchHandlerFactoryBean" class="com.navercorp.pinpoint.collector.receiver.thrift.DispatchHandlerFactoryBean">
<property name="dispatchHandler" ref="spanDispatchHandler"/>
Expand Down
Loading