From 8221f8760125f6b365c34fbb801d470b5f2de5ec Mon Sep 17 00:00:00 2001 From: martonp Date: Thu, 28 Dec 2023 13:18:06 -0500 Subject: [PATCH] More refactor. --- client/mm/exchange_adaptor.go | 363 +++++++++++-- client/mm/exchange_adaptor_test.go | 368 +++++++++++-- client/mm/mm.go | 72 ++- client/mm/mm_arb_market_maker.go | 369 +++++-------- client/mm/mm_arb_market_maker_test.go | 746 +++++++++++++------------- client/mm/mm_basic.go | 358 +++--------- client/mm/mm_basic_test.go | 499 ++++++++--------- client/mm/mm_simple_arb.go | 37 +- client/mm/mm_simple_arb_test.go | 54 +- client/mm/mm_test.go | 37 +- 10 files changed, 1551 insertions(+), 1352 deletions(-) diff --git a/client/mm/exchange_adaptor.go b/client/mm/exchange_adaptor.go index ef7d3abe26..e0529c2e60 100644 --- a/client/mm/exchange_adaptor.go +++ b/client/mm/exchange_adaptor.go @@ -30,23 +30,76 @@ type botBalance struct { Pending uint64 `json:"pending"` } +// multiTradePlacement is equivalent to core.QtyRate, but with the grouping +// field added. The grouping field is used by the botCoreAdaptor's +// GroupedBookedOrders to place orders in groups. +type multiTradePlacement struct { + qty uint64 + rate uint64 + grouping uint64 +} + +// multiTradeForm is equivalent to core.MultiTradeForm, but with the +// placements field replaced with a []*multiTradePlacement. This is +// in order to support the botCoreAdaptor's GroupedBookedOrders method. +type multiTradeForm struct { + host string + sell bool + base uint32 + quote uint32 + placements []*multiTradePlacement + options map[string]string + maxLock uint64 +} + +func (m *multiTradeForm) toCore() *core.MultiTradeForm { + placements := make([]*core.QtyRate, 0, len(m.placements)) + for _, placement := range m.placements { + placements = append(placements, &core.QtyRate{ + Qty: placement.qty, + Rate: placement.rate, + }) + } + + return &core.MultiTradeForm{ + Host: m.host, + Sell: m.sell, + Base: m.base, + Quote: m.quote, + Placements: placements, + Options: m.options, + MaxLock: m.maxLock, + } +} + +type orderFees struct { + swap uint64 + redemption uint64 + refund uint64 + funding uint64 +} + // botCoreAdaptor is an interface used by bots to access functionality // implemented by client.Core. There are slight differences with the methods // of client.Core. One example is AssetBalance returns a botBalance instead of // a *core.WalletBalance. type botCoreAdaptor interface { - NotificationFeed() *core.NoteFeed SyncBook(host string, base, quote uint32) (*orderbook.OrderBook, core.BookFeed, error) SingleLotFees(form *core.SingleLotFeesForm) (uint64, uint64, uint64, error) Cancel(oidB dex.Bytes) error MaxBuy(host string, base, quote uint32, rate uint64) (*core.MaxOrderEstimate, error) MaxSell(host string, base, quote uint32) (*core.MaxOrderEstimate, error) AssetBalance(assetID uint32) (*botBalance, error) - MultiTrade(pw []byte, form *core.MultiTradeForm) ([]*core.Order, error) + MultiTrade(form *multiTradeForm) ([]*core.Order, error) MaxFundingFees(fromAsset uint32, host string, numTrades uint32, fromSettings map[string]string) (uint64, error) FiatConversionRates() map[uint32]float64 ExchangeMarket(host string, base, quote uint32) (*core.Market, error) Broadcast(core.Notification) + GroupedBookedOrders() (buys, sells map[uint64][]*core.Order) + CancelAllOrders() bool + FiatRate(assetID uint32) float64 + OrderFees() (buyFees, sellFees *orderFees, err error) + SubscribeOrderUpdates() (updates <-chan *core.Order) } // botCexAdaptor is an interface used by bots to access functionality @@ -100,6 +153,7 @@ type pendingDEXOrder struct { availableDiff map[uint32]int64 locked map[uint32]uint64 pending map[uint32]uint64 + order *core.Order // swaps, redeems, and refunds are caches of transactions. This avoids // having to query the wallet for transactions that are already confirmed. @@ -107,6 +161,9 @@ type pendingDEXOrder struct { swaps map[string]*asset.WalletTransaction redeems map[string]*asset.WalletTransaction refunds map[string]*asset.WalletTransaction + + // Used to group orders in the GroupedBookedOrders method. + grouping uint64 } // unifiedExchangeAdaptor implements both botCoreAdaptor and botCexAdaptor. @@ -114,8 +171,15 @@ type unifiedExchangeAdaptor struct { clientCore libxc.CEX - botID string - log dex.Logger + botID string + log dex.Logger + fiatRates atomic.Value // map[uint32]float64 + orderUpdates atomic.Value // chan *core.Order + market *MarketWithHost + baseWalletOptions map[string]string + quoteWalletOptions map[string]string + maxBuyPlacements uint32 + maxSellPlacements uint32 subscriptionIDMtx sync.RWMutex subscriptionID *int @@ -123,6 +187,10 @@ type unifiedExchangeAdaptor struct { withdrawalNoncePrefix string withdrawalNonce atomic.Uint64 + feesMtx sync.RWMutex + buyFees *orderFees + sellFees *orderFees + balancesMtx sync.RWMutex // baseDEXBalance/baseCEXBalance are the balances the bots have before // taking into account any pending actions. These are updated whenever @@ -300,10 +368,10 @@ func (u *unifiedExchangeAdaptor) maxSellQty(host string, baseID, quoteID, numTra return maxLots * mkt.LotSize, nil } -func (c *unifiedExchangeAdaptor) sufficientBalanceForMultiSell(host string, base, quote uint32, placements []*core.QtyRate, options map[string]string) (bool, error) { +func (c *unifiedExchangeAdaptor) sufficientBalanceForMultiSell(host string, base, quote uint32, placements []*multiTradePlacement, options map[string]string) (bool, error) { var totalQty uint64 for _, placement := range placements { - totalQty += placement.Qty + totalQty += placement.qty } maxQty, err := c.maxSellQty(host, base, quote, uint32(len(placements)), options) if err != nil { @@ -312,7 +380,7 @@ func (c *unifiedExchangeAdaptor) sufficientBalanceForMultiSell(host string, base return maxQty >= totalQty, nil } -func (u *unifiedExchangeAdaptor) sufficientBalanceForMultiBuy(host string, baseID, quoteID uint32, placements []*core.QtyRate, options map[string]string) (bool, error) { +func (u *unifiedExchangeAdaptor) sufficientBalanceForMultiBuy(host string, baseID, quoteID uint32, placements []*multiTradePlacement, options map[string]string) (bool, error) { baseBalance, err := u.AssetBalance(baseID) if err != nil { return false, err @@ -353,8 +421,8 @@ func (u *unifiedExchangeAdaptor) sufficientBalanceForMultiBuy(host string, baseI var totalLots uint64 remainingBalance := availQuoteBal - fundingFees for _, placement := range placements { - quoteQty := calc.BaseToQuote(placement.Rate, placement.Qty) - numLots := placement.Qty / mkt.LotSize + quoteQty := calc.BaseToQuote(placement.rate, placement.qty) + numLots := placement.qty / mkt.LotSize totalLots += numLots req := quoteQty + (numLots * (swapFees + refundFees)) if remainingBalance < req { @@ -370,18 +438,17 @@ func (u *unifiedExchangeAdaptor) sufficientBalanceForMultiBuy(host string, baseI return true, nil } -func (c *unifiedExchangeAdaptor) sufficientBalanceForMultiTrade(host string, base, quote uint32, sell bool, placements []*core.QtyRate, options map[string]string) (bool, error) { +func (c *unifiedExchangeAdaptor) sufficientBalanceForMultiTrade(host string, base, quote uint32, sell bool, placements []*multiTradePlacement, options map[string]string) (bool, error) { if sell { return c.sufficientBalanceForMultiSell(host, base, quote, placements, options) } return c.sufficientBalanceForMultiBuy(host, base, quote, placements, options) } -func (u *unifiedExchangeAdaptor) addPendingDexOrders(orders []*core.Order) { - u.balancesMtx.Lock() - defer u.balancesMtx.Unlock() - - for _, o := range orders { +// addPendingDexOrders adds new orders to the pendingDEXOrders map. balancesMtx +// must be locked before calling this method. +func (u *unifiedExchangeAdaptor) addPendingDexOrders(orders []*core.Order, placements []*multiTradePlacement) { + for i, o := range orders { var orderID order.OrderID copy(orderID[:], o.ID) @@ -400,6 +467,14 @@ func (u *unifiedExchangeAdaptor) addPendingDexOrders(orders []*core.Order) { locked[fromFeeAsset] += o.ParentAssetLockedAmt + o.RefundLockedAmt locked[toFeeAsset] += o.RedeemLockedAmt + var grouping uint64 + if len(placements) > i { + grouping = placements[i].grouping + } else { + // indicates a bug in MultiTrade + u.log.Errorf("# of placements %d < # of orders %d", len(placements), len(orders)) + } + u.pendingDEXOrders[orderID] = &pendingDEXOrder{ swaps: make(map[string]*asset.WalletTransaction), redeems: make(map[string]*asset.WalletTransaction), @@ -407,14 +482,16 @@ func (u *unifiedExchangeAdaptor) addPendingDexOrders(orders []*core.Order) { availableDiff: availableDiff, locked: locked, pending: map[uint32]uint64{}, + order: o, + grouping: grouping, } } } // MultiTrade is used to place multiple standing limit orders on the same // side of the same market simultaneously. -func (u *unifiedExchangeAdaptor) MultiTrade(pw []byte, form *core.MultiTradeForm) ([]*core.Order, error) { - enough, err := u.sufficientBalanceForMultiTrade(form.Host, form.Base, form.Quote, form.Sell, form.Placements, form.Options) +func (u *unifiedExchangeAdaptor) MultiTrade(form *multiTradeForm) ([]*core.Order, error) { + enough, err := u.sufficientBalanceForMultiTrade(form.host, form.base, form.quote, form.sell, form.placements, form.options) if err != nil { return nil, err } @@ -422,26 +499,68 @@ func (u *unifiedExchangeAdaptor) MultiTrade(pw []byte, form *core.MultiTradeForm return nil, fmt.Errorf("insufficient balance") } - fromAsset := form.Quote - if form.Sell { - fromAsset = form.Base + fromAsset := form.quote + if form.sell { + fromAsset = form.base } fromBalance, err := u.AssetBalance(fromAsset) if err != nil { return nil, err } - form.MaxLock = fromBalance.Available + form.maxLock = fromBalance.Available + + u.balancesMtx.Lock() + defer u.balancesMtx.Unlock() - orders, err := u.clientCore.MultiTrade(pw, form) + orders, err := u.clientCore.MultiTrade([]byte{}, form.toCore()) if err != nil { return nil, err } - u.addPendingDexOrders(orders) + u.addPendingDexOrders(orders, form.placements) return orders, nil } +// GroupedBookedOrders returns the buy and sell orders created by this adaptor +// that are still on the books. The orders are grouped by the grouping field +// of the multiTradePlacement that was used to create them. +func (u *unifiedExchangeAdaptor) GroupedBookedOrders() (buys, sells map[uint64][]*core.Order) { + buys = make(map[uint64][]*core.Order) + sells = make(map[uint64][]*core.Order) + + groupPendingOrder := func(pendingOrder *pendingDEXOrder) { + pendingOrder.balancesMtx.RLock() + defer pendingOrder.balancesMtx.RUnlock() + + if pendingOrder.order.Status > order.OrderStatusBooked { + return + } + + grouping := pendingOrder.grouping + if pendingOrder.order.Sell { + if sells[grouping] == nil { + sells[grouping] = []*core.Order{} + } + sells[grouping] = append(sells[grouping], pendingOrder.order) + } else { + if buys[grouping] == nil { + buys[grouping] = []*core.Order{} + } + buys[grouping] = append(buys[grouping], pendingOrder.order) + } + } + + u.balancesMtx.RLock() + defer u.balancesMtx.RUnlock() + + for _, pendingOrder := range u.pendingDEXOrders { + groupPendingOrder(pendingOrder) + } + + return +} + // MayBuy returns the maximum quantity of the base asset that the bot can // buy for rate using its balance of the quote asset. func (u *unifiedExchangeAdaptor) MaxBuy(host string, base, quote uint32, rate uint64) (*core.MaxOrderEstimate, error) { @@ -889,17 +1008,18 @@ func (u *unifiedExchangeAdaptor) Withdraw(ctx context.Context, assetID uint32, a } } + u.balancesMtx.Lock() + defer u.balancesMtx.Unlock() + err = u.CEX.Withdraw(ctx, assetID, amount, addr, confirmWithdrawal) if err != nil { return err } - u.balancesMtx.Lock() u.pendingWithdrawals[withdrawalID] = &pendingWithdrawal{ assetID: assetID, amtWithdrawn: amount, } - u.balancesMtx.Unlock() return nil } @@ -1020,22 +1140,28 @@ func (u *unifiedExchangeAdaptor) Trade(ctx context.Context, baseID, quoteID uint return nil, fmt.Errorf("Trade called before SubscribeTradeUpdates") } + u.balancesMtx.Lock() + defer u.balancesMtx.Unlock() + trade, err := u.CEX.Trade(ctx, baseID, quoteID, sell, rate, qty, *subscriptionID) if err != nil { return nil, err } - u.balancesMtx.Lock() - defer u.balancesMtx.Unlock() - if trade.Complete { + diffs := make(map[uint32]int64) if trade.Sell { u.baseCexBalances[trade.BaseID] -= trade.BaseFilled u.baseCexBalances[trade.QuoteID] += trade.QuoteFilled + diffs[trade.BaseID] = -int64(trade.BaseFilled) + diffs[trade.QuoteID] = int64(trade.QuoteFilled) } else { u.baseCexBalances[trade.BaseID] += trade.BaseFilled u.baseCexBalances[trade.QuoteID] -= trade.QuoteFilled + diffs[trade.BaseID] = int64(trade.BaseFilled) + diffs[trade.QuoteID] = -int64(trade.QuoteFilled) } + u.logBalanceAdjustments(nil, diffs, fmt.Sprintf("CEX trade %s completed.", trade.ID)) } else { u.pendingCEXOrders[trade.ID] = trade } @@ -1043,6 +1169,62 @@ func (u *unifiedExchangeAdaptor) Trade(ctx context.Context, baseID, quoteID uint return trade, nil } +// FiatRate returns the current exchange rate for the specified asset. +// 0 is returned if the rate is not available. +func (u *unifiedExchangeAdaptor) FiatRate(assetID uint32) float64 { + rates := u.fiatRates.Load() + if rates == nil { + return 0 + } + + return rates.(map[uint32]float64)[assetID] +} + +// OrderFees returns the fees for a buy and sell order. The order fees are for +// placing orders on the market specified by the exchangeAdaptorCfg used to +// create the unifiedExchangeAdaptor. +func (u *unifiedExchangeAdaptor) OrderFees() (buyFees, sellFees *orderFees, err error) { + u.feesMtx.RLock() + defer u.feesMtx.RUnlock() + + if u.buyFees == nil || u.sellFees == nil { + return nil, nil, fmt.Errorf("order fees not available") + } + + return u.buyFees, u.sellFees, nil +} + +// CancelAllOrders cancels all booked orders. True is returned no orders +// needed to be cancelled. +func (u *unifiedExchangeAdaptor) CancelAllOrders() bool { + u.balancesMtx.RLock() + defer u.balancesMtx.RUnlock() + + noCancels := true + + for _, pendingOrder := range u.pendingDEXOrders { + pendingOrder.balancesMtx.RLock() + if pendingOrder.order.Status <= order.OrderStatusBooked { + err := u.clientCore.Cancel(pendingOrder.order.ID) + if err != nil { + u.log.Errorf("Error canceling order %s: %v", pendingOrder.order.ID, err) + } + noCancels = false + } + pendingOrder.balancesMtx.RUnlock() + } + + return noCancels +} + +// SubscribeOrderUpdates returns a channel that sends updates for orders placed +// on the DEX. This function should be called only once. +func (u *unifiedExchangeAdaptor) SubscribeOrderUpdates() <-chan *core.Order { + orderUpdates := make(chan *core.Order, 128) + u.orderUpdates.Store(orderUpdates) + return orderUpdates +} + // isAccountLocker returns if the asset's wallet is an asset.AccountLocker. func (u *unifiedExchangeAdaptor) isAccountLocker(assetID uint32) bool { walletState := u.clientCore.WalletState(assetID) @@ -1231,6 +1413,8 @@ func (u *unifiedExchangeAdaptor) updatePendingDEXOrder(o *core.Order) { for _, tx := range pendingOrder.redeems { isDynamicSwapper := u.isDynamicSwapper(toAsset) + u.log.Infof("~~~~ redeem %s: %v", tx.ID, tx.PartOfActiveBalance) + // For dynamic fee assets, the fees are paid from the active balance, // and are not taken out of the redeem amount. if isDynamicSwapper || tx.PartOfActiveBalance { @@ -1269,8 +1453,14 @@ func (u *unifiedExchangeAdaptor) updatePendingDEXOrder(o *core.Order) { pendingOrder.availableDiff = availableDiff pendingOrder.locked = locked pendingOrder.pending = pending + pendingOrder.order = o pendingOrder.balancesMtx.Unlock() + orderUpdates := u.orderUpdates.Load() + if orderUpdates != nil { + orderUpdates.(chan *core.Order) <- o + } + // If complete, remove the order from the pending list, and update the // bot's balance. if dexOrderComplete(o) { @@ -1311,10 +1501,75 @@ func (u *unifiedExchangeAdaptor) handleDEXNotification(n core.Notification) { return } u.updatePendingDEXOrder(o) + case *core.FiatRatesNote: + u.fiatRates.Store(note.FiatRates) + } +} + +// updateFeeRates updates the cached fee rates for placing orders on the market +// specified by the exchangeAdaptorCfg used to create the unifiedExchangeAdaptor. +func (u *unifiedExchangeAdaptor) updateFeeRates() error { + buySwapFees, buyRedeemFees, buyRefundFees, err := u.clientCore.SingleLotFees(&core.SingleLotFeesForm{ + Host: u.market.Host, + Base: u.market.BaseID, + Quote: u.market.QuoteID, + UseMaxFeeRate: true, + UseSafeTxSize: true, + }) + if err != nil { + return fmt.Errorf("failed to get buy single lot fees: %v", err) + } + + sellSwapFees, sellRedeemFees, sellRefundFees, err := u.clientCore.SingleLotFees(&core.SingleLotFeesForm{ + Host: u.market.Host, + Base: u.market.BaseID, + Quote: u.market.QuoteID, + UseMaxFeeRate: true, + UseSafeTxSize: true, + Sell: true, + }) + if err != nil { + return fmt.Errorf("failed to get sell single lot fees: %v", err) + } + + buyFundingFees, err := u.clientCore.MaxFundingFees(u.market.QuoteID, u.market.Host, u.maxBuyPlacements, u.baseWalletOptions) + if err != nil { + return fmt.Errorf("failed to get buy funding fees: %v", err) + } + + sellFundingFees, err := u.clientCore.MaxFundingFees(u.market.BaseID, u.market.Host, u.maxSellPlacements, u.quoteWalletOptions) + if err != nil { + return fmt.Errorf("failed to get sell funding fees: %v", err) + } + + u.feesMtx.Lock() + defer u.feesMtx.Unlock() + + u.buyFees = &orderFees{ + swap: buySwapFees, + redemption: buyRedeemFees, + refund: buyRefundFees, + funding: buyFundingFees, } + u.sellFees = &orderFees{ + swap: sellSwapFees, + redemption: sellRedeemFees, + refund: sellRefundFees, + funding: sellFundingFees, + } + + return nil } func (u *unifiedExchangeAdaptor) run(ctx context.Context) { + u.fiatRates.Store(u.clientCore.FiatConversionRates()) + + err := u.updateFeeRates() + if err != nil { + u.log.Errorf("Error updating fee rates: %v", err) + } + + // Listen for core notifications go func() { feed := u.clientCore.NotificationFeed() defer feed.ReturnFeed() @@ -1328,20 +1583,58 @@ func (u *unifiedExchangeAdaptor) run(ctx context.Context) { } } }() + + go func() { + refreshTime := time.Minute * 10 + for { + select { + case <-time.NewTimer(refreshTime).C: + err := u.updateFeeRates() + if err != nil { + u.log.Error(err) + refreshTime = time.Minute + } else { + refreshTime = time.Minute * 10 + } + case <-ctx.Done(): + return + } + } + }() +} + +type exchangeAdaptorCfg struct { + botID string + market *MarketWithHost + baseDexBalances map[uint32]uint64 + baseCexBalances map[uint32]uint64 + core clientCore + cex libxc.CEX + maxBuyPlacements uint32 + maxSellPlacements uint32 + baseWalletOptions map[string]string + quoteWalletOptions map[string]string + log dex.Logger } // unifiedExchangeAdaptorForBot returns a unifiedExchangeAdaptor for the specified bot. -func unifiedExchangeAdaptorForBot(botID string, baseDexBalances, baseCexBalances map[uint32]uint64, core clientCore, cex libxc.CEX, log dex.Logger) *unifiedExchangeAdaptor { +func unifiedExchangeAdaptorForBot(cfg *exchangeAdaptorCfg) *unifiedExchangeAdaptor { return &unifiedExchangeAdaptor{ - clientCore: core, - CEX: cex, - botID: botID, - log: log, - baseDexBalances: baseDexBalances, - baseCexBalances: baseCexBalances, + clientCore: cfg.core, + CEX: cfg.cex, + botID: cfg.botID, + log: cfg.log, + maxBuyPlacements: cfg.maxBuyPlacements, + maxSellPlacements: cfg.maxSellPlacements, + baseWalletOptions: cfg.baseWalletOptions, + quoteWalletOptions: cfg.quoteWalletOptions, + + baseDexBalances: cfg.baseDexBalances, + baseCexBalances: cfg.baseCexBalances, pendingDEXOrders: make(map[order.OrderID]*pendingDEXOrder), pendingCEXOrders: make(map[string]*libxc.Trade), pendingDeposits: make(map[string]*pendingDeposit), pendingWithdrawals: make(map[string]*pendingWithdrawal), + market: cfg.market, } } diff --git a/client/mm/exchange_adaptor_test.go b/client/mm/exchange_adaptor_test.go index fde4f5e3d1..a57e8beef2 100644 --- a/client/mm/exchange_adaptor_test.go +++ b/client/mm/exchange_adaptor_test.go @@ -1,6 +1,7 @@ package mm import ( + "bytes" "context" "encoding/hex" "fmt" @@ -10,12 +11,13 @@ import ( "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/core" "decred.org/dcrdex/client/mm/libxc" + "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/calc" "decred.org/dcrdex/dex/encode" "decred.org/dcrdex/dex/order" ) -func TestExchangeAdaptorMaxSell(t *testing.T) { +func TestMaxSell(t *testing.T) { tCore := newTCore() tCore.isAccountLocker[60] = true dcrBtcID := fmt.Sprintf("%s-%d-%d", "host1", 42, 0) @@ -227,8 +229,19 @@ func TestExchangeAdaptorMaxSell(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - adaptor := unifiedExchangeAdaptorForBot(botID, test.assetBalances, nil, tCore, nil, tLogger) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: test.assetBalances, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) + res, err := adaptor.MaxSell("host1", test.market.BaseID, test.market.QuoteID) if test.wantErr { if err == nil { @@ -250,7 +263,7 @@ func TestExchangeAdaptorMaxSell(t *testing.T) { } } -func TestExchangeAdaptorMaxBuy(t *testing.T) { +func TestMaxBuy(t *testing.T) { tCore := newTCore() tCore.isAccountLocker[60] = true @@ -473,7 +486,17 @@ func TestExchangeAdaptorMaxBuy(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - adaptor := unifiedExchangeAdaptorForBot(botID, test.dexBalances, nil, tCore, nil, tLogger) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: test.dexBalances, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) res, err := adaptor.MaxBuy("host1", test.market.BaseID, test.market.QuoteID, test.rate) @@ -497,7 +520,7 @@ func TestExchangeAdaptorMaxBuy(t *testing.T) { } } -func TestExchangeAdaptorDEXTrade(t *testing.T) { +func TestDEXTrade(t *testing.T) { host := "dex.com" orderIDs := make([]order.OrderID, 5) @@ -531,7 +554,7 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { name string isDynamicSwapper map[uint32]bool balances map[uint32]uint64 - multiTrade *core.MultiTradeForm + multiTrade *multiTradeForm multiTradeResponse []*core.Order wantErr bool postTradeBalances map[uint32]*botBalance @@ -545,19 +568,19 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { 42: 1e8, 0: 1e8, }, - multiTrade: &core.MultiTradeForm{ - Host: host, - Sell: true, - Base: 42, - Quote: 0, - Placements: []*core.QtyRate{ + multiTrade: &multiTradeForm{ + host: host, + sell: true, + base: 42, + quote: 0, + placements: []*multiTradePlacement{ { - Qty: 5e6, - Rate: 5e7, + qty: 5e6, + rate: 5e7, }, { - Qty: 5e6, - Rate: 6e7, + qty: 5e6, + rate: 6e7, }, }, }, @@ -920,19 +943,19 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { 42: 1e8, 0: 1e8, }, - multiTrade: &core.MultiTradeForm{ - Host: host, - Sell: false, - Base: 42, - Quote: 0, - Placements: []*core.QtyRate{ + multiTrade: &multiTradeForm{ + host: host, + sell: false, + base: 42, + quote: 0, + placements: []*multiTradePlacement{ { - Qty: 5e6, - Rate: 5e7, + qty: 5e6, + rate: 5e7, }, { - Qty: 5e6, - Rate: 6e7, + qty: 5e6, + rate: 6e7, }, }, }, @@ -1301,19 +1324,19 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { 966: 1e8, 60: 1e8, }, - multiTrade: &core.MultiTradeForm{ - Host: host, - Sell: true, - Base: 60, - Quote: 966001, - Placements: []*core.QtyRate{ + multiTrade: &multiTradeForm{ + host: host, + sell: true, + base: 60, + quote: 966001, + placements: []*multiTradePlacement{ { - Qty: 5e6, - Rate: 5e7, + qty: 5e6, + rate: 5e7, }, { - Qty: 5e6, - Rate: 6e7, + qty: 5e6, + rate: 6e7, }, }, }, @@ -1728,19 +1751,19 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { 966: 1e8, 60: 1e8, }, - multiTrade: &core.MultiTradeForm{ - Host: host, - Sell: false, - Base: 60, - Quote: 966001, - Placements: []*core.QtyRate{ + multiTrade: &multiTradeForm{ + host: host, + sell: false, + base: 60, + quote: 966001, + placements: []*multiTradePlacement{ { - Qty: 5e6, - Rate: 5e7, + qty: 5e6, + rate: 5e7, }, { - Qty: 5e6, - Rate: 6e7, + qty: 5e6, + rate: 6e7, }, }, }, @@ -2170,10 +2193,21 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - botID := dexMarketID(test.multiTrade.Host, test.multiTrade.Base, test.multiTrade.Quote) - adaptor := unifiedExchangeAdaptorForBot(botID, test.balances, nil, tCore, nil, tLogger) + botID := dexMarketID(test.multiTrade.host, test.multiTrade.base, test.multiTrade.quote) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: test.balances, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) - _, err := adaptor.MultiTrade([]byte{}, test.multiTrade) + + _, err := adaptor.MultiTrade(test.multiTrade) if test.wantErr { if err == nil { t.Fatalf("%s: expected error but did not get", test.name) @@ -2221,7 +2255,7 @@ func TestExchangeAdaptorDEXTrade(t *testing.T) { } } -func TestExchangeAdaptorDeposit(t *testing.T) { +func TestDeposit(t *testing.T) { type test struct { name string isWithdrawer bool @@ -2430,7 +2464,19 @@ func TestExchangeAdaptorDeposit(t *testing.T) { defer cancel() botID := dexMarketID("host1", test.assetID, 0) - adaptor := unifiedExchangeAdaptorForBot(botID, dexBalances, cexBalances, tCore, tCEX, tLogger) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: dexBalances, + baseCexBalances: cexBalances, + cex: tCEX, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) err := adaptor.Deposit(ctx, test.assetID, test.depositAmt, func() {}) @@ -2489,7 +2535,7 @@ func TestExchangeAdaptorDeposit(t *testing.T) { } } -func TestExchangeAdaptorWithdraw(t *testing.T) { +func TestWithdraw(t *testing.T) { assetID := uint32(42) id := encode.RandomBytes(32) @@ -2553,7 +2599,19 @@ func TestExchangeAdaptorWithdraw(t *testing.T) { defer cancel() botID := dexMarketID("host1", assetID, 0) - adaptor := unifiedExchangeAdaptorForBot(botID, dexBalances, cexBalances, tCore, tCEX, tLogger) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: dexBalances, + baseCexBalances: cexBalances, + cex: tCEX, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) err := adaptor.Withdraw(ctx, assetID, test.withdrawAmt, func() {}) @@ -2591,7 +2649,7 @@ func TestExchangeAdaptorWithdraw(t *testing.T) { } } -func TestExchangeAdaptorTrade(t *testing.T) { +func TestTrade(t *testing.T) { baseID := uint32(42) quoteID := uint32(0) tradeID := "123" @@ -2894,7 +2952,19 @@ func TestExchangeAdaptorTrade(t *testing.T) { defer cancel() botID := dexMarketID(botCfg.Host, botCfg.BaseAsset, botCfg.QuoteAsset) - adaptor := unifiedExchangeAdaptorForBot(botID, test.balances, test.balances, tCore, tCEX, tLogger) + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: botID, + core: tCore, + baseDexBalances: test.balances, + baseCexBalances: test.balances, + cex: tCEX, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) adaptor.run(ctx) adaptor.SubscribeTradeUpdates() @@ -2941,3 +3011,191 @@ func TestExchangeAdaptorTrade(t *testing.T) { runTest(test) } } + +func TestGroupedBookedOrders(t *testing.T) { + orderIDs := make([]dex.Bytes, 10) + for i := range orderIDs { + orderIDs[i] = encode.RandomBytes(32) + } + + tCore := newTCore() + tCore.market = &core.Market{ + BaseID: 42, + QuoteID: 0, + LotSize: 5e6, + } + + bals := map[uint32]uint64{ + 42: 1e9, + 0: 1e9, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + adaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: "host1", + core: tCore, + baseDexBalances: bals, + baseCexBalances: bals, + log: tLogger, + market: &MarketWithHost{ + Host: "host1", + BaseID: 42, + QuoteID: 0, + }, + }) + adaptor.run(ctx) + + form := &multiTradeForm{ + host: "host1", + base: 42, + quote: 0, + sell: true, + placements: []*multiTradePlacement{ + { + rate: 5e7, + qty: 5e6, + grouping: 0, + }, + { + rate: 6e7, + qty: 5e6, + grouping: 1, + }, + { + rate: 7e7, + qty: 5e6, + grouping: 2, + }, + }, + } + + tCore.multiTradeResult = []*core.Order{ + { + ID: orderIDs[0], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 5e7, + Sell: true, + }, + { + ID: orderIDs[1], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 6e7, + Sell: true, + }, + { + ID: orderIDs[2], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 7e7, + Sell: true, + }, + } + + _, err := adaptor.MultiTrade(form) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + checkGroupedOrders := func(groupedOrders map[uint64][]*core.Order, expected map[uint64][]dex.Bytes) { + t.Helper() + if len(groupedOrders) != len(expected) { + t.Fatalf("unexpected number of grouped orders. want %d, got %d", len(expected), len(groupedOrders)) + } + for groupID, orders := range groupedOrders { + expectedIDs := expected[groupID] + if len(orders) != len(expectedIDs) { + t.Fatalf("unexpected number of orders for group %d. want %d, got %d", groupID, len(expectedIDs), len(orders)) + } + for i, order := range orders { + if !bytes.Equal(order.ID, expectedIDs[i]) { + t.Fatalf("unexpected order ID for group %d at index %d. want %s, got %s", groupID, i, expectedIDs[i], order.ID) + } + } + } + } + + groupedBuys, groupedSells := adaptor.GroupedBookedOrders() + expectedBuyIDs := map[uint64][]dex.Bytes{} + expectedSellIDs := map[uint64][]dex.Bytes{ + 0: {orderIDs[0]}, + 1: {orderIDs[1]}, + 2: {orderIDs[2]}, + } + checkGroupedOrders(groupedBuys, expectedBuyIDs) + checkGroupedOrders(groupedSells, expectedSellIDs) + + tCore.multiTradeResult = []*core.Order{ + { + ID: orderIDs[3], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 5e7, + Sell: false, + }, + { + ID: orderIDs[4], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 6e7, + Sell: false, + }, + { + ID: orderIDs[5], + Status: order.OrderStatusBooked, + Qty: 5e6, + Rate: 7e7, + Sell: false, + }, + } + form.sell = false + + _, err = adaptor.MultiTrade(form) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + groupedBuys, groupedSells = adaptor.GroupedBookedOrders() + expectedBuyIDs = map[uint64][]dex.Bytes{ + 0: {orderIDs[3]}, + 1: {orderIDs[4]}, + 2: {orderIDs[5]}, + } + checkGroupedOrders(groupedBuys, expectedBuyIDs) + checkGroupedOrders(groupedSells, expectedSellIDs) + + tCore.noteFeed <- &core.OrderNote{ + Order: &core.Order{ + ID: orderIDs[2], + Status: order.OrderStatusCanceled, + Qty: 5e6, + Rate: 6e7, + Sell: true, + }, + } + tCore.noteFeed <- &core.OrderNote{ + Order: &core.Order{ + ID: orderIDs[4], + Status: order.OrderStatusExecuted, + Qty: 5e6, + Rate: 6e7, + Sell: false, + }, + } + tCore.noteFeed <- &core.BondPostNote{} // dummy note + + expectedBuyIDs = map[uint64][]dex.Bytes{ + 0: {orderIDs[3]}, + 2: {orderIDs[5]}, + } + expectedSellIDs = map[uint64][]dex.Bytes{ + 0: {orderIDs[0]}, + 1: {orderIDs[1]}, + } + + groupedBuys, groupedSells = adaptor.GroupedBookedOrders() + checkGroupedOrders(groupedBuys, expectedBuyIDs) + checkGroupedOrders(groupedSells, expectedSellIDs) +} diff --git a/client/mm/mm.go b/client/mm/mm.go index 7169367f41..8909d0a710 100644 --- a/client/mm/mm.go +++ b/client/mm/mm.go @@ -66,10 +66,6 @@ type MarketWithHost struct { QuoteID uint32 `json:"quote"` } -func (m *MarketWithHost) String() string { - return fmt.Sprintf("%s-%d-%d", m.Host, m.BaseID, m.QuoteID) -} - // MarketMaker handles the market making process. It supports running different // strategies on different markets. type MarketMaker struct { @@ -582,7 +578,6 @@ func (m *MarketMaker) Run(ctx context.Context, pw []byte, alternateConfigPath *s } m.logInitialBotBalances(dexBaseBalances, cexBaseBalances) - fiatRates := m.core.FiatConversionRates() startedMarketMaking = true m.core.Broadcast(newMMStartStopNote(true)) @@ -602,6 +597,7 @@ func (m *MarketMaker) Run(ctx context.Context, pw []byte, alternateConfigPath *s wg.Add(1) go func(cfg *BotConfig) { defer wg.Done() + mkt := MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset} m.markBotAsRunning(mkt, true) defer func() { @@ -612,52 +608,98 @@ func (m *MarketMaker) Run(ctx context.Context, pw []byte, alternateConfigPath *s defer func() { m.core.Broadcast(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, false)) }() - logger := m.log.SubLogger(fmt.Sprintf("MarketMaker-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) + mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) - baseFiatRate := fiatRates[cfg.BaseAsset] - quoteFiatRate := fiatRates[cfg.QuoteAsset] - exchangeAdaptor := unifiedExchangeAdaptorForBot(mktID, dexBaseBalances[mktID], cexBaseBalances[mktID], m.core, nil, logger) + logger := m.log.SubLogger(fmt.Sprintf("MarketMaker-%s", mktID)) + exchangeAdaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: mktID, + market: &mkt, + baseDexBalances: dexBaseBalances[mktID], + baseCexBalances: cexBaseBalances[mktID], + core: m.core, + cex: nil, + maxBuyPlacements: uint32(len(cfg.BasicMMConfig.BuyPlacements)), + maxSellPlacements: uint32(len(cfg.BasicMMConfig.SellPlacements)), + baseWalletOptions: cfg.BasicMMConfig.BaseOptions, + quoteWalletOptions: cfg.BasicMMConfig.QuoteOptions, + log: logger, + }) exchangeAdaptor.run(ctx) - RunBasicMarketMaker(m.ctx, cfg, exchangeAdaptor, oracle, baseFiatRate, quoteFiatRate, logger) + + RunBasicMarketMaker(m.ctx, cfg, exchangeAdaptor, oracle, logger) }(cfg) case cfg.SimpleArbConfig != nil: wg.Add(1) go func(cfg *BotConfig) { defer wg.Done() - logger := m.log.SubLogger(fmt.Sprintf("SimpleArbitrage-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) + mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) + logger := m.log.SubLogger(fmt.Sprintf("SimpleArbitrage-%s", mktID)) + cex, found := cexes[cfg.CEXCfg.Name] if !found { logger.Errorf("Cannot start %s bot due to CEX not starting", mktID) return } + mkt := MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset} m.markBotAsRunning(mkt, true) defer func() { m.markBotAsRunning(mkt, false) }() - exchangeAdaptor := unifiedExchangeAdaptorForBot(mktID, dexBaseBalances[mktID], cexBaseBalances[mktID], m.core, cex, logger) + + exchangeAdaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: mktID, + market: &mkt, + baseDexBalances: dexBaseBalances[mktID], + baseCexBalances: cexBaseBalances[mktID], + core: m.core, + cex: cex, + maxBuyPlacements: 1, + maxSellPlacements: 1, + baseWalletOptions: cfg.SimpleArbConfig.BaseOptions, + quoteWalletOptions: cfg.SimpleArbConfig.QuoteOptions, + log: logger, + }) exchangeAdaptor.run(ctx) + RunSimpleArbBot(m.ctx, cfg, exchangeAdaptor, exchangeAdaptor, logger) }(cfg) case cfg.ArbMarketMakerConfig != nil: wg.Add(1) go func(cfg *BotConfig) { defer wg.Done() - logger := m.log.SubLogger(fmt.Sprintf("ArbMarketMaker-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) - cex, found := cexes[cfg.CEXCfg.Name] + mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) + logger := m.log.SubLogger(fmt.Sprintf("ArbMarketMaker-%s", mktID)) + + cex, found := cexes[cfg.CEXCfg.Name] if !found { logger.Errorf("Cannot start %s bot due to CEX not starting", mktID) return } + mkt := MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset} m.markBotAsRunning(mkt, true) defer func() { m.markBotAsRunning(mkt, false) }() - exchangeAdaptor := unifiedExchangeAdaptorForBot(mktID, dexBaseBalances[mktID], cexBaseBalances[mktID], m.core, cex, logger) + + exchangeAdaptor := unifiedExchangeAdaptorForBot(&exchangeAdaptorCfg{ + botID: mktID, + market: &mkt, + baseDexBalances: dexBaseBalances[mktID], + baseCexBalances: cexBaseBalances[mktID], + core: m.core, + cex: cex, + maxBuyPlacements: uint32(len(cfg.ArbMarketMakerConfig.BuyPlacements)), + maxSellPlacements: uint32(len(cfg.ArbMarketMakerConfig.SellPlacements)), + baseWalletOptions: cfg.ArbMarketMakerConfig.BaseOptions, + quoteWalletOptions: cfg.ArbMarketMakerConfig.QuoteOptions, + log: logger, + }) exchangeAdaptor.run(ctx) + RunArbMarketMaker(m.ctx, cfg, exchangeAdaptor, exchangeAdaptor, logger) }(cfg) default: diff --git a/client/mm/mm_arb_market_maker.go b/client/mm/mm_arb_market_maker.go index 513a342a32..b12a4500c2 100644 --- a/client/mm/mm_arb_market_maker.go +++ b/client/mm/mm_arb_market_maker.go @@ -10,6 +10,7 @@ import ( "sync" "sync/atomic" + "decred.org/dcrdex/client/asset" "decred.org/dcrdex/client/core" "decred.org/dcrdex/client/mm/libxc" "decred.org/dcrdex/dex" @@ -146,33 +147,19 @@ type arbMarketMaker struct { rebalanceRunning atomic.Bool currEpoch atomic.Uint64 - ordMtx sync.RWMutex - ords map[order.OrderID]*core.Order - oidToPlacement map[order.OrderID]int - - matchesMtx sync.RWMutex - matchesSeen map[order.MatchID]bool + matchesMtx sync.RWMutex + matchesSeen map[order.MatchID]bool + pendingOrders map[order.OrderID]bool cexTradesMtx sync.RWMutex cexTrades map[string]uint64 - feesMtx sync.RWMutex - buyFees *orderFees - sellFees *orderFees - reserves autoRebalanceReserves pendingBaseRebalance atomic.Bool pendingQuoteRebalance atomic.Bool } -// groupedOrders returns the buy and sell orders grouped by placement index. -func (m *arbMarketMaker) groupedOrders() (buys, sells map[int][]*groupedOrder) { - m.ordMtx.RLock() - defer m.ordMtx.RUnlock() - return groupOrders(m.ords, m.oidToPlacement, m.mkt.LotSize) -} - func (a *arbMarketMaker) handleCEXTradeUpdate(update *libxc.Trade) { a.log.Debugf("CEX trade update: %+v", update) @@ -184,22 +171,11 @@ func (a *arbMarketMaker) handleCEXTradeUpdate(update *libxc.Trade) { } } -// processDEXMatch checks to see if this is the first time the bot has seen -// this match. If so, it sends a trade to the CEX. -func (a *arbMarketMaker) processDEXMatch(o *core.Order, match *core.Match) { - var matchID order.MatchID - copy(matchID[:], match.MatchID) - - a.matchesMtx.Lock() - if a.matchesSeen[matchID] { - a.matchesMtx.Unlock() - return - } - a.matchesSeen[matchID] = true - a.matchesMtx.Unlock() - +// makeCounterTrade sends a trade to the CEX that is the counter trade to the +// specified match. sell indicates that the bot is selling on the DEX. +func (a *arbMarketMaker) makeCounterTrade(match *core.Match, sell bool) { var cexRate uint64 - if o.Sell { + if sell { cexRate = uint64(float64(match.Rate) / (1 + a.cfg.Profit)) } else { cexRate = uint64(float64(match.Rate) * (1 + a.cfg.Profit)) @@ -209,7 +185,7 @@ func (a *arbMarketMaker) processDEXMatch(o *core.Order, match *core.Match) { a.cexTradesMtx.Lock() defer a.cexTradesMtx.Unlock() - cexTrade, err := a.cex.Trade(a.ctx, a.baseID, a.quoteID, !o.Sell, cexRate, match.Qty) + cexTrade, err := a.cex.Trade(a.ctx, a.baseID, a.quoteID, !sell, cexRate, match.Qty) if err != nil { a.log.Errorf("Error sending trade to CEX: %v", err) return @@ -221,50 +197,34 @@ func (a *arbMarketMaker) processDEXMatch(o *core.Order, match *core.Match) { a.cexTrades[cexTrade.ID] = a.currEpoch.Load() } -func (a *arbMarketMaker) processDEXMatchNote(note *core.MatchNote) { - var oid order.OrderID - copy(oid[:], note.OrderID) - - a.ordMtx.RLock() - o, found := a.ords[oid] - a.ordMtx.RUnlock() - if !found { - return - } - - a.processDEXMatch(o, note.Match) -} +func (a *arbMarketMaker) processDEXOrderUpdate(o *core.Order) { + var orderID order.OrderID + copy(orderID[:], o.ID) -func (a *arbMarketMaker) processDEXOrderNote(note *core.OrderNote) { - var oid order.OrderID - copy(oid[:], note.Order.ID) + a.matchesMtx.Lock() + defer a.matchesMtx.Unlock() - a.ordMtx.Lock() - o, found := a.ords[oid] - if !found { - a.ordMtx.Unlock() + if _, found := a.pendingOrders[orderID]; !found { return } - a.ords[oid] = note.Order - a.ordMtx.Unlock() - for _, match := range note.Order.Matches { - a.processDEXMatch(o, match) - } + for _, match := range o.Matches { + var matchID order.MatchID + copy(matchID[:], match.MatchID) - if !note.Order.Status.IsActive() { - a.ordMtx.Lock() - delete(a.ords, oid) - delete(a.oidToPlacement, oid) - a.ordMtx.Unlock() + if !a.matchesSeen[matchID] { + a.matchesSeen[matchID] = true + a.makeCounterTrade(match, o.Sell) + } + } - a.matchesMtx.Lock() - for _, match := range note.Order.Matches { + if !o.Status.IsActive() { + delete(a.pendingOrders, orderID) + for _, match := range o.Matches { var matchID order.MatchID copy(matchID[:], match.MatchID) delete(a.matchesSeen, matchID) } - a.matchesMtx.Unlock() } } @@ -272,17 +232,22 @@ func (a *arbMarketMaker) vwap(sell bool, qty uint64) (vwap, extrema uint64, fill return a.cex.VWAP(a.baseID, a.quoteID, sell, qty) } +func (a *arbMarketMaker) groupedOrders() (buys, sells map[uint64][]*core.Order) { + return a.core.GroupedBookedOrders() +} + type arbMMRebalancer interface { vwap(sell bool, qty uint64) (vwap, extrema uint64, filled bool, err error) - groupedOrders() (buys, sells map[int][]*groupedOrder) + groupedOrders() (buys, sells map[uint64][]*core.Order) } func (a *arbMarketMaker) placeMultiTrade(placements []*rateLots, sell bool) { - qtyRates := make([]*core.QtyRate, 0, len(placements)) + multiTradePlacements := make([]*multiTradePlacement, 0, len(placements)) for _, p := range placements { - qtyRates = append(qtyRates, &core.QtyRate{ - Qty: p.lots * a.mkt.LotSize, - Rate: p.rate, + multiTradePlacements = append(multiTradePlacements, &multiTradePlacement{ + qty: p.lots * a.mkt.LotSize, + rate: p.rate, + grouping: uint64(p.placementIndex), }) } @@ -293,27 +258,27 @@ func (a *arbMarketMaker) placeMultiTrade(placements []*rateLots, sell bool) { options = a.cfg.QuoteOptions } - orders, err := a.core.MultiTrade(nil, &core.MultiTradeForm{ - Host: a.host, - Sell: sell, - Base: a.baseID, - Quote: a.quoteID, - Placements: qtyRates, - Options: options, + orders, err := a.core.MultiTrade(&multiTradeForm{ + host: a.host, + sell: sell, + base: a.baseID, + quote: a.quoteID, + placements: multiTradePlacements, + options: options, }) if err != nil { a.log.Errorf("Error placing rebalancing order: %v", err) return } - a.ordMtx.Lock() - for i, ord := range orders { - var oid order.OrderID - copy(oid[:], ord.ID) - a.ords[oid] = ord - a.oidToPlacement[oid] = placements[i].placementIndex + a.matchesMtx.Lock() + defer a.matchesMtx.Unlock() + + for _, o := range orders { + var orderID order.OrderID + copy(orderID[:], o.ID) + a.pendingOrders[orderID] = true } - a.ordMtx.Unlock() } // cancelExpiredCEXTrades cancels any trades on the CEX that have been open for @@ -330,6 +295,8 @@ func (a *arbMarketMaker) cancelExpiredCEXTrades() { if err != nil { a.log.Errorf("Error canceling CEX trade %s: %v", tradeID, err) } + + a.log.Infof("Cex trade %s was cancelled before it was filled", tradeID) } } } @@ -347,12 +314,12 @@ func arbMarketMakerRebalance(newEpoch uint64, a arbMMRebalancer, c botCoreAdapto } cancels = make([]dex.Bytes, 0, 1) - addCancel := func(o *groupedOrder) { - if newEpoch-o.epoch < 2 { + addCancel := func(o *core.Order) { + if newEpoch-o.Epoch < 2 { log.Debugf("rebalance: skipping cancel not past free cancel threshold") return } - cancels = append(cancels, o.id[:]) + cancels = append(cancels, o.ID) } baseDEXBalance, err := c.AssetBalance(mkt.BaseID) @@ -381,7 +348,7 @@ func arbMarketMakerRebalance(newEpoch uint64, a arbMMRebalancer, c botCoreAdapto processSide := func(sell bool) []*rateLots { var cfgPlacements []*ArbMarketMakingPlacement - var existingOrders map[int][]*groupedOrder + var existingOrders map[uint64][]*core.Order var remainingDEXBalance, remainingCEXBalance, fundingFees uint64 if sell { cfgPlacements = cfg.SellPlacements @@ -419,10 +386,10 @@ func arbMarketMakerRebalance(newEpoch uint64, a arbMMRebalancer, c botCoreAdapto for _, o := range ordersForPlacement { var requiredOnCEX uint64 if sell { - rate := uint64(float64(o.rate) / (1 + cfg.Profit)) - requiredOnCEX = calc.BaseToQuote(rate, o.lots*mkt.LotSize) + rate := uint64(float64(o.Rate) / (1 + cfg.Profit)) + requiredOnCEX = calc.BaseToQuote(rate, o.Qty-o.Filled) } else { - requiredOnCEX = o.lots * mkt.LotSize + requiredOnCEX = o.Qty - o.Filled } if requiredOnCEX <= remainingCEXBalance { remainingCEXBalance -= requiredOnCEX @@ -471,11 +438,11 @@ func arbMarketMakerRebalance(newEpoch uint64, a arbMMRebalancer, c botCoreAdapto placementRate = steppedRate(uint64(float64(extrema)/(1+cfg.Profit)), mkt.RateStep) } - ordersForPlacement := existingOrders[i] + ordersForPlacement := existingOrders[uint64(i)] var existingLots uint64 for _, o := range ordersForPlacement { - existingLots += o.lots - if !withinTolerance(o.rate, placementRate) { + existingLots += (o.Qty - o.Filled) / mkt.LotSize + if !withinTolerance(o.Rate, placementRate) { addCancel(o) } } @@ -523,28 +490,6 @@ func arbMarketMakerRebalance(newEpoch uint64, a arbMMRebalancer, c botCoreAdapto return cancels, buys, sells } -// fundsLockedInOrders returns the total amount of the asset that is -// currently locked in a booked order on the DEX. -func (a *arbMarketMaker) fundsLockedInOrders(base bool) uint64 { - buys, sells := a.groupedOrders() - var locked uint64 - - var orders map[int][]*groupedOrder - if base { - orders = sells - } else { - orders = buys - } - - for _, ordersForPlacement := range orders { - for _, o := range ordersForPlacement { - locked += o.lockedAmt - } - } - - return locked -} - // dexToCexQty returns the amount of backing asset on the CEX that is required // for a DEX order of the specified quantity and rate. dexSell indicates that // we are selling on the DEX, and therefore buying on the CEX. @@ -561,7 +506,7 @@ func (a *arbMarketMaker) dexToCexQty(qty, rate uint64, dexSell bool) uint64 { // trades could be made on the CEX. func (a *arbMarketMaker) cexBalanceBackingDexOrders(base bool) uint64 { buys, sells := a.groupedOrders() - var orders map[int][]*groupedOrder + var orders map[uint64][]*core.Order if base { orders = buys } else { @@ -571,7 +516,7 @@ func (a *arbMarketMaker) cexBalanceBackingDexOrders(base bool) uint64 { var locked uint64 for _, ordersForPlacement := range orders { for _, o := range ordersForPlacement { - locked += a.dexToCexQty(o.lots*a.mkt.LotSize, o.rate, !base) + locked += a.dexToCexQty(o.Qty-o.Filled, o.Rate, !base) } } @@ -579,18 +524,19 @@ func (a *arbMarketMaker) cexBalanceBackingDexOrders(base bool) uint64 { } // freeUpFunds cancels active orders to free up the specified amount of funds -// for a rebalance between the dex and the cex. The orders are cancelled in -// reverse order of priority. +// for a rebalance between the dex and the cex. If we are freeing up funds for +// withdrawal, DEX orders that may require a counter-trade on the CEX are +// cancelled. The orders are cancelled in reverse order of priority. func (a *arbMarketMaker) freeUpFunds(base, cex bool, amt uint64) { buys, sells := a.groupedOrders() - var orders map[int][]*groupedOrder + var orders map[uint64][]*core.Order if base && !cex || !base && cex { orders = sells } else { orders = buys } - highToLowIndexes := make([]int, 0, len(orders)) + highToLowIndexes := make([]uint64, 0, len(orders)) for i := range orders { highToLowIndexes = append(highToLowIndexes, i) } @@ -600,25 +546,41 @@ func (a *arbMarketMaker) freeUpFunds(base, cex bool, amt uint64) { currEpoch := a.currEpoch.Load() + amtFreedByCancellingOrder := func(o *core.Order) uint64 { + if cex { + return a.dexToCexQty(o.Qty-o.Filled, o.Rate, !base) + } + + if o.RefundLockedAmt == 0 { + return o.LockedAmt + } + + assetID := a.quoteID + if base { + assetID = a.baseID + } + if token := asset.TokenInfo(assetID); token != nil { + return o.LockedAmt + } + + return o.LockedAmt + o.RefundLockedAmt + } + for _, index := range highToLowIndexes { ordersForPlacement := orders[index] for _, o := range ordersForPlacement { // If the order is too recent, just wait for the next epoch to // cancel. We still count this order towards the freedAmt in // order to not cancel a higher priority trade. - if currEpoch-o.epoch >= 2 { - err := a.core.Cancel(o.id[:]) + if currEpoch-o.Epoch >= 2 { + err := a.core.Cancel(o.ID) if err != nil { a.log.Errorf("error cancelling order: %v", err) continue } } - var freedAmt uint64 - if cex { - freedAmt = a.dexToCexQty(o.lots*a.mkt.LotSize, o.rate, !base) - } else { - freedAmt = o.lockedAmt - } + + freedAmt := amtFreedByCancellingOrder(o) if freedAmt >= amt { return } @@ -648,13 +610,12 @@ func (a *arbMarketMaker) rebalanceAssets() { } symbol := dex.BipIDSymbol(assetID) - dexAvailableBalance, err := a.core.AssetBalance(assetID) + dexBalance, err := a.core.AssetBalance(assetID) if err != nil { a.log.Errorf("Error getting %s balance: %v", symbol, err) return } - - totalDexBalance := dexAvailableBalance.Available + a.fundsLockedInOrders(base) + totalDexBalance := dexBalance.Available + dexBalance.Locked cexBalance, err := a.cex.Balance(assetID) if err != nil { @@ -662,6 +623,8 @@ func (a *arbMarketMaker) rebalanceAssets() { return } + // Don't take into account locked funds on CEX, because we don't do + // rebalancing while there are active orders on the CEX. if (totalDexBalance+cexBalance.Available)/2 < minAmount { a.log.Warnf("Cannot rebalance %s because balance is too low on both DEX and CEX. Min amount: %v, CEX balance: %v, DEX Balance: %v", symbol, minAmount, cexBalance.Available, totalDexBalance) @@ -695,9 +658,9 @@ func (a *arbMarketMaker) rebalanceAssets() { // If we need to cancel some orders to send the required amount to // the CEX, cancel some orders, and then try again on the next // epoch. - if amt > dexAvailableBalance.Available { + if amt > dexBalance.Available { a.reserves.set(base, false, amt) - a.freeUpFunds(base, false, amt-dexAvailableBalance.Available) + a.freeUpFunds(base, false, amt-dexBalance.Available) return } @@ -779,8 +742,14 @@ func (a *arbMarketMaker) rebalance(epoch uint64) { } a.currEpoch.Store(epoch) + buyFees, sellFees, err := a.core.OrderFees() + if err != nil { + a.log.Errorf("Error getting order fees: %v", err) + return + } + cancels, buyOrders, sellOrders := arbMarketMakerRebalance(epoch, a, a.core, - a.cex, a.cfg, a.mkt, a.buyFees, a.sellFees, &a.reserves, a.log) + a.cex, a.cfg, a.mkt, buyFees, sellFees, &a.reserves, a.log) for _, cancel := range cancels { err := a.core.Cancel(cancel) @@ -800,82 +769,6 @@ func (a *arbMarketMaker) rebalance(epoch uint64) { a.rebalanceAssets() } -func (a *arbMarketMaker) handleNotification(note core.Notification) { - switch n := note.(type) { - case *core.MatchNote: - a.processDEXMatchNote(n) - case *core.OrderNote: - a.processDEXOrderNote(n) - case *core.EpochNotification: - go a.rebalance(n.Epoch) - } -} - -func (a *arbMarketMaker) cancelAllOrders() { - a.ordMtx.Lock() - defer a.ordMtx.Unlock() - for oid := range a.ords { - if err := a.core.Cancel(oid[:]); err != nil { - a.log.Errorf("error cancelling order: %v", err) - } - } - a.ords = make(map[order.OrderID]*core.Order) - a.oidToPlacement = make(map[order.OrderID]int) -} - -func (a *arbMarketMaker) updateFeeRates() error { - buySwapFees, buyRedeemFees, buyRefundFees, err := a.core.SingleLotFees(&core.SingleLotFeesForm{ - Host: a.host, - Base: a.baseID, - Quote: a.quoteID, - UseMaxFeeRate: true, - UseSafeTxSize: true, - }) - if err != nil { - return fmt.Errorf("failed to get fees: %v", err) - } - - sellSwapFees, sellRedeemFees, sellRefundFees, err := a.core.SingleLotFees(&core.SingleLotFeesForm{ - Host: a.host, - Base: a.baseID, - Quote: a.quoteID, - UseMaxFeeRate: true, - UseSafeTxSize: true, - Sell: true, - }) - if err != nil { - return fmt.Errorf("failed to get fees: %v", err) - } - - buyFundingFees, err := a.core.MaxFundingFees(a.quoteID, a.host, uint32(len(a.cfg.BuyPlacements)), a.cfg.QuoteOptions) - if err != nil { - return fmt.Errorf("failed to get funding fees: %v", err) - } - - sellFundingFees, err := a.core.MaxFundingFees(a.baseID, a.host, uint32(len(a.cfg.SellPlacements)), a.cfg.BaseOptions) - if err != nil { - return fmt.Errorf("failed to get funding fees: %v", err) - } - - a.feesMtx.Lock() - defer a.feesMtx.Unlock() - - a.buyFees = &orderFees{ - swap: buySwapFees, - redemption: buyRedeemFees, - funding: buyFundingFees, - refund: buyRefundFees, - } - a.sellFees = &orderFees{ - swap: sellSwapFees, - redemption: sellRedeemFees, - funding: sellFundingFees, - refund: sellRefundFees, - } - - return nil -} - func (a *arbMarketMaker) run() { book, bookFeed, err := a.core.SyncBook(a.host, a.baseID, a.quoteID) if err != nil { @@ -884,12 +777,6 @@ func (a *arbMarketMaker) run() { } a.book = book - err = a.updateFeeRates() - if err != nil { - a.log.Errorf("Failed to get fees: %v", err) - return - } - err = a.cex.SubscribeMarket(a.ctx, a.baseID, a.quoteID) if err != nil { a.log.Errorf("Failed to subscribe to cex market: %v", err) @@ -900,15 +787,16 @@ func (a *arbMarketMaker) run() { defer unsubscribe() wg := &sync.WaitGroup{} - wg.Add(1) go func() { defer wg.Done() for { select { - case <-bookFeed.Next(): - // Really nothing to do with the updates. We just need to keep - // the subscription live in order to get VWAP on dex orderbook. + case n := <-bookFeed.Next(): + if n.Action == core.EpochMatchSummary { + payload := n.Payload.(*core.EpochMatchSummaryPayload) + a.rebalance(payload.Epoch + 1) + } case <-a.ctx.Done(): return } @@ -928,15 +816,15 @@ func (a *arbMarketMaker) run() { } }() - noteFeed := a.core.NotificationFeed() wg.Add(1) go func() { defer wg.Done() - defer noteFeed.ReturnFeed() + orderUpdates := a.core.SubscribeOrderUpdates() + fmt.Println("subscribed for order updates") for { select { - case n := <-noteFeed.C: - a.handleNotification(n) + case n := <-orderUpdates: + a.processDEXOrderUpdate(n) case <-a.ctx.Done(): return } @@ -944,7 +832,7 @@ func (a *arbMarketMaker) run() { }() wg.Wait() - a.cancelAllOrders() + a.core.CancelAllOrders() } func RunArbMarketMaker(ctx context.Context, cfg *BotConfig, c botCoreAdaptor, cex botCexAdaptor, log dex.Logger) { @@ -961,18 +849,17 @@ func RunArbMarketMaker(ctx context.Context, cfg *BotConfig, c botCoreAdaptor, ce } (&arbMarketMaker{ - ctx: ctx, - host: cfg.Host, - baseID: cfg.BaseAsset, - quoteID: cfg.QuoteAsset, - cex: cex, - core: c, - log: log, - cfg: cfg.ArbMarketMakerConfig, - mkt: mkt, - ords: make(map[order.OrderID]*core.Order), - oidToPlacement: make(map[order.OrderID]int), - matchesSeen: make(map[order.MatchID]bool), - cexTrades: make(map[string]uint64), + ctx: ctx, + host: cfg.Host, + baseID: cfg.BaseAsset, + quoteID: cfg.QuoteAsset, + cex: cex, + core: c, + log: log, + cfg: cfg.ArbMarketMakerConfig, + mkt: mkt, + matchesSeen: make(map[order.MatchID]bool), + pendingOrders: make(map[order.OrderID]bool), + cexTrades: make(map[string]uint64), }).run() } diff --git a/client/mm/mm_arb_market_maker_test.go b/client/mm/mm_arb_market_maker_test.go index 2b18abe49c..a4b6cd5db7 100644 --- a/client/mm/mm_arb_market_maker_test.go +++ b/client/mm/mm_arb_market_maker_test.go @@ -19,8 +19,8 @@ import ( type tArbMMRebalancer struct { buyVWAP map[uint64]*vwapResult sellVWAP map[uint64]*vwapResult - groupedBuys map[int][]*groupedOrder - groupedSells map[int][]*groupedOrder + groupedBuys map[uint64][]*core.Order + groupedSells map[uint64][]*core.Order } var _ arbMMRebalancer = (*tArbMMRebalancer)(nil) @@ -38,7 +38,7 @@ func (r *tArbMMRebalancer) vwap(sell bool, qty uint64) (vwap, extrema uint64, fi return 0, 0, false, nil } -func (r *tArbMMRebalancer) groupedOrders() (buys, sells map[int][]*groupedOrder) { +func (r *tArbMMRebalancer) groupedOrders() (buys, sells map[uint64][]*core.Order) { return r.groupedBuys, r.groupedSells } @@ -118,7 +118,7 @@ func TestArbMarketMakerRebalance(t *testing.T) { rebalancer *tArbMMRebalancer cfg *ArbMarketMakerConfig - dexBalances map[uint32]uint64 + dexBalances map[uint32]*botBalance cexBalances map[uint32]*botBalance reserves autoRebalanceReserves @@ -156,9 +156,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg1, - dexBalances: map[uint32]uint64{ - 42: lotSize * 3, - 0: calc.BaseToQuote(1e6, 3*lotSize), + dexBalances: map[uint32]*botBalance{ + 42: {Available: lotSize * 3}, + 0: {Available: calc.BaseToQuote(1e6, 3*lotSize)}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -189,23 +189,23 @@ func TestArbMarketMakerRebalance(t *testing.T) { extrema: 2.2e6, }, }, - groupedBuys: map[int][]*groupedOrder{ + groupedBuys: map[uint64][]*core.Order{ 0: {{ - rate: 1.882e6, - lots: 1, + Rate: 1.882e6, + Qty: lotSize, }}, }, - groupedSells: map[int][]*groupedOrder{ + groupedSells: map[uint64][]*core.Order{ 0: {{ - rate: 2.223e6, - lots: 1, + Rate: 2.223e6, + Qty: lotSize, }}, }, }, cfg: cfg1, - dexBalances: map[uint32]uint64{ - 42: lotSize * 3, - 0: calc.BaseToQuote(1e6, 3*lotSize), + dexBalances: map[uint32]*botBalance{ + 42: {Available: lotSize * 3}, + 0: {Available: calc.BaseToQuote(1e6, 3*lotSize)}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -222,18 +222,18 @@ func TestArbMarketMakerRebalance(t *testing.T) { extrema: 1.9e6, }, }, - groupedBuys: map[int][]*groupedOrder{ + groupedBuys: map[uint64][]*core.Order{ 0: {{ - id: orderIDs[0], - rate: 1.883e6, - lots: 1, + ID: orderIDs[0][:], + Rate: 1.883e6, + Qty: lotSize, }}, }, - groupedSells: map[int][]*groupedOrder{ + groupedSells: map[uint64][]*core.Order{ 0: {{ - id: orderIDs[1], - rate: 2.225e6, - lots: 1, + ID: orderIDs[1][:], + Rate: 2.225e6, + Qty: lotSize, }}, }, sellVWAP: map[uint64]*vwapResult{ @@ -244,9 +244,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg1, - dexBalances: map[uint32]uint64{ - 42: lotSize * 3, - 0: calc.BaseToQuote(1e6, 3*lotSize), + dexBalances: map[uint32]*botBalance{ + 42: {Available: lotSize * 3}, + 0: {Available: calc.BaseToQuote(1e6, 3*lotSize)}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -267,20 +267,20 @@ func TestArbMarketMakerRebalance(t *testing.T) { extrema: 1.9e6, }, }, - groupedBuys: map[int][]*groupedOrder{ + groupedBuys: map[uint64][]*core.Order{ 0: {{ - id: orderIDs[0], - rate: 1.883e6, - lots: 1, - epoch: newEpoch - 1, + ID: orderIDs[0][:], + Rate: 1.883e6, + Qty: lotSize, + Epoch: newEpoch - 1, }}, }, - groupedSells: map[int][]*groupedOrder{ + groupedSells: map[uint64][]*core.Order{ 0: {{ - id: orderIDs[1], - rate: 2.225e6, - lots: 1, - epoch: newEpoch - 2, + ID: orderIDs[1][:], + Rate: 2.225e6, + Qty: lotSize, + Epoch: newEpoch - 2, }}, }, sellVWAP: map[uint64]*vwapResult{ @@ -291,9 +291,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg1, - dexBalances: map[uint32]uint64{ - 42: lotSize * 3, - 0: calc.BaseToQuote(1e6, 3*lotSize), + dexBalances: map[uint32]*botBalance{ + 42: {Available: lotSize * 3}, + 0: {Available: calc.BaseToQuote(1e6, 3*lotSize)}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -329,9 +329,11 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 2*(lotSize+sellFees.swap) + sellFees.funding, - 0: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + 2*buyFees.swap + buyFees.funding, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 2*(lotSize+sellFees.swap) + sellFees.funding}, + 0: {Available: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + + 2*buyFees.swap + buyFees.funding}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -386,9 +388,11 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 2*(lotSize+sellFees.swap) + sellFees.funding - 1, - 0: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + 2*buyFees.swap + buyFees.funding - 1, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 2*(lotSize+sellFees.swap) + sellFees.funding - 1}, + 0: {Available: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + + 2*buyFees.swap + buyFees.funding - 1}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -433,9 +437,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize)}, @@ -490,9 +494,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize) - 1}, @@ -535,23 +539,25 @@ func TestArbMarketMakerRebalance(t *testing.T) { extrema: 2.4e6, }, }, - groupedBuys: map[int][]*groupedOrder{ + groupedBuys: map[uint64][]*core.Order{ 0: {{ - rate: divideRate(1.9e6, 1+profit), - lots: 1, + Rate: divideRate(1.9e6, 1+profit), + Qty: 2 * lotSize, + Filled: lotSize, }}, }, - groupedSells: map[int][]*groupedOrder{ + groupedSells: map[uint64][]*core.Order{ 0: {{ - rate: multiplyRate(2.2e6, 1+profit), - lots: 1, + Rate: multiplyRate(2.2e6, 1+profit), + Qty: 2 * lotSize, + Filled: lotSize, }}, }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize)}, @@ -596,23 +602,23 @@ func TestArbMarketMakerRebalance(t *testing.T) { extrema: 2.4e6, }, }, - groupedBuys: map[int][]*groupedOrder{ + groupedBuys: map[uint64][]*core.Order{ 0: {{ - rate: divideRate(1.9e6, 1+profit), - lots: 1, + Rate: divideRate(1.9e6, 1+profit), + Qty: lotSize, }}, }, - groupedSells: map[int][]*groupedOrder{ + groupedSells: map[uint64][]*core.Order{ 0: {{ - rate: multiplyRate(2.2e6, 1+profit), - lots: 1, + Rate: multiplyRate(2.2e6, 1+profit), + Qty: lotSize, }}, }, }, cfg: cfg2, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize) - 1}, @@ -648,9 +654,10 @@ func TestArbMarketMakerRebalance(t *testing.T) { reserves: autoRebalanceReserves{ baseDexReserves: 2 * lotSize, }, - dexBalances: map[uint32]uint64{ - 42: 2*(lotSize+sellFees.swap) + sellFees.funding + 2*lotSize, - 0: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + 2*buyFees.swap + buyFees.funding, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 2*(lotSize+sellFees.swap) + sellFees.funding + 2*lotSize}, + 0: {Available: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + + 2*buyFees.swap + buyFees.funding}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -708,9 +715,11 @@ func TestArbMarketMakerRebalance(t *testing.T) { reserves: autoRebalanceReserves{ baseDexReserves: 2 * lotSize, }, - dexBalances: map[uint32]uint64{ - 42: 2*(lotSize+sellFees.swap) + sellFees.funding + 2*lotSize - 1, - 0: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + 2*buyFees.swap + buyFees.funding, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 2*(lotSize+sellFees.swap) + sellFees.funding + 2*lotSize - 1}, + 0: {Available: calc.BaseToQuote(divideRate(1.9e6, 1+profit), lotSize) + + calc.BaseToQuote(divideRate(1.7e6, 1+profit), lotSize) + + 2*buyFees.swap + buyFees.funding}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: 1e19}, @@ -763,9 +772,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { reserves: autoRebalanceReserves{ quoteCexReserves: lotSize, }, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize) + lotSize}, @@ -823,9 +832,9 @@ func TestArbMarketMakerRebalance(t *testing.T) { reserves: autoRebalanceReserves{ baseCexReserves: lotSize, }, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e19, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e19}, }, cexBalances: map[uint32]*botBalance{ 0: {Available: calc.BaseToQuote(2.2e6, mkt.LotSize) + calc.BaseToQuote(2.4e6, mkt.LotSize) + lotSize - 1}, @@ -852,13 +861,19 @@ func TestArbMarketMakerRebalance(t *testing.T) { } for _, test := range tests { + if test.name != "one existing order, enough cex balance for second" { + continue + } + tCore := newTCore() - tCore.setAssetBalances(test.dexBalances) + coreAdaptor := newTBotCoreAdaptor(tCore) + coreAdaptor.balances = test.dexBalances + cex := newTBotCEXAdaptor() cex.balances = test.cexBalances cancels, buys, sells := arbMarketMakerRebalance(newEpoch, test.rebalancer, - newTBotCoreAdaptor(tCore), cex, test.cfg, mkt, buyFees, sellFees, &test.reserves, tLogger) + coreAdaptor, cex, test.cfg, mkt, buyFees, sellFees, &test.reserves, tLogger) if len(cancels) != len(test.expectedCancels) { t.Fatalf("%s: expected %d cancels, got %d", test.name, len(test.expectedCancels), len(cancels)) @@ -934,158 +949,71 @@ func TestArbMarketMakerDEXUpdates(t *testing.T) { type test struct { name string - orders []*core.Order - notes []core.Notification + pendingOrderIDs map[order.OrderID]bool + orderUpdates []*core.Order expectedCEXTrades []*libxc.Trade } tests := []*test{ { - name: "one buy and one sell match notifications", - orders: []*core.Order{ + name: "one buy and one sell match, repeated", + pendingOrderIDs: map[order.OrderID]bool{ + orderIDs[0]: true, + orderIDs[1]: true, + }, + orderUpdates: []*core.Order{ { ID: orderIDs[0][:], Sell: true, Qty: lotSize, Rate: 8e5, + Matches: []*core.Match{ + { + MatchID: matchIDs[0][:], + Qty: lotSize, + Rate: 8e5, + }, + }, }, { ID: orderIDs[1][:], Sell: false, Qty: lotSize, Rate: 6e5, - }, - }, - notes: []core.Notification{ - &core.MatchNote{ - OrderID: orderIDs[0][:], - Match: &core.Match{ - MatchID: matchIDs[0][:], - Qty: lotSize, - Rate: 8e5, - }, - }, - &core.MatchNote{ - OrderID: orderIDs[1][:], - Match: &core.Match{ - MatchID: matchIDs[1][:], - Qty: lotSize, - Rate: 6e5, - }, - }, - &core.OrderNote{ - Order: &core.Order{ - ID: orderIDs[0][:], - Sell: true, - Qty: lotSize, - Rate: 8e5, - Matches: []*core.Match{ - { - MatchID: matchIDs[0][:], - Qty: lotSize, - Rate: 8e5, - }, - }, - }, - }, - &core.OrderNote{ - Order: &core.Order{ - ID: orderIDs[1][:], - Sell: false, - Qty: lotSize, - Rate: 8e5, - Matches: []*core.Match{ - { - MatchID: matchIDs[1][:], - Qty: lotSize, - Rate: 6e5, - }, + Matches: []*core.Match{ + { + MatchID: matchIDs[1][:], + Qty: lotSize, + Rate: 6e5, }, }, }, - }, - expectedCEXTrades: []*libxc.Trade{ - { - BaseID: 42, - QuoteID: 0, - Qty: lotSize, - Rate: divideRate(8e5, 1+profit), - Sell: false, - }, - { - BaseID: 42, - QuoteID: 0, - Qty: lotSize, - Rate: multiplyRate(6e5, 1+profit), - Sell: true, - }, - nil, - nil, - }, - }, - { - name: "place cex trades due to order note", - orders: []*core.Order{ { ID: orderIDs[0][:], Sell: true, Qty: lotSize, Rate: 8e5, + Matches: []*core.Match{ + { + MatchID: matchIDs[0][:], + Qty: lotSize, + Rate: 8e5, + }, + }, }, { ID: orderIDs[1][:], Sell: false, Qty: lotSize, Rate: 6e5, - }, - }, - notes: []core.Notification{ - &core.OrderNote{ - Order: &core.Order{ - ID: orderIDs[0][:], - Sell: true, - Qty: lotSize, - Rate: 8e5, - Matches: []*core.Match{ - { - MatchID: matchIDs[0][:], - Qty: lotSize, - Rate: 8e5, - }, - }, - }, - }, - &core.OrderNote{ - Order: &core.Order{ - ID: orderIDs[1][:], - Sell: false, - Qty: lotSize, - Rate: 8e5, - Matches: []*core.Match{ - { - MatchID: matchIDs[1][:], - Qty: lotSize, - Rate: 6e5, - }, + Matches: []*core.Match{ + { + MatchID: matchIDs[1][:], + Qty: lotSize, + Rate: 6e5, }, }, }, - &core.MatchNote{ - OrderID: orderIDs[0][:], - Match: &core.Match{ - MatchID: matchIDs[0][:], - Qty: lotSize, - Rate: 8e5, - }, - }, - &core.MatchNote{ - OrderID: orderIDs[1][:], - Match: &core.Match{ - MatchID: matchIDs[1][:], - Qty: lotSize, - Rate: 6e5, - }, - }, }, expectedCEXTrades: []*libxc.Trade{ { @@ -1111,46 +1039,37 @@ func TestArbMarketMakerDEXUpdates(t *testing.T) { runTest := func(test *test) { cex := newTBotCEXAdaptor() tCore := newTCore() + coreAdaptor := newTBotCoreAdaptor(tCore) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ords := make(map[order.OrderID]*core.Order) - - for _, o := range test.orders { - var oid order.OrderID - copy(oid[:], o.ID) - ords[oid] = o - } - arbMM := &arbMarketMaker{ - cex: cex, - core: newTBotCoreAdaptor(tCore), - ctx: ctx, - ords: ords, - baseID: 42, - quoteID: 0, - oidToPlacement: make(map[order.OrderID]int), - matchesSeen: make(map[order.MatchID]bool), - cexTrades: make(map[string]uint64), - mkt: mkt, + cex: cex, + core: coreAdaptor, + ctx: ctx, + baseID: 42, + quoteID: 0, + matchesSeen: make(map[order.MatchID]bool), + cexTrades: make(map[string]uint64), + mkt: mkt, cfg: &ArbMarketMakerConfig{ Profit: profit, }, + pendingOrders: test.pendingOrderIDs, } arbMM.currEpoch.Store(123) go arbMM.run() - dummyNote := &core.BondRefundNote{} - - for i, note := range test.notes { + for i, note := range test.orderUpdates { cex.lastTrade = nil - tCore.noteFeed <- note - tCore.noteFeed <- dummyNote + coreAdaptor.orderUpdates <- note + coreAdaptor.orderUpdates <- &core.Order{} // Dummy update should have no effect expectedCEXTrade := test.expectedCEXTrades[i] if (expectedCEXTrade == nil) != (cex.lastTrade == nil) { - t.Fatalf("%s: expected cex order %v but got %v", test.name, (expectedCEXTrade != nil), (cex.lastTrade != nil)) + t.Fatalf("%s: expected cex order after update %d %v but got %v", test.name, i, (expectedCEXTrade != nil), (cex.lastTrade != nil)) } if cex.lastTrade != nil && @@ -1187,13 +1106,13 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { type test struct { name string cfg *AutoRebalanceConfig - orders map[order.OrderID]*core.Order - oidToPlacement map[order.OrderID]int - cexBaseBalance uint64 - cexQuoteBalance uint64 - dexBaseBalance uint64 - dexQuoteBalance uint64 + cexBaseBalance *botBalance + cexQuoteBalance *botBalance + dexBaseBalance *botBalance + dexQuoteBalance *botBalance activeCEXOrders bool + groupedBuys map[uint64][]*core.Order + groupedSells map[uint64][]*core.Order expectedDeposit *withdrawArgs expectedWithdraw *withdrawArgs @@ -1213,12 +1132,18 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 1e16, MinQuoteAmt: 1e12, }, - orders: map[order.OrderID]*core.Order{}, - oidToPlacement: map[order.OrderID]int{}, - cexBaseBalance: 1e16, - cexQuoteBalance: 1e12, - dexBaseBalance: 1e16, - dexQuoteBalance: 1e12, + cexBaseBalance: &botBalance{ + Available: 1e16, + }, + cexQuoteBalance: &botBalance{ + Available: 1e12, + }, + dexBaseBalance: &botBalance{ + Available: 1e16, + }, + dexQuoteBalance: &botBalance{ + Available: 1e12, + }, }, // "no action with active cex orders" { @@ -1227,31 +1152,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 4 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 4 * mkt.LotSize, Rate: 6e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 3 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 5 * mkt.LotSize, + Locked: 8 * mkt.LotSize, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - cexBaseBalance: 3 * mkt.LotSize, - dexBaseBalance: 5 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), activeCEXOrders: true, }, // "no orders, need to withdraw base" @@ -1261,12 +1193,18 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 1e16, MinQuoteAmt: 1e12, }, - orders: map[order.OrderID]*core.Order{}, - oidToPlacement: map[order.OrderID]int{}, - cexBaseBalance: 8e16, - cexQuoteBalance: 1e12, - dexBaseBalance: 9e15, - dexQuoteBalance: 1e12, + cexBaseBalance: &botBalance{ + Available: 8e16, + }, + cexQuoteBalance: &botBalance{ + Available: 1e12, + }, + dexBaseBalance: &botBalance{ + Available: 9e15, + }, + dexQuoteBalance: &botBalance{ + Available: 1e12, + }, expectedWithdraw: &withdrawArgs{ assetID: 42, amt: (9e15+8e16)/2 - 9e15, @@ -1280,31 +1218,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 4 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 4 * mkt.LotSize, Rate: 6e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 3 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 5 * mkt.LotSize, + Locked: 8 * mkt.LotSize, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - cexBaseBalance: 3 * mkt.LotSize, - dexBaseBalance: 5 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), expectedDeposit: &withdrawArgs{ assetID: 42, amt: 5 * mkt.LotSize, @@ -1318,32 +1263,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 4 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 4 * mkt.LotSize, Rate: 6e7, Sell: true, LockedAmt: 4 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 3 * mkt.LotSize, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, + dexBaseBalance: &botBalance{ + Available: 5*mkt.LotSize - 2, + Locked: 8 * mkt.LotSize, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - cexBaseBalance: 3 * mkt.LotSize, - dexBaseBalance: 5*mkt.LotSize - 2, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), expectedCancels: []dex.Bytes{ orderIDs[1][:], }, @@ -1358,41 +1309,47 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 3 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 2 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 2 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 2 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 2 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[2]: { + }}, + 2: {{ ID: orderIDs[2][:], - Qty: 2 * mkt.LotSize, + Qty: 6 * mkt.LotSize, + Filled: 4 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 2 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 0, + }, + dexBaseBalance: &botBalance{ + Available: 1000, + Locked: 6 * mkt.LotSize, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, - orderIDs[2]: 2, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - cexBaseBalance: 0, - dexBaseBalance: 1000, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), expectedCancels: []dex.Bytes{ orderIDs[2][:], orderIDs[1][:], @@ -1408,16 +1365,18 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 3 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{}, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, - orderIDs[2]: 2, + cexBaseBalance: &botBalance{ + Available: 6 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 0, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - cexBaseBalance: 6 * mkt.LotSize, - dexBaseBalance: 0, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), expectedWithdraw: &withdrawArgs{ assetID: baseID, amt: 3 * mkt.LotSize, @@ -1431,32 +1390,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 3 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedSells: map[uint64][]*core.Order{}, + groupedBuys: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 2 * mkt.LotSize, Rate: 5e7, Sell: false, LockedAmt: 2*mkt.LotSize + 1500, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 2 * mkt.LotSize, Rate: 6e7, Sell: false, LockedAmt: 2*mkt.LotSize + 1500, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 8*mkt.LotSize - 2, + }, + dexBaseBalance: &botBalance{ + Available: 0, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 6*mkt.LotSize), + Locked: 4*mkt.LotSize + 3000, }, - cexBaseBalance: 8*mkt.LotSize - 2, - dexBaseBalance: 0, - cexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 6*mkt.LotSize), expectedCancels: []dex.Bytes{ orderIDs[1][:], }, @@ -1471,31 +1436,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedSells: map[uint64][]*core.Order{}, + groupedBuys: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 2 * mkt.LotSize, Rate: 5e7, Sell: false, LockedAmt: calc.BaseToQuote(5e7, 2*mkt.LotSize), Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 2 * mkt.LotSize, Rate: 5e7, Sell: false, LockedAmt: calc.BaseToQuote(5e7, 2*mkt.LotSize), Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 4*mkt.LotSize), }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 8*mkt.LotSize), + Locked: 2 * calc.BaseToQuote(5e7, 2*mkt.LotSize), }, - cexBaseBalance: 10 * mkt.LotSize, - dexBaseBalance: 10 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 4*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 8*mkt.LotSize), expectedDeposit: &withdrawArgs{ assetID: 0, amt: calc.BaseToQuote(5e7, 4*mkt.LotSize), @@ -1509,32 +1481,38 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedSells: map[uint64][]*core.Order{}, + groupedBuys: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 4 * mkt.LotSize, Rate: 5e7, Sell: false, LockedAmt: calc.BaseToQuote(5e7, 4*mkt.LotSize), Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 4 * mkt.LotSize, Rate: 5e7, Sell: false, LockedAmt: calc.BaseToQuote(5e7, 4*mkt.LotSize), Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 4*mkt.LotSize), }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 2*mkt.LotSize), + Locked: 2 * calc.BaseToQuote(5e7, 4*mkt.LotSize), }, - cexBaseBalance: 10 * mkt.LotSize, - dexBaseBalance: 10 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 4*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 2*mkt.LotSize), expectedCancels: []dex.Bytes{ orderIDs[1][:], }, @@ -1549,71 +1527,83 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 3 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 3 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 3 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 3 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + }, + dexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + Locked: 6 * mkt.LotSize, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 4*mkt.LotSize) + calc.BaseToQuote(uint64(float64(5e7)*(1+profitRate)), 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 2*mkt.LotSize) + 12e6, }, - cexBaseBalance: 10 * mkt.LotSize, - dexBaseBalance: 10 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 4*mkt.LotSize) + calc.BaseToQuote(uint64(float64(5e7)*(1+profitRate)), 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 2*mkt.LotSize) + 12e6, expectedWithdraw: &withdrawArgs{ assetID: quoteID, amt: calc.BaseToQuote(5e7, 4*mkt.LotSize), }, expectedQuotePending: true, }, - // "need to withdraw quote, no need to cancel 1 order" + // "need to withdraw quote, need to cancel 1 order" { - name: "need to withdraw quote, no need to cancel 1 order", + name: "need to withdraw quote, need to cancel 1 order", cfg: &AutoRebalanceConfig{ MinBaseAmt: 6 * mkt.LotSize, MinQuoteAmt: calc.BaseToQuote(5e7, 6*mkt.LotSize), }, - orders: map[order.OrderID]*core.Order{ - orderIDs[0]: { + groupedBuys: map[uint64][]*core.Order{}, + groupedSells: map[uint64][]*core.Order{ + 0: {{ ID: orderIDs[0][:], Qty: 3 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 3 * mkt.LotSize, Epoch: currEpoch - 2, - }, - orderIDs[1]: { + }}, + 1: {{ ID: orderIDs[1][:], Qty: 3 * mkt.LotSize, Rate: 5e7, Sell: true, LockedAmt: 3 * mkt.LotSize, Epoch: currEpoch - 2, - }, + }}, + }, + cexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, }, - oidToPlacement: map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 1, + dexBaseBalance: &botBalance{ + Available: 10 * mkt.LotSize, + Locked: 6 * mkt.LotSize, + }, + cexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 4*mkt.LotSize) + calc.BaseToQuote(uint64(float64(5e7)*(1+profitRate)), 6*mkt.LotSize), + }, + dexQuoteBalance: &botBalance{ + Available: calc.BaseToQuote(5e7, 2*mkt.LotSize) + 12e6 - 2, }, - cexBaseBalance: 10 * mkt.LotSize, - dexBaseBalance: 10 * mkt.LotSize, - cexQuoteBalance: calc.BaseToQuote(5e7, 4*mkt.LotSize) + calc.BaseToQuote(uint64(float64(5e7)*(1+profitRate)), 6*mkt.LotSize), - dexQuoteBalance: calc.BaseToQuote(5e7, 2*mkt.LotSize) + 12e6 - 2, expectedCancels: []dex.Bytes{ orderIDs[1][:], }, @@ -1626,27 +1616,29 @@ func TestArbMarketMakerAutoRebalance(t *testing.T) { runTest := func(test *test) { cex := newTBotCEXAdaptor() cex.balances = map[uint32]*botBalance{ - baseID: {Available: test.cexBaseBalance}, - quoteID: {Available: test.cexQuoteBalance}, + baseID: test.cexBaseBalance, + quoteID: test.cexQuoteBalance, } + tCore := newTCore() - tCore.setAssetBalances(map[uint32]uint64{ + coreAdaptor := newTBotCoreAdaptor(tCore) + coreAdaptor.balances = map[uint32]*botBalance{ baseID: test.dexBaseBalance, quoteID: test.dexQuoteBalance, - }) + } + coreAdaptor.groupedBuys = test.groupedBuys + coreAdaptor.groupedSells = test.groupedSells ctx, cancel := context.WithCancel(context.Background()) defer cancel() mm := &arbMarketMaker{ - ctx: ctx, - cex: cex, - core: newTBotCoreAdaptor(tCore), - baseID: baseID, - quoteID: quoteID, - oidToPlacement: test.oidToPlacement, - ords: test.orders, - log: tLogger, + ctx: ctx, + cex: cex, + core: coreAdaptor, + baseID: baseID, + quoteID: quoteID, + log: tLogger, cfg: &ArbMarketMakerConfig{ AutoRebalance: test.cfg, Profit: profitRate, diff --git a/client/mm/mm_basic.go b/client/mm/mm_basic.go index 3912b2d812..c6e84f72f9 100644 --- a/client/mm/mm_basic.go +++ b/client/mm/mm_basic.go @@ -10,13 +10,11 @@ import ( "math" "sync" "sync/atomic" - "time" "decred.org/dcrdex/client/core" "decred.org/dcrdex/client/orderbook" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/calc" - "decred.org/dcrdex/dex/order" ) const ( @@ -192,87 +190,18 @@ func steppedRate(r, step uint64) uint64 { return uint64(math.Round(steps * float64(step))) } -// orderFees are the fees used for calculating the half-spread. -type orderFees struct { - swap uint64 - redemption uint64 - refund uint64 - funding uint64 -} - type basicMarketMaker struct { - ctx context.Context - host string - base uint32 - quote uint32 - cfg *BasicMarketMakingConfig - book dexOrderBook - log dex.Logger - core botCoreAdaptor - oracle oracle - mkt *core.Market - // the fiat rate is the rate determined by comparing the fiat rates - // of the two assets. - fiatRateV atomic.Uint64 + ctx context.Context + host string + base uint32 + quote uint32 + cfg *BasicMarketMakingConfig + book dexOrderBook + log dex.Logger + core botCoreAdaptor + oracle oracle + mkt *core.Market rebalanceRunning atomic.Bool - - ordMtx sync.RWMutex - ords map[order.OrderID]*core.Order - oidToPlacement map[order.OrderID]int - - feesMtx sync.RWMutex - buyFees *orderFees - sellFees *orderFees -} - -// groupedOrder is a subset of an *core.Order. -type groupedOrder struct { - id order.OrderID - rate uint64 - lots uint64 - epoch uint64 - lockedAmt uint64 -} - -func groupOrders(orders map[order.OrderID]*core.Order, oidToPlacement map[order.OrderID]int, lotSize uint64) (buys, sells map[int][]*groupedOrder) { - makeGroupedOrder := func(o *core.Order) *groupedOrder { - var oid order.OrderID - copy(oid[:], o.ID) - return &groupedOrder{ - id: oid, - rate: o.Rate, - lots: (o.Qty - o.Filled) / lotSize, - epoch: o.Epoch, - lockedAmt: o.LockedAmt, - } - } - - buys, sells = make(map[int][]*groupedOrder), make(map[int][]*groupedOrder) - for _, ord := range orders { - var oid order.OrderID - copy(oid[:], ord.ID) - placementIndex := oidToPlacement[oid] - if ord.Sell { - if _, found := sells[placementIndex]; !found { - sells[placementIndex] = make([]*groupedOrder, 0, 1) - } - sells[placementIndex] = append(sells[placementIndex], makeGroupedOrder(ord)) - } else { - if _, found := buys[placementIndex]; !found { - buys[placementIndex] = make([]*groupedOrder, 0, 1) - } - buys[placementIndex] = append(buys[placementIndex], makeGroupedOrder(ord)) - } - } - - return buys, sells -} - -// groupedOrders returns the buy and sell orders grouped by placement index. -func (m *basicMarketMaker) groupedOrders() (buys, sells map[int][]*groupedOrder) { - m.ordMtx.RLock() - defer m.ordMtx.RUnlock() - return groupOrders(m.ords, m.oidToPlacement, m.mkt.LotSize) } // basisPrice calculates the basis price for the market maker. @@ -348,8 +277,18 @@ func basisPrice(book dexOrderBook, oracle oracle, cfg *BasicMarketMakingConfig, return 0 } +func (m *basicMarketMaker) mktRateUsingFiatSources() uint64 { + baseRate := m.core.FiatRate(m.base) + quoteRate := m.core.FiatRate(m.quote) + if baseRate == 0 || quoteRate == 0 { + return 0 + } + return m.mkt.ConventionalRateToMsg(baseRate / quoteRate) + +} + func (m *basicMarketMaker) basisPrice() uint64 { - return basisPrice(m.book, m.oracle, m.cfg, m.mkt, m.fiatRateV.Load(), m.log) + return basisPrice(m.book, m.oracle, m.cfg, m.mkt, m.mktRateUsingFiatSources(), m.log) } func (m *basicMarketMaker) halfSpread(basisPrice uint64) (uint64, error) { @@ -389,12 +328,17 @@ func (m *basicMarketMaker) halfSpread(basisPrice uint64) (uint64, error) { return halfGap, nil } +func (m *basicMarketMaker) groupedOrders() (buys, sells map[uint64][]*core.Order) { + return m.core.GroupedBookedOrders() +} + func (m *basicMarketMaker) placeMultiTrade(placements []*rateLots, sell bool) { - qtyRates := make([]*core.QtyRate, 0, len(placements)) + multiTradePlacements := make([]*multiTradePlacement, 0, len(placements)) for _, p := range placements { - qtyRates = append(qtyRates, &core.QtyRate{ - Qty: p.lots * m.mkt.LotSize, - Rate: p.rate, + multiTradePlacements = append(multiTradePlacements, &multiTradePlacement{ + qty: p.lots * m.mkt.LotSize, + rate: p.rate, + grouping: uint64(p.placementIndex), }) } @@ -405,65 +349,18 @@ func (m *basicMarketMaker) placeMultiTrade(placements []*rateLots, sell bool) { options = m.cfg.QuoteOptions } - orders, err := m.core.MultiTrade(nil, &core.MultiTradeForm{ - Host: m.host, - Sell: sell, - Base: m.base, - Quote: m.quote, - Placements: qtyRates, - Options: options, + _, err := m.core.MultiTrade(&multiTradeForm{ + host: m.host, + sell: sell, + base: m.base, + quote: m.quote, + placements: multiTradePlacements, + options: options, }) if err != nil { m.log.Errorf("Error placing rebalancing order: %v", err) return } - - m.ordMtx.Lock() - for i, ord := range orders { - var oid order.OrderID - copy(oid[:], ord.ID) - m.ords[oid] = ord - m.oidToPlacement[oid] = placements[i].placementIndex - } - m.ordMtx.Unlock() -} - -func (m *basicMarketMaker) processFiatRates(rates map[uint32]float64) { - var fiatRate uint64 - - baseRate := rates[m.base] - quoteRate := rates[m.quote] - if baseRate > 0 && quoteRate > 0 { - fiatRate = m.mkt.ConventionalRateToMsg(baseRate / quoteRate) - } - - m.fiatRateV.Store(fiatRate) -} - -func (m *basicMarketMaker) processTrade(o *core.Order) { - if len(o.ID) == 0 { - return - } - - var oid order.OrderID - copy(oid[:], o.ID) - - m.ordMtx.Lock() - defer m.ordMtx.Unlock() - - _, found := m.ords[oid] - if !found { - return - } - - if o.Status > order.OrderStatusBooked { - // We stop caring when the order is taken off the book. - delete(m.ords, oid) - delete(m.oidToPlacement, oid) - } else { - // Update our reference. - m.ords[oid] = o - } } func orderPrice(basisPrice, breakEven uint64, strategy GapStrategy, factor float64, sell bool, mkt *core.Market) uint64 { @@ -501,7 +398,7 @@ func orderPrice(basisPrice, breakEven uint64, strategy GapStrategy, factor float type rebalancer interface { basisPrice() uint64 halfSpread(uint64) (uint64, error) - groupedOrders() (buys, sells map[int][]*groupedOrder) + groupedOrders() (buys, sells map[uint64][]*core.Order) } type rateLots struct { @@ -531,7 +428,7 @@ func basicMMRebalance(newEpoch uint64, m rebalancer, c botCoreAdaptor, cfg *Basi } existingBuys, existingSells := m.groupedOrders() - getExistingOrders := func(index int, sell bool) []*groupedOrder { + getExistingOrders := func(index uint64, sell bool) []*core.Order { if sell { return existingSells[index] } @@ -543,15 +440,15 @@ func basicMMRebalance(newEpoch uint64, m rebalancer, c botCoreAdaptor, cfg *Basi var highestExistingBuy, lowestExistingSell uint64 = 0, math.MaxUint64 for _, placementOrders := range existingBuys { for _, o := range placementOrders { - if o.rate > highestExistingBuy { - highestExistingBuy = o.rate + if o.Rate > highestExistingBuy { + highestExistingBuy = o.Rate } } } for _, placementOrders := range existingSells { for _, o := range placementOrders { - if o.rate < lowestExistingSell { - lowestExistingSell = o.rate + if o.Rate < lowestExistingSell { + lowestExistingSell = o.Rate } } } @@ -581,12 +478,12 @@ func basicMMRebalance(newEpoch uint64, m rebalancer, c botCoreAdaptor, cfg *Basi } cancels = make([]dex.Bytes, 0, len(cfg.SellPlacements)+len(cfg.BuyPlacements)) - addCancel := func(o *groupedOrder) { - if newEpoch-o.epoch < 2 { + addCancel := func(o *core.Order) { + if newEpoch-o.Epoch < 2 { // TODO: check epoch log.Debugf("rebalance: skipping cancel not past free cancel threshold") return } - cancels = append(cancels, o.id[:]) + cancels = append(cancels, o.ID) } processSide := func(sell bool) []*rateLots { @@ -636,11 +533,12 @@ func basicMMRebalance(newEpoch uint64, m rebalancer, c botCoreAdaptor, cfg *Basi continue } - existingOrders := getExistingOrders(i, sell) + existingOrders := getExistingOrders(uint64(i), sell) var numLotsOnBooks uint64 for _, o := range existingOrders { - numLotsOnBooks += o.lots - if !withinTolerance(o.rate, placementRate) { + lots := (o.Qty - o.Filled) / mkt.LotSize + numLotsOnBooks += lots + if !withinTolerance(o.Rate, placementRate) { addCancel(o) } } @@ -694,9 +592,11 @@ func (m *basicMarketMaker) rebalance(newEpoch uint64) { defer m.rebalanceRunning.Store(false) m.log.Tracef("rebalance: epoch %d", newEpoch) - m.feesMtx.RLock() - buyFees, sellFees := m.buyFees, m.sellFees - m.feesMtx.RUnlock() + buyFees, sellFees, err := m.core.OrderFees() + if err != nil { + m.log.Errorf("Error getting order fees: %v", err) + return + } cancels, buyOrders, sellOrders := basicMMRebalance(newEpoch, m, m.core, m.cfg, m.mkt, buyFees, sellFees, m.log) @@ -715,83 +615,6 @@ func (m *basicMarketMaker) rebalance(newEpoch uint64) { } } -func (m *basicMarketMaker) handleNotification(note core.Notification) { - switch n := note.(type) { - case *core.OrderNote: - ord := n.Order - if ord == nil { - return - } - m.processTrade(ord) - case *core.FiatRatesNote: - go m.processFiatRates(n.FiatRates) - } -} - -func (m *basicMarketMaker) cancelAllOrders() { - m.ordMtx.Lock() - defer m.ordMtx.Unlock() - for oid := range m.ords { - if err := m.core.Cancel(oid[:]); err != nil { - m.log.Errorf("error cancelling order: %v", err) - } - } - m.ords = make(map[order.OrderID]*core.Order) - m.oidToPlacement = make(map[order.OrderID]int) -} - -func (m *basicMarketMaker) updateFeeRates() error { - buySwapFees, buyRedeemFees, buyRefundFees, err := m.core.SingleLotFees(&core.SingleLotFeesForm{ - Host: m.host, - Base: m.base, - Quote: m.quote, - UseMaxFeeRate: true, - UseSafeTxSize: true, - }) - if err != nil { - return fmt.Errorf("failed to get fees: %v", err) - } - - sellSwapFees, sellRedeemFees, sellRefundFees, err := m.core.SingleLotFees(&core.SingleLotFeesForm{ - Host: m.host, - Base: m.base, - Quote: m.quote, - UseMaxFeeRate: true, - UseSafeTxSize: true, - Sell: true, - }) - if err != nil { - return fmt.Errorf("failed to get fees: %v", err) - } - - buyFundingFees, err := m.core.MaxFundingFees(m.quote, m.host, uint32(len(m.cfg.BuyPlacements)), m.cfg.QuoteOptions) - if err != nil { - return fmt.Errorf("failed to get funding fees: %v", err) - } - - sellFundingFees, err := m.core.MaxFundingFees(m.base, m.host, uint32(len(m.cfg.SellPlacements)), m.cfg.BaseOptions) - if err != nil { - return fmt.Errorf("failed to get funding fees: %v", err) - } - - m.feesMtx.Lock() - defer m.feesMtx.Unlock() - m.buyFees = &orderFees{ - swap: buySwapFees, - redemption: buyRedeemFees, - refund: buyRefundFees, - funding: buyFundingFees, - } - m.sellFees = &orderFees{ - swap: sellSwapFees, - redemption: sellRedeemFees, - refund: sellRefundFees, - funding: sellFundingFees, - } - - return nil -} - func (m *basicMarketMaker) run() { book, bookFeed, err := m.core.SyncBook(m.host, m.base, m.quote) if err != nil { @@ -819,49 +642,12 @@ func (m *basicMarketMaker) run() { } }() - // Process core notifications - noteFeed := m.core.NotificationFeed() - wg.Add(1) - go func() { - defer wg.Done() - defer noteFeed.ReturnFeed() - for { - select { - case n := <-noteFeed.C: - m.handleNotification(n) - case <-m.ctx.Done(): - return - } - } - }() - - // Periodically update asset fee rates - wg.Add(1) - go func() { - defer wg.Done() - refreshTime := time.Minute * 10 - for { - select { - case <-time.NewTimer(refreshTime).C: - err := m.updateFeeRates() - if err != nil { - m.log.Error(err) - refreshTime = time.Minute - } else { - refreshTime = time.Minute * 10 - } - case <-m.ctx.Done(): - return - } - } - }() - wg.Wait() - m.cancelAllOrders() + m.core.CancelAllOrders() } // RunBasicMarketMaker starts a basic market maker bot. -func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c botCoreAdaptor, oracle oracle, baseFiatRate, quoteFiatRate float64, log dex.Logger) { +func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c botCoreAdaptor, oracle oracle, log dex.Logger) { if cfg.BasicMMConfig == nil { // implies bug in caller log.Errorf("No market making config provided. Exiting.") @@ -881,27 +667,15 @@ func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c botCoreAdaptor, } mm := &basicMarketMaker{ - ctx: ctx, - core: c, - log: log, - cfg: cfg.BasicMMConfig, - host: cfg.Host, - base: cfg.BaseAsset, - quote: cfg.QuoteAsset, - oracle: oracle, - mkt: mkt, - ords: make(map[order.OrderID]*core.Order), - oidToPlacement: make(map[order.OrderID]int), - } - - err = mm.updateFeeRates() - if err != nil { - log.Errorf("Not starting market maker: %v", err) - return - } - - if baseFiatRate > 0 && quoteFiatRate > 0 { - mm.fiatRateV.Store(mkt.ConventionalRateToMsg(baseFiatRate / quoteFiatRate)) + ctx: ctx, + core: c, + log: log, + cfg: cfg.BasicMMConfig, + host: cfg.Host, + base: cfg.BaseAsset, + quote: cfg.QuoteAsset, + oracle: oracle, + mkt: mkt, } mm.run() diff --git a/client/mm/mm_basic_test.go b/client/mm/mm_basic_test.go index e605915bee..e7e2945744 100644 --- a/client/mm/mm_basic_test.go +++ b/client/mm/mm_basic_test.go @@ -4,8 +4,6 @@ package mm import ( "errors" - "reflect" - "sort" "testing" "decred.org/dcrdex/client/core" @@ -24,8 +22,8 @@ type tRebalancer struct { basis uint64 breakEven uint64 breakEvenErr error - sortedBuys map[int][]*groupedOrder - sortedSells map[int][]*groupedOrder + sortedBuys map[uint64][]*core.Order + sortedSells map[uint64][]*core.Order } var _ rebalancer = (*tRebalancer)(nil) @@ -33,12 +31,11 @@ var _ rebalancer = (*tRebalancer)(nil) func (r *tRebalancer) basisPrice() uint64 { return r.basis } - func (r *tRebalancer) halfSpread(basisPrice uint64) (uint64, error) { return r.breakEven, r.breakEvenErr } -func (r *tRebalancer) groupedOrders() (buys, sells map[int][]*groupedOrder) { +func (r *tRebalancer) groupedOrders() (buys, sells map[uint64][]*core.Order) { return r.sortedBuys, r.sortedSells } @@ -75,7 +72,7 @@ func TestRebalance(t *testing.T) { QuoteID: btcBipID, } - newBalancer := func(existingBuys, existingSells map[int][]*groupedOrder) *tRebalancer { + newBalancer := func(existingBuys, existingSells map[uint64][]*core.Order) *tRebalancer { return &tRebalancer{ basis: midGap, breakEven: breakEven, @@ -93,23 +90,23 @@ func TestRebalance(t *testing.T) { rebalancer *tRebalancer isAccountLocker map[uint32]bool - balances map[uint32]uint64 + balances map[uint32]*botBalance expectedBuys []rateLots expectedSells []rateLots expectedCancels []order.OrderID } - newgroupedOrder := func(id order.OrderID, lots, rate uint64, sell bool, freeCancel bool) *groupedOrder { + newOrder := func(id order.OrderID, lots, rate uint64, sell bool, freeCancel bool) *core.Order { var epoch uint64 = newEpoch if freeCancel { epoch = newEpoch - 2 } - return &groupedOrder{ - id: id, - epoch: epoch, - rate: rate, - lots: lots, + return &core.Order{ + ID: id[:], + Epoch: epoch, + Rate: rate, + Qty: lots * lotSize, } } @@ -153,9 +150,13 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedBuys: []rateLots{ { @@ -184,11 +185,14 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, - expectedBuys: []rateLots{ { rate: midGap - (breakEven * 3), @@ -211,11 +215,14 @@ func TestRebalance(t *testing.T) { BuyPlacements: []*OrderPlacement{}, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, - expectedBuys: []rateLots{}, expectedSells: []rateLots{ { @@ -251,11 +258,14 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, - expectedBuys: []rateLots{ { rate: midGap - (breakEven * 2), @@ -308,27 +318,31 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: requiredForOrder(true, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 1, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 2*sellFees.swap + sellFees.funding, - btcBipID: requiredForOrder(false, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 1, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 2*buyFees.swap + buyFees.funding, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: requiredForOrder(true, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 1, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 2*sellFees.swap + sellFees.funding, + }, + btcBipID: { + Available: requiredForOrder(false, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 1, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 2*buyFees.swap + buyFees.funding, + }, }, expectedBuys: []rateLots{ { @@ -382,27 +396,31 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: requiredForOrder(true, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 1, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 2*sellFees.swap + sellFees.funding - 1, - btcBipID: requiredForOrder(false, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 1, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 2*buyFees.swap + buyFees.funding - 1, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: requiredForOrder(true, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 1, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 2*sellFees.swap + sellFees.funding - 1, + }, + btcBipID: { + Available: requiredForOrder(false, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 1, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 2*buyFees.swap + buyFees.funding - 1, + }, }, expectedBuys: []rateLots{ { @@ -444,29 +462,32 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: requiredForOrder(true, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 2, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 3*sellFees.swap + sellFees.funding, - btcBipID: requiredForOrder(false, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 2, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 3*buyFees.swap + buyFees.funding, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: requiredForOrder(true, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 2, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 3*sellFees.swap + sellFees.funding, + }, + btcBipID: { + Available: requiredForOrder(false, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 2, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 3*buyFees.swap + buyFees.funding, + }, }, - expectedBuys: []rateLots{ { rate: midGap - (breakEven * 2), @@ -519,27 +540,31 @@ func TestRebalance(t *testing.T) { }, }, rebalancer: newBalancer(nil, nil), - balances: map[uint32]uint64{ - dcrBipID: requiredForOrder(true, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 2, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 3*sellFees.swap + sellFees.funding - 1, - btcBipID: requiredForOrder(false, []*OrderPlacement{ - { - Lots: 1, - GapFactor: 2, - }, - { - Lots: 2, - GapFactor: 3, - }, - }, GapStrategyMultiplier) + 3*buyFees.swap + buyFees.funding - 1, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: requiredForOrder(true, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 2, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 3*sellFees.swap + sellFees.funding - 1, + }, + btcBipID: { + Available: requiredForOrder(false, []*OrderPlacement{ + { + Lots: 1, + GapFactor: 2, + }, + { + Lots: 2, + GapFactor: 3, + }, + }, GapStrategyMultiplier) + 3*buyFees.swap + buyFees.funding - 1, + }, }, expectedBuys: []rateLots{ { @@ -592,19 +617,23 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), false), false, true), + newOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), false), false, true), }, }, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 1: { - newgroupedOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), false), true, true), + newOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), false), true, true), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedCancels: []order.OrderID{ orderIDs[0], @@ -653,19 +682,23 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), true), false, true), + newOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), true), false, true), }, }, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 1: { - newgroupedOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), true), true, true), + newOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), true), true, true), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedBuys: []rateLots{ { @@ -709,19 +742,23 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), true), false, true), + newOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), true), false, true), }, }, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 1: { - newgroupedOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), true), true, true), + newOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), true), true, true), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedBuys: []rateLots{ { @@ -774,19 +811,23 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), false), false, true), + newOrder(orderIDs[0], 1, driftToleranceEdge(midGap-(breakEven*2), false), false, true), }, }, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 1: { - newgroupedOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), false), true, true), + newOrder(orderIDs[1], 1, driftToleranceEdge(midGap+(breakEven*3), false), true, true), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedBuys: []rateLots{ { @@ -844,14 +885,18 @@ func TestRebalance(t *testing.T) { }, rebalancer: newBalancer( nil, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[1], 1, midGap-(breakEven*2)-1, true, true), + newOrder(orderIDs[1], 1, midGap-(breakEven*2)-1, true, true), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedCancels: []order.OrderID{ orderIDs[1], @@ -899,15 +944,19 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[1], 1, midGap+(breakEven*2), true, true), + newOrder(orderIDs[1], 1, midGap+(breakEven*2), true, true), }, }, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedCancels: []order.OrderID{ orderIDs[1], @@ -956,14 +1005,18 @@ func TestRebalance(t *testing.T) { }, rebalancer: newBalancer( nil, - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[1], 1, midGap-(breakEven*2)-1, true, false), + newOrder(orderIDs[1], 1, midGap-(breakEven*2)-1, true, false), }, }), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedCancels: []order.OrderID{}, expectedBuys: []rateLots{ @@ -1009,15 +1062,19 @@ func TestRebalance(t *testing.T) { DriftTolerance: driftTolerance, }, rebalancer: newBalancer( - map[int][]*groupedOrder{ + map[uint64][]*core.Order{ 0: { - newgroupedOrder(orderIDs[1], 1, midGap+(breakEven*2), true, false), + newOrder(orderIDs[1], 1, midGap+(breakEven*2), true, false), }, }, nil), - balances: map[uint32]uint64{ - dcrBipID: 1e13, - btcBipID: 1e13, + balances: map[uint32]*botBalance{ + dcrBipID: { + Available: 1e13, + }, + btcBipID: { + Available: 1e13, + }, }, expectedCancels: []order.OrderID{}, expectedBuys: []rateLots{ @@ -1043,15 +1100,15 @@ func TestRebalance(t *testing.T) { } else { tCore.isAccountLocker = map[uint32]bool{} } - - tCore.setAssetBalances(tt.balances) + coreAdaptor := newTBotCoreAdaptor(tCore) + coreAdaptor.balances = tt.balances epoch := tt.epoch if epoch == 0 { epoch = newEpoch } - cancels, buys, sells := basicMMRebalance(epoch, tt.rebalancer, newTBotCoreAdaptor(tCore), tt.cfg, mkt, buyFees, sellFees, log) + cancels, buys, sells := basicMMRebalance(epoch, tt.rebalancer, coreAdaptor, tt.cfg, mkt, buyFees, sellFees, log) if len(cancels) != len(tt.expectedCancels) { t.Fatalf("%s: cancel count mismatch. expected %d, got %d", tt.name, len(tt.expectedCancels), len(cancels)) @@ -1260,115 +1317,3 @@ func TestBreakEvenHalfSpread(t *testing.T) { } } } - -func TestGroupedOrders(t *testing.T) { - const rateStep uint64 = 1e3 - const lotSize uint64 = 50e8 - mkt := &core.Market{ - RateStep: rateStep, - AtomToConv: 1, - LotSize: lotSize, - BaseID: dcrBipID, - QuoteID: btcBipID, - } - - orderIDs := make([]order.OrderID, 5) - for i := range orderIDs { - copy(orderIDs[i][:], encode.RandomBytes(32)) - } - - orders := map[order.OrderID]*core.Order{ - orderIDs[0]: { - ID: orderIDs[0][:], - Sell: true, - Rate: 100e8, - Qty: 2 * lotSize, - Filled: lotSize, - }, - orderIDs[1]: { - ID: orderIDs[1][:], - Sell: false, - Rate: 200e8, - Qty: 2 * lotSize, - Filled: lotSize, - }, - orderIDs[2]: { - ID: orderIDs[2][:], - Sell: true, - Rate: 300e8, - Qty: 2 * lotSize, - }, - orderIDs[3]: { - ID: orderIDs[3][:], - Sell: false, - Rate: 400e8, - Qty: 1 * lotSize, - }, - orderIDs[4]: { - ID: orderIDs[4][:], - Sell: false, - Rate: 402e8, - Qty: 1 * lotSize, - }, - } - - ordToPlacementIndex := map[order.OrderID]int{ - orderIDs[0]: 0, - orderIDs[1]: 0, - orderIDs[2]: 1, - orderIDs[3]: 1, - orderIDs[4]: 1, - } - - mm := &basicMarketMaker{ - mkt: mkt, - ords: orders, - oidToPlacement: ordToPlacementIndex, - } - - expectedBuys := map[int][]*groupedOrder{ - 0: {{ - id: orderIDs[1], - rate: 200e8, - lots: 1, - }}, - 1: {{ - id: orderIDs[3], - rate: 400e8, - lots: 1, - }, { - id: orderIDs[4], - rate: 402e8, - lots: 1, - }}, - } - - expectedSells := map[int][]*groupedOrder{ - 0: {{ - id: orderIDs[0], - rate: 100e8, - lots: 1, - }}, - 1: {{ - id: orderIDs[2], - rate: 300e8, - lots: 2, - }}, - } - - buys, sells := mm.groupedOrders() - - for i, buy := range buys { - sort.Slice(buy, func(i, j int) bool { - return buy[i].rate < buy[j].rate - }) - reflect.DeepEqual(buy, expectedBuys[i]) - } - - for i, sell := range sells { - sort.Slice(sell, func(i, j int) bool { - return sell[i].rate < sell[j].rate - }) - reflect.DeepEqual(sell, expectedSells[i]) - } -} diff --git a/client/mm/mm_simple_arb.go b/client/mm/mm_simple_arb.go index 13f0cf5b28..6a0c2570ab 100644 --- a/client/mm/mm_simple_arb.go +++ b/client/mm/mm_simple_arb.go @@ -383,18 +383,18 @@ func (a *simpleArbMarketMaker) executeArb(sellOnDex bool, lotsToArb, dexRate, ce options = a.cfg.QuoteOptions } - dexOrders, err := a.core.MultiTrade(nil, &core.MultiTradeForm{ - Host: a.host, - Sell: sellOnDex, - Base: a.baseID, - Quote: a.quoteID, - Placements: []*core.QtyRate{ + dexOrders, err := a.core.MultiTrade(&multiTradeForm{ + host: a.host, + sell: sellOnDex, + base: a.baseID, + quote: a.quoteID, + placements: []*multiTradePlacement{ { - Qty: lotsToArb * a.mkt.LotSize, - Rate: dexRate, + qty: lotsToArb * a.mkt.LotSize, + rate: dexRate, }, }, - Options: options, + options: options, }) if err != nil || len(dexOrders) != 1 { if err != nil { @@ -527,17 +527,6 @@ func (a *simpleArbMarketMaker) handleDEXOrderUpdate(o *core.Order) { } } -func (m *simpleArbMarketMaker) handleNotification(note core.Notification) { - switch n := note.(type) { - case *core.OrderNote: - ord := n.Order - if ord == nil { - return - } - m.handleDEXOrderUpdate(ord) - } -} - func (a *simpleArbMarketMaker) run() { book, bookFeed, err := a.core.SyncBook(a.host, a.baseID, a.quoteID) if err != nil { @@ -586,16 +575,14 @@ func (a *simpleArbMarketMaker) run() { } }() - noteFeed := a.core.NotificationFeed() - wg.Add(1) go func() { defer wg.Done() - defer noteFeed.ReturnFeed() + orderUpdates := a.core.SubscribeOrderUpdates() for { select { - case n := <-noteFeed.C: - a.handleNotification(n) + case n := <-orderUpdates: + a.handleDEXOrderUpdate(n) case <-a.ctx.Done(): return } diff --git a/client/mm/mm_simple_arb_test.go b/client/mm/mm_simple_arb_test.go index b1a989078c..9d798ba6b9 100644 --- a/client/mm/mm_simple_arb_test.go +++ b/client/mm/mm_simple_arb_test.go @@ -154,7 +154,7 @@ func TestArbRebalance(t *testing.T) { dexMaxBuyErr error // The strategy uses maxSell/maxBuy to determine how much it can trade. // dexBalances is just used for auto rebalancing. - dexBalances map[uint32]uint64 + dexBalances map[uint32]*botBalance cexBalances map[uint32]*botBalance dexVWAPErr error cexVWAPErr error @@ -750,9 +750,9 @@ func TestArbRebalance(t *testing.T) { Lots: 5, }, }, - dexBalances: map[uint32]uint64{ - 42: 1e14, - 0: 1e17, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e14}, + 0: {Available: 1e17}, }, cexBalances: map[uint32]*botBalance{ 42: {Available: 1e19}, @@ -785,9 +785,9 @@ func TestArbRebalance(t *testing.T) { Lots: 5, }, }, - dexBalances: map[uint32]uint64{ - 42: 9.5e15, - 0: 1.1e12, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 9.5e15}, + 0: {Available: 1.1e12}, }, cexBalances: map[uint32]*botBalance{ 42: {Available: 1.1e16}, @@ -822,9 +822,9 @@ func TestArbRebalance(t *testing.T) { Lots: 5, }, }, - dexBalances: map[uint32]uint64{ - 42: 9.5e15, - 0: 1.1e12, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 9.5e15}, + 0: {Available: 1.1e12}, }, cexBalances: map[uint32]*botBalance{ 42: {Available: 1.1e16}, @@ -851,9 +851,9 @@ func TestArbRebalance(t *testing.T) { Lots: 5, }, }, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e10, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e10}, }, cexBalances: map[uint32]*botBalance{ 42: {Available: 1e14}, @@ -886,9 +886,9 @@ func TestArbRebalance(t *testing.T) { Lots: 5, }, }, - dexBalances: map[uint32]uint64{ - 42: 1e19, - 0: 1e10, + dexBalances: map[uint32]*botBalance{ + 42: {Available: 1e19}, + 0: {Available: 1e10}, }, cexBalances: map[uint32]*botBalance{ 42: {Available: 1e14}, @@ -914,7 +914,10 @@ func TestArbRebalance(t *testing.T) { tCore.maxSellEstimate = test.dexMaxSell tCore.maxSellErr = test.dexMaxSellErr tCore.maxBuyErr = test.dexMaxBuyErr - tCore.setAssetBalances(test.dexBalances) + + coreAdaptor := newTBotCoreAdaptor(tCore) + coreAdaptor.balances = test.dexBalances + if test.expectedDexOrder != nil { tCore.multiTradeResult = []*core.Order{ { @@ -951,7 +954,7 @@ func TestArbRebalance(t *testing.T) { mkt: mkt, baseID: 42, quoteID: 0, - core: newTBotCoreAdaptor(tCore), + core: coreAdaptor, activeArbs: test.existingArbs, cfg: &SimpleArbConfig{ ProfitTrigger: profitTrigger, @@ -1196,7 +1199,7 @@ func TestArbDexTradeUpdates(t *testing.T) { runTest := func(test *test) { cex := newTBotCEXAdaptor() - tCore := newTCore() + coreAdaptor := newTBotCoreAdaptor(newTCore()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1207,7 +1210,7 @@ func TestArbDexTradeUpdates(t *testing.T) { cex: cex, baseID: 42, quoteID: 0, - core: newTBotCoreAdaptor(tCore), + core: coreAdaptor, activeArbs: test.activeArbs, cfg: &SimpleArbConfig{ ProfitTrigger: 0.01, @@ -1218,14 +1221,11 @@ func TestArbDexTradeUpdates(t *testing.T) { go arbEngine.run() - tCore.noteFeed <- &core.OrderNote{ - Order: &core.Order{ - Status: test.updatedOrderStatus, - ID: test.updatedOrderID, - }, + coreAdaptor.orderUpdates <- &core.Order{ + Status: test.updatedOrderStatus, + ID: test.updatedOrderID, } - dummyNote := &core.BondRefundNote{} - tCore.noteFeed <- dummyNote + coreAdaptor.orderUpdates <- &core.Order{} if len(test.expectedActiveArbs) != len(arbEngine.activeArbs) { t.Fatalf("%s: expected %d active arbs but got %d", test.name, len(test.expectedActiveArbs), len(arbEngine.activeArbs)) diff --git a/client/mm/mm_test.go b/client/mm/mm_test.go index a4c09bd1c8..54bb9402ef 100644 --- a/client/mm/mm_test.go +++ b/client/mm/mm_test.go @@ -375,26 +375,48 @@ func (c *tCore) setAssetBalances(balances map[uint32]uint64) { type tBotCoreAdaptor struct { clientCore tCore *tCore + + balances map[uint32]*botBalance + groupedBuys map[uint64][]*core.Order + groupedSells map[uint64][]*core.Order + orderUpdates chan *core.Order } func (c *tBotCoreAdaptor) AssetBalance(assetID uint32) (*botBalance, error) { if c.tCore.assetBalanceErr != nil { return nil, c.tCore.assetBalanceErr } - return &botBalance{ - Available: c.tCore.assetBalances[assetID].Available, - }, nil + return c.balances[assetID], nil } -func (c *tBotCoreAdaptor) MultiTrade(pw []byte, form *core.MultiTradeForm) ([]*core.Order, error) { - c.tCore.multiTradesPlaced = append(c.tCore.multiTradesPlaced, form) +func (c *tBotCoreAdaptor) MultiTrade(form *multiTradeForm) ([]*core.Order, error) { + c.tCore.multiTradesPlaced = append(c.tCore.multiTradesPlaced, form.toCore()) return c.tCore.multiTradeResult, nil } +func (c *tBotCoreAdaptor) GroupedBookedOrders() (buys, sells map[uint64][]*core.Order) { + return c.groupedBuys, c.groupedSells +} + +func (c *tBotCoreAdaptor) CancelAllOrders() bool { return false } + +func (c *tBotCoreAdaptor) FiatRate(assetID uint32) float64 { + return 0 +} + +func (c *tBotCoreAdaptor) OrderFees() (buyFees, sellFees *orderFees, err error) { + return nil, nil, nil +} + +func (c *tBotCoreAdaptor) SubscribeOrderUpdates() (updates <-chan *core.Order) { + return c.orderUpdates +} + func newTBotCoreAdaptor(c *tCore) *tBotCoreAdaptor { return &tBotCoreAdaptor{ - clientCore: c, - tCore: c, + clientCore: c, + tCore: c, + orderUpdates: make(chan *core.Order), } } @@ -1105,7 +1127,6 @@ func TestInitialBaseBalances(t *testing.T) { QuoteBalance: 50, BaseFeeAssetBalanceType: Amount, BaseFeeAssetBalance: 500, - CEXCfg: &BotCEXCfg{ Name: "Binance", BaseBalanceType: Percentage,