diff --git a/client/core/notification.go b/client/core/notification.go index dc04502769..b8e5b6db9f 100644 --- a/client/core/notification.go +++ b/client/core/notification.go @@ -67,6 +67,10 @@ func (c *Core) logNote(n Notification) { logFun("notify: %v", n) } +func (c *Core) Broadcast(n Notification) { + c.notify(n) +} + // notify sends a notification to all subscribers. If the notification is of // sufficient severity, it is stored in the database. func (c *Core) notify(n Notification) { diff --git a/client/mm/mm.go b/client/mm/mm.go index 96aaf6b3b4..652608941d 100644 --- a/client/mm/mm.go +++ b/client/mm/mm.go @@ -8,8 +8,6 @@ import ( "encoding/hex" "errors" "fmt" - "strconv" - "strings" "sync" "sync/atomic" @@ -39,6 +37,7 @@ type clientCore interface { User() *core.User Login(pw []byte) error OpenWallet(assetID uint32, appPW []byte) error + Broadcast(core.Notification) } var _ clientCore = (*core.Core)(nil) @@ -166,7 +165,7 @@ type MarketMaker struct { unsyncedOracle *priceOracle runningBotsMtx sync.RWMutex - runningBots map[string]interface{} + runningBots map[MarketWithHost]interface{} noteMtx sync.RWMutex noteChans map[uint64]chan core.Notification @@ -182,7 +181,7 @@ func NewMarketMaker(c clientCore, log dex.Logger) (*MarketMaker, error) { log: log, running: atomic.Bool{}, orders: make(map[order.OrderID]*orderInfo), - runningBots: make(map[string]interface{}), + runningBots: make(map[MarketWithHost]interface{}), noteChans: make(map[uint64]chan core.Notification), unsyncedOracle: newUnsyncedPriceOracle(log), }, nil @@ -195,49 +194,23 @@ func (m *MarketMaker) Running() bool { // MarketWithHost represents a market on a specific dex server. type MarketWithHost struct { - Host string `json:"host"` - Base uint32 `json:"base"` - Quote uint32 `json:"quote"` + Host string `json:"host"` + BaseID uint32 `json:"base"` + QuoteID uint32 `json:"quote"` } func (m *MarketWithHost) String() string { - return fmt.Sprintf("%s-%d-%d", m.Host, m.Base, m.Quote) -} - -func parseMarketWithHost(mkt string) (*MarketWithHost, error) { - parts := strings.Split(mkt, "-") - if len(parts) != 3 { - return nil, fmt.Errorf("invalid market %s", mkt) - } - host := parts[0] - base64, err := strconv.ParseUint(parts[1], 10, 32) - if err != nil { - return nil, fmt.Errorf("invalid market %s", mkt) - } - quote64, err := strconv.ParseUint(parts[2], 10, 32) - if err != nil { - return nil, fmt.Errorf("invalid market %s", mkt) - } - return &MarketWithHost{ - Host: host, - Base: uint32(base64), - Quote: uint32(quote64), - }, nil + return fmt.Sprintf("%s-%d-%d", m.Host, m.BaseID, m.QuoteID) } // RunningBots returns the markets on which a bot is running. -func (m *MarketMaker) RunningBots() []*MarketWithHost { +func (m *MarketMaker) RunningBots() []MarketWithHost { m.runningBotsMtx.RLock() defer m.runningBotsMtx.RUnlock() - mkts := make([]*MarketWithHost, 0, len(m.runningBots)) + mkts := make([]MarketWithHost, 0, len(m.runningBots)) for mkt := range m.runningBots { - mktWithHost, err := parseMarketWithHost(mkt) - if err != nil { - m.log.Errorf("failed to parse market %s: %v", mkt, err) - continue - } - mkts = append(mkts, mktWithHost) + mkts = append(mkts, mkt) } return mkts @@ -285,13 +258,13 @@ func priceOracleFromConfigs(ctx context.Context, cfgs []*BotConfig, log dex.Logg return oracle, nil } -func (m *MarketMaker) markBotAsRunning(id string, running bool) { +func (m *MarketMaker) markBotAsRunning(mkt MarketWithHost, running bool) { m.runningBotsMtx.Lock() defer m.runningBotsMtx.Unlock() if running { - m.runningBots[id] = struct{}{} + m.runningBots[mkt] = struct{}{} } else { - delete(m.runningBots, id) + delete(m.runningBots, mkt) } if len(m.runningBots) == 0 { @@ -308,8 +281,11 @@ func (m *MarketMaker) MarketReport(base, quote uint32) (*MarketReport, error) { m.syncedOracleMtx.RLock() if m.syncedOracle != nil { - price, oracles, err := m.syncedOracle.GetOracleInfo(base, quote) + price, oracles, err := m.syncedOracle.getOracleInfo(base, quote) m.syncedOracleMtx.RUnlock() + if err != nil && !errors.Is(err, errUnsyncedMarket) { + m.log.Errorf("failed to get oracle info for market %d-%d: %v", base, quote, err) + } if err == nil { return &MarketReport{ Price: price, @@ -321,7 +297,7 @@ func (m *MarketMaker) MarketReport(base, quote uint32) (*MarketReport, error) { } m.syncedOracleMtx.RUnlock() - price, oracles, err := m.unsyncedOracle.GetOracleInfo(base, quote) + price, oracles, err := m.unsyncedOracle.getOracleInfo(base, quote) if err != nil { return nil, err } @@ -888,7 +864,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err user := m.core.User() startedMarketMaking = true - m.notify(newMMStartStopNote(true)) + m.core.Broadcast(newMMStartStopNote(true)) wg := new(sync.WaitGroup) @@ -915,15 +891,15 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err case cfg.MMCfg != nil: wg.Add(1) go func(cfg *BotConfig) { - mkt := &MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset} - m.markBotAsRunning(mkt.String(), true) + mkt := MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset} + m.markBotAsRunning(mkt, true) defer func() { - m.markBotAsRunning(mkt.String(), false) + m.markBotAsRunning(mkt, false) }() - m.notify(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, true)) + m.core.Broadcast(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, true)) defer func() { - m.notify(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, false)) + m.core.Broadcast(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, false)) }() logger := m.log.SubLogger(fmt.Sprintf("MarketMaker-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) @@ -932,7 +908,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err baseFiatRate = user.FiatRates[cfg.BaseAsset] quoteFiatRate = user.FiatRates[cfg.QuoteAsset] } - RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger, m.notify) + RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger) wg.Done() }(cfg) default: @@ -944,7 +920,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err wg.Wait() m.log.Infof("All bots have stopped running.") m.running.Store(false) - m.notify(newMMStartStopNote(false)) + m.core.Broadcast(newMMStartStopNote(false)) }() return nil diff --git a/client/mm/mm_basic.go b/client/mm/mm_basic.go index 1267ef4550..654364b371 100644 --- a/client/mm/mm_basic.go +++ b/client/mm/mm_basic.go @@ -291,7 +291,7 @@ func basisPrice(book dexOrderBook, oracle oracle, cfg *MarketMakingConfig, mkt * var oracleWeighting, oraclePrice float64 if cfg.OracleWeighting != nil && *cfg.OracleWeighting > 0 { oracleWeighting = *cfg.OracleWeighting - oraclePrice = oracle.GetMarketPrice(mkt.BaseID, mkt.QuoteID) + oraclePrice = oracle.getMarketPrice(mkt.BaseID, mkt.QuoteID) if oraclePrice == 0 { log.Warnf("no oracle price available for %s bot", mkt.Name) } @@ -856,8 +856,7 @@ func (m *basicMarketMaker) run() { } // RunBasicMarketMaker starts a basic market maker bot. -func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, oracle oracle, baseFiatRate, quoteFiatRate float64, log dex.Logger, - notify func(core.Notification)) { +func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, oracle oracle, baseFiatRate, quoteFiatRate float64, log dex.Logger) { if cfg.MMCfg == nil { // implies bug in caller log.Errorf("No market making config provided. Exiting.") @@ -866,7 +865,7 @@ func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, orac err := cfg.MMCfg.Validate() if err != nil { - notify(newValidationErrorNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, fmt.Sprintf("invalid market making config: %v", err))) + c.Broadcast(newValidationErrorNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, fmt.Sprintf("invalid market making config: %v", err))) return } diff --git a/client/mm/mm_test.go b/client/mm/mm_test.go index 8b9581f28d..75d96723f6 100644 --- a/client/mm/mm_test.go +++ b/client/mm/mm_test.go @@ -236,6 +236,8 @@ func (c *tCore) User() *core.User { return nil } +func (c *tCore) Broadcast(core.Notification) {} + var _ clientCore = (*tCore)(nil) func tMaxOrderEstimate(lots uint64, swapFees, redeemFees uint64) *core.MaxOrderEstimate { @@ -297,7 +299,7 @@ type tOracle struct { marketPrice float64 } -func (o *tOracle) GetMarketPrice(base, quote uint32) float64 { +func (o *tOracle) getMarketPrice(base, quote uint32) float64 { return o.marketPrice } diff --git a/client/mm/notification.go b/client/mm/notification.go index 46d00694af..700fffb03f 100644 --- a/client/mm/notification.go +++ b/client/mm/notification.go @@ -5,9 +5,7 @@ package mm import ( "fmt" - "sync/atomic" - "decred.org/dcrdex/client/core" "decred.org/dcrdex/client/db" "decred.org/dcrdex/dex" ) @@ -18,96 +16,12 @@ const ( mmStartStop = "mmstartstop" ) -// NoteFeed contains a receiving channel for notifications. -type NoteFeed struct { - C <-chan core.Notification - closer func() -} - -// ReturnFeed should be called when the channel is no longer needed. -func (f *NoteFeed) ReturnFeed() { - if f.closer != nil { - f.closer() - } -} - -// NotificationFeed returns a new receiving channel for notifications. The -// channel has capacity 1024, and should be monitored for the lifetime of the -// Core. Blocking channels are silently ignored. -func (m *MarketMaker) NotificationFeed() *NoteFeed { - id, ch := m.notificationFeed() - return &NoteFeed{ - C: ch, - closer: func() { m.returnFeed(id) }, - } -} - -func (m *MarketMaker) returnFeed(channelID uint64) { - m.noteMtx.Lock() - delete(m.noteChans, channelID) - m.noteMtx.Unlock() -} - -func (m *MarketMaker) logNote(n core.Notification) { - if n.Subject() == "" && n.Details() == "" { - return - } - - logFun := m.log.Warnf // default in case the Severity level is unknown to notify - switch n.Severity() { - case db.Data: - logFun = m.log.Tracef - case db.Poke: - logFun = m.log.Debugf - case db.Success: - logFun = m.log.Infof - case db.WarningLevel: - logFun = m.log.Warnf - case db.ErrorLevel: - logFun = m.log.Errorf - } - - logFun("notify: %v", n) -} - -// notify sends a notification to all subscribers. If the notification is of -// sufficient severity, it is stored in the database. -func (m *MarketMaker) notify(n core.Notification) { - m.logNote(n) - - m.noteMtx.RLock() - for _, ch := range m.noteChans { - select { - case ch <- n: - default: - m.log.Errorf("blocking notification channel") - } - } - m.noteMtx.RUnlock() -} - -var noteChanCounter uint64 - -func (m *MarketMaker) notificationFeed() (uint64, <-chan core.Notification) { - ch := make(chan core.Notification, 1024) - cid := atomic.AddUint64(¬eChanCounter, 1) - m.noteMtx.Lock() - m.noteChans[cid] = ch - m.noteMtx.Unlock() - return cid, ch -} - -type botValidationErrorNote struct { - db.Notification -} - -func newValidationErrorNote(host string, baseID, quoteID uint32, errorMsg string) *botValidationErrorNote { +func newValidationErrorNote(host string, baseID, quoteID uint32, errorMsg string) *db.Notification { baseSymbol := dex.BipIDSymbol(baseID) quoteSymbol := dex.BipIDSymbol(quoteID) - msg := fmt.Sprintf("%s-%s @ %s: %s", host, baseSymbol, quoteSymbol, errorMsg) - return &botValidationErrorNote{ - Notification: db.NewNotification(validationNote, "", "Bot Config Validation Error", msg, db.ErrorLevel), - } + msg := fmt.Sprintf("%s-%s @ %s: %s", baseSymbol, quoteSymbol, host, errorMsg) + note := db.NewNotification(validationNote, "", "Bot Config Validation Error", msg, db.ErrorLevel) + return ¬e } type botStartStopNote struct { diff --git a/client/mm/price_oracle.go b/client/mm/price_oracle.go index 3e2431bad0..ea5588ea74 100644 --- a/client/mm/price_oracle.go +++ b/client/mm/price_oracle.go @@ -23,7 +23,10 @@ const ( oraclePriceExpiration = time.Minute * 10 oracleRecheckInterval = time.Minute * 3 - ErrNoMarkets = dex.ErrorKind("no markets") + errNoMarkets = dex.ErrorKind("no markets") + // an autosync oracle will return errUnsyncedMarket if the price oracle + // was not initialized with the market that it was queried for. + errUnsyncedMarket = dex.ErrorKind("market not synced") ) // MarketReport contains a market's rates on various exchanges and the fiat @@ -55,6 +58,10 @@ type cachedPrice struct { } // priceOracle periodically fetches market prices from a set of oracles. +// The priceOracle may be synced or unsynced. A synced priceOracle will +// periodically fetch market prices for all markets that it was initialized +// with. An unsynced priceOracle will only fetch market prices when they are +// requested. type priceOracle struct { ctx context.Context log dex.Logger @@ -65,7 +72,7 @@ type priceOracle struct { } type oracle interface { - GetMarketPrice(base, quote uint32) float64 + getMarketPrice(base, quote uint32) float64 } var _ oracle = (*priceOracle)(nil) @@ -77,13 +84,15 @@ func (o *priceOracle) getOracleDataAutoSync(base, quote uint32) (float64, []*Ora price, ok := o.cachedPrices[mktStr] o.cachedPricesMtx.RUnlock() - if ok { - price.mtx.RLock() - defer price.mtx.RUnlock() + if !ok { + return 0, nil, errUnsyncedMarket + } - if o.autoSync && time.Since(price.stamp) < oraclePriceExpiration { - return price.price, price.oracles, nil - } + price.mtx.RLock() + defer price.mtx.RUnlock() + + if time.Since(price.stamp) < oraclePriceExpiration { + return price.price, price.oracles, nil } return 0, nil, fmt.Errorf("expired price data for %s", mktStr) @@ -107,8 +116,7 @@ func (o *priceOracle) getOracleDataNoAutoSync(base, quote uint32) (float64, []*O price.mtx.RLock() if time.Since(price.stamp) < oracleRecheckInterval { - o.log.Infof("priceOracle: within recheck interval %v", mktStr) - price.mtx.RUnlock() + defer price.mtx.RUnlock() return price.price, price.oracles, nil } price.mtx.RUnlock() @@ -123,7 +131,10 @@ func (o *priceOracle) getOracleDataNoAutoSync(base, quote uint32) (float64, []*O return price.price, price.oracles, nil } -func (o *priceOracle) GetMarketPrice(base, quote uint32) float64 { +// getMarketPrice returns the volume weighted market rate for the specified +// base/quote pair. This market rate is used as the "oracleRate" in the +// basic market making strategy. +func (o *priceOracle) getMarketPrice(base, quote uint32) float64 { var price float64 var err error if o.autoSync { @@ -138,7 +149,11 @@ func (o *priceOracle) GetMarketPrice(base, quote uint32) float64 { return price } -func (o *priceOracle) GetOracleInfo(base, quote uint32) (float64, []*OracleReport, error) { +// getOracleInfo returns the volume weighted market rate for a given base/quote pair +// and details about the market on each available exchange that was used to determine +// the market rate. This market rate is used as the "oracleRate" in the basic market +// making strategy. +func (o *priceOracle) getOracleInfo(base, quote uint32) (float64, []*OracleReport, error) { var price float64 var oracles []*OracleReport var err error @@ -163,16 +178,14 @@ func (mkt *mkt) String() string { } func newCachedPrice(baseID, quoteID uint32, registeredAssets map[uint32]*asset.RegisteredAsset) (*cachedPrice, error) { - var baseAsset, quoteAsset *asset.RegisteredAsset - if b, ok := registeredAssets[baseID]; !ok { + baseAsset, ok := registeredAssets[baseID] + if !ok { return nil, fmt.Errorf("base asset %d (%s) not supported", baseID, dex.BipIDSymbol(baseID)) - } else { - baseAsset = b } - if q, ok := registeredAssets[quoteID]; !ok { + + quoteAsset, ok := registeredAssets[quoteID] + if !ok { return nil, fmt.Errorf("quote asset %d (%s) not supported", quoteID, dex.BipIDSymbol(quoteID)) - } else { - quoteAsset = q } return &cachedPrice{ @@ -295,7 +308,7 @@ func fetchMarketPrice(ctx context.Context, b, q *asset.RegisteredAsset, log dex. return 0, nil, err } price, err := oracleAverage(oracles, log) - if err != nil && !errors.Is(err, ErrNoMarkets) { + if err != nil && !errors.Is(err, errNoMarkets) { return 0, nil, err } @@ -312,7 +325,7 @@ func oracleAverage(mkts []*OracleReport, log dex.Logger) (float64, error) { } if usdVolume == 0 { log.Tracef("marketAveragedPrice: no markets") - return 0, ErrNoMarkets + return 0, errNoMarkets } rate := weightedSum / usdVolume diff --git a/client/webserver/api.go b/client/webserver/api.go index 6d31f979e4..a68862330e 100644 --- a/client/webserver/api.go +++ b/client/webserver/api.go @@ -1806,9 +1806,9 @@ func (s *WebServer) apiMarketMakingStatus(w http.ResponseWriter, r *http.Request running := s.mm.Running() runningBots := s.mm.RunningBots() writeJSON(w, &struct { - OK bool `json:"ok"` - Running bool `json:"running"` - RunningBots []*mm.MarketWithHost `json:"runningBots"` + OK bool `json:"ok"` + Running bool `json:"running"` + RunningBots []mm.MarketWithHost `json:"runningBots"` }{ OK: true, Running: running, diff --git a/client/webserver/site/src/css/mm.scss b/client/webserver/site/src/css/mm.scss index e7cbb5da9e..e24ef7efd0 100644 --- a/client/webserver/site/src/css/mm.scss +++ b/client/webserver/site/src/css/mm.scss @@ -229,4 +229,4 @@ body.dark #main.mm { #botTable { @extend .table-dark; } -} \ No newline at end of file +} diff --git a/client/webserver/site/src/html/markets.tmpl b/client/webserver/site/src/html/markets.tmpl index f44270f767..cf93e5a151 100644 --- a/client/webserver/site/src/html/markets.tmpl +++ b/client/webserver/site/src/html/markets.tmpl @@ -211,7 +211,7 @@ {{- /* ORDER TYPE BUTTONS */ -}} -