Skip to content

Commit

Permalink
More review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Sep 12, 2023
1 parent 66468e5 commit 42dce09
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 90 deletions.
133 changes: 83 additions & 50 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ type binance struct {
tradeIDNonce atomic.Uint32
tradeIDNoncePrefix dex.Bytes

markets atomic.Value // map[string]*market
markets atomic.Value // map[string]*bnMarket
// tokenIDs maps the token's symbol to the list of bip ids of the token
// for each chain for which deposits and withdrawals are enabled on
// binance.
tokenIDs atomic.Value // map[string][]uint32

balanceMtx sync.RWMutex
balances map[string]*bncBalance
Expand Down Expand Up @@ -180,7 +184,8 @@ func newBinance(apiKey, secretKey string, log dex.Logger, net dex.Network, binan
tradeIDNoncePrefix: encode.RandomBytes(10),
}

bnc.markets.Store(make(map[string]symbol))
bnc.markets.Store(make(map[string]*bnMarket))
bnc.tokenIDs.Store(make(map[string][]uint32))

return bnc
}
Expand All @@ -205,19 +210,62 @@ func (bnc *binance) getCoinInfo(ctx context.Context) error {
}

bnc.updateBalances(coins)

tokenIDs := make(map[string][]uint32)
for _, nfo := range coins {
tokenSymbol := strings.ToLower(nfo.Coin)
chainIDs, isToken := dex.TokenChains[tokenSymbol]
if !isToken {
continue
}
isSupportedChain := func(assetID uint32) (uint32, bool) {
for _, chainID := range chainIDs {
if chainID[1] == assetID {
return chainID[0], true
}
}
return 0, false
}
for _, netInfo := range nfo.NetworkList {
chainSymbol := strings.ToLower(netInfo.Network)
chainID, found := dex.BipSymbolID(chainSymbol)
if !found {
continue
}
if !netInfo.WithdrawEnable || !netInfo.DepositEnable {
bnc.log.Tracef("Skipping %s network %s because deposits and/or withdraws are not enabled.", tokenSymbol, chainSymbol)
continue
}
if tokenBipId, supported := isSupportedChain(chainID); supported {
tokenIDs[tokenSymbol] = append(tokenIDs[tokenSymbol], tokenBipId)
}
}
}
bnc.tokenIDs.Store(tokenIDs)

return nil
}

func (bnc *binance) getMarkets(ctx context.Context) error {
exchangeInfo := &exchangeInfo{}
err := bnc.getAPI(ctx, "/api/v3/exchangeInfo", nil, false, false, exchangeInfo)
var exchangeInfo struct {
Timezone string `json:"timezone"`
ServerTime int64 `json:"serverTime"`
RateLimits []struct {
RateLimitType string `json:"rateLimitType"`
Interval string `json:"interval"`
IntervalNum int64 `json:"intervalNum"`
Limit int64 `json:"limit"`
} `json:"rateLimits"`
Symbols []*bnMarket `json:"symbols"`
}
err := bnc.getAPI(ctx, "/api/v3/exchangeInfo", nil, false, false, &exchangeInfo)
if err != nil {
return fmt.Errorf("error getting markets from Binance: %w", err)
}

marketsMap := make(map[string]symbol, len(exchangeInfo.Symbols))
for _, symbol := range exchangeInfo.Symbols {
marketsMap[symbol.Symbol] = symbol
marketsMap := make(map[string]*bnMarket, len(exchangeInfo.Symbols))
for _, market := range exchangeInfo.Symbols {
marketsMap[market.Symbol] = market
}

bnc.markets.Store(marketsMap)
Expand Down Expand Up @@ -326,45 +374,41 @@ func (bnc *binance) Balance(symbol string) (*ExchangeBalance, error) {
}, nil
}

// GenerateTradeID returns a trade ID that must be passed as an argument
// when calling Trade. This ID will be used to identify updates to the
// trade. It is necessary to pre-generate this because updates to the
// trade may arrive before the Trade function returns.
func (bnc *binance) GenerateTradeID() string {
func (bnc *binance) generateTradeID() string {
nonce := bnc.tradeIDNonce.Add(1)
nonceB := encode.Uint32Bytes(nonce)
return hex.EncodeToString(append(bnc.tradeIDNoncePrefix, nonceB...))
}

// Trade executes a trade on the CEX. updaterID takes an ID returned from
// SubscribeTradeUpdates, and tradeID takes an ID returned from
// GenerateTradeID.
func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int, tradeID string) error {
// Trade executes a trade on the CEX. subscriptionID takes an ID returned from
// SubscribeTradeUpdates.
func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error) {
side := "BUY"
if sell {
side = "SELL"
}

baseCfg, err := bncSymbolData(baseSymbol)
if err != nil {
return fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err)
return "", fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err)
}

quoteCfg, err := bncSymbolData(quoteSymbol)
if err != nil {
return fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err)
return "", fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err)
}

slug := baseCfg.coin + quoteCfg.coin

