-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpeermanager.go
163 lines (133 loc) · 3.8 KB
/
peermanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package gotorrent
import (
log "code.google.com/p/tcgl/applog"
"github.com/moretti/gotorrent/messages"
"math/rand"
"net"
)
// PeerManager keeps track of the peers known for a single torrent and reacts
// to the events (new addresses, incoming messages, errors) delivered on its
// channels. Instances should be created with NewPeerManager.
type PeerManager struct {
	// Torrent is the torrent whose pieces are requested from the peers.
	Torrent *Torrent
	// Peers holds the known peers, keyed by the string form of their TCP address.
	Peers map[string]*Peer
	// AddPeerAddr receives the addresses of peers to register.
	AddPeerAddr chan net.TCPAddr
	// PeerReadyToDownload is allocated by NewPeerManager; nothing in this file
	// sends or receives on it.
	PeerReadyToDownload chan Peer
	// Errors is handed to every Peer created by the manager and is drained by
	// the management loop.
	Errors chan PeerError
	// InMessages is handed to every Peer created by the manager and is drained
	// by the management loop.
	InMessages chan PeerMessage
	// Quit is the receive side of the shutdown signal watched by the
	// management loop.
	Quit <-chan bool

	// quit is the writable end of Quit, retained so that Stop can close it.
	quit chan bool
}

// NewPeerManager returns a PeerManager for torrent with all of its channels
// allocated (unbuffered) and its management goroutine already running.
// Call Stop to terminate that goroutine.
func NewPeerManager(torrent *Torrent) *PeerManager {
	// A channel created directly as receive-only (make(<-chan bool)) can never
	// be sent on or closed, which would make the Quit case unreachable and
	// leak the goroutine. Create the bidirectional channel first and expose
	// only its receive side through Quit.
	quit := make(chan bool)

	pm := &PeerManager{
		Torrent:             torrent,
		Peers:               make(map[string]*Peer),
		AddPeerAddr:         make(chan net.TCPAddr),
		PeerReadyToDownload: make(chan Peer),
		Errors:              make(chan PeerError),
		InMessages:          make(chan PeerMessage),
		Quit:                quit,
		quit:                quit,
	}

	go pm.manage()
	return pm
}

