From 903b6e2cbdd9b7d85c04201b17dfd11adaee986c Mon Sep 17 00:00:00 2001
From: Arseniy Klempner
Date: Thu, 19 Sep 2024 16:58:18 -0700
Subject: [PATCH] feat(telemetry)_: track message reliability

Add metrics for dial errors, missed messages, missed relevant messages,
and confirmed delivery.
---
 telemetry/client.go      | 138 ++++++++++++++++++++++++++++++++++++---
 wakuv2/common/helpers.go | 115 ++++++++++++++++++++++++++++++++
 wakuv2/waku.go           |  36 +++++++++-
 3 files changed, 275 insertions(+), 14 deletions(-)

diff --git a/telemetry/client.go b/telemetry/client.go
index 5c21177089..3b118cc40b 100644
--- a/telemetry/client.go
+++ b/telemetry/client.go
@@ -18,26 +18,31 @@ import (
 	"github.com/status-im/status-go/wakuv2"
 
 	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
-	v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
 
 	v1protocol "github.com/status-im/status-go/protocol/v1"
+	v2common "github.com/status-im/status-go/wakuv2/common"
+	v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
 )
 
 type TelemetryType string
 
 const (
-	ProtocolStatsMetric        TelemetryType = "ProtocolStats"
-	SentEnvelopeMetric         TelemetryType = "SentEnvelope"
-	UpdateEnvelopeMetric       TelemetryType = "UpdateEnvelope"
-	ReceivedMessagesMetric     TelemetryType = "ReceivedMessages"
-	ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
-	PeerCountMetric            TelemetryType = "PeerCount"
-	PeerConnFailuresMetric     TelemetryType = "PeerConnFailure"
-	MessageCheckSuccessMetric  TelemetryType = "MessageCheckSuccess"
-	MessageCheckFailureMetric  TelemetryType = "MessageCheckFailure"
-	PeerCountByShardMetric     TelemetryType = "PeerCountByShard"
-	PeerCountByOriginMetric    TelemetryType = "PeerCountByOrigin"
-	MaxRetryCache                            = 5000
+	ProtocolStatsMetric            TelemetryType = "ProtocolStats"
+	SentEnvelopeMetric             TelemetryType = "SentEnvelope"
+	UpdateEnvelopeMetric           TelemetryType = "UpdateEnvelope"
+	ReceivedMessagesMetric         TelemetryType = "ReceivedMessages"
+	ErrorSendingEnvelopeMetric     TelemetryType = "ErrorSendingEnvelope"
+	PeerCountMetric                TelemetryType = "PeerCount"
+	PeerConnFailuresMetric         TelemetryType = "PeerConnFailure"
+	MessageCheckSuccessMetric      TelemetryType = "MessageCheckSuccess"
+	MessageCheckFailureMetric      TelemetryType = "MessageCheckFailure"
+	PeerCountByShardMetric         TelemetryType = "PeerCountByShard"
+	PeerCountByOriginMetric        TelemetryType = "PeerCountByOrigin"
+	DialFailureMetric              TelemetryType = "DialFailure"
+	MissedMessageMetric            TelemetryType = "MissedMessages"
+	MissedRelevantMessageMetric    TelemetryType = "MissedRelevantMessages"
+	MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
+	MaxRetryCache                                = 5000
 )
 
 type TelemetryRequest struct {
@@ -103,6 +108,31 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
 	}
 }
 
+// PushDialFailure reports a failed dial attempt. The raw error message is
+// only included when the failure could not be categorized.
+func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) {
+	var errorMessage string
+	if dialFailure.ErrType == v2common.ErrorUnknown {
+		errorMessage = dialFailure.ErrMsg
+	}
+	c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
+}
+
+// PushMissedMessage reports an envelope that arrived as a missing message.
+func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
+	c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
+}
+
+// PushMissedRelevantMessage reports a missing message that matched a filter.
+func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) {
+	c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
+}
+
+// PushMessageDeliveryConfirmed reports that delivery of messageHash was confirmed.
+func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) {
+	c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash})
+}
+
 type ReceivedMessages struct {
 	Filter     transport.Filter
 	SSHMessage *types.Message
@@ -136,6 +166,24 @@ type PeerCountByOrigin struct {
 	Count  uint
 }
 
+type DialFailure struct {
+	ErrorType v2common.DialErrorType
+	ErrorMsg  string
+	Protocols string
+}
+
+type MissedMessage struct {
+	Envelope *v2protocol.Envelope
+}
+
+type MissedRelevantMessage struct {
+	ReceivedMessage *v2common.ReceivedMessage
+}
+
+type MessageDeliveryConfirmed struct {
+	MessageHash string
+}
+
 type Client struct {
 	serverURL            string
 	httpClient           *http.Client
@@ -308,6 +356,30 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
 			TelemetryType: PeerCountByOriginMetric,
 			TelemetryData: c.ProcessPeerCountByOrigin(v),
 		}
+	case DialFailure:
+		telemetryRequest = TelemetryRequest{
+			Id:            c.nextId,
+			TelemetryType: DialFailureMetric,
+			TelemetryData: c.ProcessDialFailure(v),
+		}
+	case MissedMessage:
+		telemetryRequest = TelemetryRequest{
+			Id:            c.nextId,
+			TelemetryType: MissedMessageMetric,
+			TelemetryData: c.ProcessMissedMessage(v),
+		}
+	case MissedRelevantMessage:
+		telemetryRequest = TelemetryRequest{
+			Id:            c.nextId,
+			TelemetryType: MissedRelevantMessageMetric,
+			TelemetryData: c.ProcessMissedRelevantMessage(v),
+		}
+	case MessageDeliveryConfirmed:
+		telemetryRequest = TelemetryRequest{
+			Id:            c.nextId,
+			TelemetryType: MessageDeliveryConfirmedMetric,
+			TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
+		}
 	default:
 		c.logger.Error("Unknown telemetry data type")
 		return
@@ -467,6 +539,46 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
 	return &jsonRawMessage
 }
 
