Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 24 additions & 32 deletions p2p/discv5/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,9 @@ const (
const testTopic = "foo"

const (
printDebugLogs = false
printTestImgLogs = false
)

func debugLog(s string) {
if printDebugLogs {
fmt.Println(s)
}
}

// Network manages the table and all protocol interaction.
type Network struct {
db *nodeDB // database of known nodes
Expand Down Expand Up @@ -388,14 +381,14 @@ func (net *Network) loop() {
}
}()
resetNextTicket := func() {
t, timeout := net.ticketStore.nextFilteredTicket()
if t != nextTicket {
nextTicket = t
ticket, timeout := net.ticketStore.nextFilteredTicket()
if nextTicket != ticket {
nextTicket = ticket
if nextRegisterTimer != nil {
nextRegisterTimer.Stop()
nextRegisterTime = nil
}
if t != nil {
if ticket != nil {
nextRegisterTimer = time.NewTimer(timeout)
nextRegisterTime = nextRegisterTimer.C
}
Expand Down Expand Up @@ -423,13 +416,13 @@ loop:

select {
case <-net.closeReq:
debugLog("<-net.closeReq")
log.Trace("<-net.closeReq")
break loop

// Ingress packet handling.
case pkt := <-net.read:
//fmt.Println("read", pkt.ev)
debugLog("<-net.read")
log.Trace("<-net.read")
n := net.internNode(&pkt)
prestate := n.state
status := "ok"
Expand All @@ -444,7 +437,7 @@ loop:

// State transition timeouts.
case timeout := <-net.timeout:
debugLog("<-net.timeout")
log.Trace("<-net.timeout")
if net.timeoutTimers[timeout] == nil {
// Stale timer (was aborted).
continue
Expand All @@ -462,20 +455,20 @@ loop:

// Querying.
case q := <-net.queryReq:
debugLog("<-net.queryReq")
log.Trace("<-net.queryReq")
if !q.start(net) {
q.remote.deferQuery(q)
}

// Interacting with the table.
case f := <-net.tableOpReq:
debugLog("<-net.tableOpReq")
log.Trace("<-net.tableOpReq")
f()
net.tableOpResp <- struct{}{}

// Topic registration stuff.
case req := <-net.topicRegisterReq:
debugLog("<-net.topicRegisterReq")
log.Trace("<-net.topicRegisterReq")
if !req.add {
net.ticketStore.removeRegisterTopic(req.topic)
continue
Expand All @@ -486,7 +479,7 @@ loop:
// determination for new topics.
// if topicRegisterLookupDone == nil {
if topicRegisterLookupTarget.target == (common.Hash{}) {
debugLog("topicRegisterLookupTarget == null")
log.Trace("topicRegisterLookupTarget == null")
if topicRegisterLookupTick.Stop() {
<-topicRegisterLookupTick.C
}
Expand All @@ -496,7 +489,7 @@ loop:
}

case nodes := <-topicRegisterLookupDone:
debugLog("<-topicRegisterLookupDone")
log.Trace("<-topicRegisterLookupDone")
net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte {
net.ping(n, n.addr())
return n.pingEcho
Expand All @@ -507,7 +500,7 @@ loop:
topicRegisterLookupDone = nil

case <-topicRegisterLookupTick.C:
debugLog("<-topicRegisterLookupTick")
log.Trace("<-topicRegisterLookupTick")
if (topicRegisterLookupTarget.target == common.Hash{}) {
target, delay := net.ticketStore.nextRegisterLookup()
topicRegisterLookupTarget = target
Expand All @@ -520,14 +513,14 @@ loop:
}

case <-nextRegisterTime:
debugLog("<-nextRegisterTime")
log.Trace("<-nextRegisterTime")
net.ticketStore.ticketRegistered(*nextTicket)
//fmt.Println("sendTopicRegister", nextTicket.t.node.addr().String(), nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)

case req := <-net.topicSearchReq:
if refreshDone == nil {
debugLog("<-net.topicSearchReq")
log.Trace("<-net.topicSearchReq")
info, ok := searchInfo[req.topic]
if ok {
if req.delay == time.Duration(0) {
Expand Down Expand Up @@ -588,7 +581,7 @@ loop:
})

case <-statsDump.C:
debugLog("<-statsDump.C")
log.Trace("<-statsDump.C")
/*r, ok := net.ticketStore.radius[testTopic]
if !ok {
fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now())
Expand Down Expand Up @@ -617,7 +610,7 @@ loop:

// Periodic / lookup-initiated bucket refresh.
case <-refreshTimer.C:
debugLog("<-refreshTimer.C")
log.Trace("<-refreshTimer.C")
// TODO: ideally we would start the refresh timer after
// fallback nodes have been set for the first time.
if refreshDone == nil {
Expand All @@ -631,7 +624,7 @@ loop:
bucketRefreshTimer.Reset(bucketRefreshInterval)
}()
case newNursery := <-net.refreshReq:
debugLog("<-net.refreshReq")
log.Trace("<-net.refreshReq")
if newNursery != nil {
net.nursery = newNursery
}
Expand All @@ -641,7 +634,7 @@ loop:
}
net.refreshResp <- refreshDone
case <-refreshDone:
debugLog("<-net.refreshDone")
log.Trace("<-net.refreshDone")
refreshDone = nil
list := searchReqWhenRefreshDone
searchReqWhenRefreshDone = nil
Expand All @@ -652,7 +645,7 @@ loop:
}()
}
}
debugLog("loop stopped")
log.Trace("loop stopped")

log.Debug(fmt.Sprintf("shutting down"))
if net.conn != nil {
Expand Down Expand Up @@ -1109,14 +1102,14 @@ func (net *Network) ping(n *Node, addr *net.UDPAddr) {
//fmt.Println(" not sent")
return
}
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
log.Trace("Pinging remote node", "node", n.ID)
n.pingTopics = net.ticketStore.regTopicSet()
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
net.timedEvent(respTimeout, n, pongTimeout)
}

func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
debugLog(fmt.Sprintf("handlePing(node = %x)", n.ID[:8]))
log.Trace("Handling remote ping", "node", n.ID)
ping := pkt.data.(*ping)
n.TCP = ping.From.TCP
t := net.topictab.getTicket(n, ping.Topics)
Expand All @@ -1131,17 +1124,16 @@ func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
}

func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error {
debugLog(fmt.Sprintf("handleKnownPong(node = %x)", n.ID[:8]))
log.Trace("Handling known pong", "node", n.ID)
net.abortTimedEvent(n, pongTimeout)
now := mclock.Now()
ticket, err := pongToTicket(now, n.pingTopics, n, pkt)
if err == nil {
// fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data)
net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket)
} else {
debugLog(fmt.Sprintf(" error: %v", err))
log.Trace("Failed to convert pong to ticket", "err", err)
}

n.pingEcho = nil
n.pingTopics = nil
return err
Expand Down
5 changes: 5 additions & 0 deletions p2p/discv5/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (n NodeID) GoString() string {
return fmt.Sprintf("discover.HexID(\"%x\")", n[:])
}

// TerminalString returns a shortened hex string for terminal logging.
func (n NodeID) TerminalString() string {
return hex.EncodeToString(n[:8])
}

// HexID converts a hex string to a NodeID.
// The string may be prefixed with 0x.
func HexID(in string) (NodeID, error) {
Expand Down
Loading