// Stop asks the management goroutine to exit by closing the quit channel.
//
// It must be called at most once, and only on a manager obtained from
// NewPeerManager: closing an already closed or nil channel panics. Once the
// goroutine has exited, values sent on AddPeerAddr, InMessages and Errors are
// no longer received, so senders on those unbuffered channels will block.
func (pm *PeerManager) Stop() {
	close(pm.quit)
}
// manage is the manager's event loop. It waits on the manager's channels and
// dispatches each event to the matching handler, one event at a time:
// new addresses go to addPeer, peer messages to processMessage and peer
// errors to handleError. The loop returns as soon as a receive on Quit
// succeeds.
func (pm *PeerManager) manage() {
	for {
		select {
		case <-pm.Quit:
			log.Debugf("Quitting...")
			return

		case addr := <-pm.AddPeerAddr:
			pm.addPeer(addr)

		case perr := <-pm.Errors:
			pm.handleError(perr)

		case msg := <-pm.InMessages:
			pm.processMessage(msg)
		}
	}
}
// addPeer registers the peer at peerAddr and starts connecting to it.
// Addresses that are already present in pm.Peers are ignored, so each address
// is connected to at most once while its entry remains in the map.
func (pm *PeerManager) addPeer(peerAddr net.TCPAddr) {
	key := peerAddr.String()
	if _, known := pm.Peers[key]; known {
		return
	}

	log.Debugf("Found Peer: %v", peerAddr)

	// The new peer reports back to the manager through the shared
	// Errors and InMessages channels.
	newPeer := NewPeer(peerAddr, pm.Torrent, pm.Errors, pm.InMessages)
	pm.Peers[key] = newPeer
	newPeer.Connect()
}
// handleError logs the error reported for a peer and removes that peer from
// pm.Peers, using the string form of the peer's address as the key.
func (pm *PeerManager) handleError(peerError PeerError) {
	addr := peerError.Addr.String()
	log.Errorf("Peer %v: %v", addr, peerError.Err)
	delete(pm.Peers, addr)
}
// processMessage applies a single message received from a peer.
//
// Messages from addresses that are not in pm.Peers are logged and dropped.
// A message whose header length is zero is treated as a keep-alive. Otherwise
// the header id selects the action: choke/unchoke update the peer's choked
// flag, have/bitfield update what the peer advertises, and piece hands the
// payload to processPiece. Unchoke, have and bitfield each trigger an attempt
// to request a piece from the peer. Interested, not-interested, request,
// cancel and port messages are accepted but ignored.
func (pm *PeerManager) processMessage(peerMessage PeerMessage) {
	peer, known := pm.Peers[peerMessage.Addr.String()]
	if !known {
		log.Errorf("Unable to find peer %v", peerMessage.Addr)
		return
	}

	msg := peerMessage.Message

	// Keep-alive: nothing but the length prefix was sent.
	if msg.Header.Length == 0 {
		log.Debugf("Peer %v - Keep alive", peer.String())
		peer.SetKeepAlive()
		return
	}

	switch id := msg.Header.Id; id {
	case messages.ChokeId:
		log.Debugf("Peer %v - Chocked", peer.String())
		peer.IsChoked = true

	case messages.UnchokeId:
		peer.IsChoked = false
		log.Debugf("Peer %v - Unchocked", peer.String())
		pm.downloadPiece(peer)

	case messages.HaveId:
		peer.SetHave(msg)
		pm.downloadPiece(peer)

	case messages.BitFieldId:
		peer.SetBitField(msg)
		pm.downloadPiece(peer)

	case messages.PieceId:
		// One outstanding request has been answered.
		peer.RequestsCount--
		pm.processPiece(msg, peer)

	case messages.InterestedId,
		messages.NotInterestedId,
		messages.RequestId,
		messages.CancelId,
		messages.PortId:
		// Recognised, but no action is taken for these messages.

	default:
		log.Errorf("Peer %v - Unknown id: %v", peer.String(), id)
	}
}
// downloadPiece asks peer for one piece, chosen at random among the piece
// indices that peer.AmInterested reports for the torrent's active and
// completed piece sets.
//
// Nothing is requested when the peer is choked, when the peer's request
// counter exceeds PeerMaxRequests, or when AmInterested reports no interest.
//
// NOTE(review): the counter check uses ">", so a request is still issued when
// RequestsCount equals PeerMaxRequests - confirm this is the intended limit.
func (pm *PeerManager) downloadPiece(peer *Peer) {
	if peer.IsChoked {
		return
	}
	if peer.RequestsCount > PeerMaxRequests {
		return
	}

	interested, candidates := peer.AmInterested(pm.Torrent.ActivePieces, pm.Torrent.CompletedPieces)
	if !interested {
		return
	}

	// Pick one of the candidate pieces at random.
	index := randomChoice(candidates)
	target := pm.Torrent.Pieces[index]

	peer.RequestsCount++
	log.Debugf("Requesting piece #%v to peer %v", index, peer.String())
	peer.RequestPiece(target)
}
// processPiece decodes a piece message received from peer and stores the
// block it carries in the matching piece of the torrent.
//
// Messages that cannot be decoded, or that name a piece index outside
// pm.Torrent.Pieces, are logged and dropped.
func (pm *PeerManager) processPiece(message messages.Message, peer *Peer) {
	pieceMsg, err := message.ToPiece()
	if err != nil {
		log.Errorf("Peer %v - Unable to parse the piece message: %v", peer.String(), err)
		return
	}

	// The index is supplied by the remote peer. Indexing with an unchecked
	// value would panic on an out-of-range index and take the whole manager
	// goroutine down, so reject it up front. The "< 0" test also covers an
	// unsigned index that wraps negative when converted to int.
	// NOTE(review): assumes Pieces is indexed 0..len-1 - confirm against Torrent.
	if idx := int(pieceMsg.PieceIndex); idx < 0 || idx >= len(pm.Torrent.Pieces) {
		log.Errorf("Peer %v - Invalid piece index: %v", peer.String(), pieceMsg.PieceIndex)
		return
	}

	log.Debugf("Peer %v - Found a new block - PieceIndex: %v BlockOffset: %v", peer.String(), pieceMsg.PieceIndex, pieceMsg.BlockOffset)

	piece := pm.Torrent.Pieces[pieceMsg.PieceIndex]
	piece.SetBlock(int(pieceMsg.BlockOffset), pieceMsg.BlockData)
}
// randomChoice returns one element of slice, selected with math/rand.
// It panics when slice is empty, because rand.Intn rejects n <= 0.
func randomChoice(slice []int) int {
	return slice[rand.Intn(len(slice))]
}
// UpdatePeers submits every address in addresses, in order, to the manager
// through AddPeerAddr. Each send blocks until the value is received from that
// channel.
func (pm *PeerManager) UpdatePeers(addresses []net.TCPAddr) {
	for i := range addresses {
		pm.AddPeerAddr <- addresses[i]
	}
}