From 42dce099e555fa58cd4bd82e15968c7eb2bc8873 Mon Sep 17 00:00:00 2001 From: martonp Date: Mon, 11 Sep 2023 23:55:37 -0400 Subject: [PATCH] More review fixes --- client/mm/libxc/binance.go | 133 +++++++++++++++++---------- client/mm/libxc/binance_live_test.go | 5 +- client/mm/libxc/interface.go | 13 +-- client/mm/mm_simple_arb.go | 13 ++- client/mm/mm_simple_arb_test.go | 27 +++--- client/mm/mm_test.go | 12 ++- dex/bip-id.go | 12 ++- 7 files changed, 125 insertions(+), 90 deletions(-) diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 6064e8bc45..19c0596eea 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -127,7 +127,11 @@ type binance struct { tradeIDNonce atomic.Uint32 tradeIDNoncePrefix dex.Bytes - markets atomic.Value // map[string]*market + markets atomic.Value // map[string]*bnMarket + // tokenIDs maps the token's symbol to the list of bip ids of the token + // for each chain for which deposits and withdrawals are enabled on + // binance. + tokenIDs atomic.Value // map[string][]uint32 balanceMtx sync.RWMutex balances map[string]*bncBalance @@ -180,7 +184,8 @@ func newBinance(apiKey, secretKey string, log dex.Logger, net dex.Network, binan tradeIDNoncePrefix: encode.RandomBytes(10), } - bnc.markets.Store(make(map[string]symbol)) + bnc.markets.Store(make(map[string]*bnMarket)) + bnc.tokenIDs.Store(make(map[string][]uint32)) return bnc } @@ -205,19 +210,62 @@ func (bnc *binance) getCoinInfo(ctx context.Context) error { } bnc.updateBalances(coins) + + tokenIDs := make(map[string][]uint32) + for _, nfo := range coins { + tokenSymbol := strings.ToLower(nfo.Coin) + chainIDs, isToken := dex.TokenChains[tokenSymbol] + if !isToken { + continue + } + isSupportedChain := func(assetID uint32) (uint32, bool) { + for _, chainID := range chainIDs { + if chainID[1] == assetID { + return chainID[0], true + } + } + return 0, false + } + for _, netInfo := range nfo.NetworkList { + chainSymbol := strings.ToLower(netInfo.Network) + chainID, found := dex.BipSymbolID(chainSymbol) + if !found { + continue + } + if !netInfo.WithdrawEnable || !netInfo.DepositEnable { + bnc.log.Tracef("Skipping %s network %s because deposits and/or withdraws are not enabled.", tokenSymbol, chainSymbol) + continue + } + if tokenBipId, supported := isSupportedChain(chainID); supported { + tokenIDs[tokenSymbol] = append(tokenIDs[tokenSymbol], tokenBipId) + } + } + } + bnc.tokenIDs.Store(tokenIDs) + return nil } func (bnc *binance) getMarkets(ctx context.Context) error { - exchangeInfo := &exchangeInfo{} - err := bnc.getAPI(ctx, "/api/v3/exchangeInfo", nil, false, false, exchangeInfo) + var exchangeInfo struct { + Timezone string `json:"timezone"` + ServerTime int64 `json:"serverTime"` + RateLimits []struct { + RateLimitType string `json:"rateLimitType"` + Interval string `json:"interval"` + IntervalNum int64 `json:"intervalNum"` + Limit int64 `json:"limit"` + } `json:"rateLimits"` + Symbols []*bnMarket `json:"symbols"` + } + err := bnc.getAPI(ctx, "/api/v3/exchangeInfo", nil, false, false, &exchangeInfo) if err != nil { return fmt.Errorf("error getting markets from Binance: %w", err) } - marketsMap := make(map[string]symbol, len(exchangeInfo.Symbols)) - for _, symbol := range exchangeInfo.Symbols { - marketsMap[symbol.Symbol] = symbol + marketsMap := make(map[string]*bnMarket, len(exchangeInfo.Symbols)) + for _, market := range exchangeInfo.Symbols { + marketsMap[market.Symbol] = market } bnc.markets.Store(marketsMap) @@ -326,20 +374,15 @@ func (bnc *binance) Balance(symbol string) (*ExchangeBalance, error) { }, nil } -// GenerateTradeID returns a trade ID that must be passed as an argument -// when calling Trade. This ID will be used to identify updates to the -// trade. It is necessary to pre-generate this because updates to the -// trade may arrive before the Trade function returns. -func (bnc *binance) GenerateTradeID() string { +func (bnc *binance) generateTradeID() string { nonce := bnc.tradeIDNonce.Add(1) nonceB := encode.Uint32Bytes(nonce) return hex.EncodeToString(append(bnc.tradeIDNoncePrefix, nonceB...)) } -// Trade executes a trade on the CEX. updaterID takes an ID returned from -// SubscribeTradeUpdates, and tradeID takes an ID returned from -// GenerateTradeID. -func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int, tradeID string) error { +// Trade executes a trade on the CEX. subscriptionID takes an ID returned from +// SubscribeTradeUpdates. +func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error) { side := "BUY" if sell { side = "SELL" @@ -347,24 +390,25 @@ func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, s baseCfg, err := bncSymbolData(baseSymbol) if err != nil { - return fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err) + return "", fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err) } quoteCfg, err := bncSymbolData(quoteSymbol) if err != nil { - return fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err) + return "", fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err) } slug := baseCfg.coin + quoteCfg.coin - marketsMap := bnc.markets.Load().(map[string]symbol) + marketsMap := bnc.markets.Load().(map[string]*bnMarket) market, found := marketsMap[slug] if !found { - return fmt.Errorf("market not found: %v", slug) + return "", fmt.Errorf("market not found: %v", slug) } price := calc.ConventionalRateAlt(rate, baseCfg.conversionFactor, quoteCfg.conversionFactor) amt := float64(qty) / float64(baseCfg.conversionFactor) + tradeID := bnc.generateTradeID() v := make(url.Values) v.Add("symbol", slug) @@ -375,21 +419,24 @@ func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, s v.Add("quantity", strconv.FormatFloat(amt, 'f', market.BaseAssetPrecision, 64)) v.Add("price", strconv.FormatFloat(price, 'f', market.QuoteAssetPrecision, 64)) - bnc.tradeUpdaterMtx.Lock() - _, found = bnc.tradeUpdaters[updaterID] + bnc.tradeUpdaterMtx.RLock() + _, found = bnc.tradeUpdaters[subscriptionID] if !found { - bnc.tradeUpdaterMtx.Unlock() - return fmt.Errorf("no trade updater with ID %v", updaterID) + bnc.tradeUpdaterMtx.RUnlock() + return "", fmt.Errorf("no trade updater with ID %v", subscriptionID) } - bnc.tradeToUpdater[tradeID] = updaterID - bnc.tradeUpdaterMtx.Unlock() + bnc.tradeUpdaterMtx.RUnlock() err = bnc.postAPI(ctx, "/api/v3/order", v, nil, true, true, &struct{}{}) if err != nil { - bnc.removeTradeUpdater(tradeID) + return "", err } - return err + bnc.tradeUpdaterMtx.Lock() + defer bnc.tradeUpdaterMtx.Unlock() + bnc.tradeToUpdater[tradeID] = subscriptionID + + return tradeID, err } // SubscribeTradeUpdates returns a channel that the caller can use to @@ -462,11 +509,11 @@ func (bnc *binance) Balances() (map[uint32]*ExchangeBalance, error) { } func (bnc *binance) Markets() ([]*Market, error) { - symbols := bnc.markets.Load().(map[string]symbol) - + bnMarkets := bnc.markets.Load().(map[string]*bnMarket) markets := make([]*Market, 0, 16) - for _, symbol := range symbols { - markets = append(markets, symbol.dexMarkets()...) + tokenIDs := bnc.tokenIDs.Load().(map[string][]uint32) + for _, mkt := range bnMarkets { + markets = append(markets, mkt.dexMarkets(tokenIDs)...) } return markets, nil @@ -1145,7 +1192,7 @@ type binanceCoinInfo struct { NetworkList []*binanceNetworkInfo `json:"networkList"` } -type symbol struct { +type bnMarket struct { Symbol string `json:"symbol"` Status string `json:"status"` BaseAsset string `json:"baseAsset"` @@ -1159,7 +1206,7 @@ type symbol struct { // represents a single market on the CEX, but tokens on the DEX have a // different assetID for each network they are on, therefore they will // match multiple markets as defined using assetID. -func (s *symbol) dexMarkets() []*Market { +func (s *bnMarket) dexMarkets(tokenIDs map[string][]uint32) []*Market { var baseAssetIDs, quoteAssetIDs []uint32 getAssetIDs := func(coin string) []uint32 { @@ -1168,8 +1215,8 @@ func (s *symbol) dexMarkets() []*Market { return []uint32{assetID} } - if chains, found := dex.TokenChains[symbol]; found { - return chains + if tokenIDs, found := tokenIDs[symbol]; found { + return tokenIDs } return nil @@ -1196,17 +1243,3 @@ func (s *symbol) dexMarkets() []*Market { } return markets } - -type rateLimit struct { - RateLimitType string `json:"rateLimitType"` - Interval string `json:"interval"` - IntervalNum int64 `json:"intervalNum"` - Limit int64 `json:"limit"` -} - -type exchangeInfo struct { - Timezone string `json:"timezone"` - ServerTime int64 `json:"serverTime"` - RateLimits []rateLimit `json:"rateLimits"` - Symbols []symbol `json:"symbols"` -} diff --git a/client/mm/libxc/binance_live_test.go b/client/mm/libxc/binance_live_test.go index 6f08b0be7e..4210aaee18 100644 --- a/client/mm/libxc/binance_live_test.go +++ b/client/mm/libxc/binance_live_test.go @@ -130,8 +130,7 @@ func TestTrade(t *testing.T) { } } }() - tradeID := bnc.GenerateTradeID() - err = bnc.Trade(ctx, "eth", "btc", false, 6127e2, 1e7, updaterID, tradeID) + tradeID, err := bnc.Trade(ctx, "eth", "btc", false, 6000e2, 1e7, updaterID) if err != nil { t.Fatalf("trade error: %v", err) } @@ -165,7 +164,7 @@ func TestCancelTrade(t *testing.T) { } func TestMarkets(t *testing.T) { - bnc := tNewBinance(t, dex.Simnet) + bnc := tNewBinance(t, dex.Mainnet) ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) defer cancel() diff --git a/client/mm/libxc/interface.go b/client/mm/libxc/interface.go index 74599bed18..cf4268add1 100644 --- a/client/mm/libxc/interface.go +++ b/client/mm/libxc/interface.go @@ -40,11 +40,6 @@ type CEX interface { Balances() (map[uint32]*ExchangeBalance, error) // CancelTrade cancels a trade on the CEX. CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error - // GenerateTradeID returns a trade ID that must be passed as an argument - // when calling Trade. This ID will be used to identify updates to the - // trade. It is necessary to pre-generate this because updates to the - // trade may be sent before Trade returns. - GenerateTradeID() string // Markets returns the list of markets at the CEX. Markets() ([]*Market, error) // SubscribeCEXUpdates returns a channel which sends an empty struct when @@ -60,12 +55,8 @@ type CEX interface { // returned from this function. SubscribeTradeUpdates() (updates <-chan *TradeUpdate, unsubscribe func(), subscriptionID int) // Trade executes a trade on the CEX. updaterID takes a subscriptionID - // returned from SubscribeTradeUpdates, and tradeID takes an ID returned - // from GenerateTradeID. The trade ID must be generated and passed to - // Trade instead being returned from trade, because trade updates may - // be sent before Trade returns. - Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, - tradeUpdatesSubscriptionID int, tradeID string) error + // returned from SubscribeTradeUpdates. + Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error) // UnsubscribeMarket unsubscribes from order book updates on a market. UnsubscribeMarket(baseSymbol, quoteSymbol string) // VWAP returns the volume weighted average price for a certain quantity diff --git a/client/mm/mm_simple_arb.go b/client/mm/mm_simple_arb.go index 253d597390..e6e294e34a 100644 --- a/client/mm/mm_simple_arb.go +++ b/client/mm/mm_simple_arb.go @@ -269,8 +269,7 @@ func (a *simpleArbMarketMaker) executeArb(sellOnDex bool, lotsToArb, dexRate, ce defer a.activeArbsMtx.Unlock() // Place cex order first. If placing dex order fails then can freely cancel cex order. - cexTradeID := a.cex.GenerateTradeID() - err := a.cex.Trade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), !sellOnDex, cexRate, lotsToArb*a.mkt.LotSize, a.cexTradeUpdatesID, cexTradeID) + cexTradeID, err := a.cex.Trade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), !sellOnDex, cexRate, lotsToArb*a.mkt.LotSize, a.cexTradeUpdatesID) if err != nil { a.log.Errorf("error placing cex order: %v", err) return @@ -435,8 +434,6 @@ func (m *simpleArbMarketMaker) handleNotification(note core.Notification) { return } m.handleDEXOrderUpdate(ord) - case *core.EpochNotification: - m.rebalance(n.Epoch) } } @@ -465,9 +462,11 @@ func (a *simpleArbMarketMaker) run() { defer wg.Done() for { select { - case <-bookFeed.Next(): - // Really nothing to do with the updates. We just need to keep - // the subscription live in order to get VWAP on dex orderbook. + case n := <-bookFeed.Next(): + if n.Action == core.EpochMatchSummary { + payload := n.Payload.(*core.EpochMatchSummaryPayload) + a.rebalance(payload.Epoch + 1) + } case <-a.ctx.Done(): return } diff --git a/client/mm/mm_simple_arb_test.go b/client/mm/mm_simple_arb_test.go index 1a6a592685..5d21a4f583 100644 --- a/client/mm/mm_simple_arb_test.go +++ b/client/mm/mm_simple_arb_test.go @@ -75,15 +75,12 @@ func (c *tCEX) Markets() ([]*libxc.Market, error) { func (c *tCEX) Balance(symbol string) (*libxc.ExchangeBalance, error) { return c.balances[symbol], c.balanceErr } -func (c *tCEX) GenerateTradeID() string { - return c.tradeID -} -func (c *tCEX) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int, orderID string) error { +func (c *tCEX) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int) (string, error) { if c.tradeErr != nil { - return c.tradeErr + return "", c.tradeErr } c.lastTrade = &cexOrder{baseSymbol, quoteSymbol, qty, rate, sell} - return nil + return c.tradeID, nil } func (c *tCEX) CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error { if c.cancelTradeErr != nil { @@ -889,15 +886,19 @@ func TestArbRebalance(t *testing.T) { } go arbEngine.run() - dummyNote := &core.BondRefundNote{} - tCore.noteFeed <- dummyNote - tCore.noteFeed <- dummyNote + + dummyNote := &core.BookUpdate{} + tCore.bookFeed.c <- dummyNote + tCore.bookFeed.c <- dummyNote arbEngine.book = orderBook - tCore.noteFeed <- &core.EpochNotification{ - Epoch: currEpoch, + tCore.bookFeed.c <- &core.BookUpdate{ + Action: core.EpochMatchSummary, + Payload: &core.EpochMatchSummaryPayload{ + Epoch: currEpoch - 1, + }, } - tCore.noteFeed <- dummyNote - tCore.noteFeed <- dummyNote + tCore.bookFeed.c <- dummyNote + tCore.bookFeed.c <- dummyNote // Check dex trade if test.expectedDexOrder == nil { diff --git a/client/mm/mm_test.go b/client/mm/mm_test.go index 57ce3e745a..3693e75d98 100644 --- a/client/mm/mm_test.go +++ b/client/mm/mm_test.go @@ -117,9 +117,11 @@ func (drv *tDriver) Info() *asset.WalletInfo { return drv.winfo } -type tBookFeed struct{} +type tBookFeed struct { + c chan *core.BookUpdate +} -func (t *tBookFeed) Next() <-chan *core.BookUpdate { return make(chan *core.BookUpdate, 1) } +func (t *tBookFeed) Next() <-chan *core.BookUpdate { return t.c } func (t *tBookFeed) Close() {} func (t *tBookFeed) Candles(dur string) error { return nil } @@ -150,6 +152,7 @@ type tCore struct { multiTradesPlaced []*core.MultiTradeForm maxFundingFees uint64 book *orderbook.OrderBook + bookFeed *tBookFeed } func (c *tCore) NotificationFeed() *core.NoteFeed { @@ -162,7 +165,7 @@ func (c *tCore) ExchangeMarket(host string, base, quote uint32) (*core.Market, e var _ core.BookFeed = (*tBookFeed)(nil) func (t *tCore) SyncBook(host string, base, quote uint32) (*orderbook.OrderBook, core.BookFeed, error) { - return t.book, &tBookFeed{}, nil + return t.book, t.bookFeed, nil } func (*tCore) SupportedAssets() map[uint32]*core.SupportedAsset { return nil @@ -280,6 +283,9 @@ func newTCore() *tCore { cancelsPlaced: make([]dex.Bytes, 0), buysPlaced: make([]*core.TradeForm, 0), sellsPlaced: make([]*core.TradeForm, 0), + bookFeed: &tBookFeed{ + c: make(chan *core.BookUpdate, 1), + }, } } diff --git a/dex/bip-id.go b/dex/bip-id.go index 60bc25fc6a..1f9d967d57 100644 --- a/dex/bip-id.go +++ b/dex/bip-id.go @@ -628,7 +628,9 @@ var bipIDs = map[uint32]string{ 99999999: "qkc", } -var TokenChains = make(map[string][]uint32) +// TokenChains is a map of token symbol to a list of [2]uint32, where the first +// element is the token's BIP ID and the second element is the chain's BIP ID. +var TokenChains = make(map[string][][2]uint32) func init() { for id, symbol := range bipIDs { @@ -636,7 +638,11 @@ func init() { if len(parts) < 2 { continue } - tokenSymbol, _ := parts[0], parts[1] - TokenChains[tokenSymbol] = append(TokenChains[tokenSymbol], id) + tokenSymbol, chainSymbol := parts[0], parts[1] + chainID, found := BipSymbolID(chainSymbol) + if !found { + panic("unknown chain symbol: " + chainSymbol) + } + TokenChains[tokenSymbol] = append(TokenChains[tokenSymbol], [2]uint32{id, chainID}) } }