From 7201f00b7899a0c046b14f0f318d0ba1abd92564 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 15 Jul 2019 14:31:56 -0700 Subject: [PATCH 1/3] discovery/syncer: don't reply with historical updates to gossip filters This commit removes the existing behavior that responds with all known updates when a remote peer sends a gossip_timestamp_filter. When a peer sets this with a very old start timestamp, this can cause the local node to dump the entire graph to the peer, which is functionally equivalent to the original `intitial_routing_sync` feature. That feature was deprecated because it is expensive and can force a node to use a disproportionate amount of resources, especially on flapping connections. Now, the filter is only applied to new gossip messages when deciding whether or not to forward them to a particular peer. --- discovery/syncer.go | 47 +-------------------- discovery/syncer_test.go | 90 +++++++++++++--------------------------- 2 files changed, 29 insertions(+), 108 deletions(-) diff --git a/discovery/syncer.go b/discovery/syncer.go index 7281012ad4d..34ffaec0560 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -941,55 +941,10 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error // to the peer that aren't within the time range of the filter. func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error { g.Lock() + defer g.Unlock() g.remoteUpdateHorizon = filter - startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0) - endTime := startTime.Add( - time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second, - ) - - g.Unlock() - - // Now that the remote peer has applied their filter, we'll query the - // database for all the messages that are beyond this filter. - newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon( - g.cfg.chainHash, startTime, endTime, - ) - if err != nil { - return err - } - - log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+ - "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime, - len(newUpdatestoSend)) - - // If we don't have any to send, then we can return early. - if len(newUpdatestoSend) == 0 { - return nil - } - - // We'll conclude by launching a goroutine to send out any updates. - g.wg.Add(1) - go func() { - defer g.wg.Done() - - for _, msg := range newUpdatestoSend { - err := g.cfg.sendToPeerSync(msg) - switch { - case err == ErrGossipSyncerExiting: - return - - case err == lnpeer.ErrPeerExiting: - return - - case err != nil: - log.Errorf("Unable to send message for "+ - "peer catch up: %v", err) - } - } - }() - return nil } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 68c4a8c92dd..cc2c363926e 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -1,8 +1,10 @@ package discovery import ( + "errors" "math" "reflect" + "sync" "testing" "time" @@ -344,7 +346,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. - msgChan, syncer, chanSeries := newTestSyncer( + _, syncer, chanSeries := newTestSyncer( lnwire.NewShortChanIDFromInt(10), defaultEncoding, defaultChunkSize, ) @@ -355,25 +357,23 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { TimestampRange: uint32(1000), } - // Before we apply the horizon, we'll dispatch a response to the query - // that the syncer will issue. + // After applying the gossip filter, the chan series should not be + // queried using the updated horizon. + errChan := make(chan error, 1) + var wg sync.WaitGroup + wg.Add(1) go func() { - select { - case <-time.After(time.Second * 15): - t.Fatalf("no query recvd") + defer wg.Done() - case query := <-chanSeries.horizonReq: - // The syncer should have translated the time range - // into the proper star time. - if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) { - t.Fatalf("wrong query stamp: expected %v, got %v", - remoteHorizon.FirstTimestamp, query.start) - } - - // For this first response, we'll send back an empty - // set of messages. As result, we shouldn't send any - // messages. - chanSeries.horizonResp <- []lnwire.Message{} + select { + // No query received, success. + case <-time.After(3 * time.Second): + errChan <- nil + + // Unexpected query received. + case <-chanSeries.horizonReq: + errChan <- errors.New("chan series should not have been " + + "queried") } }() @@ -383,54 +383,20 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { t.Fatalf("unable to apply filter: %v", err) } - // There should be no messages in the message queue as we didn't send - // the syncer and messages within the horizon. - select { - case msgs := <-msgChan: - t.Fatalf("expected no msgs, instead got %v", spew.Sdump(msgs)) - default: + // Ensure that the syncer's remote horizon was properly updated. + if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) { + t.Fatalf("expected remote horizon: %v, got: %v", + remoteHorizon, syncer.remoteUpdateHorizon) } - // If we repeat the process, but give the syncer a set of valid - // messages, then these should be sent to the remote peer. - go func() { - select { - case <-time.After(time.Second * 15): - t.Fatalf("no query recvd") - - case query := <-chanSeries.horizonReq: - // The syncer should have translated the time range - // into the proper star time. - if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) { - t.Fatalf("wrong query stamp: expected %v, got %v", - remoteHorizon.FirstTimestamp, query.start) - } + // Wait for the query check to finish. + wg.Wait() - // For this first response, we'll send back a proper - // set of messages that should be echoed back. - chanSeries.horizonResp <- []lnwire.Message{ - &lnwire.ChannelUpdate{ - ShortChannelID: lnwire.NewShortChanIDFromInt(25), - Timestamp: unixStamp(5), - }, - } - } - }() - err = syncer.ApplyGossipFilter(remoteHorizon) + // Assert that no query was made as a result of applying the gossip + // filter. + err = <-errChan if err != nil { - t.Fatalf("unable to apply filter: %v", err) - } - - // We should get back the exact same message. - select { - case <-time.After(time.Second * 15): - t.Fatalf("no msgs received") - - case msgs := <-msgChan: - if len(msgs) != 1 { - t.Fatalf("wrong messages: expected %v, got %v", - 1, len(msgs)) - } + t.Fatalf(err.Error()) } } From 92cd930ffef89a7d063d2739dee6b83e97a525c5 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 15 Jul 2019 14:36:29 -0700 Subject: [PATCH 2/3] discovery: remove error return value from ApplyGossipFilter This method can no longer fail since it doesn't interact with the db. --- discovery/gossiper.go | 8 +------- discovery/syncer.go | 4 +--- discovery/syncer_test.go | 7 ++----- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a3aefd48ce6..b71b54dfd45 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -470,13 +470,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // If we've found the message target, then we'll dispatch the // message directly to it. - if err := syncer.ApplyGossipFilter(m); err != nil { - log.Warnf("Unable to apply gossip filter for peer=%x: "+ - "%v", peer.PubKey(), err) - - errChan <- err - return errChan - } + syncer.ApplyGossipFilter(m) errChan <- nil return errChan diff --git a/discovery/syncer.go b/discovery/syncer.go index 34ffaec0560..075de090f8b 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -939,13 +939,11 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error // ApplyGossipFilter applies a gossiper filter sent by the remote node to the // state machine. Once applied, we'll ensure that we don't forward any messages // to the peer that aren't within the time range of the filter. -func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error { +func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) { g.Lock() defer g.Unlock() g.remoteUpdateHorizon = filter - - return nil } // FilterGossipMsgs takes a set of gossip messages, and only send it to a peer diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index cc2c363926e..76ca10cccd3 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -378,10 +378,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { }() // We'll now attempt to apply the gossip filter for the remote peer. - err := syncer.ApplyGossipFilter(remoteHorizon) - if err != nil { - t.Fatalf("unable to apply filter: %v", err) - } + syncer.ApplyGossipFilter(remoteHorizon) // Ensure that the syncer's remote horizon was properly updated. if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) { @@ -394,7 +391,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { // Assert that no query was made as a result of applying the gossip // filter. - err = <-errChan + err := <-errChan if err != nil { t.Fatalf(err.Error()) } From d31b6c746e9743bda5780ced952d6b5bc7e05c1d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 15 Jul 2019 20:30:47 -0700 Subject: [PATCH 3/3] discovery/multi: remove UpdatesInHorizon from ChanSeries iface --- discovery/chan_series.go | 90 ---------------------------------------- discovery/syncer_test.go | 9 ---- 2 files changed, 99 deletions(-) diff --git a/discovery/chan_series.go b/discovery/chan_series.go index ddea51041f8..729fcfdf551 100644 --- a/discovery/chan_series.go +++ b/discovery/chan_series.go @@ -1,8 +1,6 @@ package discovery import ( - "time" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" @@ -23,13 +21,6 @@ type ChannelGraphTimeSeries interface { // the remote node. HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) - // UpdatesInHorizon returns all known channel and node updates with an - // update timestamp between the start time and end time. We'll use this - // to catch up a remote node to the set of channel updates that they - // may have missed out on within the target chain. - UpdatesInHorizon(chain chainhash.Hash, - startTime time.Time, endTime time.Time) ([]lnwire.Message, error) - // FilterKnownChanIDs takes a target chain, and a set of channel ID's, // and returns a filtered set of chan ID's. This filtered set of chan // ID's represents the ID's that we don't know of which were in the @@ -92,87 +83,6 @@ func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID return &shortChanID, nil } -// UpdatesInHorizon returns all known channel and node updates with an update -// timestamp between the start time and end time. We'll use this to catch up a -// remote node to the set of channel updates that they may have missed out on -// within the target chain. -// -// NOTE: This is part of the ChannelGraphTimeSeries interface. -func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash, - startTime time.Time, endTime time.Time) ([]lnwire.Message, error) { - - var updates []lnwire.Message - - // First, we'll query for all the set of channels that have an update - // that falls within the specified horizon. - chansInHorizon, err := c.graph.ChanUpdatesInHorizon( - startTime, endTime, - ) - if err != nil { - return nil, err - } - for _, channel := range chansInHorizon { - // If the channel hasn't been fully advertised yet, or is a - // private channel, then we'll skip it as we can't construct a - // full authentication proof if one is requested. - if channel.Info.AuthProof == nil { - continue - } - - chanAnn, edge1, edge2, err := CreateChanAnnouncement( - channel.Info.AuthProof, channel.Info, channel.Policy1, - channel.Policy2, - ) - if err != nil { - return nil, err - } - - updates = append(updates, chanAnn) - if edge1 != nil { - updates = append(updates, edge1) - } - if edge2 != nil { - updates = append(updates, edge2) - } - } - - // Next, we'll send out all the node announcements that have an update - // within the horizon as well. We send these second to ensure that they - // follow any active channels they have. - nodeAnnsInHorizon, err := c.graph.NodeUpdatesInHorizon( - startTime, endTime, - ) - if err != nil { - return nil, err - } - for _, nodeAnn := range nodeAnnsInHorizon { - // Ensure we only forward nodes that are publicly advertised to - // prevent leaking information about nodes. - isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes) - if err != nil { - log.Errorf("Unable to determine if node %x is "+ - "advertised: %v", nodeAnn.PubKeyBytes, err) - continue - } - - if !isNodePublic { - log.Tracef("Skipping forwarding announcement for "+ - "node %x due to being unadvertised", - nodeAnn.PubKeyBytes) - continue - } - - nodeUpdate, err := nodeAnn.NodeAnnouncement(true) - if err != nil { - return nil, err - } - - updates = append(updates, nodeUpdate) - } - - return updates, nil -} - // FilterKnownChanIDs takes a target chain, and a set of channel ID's, and // returns a filtered set of chan ID's. This filtered set of chan ID's // represents the ID's that we don't know of which were in the passed superSet. diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 76ca10cccd3..b4f7a0f20d5 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -78,15 +78,6 @@ func newMockChannelGraphTimeSeries( func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) { return &m.highestID, nil } -func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash, - startTime time.Time, endTime time.Time) ([]lnwire.Message, error) { - - m.horizonReq <- horizonQuery{ - chain, startTime, endTime, - } - - return <-m.horizonResp, nil -} func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash, superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) {