Skip to content

Commit

Permalink
Merge pull request #233 from Daniel1984/feature/rate-limit
Browse files Browse the repository at this point in the history
ability to instantly subscripbe to 20 channels and rate limit afterwa…
  • Loading branch information
robertkowalski authored Jun 8, 2021
2 parents b4f52b0 + 22a1b9b commit 9e0b26f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
3.0.5
- Features
- rate limit to avoid 429 HTTP status codes when subscribing too often
- Fixes
- auth channel payload event name to avoid invalid channel exception

3.0.4
- Adds new rest v2 functions
- tickers/hist
Expand Down
8 changes: 8 additions & 0 deletions examples/ws/public-feed-ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,32 @@ func main() {

for _, pair := range pairs {
tradePld := event.Subscribe{
Event: "subscribe",
Channel: "trades",
Symbol: "t" + pair,
}

tickPld := event.Subscribe{
Event: "subscribe",
Channel: "ticker",
Symbol: "t" + pair,
}

candlesPld := event.Subscribe{
Event: "subscribe",
Channel: "candles",
Key: "trade:1m:t" + pair,
}

rawBookPld := event.Subscribe{
Event: "subscribe",
Channel: "book",
Precision: "R0",
Symbol: "t" + pair,
}

bookPld := event.Subscribe{
Event: "subscribe",
Channel: "book",
Precision: "P0",
Frequency: "F0",
Expand All @@ -67,16 +72,19 @@ func main() {
}

derivStatusPld := event.Subscribe{
Event: "subscribe",
Channel: "status",
Key: "deriv:tBTCF0:USTF0",
}

liqStatusPld := event.Subscribe{
Event: "subscribe",
Channel: "status",
Key: "liq:global",
}

fundingPairTrade := event.Subscribe{
Event: "subscribe",
Channel: "trades",
Symbol: "fUSD",
}
Expand Down
1 change: 0 additions & 1 deletion pkg/mux/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (c *Client) Private(key, sec, url string, dms int) (*Client, error) {
// Subscribe takes subscription payload as per docs and subscribes client to it.
// We keep track of subscriptions so that when client failes, we can resubscribe.
func (c *Client) Subscribe(sub event.Subscribe) error {
sub.Event = "subscribe"
if err := c.Send(sub); err != nil {
return err
}
Expand Down
87 changes: 59 additions & 28 deletions pkg/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"sync"
"time"

"github.com/bitfinexcom/bitfinex-api-go/pkg/models/event"
"github.com/bitfinexcom/bitfinex-api-go/pkg/mux/client"
Expand All @@ -16,25 +17,32 @@ import (
// to all incomming client messages and reconnect client with all its subscriptions
// in case of a failure
type Mux struct {
cid int
dms int
publicChan chan msg.Msg
publicClients map[int]*client.Client
privateChan chan msg.Msg
closeChan chan bool
privateClient *client.Client
mtx *sync.RWMutex
Err error
transform bool
apikey string
apisec string
subInfo map[int64]event.Info
authenticated bool
publicURL string
authURL string
online bool
cid int
dms int
publicChan chan msg.Msg
publicClients map[int]*client.Client
privateChan chan msg.Msg
closeChan chan bool
privateClient *client.Client
mtx *sync.RWMutex
Err error
transform bool
apikey string
apisec string
subInfo map[int64]event.Info
authenticated bool
publicURL string
authURL string
online bool
rateLimitQueueSize int
}

// api rate limit is 20 calls per minute. 1x3s, 20x1min
const (
rateLimitDuration = 3 * time.Second
maxRateLimitQueueSize = 20
)

// New returns pointer to instance of mux
func New() *Mux {
return &Mux{
Expand Down Expand Up @@ -68,7 +76,7 @@ func (m *Mux) WithDeadManSwitch() *Mux {
return m
}

// WithAPISEC accepts and persists api sec
// WithAPISEC accepts and persists api secret
func (m *Mux) WithAPISEC(sec string) *Mux {
m.apisec = sec
return m
Expand All @@ -95,17 +103,23 @@ func (m *Mux) Close() bool {
return true
}

// Subscribe - given the details in form of event.Subscribe,
// subscribes client to public channels
// Subscribe - given the details in form of event.Subscribe, subscribes client to public
// channels. If rate limit is reached, calls itself recursively after 1s with same params
func (m *Mux) Subscribe(sub event.Subscribe) *Mux {
if m.Err != nil {
return m
}

m.mtx.Lock()
defer m.mtx.Unlock()
// if limit is reached, wait 1 second and recuresively
// call Subscribe again with same subscription details
if m.rateLimitQueueSize == maxRateLimitQueueSize {
time.Sleep(1 * time.Second)
return m.Subscribe(sub)
}

if subscribed := m.publicClients[m.cid].SubAdded(sub); subscribed {
m.mtx.RLock()
defer m.mtx.RUnlock()
if m.publicClients[m.cid].SubAdded(sub) {
return m
}

Expand All @@ -117,6 +131,8 @@ func (m *Mux) Subscribe(sub event.Subscribe) *Mux {
log.Printf("subs limit is reached on cid: %d, spawning new conn\n", m.cid)
m.addPublicClient()
}

m.rateLimitQueueSize++
return m
}

Expand All @@ -126,7 +142,9 @@ func (m *Mux) Start() *Mux {
m.addPrivateClient()
}

return m.addPublicClient()
m.watchRateLimit()
m.addPublicClient()
return m
}

// Listen accepts a callback func that will get called each time mux
Expand All @@ -138,12 +156,11 @@ func (m *Mux) Listen(cb func(interface{}, error)) error {
}

m.online = true

for {
select {
case ms, ok := <-m.publicChan:
if !ok {
return errors.New("channel has closed unexpectedly")
return errors.New("public channel has closed unexpectedly")
}
if ms.Err != nil {
cb(nil, fmt.Errorf("conn:%d has failed | err:%s | reconnecting", ms.CID, ms.Err))
Expand Down Expand Up @@ -179,7 +196,7 @@ func (m *Mux) Listen(cb func(interface{}, error)) error {
cb(nil, fmt.Errorf("unrecognized msg signature: %s", ms.Data))
case ms, ok := <-m.privateChan:
if !ok {
return errors.New("channel has closed unexpectedly")
return errors.New("private channel has closed unexpectedly")
}
if ms.Err != nil {
cb(nil, fmt.Errorf("err: %s | reconnecting", ms.Err))
Expand Down Expand Up @@ -286,7 +303,7 @@ func (m *Mux) addPublicClient() *Mux {
c, err := client.
New().
WithID(m.cid).
WithSubsLimit(20).
WithSubsLimit(30).
Public(m.publicURL)
if err != nil {
m.Err = err
Expand All @@ -311,3 +328,17 @@ func (m *Mux) addPrivateClient() *Mux {
go c.Read(m.privateChan)
return m
}

// watchRateLimit will run once every rateLimitDuration
// and free up the queue
func (m *Mux) watchRateLimit() {
go func() {
for {
if m.rateLimitQueueSize > 0 {
m.rateLimitQueueSize--
}

time.Sleep(rateLimitDuration)
}
}()
}

0 comments on commit 9e0b26f

Please sign in to comment.