Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FR: QOS Ack Implementation #236

Merged
merged 17 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ coverage.txt
#Goland
.idea/

# VSCode
*.code-workspace
.vscode/*

# helm
deploy/k8s/talaria/rendered.*

Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- https://nvd.nist.gov/vuln/detail/CVE-2021-31525
- https://nvd.nist.gov/vuln/detail/CVE-2021-44716
- Introduces new vuln https://www.mend.io/vulnerability-database/CVE-2022-29526

- QOS Ack implementation [#228](https://github.com/xmidt-org/talaria/pull/228) [#236](https://github.com/xmidt-org/talaria/pull/236)

## [v0.6.5]
- Bumped webpa-common to v2.0.6, fixing panic on send endpoint. [#229](https://github.com/xmidt-org/talaria/pull/229)
Expand Down
175 changes: 175 additions & 0 deletions ackDispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright 2017 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main

import (
"context"
"os"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/webpa-common/v2/device"
"github.com/xmidt-org/webpa-common/v2/logging"
"github.com/xmidt-org/wrp-go/v3"
)

// Default values
const (
unknownHostname = "unknown"
)

// ackDispatcher is an internal Dispatcher implementation that processes outbound events
// and determines whether or not an ack to the source device is required.
type ackDispatcher struct {
hostname string
errorLog log.Logger
timeout time.Duration
AckSuccess metrics.Counter
AckFailure metrics.Counter
AckSuccessLatency metrics.Histogram
AckFailureLatency metrics.Histogram
}

// NewAckDispatcher is an ackDispatcher factory which processes outbound events
// and determines whether or not an ack to the source device is required.
func NewAckDispatcher(om OutboundMeasures, o *Outbounder) (Dispatcher, error) {
l := logging.Error(o.logger())
n, err := os.Hostname()
if err != nil {
l.Log(logging.MessageKey(), "Error fetching hostname", logging.ErrorKey(), err)
n = unknownHostname
}

return &ackDispatcher{
hostname: n,
errorLog: l,
timeout: o.requestTimeout(),
AckSuccess: om.AckSuccess,
AckFailure: om.AckFailure,
AckSuccessLatency: om.AckSuccessLatency,
AckFailureLatency: om.AckFailureLatency,
}, nil
}

// OnDeviceEvent is the device.Listener function that processes outbound events
// and determines whether or not an ack to the source device is required.
func (d *ackDispatcher) OnDeviceEvent(event *device.Event) {
var r *device.Request

if event == nil {
d.errorLog.Log(logging.MessageKey(), "Error nil event")
return
} else if event.Device == nil {
d.errorLog.Log(logging.MessageKey(), "Error nil device")
return
}

dm := event.Device.Metadata()
m, ok := event.Message.(*wrp.Message)
if !ok {
return
}

// rdr of 0 is success https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
// Atm, there doesn't exist any conditions that'll cause a request delivery response to be a nonzero
var rdr int64 = 0
// Verify ack conditions are met for a given event
switch event.Type {
// Atm, we're only supporting acks for MessageReceived events
case device.MessageReceived:
// Verify ack conditions are met for a given message and their type
switch m.Type {
// Atm, we're only supporting acks for SimpleEventMessageTypes QOS
case wrp.SimpleEventMessageType:
// Atm, we're only supporting acks for QOS
if !m.IsQOSAckPart() {
return
}

// https://xmidt.io/docs/wrp/simple-messages/#qos-details
r = &device.Request{
Message: &wrp.Message{
// When a qos field is specified that requires an ack, the response ack message SHALL be a msg_type=4.
Type: wrp.SimpleEventMessageType,
// The `source` SHALL be the component that cannot process the event further.
Source: d.hostname,
// The `dest` SHALL be the original requesting `source` address.
Destination: m.Source,
// The `content_type` and `payload` SHALL be omitted & set to empty, or may set to `application/text` and text to help describe the result. **DO NOT** process this text beyond for logging/debugging.
ContentType: "",
Payload: []byte{},
// The `partner_ids` SHALL be the same as the original message.
PartnerIDs: m.PartnerIDs,
// The `headers` SHOULD generally be the same as the original message, except where updating their values is correct.
Headers: m.Headers,
// The `metadata` map SHALL be populated with the original data or set to empty.
Metadata: m.Metadata,
// The `session_id` MAY be added by the cloud.
SessionID: dm.SessionID(),
// The `qos` SHALL be the same as the original message.
QualityOfService: m.QualityOfService,
// The `transaction_uuid` SHALL be the same as the original message.
TransactionUUID: m.TransactionUUID,
// The `rdr` SHALL be present and represent the outcome of the handling of the message.
RequestDeliveryResponse: &rdr,
},
Format: event.Format,
}

default:
return
}

default:
return
}

l := m.QualityOfService.Level()
p := dm.PartnerIDClaim()
t := m.Type.FriendlyName()
// Metric labels
ls := []string{qosLevelLabel, l.String(), partnerIDLabel, p, messageType, t}
ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
defer cancel()

// Observe the latency of sending an ack to the source device
ackFailure := false
defer func(s time.Time) {
d.recordAckLatency(s, ackFailure, ls...)
}(time.Now())

if _, err := event.Device.Send(r.WithContext(ctx)); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching QOS ack", "qosLevel", l, "partnerID", p, "messageType", t, logging.ErrorKey(), err)
d.AckFailure.With(ls...).Add(1)
ackFailure = true
return
}

d.AckSuccess.With(ls...).Add(1)
}

// recordAckLatency records the latency for both successful and failed acks
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l ...string) {
switch {
case f:
d.AckFailureLatency.With(l...).Observe(time.Since(s).Seconds())

default:
d.AckSuccessLatency.With(l...).Observe(time.Since(s).Seconds())
}
}
Loading