marketsMap := bnc.markets.Load().(map[string]symbol)
marketsMap := bnc.markets.Load().(map[string]*bnMarket)
market, found := marketsMap[slug]
if !found {
return fmt.Errorf("market not found: %v", slug)
return "", fmt.Errorf("market not found: %v", slug)
}

price := calc.ConventionalRateAlt(rate, baseCfg.conversionFactor, quoteCfg.conversionFactor)
amt := float64(qty) / float64(baseCfg.conversionFactor)
tradeID := bnc.generateTradeID()

v := make(url.Values)
v.Add("symbol", slug)
Expand All @@ -375,21 +419,24 @@ func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, s
v.Add("quantity", strconv.FormatFloat(amt, 'f', market.BaseAssetPrecision, 64))
v.Add("price", strconv.FormatFloat(price, 'f', market.QuoteAssetPrecision, 64))

bnc.tradeUpdaterMtx.Lock()
_, found = bnc.tradeUpdaters[updaterID]
bnc.tradeUpdaterMtx.RLock()
_, found = bnc.tradeUpdaters[subscriptionID]
if !found {
bnc.tradeUpdaterMtx.Unlock()
return fmt.Errorf("no trade updater with ID %v", updaterID)
bnc.tradeUpdaterMtx.RUnlock()
return "", fmt.Errorf("no trade updater with ID %v", subscriptionID)
}
bnc.tradeToUpdater[tradeID] = updaterID
bnc.tradeUpdaterMtx.Unlock()
bnc.tradeUpdaterMtx.RUnlock()

err = bnc.postAPI(ctx, "/api/v3/order", v, nil, true, true, &struct{}{})
if err != nil {
bnc.removeTradeUpdater(tradeID)
return "", err
}

return err
bnc.tradeUpdaterMtx.Lock()
defer bnc.tradeUpdaterMtx.Unlock()
bnc.tradeToUpdater[tradeID] = subscriptionID

return tradeID, err
}

// SubscribeTradeUpdates returns a channel that the caller can use to
Expand Down Expand Up @@ -462,11 +509,11 @@ func (bnc *binance) Balances() (map[uint32]*ExchangeBalance, error) {
}

