Skip to content

Commit

Permalink
Add upstream response metrics (#190)
Browse files Browse the repository at this point in the history
Add upstream response metrics
  • Loading branch information
aphralG authored Feb 10, 2023
1 parent c057452 commit 7543485
Show file tree
Hide file tree
Showing 534 changed files with 113,105 additions and 285 deletions.
6 changes: 6 additions & 0 deletions src/core/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func GetCalculationMap() map[string]string {
"nginx.upstream.header.time.max": "avg",
"nginx.upstream.header.time.median": "avg",
"nginx.upstream.header.time.pctl95": "avg",
"nginx.upstream.response.length": "avg",
"nginx.upstream.response.time": "avg",
"nginx.upstream.response.time.count": "sum",
"nginx.upstream.response.time.max": "avg",
"nginx.upstream.response.time.median": "avg",
"nginx.upstream.response.time.pctl95": "avg",
"nginx.http.conn.handled": "sum",
"nginx.http.conn.reading": "avg",
"nginx.http.conn.writing": "avg",
Expand Down
262 changes: 139 additions & 123 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,13 @@ func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *proto.St
c.buf = []*proto.StatsEntity{}
}

var httpRequestMetrics = []string{
"request.time",
"request.time.count",
"request.time.max",
"request.time.median",
"request.time.pctl95",
}

func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string) {
logPattern := convertLogFormat(logFormat)
log.Debugf("Collecting from: %s using format: %s", logFile, logFormat)
log.Debugf("Pattern used for tailing logs: %s", logPattern)

httpCounters, connCounters, headerCounters := getDefaultCounters()
gzipRatios, requestLengths, requestTimes, connectTimes, headerTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}
httpCounters, upstreamCounters := getDefaultCounters()
gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

mu := sync.Mutex{}

Expand All @@ -191,36 +183,46 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
}

