Skip to content

Commit

Permalink
Merge pull request #236 from xmidt-org/denopink/feature/qos-ack-imple…
Browse files Browse the repository at this point in the history
…mentation

FR: QOS Ack Implementation
  • Loading branch information
denopink authored Aug 25, 2022
2 parents 38e5c97 + d29e0df commit 7156a66
Show file tree
Hide file tree
Showing 14 changed files with 1,268 additions and 303 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ coverage.txt
#Goland
.idea/

# VSCode
*.code-workspace
.vscode/*

# helm
deploy/k8s/talaria/rendered.*

Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- https://nvd.nist.gov/vuln/detail/CVE-2021-31525
- https://nvd.nist.gov/vuln/detail/CVE-2021-44716
- Introduces new vuln https://www.mend.io/vulnerability-database/CVE-2022-29526

- QOS Ack implementation [#228](https://github.com/xmidt-org/talaria/pull/228) [#236](https://github.com/xmidt-org/talaria/pull/236)

## [v0.6.5]
- Bumped webpa-common to v2.0.6, fixing panic on send endpoint. [#229](https://github.com/xmidt-org/talaria/pull/229)
Expand Down
175 changes: 175 additions & 0 deletions ackDispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright 2017 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main

import (
"context"
"os"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/webpa-common/v2/device"
"github.com/xmidt-org/webpa-common/v2/logging"
"github.com/xmidt-org/wrp-go/v3"
)

// unknownHostname is the fallback hostname used when the operating system
// cannot report one.
const unknownHostname = "unknown"

// ackDispatcher is an internal Dispatcher implementation. It inspects outbound
// device events and, when an event qualifies, sends an ack back to the device
// that produced it.
type ackDispatcher struct {
	// hostname is placed in the `source` field of every ack message.
	hostname string
	// errorLog reports nil events, nil devices and acks that failed to send.
	errorLog log.Logger
	// timeout bounds the context used for each ack send.
	timeout time.Duration

	// AckSuccess and AckFailure count ack send attempts by outcome.
	AckSuccess, AckFailure metrics.Counter
	// AckSuccessLatency and AckFailureLatency observe, in seconds, how long
	// an ack send took, split by outcome.
	AckSuccessLatency, AckFailureLatency metrics.Histogram
}

// NewAckDispatcher builds the Dispatcher that watches outbound events and
// acks the source device when an event requires it.
//
// The error logger and the send timeout are taken from o; the ack counters
// and latency histograms are taken from om. The local hostname is looked up
// once here; if the lookup fails the failure is logged and unknownHostname is
// used instead. The returned error is always nil in this implementation.
func NewAckDispatcher(om OutboundMeasures, o *Outbounder) (Dispatcher, error) {
	errorLog := logging.Error(o.logger())

	hostname, err := os.Hostname()
	if err != nil {
		// A missing hostname is not fatal: acks are still sent, just with a
		// placeholder source.
		errorLog.Log(logging.MessageKey(), "Error fetching hostname", logging.ErrorKey(), err)
		hostname = unknownHostname
	}

	dispatcher := new(ackDispatcher)
	dispatcher.hostname = hostname
	dispatcher.errorLog = errorLog
	dispatcher.timeout = o.requestTimeout()
	dispatcher.AckSuccess = om.AckSuccess
	dispatcher.AckFailure = om.AckFailure
	dispatcher.AckSuccessLatency = om.AckSuccessLatency
	dispatcher.AckFailureLatency = om.AckFailureLatency

	return dispatcher, nil
}

// OnDeviceEvent is the device.Listener callback of the dispatcher. It decides
// whether the event requires an ack and, if so, sends one to the source device.
//
// An ack is sent only when all of the following hold:
//   - the event and its device are non-nil (otherwise an error is logged),
//   - the event carries a *wrp.Message,
//   - the event type is device.MessageReceived,
//   - the message type is wrp.SimpleEventMessageType,
//   - the message reports IsQOSAckPart().
//
// Every other event is ignored. Each send attempt bumps either AckSuccess or
// AckFailure and records its duration in the matching latency histogram.
func (d *ackDispatcher) OnDeviceEvent(event *device.Event) {
	switch {
	case event == nil:
		d.errorLog.Log(logging.MessageKey(), "Error nil event")
		return
	case event.Device == nil:
		d.errorLog.Log(logging.MessageKey(), "Error nil device")
		return
	}

	deviceMetadata := event.Device.Metadata()
	original, isWRP := event.Message.(*wrp.Message)
	if !isWRP {
		return
	}

	// For now, acks are supported only for MessageReceived events ...
	if event.Type != device.MessageReceived {
		return
	}
	// ... carrying a simple event message ...
	if original.Type != wrp.SimpleEventMessageType {
		return
	}
	// ... whose QOS asks for an ack.
	if !original.IsQOSAckPart() {
		return
	}

	// An rdr of 0 means success:
	// https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
	// For now, no condition produces a nonzero request delivery response.
	var rdr int64

	// Field rules below follow https://xmidt.io/docs/wrp/simple-messages/#qos-details
	ack := &wrp.Message{
		// An ack for a qos that requires one SHALL be a msg_type=4.
		Type: wrp.SimpleEventMessageType,
		// `source` SHALL be the component that cannot process the event further.
		Source: d.hostname,
		// `dest` SHALL be the original requesting `source` address.
		Destination: original.Source,
		// `content_type` and `payload` SHALL be omitted & set to empty, or may
		// be `application/text` plus text describing the result. **DO NOT**
		// process such text beyond logging/debugging.
		ContentType: "",
		Payload:     []byte{},
		// `partner_ids` SHALL be the same as the original message.
		PartnerIDs: original.PartnerIDs,
		// `headers` SHOULD generally match the original message, except where
		// updating their values is correct.
		Headers: original.Headers,
		// `metadata` SHALL be populated with the original data or set to empty.
		Metadata: original.Metadata,
		// `session_id` MAY be added by the cloud.
		SessionID: deviceMetadata.SessionID(),
		// `qos` SHALL be the same as the original message.
		QualityOfService: original.QualityOfService,
		// `transaction_uuid` SHALL be the same as the original message.
		TransactionUUID: original.TransactionUUID,
		// `rdr` SHALL be present and represent the outcome of handling the message.
		RequestDeliveryResponse: &rdr,
	}
	request := &device.Request{
		Message: ack,
		Format:  event.Format,
	}

	level := original.QualityOfService.Level()
	partner := deviceMetadata.PartnerIDClaim()
	typeName := original.Type.FriendlyName()
	// Label key/value pairs shared by every metric touched below.
	labels := []string{qosLevelLabel, level.String(), partnerIDLabel, partner, messageType, typeName}

	ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
	defer cancel()

	// Time the send. The deferred closure reads sendFailed when the function
	// returns, so it sees the final outcome.
	sendFailed := false
	started := time.Now()
	defer func() {
		d.recordAckLatency(started, sendFailed, labels...)
	}()

	_, err := event.Device.Send(request.WithContext(ctx))
	if err == nil {
		d.AckSuccess.With(labels...).Add(1)
		return
	}

	d.errorLog.Log(logging.MessageKey(), "Error dispatching QOS ack", "qosLevel", level, "partnerID", partner, "messageType", typeName, logging.ErrorKey(), err)
	d.AckFailure.With(labels...).Add(1)
	sendFailed = true
}

// recordAckLatency observes the seconds elapsed since s in the failure
// histogram when f is true and in the success histogram otherwise. The
// strings in l are passed to the histogram's With as label values.
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l ...string) {
	histogram := d.AckSuccessLatency
	if f {
		histogram = d.AckFailureLatency
	}

	histogram.With(l...).Observe(time.Since(s).Seconds())
}
Loading

0 comments on commit 7156a66

Please sign in to comment.