diff --git a/channeldb/db.go b/channeldb/db.go index d8ba05f3d26..92e0498eceb 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -27,6 +27,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" "github.com/lightningnetwork/lnd/channeldb/migration32" + "github.com/lightningnetwork/lnd/channeldb/migration33" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/invoices" @@ -291,6 +292,10 @@ var ( number: 32, migration: migration32.MigrateMCRouteSerialisation, }, + { + number: 33, + migration: migration33.MigrateMCStoreNameSpacedResults, + }, } // optionalVersions stores all optional migrations that are applied diff --git a/channeldb/log.go b/channeldb/log.go index e50e5054ef0..10b1b54d3ca 100644 --- a/channeldb/log.go +++ b/channeldb/log.go @@ -11,6 +11,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration31" "github.com/lightningnetwork/lnd/channeldb/migration32" + "github.com/lightningnetwork/lnd/channeldb/migration33" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/kvdb" ) @@ -44,5 +45,6 @@ func UseLogger(logger btclog.Logger) { migration30.UseLogger(logger) migration31.UseLogger(logger) migration32.UseLogger(logger) + migration33.UseLogger(logger) kvdb.UseLogger(logger) } diff --git a/channeldb/migration33/log.go b/channeldb/migration33/log.go new file mode 100644 index 00000000000..e9b271f5df8 --- /dev/null +++ b/channeldb/migration33/log.go @@ -0,0 +1,14 @@ +package migration33 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration33/migration.go b/channeldb/migration33/migration.go new file mode 100644 index 00000000000..6e2069a373f --- /dev/null +++ b/channeldb/migration33/migration.go @@ -0,0 +1,69 @@ +package migration33 + +import ( + "bytes" + + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // resultsKey is the fixed key under which the attempt results are + // stored. + resultsKey = []byte("missioncontrol-results") + + // defaultMCNamespaceKey is the key of the default mission control store + // namespace. + defaultMCNamespaceKey = []byte("default") +) + +// MigrateMCStoreNameSpacedResults reads in all the current mission control +// entries and re-writes them under a new default namespace. +func MigrateMCStoreNameSpacedResults(tx kvdb.RwTx) error { + log.Infof("Migrating Mission Control store to use namespaced results") + + // Get the top level bucket. All the MC results are currently stored + // as KV pairs in this bucket + topLevelBucket := tx.ReadWriteBucket(resultsKey) + + // If the results bucket does not exist then there are no entries in + // the mission control store yet and so there is nothing to migrate. + if topLevelBucket == nil { + return nil + } + + // Create a new default namespace bucket under the top-level bucket. + defaultNSBkt, err := topLevelBucket.CreateBucket(defaultMCNamespaceKey) + if err != nil { + return err + } + + // Iterate through each of the existing result pairs, write them to the + // new namespaced bucket. Also collect the set of keys so that we can + // later delete them from the top level bucket. + var keys [][]byte + err = topLevelBucket.ForEach(func(k, v []byte) error { + // Skip the new default namespace key. + if bytes.Equal(k, defaultMCNamespaceKey) { + return nil + } + + // Collect the key. + keys = append(keys, k) + + // Write the pair under the default namespace. + return defaultNSBkt.Put(k, v) + }) + if err != nil { + return err + } + + // Finally, iterate through the set of keys and delete them from the + // top level bucket. + for _, k := range keys { + if err := topLevelBucket.Delete(k); err != nil { + return err + } + } + + return err +} diff --git a/channeldb/migration33/migration_test.go b/channeldb/migration33/migration_test.go new file mode 100644 index 00000000000..851e2467e3b --- /dev/null +++ b/channeldb/migration33/migration_test.go @@ -0,0 +1,41 @@ +package migration33 + +import ( + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // before represents the structure of the MC store before the migration. + before = map[string]interface{}{ + "key1": "result1", + "key2": "result2", + "key3": "result3", + "key4": "result4", + } + + // after represents the expected structure of the store after the + // migration. It should be identical to before except all the kv pairs + // are now under a new default namespace key. + after = map[string]interface{}{ + string(defaultMCNamespaceKey): before, + } +) + +// TestMigrateMCStoreNameSpacedResults tests that the MC store results are +// correctly moved to be under a new default namespace bucket. +func TestMigrateMCStoreNameSpacedResults(t *testing.T) { + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB(tx, resultsKey, before) + } + + after := func(tx kvdb.RwTx) error { + return migtest.VerifyDB(tx, resultsKey, after) + } + + migtest.ApplyMigration( + t, before, after, MigrateMCStoreNameSpacedResults, false, + ) +} diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 8ab6eb9c56c..a0262b0f572 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -66,6 +66,10 @@ store](https://github.com/lightningnetwork/lnd/pull/8911) to use a more minimal encoding for payment attempt routes. +* [Migrate the mission control + store](https://github.com/lightningnetwork/lnd/pull/9001) so that results are + namespaced. All existing results are written to the "default" namespace. + ## Code Health ## Tooling and Documentation diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index 061dcfc3aa5..085ac1a9f35 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -163,12 +163,15 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, } }) - // Instantiate a new mission control with the current configuration + // Instantiate a new mission controller with the current configuration // values. - mc, err := NewMissionControl(db, c.source.pubkey, &c.mcCfg) - if err != nil { - c.t.Fatal(err) - } + mcController, err := NewMissionController(db, c.source.pubkey, &c.mcCfg) + require.NoError(c.t, err) + + mc, err := mcController.GetNamespacedStore( + DefaultMissionControlNamespace, + ) + require.NoError(c.t, err) getBandwidthHints := func(_ Graph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index ea2a56cf0cd..d848235d882 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -7,7 +7,11 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btclog" + "github.com/btcsuite/btcwallet/walletdb" + "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" @@ -68,6 +72,11 @@ const ( // FeeEstimationTimeout. It defines the maximum duration that the // probing fee estimation is allowed to take. DefaultFeeEstimationTimeout = time.Minute + + // DefaultMissionControlNamespace is the name of the default mission + // control name space. This is used as the sub-bucket key within the + // top level DB bucket to store mission control results. + DefaultMissionControlNamespace = "default" ) var ( @@ -84,6 +93,16 @@ var ( // NodeResults contains previous results from a node to its peers. type NodeResults map[route.Vertex]TimedPairResult +// mcConfig holds various config members that will be required by all +// MissionControl instances and will be the same regardless of namespace. +type mcConfig struct { + // clock is a time source used by mission control. + clock clock.Clock + + // selfNode is our pubkey. + selfNode route.Vertex +} + // MissionControl contains state which summarizes the past attempts of HTLC // routing by external callers when sending payments throughout the network. It // acts as a shared memory during routing attempts with the goal to optimize the @@ -94,17 +113,12 @@ type NodeResults map[route.Vertex]TimedPairResult // since the last failure is used to estimate a success probability that is fed // into the path finding process for subsequent payment attempts. type MissionControl struct { + cfg *mcConfig + // state is the internal mission control state that is input for // probability estimation. state *missionControlState - // now is expected to return the current time. It is supplied as an - // external function to enable deterministic unit tests. - now func() time.Time - - // selfNode is our pubkey. - selfNode route.Vertex - store *missionControlStore // estimator is the probability estimator that is used with the payment @@ -115,7 +129,19 @@ type MissionControl struct { // mission control state is updated. onConfigUpdate fn.Option[func(cfg *MissionControlConfig)] - sync.Mutex + log btclog.Logger + + mu sync.Mutex +} + +// MissionController manages MissionControl instances in various namespaces. +type MissionController struct { + db kvdb.Backend + cfg *mcConfig + defaultMCCfg *MissionControlConfig + + mc map[string]*MissionControl + mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, // add to another generation @@ -123,6 +149,35 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } +// GetNamespacedStore returns the MissionControl in the given namespace. If one +// does not yet exist, then it is initialised. +func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl, + error) { + + m.mu.Lock() + defer m.mu.Unlock() + + if mc, ok := m.mc[ns]; ok { + return mc, nil + } + + return m.initMissionControl(ns) +} + +// ListNamespaces returns a list of the namespaces that the MissionController +// is aware of. +func (m *MissionController) ListNamespaces() []string { + m.mu.Lock() + defer m.mu.Unlock() + + namespaces := make([]string, 0, len(m.mc)) + for ns := range m.mc { + namespaces = append(namespaces, ns) + } + + return namespaces +} + // MissionControlConfig defines parameters that control mission control // behaviour. type MissionControlConfig struct { @@ -214,9 +269,9 @@ type paymentResult struct { failure lnwire.FailureMessage } -// NewMissionControl returns a new instance of missionControl. -func NewMissionControl(db kvdb.Backend, self route.Vertex, - cfg *MissionControlConfig) (*MissionControl, error) { +// NewMissionController returns a new instance of MissionController. +func NewMissionController(db kvdb.Backend, self route.Vertex, + cfg *MissionControlConfig) (*MissionController, error) { log.Debugf("Instantiating mission control with config: %v, %v", cfg, cfg.Estimator) @@ -225,47 +280,151 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex, return nil, err } + mcCfg := &mcConfig{ + clock: clock.NewDefaultClock(), + selfNode: self, + } + + mgr := &MissionController{ + db: db, + defaultMCCfg: cfg, + cfg: mcCfg, + mc: make(map[string]*MissionControl), + } + + if err := mgr.loadMissionControls(); err != nil { + return nil, err + } + + for _, mc := range mgr.mc { + if err := mc.init(); err != nil { + return nil, err + } + } + + return mgr, nil +} + +// loadMissionControls initialises a MissionControl in the default namespace if +// one does not yet exist. It then initialises a MissionControl for all other +// namespaces found in the DB. +// +// NOTE: this should only be called once during MissionController construction. +func (m *MissionController) loadMissionControls() error { + m.mu.Lock() + defer m.mu.Unlock() + + // Always initialise the default namespace. + _, err := m.initMissionControl(DefaultMissionControlNamespace) + if err != nil { + return err + } + + namespaces := make(map[string]struct{}) + err = m.db.View(func(tx walletdb.ReadTx) error { + mcStoreBkt := tx.ReadBucket(resultsKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + // Iterate through all the keys in the bucket and collect the + // namespaces. + return mcStoreBkt.ForEach(func(k, _ []byte) error { + // We've already initialised the default namespace so + // we can skip it. + if string(k) == DefaultMissionControlNamespace { + return nil + } + + namespaces[string(k)] = struct{}{} + + return nil + }) + }, func() {}) + if err != nil { + return err + } + + // Now, iterate through all the namespaces and initialise them. + for ns := range namespaces { + _, err = m.initMissionControl(ns) + if err != nil { + return err + } + } + + return nil +} + +// initMissionControl creates a new MissionControl instance with the given +// namespace if one does not yet exist. +// +// NOTE: the MissionController's mutex must be held before calling this method. +func (m *MissionController) initMissionControl(namespace string) ( + *MissionControl, error) { + + // If a mission control with this namespace has already been initialised + // then there is nothing left to do. + if mc, ok := m.mc[namespace]; ok { + return mc, nil + } + + cfg := m.defaultMCCfg + store, err := newMissionControlStore( - db, cfg.MaxMcHistory, cfg.McFlushInterval, + newNamespacedDB(m.db, namespace), cfg.MaxMcHistory, + cfg.McFlushInterval, ) if err != nil { return nil, err } mc := &MissionControl{ - state: newMissionControlState( - cfg.MinFailureRelaxInterval, + cfg: m.cfg, + state: newMissionControlState(cfg.MinFailureRelaxInterval), + store: store, + estimator: cfg.Estimator, + log: build.NewPrefixLog( + fmt.Sprintf("[%s]:", namespace), log, ), - now: time.Now, - selfNode: self, - store: store, - estimator: cfg.Estimator, onConfigUpdate: cfg.OnConfigUpdate, } - if err := mc.init(); err != nil { - return nil, err - } + m.mc[namespace] = mc return mc, nil } -// RunStoreTicker runs the mission control store's ticker. -func (m *MissionControl) RunStoreTicker() { - m.store.run() +// RunStoreTickers runs the mission controller store's tickers. +func (m *MissionController) RunStoreTickers() { + m.mu.Lock() + defer m.mu.Unlock() + + for _, mc := range m.mc { + mc.store.run() + } } -// StopStoreTicker stops the mission control store's ticker. -func (m *MissionControl) StopStoreTicker() { +// StopStoreTickers stops the mission control store's tickers. +func (m *MissionController) StopStoreTickers() { log.Debug("Stopping mission control store ticker") defer log.Debug("Mission control store ticker stopped") - m.store.stop() + m.mu.Lock() + defer m.mu.Unlock() + + for _, mc := range m.mc { + mc.store.stop() + } } // init initializes mission control with historical data. func (m *MissionControl) init() error { - log.Debugf("Mission control state reconstruction started") + m.log.Debugf("Mission control state reconstruction started") + + m.mu.Lock() + defer m.mu.Unlock() start := time.Now() @@ -275,10 +434,10 @@ func (m *MissionControl) init() error { } for _, result := range results { - m.applyPaymentResult(result) + _ = m.applyPaymentResult(result) } - log.Debugf("Mission control state reconstruction finished: "+ + m.log.Debugf("Mission control state reconstruction finished: "+ "n=%v, time=%v", len(results), time.Since(start)) return nil @@ -288,8 +447,8 @@ func (m *MissionControl) init() error { // with. All fields are copied by value, so we do not need to worry about // mutation. func (m *MissionControl) GetConfig() *MissionControlConfig { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() return &MissionControlConfig{ Estimator: m.estimator, @@ -310,10 +469,10 @@ func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error { return err } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() - log.Infof("Active mission control cfg: %v, estimator: %v", cfg, + m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg, cfg.Estimator) m.store.maxRecords = cfg.MaxMcHistory @@ -331,8 +490,8 @@ func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error { // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. func (m *MissionControl) ResetHistory() error { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() if err := m.store.clear(); err != nil { return err @@ -340,7 +499,7 @@ func (m *MissionControl) ResetHistory() error { m.state.resetHistory() - log.Debugf("Mission control history cleared") + m.log.Debugf("Mission control history cleared") return nil } @@ -350,14 +509,14 @@ func (m *MissionControl) ResetHistory() error { func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() - now := m.now() + now := m.cfg.clock.Now() results, _ := m.state.getLastPairResult(fromNode) // Use a distinct probability estimation function for local channels. - if fromNode == m.selfNode { + if fromNode == m.cfg.selfNode { return m.estimator.LocalPairProbability(now, results, toNode) } @@ -369,10 +528,10 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, // GetHistorySnapshot takes a snapshot from the current mission control state // and actual probability estimates. func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() - log.Debugf("Requesting history snapshot from mission control") + m.log.Debugf("Requesting history snapshot from mission control") return m.state.getSnapshot() } @@ -387,15 +546,15 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, return errors.New("cannot import nil history") } - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() - log.Infof("Importing history snapshot with %v pairs to mission control", - len(history.Pairs)) + m.log.Infof("Importing history snapshot with %v pairs to mission "+ + "control", len(history.Pairs)) imported := m.state.importSnapshot(history, force) - log.Infof("Imported %v results to mission control", imported) + m.log.Infof("Imported %v results to mission control", imported) return nil } @@ -404,8 +563,8 @@ func (m *MissionControl) ImportHistory(history *MissionControlSnapshot, func (m *MissionControl) GetPairHistorySnapshot( fromNode, toNode route.Vertex) TimedPairResult { - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() results, ok := m.state.getLastPairResult(fromNode) if !ok { @@ -429,7 +588,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) ( *channeldb.FailureReason, error) { - timestamp := m.now() + timestamp := m.cfg.clock.Now() result := &paymentResult{ success: false, @@ -449,7 +608,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, func (m *MissionControl) ReportPaymentSuccess(paymentID uint64, rt *route.Route) error { - timestamp := m.now() + timestamp := m.cfg.clock.Now() result := &paymentResult{ timeFwd: timestamp, @@ -471,8 +630,8 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) ( // Store complete result in database. m.store.AddResult(result) - m.Lock() - defer m.Unlock() + m.mu.Lock() + defer m.mu.Unlock() // Apply result to update mission control state. reason := m.applyPaymentResult(result) @@ -519,7 +678,7 @@ func (m *MissionControl) applyPaymentResult( // that case, a node-level failure would not be applied to untried // channels. if i.nodeFailure != nil { - log.Debugf("Reporting node failure to Mission Control: "+ + m.log.Debugf("Reporting node failure to Mission Control: "+ "node=%v", *i.nodeFailure) m.state.setAllFail(*i.nodeFailure, result.timeReply) @@ -529,11 +688,11 @@ func (m *MissionControl) applyPaymentResult( pairResult := pairResult if pairResult.success { - log.Debugf("Reporting pair success to Mission "+ + m.log.Debugf("Reporting pair success to Mission "+ "Control: pair=%v, amt=%v", pair, pairResult.amt) } else { - log.Debugf("Reporting pair failure to Mission "+ + m.log.Debugf("Reporting pair failure to Mission "+ "Control: pair=%v, amt=%v", pair, pairResult.amt) } @@ -545,3 +704,102 @@ func (m *MissionControl) applyPaymentResult( return i.finalFailureReason } + +// namespacedDB is an implementation of the missionControlDB that gives a user +// of the interface access to a namespaced bucket within the top level mission +// control bucket. +type namespacedDB struct { + topLevelBucketKey []byte + namespace []byte + db kvdb.Backend +} + +// A compile-time check to ensure that namespacedDB implements missionControlDB. +var _ missionControlDB = (*namespacedDB)(nil) + +// newDefaultNamespacedStore creates an instance of namespaceDB that uses the +// default namespace. +func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB { + return newNamespacedDB(db, DefaultMissionControlNamespace) +} + +// newNamespacedDB creates a new instance of missionControlDB where the DB will +// have access to a namespaced bucket within the top level mission control +// bucket. +func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB { + return &namespacedDB{ + db: db, + namespace: []byte(namespace), + topLevelBucketKey: resultsKey, + } +} + +// update can be used to perform reads and writes on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error, + reset func()) error { + + return n.db.Update(func(tx kvdb.RwTx) error { + mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey) + if err != nil { + return fmt.Errorf("cannot create top level mission "+ + "control bucket: %w", err) + } + + namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists( + n.namespace, + ) + if err != nil { + return fmt.Errorf("cannot create namespaced bucket "+ + "(%s) in mission control store: %w", + n.namespace, err) + } + + return f(namespacedBkt) + }, reset) +} + +// view can be used to perform reads on the given bucket. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error, + reset func()) error { + + return n.db.View(func(tx kvdb.RTx) error { + mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace) + if namespacedBkt == nil { + return fmt.Errorf("namespaced bucket (%s) not found "+ + "in mission control store", n.namespace) + } + + return f(namespacedBkt) + }, reset) +} + +// purge will delete all the contents in the namespace. +// +// NOTE: this is part of the missionControlDB interface. +func (n *namespacedDB) purge() error { + return n.db.Update(func(tx kvdb.RwTx) error { + mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey) + if mcStoreBkt == nil { + return nil + } + + err := mcStoreBkt.DeleteNestedBucket(n.namespace) + if err != nil { + return err + } + + _, err = mcStoreBkt.CreateBucket(n.namespace) + + return err + }, func() {}) +} diff --git a/routing/missioncontrol_state.go b/routing/missioncontrol_state.go index 7d6633f2bd8..c61ec8335f1 100644 --- a/routing/missioncontrol_state.go +++ b/routing/missioncontrol_state.go @@ -37,7 +37,8 @@ func newMissionControlState( } } -// getLastPairResult returns the current state for connections to the given node. +// getLastPairResult returns the current state for connections to the given +// node. func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, bool) { @@ -45,8 +46,8 @@ func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, return result, ok } -// ResetHistory resets the history of MissionControl returning it to a state as -// if no payment attempts have been made. +// ResetHistory resets the history of missionControlState returning it to a +// state as if no payment attempts have been made. func (m *missionControlState) resetHistory() { m.lastPairResult = make(map[route.Vertex]NodeResults) m.lastSecondChance = make(map[DirectedNodePair]time.Time) diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index 9f1dc8d3109..c40f24697ab 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -32,6 +32,21 @@ const ( unknownFailureSourceIdx = -1 ) +// missionControlDB is an interface that defines the database methods that a +// single missionControlStore has access to. It allows the missionControlStore +// to be unaware of the overall DB structure and restricts its access to the DB +// by only providing it the bucket that it needs to care about. +type missionControlDB interface { + // update can be used to perform reads and writes on the given bucket. + update(f func(bkt kvdb.RwBucket) error, reset func()) error + + // view can be used to perform reads on the given bucket. + view(f func(bkt kvdb.RBucket) error, reset func()) error + + // purge will delete all the contents in this store. + purge() error +} + // missionControlStore is a bolt db based implementation of a mission control // store. It stores the raw payment attempt data from which the internal mission // controls state can be rederived on startup. This allows the mission control @@ -41,7 +56,7 @@ const ( type missionControlStore struct { done chan struct{} wg sync.WaitGroup - db kvdb.Backend + db missionControlDB // queueCond is signalled when items are put into the queue. queueCond *sync.Cond @@ -67,7 +82,7 @@ type missionControlStore struct { flushInterval time.Duration } -func newMissionControlStore(db kvdb.Backend, maxRecords int, +func newMissionControlStore(db missionControlDB, maxRecords int, flushInterval time.Duration) (*missionControlStore, error) { var ( @@ -76,13 +91,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int, ) // Create buckets if not yet existing. - err := kvdb.Update(db, func(tx kvdb.RwTx) error { - resultsBucket, err := tx.CreateTopLevelBucket(resultsKey) - if err != nil { - return fmt.Errorf("cannot create results bucket: %w", - err) - } - + err := db.update(func(resultsBucket kvdb.RwBucket) error { // Collect all keys to be able to quickly calculate the // difference when updating the DB state. c := resultsBucket.ReadCursor() @@ -119,20 +128,12 @@ func (b *missionControlStore) clear() error { b.queueCond.L.Lock() defer b.queueCond.L.Unlock() - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - if err := tx.DeleteTopLevelBucket(resultsKey); err != nil { - return err - } - - _, err := tx.CreateTopLevelBucket(resultsKey) - return err - }, func() {}) - - if err != nil { + if err := b.db.purge(); err != nil { return err } b.queue = list.New() + return nil } @@ -140,8 +141,7 @@ func (b *missionControlStore) clear() error { func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { var results []*paymentResult - err := kvdb.View(b.db, func(tx kvdb.RTx) error { - resultBucket := tx.ReadBucket(resultsKey) + err := b.db.view(func(resultBucket kvdb.RBucket) error { results = make([]*paymentResult, 0) return resultBucket.ForEach(func(k, v []byte) error { @@ -511,9 +511,7 @@ func (b *missionControlStore) storeResults() error { } } - err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { - bucket := tx.ReadWriteBucket(resultsKey) - + err := b.db.update(func(bucket kvdb.RwBucket) error { for e := l.Front(); e != nil; e = e.Next() { pr, ok := e.Value.(*paymentResult) if !ok { diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go index 804824f64f0..2dbfc11214f 100644 --- a/routing/missioncontrol_store_test.go +++ b/routing/missioncontrol_store_test.go @@ -62,7 +62,9 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int, require.NoError(t, db.Close()) }) - store, err := newMissionControlStore(db, maxRecords, flushInterval) + store, err := newMissionControlStore( + newDefaultNamespacedStore(db), maxRecords, flushInterval, + ) require.NoError(t, err) return mcStoreTestHarness{db: db, store: store} @@ -115,7 +117,9 @@ func TestMissionControlStore(t *testing.T) { require.Equal(t, &result2, results[1]) // Recreate store to test pruning. - store, err = newMissionControlStore(db, testMaxRecords, time.Second) + store, err = newMissionControlStore( + newDefaultNamespacedStore(db), testMaxRecords, time.Second, + ) require.NoError(t, err) // Add a newer result which failed due to mpp timeout. @@ -213,7 +217,9 @@ func TestMissionControlStoreFlushing(t *testing.T) { store.stop() // Recreate store. - store, err := newMissionControlStore(db, testMaxRecords, flushInterval) + store, err := newMissionControlStore( + newDefaultNamespacedStore(db), testMaxRecords, flushInterval, + ) require.NoError(t, err) store.run() defer store.stop() diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 4a0f7387152..dad033ac442 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -40,9 +41,11 @@ var ( ) type mcTestContext struct { - t *testing.T - mc *MissionControl - now time.Time + t *testing.T + + mcController *MissionController + mc *MissionControl + clock *testClock db kvdb.Backend dbPath string @@ -52,8 +55,8 @@ type mcTestContext struct { func createMcTestContext(t *testing.T) *mcTestContext { ctx := &mcTestContext{ - t: t, - now: mcTestTime, + t: t, + clock: newTestClock(mcTestTime), } file, err := os.CreateTemp("", "*.db") @@ -85,9 +88,11 @@ func createMcTestContext(t *testing.T) *mcTestContext { // restartMc creates a new instances of mission control on the same database. func (ctx *mcTestContext) restartMc() { // Since we don't run a timer to store results in unit tests, we store - // them here before fetching back everything in NewMissionControl. - if ctx.mc != nil { - require.NoError(ctx.t, ctx.mc.store.storeResults()) + // them here before fetching back everything in NewMissionController. + if ctx.mcController != nil { + for _, mc := range ctx.mcController.mc { + require.NoError(ctx.t, mc.store.storeResults()) + } } aCfg := AprioriConfig{ @@ -99,7 +104,7 @@ func (ctx *mcTestContext) restartMc() { estimator, err := NewAprioriEstimator(aCfg) require.NoError(ctx.t, err) - mc, err := NewMissionControl( + ctx.mcController, err = NewMissionController( ctx.db, mcTestSelf, &MissionControlConfig{Estimator: estimator}, ) @@ -107,8 +112,18 @@ func (ctx *mcTestContext) restartMc() { ctx.t.Fatal(err) } - mc.now = func() time.Time { return ctx.now } - ctx.mc = mc + ctx.mcController.cfg.clock = ctx.clock + + // By default, select the default namespace. + ctx.setNamespacedMC(DefaultMissionControlNamespace) +} + +// setNamespacedMC sets the currently selected MissionControl instance of the +// mcTextContext to the one with the given namespace. +func (ctx *mcTestContext) setNamespacedMC(namespace string) { + var err error + ctx.mc, err = ctx.mcController.GetNamespacedStore(namespace) + require.NoError(ctx.t, err) } // Assert that mission control returns a probability for an edge. @@ -150,7 +165,7 @@ func (ctx *mcTestContext) reportSuccess() { func TestMissionControl(t *testing.T) { ctx := createMcTestContext(t) - ctx.now = testTime + ctx.clock.setTime(testTime) testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) @@ -178,7 +193,7 @@ func TestMissionControl(t *testing.T) { // Edge decay started. The node probability weighted average should now // have shifted from 1:1 to 1:0.5 -> 60%. The connection probability is // half way through the recovery, so we expect 30% here. - ctx.now = testTime.Add(30 * time.Minute) + ctx.clock.setTime(testTime.Add(30 * time.Minute)) ctx.expectP(1000, 0.3) // Edge fails again, this time without a min penalization amt. The edge @@ -188,7 +203,7 @@ func TestMissionControl(t *testing.T) { ctx.expectP(500, 0) // Edge decay started. - ctx.now = testTime.Add(60 * time.Minute) + ctx.clock.setTime(testTime.Add(60 * time.Minute)) ctx.expectP(1000, 0.3) // Restart mission control to test persistence. @@ -230,3 +245,94 @@ func TestMissionControlChannelUpdate(t *testing.T) { ) ctx.expectP(100, 0) } + +// TestMissionControlNamespaces tests that the results reported to a +// MissionControl instance in one namespace does not affect the query results in +// another namespace. +func TestMissionControlNamespaces(t *testing.T) { + // Create a new MC context. This will select the default namespace + // MissionControl instance. + ctx := createMcTestContext(t) + + // Initially, the controller should only be aware of the default + // namespace. + require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{ + DefaultMissionControlNamespace, + }) + + // Initial probability is expected to be the apriori. + ctx.expectP(1000, testAprioriHopProbability) + + // Expect probability to be zero after reporting the edge as failed. + ctx.reportFailure(1000, lnwire.NewTemporaryChannelFailure(nil)) + ctx.expectP(1000, 0) + + // Now, switch namespaces. + const newNs = "new-namespace" + ctx.setNamespacedMC(newNs) + + // Now, the controller should only be aware of the default namespace and + // the new one. + require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{ + DefaultMissionControlNamespace, + newNs, + }) + + // Since this new namespace has no idea about the reported failure, the + // expected probability should once again be the apriori probability. + ctx.expectP(1000, testAprioriHopProbability) + + // Report a success in the new namespace. + ctx.reportSuccess() + + // The probability of the pair should now have increased. + ctx.expectP(1000, testAprioriHopProbability+0.05) + + // Switch back to the default namespace. + ctx.setNamespacedMC(DefaultMissionControlNamespace) + + // The probability in the default namespace should still be zero. + ctx.expectP(1000, 0) + + // We also want to test that the initial loading of the namespaces is + // done correctly. So let's reload the controller and assert that the + // probabilities in both namespaces remain the same after restart. + ctx.restartMc() + + // Assert that both namespaces were loaded. + require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{ + DefaultMissionControlNamespace, + newNs, + }) + + // Assert that the probabilities in both namespaces remain unchanged. + ctx.expectP(1000, 0) + ctx.setNamespacedMC(newNs) + ctx.expectP(1000, testAprioriHopProbability+0.05) +} + +// testClock is an implementation of clock.Clock that lets the caller overwrite +// the current time at any point. +type testClock struct { + now time.Time + clock.Clock +} + +// newTestClock constructs a new testClock. +func newTestClock(startTime time.Time) *testClock { + return &testClock{ + now: startTime, + } +} + +// Now returns the underlying current time. +// +// NOTE: this is part of the clock.Clock interface. +func (c *testClock) Now() time.Time { + return c.now +} + +// setTime overwrites the current time. +func (c *testClock) setTime(n time.Time) { + c.now = n +} diff --git a/routing/mock_test.go b/routing/mock_test.go index 32f83420fc3..35ac3ecd99f 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -125,10 +125,10 @@ func (m *mockPaymentSessionSourceOld) NewPaymentSessionEmpty() PaymentSession { } type mockMissionControlOld struct { - MissionControl + MissionController } -var _ MissionController = (*mockMissionControlOld)(nil) +var _ MissionControlQuerier = (*mockMissionControlOld)(nil) func (m *mockMissionControlOld) ReportPaymentFail( paymentID uint64, rt *route.Route, @@ -657,7 +657,7 @@ type mockMissionControl struct { mock.Mock } -var _ MissionController = (*mockMissionControl)(nil) +var _ MissionControlQuerier = (*mockMissionControl)(nil) func (m *mockMissionControl) ReportPaymentFail( paymentID uint64, rt *route.Route, diff --git a/routing/payment_session.go b/routing/payment_session.go index 9ddb6260076..89fb2fd1376 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -159,7 +159,7 @@ type PaymentSession interface { // paymentSession is used during an HTLC routings session to prune the local // chain view in response to failures, and also report those failures back to -// MissionControl. The snapshot copied for this session will only ever grow, +// MissionController. The snapshot copied for this session will only ever grow, // and will now be pruned after a decay like the main view within mission // control. We do this as we want to avoid the case where we continually try a // bad edge or route multiple times in a session. This can lead to an infinite @@ -184,7 +184,7 @@ type paymentSession struct { // trade-off in path finding between fees and probability. pathFindingConfig PathFindingConfig - missionControl MissionController + missionControl MissionControlQuerier // minShardAmt is the amount beyond which we won't try to further split // the payment if no route is found. If the maximum number of htlcs @@ -199,7 +199,8 @@ type paymentSession struct { // newPaymentSession instantiates a new payment session. func newPaymentSession(p *LightningPayment, selfNode route.Vertex, getBandwidthHints func(Graph) (bandwidthHints, error), - graphSessFactory GraphSessionFactory, missionControl MissionController, + graphSessFactory GraphSessionFactory, + missionControl MissionControlQuerier, pathFindingConfig PathFindingConfig) (*paymentSession, error) { edges, err := RouteHintsToEdges(p.RouteHints, p.Target) @@ -266,7 +267,7 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, // Taking into account this prune view, we'll attempt to locate a path // to our destination, respecting the recommendations from - // MissionControl. + // MissionController. restrictions := &RestrictParams{ ProbabilitySource: p.missionControl.GetProbability, FeeLimit: feeLimit, diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index c89d6a8e52a..ccee9bc449d 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -11,7 +11,7 @@ import ( "github.com/lightningnetwork/lnd/zpay32" ) -// A compile time assertion to ensure MissionControl meets the +// A compile time assertion to ensure SessionSource meets the // PaymentSessionSource interface. var _ PaymentSessionSource = (*SessionSource)(nil) @@ -40,7 +40,7 @@ type SessionSource struct { // then take into account this set of pruned vertexes/edges to reduce // route failure and pass on graph information gained to the next // execution. - MissionControl MissionController + MissionControl MissionControlQuerier // PathFindingConfig defines global parameters that control the // trade-off in path finding between fees and probability. diff --git a/routing/router.go b/routing/router.go index 3b340f35a07..1fea60ddbe5 100644 --- a/routing/router.go +++ b/routing/router.go @@ -167,9 +167,9 @@ type PaymentSessionSource interface { NewPaymentSessionEmpty() PaymentSession } -// MissionController is an interface that exposes failure reporting and +// MissionControlQuerier is an interface that exposes failure reporting and // probability estimation. -type MissionController interface { +type MissionControlQuerier interface { // ReportPaymentFail reports a failed payment to mission control as // input for future probability estimates. It returns a bool indicating // whether this error is a final error and no further payment attempts @@ -260,7 +260,7 @@ type Config struct { // Each run will then take into account this set of pruned // vertexes/edges to reduce route failure and pass on graph information // gained to the next execution. - MissionControl MissionController + MissionControl MissionControlQuerier // SessionSource defines a source for the router to retrieve new payment // sessions. diff --git a/routing/router_test.go b/routing/router_test.go index 0380bab0022..6f6f2e4342e 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -128,11 +128,16 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, mcConfig := &MissionControlConfig{Estimator: estimator} - mc, err := NewMissionControl( + mcController, err := NewMissionController( graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") + mc, err := mcController.GetNamespacedStore( + DefaultMissionControlNamespace, + ) + require.NoError(t, err) + sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) sessionSource := &SessionSource{ @@ -1081,11 +1086,15 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { return preImage, nil }) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.IsType(t, ctx.router.cfg.MissionControl, &MissionControl{}) + mc, _ := ctx.router.cfg.MissionControl.(*MissionControl) + + err := mc.ResetHistory() + require.NoError(t, err) // When we try to dispatch that payment, we should receive an error as // both attempts should fail and cause both routes to be pruned. - _, _, err := ctx.router.SendPayment(payment) + _, _, err = ctx.router.SendPayment(payment) require.Error(t, err, "payment didn't return error") // The final error returned should also indicate that the peer wasn't @@ -1102,12 +1111,10 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { // We expect the first attempt to have failed with a // TemporaryChannelFailure, the second with UnknownNextPeer. msg := htlcs[0].Failure.Message - _, ok := msg.(*lnwire.FailTemporaryChannelFailure) - require.True(t, ok, "unexpected fail message") + require.IsType(t, msg, &lnwire.FailTemporaryChannelFailure{}) msg = htlcs[1].Failure.Message - _, ok = msg.(*lnwire.FailUnknownNextPeer) - require.True(t, ok, "unexpected fail message") + require.IsType(t, msg, &lnwire.FailUnknownNextPeer{}) err = ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() require.NoError(t, err, "reset history failed") @@ -1144,7 +1151,11 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { getAliasFromPubKey(rt.Hops[0].PubKeyBytes, ctx.aliases), ) - ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() + require.IsType(t, ctx.router.cfg.MissionControl, &MissionControl{}) + mc, _ = ctx.router.cfg.MissionControl.(*MissionControl) + + err = mc.ResetHistory() + require.NoError(t, err) // Finally, we'll modify the SendToSwitch function to indicate that the // roasbeef -> luoji channel has insufficient capacity. This should diff --git a/rpcserver.go b/rpcserver.go index 44a3ae39144..c9edf852a39 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -729,7 +729,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return info.NodeKey1Bytes, info.NodeKey2Bytes, nil }, FindRoute: s.chanRouter.FindRoute, - MissionControl: s.missionControl, + MissionControl: s.defaultMC, ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, @@ -6071,7 +6071,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context, return r.server.chanRouter.FindBlindedPaths( r.selfNode, amt, - r.server.missionControl.GetProbability, + r.server.defaultMC.GetProbability, blindingRestrictions, ) }, diff --git a/server.go b/server.go index 9a7276948dc..8b142f87a49 100644 --- a/server.go +++ b/server.go @@ -275,7 +275,8 @@ type server struct { breachArbitrator *contractcourt.BreachArbitrator - missionControl *routing.MissionControl + missionController *routing.MissionController + defaultMC *routing.MissionControl graphBuilder *graph.Builder @@ -955,11 +956,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr, McFlushInterval: routingConfig.McFlushInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, } - s.missionControl, err = routing.NewMissionControl( + + s.missionController, err = routing.NewMissionController( dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg, ) if err != nil { - return nil, fmt.Errorf("can't create mission control: %w", err) + return nil, fmt.Errorf("can't create mission control "+ + "manager: %w", err) + } + s.defaultMC, err = s.missionController.GetNamespacedStore( + routing.DefaultMissionControlNamespace, + ) + if err != nil { + return nil, fmt.Errorf("can't create mission control in the "+ + "default namespace: %w", err) } srvrLog.Debugf("Instantiating payment session source with config: "+ @@ -985,7 +995,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanGraph, ), SourceNode: sourceNode, - MissionControl: s.missionControl, + MissionControl: s.defaultMC, GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig, } @@ -1020,7 +1030,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower, - MissionControl: s.missionControl, + MissionControl: s.defaultMC, SessionSource: paymentSessionSource, GetLink: s.htlcSwitch.GetLinkByShortID, NextPaymentID: sequencer.NextID, @@ -2182,10 +2192,10 @@ func (s *server) Start() error { } cleanup.add(func() error { - s.missionControl.StopStoreTicker() + s.missionController.StopStoreTickers() return nil }) - s.missionControl.RunStoreTicker() + s.missionController.RunStoreTickers() // Before we start the connMgr, we'll check to see if we have // any backups to recover. We do this now as we want to ensure @@ -2467,7 +2477,7 @@ func (s *server) Stop() error { srvrLog.Warnf("Unable to stop ChannelEventStore: %v", err) } - s.missionControl.StopStoreTicker() + s.missionController.StopStoreTickers() // Disconnect from each active peers to ensure that // peerTerminationWatchers signal completion to each peer.