mu.Lock()
if v, err := strconv.Atoi(access.BodyBytesSent); err == nil {
n := "request.body_bytes_sent"
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting body_bytes_sent value from access logs: %v", err)
if access.BodyBytesSent != "" {
if v, err := strconv.Atoi(access.BodyBytesSent); err == nil {
n := "request.body_bytes_sent"
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting body_bytes_sent value from access logs: %v", err)
}
}

if v, err := strconv.Atoi(access.BytesSent); err == nil {
n := "request.bytes_sent"
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting bytes_sent value from access logs: %v", err)
if access.BytesSent != "" {
if v, err := strconv.Atoi(access.BytesSent); err == nil {
n := "request.bytes_sent"
httpCounters[n] = float64(v) + httpCounters[n]
} else {
log.Debugf("Error getting bytes_sent value from access logs: %v", err)
}
}

if v, err := strconv.Atoi(access.GzipRatio); err == nil {
gzipRatios = append(gzipRatios, float64(v))
} else {
log.Debugf("Error getting gzip_ratio value from access logs: %v", err)
if access.GzipRatio != "-" && access.GzipRatio != "" {
if v, err := strconv.Atoi(access.GzipRatio); err == nil {
gzipRatios = append(gzipRatios, float64(v))
} else {
log.Debugf("Error getting gzip_ratio value from access logs: %v", err)
}
}

if v, err := strconv.Atoi(access.RequestLength); err == nil {
requestLengths = append(requestLengths, float64(v))
} else {
log.Debugf("Error getting request_length value from access logs: %v", err)
if access.RequestLength != "" {
if v, err := strconv.Atoi(access.RequestLength); err == nil {
requestLengths = append(requestLengths, float64(v))
} else {
log.Debugf("Error getting request_length value from access logs: %v", err)
}
}

if v, err := strconv.ParseFloat(access.RequestTime, 64); err == nil {
requestTimes = append(requestTimes, v)
} else {
log.Debugf("Error getting request_time value from access logs: %v", err)
if access.RequestTime != "" {
if v, err := strconv.ParseFloat(access.RequestTime, 64); err == nil {
requestTimes = append(requestTimes, v)
} else {
log.Debugf("Error getting request_time value from access logs: %v", err)
}
}

if access.Request != "" {
Expand All @@ -241,28 +243,39 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
}
}

for _, cTime := range strings.Split(access.UpstreamConnectTime, ", ") {
// nginx uses '-' to represent TCP connection failures
cTime = strings.ReplaceAll(cTime, "-", "0")

if v, err := strconv.ParseFloat(cTime, 64); err == nil {
connectTimes = append(connectTimes, v)
if access.UpstreamConnectTime != "-" && access.UpstreamConnectTime != "" {
if v, err := strconv.ParseFloat(access.UpstreamConnectTime, 64); err == nil {
upstreamConnectTimes = append(upstreamConnectTimes, v)
} else {
log.Debugf("Error getting upstream_connect_time value from access logs, %v", err)
}
}

for _, hTime := range strings.Split(access.UpstreamHeaderTime, ", ") {
// nginx uses '-' to represent TCP connection failures
hTime = strings.ReplaceAll(hTime, "-", "0")

if v, err := strconv.ParseFloat(hTime, 64); err == nil {
headerTimes = append(headerTimes, v)
if access.UpstreamHeaderTime != "-" && access.UpstreamHeaderTime != "" {
if v, err := strconv.ParseFloat(access.UpstreamHeaderTime, 64); err == nil {
upstreamHeaderTimes = append(upstreamHeaderTimes, v)
} else {
log.Debugf("Error getting upstream_header_time value from access logs: %v", err)
}
}

if access.UpstreamResponseLength != "-" && access.UpstreamResponseLength != "" {
if v, err := strconv.ParseFloat(access.UpstreamResponseLength, 64); err == nil {
upstreamResponseLength = append(upstreamResponseLength, v)
} else {
log.Debugf("Error getting upstream_response_length value from access logs: %v", err)
}

}

if access.UpstreamResponseTime != "-" && access.UpstreamResponseTime != "" {
if v, err := strconv.ParseFloat(access.UpstreamResponseTime, 64); err == nil {
upstreamResponseTimes = append(upstreamResponseTimes, v)
} else {
log.Debugf("Error getting upstream_response_time value from access logs: %v", err)
}
}

if access.ServerProtocol != "" {
if strings.Count(access.ServerProtocol, "/") == 1 {
httpProtocolVersion := strings.Split(access.ServerProtocol, "/")[1]
Expand Down Expand Up @@ -302,37 +315,43 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
mu.Lock()

if len(requestLengths) > 0 {
httpCounters["request.length"] = getRequestLengthMetricValue(requestLengths)
httpCounters["request.length"] = getAverageMetricValue(requestLengths)
}

if len(gzipRatios) > 0 {
httpCounters["gzip.ratio"] = getGzipRatioMetricValue(gzipRatios)
httpCounters["gzip.ratio"] = getAverageMetricValue(gzipRatios)
}

if len(requestTimes) > 0 {
getTimeMetricsMap("request.time", requestTimes, httpCounters)
}

for _, metricName := range httpRequestMetrics {
httpCounters[metricName] = getTimeMetrics(metricName, requestTimes)
if len(upstreamConnectTimes) > 0 {
getTimeMetricsMap("upstream.connect.time", upstreamConnectTimes, upstreamCounters)
}

for metricName := range connCounters {
connCounters[metricName] = getTimeMetrics(metricName, connectTimes)
if len(upstreamHeaderTimes) > 0 {
getTimeMetricsMap("upstream.header.time", upstreamHeaderTimes, upstreamCounters)
}

for metricName := range headerCounters {
headerCounters[metricName] = getTimeMetrics(metricName, headerTimes)
if len(upstreamResponseTimes) > 0 {
getTimeMetricsMap("upstream.response.time", upstreamResponseTimes, upstreamCounters)
}

if len(upstreamResponseLength) > 0 {
upstreamCounters["upstream.response.length"] = getAverageMetricValue(upstreamResponseLength)
}
c.group = "http"
simpleMetrics := c.convertSamplesToSimpleMetrics(httpCounters)

c.group = ""
simpleMetrics = append(simpleMetrics, c.convertSamplesToSimpleMetrics(connCounters)...)
simpleMetrics = append(simpleMetrics, c.convertSamplesToSimpleMetrics(headerCounters)...)
simpleMetrics = append(simpleMetrics, c.convertSamplesToSimpleMetrics(upstreamCounters)...)

log.Tracef("Access log metrics collected: %v", simpleMetrics)

// reset the counters
httpCounters, connCounters, headerCounters = getDefaultCounters()
gzipRatios, requestLengths, requestTimes, connectTimes, headerTimes = []float64{}, []float64{}, []float64{}, []float64{}, []float64{}
httpCounters, upstreamCounters = getDefaultCounters()
gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes = []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

c.buf = append(c.buf, metrics.NewStatsEntity(c.baseDimensions.ToDimensions(), simpleMetrics))

Expand Down Expand Up @@ -378,79 +397,71 @@ func getParsedRequest(request string) (method string, uri string, protocol strin
return
}

func getRequestLengthMetricValue(requestLengths []float64) float64 {
func getAverageMetricValue(metricValues []float64) float64 {
value := 0.0

if len(requestLengths) > 0 {
sort.Float64s(requestLengths)
requestLengthSum := 0.0
for _, requestLength := range requestLengths {
requestLengthSum += requestLength
if len(metricValues) > 0 {
sort.Float64s(metricValues)
metricValueSum := 0.0
for _, metricValue := range metricValues {
metricValueSum += metricValue
}
value = requestLengthSum / float64(len(requestLengths))
value = metricValueSum / float64(len(metricValues))
}

return value
}

func getGzipRatioMetricValue(gzipRatios []float64) float64 {
value := 0.0
func getTimeMetricsMap(metricName string, times []float64, counter map[string]float64) {

if len(gzipRatios) > 0 {
sort.Float64s(gzipRatios)
gzipRatioSum := 0.0
for _, gzipRatio := range gzipRatios {
gzipRatioSum += gzipRatio
}
value = gzipRatioSum / float64(len(gzipRatios))
metrics := map[string]float64{
metricName: 0,
metricName + ".count": 0,
metricName + ".median": 0,
metricName + ".max": 0,
metricName + ".pctl95": 0,
}

return value
}
for metric := range metrics {

func getTimeMetrics(metricName string, times []float64) float64 {
if len(times) == 0 {
return 0
}
metricType := metric[strings.LastIndex(metric, ".")+1:]

metricType := metricName[strings.LastIndex(metricName, ".")+1:]
switch metricType {
case "time":
// Calculate average
sum := 0.0
for _, t := range times {
sum += t
}

switch metricType {
case "time":
// Calculate average
sum := 0.0
for _, t := range times {
sum += t
}
return sum / float64(len(times))
counter[metric] = (math.Round(sum*1000) / 1000) / float64(len(times))

case "count":
return float64(len(times))
case "count":
counter[metric] = float64(len(times))

case "max":
sort.Float64s(times)
return times[len(times)-1]
case "max":
sort.Float64s(times)
counter[metric] = times[len(times)-1]

case "median":
sort.Float64s(times)
case "median":
sort.Float64s(times)

mNumber := len(times) / 2
if len(times)%2 != 0 {
return times[mNumber]
} else {
return (times[mNumber-1] + times[mNumber]) / 2
}
mNumber := len(times) / 2
if len(times)%2 != 0 {
counter[metric] = times[mNumber]
} else {
counter[metric] = (times[mNumber-1] + times[mNumber]) / 2
}

case "pctl95":
sort.Float64s(times)
case "pctl95":
sort.Float64s(times)

index := int(math.RoundToEven(float64(0.95)*float64(len(times)))) - 1
return times[index]
}
index := int(math.RoundToEven(float64(0.95)*float64(len(times)))) - 1
counter[metric] = times[index]
}

log.Debugf("Could not get time metrics for %s: invalid metric type", metricName)
}

return 0
}

// convertLogFormat converts log format into a pattern that can be parsed by the tailer
Expand All @@ -472,6 +483,8 @@ func convertLogFormat(logFormat string) string {
newLogFormat = strings.ReplaceAll(newLogFormat, "$request ", "%{DATA:request} ")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_connect_time", "%{DATA:upstream_connect_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_header_time", "%{DATA:upstream_header_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_response_time", "%{DATA:upstream_response_time}")
newLogFormat = strings.ReplaceAll(newLogFormat, "$upstream_response_length", "%{DATA:upstream_response_length}")
newLogFormat = strings.ReplaceAll(newLogFormat, "[", "\\[")
newLogFormat = strings.ReplaceAll(newLogFormat, "]", "\\]")
return newLogFormat
Expand All @@ -486,7 +499,7 @@ func isOtherMethod(method string) bool {
method != "method.options"
}

func getDefaultCounters() (map[string]float64, map[string]float64, map[string]float64) {
func getDefaultCounters() (map[string]float64, map[string]float64) {
httpCounters := map[string]float64{
"gzip.ratio": 0,
"method.delete": 0,
Expand Down Expand Up @@ -523,21 +536,24 @@ func getDefaultCounters() (map[string]float64, map[string]float64, map[string]fl
"v2": 0,
}

upstreamConnectCounters := map[string]float64{
"upstream.connect.time": 0,
"upstream.connect.time.count": 0,
"upstream.connect.time.max": 0,
"upstream.connect.time.median": 0,
"upstream.connect.time.pctl95": 0,
}

upstreamHeaderCounters := map[string]float64{
"upstream.header.time": 0,
"upstream.header.time.count": 0,
"upstream.header.time.max": 0,
"upstream.header.time.median": 0,
"upstream.header.time.pctl95": 0,
upstreamCounters := map[string]float64{
"upstream.connect.time": 0,
"upstream.connect.time.count": 0,
"upstream.connect.time.max": 0,
"upstream.connect.time.median": 0,
"upstream.connect.time.pctl95": 0,
"upstream.header.time": 0,
"upstream.header.time.count": 0,
"upstream.header.time.max": 0,
"upstream.header.time.median": 0,
"upstream.header.time.pctl95": 0,
"upstream.response.time": 0,
"upstream.response.time.count": 0,
"upstream.response.time.max": 0,
"upstream.response.time.median": 0,
"upstream.response.time.pctl95": 0,
"upstream.response.length": 0,
}

return httpCounters, upstreamConnectCounters, upstreamHeaderCounters
return httpCounters, upstreamCounters
}
Loading

0 comments on commit 7543485

Please sign in to comment.