Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/410/expose raft cluster api #501

Merged
merged 6 commits into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,11 @@ web3._extend({
new web3._extend.Property({
name: 'leader',
getter: 'raft_leader'
})
}),
new web3._extend.Property({
name: 'cluster',
jpmsam marked this conversation as resolved.
Show resolved Hide resolved
getter: 'raft_cluster'
}),
]
})
`
Expand Down
7 changes: 6 additions & 1 deletion raft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,10 @@ func (s *PublicRaftAPI) Leader() (string, error) {
if nil != err {
return "", err
}
return addr.nodeId.String(), nil
return addr.NodeId.String(), nil
}

func (s *PublicRaftAPI) Cluster() []*Address {
return append(s.raftService.raftProtocolManager.NodeInfo().PeerAddresses,
s.raftService.raftProtocolManager.NodeInfo().Address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you extract s.raftService.raftProtocolManager.NodeInfo() to a local variable, and then reuse that value for accessing both PeerAddresses and Address? This is because NodeInfo() takes a lock and does a bunch of work:

https://github.com/jpmorganchase/quorum/blob/d71daf324590d0033d0fc0ad255cadb702cf2b3b/raft/handler.go#L184-L219

}
6 changes: 3 additions & 3 deletions raft/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,17 +624,17 @@ func (pm *ProtocolManager) entriesToApply(allEntries []raftpb.Entry) (entriesToA
}

func raftUrl(address *Address) string {
return fmt.Sprintf("http://%s:%d", address.ip, address.raftPort)
return fmt.Sprintf("http://%s:%d", address.Ip, address.RaftPort)
}

func (pm *ProtocolManager) addPeer(address *Address) {
pm.mu.Lock()
defer pm.mu.Unlock()

raftId := address.raftId
raftId := address.RaftId

// Add P2P connection:
p2pNode := discover.NewNode(address.nodeId, address.ip, 0, uint16(address.p2pPort))
p2pNode := discover.NewNode(address.NodeId, address.Ip, 0, uint16(address.P2pPort))
pm.p2pServer.AddPeer(p2pNode)

// Add raft transport connection:
Expand Down
27 changes: 14 additions & 13 deletions raft/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,29 @@ import (
"net"

"fmt"
"log"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp"
"log"
)

// Serializable information about a Peer. Sufficient to build `etcdRaft.Peer`
// or `discover.Node`.
type Address struct {
raftId uint16
nodeId discover.NodeID
ip net.IP
p2pPort uint16
raftPort uint16
RaftId uint16 `json:"raftId"`
NodeId discover.NodeID `json:"nodeId"`
Ip net.IP `json:"ip"`
P2pPort uint16 `json:"p2pPort"`
RaftPort uint16 `json:"raftPort"`
}

func newAddress(raftId uint16, raftPort uint16, node *discover.Node) *Address {
return &Address{
raftId: raftId,
nodeId: node.ID,
ip: node.IP,
p2pPort: node.TCP,
raftPort: raftPort,
RaftId: raftId,
NodeId: node.ID,
Ip: node.IP,
P2pPort: node.TCP,
RaftPort: raftPort,
}
}

Expand All @@ -37,7 +38,7 @@ type Peer struct {
}

func (addr *Address) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{addr.raftId, addr.nodeId, addr.ip, addr.p2pPort, addr.raftPort})
return rlp.Encode(w, []interface{}{addr.RaftId, addr.NodeId, addr.Ip, addr.P2pPort, addr.RaftPort})
}

func (addr *Address) DecodeRLP(s *rlp.Stream) error {
Expand All @@ -53,7 +54,7 @@ func (addr *Address) DecodeRLP(s *rlp.Stream) error {
if err := s.Decode(&temp); err != nil {
return err
} else {
addr.raftId, addr.nodeId, addr.ip, addr.p2pPort, addr.raftPort = temp.RaftId, temp.NodeId, temp.Ip, temp.P2pPort, temp.RaftPort
addr.RaftId, addr.NodeId, addr.Ip, addr.P2pPort, addr.RaftPort = temp.RaftId, temp.NodeId, temp.Ip, temp.P2pPort, temp.RaftPort
return nil
}
}
Expand Down
8 changes: 4 additions & 4 deletions raft/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ByRaftId []Address

func (a ByRaftId) Len() int { return len(a) }
func (a ByRaftId) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByRaftId) Less(i, j int) bool { return a[i].raftId < a[j].raftId }
func (a ByRaftId) Less(i, j int) bool { return a[i].RaftId < a[j].RaftId }

func (pm *ProtocolManager) buildSnapshot() *Snapshot {
pm.mu.RLock()
Expand Down Expand Up @@ -140,17 +140,17 @@ func (pm *ProtocolManager) updateClusterMembership(newConfState raftpb.ConfState
for _, tempAddress := range addresses {
address := tempAddress // Allocate separately on the heap for each iteration.

if address.raftId == pm.raftId {
if address.RaftId == pm.raftId {
// If we're a newcomer to an existing cluster, this is where we learn
// our own Address.
pm.setLocalAddress(&address)
} else {
pm.mu.RLock()
existingPeer := pm.peers[address.raftId]
existingPeer := pm.peers[address.RaftId]
pm.mu.RUnlock()

if existingPeer == nil {
log.Info("adding new raft peer", "raft id", address.raftId)
log.Info("adding new raft peer", "raft id", address.RaftId)
pm.addPeer(&address)
}
}
Expand Down