+func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage {
+	postBody := c.commonPostBody()
+	postBody["errorType"] = dialFailure.ErrorType
+	postBody["errorMsg"] = dialFailure.ErrorMsg
+	postBody["protocols"] = dialFailure.Protocols
+	body, _ := json.Marshal(postBody)
+	jsonRawMessage := json.RawMessage(body)
+	return &jsonRawMessage
+}
+
+func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
+	postBody := c.commonPostBody()
+	postBody["messageHash"] = missedMessage.Envelope.Hash().String()
+	postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second))
+	postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic()
+	postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic
+	body, _ := json.Marshal(postBody)
+	jsonRawMessage := json.RawMessage(body)
+	return &jsonRawMessage
+}
+
+func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage {
+	postBody := c.commonPostBody()
+	postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String()
+	postBody["sentAt"] = missedMessage.ReceivedMessage.Sent
+	postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic
+	postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic
+	body, _ := json.Marshal(postBody)
+	jsonRawMessage := json.RawMessage(body)
+	return &jsonRawMessage
+}
+
+func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage {
+	postBody := c.commonPostBody()
+	postBody["messageHash"] = messageDeliveryConfirmed.MessageHash
+	body, _ := json.Marshal(postBody)
+	jsonRawMessage := json.RawMessage(body)
+	return &jsonRawMessage
+}
+
 func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
 	defer common.LogOnPanic()
 	c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
diff --git a/wakuv2/common/helpers.go b/wakuv2/common/helpers.go
index 887eeab9ed..9a37d2f05b 100644
--- a/wakuv2/common/helpers.go
+++ b/wakuv2/common/helpers.go
@@ -6,8 +6,11 @@ import (
 	"errors"
 	"fmt"
 	mrand "math/rand"
+	"regexp"
+	"strings"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/multiformats/go-multiaddr"
 )
 
 // IsPubKeyEqual checks that two public keys are equal
@@ -110,3 +113,115 @@ func ValidateDataIntegrity(k []byte, expectedSize int) bool {
 	}
 	return true
 }
