Skip to content

Commit

Permalink
feat(telemetry)_: track message reliability
Browse files Browse the repository at this point in the history
Add metrics for dial errors, missed messages,
missed relevant messages, and confirmed delivery.
  • Loading branch information
adklempner committed Oct 4, 2024
1 parent 031b534 commit 903b6e2
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 14 deletions.
133 changes: 120 additions & 13 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@ import (
"github.com/status-im/status-go/wakuv2"

wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"

v1protocol "github.com/status-im/status-go/protocol/v1"
v2common "github.com/status-im/status-go/wakuv2/common"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MaxRetryCache = 5000
ProtocolStatsMetric TelemetryType = "ProtocolStats"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
DialFailureMetric TelemetryType = "DialFailure"
MissedMessageMetric TelemetryType = "MissedMessages"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages"
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
MaxRetryCache = 5000
)

type TelemetryRequest struct {
Expand Down Expand Up @@ -103,6 +108,26 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
}
}

func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) {
var errorMessage string = ""
if dialFailure.ErrType == v2common.ErrorUnknown {
errorMessage = dialFailure.ErrMsg
}
c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
}

func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
}

func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) {
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
}

func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) {
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -136,6 +161,24 @@ type PeerCountByOrigin struct {
Count uint
}

type DialFailure struct {
ErrorType v2common.DialErrorType
ErrorMsg string
Protocols string
}

type MissedMessage struct {
Envelope *v2protocol.Envelope
}

type MissedRelevantMessage struct {
ReceivedMessage *v2common.ReceivedMessage
}

type MessageDeliveryConfirmed struct {
MessageHash string
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -308,6 +351,30 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
case DialFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: DialFailureMetric,
TelemetryData: c.ProcessDialFailure(v),
}
case MissedMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedMessageMetric,
TelemetryData: c.ProcessMissedMessage(v),
}
case MissedRelevantMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedRelevantMessageMetric,
TelemetryData: c.ProcessMissedRelevantMessage(v),
}
case MessageDeliveryConfirmed:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MessageDeliveryConfirmedMetric,
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -467,6 +534,46 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
return &jsonRawMessage
}

func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage {
postBody := c.commonPostBody()
postBody["errorType"] = dialFailure.ErrorType
postBody["errorMsg"] = dialFailure.ErrorMsg
postBody["protocols"] = dialFailure.Protocols
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic()
postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String()
postBody["sentAt"] = missedMessage.ReceivedMessage.Sent
postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic
postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = messageDeliveryConfirmed.MessageHash
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
defer common.LogOnPanic()
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
Expand Down
104 changes: 104 additions & 0 deletions wakuv2/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"errors"
"fmt"
mrand "math/rand"
"regexp"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/multiformats/go-multiaddr"
)

// IsPubKeyEqual checks that two public keys are equal
Expand Down Expand Up @@ -110,3 +113,104 @@ func ValidateDataIntegrity(k []byte, expectedSize int) bool {
}
return true
}

func ParseDialErrors(errMsg string) []DialError {
// Regular expression to match the array of failed dial attempts
re := regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`)

match := re.FindStringSubmatch(errMsg)
if len(match) < 2 {
return nil
}

// Split the matched string into individual dial attempts
dialAttempts := strings.Split(strings.TrimSpace(match[1]), "\n")

// Regular expression to extract multiaddr and error message
reAttempt := regexp.MustCompile(`\[(.*?)\]\s*(.*)`)

var dialErrors []DialError
for _, attempt := range dialAttempts {
attempt = strings.TrimSpace(strings.Trim(attempt, "* "))
matches := reAttempt.FindStringSubmatch(attempt)
if len(matches) == 3 {
errMsg := strings.TrimSpace(matches[2])
ma, err := multiaddr.NewMultiaddr(matches[1])
if err != nil {
continue
}
protocols := ma.Protocols()
protocolsStr := "/"
for i, protocol := range protocols {
protocolsStr += protocol.Name
if i < len(protocols)-1 {
protocolsStr += "/"
}
}
dialErrors = append(dialErrors, DialError{
Protocols: protocolsStr,
MultiAddr: matches[1],
ErrMsg: errMsg,
ErrType: CategorizeDialError(errMsg),
})
}
}

return dialErrors
}

// DialErrorType represents the type of dial error
type DialErrorType int

const (
ErrorUnknown DialErrorType = iota
ErrorIOTimeout
ErrorConnectionRefused
ErrorRelayCircuitFailed
ErrorRelayNoReservation
ErrorSecurityNegotiationFailed
ErrorConcurrentDialSucceeded
ErrorConcurrentDialFailed
)

func (det DialErrorType) String() string {
return [...]string{
"Unknown",
"I/O Timeout",
"Connection Refused",
"Relay Circuit Failed",
"Relay No Reservation",
"Security Negotiation Failed",
"Concurrent Dial Succeeded",
"Concurrent Dial Failed",
}[det]
}

func CategorizeDialError(errMsg string) DialErrorType {
switch {
case strings.Contains(errMsg, "i/o timeout"):
return ErrorIOTimeout
case strings.Contains(errMsg, "connect: connection refused"):
return ErrorConnectionRefused
case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"):
return ErrorRelayCircuitFailed
case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"):
return ErrorRelayNoReservation
case strings.Contains(errMsg, "failed to negotiate security protocol"):
return ErrorSecurityNegotiationFailed
case strings.Contains(errMsg, "concurrent active dial succeeded"):
return ErrorConcurrentDialSucceeded
case strings.Contains(errMsg, "concurrent active dial through the same relay failed"):
return ErrorConcurrentDialFailed
default:
return ErrorUnknown
}
}

// DialError represents a single dial error with its multiaddr and error message
type DialError struct {
MultiAddr string
ErrMsg string
ErrType DialErrorType
Protocols string
}
31 changes: 30 additions & 1 deletion wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type ITelemetryClient interface {
PushMessageCheckFailure(ctx context.Context, messageHash string)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
PushDialFailure(ctx context.Context, dialFailure common.DialError)
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
PushMessageDeliveryConfirmed(ctx context.Context, messageHash string)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -990,6 +994,11 @@ func (w *Waku) SkipPublishToTopic(value bool) {

func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
w.messageSender.MessagesDelivered(hashes)
if w.statusTelemetryClient != nil {
for _, hash := range hashes {
w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, hash.String())
}
}
}

func (w *Waku) SetStorePeerID(peerID peer.ID) {
Expand Down Expand Up @@ -1113,12 +1122,24 @@ func (w *Waku) Start() error {
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
defer peerTelemetryTicker.Stop()

sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
if err != nil {
w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
return
}
defer sub.Close()

for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
case dialErr := <-sub.Out():
errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error())
for _, dialError := range errors {
w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
}
}
}
}()
Expand All @@ -1133,7 +1154,6 @@ func (w *Waku) Start() error {
go w.runPeerExchangeLoop()

if w.cfg.EnableMissingMessageVerification {

w.missingMsgVerifier = missing.NewMissingMessageVerifier(
w.node.Store(),
w,
Expand Down Expand Up @@ -1411,6 +1431,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
return nil
}

if w.statusTelemetryClient != nil {
if msgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope)
}
}

logger := w.logger.With(
zap.String("messageType", msgType),
zap.Stringer("envelopeHash", envelope.Hash()),
Expand Down Expand Up @@ -1529,6 +1555,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
w.storeMsgIDsMu.Unlock()
} else {
logger.Debug("filters did match")
if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e)
}
e.Processed.Store(true)
}

Expand Down

0 comments on commit 903b6e2

Please sign in to comment.