Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Throttling enabled #227

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/magneticod/dht/mainline/indexingService.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package mainline
import (
"math/rand"
"net"
"strconv"
"sync"
"time"

"go.uber.org/zap"
)

var (
StatsPrintClock = 10 * time.Second
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
)

type IndexingService struct {
// Private
protocol *Protocol
Expand Down Expand Up @@ -77,6 +82,9 @@ func (is *IndexingService) Start() {
go is.index()

zap.L().Info("Indexing Service started!")
if DefaultThrottleRate > 0 {
zap.L().Info("Throttle set to " + strconv.Itoa(DefaultThrottleRate) + " msg/s")
}
}

func (is *IndexingService) Terminate() {
Expand Down
128 changes: 128 additions & 0 deletions cmd/magneticod/dht/mainline/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package mainline
import (
"crypto/rand"
"crypto/sha1"
"github.com/boramalper/magnetico/pkg/util"
"net"
"sort"
"strconv"
"sync"
"time"

Expand All @@ -16,6 +19,8 @@ type Protocol struct {
transport *Transport
eventHandlers ProtocolEventHandlers
started bool

stats protocolStats
}

type ProtocolEventHandlers struct {
Expand All @@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
p = new(Protocol)
p.eventHandlers = eventHandlers
p.transport = NewTransport(laddr, p.onMessage, p.eventHandlers.OnCongestion)
p.stats = protocolStats{
messageTypeCount: make(map[string]map[string]int),
}

p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20)
_, err := rand.Read(p.currentTokenSecret)
Expand All @@ -56,6 +64,7 @@ func (p *Protocol) Start() {
p.started = true

p.transport.Start()
go p.printStats()
go p.updateTokenSecret()
}

Expand All @@ -67,7 +76,114 @@ func (p *Protocol) Terminate() {
p.transport.Terminate()
}

//statistics
type protocolStats struct {
sync.RWMutex
messageTypeCount map[string]map[string]int //type=>subtype=>count
}

func (ps *protocolStats) Reset() {
ps.Lock()
defer ps.Unlock()
ps.messageTypeCount = make(map[string]map[string]int)
}

type messageTypeCountOrdered struct {
messageType string
messageCount int
percentageOverTotal float64
subMessages orderedMessagesCount
}
type orderedMessagesCount []*messageTypeCountOrdered


func (omc orderedMessagesCount) Len() int {
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
return len(omc)
}
func (omc orderedMessagesCount) Swap(i, j int) {
omc[i], omc[j] = omc[j], omc[i]
}
func (omc orderedMessagesCount) Less(i, j int) bool {
return omc[i].messageCount > omc[j].messageCount
}
func (omc orderedMessagesCount) CalculatePercentagesOverTotal(totalMessages int) {
for _, mtco := range omc {
if mtco.subMessages != nil && len(mtco.subMessages) > 0 {
mtco.subMessages.CalculatePercentagesOverTotal(totalMessages)
}
mtco.percentageOverTotal = util.RoundToDecimal(
(float64(mtco.messageCount)/float64(totalMessages))*100, 2)
}
}
func (omc orderedMessagesCount) Sort() {
for _, mtco := range omc {
if mtco.subMessages != nil && len(mtco.subMessages) > 0 {
mtco.subMessages.Sort()
}
}
sort.Sort(omc)
}
func (omc orderedMessagesCount) String() string {
/*
string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
be a problem and it will be much easier to write down and read
*/
mostReceivedMessageTypes := ""
for mIdx, m := range omc {
if mIdx > 0 {
mostReceivedMessageTypes += ", "
}
mostReceivedMessageTypes += m.messageType
mostReceivedMessageTypes +=
" (" + strconv.Itoa(m.messageCount) + ", " + strconv.FormatFloat(m.percentageOverTotal, 'f', -1, 64) + "%)"

if m.subMessages != nil && len(m.subMessages) > 0 {
//add stats for submessages unless there is only 1 submessage with len 0 (empty)
if !(len(m.subMessages) == 1 && len(m.subMessages[0].messageType) == 0) {
mostReceivedMessageTypes += "[ " + m.subMessages.String() + " ]"
}
}
}
return mostReceivedMessageTypes
}
func (p *Protocol) printStats() {
for {
time.Sleep(StatsPrintClock)
p.stats.RLock()
orderedMessages := make(orderedMessagesCount, 0, len(p.stats.messageTypeCount))
totalMessages := 0
for mType, mSubTypes := range p.stats.messageTypeCount {
mCount := 0
orderedSubMessages := make(orderedMessagesCount, 0, len(mSubTypes))
for mSubType, mSubCount := range mSubTypes {
mCount += mSubCount
totalMessages += mSubCount
orderedSubMessages = append(orderedSubMessages, &messageTypeCountOrdered{
messageType: mSubType,
messageCount: mSubCount,
})
}
orderedMessages = append(orderedMessages, &messageTypeCountOrdered{
messageType: mType,
messageCount: mCount,
subMessages: orderedSubMessages,
})
}
p.stats.RUnlock()
orderedMessages.CalculatePercentagesOverTotal(totalMessages)
orderedMessages.Sort()

zap.L().Info("Protocol stats (on "+StatsPrintClock.String()+"):",
zap.String("message type", orderedMessages.String()),
)

p.stats.Reset()
}
}

func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
temporaryQ := msg.Q

switch msg.Y {
case "q":
switch msg.Q {
Expand Down Expand Up @@ -140,6 +256,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
//
// sample_infohashes > get_peers > find_node > ping / announce_peer
if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response.
temporaryQ = "sample_infohashes"
if !validateSampleInfohashesResponseMessage(msg) {
// zap.L().Debug("An invalid sample_infohashes response received!")
return
Expand All @@ -148,6 +265,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnSampleInfohashesResponse(msg, addr)
}
} else if len(msg.R.Token) != 0 { // The message should be a get_peers response.
temporaryQ = "get_peers"
if !validateGetPeersResponseMessage(msg) {
// zap.L().Debug("An invalid get_peers response received!")
return
Expand All @@ -156,6 +274,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnGetPeersResponse(msg, addr)
}
} else if len(msg.R.Nodes) != 0 { // The message should be a find_node response.
temporaryQ = "find_node"
if !validateFindNodeResponseMessage(msg) {
// zap.L().Debug("An invalid find_node response received!")
return
Expand All @@ -164,6 +283,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
p.eventHandlers.OnFindNodeResponse(msg, addr)
}
} else { // The message should be a ping or an announce_peer response.
temporaryQ = "ping_or_announce"
if !validatePingORannouncePeerResponseMessage(msg) {
// zap.L().Debug("An invalid ping OR announce_peer response received!")
return
Expand All @@ -184,6 +304,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
zap.String("type", msg.Y))
*/
}

//let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
p.stats.Lock()
if _, ok := p.stats.messageTypeCount[msg.Y]; !ok {
p.stats.messageTypeCount[msg.Y] = make(map[string]int)
}
p.stats.messageTypeCount[msg.Y][temporaryQ]++
p.stats.Unlock()
}

func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) {
Expand Down
Loading