Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Throttling enabled #227

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/magneticod/dht/mainline/indexingService.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mainline
import (
"math/rand"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -78,6 +79,9 @@ func (is *IndexingService) Start() {
go is.index()

zap.L().Info("Indexing Service started!")
if DefaultThrottleRate > 0{
zap.L().Info("Throttle set to "+strconv.Itoa(DefaultThrottleRate)+" msg/s")
}
}

func (is *IndexingService) Terminate() {
Expand Down
62 changes: 62 additions & 0 deletions cmd/magneticod/dht/mainline/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package mainline

import (
"net"
"time"

"github.com/anacrolix/torrent/bencode"
sockaddr "github.com/libp2p/go-sockaddr/net"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)

var(
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
DefaultThrottleRate = -1 // <= 0 for unlimited requests
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
)

type Transport struct {
fd int
laddr *net.UDPAddr
Expand All @@ -21,6 +26,9 @@ type Transport struct {
onMessage func(*Message, *net.UDPAddr)
// OnCongestion
onCongestion func()

throttlingRate int //available messages per second. If <=0, it is considered disabled
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
throttleTicketsChannel chan struct{} //channel giving tickets (allowance) to make send a message
}

func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport {
Expand All @@ -39,6 +47,8 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges
t.buffer = make([]byte, 65507)
t.onMessage = onMessage
t.onCongestion = onCongestion
t.throttleTicketsChannel = make(chan struct{})
t.throttlingRate = DefaultThrottleRate

var err error
t.laddr, err = net.ResolveUDPAddr("udp", laddr)
Expand All @@ -52,6 +62,10 @@ func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onConges
return t
}

func (t *Transport) SetThrottle(rate int){
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
t.throttlingRate = rate
}

func (t *Transport) Start() {
// Why check whether the Transport `t` started or not, here and not -for instance- in
// t.Terminate()?
Expand Down Expand Up @@ -80,6 +94,7 @@ func (t *Transport) Start() {
}

go t.readMessages()
go t.Throttle()
}

func (t *Transport) Terminate() {
Expand Down Expand Up @@ -121,7 +136,54 @@ func (t *Transport) readMessages() {
}
}

func (t *Transport) Throttle(){
if t.throttlingRate > 0{
resetChannel := make(chan struct{})

dealer := func(resetRequest chan struct{}){
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
ticketGiven := 0
tooManyTicketGiven := false
for{
select{
case <- t.throttleTicketsChannel: {
ticketGiven++
if ticketGiven >= t.throttlingRate{
tooManyTicketGiven = true
break
}
}
case <- resetRequest: {
return
}
}

if tooManyTicketGiven{break}
}

<- resetRequest
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
return

}

go dealer(resetChannel)
for range time.Tick(1*time.Second){
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
resetChannel <- struct{}{}

go dealer(resetChannel)
}

}else{
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
//no limit, keep giving tickets to whoever requests it
for{
lruggieri marked this conversation as resolved.
Show resolved Hide resolved
<-t.throttleTicketsChannel
}
}
}

func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
//get ticket
t.throttleTicketsChannel <- struct{}{}

data, err := bencode.Marshal(msg)
if err != nil {
zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)")
Expand Down
4 changes: 4 additions & 0 deletions cmd/magneticod/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
"math/rand"
"net"
"os"
Expand Down Expand Up @@ -141,6 +142,7 @@ func parseFlags() (*opFlags, error) {
IndexerMaxNeighbors uint `long:"indexer-max-neighbors" description:"Maximum number of neighbors of an indexer." default:"10000"`

LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"`
MaxThrottle uint `long:"max-throttle" description:"Maximum requests per second." default:"0"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably called this variable more precisely. Probably MaxRps and --max-rps or something more self-describing as a parameter name.


Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory"`
Expand Down Expand Up @@ -183,6 +185,8 @@ func parseFlags() (*opFlags, error) {
)
}

mainline.DefaultThrottleRate = int(cmdF.MaxThrottle)

opF.Verbosity = len(cmdF.Verbose)

opF.Profile = cmdF.Profile
Expand Down