Skip to content

Commit

Permalink
Update nginx access & error log metric sources to only report metrics…
Browse files Browse the repository at this point in the history
… that are available instead of returning zero for metric that are missing.
  • Loading branch information
dhurley committed Aug 2, 2023
1 parent 996d150 commit 5214377
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 950 deletions.
167 changes: 74 additions & 93 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
log.Debugf("Collecting from: %s using format: %s", logFile, logFormat)
log.Debugf("Pattern used for tailing logs: %s", logPattern)

httpCounters, upstreamCounters, upstreamCacheCounters := getDefaultCounters()
httpCounters, upstreamCounters, upstreamCacheCounters := map[string]float64{}, map[string]float64{}, map[string]float64{}
gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

mu := sync.Mutex{}
Expand Down Expand Up @@ -267,7 +267,13 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
if isOtherMethod(n) {
n = "method.others"
}
httpCounters[n] = httpCounters[n] + 1

existingValue, ok := httpCounters[n]
if ok {
httpCounters[n] = existingValue + 1
} else {
httpCounters[n] = 1
}

if access.ServerProtocol == "" {
calculateServerProtocol(protocol, httpCounters)
Expand All @@ -284,7 +290,12 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
for _, value := range statusValues {
if v, err := strconv.Atoi(value); err == nil {
n := fmt.Sprintf("upstream.status.%dxx", v/100)
upstreamCounters[n] = upstreamCounters[n] + 1
existingValue, ok := upstreamCounters[n]
if ok {
upstreamCounters[n] = existingValue + 1
} else {
upstreamCounters[n] = 1
}
} else {
log.Debugf("Error getting upstream status value from access logs, %v", err)
}
Expand All @@ -307,8 +318,13 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
upstreamRequest, upstreamCounters = calculateUpstreamNextCount(upstreamTimes, upstreamCounters)
}

if upstreamRequest == true {
upstreamCounters["upstream.request.count"] = upstreamCounters["upstream.request.count"] + 1
if upstreamRequest {
existingValue, ok := upstreamCounters["upstream.request.count"]
if ok {
upstreamCounters["upstream.request.count"] = existingValue + 1
} else {
upstreamCounters["upstream.request.count"] = 1
}
}

mu.Unlock()
Expand Down Expand Up @@ -346,6 +362,7 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
if len(upstreamResponseLength) > 0 {
upstreamCounters["upstream.response.length"] = getAverageMetricValue(upstreamResponseLength)
}

c.group = "http"
simpleMetrics := c.convertSamplesToSimpleMetrics(httpCounters)

Expand All @@ -358,7 +375,7 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
log.Tracef("Access log metrics collected: %v", simpleMetrics)

// reset the counters
httpCounters, upstreamCounters, upstreamCacheCounters = getDefaultCounters()
httpCounters, upstreamCounters, upstreamCacheCounters = map[string]float64{}, map[string]float64{}, map[string]float64{}
gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes = []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}

c.buf = append(c.buf, metrics.NewStatsEntityWrapper(c.baseDimensions.ToDimensions(), simpleMetrics, proto.MetricsReport_INSTANCE))
Expand All @@ -383,7 +400,12 @@ func calculateUpstreamNextCount(metricValues []string, upstreamCounters map[stri
upstreamRequest = true
times := strings.Split(upstreamTimes, ", ")
if len(times) > 1 {
upstreamCounters["upstream.next.count"] = upstreamCounters["upstream.next.count"] + (float64(len(times)) - 1)
existingValue, ok := upstreamCounters["upstream.next.count"]
if ok {
upstreamCounters["upstream.next.count"] = existingValue + (float64(len(times)) - 1)
} else {
upstreamCounters["upstream.next.count"] = (float64(len(times)) - 1)
}
return upstreamRequest, upstreamCounters
}
}
Expand Down Expand Up @@ -424,7 +446,12 @@ func (c *NginxAccessLog) parseAccessLogUpstream(metricName string, metric string
func (c *NginxAccessLog) parseAccessLogFloatCounters(metricName string, metric string, counters map[string]float64) map[string]float64 {
if metric != "" {
if v, err := strconv.ParseFloat(metric, 64); err == nil {
counters[metricName] = v + counters[metricName]
existingValue, ok := counters[metricName]
if ok {
counters[metricName] = v + existingValue
} else {
counters[metricName] = v
}
return counters
} else {
c.logger.Log(fmt.Sprintf("Error getting %s value from access logs, %v", metricName, err))
Expand All @@ -438,7 +465,13 @@ func calculateServerProtocol(protocol string, counters map[string]float64) {
httpProtocolVersion := strings.Split(protocol, "/")[1]
httpProtocolVersion = strings.ReplaceAll(httpProtocolVersion, ".", "_")
n := fmt.Sprintf("v%s", httpProtocolVersion)
counters[n] = counters[n] + 1

existingValue, ok := counters[n]
if ok {
counters[n] = existingValue + 1
} else {
counters[n] = 1
}
}
}

Expand Down Expand Up @@ -500,17 +533,39 @@ func getAverageMetricValue(metricValues []float64) float64 {
func (c *NginxAccessLog) calculateHttpStatus(status string, counter map[string]float64) {
if v, err := strconv.Atoi(status); err == nil {
n := fmt.Sprintf("status.%dxx", v/100)
counter[n] = counter[n] + 1

existingValue, ok := counter[n]
if ok {
counter[n] = existingValue + 1
} else {
counter[n] = 1
}

switch v {
case 403, 404, 500, 502, 503, 504:
n := fmt.Sprintf("status.%d", v)
counter[n] = counter[n] + 1
existingValue, ok := counter[n]
if ok {
counter[n] = existingValue + 1
} else {
counter[n] = 1
}
case 499:
n := "status.discarded"
counter[n] = counter[n] + 1
existingValue, ok := counter[n]
if ok {
counter[n] = existingValue + 1
} else {
counter[n] = 1
}
case 400:
n := "request.malformed"
counter[n] = counter[n] + 1
existingValue, ok := counter[n]
if ok {
counter[n] = existingValue + 1
} else {
counter[n] = 1
}
}
} else {
c.logger.Log(fmt.Sprintf("Error getting status value from access logs, %v", err))
Expand All @@ -522,7 +577,12 @@ func calculateUpstreamCacheStatus(status string, counter map[string]float64) {

switch status {
case "BYPASS", "EXPIRED", "HIT", "MISS", "REVALIDATED", "STALE", "UPDATING":
counter[n] = counter[n] + 1
existingValue, ok := counter[n]
if ok {
counter[n] = existingValue + 1
} else {
counter[n] = 1
}
return
}
}
Expand All @@ -537,11 +597,8 @@ func calculateTimeMetricsMap(metricName string, times []float64, counter map[str
}

for metric := range timeMetrics {

metricType := metric[strings.LastIndex(metric, ".")+1:]

counter[metric] = metrics.GetTimeMetrics(times, metricType)

}
}

Expand All @@ -554,82 +611,6 @@ func isOtherMethod(method string) bool {
method != "method.options"
}

func getDefaultCounters() (map[string]float64, map[string]float64, map[string]float64) {
httpCounters := map[string]float64{
"gzip.ratio": 0,
"method.delete": 0,
"method.get": 0,
"method.head": 0,
"method.options": 0,
"method.post": 0,
"method.put": 0,
"method.others": 0,
"request.body_bytes_sent": 0,
"request.bytes_sent": 0,
"request.length": 0,
"request.malformed": 0,
"request.time": 0,
"request.time.count": 0,
"request.time.max": 0,
"request.time.median": 0,
"request.time.pctl95": 0,
"status.403": 0,
"status.404": 0,
"status.500": 0,
"status.502": 0,
"status.503": 0,
"status.504": 0,
"status.discarded": 0,
"status.1xx": 0,
"status.2xx": 0,
"status.3xx": 0,
"status.4xx": 0,
"status.5xx": 0,
"v0_9": 0,
"v1_0": 0,
"v1_1": 0,
"v2": 0,
}

upstreamCounters := map[string]float64{
"upstream.connect.time": 0,
"upstream.connect.time.count": 0,
"upstream.connect.time.max": 0,
"upstream.connect.time.median": 0,
"upstream.connect.time.pctl95": 0,
"upstream.header.time": 0,
"upstream.header.time.count": 0,
"upstream.header.time.max": 0,
"upstream.header.time.median": 0,
"upstream.header.time.pctl95": 0,
"upstream.request.count": 0,
"upstream.next.count": 0,
"upstream.response.time": 0,
"upstream.response.time.count": 0,
"upstream.response.time.max": 0,
"upstream.response.time.median": 0,
"upstream.response.time.pctl95": 0,
"upstream.response.length": 0,
"upstream.status.1xx": 0,
"upstream.status.2xx": 0,
"upstream.status.3xx": 0,
"upstream.status.4xx": 0,
"upstream.status.5xx": 0,
}

upstreamCacheCounters := map[string]float64{
"cache.bypass": 0,
"cache.expired": 0,
"cache.hit": 0,
"cache.miss": 0,
"cache.revalidated": 0,
"cache.stale": 0,
"cache.updating": 0,
}

return httpCounters, upstreamCounters, upstreamCacheCounters
}

// For all the variables in the log format that are not present in the logVarMap
// replace them with the %{DATA:.*} format
func replaceCustomLogVars(logPattern string) string {
Expand Down
Loading

0 comments on commit 5214377

Please sign in to comment.