forked from decred/dcrlnd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel_notifier.go
156 lines (133 loc) · 4.63 KB
/
channel_notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package dcrlnd
import (
"fmt"
"net"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/decred/dcrd/wire"
"github.com/decred/dcrlnd/chanbackup"
"github.com/decred/dcrlnd/channeldb"
"github.com/decred/dcrlnd/channelnotifier"
)
// addrSource is an interface that allow us to get the addresses for a target
// node. We'll need this in order to be able to properly proxy the
// notifications to create SCBs.
type addrSource interface {
// AddrsForNode returns all known addresses for the target node public
// key.
AddrsForNode(nodePub *secp256k1.PublicKey) ([]net.Addr, error)
}
// channelNotifier is an implementation of the chanbackup.ChannelNotifier
// interface using the existing channelnotifier.ChannelNotifier struct. This
// implementation allows us to satisfy all the dependencies of the
// chanbackup.SubSwapper struct.
type channelNotifier struct {
// chanNotifier is the based channel notifier that we'll proxy requests
// from.
chanNotifier *channelnotifier.ChannelNotifier
// addrs is an implementation of the addrSource interface that allows
// us to get the latest set of addresses for a given node. We'll need
// this to be able to create an SCB for new channels.
addrs addrSource
}
// SubscribeChans requests a new channel subscription relative to the initial
// set of known channels. We use the knownChans as a synchronization point to
// ensure that the chanbackup.SubSwapper does not miss any channel open or
// close events in the period between when it's created, and when it requests
// the channel subscription.
//
// NOTE: This is part of the chanbackup.ChannelNotifier interface.
func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{}) (
*chanbackup.ChannelSubscription, error) {
ltndLog.Infof("Channel backup proxy channel notifier starting")
// TODO(roasbeef): read existing set of chans and diff
quit := make(chan struct{})
chanUpdates := make(chan chanbackup.ChannelEvent, 1)
// sendChanOpenUpdate is a closure that sends a ChannelEvent to the
// chanUpdates channel to inform subscribers about new pending or
// confirmed channels.
sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) {
nodeAddrs, err := c.addrs.AddrsForNode(
newOrPendingChan.IdentityPub,
)
if err != nil {
pub := newOrPendingChan.IdentityPub
ltndLog.Errorf("unable to fetch addrs for %x: %v",
pub.SerializeCompressed(), err)
}
chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: newOrPendingChan,
Addrs: nodeAddrs,
},
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
}
// In order to adhere to the interface, we'll proxy the events from the
// channel notifier to the sub-swapper in a format it understands.
go func() {
// First, we'll subscribe to the primary channel notifier so we can
// obtain events for new opened/closed channels.
chanSubscription, err := c.chanNotifier.SubscribeChannelEvents()
if err != nil {
panic(fmt.Sprintf("unable to subscribe to chans: %v",
err))
}
defer chanSubscription.Cancel()
for {
select {
// A new event has been sent by the chanNotifier, we'll
// filter out the events we actually care about and
// send them to the sub-swapper.
case e := <-chanSubscription.Updates():
// TODO(roasbeef): batch dispatch ntnfs
switch event := e.(type) {
// A new channel has been opened and is still
// pending. We can still create a backup, even
// if the final channel ID is not yet available.
case channelnotifier.PendingOpenChannelEvent:
pendingChan := event.PendingChannel
sendChanOpenUpdate(pendingChan)
// A new channel has been confirmed, we'll
// obtain the node address, then send to the
// sub-swapper.
case channelnotifier.OpenChannelEvent:
sendChanOpenUpdate(event.Channel)
// An existing channel has been closed, we'll
// send only the chanPoint of the closed
// channel to the sub-swapper.
case channelnotifier.ClosedChannelEvent:
chanPoint := event.CloseSummary.ChanPoint
chanEvent := chanbackup.ChannelEvent{
ClosedChans: []wire.OutPoint{
chanPoint,
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
}
// The cancel method has been called, signalling us to
// exit
case <-quit:
return
}
}
}()
return &chanbackup.ChannelSubscription{
ChanUpdates: chanUpdates,
Cancel: func() {
close(quit)
},
}, nil
}
// A compile-time constraint to ensure channelNotifier implements
// chanbackup.ChannelNotifier.
var _ chanbackup.ChannelNotifier = (*channelNotifier)(nil)