Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: eusure tipset exit #24

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ test:
go test -race ./...

lint: $(BUILD_DEPS)
go run github.com/golangci/golangci-lint/cmd/golangci-lint run
golangci-lint run

dist-clean:
git clean -xdff
Expand Down
4 changes: 2 additions & 2 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -16,7 +16,7 @@ func NewLocalRPCClient(ctx context.Context, addr string, opts ...jsonrpc.Option)
port := strings.Split(addr, ":")[1]
endpoint := fmt.Sprintf("http://127.0.0.1:%s/rpc/admin/v0", port)

token, err := ioutil.ReadFile("./token")
token, err := os.ReadFile("./token")
token = bytes.TrimSpace(token)
if err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"io/ioutil"
"os"
"strings"

"github.com/ipfs-force-community/metrics"
Expand Down Expand Up @@ -68,7 +68,7 @@ var runCmd = &cli.Command{
if err != nil {
return err
}
err = ioutil.WriteFile("./token", token, 0666)
err = os.WriteFile("./token", token, 0666)

if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion co/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,16 @@ func (c *Coordinator) delNodeAddr(addr string) {
}
func (c *Coordinator) handleCandidate(hc *headCandidate) {
addr := hc.node.info.Addr
clog := log.With("node", addr, "h", hc.ts.Height(), "w", hc.weight, "drift", time.Now().Unix()-int64(hc.ts.MinTimestamp()))

c.headMu.Lock()
defer c.headMu.Unlock()

if c.sel.Weight(addr) == 0 {
log.Infof("skip zero weight node %s ", addr)
return
}
clog := log.With("node", addr, "h", hc.ts.Height(), "w", hc.weight, "drift", time.Now().Unix()-int64(hc.ts.MinTimestamp()))

//1. more weight
//2. if equal weight. select more blocks
if c.head == nil || hc.weight.GreaterThan(c.weight) || (hc.weight.Equals(c.weight) && len(hc.ts.Blocks()) > len(c.head.Blocks())) {
Expand Down
36 changes: 34 additions & 2 deletions co/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/filecoin-project/lotus/api/v1api"
"github.com/ipfs-force-community/venus-common-utils/apiinfo"

"github.com/filecoin-project/go-jsonrpc"
"github.com/hashicorp/go-multierror"

"github.com/ipfs/go-cid"
"go.uber.org/zap"

Expand Down Expand Up @@ -70,7 +73,8 @@ type Node struct {
closer jsonrpc.ClientCloser
}

log *zap.SugaredLogger
blkCache *lru.ARCCache
log *zap.SugaredLogger
}

func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
Expand All @@ -79,6 +83,11 @@ func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
return nil, err
}
ctx, cancel := context.WithCancel(cctx.lc)
blkCache, err := lru.NewARC(100)
if err != nil {
cancel()
return nil, err
}
return &Node{
reListenInterval: cctx.nodeOpt.ReListenMinInterval,
opt: cctx.nodeOpt,
Expand All @@ -87,6 +96,7 @@ func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
cancel: cancel,
sctx: cctx,
Addr: info.Addr,
blkCache: blkCache,
log: log.With("remote", addr),
}, nil
}
Expand Down Expand Up @@ -206,8 +216,11 @@ func (n *Node) applyChanges(lifeCtx context.Context, changes []*api.HeadChange)
idx := -1
for i := range changes {
switch changes[i].Type {
case store.HCCurrent, store.HCApply:
case store.HCCurrent:
idx = i
case store.HCApply:
idx = i
n.storeKey(changes[i].Val.Key())
}
}

Expand Down Expand Up @@ -287,6 +300,25 @@ func (n *Node) loadBlockHeader(ctx context.Context, c cid.Cid) (*types.BlockHead
return blk, err
}

func (n *Node) hasKey(key types.TipSetKey) bool {
for _, blkCid := range key.Cids() {
_, has := n.blkCache.Peek(blkCid.String())
if !has {
return false
}
}
return true
}

func (n *Node) storeKey(key types.TipSetKey) {
for _, blkCid := range key.Cids() {
_, has := n.blkCache.Peek(blkCid.String())
if !has {
n.blkCache.Add(blkCid.String(), nil)
}
}
}

const (
ADD = true
REMOVE = false
Expand Down
16 changes: 15 additions & 1 deletion co/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package co
import (
"fmt"
"sync"

"github.com/filecoin-project/lotus/chain/types"
)

type Priority int
Expand Down Expand Up @@ -127,6 +129,13 @@ func (s *Selector) SetWeight(addr string, weight int) error {
return nil
}

func (s *Selector) Weight(addr string) int {
s.lk.Lock()
defer s.lk.Unlock()

return s.weight[addr]
}

func (s *Selector) ListWeight() map[string]int {
s.lk.RLock()
defer s.lk.RUnlock()
Expand All @@ -138,7 +147,7 @@ func (s *Selector) ListWeight() map[string]int {
}

// Select tries to choose a node from the candidates
func (s *Selector) Select() (*Node, error) {
func (s *Selector) Select(tsk types.TipSetKey) (*Node, error) {
s.lk.RLock()
defer s.lk.RUnlock()

Expand All @@ -147,6 +156,11 @@ func (s *Selector) Select() (*Node, error) {
catchUpQue := make(map[string]int)

for addr, p := range s.priority {
node := s.nodeProvider.GetNode(addr)
if !node.hasKey(tsk) {
log.Warnf("node %s not contains key %s", addr, tsk)
continue
}
if p == CatchUpPriority {
catchUpQue[addr] = s.weight[addr]
} else if p == DelayPriority {
Expand Down
Loading