func (bnc *binance) Markets() ([]*Market, error) {
symbols := bnc.markets.Load().(map[string]symbol)

bnMarkets := bnc.markets.Load().(map[string]*bnMarket)
markets := make([]*Market, 0, 16)
for _, symbol := range symbols {
markets = append(markets, symbol.dexMarkets()...)
tokenIDs := bnc.tokenIDs.Load().(map[string][]uint32)
for _, mkt := range bnMarkets {
markets = append(markets, mkt.dexMarkets(tokenIDs)...)
}

return markets, nil
Expand Down Expand Up @@ -1145,7 +1192,7 @@ type binanceCoinInfo struct {
NetworkList []*binanceNetworkInfo `json:"networkList"`
}

type symbol struct {
type bnMarket struct {
Symbol string `json:"symbol"`
Status string `json:"status"`
BaseAsset string `json:"baseAsset"`
Expand All @@ -1159,7 +1206,7 @@ type symbol struct {
// represents a single market on the CEX, but tokens on the DEX have a
// different assetID for each network they are on, therefore they will
// match multiple markets as defined using assetID.
func (s *symbol) dexMarkets() []*Market {
func (s *bnMarket) dexMarkets(tokenIDs map[string][]uint32) []*Market {
var baseAssetIDs, quoteAssetIDs []uint32

getAssetIDs := func(coin string) []uint32 {
Expand All @@ -1168,8 +1215,8 @@ func (s *symbol) dexMarkets() []*Market {
return []uint32{assetID}
}

if chains, found := dex.TokenChains[symbol]; found {
return chains
if tokenIDs, found := tokenIDs[symbol]; found {
return tokenIDs
}

return nil
Expand All @@ -1196,17 +1243,3 @@ func (s *symbol) dexMarkets() []*Market {
}
return markets
}

type rateLimit struct {
RateLimitType string `json:"rateLimitType"`
Interval string `json:"interval"`
IntervalNum int64 `json:"intervalNum"`
Limit int64 `json:"limit"`
}

type exchangeInfo struct {
Timezone string `json:"timezone"`
ServerTime int64 `json:"serverTime"`
RateLimits []rateLimit `json:"rateLimits"`
Symbols []symbol `json:"symbols"`
}
5 changes: 2 additions & 3 deletions client/mm/libxc/binance_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func TestTrade(t *testing.T) {
}
}
}()
tradeID := bnc.GenerateTradeID()
err = bnc.Trade(ctx, "eth", "btc", false, 6127e2, 1e7, updaterID, tradeID)
tradeID, err := bnc.Trade(ctx, "eth", "btc", false, 6000e2, 1e7, updaterID)
if err != nil {
t.Fatalf("trade error: %v", err)
}
Expand Down Expand Up @@ -165,7 +164,7 @@ func TestCancelTrade(t *testing.T) {
}

func TestMarkets(t *testing.T) {
bnc := tNewBinance(t, dex.Simnet)
bnc := tNewBinance(t, dex.Mainnet)
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23)
defer cancel()

Expand Down
13 changes: 2 additions & 11 deletions client/mm/libxc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ type CEX interface {
Balances() (map[uint32]*ExchangeBalance, error)
// CancelTrade cancels a trade on the CEX.
CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error
// GenerateTradeID returns a trade ID that must be passed as an argument
// when calling Trade. This ID will be used to identify updates to the
// trade. It is necessary to pre-generate this because updates to the
// trade may be sent before Trade returns.
GenerateTradeID() string
// Markets returns the list of markets at the CEX.
Markets() ([]*Market, error)
// SubscribeCEXUpdates returns a channel which sends an empty struct when
Expand All @@ -60,12 +55,8 @@ type CEX interface {
// returned from this function.
SubscribeTradeUpdates() (updates <-chan *TradeUpdate, unsubscribe func(), subscriptionID int)
// Trade executes a trade on the CEX. updaterID takes a subscriptionID
// returned from SubscribeTradeUpdates, and tradeID takes an ID returned
// from GenerateTradeID. The trade ID must be generated and passed to
// Trade instead being returned from trade, because trade updates may
// be sent before Trade returns.
Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64,
tradeUpdatesSubscriptionID int, tradeID string) error
// returned from SubscribeTradeUpdates.
Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error)
// UnsubscribeMarket unsubscribes from order book updates on a market.
UnsubscribeMarket(baseSymbol, quoteSymbol string)
// VWAP returns the volume weighted average price for a certain quantity
Expand Down
13 changes: 6 additions & 7 deletions client/mm/mm_simple_arb.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ func (a *simpleArbMarketMaker) executeArb(sellOnDex bool, lotsToArb, dexRate, ce
defer a.activeArbsMtx.Unlock()

// Place cex order first. If placing dex order fails then can freely cancel cex order.
cexTradeID := a.cex.GenerateTradeID()
err := a.cex.Trade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), !sellOnDex, cexRate, lotsToArb*a.mkt.LotSize, a.cexTradeUpdatesID, cexTradeID)
cexTradeID, err := a.cex.Trade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), !sellOnDex, cexRate, lotsToArb*a.mkt.LotSize, a.cexTradeUpdatesID)
if err != nil {
a.log.Errorf("error placing cex order: %v", err)
return
Expand Down Expand Up @@ -435,8 +434,6 @@ func (m *simpleArbMarketMaker) handleNotification(note core.Notification) {
return
}
m.handleDEXOrderUpdate(ord)
case *core.EpochNotification:
m.rebalance(n.Epoch)
}
}

Expand Down Expand Up @@ -465,9 +462,11 @@ func (a *simpleArbMarketMaker) run() {
defer wg.Done()
for {
select {
case <-bookFeed.Next():
// Really nothing to do with the updates. We just need to keep
// the subscription live in order to get VWAP on dex orderbook.
case n := <-bookFeed.Next():
if n.Action == core.EpochMatchSummary {
payload := n.Payload.(*core.EpochMatchSummaryPayload)
a.rebalance(payload.Epoch + 1)
}
case <-a.ctx.Done():
return
}
Expand Down
27 changes: 14 additions & 13 deletions client/mm/mm_simple_arb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,12 @@ func (c *tCEX) Markets() ([]*libxc.Market, error) {
func (c *tCEX) Balance(symbol string) (*libxc.ExchangeBalance, error) {
return c.balances[symbol], c.balanceErr
}
func (c *tCEX) GenerateTradeID() string {
return c.tradeID
}
func (c *tCEX) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int, orderID string) error {
func (c *tCEX) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int) (string, error) {
if c.tradeErr != nil {
return c.tradeErr
return "", c.tradeErr
}
c.lastTrade = &cexOrder{baseSymbol, quoteSymbol, qty, rate, sell}
return nil
return c.tradeID, nil
}
func (c *tCEX) CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error {
if c.cancelTradeErr != nil {
Expand Down Expand Up @@ -889,15 +886,19 @@ func TestArbRebalance(t *testing.T) {
}

go arbEngine.run()
dummyNote := &core.BondRefundNote{}
tCore.noteFeed <- dummyNote
tCore.noteFeed <- dummyNote

dummyNote := &core.BookUpdate{}
tCore.bookFeed.c <- dummyNote
tCore.bookFeed.c <- dummyNote
arbEngine.book = orderBook
tCore.noteFeed <- &core.EpochNotification{
Epoch: currEpoch,
tCore.bookFeed.c <- &core.BookUpdate{
Action: core.EpochMatchSummary,
Payload: &core.EpochMatchSummaryPayload{
Epoch: currEpoch - 1,
},
}
tCore.noteFeed <- dummyNote
tCore.noteFeed <- dummyNote
tCore.bookFeed.c <- dummyNote
tCore.bookFeed.c <- dummyNote

// Check dex trade
if test.expectedDexOrder == nil {
Expand Down
Loading

0 comments on commit 42dce09

Please sign in to comment.