Skip to content

Commit

Permalink
Replace most Packetbeat warnings with expvar metrics
Browse files Browse the repository at this point in the history
Demoted warnings like "Two requests without a response" or
"Response without a request" to the Debug level. To compensate,
expvar counters are added for these.

These types of warnings are normal in some situations (e.g. at startup)
but can indicate serious issues in the other cases.

This is part of elastic#1931, which has the overall goal for switching the
default logging level to INFO.
  • Loading branch information
Tudor Golubenco committed Jul 1, 2016
1 parent 4792368 commit d060c28
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 12 deletions.
10 changes: 9 additions & 1 deletion packetbeat/protos/amqp/amqp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package amqp

import (
"expvar"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -33,6 +34,11 @@ type Amqp struct {
MethodMap map[codeClass]map[codeMethod]AmqpMethod
}

var (
unmatchedRequests = expvar.NewInt("amqpUnmatchedRequests")
unmatchedResponses = expvar.NewInt("amqpUnmatchedResponses")
)

func init() {
protos.Register("amqp", New)
}
Expand Down Expand Up @@ -237,6 +243,7 @@ func (amqp *Amqp) handleAmqpRequest(msg *AmqpMessage) {
if trans != nil {
if trans.Amqp != nil {
debugf("Two requests without a Response. Dropping old request: %s", trans.Amqp)
unmatchedRequests.Add(1)
}
} else {
trans = &AmqpTransaction{Type: "amqp", tuple: tuple}
Expand Down Expand Up @@ -295,7 +302,8 @@ func (amqp *Amqp) handleAmqpResponse(msg *AmqpMessage) {
tuple := msg.TcpTuple
trans := amqp.getTransaction(tuple.Hashable())
if trans == nil || trans.Amqp == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}

Expand Down
8 changes: 8 additions & 0 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package dns

import (
"bytes"
"expvar"
"fmt"
"net"
"sort"
Expand Down Expand Up @@ -41,6 +42,11 @@ const (
// Transport protocol.
type Transport uint8

var (
unmatchedRequests = expvar.NewInt("dnsUnmatchedRequests")
unmatchedResponses = expvar.NewInt("dnsUnmatchedResponses")
)

const (
TransportTcp = iota
TransportUdp
Expand Down Expand Up @@ -313,6 +319,7 @@ func (dns *Dns) receivedDnsResponse(tuple *DnsTuple, msg *DnsMessage) {
Src: msg.CmdlineTuple.Dst, Dst: msg.CmdlineTuple.Src})
trans.Notes = append(trans.Notes, OrphanedResponse.Error())
debugf("%s %s", OrphanedResponse.Error(), tuple.String())
unmatchedResponses.Add(1)
}

trans.Response = msg
Expand Down Expand Up @@ -422,6 +429,7 @@ func (dns *Dns) expireTransaction(t *DnsTransaction) {
t.Notes = append(t.Notes, NoResponse.Error())
debugf("%s %s", NoResponse.Error(), t.tuple.String())
dns.publishTransaction(t)
unmatchedRequests.Add(1)
}

// Adds the DNS message data to the supplied MapStr.
Expand Down
10 changes: 8 additions & 2 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"expvar"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -31,6 +32,10 @@ const (
stateBodyChunkedWaitFinalCRLF
)

var (
unmatchedResponses = expvar.NewInt("httpUnmatchedResponses")
)

type stream struct {
tcptuple *common.TcpTuple

Expand Down Expand Up @@ -407,7 +412,8 @@ func (http *HTTP) correlate(conn *httpConnectionData) {
// drop responses with missing requests
if conn.requests.empty() {
for !conn.responses.empty() {
logp.Warn("Response from unknown transaction. Ingoring.")
debugf("Response from unknown transaction. Ingoring.")
unmatchedResponses.Add(1)
conn.responses.pop()
}
return
Expand Down Expand Up @@ -437,7 +443,7 @@ func (http *HTTP) newTransaction(requ, resp *message) common.MapStr {

path, params, err := http.extractParameters(requ, requ.Raw)
if err != nil {
logp.Warn("http", "Fail to parse HTTP parameters: %v", err)
logp.Warn("Fail to parse HTTP parameters: %v", err)
}

src := common.Endpoint{
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/protos/icmp/icmp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package icmp

import (
"expvar"
"net"
"time"

Expand Down Expand Up @@ -49,6 +50,12 @@ const (
orphanedResponseMsg = "Response was received without an associated request."
)

var (
unmatchedRequests = expvar.NewInt("icmpUnmatchedRequests")
unmatchedResponses = expvar.NewInt("icmpUnmatchedResponses")
duplicateRequests = expvar.NewInt("icmpDuplicateRequests")
)

func New(testMode bool, results publish.Transactions, cfg *common.Config) (*Icmp, error) {
p := &Icmp{}
config := defaultConfig
Expand Down Expand Up @@ -175,6 +182,7 @@ func (icmp *Icmp) processRequest(tuple *icmpTuple, msg *icmpMessage) {
if trans != nil {
trans.Notes = append(trans.Notes, duplicateRequestMsg)
logp.Debug("icmp", duplicateRequestMsg+" %s", tuple)
duplicateRequests.Add(1)
icmp.publishTransaction(trans)
}

Expand All @@ -197,6 +205,7 @@ func (icmp *Icmp) processResponse(tuple *icmpTuple, msg *icmpMessage) {
trans = &icmpTransaction{Ts: msg.Ts, Tuple: revTuple}
trans.Notes = append(trans.Notes, orphanedResponseMsg)
logp.Debug("icmp", orphanedResponseMsg+" %s", tuple)
unmatchedResponses.Add(1)
}

trans.Response = msg
Expand Down Expand Up @@ -246,6 +255,7 @@ func (icmp *Icmp) deleteTransaction(k hashableIcmpTuple) *icmpTransaction {
func (icmp *Icmp) expireTransaction(tuple hashableIcmpTuple, trans *icmpTransaction) {
trans.Notes = append(trans.Notes, orphanedRequestMsg)
logp.Debug("icmp", orphanedRequestMsg+" %s", &trans.Tuple)
unmatchedRequests.Add(1)
icmp.publishTransaction(trans)
}

Expand Down
7 changes: 7 additions & 0 deletions packetbeat/protos/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package memcache

import (
"encoding/json"
"expvar"
"math"
"time"

Expand Down Expand Up @@ -99,6 +100,12 @@ type memcacheStat struct {

var debug = logp.MakeDebug("memcache")

var (
unmatchedRequests = expvar.NewInt("memcacheUnmatchedRequests")
unmatchedResponses = expvar.NewInt("memcacheUnmatchedResponses")
unfinishedTransactions = expvar.NewInt("memcacheUnfinishedTransaction")
)

func init() {
protos.Register("memcache", New)
}
Expand Down
3 changes: 3 additions & 0 deletions packetbeat/protos/memcache/plugin_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (mc *Memcache) correlateTCP(conn *connection) error {
note := NoteNonQuietResponseOnly
logp.Warn("%s", note)
requ.AddNotes(note)
unmatchedRequests.Add(1)
}

// send request
Expand All @@ -297,6 +298,7 @@ func (mc *Memcache) correlateTCP(conn *connection) error {
if requ == nil {
debug("found orphan memcached response=%p", resp)
resp.AddNotes(NoteTransactionNoRequ)
unmatchedResponses.Add(1)
}

debug("merge request=%p and response=%p", requ, resp)
Expand Down Expand Up @@ -406,6 +408,7 @@ func (mc *Memcache) pushAllTCPTrans(conn *connection) {
msg := conn.requests.pop()
if !msg.isQuiet && !msg.noreply {
msg.AddNotes(NoteTransUnfinished)
unfinishedTransactions.Add(1)
}
debug("push incomplete request=%p", msg)
err := mc.onTCPTrans(msg, nil)
Expand Down
10 changes: 8 additions & 2 deletions packetbeat/protos/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mongodb

import (
"expvar"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -35,6 +36,10 @@ type transactionKey struct {
id int
}

var (
unmatchedRequests = expvar.NewInt("mongodbUnmatchedRequests")
)

func init() {
protos.Register("mongodb", New)
}
Expand Down Expand Up @@ -120,7 +125,7 @@ func ensureMongodbConnection(private protos.ProtocolData) *mongodbConnectionData
return &mongodbConnectionData{}
}
if priv == nil {
logp.Warn("Unexpected: mongodb connection data not set, create new one")
debugf("Unexpected: mongodb connection data not set, create new one")
return &mongodbConnectionData{}
}

Expand Down Expand Up @@ -226,7 +231,8 @@ func (mongodb *Mongodb) onRequest(conn *mongodbConnectionData, msg *mongodbMessa
// insert into cache for correlation
old := mongodb.requests.Put(key, msg)
if old != nil {
logp.Warn("Two requests without a Response. Dropping old request")
debugf("Two requests without a Response. Dropping old request")
unmatchedRequests.Add(1)
}
}

Expand Down
15 changes: 12 additions & 3 deletions packetbeat/protos/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mysql

import (
"errors"
"expvar"
"fmt"
"strings"
"time"
Expand All @@ -22,6 +23,11 @@ const (

const MAX_PAYLOAD_SIZE = 100 * 1024

var (
unmatchedRequests = expvar.NewInt("mysqlUnmatchedRequests")
unmatchedResponses = expvar.NewInt("mysqlUnmatchedResponses")
)

type MysqlMessage struct {
start int
end int
Expand Down Expand Up @@ -261,7 +267,7 @@ func mysqlMessageParser(s *MysqlStream) (bool, bool) {

} else {
// something else, not expected
logp.Warn("Unexpected MySQL message of type %d received.", m.Typ)
logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.Typ)
return false, false
}
break
Expand Down Expand Up @@ -586,6 +592,7 @@ func (mysql *Mysql) receivedMysqlRequest(msg *MysqlMessage) {
if trans != nil {
if trans.Mysql != nil {
logp.Debug("mysql", "Two requests without a Response. Dropping old request: %s", trans.Mysql)
unmatchedRequests.Add(1)
}
} else {
trans = &MysqlTransaction{Type: "mysql", tuple: tuple}
Expand Down Expand Up @@ -635,12 +642,14 @@ func (mysql *Mysql) receivedMysqlRequest(msg *MysqlMessage) {
func (mysql *Mysql) receivedMysqlResponse(msg *MysqlMessage) {
trans := mysql.getTransaction(msg.TcpTuple.Hashable())
if trans == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
logp.Debug("mysql", "Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}
// check if the request was received
if trans.Mysql == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
logp.Debug("mysql", "Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return

}
Expand Down
6 changes: 6 additions & 0 deletions packetbeat/protos/nfs/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package nfs

import (
"expvar"
"fmt"
"time"

Expand All @@ -20,10 +21,15 @@ var ACCEPT_STATUS = [...]string{
"system_err",
}

var (
unmatchedRequests = expvar.NewInt("nfsUnmatchedRequests")
)

// called by Cache, when re reply seen within expected time window
func (rpc *Rpc) handleExpiredPacket(nfs *Nfs) {
nfs.event["status"] = "NO_REPLY"
rpc.results.PublishTransaction(nfs.event)
unmatchedRequests.Add(1)
}

// called when we process a RPC call
Expand Down
11 changes: 9 additions & 2 deletions packetbeat/protos/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pgsql

import (
"errors"
"expvar"
"strings"
"time"

Expand Down Expand Up @@ -97,6 +98,10 @@ var (
detailedf = logp.MakeDebug("pgsqldetailed")
)

var (
unmatchedResponses = expvar.NewInt("pgsqlUnmatchedResponses")
)

type Pgsql struct {

// config
Expand Down Expand Up @@ -411,7 +416,8 @@ func (pgsql *Pgsql) receivedPgsqlResponse(msg *PgsqlMessage) {
tuple := msg.TcpTuple
transList := pgsql.getTransaction(tuple.Hashable())
if transList == nil || len(transList) == 0 {
logp.Warn("Response from unknown transaction. Ignoring.")
debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}

Expand All @@ -420,7 +426,8 @@ func (pgsql *Pgsql) receivedPgsqlResponse(msg *PgsqlMessage) {

// check if the request was received
if trans.Pgsql == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}

Expand Down
8 changes: 7 additions & 1 deletion packetbeat/protos/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"bytes"
"expvar"
"time"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -47,6 +48,10 @@ var (
isDebug = false
)

var (
unmatchedResponses = expvar.NewInt("redisUnmatchedResponses")
)

func init() {
protos.Register("redis", New)
}
Expand Down Expand Up @@ -233,7 +238,8 @@ func (redis *Redis) correlate(conn *redisConnectionData) {
// drop responses with missing requests
if conn.requests.empty() {
for !conn.responses.empty() {
logp.Warn("Response from unknown transaction. Ignoring")
debugf("Response from unknown transaction. Ignoring")
unmatchedResponses.Add(1)
conn.responses.pop()
}
return
Expand Down
Loading

0 comments on commit d060c28

Please sign in to comment.