+
+// dialFailuresRegex matches the list of failed attempts that follows
+// "all dials failed" in an aggregated dial error message.
+var dialFailuresRegex = regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`)
+
+// dialAttemptRegex extracts the bracketed multiaddr and the error text of a
+// single dial attempt.
+var dialAttemptRegex = regexp.MustCompile(`\[(.*?)\]\s*(.*)`)
+
+// ParseDialErrors extracts the individual dial failures from an aggregated
+// "all dials failed" error message. Attempts whose address is not a valid
+// multiaddr are skipped. It returns nil when errMsg contains no such list.
+func ParseDialErrors(errMsg string) []DialError {
+	match := dialFailuresRegex.FindStringSubmatch(errMsg)
+	if len(match) < 2 {
+		return nil
+	}
+
+	var dialErrors []DialError
+	for _, attempt := range strings.Split(strings.TrimSpace(match[1]), "\n") {
+		attempt = strings.TrimSpace(strings.Trim(attempt, "* "))
+		matches := dialAttemptRegex.FindStringSubmatch(attempt)
+		if len(matches) != 3 {
+			continue
+		}
+		ma, err := multiaddr.NewMultiaddr(matches[1])
+		if err != nil {
+			continue
+		}
+		protocols := ma.Protocols()
+		names := make([]string, len(protocols))
+		for i, protocol := range protocols {
+			names[i] = protocol.Name
+		}
+		attemptErr := strings.TrimSpace(matches[2])
+		dialErrors = append(dialErrors, DialError{
+			Protocols: "/" + strings.Join(names, "/"),
+			MultiAddr: matches[1],
+			ErrMsg:    attemptErr,
+			ErrType:   CategorizeDialError(attemptErr),
+		})
+	}
+
+	return dialErrors
+}
+
+// DialErrorType represents the type of dial error
+type DialErrorType int
+
+const (
+	ErrorUnknown DialErrorType = iota
+	ErrorIOTimeout
+	ErrorConnectionRefused
+	ErrorRelayCircuitFailed
+	ErrorRelayNoReservation
+	ErrorSecurityNegotiationFailed
+	ErrorConcurrentDialSucceeded
+	ErrorConcurrentDialFailed
+)
+
+// dialErrorTypeNames holds the display name of every DialErrorType, indexed
+// by its value.
+var dialErrorTypeNames = [...]string{
+	"Unknown",
+	"I/O Timeout",
+	"Connection Refused",
+	"Relay Circuit Failed",
+	"Relay No Reservation",
+	"Security Negotiation Failed",
+	"Concurrent Dial Succeeded",
+	"Concurrent Dial Failed",
+}
+
+// String returns the display name of det. Values outside the declared range
+// yield "Unknown" rather than panicking on an out-of-bounds index.
+func (det DialErrorType) String() string {
+	if det < 0 || int(det) >= len(dialErrorTypeNames) {
+		return dialErrorTypeNames[ErrorUnknown]
+	}
+	return dialErrorTypeNames[det]
+}
+
+// CategorizeDialError maps a dial error message to a DialErrorType by
+// substring match, falling back to ErrorUnknown.
+func CategorizeDialError(errMsg string) DialErrorType {
+	switch {
+	case strings.Contains(errMsg, "i/o timeout"):
+		return ErrorIOTimeout
+	case strings.Contains(errMsg, "connect: connection refused"):
+		return ErrorConnectionRefused
+	case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"):
+		return ErrorRelayCircuitFailed
+	case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"):
+		return ErrorRelayNoReservation
+	case strings.Contains(errMsg, "failed to negotiate security protocol"):
+		return ErrorSecurityNegotiationFailed
+	case strings.Contains(errMsg, "concurrent active dial succeeded"):
+		return ErrorConcurrentDialSucceeded
+	case strings.Contains(errMsg, "concurrent active dial through the same relay failed"):
+		return ErrorConcurrentDialFailed
+	default:
+		return ErrorUnknown
+	}
+}
+
+// DialError represents a single dial error with its multiaddr and error message
+type DialError struct {
+	MultiAddr string
+	ErrMsg    string
+	ErrType   DialErrorType
+	Protocols string
+}
diff --git a/wakuv2/waku.go b/wakuv2/waku.go
index c61acd14fb..ff9e6ef65b 100644
--- a/wakuv2/waku.go
+++ b/wakuv2/waku.go
@@ -113,6 +113,10 @@ type ITelemetryClient interface {
 	PushMessageCheckFailure(ctx context.Context, messageHash string)
 	PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
 	PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
+	PushDialFailure(ctx context.Context, dialFailure common.DialError)
+	PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
+	PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
+	PushMessageDeliveryConfirmed(ctx context.Context, messageHash string)
 }
 
 // Waku represents a dark communication interface through the Ethereum
@@ -990,6 +994,11 @@ func (w *Waku) SkipPublishToTopic(value bool) {
 
 func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
 	w.messageSender.MessagesDelivered(hashes)
+	if w.statusTelemetryClient != nil {
+		for _, hash := range hashes {
+			w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, hash.String())
+		}
+	}
 }
 
 func (w *Waku) SetStorePeerID(peerID peer.ID) {
@@ -1113,12 +1122,29 @@ func (w *Waku) Start() error {
 			peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
 			defer peerTelemetryTicker.Stop()
 
+			sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
+			if err != nil {
+				w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
+				return
+			}
+			defer sub.Close()
+
 			for {
 				select {
 				case <-w.ctx.Done():
 					return
 				case <-peerTelemetryTicker.C:
 					w.reportPeerMetrics()
+				case evt := <-sub.Out():
+					// Ignore events that carry no usable error and skip
+					// reporting when no telemetry client is configured.
+					dialErr, ok := evt.(utils.DialError)
+					if !ok || dialErr.Err == nil || w.statusTelemetryClient == nil {
+						continue
+					}
+					for _, dialError := range common.ParseDialErrors(dialErr.Err.Error()) {
+						w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
+					}
 				}
 			}
 		}()
@@ -1133,7 +1159,6 @@ func (w *Waku) Start() error {
 	go w.runPeerExchangeLoop()
 
 	if w.cfg.EnableMissingMessageVerification {
-
 		w.missingMsgVerifier = missing.NewMissingMessageVerifier(
 			w.node.Store(),
 			w,
@@ -1411,6 +1436,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
 		return nil
 	}
 
+	if w.statusTelemetryClient != nil {
+		if msgType == common.MissingMessageType {
+			w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope)
+		}
+	}
+
 	logger := w.logger.With(
 		zap.String("messageType", msgType),
 		zap.Stringer("envelopeHash", envelope.Hash()),
@@ -1529,6 +1560,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
 		w.storeMsgIDsMu.Unlock()
 	} else {
 		logger.Debug("filters did match")
+		if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType {
+			w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e)
+		}
 		e.Processed.Store(true)
 	}
 