From 71a74bae27c72c4886634b03c40d40eb1b0aae63 Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Thu, 4 Aug 2022 09:51:33 +0800 Subject: [PATCH 1/2] refrator: network --- app/node/builder.go | 25 ++- app/node/node.go | 21 +-- app/submodule/discovery/discovery_api.go | 9 -- .../discovery/discovery_submodule.go | 137 ----------------- app/submodule/network/network_submodule.go | 57 ++++--- app/submodule/syncer/syncer_submodule.go | 50 +++--- go.mod | 2 +- go.sum | 2 + pkg/chain/testing.go | 2 +- pkg/chainsync/chainsync.go | 2 +- pkg/chainsync/syncer/syncer.go | 2 +- pkg/discovery/bootstrap.go | 138 ----------------- pkg/discovery/bootstrap_test.go | 134 ----------------- pkg/discovery/noop_discovery.go | 29 ---- pkg/discovery/peer_tracker.go | 142 ------------------ pkg/discovery/peer_tracker_test.go | 128 ---------------- pkg/net/address.go | 34 ----- pkg/net/address_test.go | 31 ---- pkg/net/{protocols.go => dht.go} | 0 pkg/{chainsync => net}/exchange/cbor_gen.go | 26 ++-- pkg/{chainsync => net}/exchange/client.go | 4 +- pkg/{chainsync => net}/exchange/doc.go | 0 pkg/{chainsync => net}/exchange/inct.go | 0 pkg/{chainsync => net}/exchange/interfaces.go | 0 .../exchange/peer_tracker.go | 10 +- pkg/{chainsync => net}/exchange/protocol.go | 0 pkg/{chainsync => net}/exchange/server.go | 0 .../helloprotocol}/cbor_gen.go | 2 +- .../helloprotocol}/hello_protocol.go | 22 +-- .../helloprotocol}/hello_protocol_test.go | 43 ++++-- pkg/net/{ => peermgr}/peermgr.go | 2 +- pkg/testhelpers/net.go | 2 +- tools/gen_cbor/main.go | 8 +- 33 files changed, 141 insertions(+), 923 deletions(-) delete mode 100644 app/submodule/discovery/discovery_api.go delete mode 100644 app/submodule/discovery/discovery_submodule.go delete mode 100644 pkg/discovery/bootstrap.go delete mode 100644 pkg/discovery/bootstrap_test.go delete mode 100644 pkg/discovery/noop_discovery.go delete mode 100644 pkg/discovery/peer_tracker.go delete mode 100644 pkg/discovery/peer_tracker_test.go delete mode 100644 pkg/net/address.go delete mode 100644 pkg/net/address_test.go rename pkg/net/{protocols.go => dht.go} (100%) rename pkg/{chainsync => net}/exchange/cbor_gen.go (96%) rename pkg/{chainsync => net}/exchange/client.go (99%) rename pkg/{chainsync => net}/exchange/doc.go (100%) rename pkg/{chainsync => net}/exchange/inct.go (100%) rename pkg/{chainsync => net}/exchange/interfaces.go (100%) rename pkg/{chainsync => net}/exchange/peer_tracker.go (93%) rename pkg/{chainsync => net}/exchange/protocol.go (100%) rename pkg/{chainsync => net}/exchange/server.go (100%) rename pkg/{discovery => net/helloprotocol}/cbor_gen.go (99%) rename pkg/{discovery => net/helloprotocol}/hello_protocol.go (95%) rename pkg/{discovery => net/helloprotocol}/hello_protocol_test.go (73%) rename pkg/net/{ => peermgr}/peermgr.go (99%) diff --git a/app/node/builder.go b/app/node/builder.go index 8e6ead912e..eb2527c328 100644 --- a/app/node/builder.go +++ b/app/node/builder.go @@ -3,6 +3,8 @@ package node import ( "context" "fmt" + "github.com/filecoin-project/venus/app/submodule/dagservice" + "github.com/filecoin-project/venus/app/submodule/network" "time" logging "github.com/ipfs/go-log" @@ -13,13 +15,10 @@ import ( "github.com/filecoin-project/venus/app/submodule/chain" "github.com/filecoin-project/venus/app/submodule/common" config2 "github.com/filecoin-project/venus/app/submodule/config" - "github.com/filecoin-project/venus/app/submodule/dagservice" - "github.com/filecoin-project/venus/app/submodule/discovery" "github.com/filecoin-project/venus/app/submodule/market" "github.com/filecoin-project/venus/app/submodule/mining" "github.com/filecoin-project/venus/app/submodule/mpool" "github.com/filecoin-project/venus/app/submodule/multisig" - "github.com/filecoin-project/venus/app/submodule/network" "github.com/filecoin-project/venus/app/submodule/paych" "github.com/filecoin-project/venus/app/submodule/storagenetworking" "github.com/filecoin-project/venus/app/submodule/syncer" @@ -113,28 +112,23 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { return nil, errors.Wrap(err, "failed to build node.blockstore") } - nd.network, err = network.NewNetworkSubmodule(ctx, (*builder)(b)) + nd.chain, err = chain.NewChainSubmodule(ctx, (*builder)(b), nd.circulatiingSupplyCalculator) if err != nil { - return nil, errors.Wrap(err, "failed to build node.Network") + return nil, errors.Wrap(err, "failed to build node.Chain") } - nd.blockservice, err = dagservice.NewDagserviceSubmodule(ctx, (*builder)(b), nd.network) + nd.network, err = network.NewNetworkSubmodule(ctx, nd.chain.ChainReader, nd.chain.MessageStore, (*builder)(b)) if err != nil { - return nil, errors.Wrap(err, "failed to build node.dagservice") + return nil, errors.Wrap(err, "failed to build node.Network") } - nd.chain, err = chain.NewChainSubmodule(ctx, (*builder)(b), nd.circulatiingSupplyCalculator) + nd.blockservice, err = dagservice.NewDagserviceSubmodule(ctx, (*builder)(b), nd.network) if err != nil { - return nil, errors.Wrap(err, "failed to build node.Chain") + return nil, errors.Wrap(err, "failed to build node.dagservice") } - // todo change builder interface to read config - nd.discovery, err = discovery.NewDiscoverySubmodule(ctx, (*builder)(b), nd.network, nd.chain.ChainReader, nd.chain.MessageStore) - if err != nil { - return nil, errors.Wrap(err, "failed to build node.discovery") - } - nd.syncer, err = syncer.NewSyncerSubmodule(ctx, (*builder)(b), nd.blockstore, nd.network, nd.discovery, nd.chain, nd.circulatiingSupplyCalculator) + nd.syncer, err = syncer.NewSyncerSubmodule(ctx, (*builder)(b), nd.blockstore, nd.network, nd.chain, nd.circulatiingSupplyCalculator) if err != nil { return nil, errors.Wrap(err, "failed to build node.Syncer") } @@ -177,7 +171,6 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { nd.blockstore, nd.network, nd.blockservice, - nd.discovery, nd.chain, nd.syncer, nd.wallet, diff --git a/app/node/node.go b/app/node/node.go index ce90e18da7..5d17f3f54f 100644 --- a/app/node/node.go +++ b/app/node/node.go @@ -16,7 +16,6 @@ import ( chain2 "github.com/filecoin-project/venus/app/submodule/chain" configModule "github.com/filecoin-project/venus/app/submodule/config" "github.com/filecoin-project/venus/app/submodule/dagservice" - "github.com/filecoin-project/venus/app/submodule/discovery" "github.com/filecoin-project/venus/app/submodule/market" "github.com/filecoin-project/venus/app/submodule/mining" "github.com/filecoin-project/venus/app/submodule/mpool" @@ -75,7 +74,6 @@ type Node struct { blockstore *blockstore.BlockstoreSubmodule blockservice *dagservice.DagServiceSubmodule network *network2.NetworkSubmodule - discovery *discovery.DiscoverySubmodule // // Subsystems @@ -125,10 +123,6 @@ func (node *Node) MultiSig() *multisig.MultiSigSubmodule { return node.multiSig } -func (node *Node) Discovery() *discovery.DiscoverySubmodule { - return node.discovery -} - func (node *Node) Network() *network2.NetworkSubmodule { return node.network } @@ -172,11 +166,6 @@ func (node *Node) Start(ctx context.Context) error { var syncCtx context.Context syncCtx, node.syncer.CancelChainSync = context.WithCancel(context.Background()) - // Start node discovery - if err = node.discovery.Start(node.offlineMode); err != nil { - return err - } - // start syncer module to receive new blocks and start sync to latest height err = node.syncer.Start(syncCtx) if err != nil { @@ -194,6 +183,12 @@ func (node *Node) Start(ctx context.Context) error { return err } + // network should start late, + err = node.network.Start(syncCtx) + if err != nil { + return err + } + return nil } @@ -207,10 +202,6 @@ func (node *Node) Stop(ctx context.Context) { log.Infof("shutting down chain syncer...") node.syncer.Stop(ctx) - // Stop discovery submodule - log.Infof("shutting down discovery...") - node.discovery.Stop() - // Stop network submodule log.Infof("shutting down network...") node.network.Stop(ctx) diff --git a/app/submodule/discovery/discovery_api.go b/app/submodule/discovery/discovery_api.go deleted file mode 100644 index 8288215621..0000000000 --- a/app/submodule/discovery/discovery_api.go +++ /dev/null @@ -1,9 +0,0 @@ -package discovery - -type IDiscovery interface{} - -var _ IDiscovery = &discoveryAPI{} - -type discoveryAPI struct { //nolint - discovery *DiscoverySubmodule -} diff --git a/app/submodule/discovery/discovery_submodule.go b/app/submodule/discovery/discovery_submodule.go deleted file mode 100644 index 64bc07d850..0000000000 --- a/app/submodule/discovery/discovery_submodule.go +++ /dev/null @@ -1,137 +0,0 @@ -package discovery - -import ( - "context" - "time" - - "github.com/filecoin-project/venus/app/submodule/network" - "github.com/filecoin-project/venus/pkg/repo" - "github.com/filecoin-project/venus/venus-shared/types" - "github.com/libp2p/go-libp2p-core/host" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/pkg/errors" - - "github.com/filecoin-project/venus/pkg/chain" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" - "github.com/filecoin-project/venus/pkg/discovery" - "github.com/filecoin-project/venus/pkg/net" - "github.com/filecoin-project/venus/pkg/util/moresync" -) - -var log = logging.Logger("discover_module") // nolint - -// DiscoverySubmodule enhances the `Node` with peer discovery capabilities. -type DiscoverySubmodule struct { //nolint - Bootstrapper *discovery.Bootstrapper - BootstrapReady *moresync.Latch - - // PeerTracker maintains a list of peers. - PeerTracker *discovery.PeerTracker - - // HelloHandler handle peer connections for the "hello" protocol. - HelloHandler *discovery.HelloProtocolHandler - // HelloHandler handle peer connections for the "hello" protocol. - ExchangeHandler exchange.Server - ExchangeClient exchange.Client - host host.Host - PeerDiscoveryCallbacks []discovery.PeerDiscoveredCallback - TipSetLoader discovery.GetTipSetFunc -} - -type discoveryConfig interface { - GenesisCid() cid.Cid - Repo() repo.Repo -} - -// NewDiscoverySubmodule creates a new discovery submodule. -func NewDiscoverySubmodule(ctx context.Context, - discoverCfg discoveryConfig, - network *network.NetworkSubmodule, - chainStore *chain.Store, - messageStore *chain.MessageStore, -) (*DiscoverySubmodule, error) { - periodStr := discoverCfg.Repo().Config().Bootstrap.Period - period, err := time.ParseDuration(periodStr) - if err != nil { - return nil, errors.Wrapf(err, "couldn't parse bootstrap period %s", periodStr) - } - - // bootstrapper maintains connections to some subset of addresses - ba := discoverCfg.Repo().Config().Bootstrap.Addresses - bpi, err := net.PeerAddrsToAddrInfo(ba) - if err != nil { - return nil, errors.Wrapf(err, "couldn't parse bootstrap addresses [%s]", ba) - } - - minPeerThreshold := discoverCfg.Repo().Config().Bootstrap.MinPeerThreshold - - exchangeClient := exchange.NewClient(network.Host, network.PeerMgr) - // create a bootstrapper - bootstrapper := discovery.NewBootstrapper(bpi, network.Host, network.Host.Network(), network.Router, minPeerThreshold, period) - - // set up peer tracking - peerTracker := discovery.NewPeerTracker(network.Host.ID()) - - bootStrapReady := moresync.NewLatch(uint(minPeerThreshold)) - - return &DiscoverySubmodule{ - host: network.Host, - Bootstrapper: bootstrapper, - BootstrapReady: bootStrapReady, - PeerTracker: peerTracker, - ExchangeClient: exchangeClient, - HelloHandler: discovery.NewHelloProtocolHandler(network.Host, network.PeerMgr, exchangeClient, chainStore, messageStore, discoverCfg.GenesisCid(), time.Duration(discoverCfg.Repo().Config().NetworkParams.BlockDelay)*time.Second), - ExchangeHandler: exchange.NewServer(chainStore, messageStore, network.Host), - PeerDiscoveryCallbacks: []discovery.PeerDiscoveredCallback{func(msg *types.ChainInfo) { - bootStrapReady.Done() - }}, - TipSetLoader: func() (*types.TipSet, error) { - return chainStore.GetHead(), nil - }, - }, nil -} - -// Start starts the discovery submodule for a node. It blocks until bootstrap -// satisfies the configured security conditions. -func (discovery *DiscoverySubmodule) Start(offline bool) error { - // Register peer tracker disconnect function with network. - discovery.PeerTracker.RegisterDisconnect(discovery.host.Network()) - - // Start up 'hello' handshake service,recv HelloMessage ??? - peerDiscoveredCallback := func(ci *types.ChainInfo) { - for _, fn := range discovery.PeerDiscoveryCallbacks { - fn(ci) - } - } - - // Register the "hello" protocol with the network - discovery.HelloHandler.Register(peerDiscoveredCallback, discovery.TipSetLoader) - - //registre exchange protocol - discovery.ExchangeHandler.Register() - - // Start bootstrapper. - if !offline { - discovery.Bootstrapper.Start(context.Background()) - // Wait for bootstrap to be sufficient connected - discovery.BootstrapReady.Wait() - } - log.Info("discovery module start") - return nil -} - -// Stop stops the discovery submodule. -func (discovery *DiscoverySubmodule) Stop() { - discovery.Bootstrapper.Stop() -} - -//API create a discovery api implement -func (discovery *DiscoverySubmodule) API() IDiscovery { - return &discoveryAPI{discovery: discovery} -} - -func (discovery *DiscoverySubmodule) V0API() IDiscovery { - return &discoveryAPI{discovery: discovery} -} diff --git a/app/submodule/network/network_submodule.go b/app/submodule/network/network_submodule.go index d7c4d1fdf2..035a92855d 100644 --- a/app/submodule/network/network_submodule.go +++ b/app/submodule/network/network_submodule.go @@ -4,10 +4,12 @@ import ( "bytes" "context" "fmt" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" "os" "runtime" "time" + "github.com/dchest/blake2b" "github.com/ipfs/go-bitswap" bsnet "github.com/ipfs/go-bitswap/network" blocks "github.com/ipfs/go-block-format" @@ -29,6 +31,7 @@ import ( "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" libp2pps "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" yamux "github.com/libp2p/go-libp2p-yamux" routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" ma "github.com/multiformats/go-multiaddr" @@ -39,12 +42,13 @@ import ( dtnet "github.com/filecoin-project/go-data-transfer/network" dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" + "github.com/filecoin-project/venus/pkg/chain" "github.com/filecoin-project/venus/pkg/config" - "github.com/filecoin-project/venus/pkg/discovery" "github.com/filecoin-project/venus/pkg/net" + filexchange "github.com/filecoin-project/venus/pkg/net/exchange" + "github.com/filecoin-project/venus/pkg/net/peermgr" "github.com/filecoin-project/venus/pkg/repo" appstate "github.com/filecoin-project/venus/pkg/state" - "github.com/filecoin-project/venus/pkg/util/blockstoreutil" "github.com/filecoin-project/venus/venus-shared/types" v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0" @@ -72,11 +76,15 @@ type NetworkSubmodule struct { //nolint GraphExchange graphsync.GraphExchange - Blockstore blockstoreutil.Blockstore - PeerMgr net.IPeerMgr + HelloHandler *helloprotocol.HelloProtocolHandler + + PeerMgr peermgr.IPeerMgr + ExchangeClient filexchange.Client //data transfer DataTransfer datatransfer.Manager DataTransferHost dtnet.DataTransferNetwork + + cfg networkConfig } //API create a new network implement @@ -111,10 +119,10 @@ type networkConfig interface { } // NewNetworkSubmodule creates a new network submodule. -func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSubmodule, error) { +func NewNetworkSubmodule(ctx context.Context, chainStore *chain.Store, + messageStore *chain.MessageStore, config networkConfig) (*NetworkSubmodule, error) { bandwidthTracker := p2pmetrics.NewBandwidthCounter() libP2pOpts := append(config.Libp2pOpts(), libp2p.BandwidthReporter(bandwidthTracker), makeSmuxTransportOption()) - var networkName string var err error if !config.Repo().Config().NetworkParams.DevNet { @@ -134,8 +142,7 @@ func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSub } // set up host - var pubsubMessageSigning bool - var peerMgr net.IPeerMgr + var peerMgr peermgr.IPeerMgr rawHost, err := buildHost(ctx, config, libP2pOpts, config.Repo().Config()) if err != nil { @@ -148,25 +155,16 @@ func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSub } peerHost := routedHost(rawHost, router) - - // require message signing in online mode when we have priv key - pubsubMessageSigning = true - period, err := time.ParseDuration(config.Repo().Config().Bootstrap.Period) if err != nil { return nil, err } - peerMgr, err = net.NewPeerMgr(peerHost, router.(*dht.IpfsDHT), period, bootNodes) + peerMgr, err = peermgr.NewPeerMgr(peerHost, router.(*dht.IpfsDHT), period, bootNodes) if err != nil { return nil, err } - // do NOT start `peerMgr` in `offline` mode - if !config.OfflineMode() { - go peerMgr.Run(ctx) - } - // Set up libp2p network // The gossipsub heartbeat timeout needs to be set sufficiently low // to enable publishing on first connection. The default of one @@ -183,8 +181,7 @@ func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSub // goroutine, 8K -> 16K libp2pps.WithValidateThrottle(16 << 10), - libp2pps.WithMessageSigning(pubsubMessageSigning), - libp2pps.WithDiscovery(&discovery.NoopDiscovery{}), + libp2pps.WithMessageIdFn(HashMsgId), } gsub, err := libp2pps.NewGossipSub(ctx, peerHost, options...) if err != nil { @@ -218,7 +215,8 @@ func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSub } // build network network := net.New(peerHost, rawHost, net.NewRouter(router), bandwidthTracker) - + exchangeClient := filexchange.NewClient(peerHost, peerMgr) + helloHandler := helloprotocol.NewHelloProtocolHandler(peerHost, peerMgr, exchangeClient, chainStore, messageStore, config.GenesisCid(), time.Duration(config.Repo().Config().NetworkParams.BlockDelay)*time.Second) // build the network submdule return &NetworkSubmodule{ NetworkName: networkName, @@ -228,14 +226,24 @@ func NewNetworkSubmodule(ctx context.Context, config networkConfig) (*NetworkSub Pubsub: gsub, Bitswap: bswap, GraphExchange: gsync, + ExchangeClient: exchangeClient, Network: network, DataTransfer: dt, DataTransferHost: dtNet, PeerMgr: peerMgr, - Blockstore: config.Repo().Datastore(), + HelloHandler: helloHandler, + cfg: config, }, nil } +func (networkSubmodule *NetworkSubmodule) Start(ctx context.Context) error { + // do NOT start `peerMgr` in `offline` mode + if !networkSubmodule.cfg.OfflineMode() { + go networkSubmodule.PeerMgr.Run(ctx) + } + return nil +} + func (networkSubmodule *NetworkSubmodule) FetchMessagesByCids( ctx context.Context, service bserv.BlockService, @@ -407,3 +415,8 @@ func makeSmuxTransportOption() libp2p.Option { return libp2p.Muxer(yamuxID, &ymxtpt) } + +func HashMsgId(m *pubsub_pb.Message) string { + hash := blake2b.Sum256(m.Data) + return string(hash[:]) +} diff --git a/app/submodule/syncer/syncer_submodule.go b/app/submodule/syncer/syncer_submodule.go index 6be75d124c..7b20b3491b 100644 --- a/app/submodule/syncer/syncer_submodule.go +++ b/app/submodule/syncer/syncer_submodule.go @@ -22,7 +22,6 @@ import ( "go.opencensus.io/trace" "github.com/filecoin-project/venus/app/submodule/blockstore" - "github.com/filecoin-project/venus/app/submodule/discovery" "github.com/filecoin-project/venus/app/submodule/network" "github.com/filecoin-project/venus/pkg/beacon" "github.com/filecoin-project/venus/pkg/chain" @@ -42,10 +41,9 @@ var log = logging.Logger("sync.module") // nolint: deadcode // SyncerSubmodule enhances the node with chain syncing capabilities type SyncerSubmodule struct { //nolint - BlockstoreModule *blockstore.BlockstoreSubmodule - ChainModule *chain2.ChainSubmodule - NetworkModule *network.NetworkSubmodule - DiscoverySubmodule *discovery.DiscoverySubmodule + BlockstoreModule *blockstore.BlockstoreSubmodule + ChainModule *chain2.ChainSubmodule + NetworkModule *network.NetworkSubmodule // todo: use the 'Topic' and 'Subscription' defined in // "github.com/libp2p/go-libp2p-pubsub" replace which defined in @@ -82,7 +80,6 @@ func NewSyncerSubmodule(ctx context.Context, config syncerConfig, blockstore *blockstore.BlockstoreSubmodule, network *network.NetworkSubmodule, - discovery *discovery.DiscoverySubmodule, chn *chain2.ChainSubmodule, circulatingSupplyCalculator chain.ICirculatingSupplyCalcualtor, ) (*SyncerSubmodule, error) { @@ -134,19 +131,11 @@ func NewSyncerSubmodule(ctx context.Context, chn.Waiter.Stmgr = stmgr chainSyncManager, err := chainsync.NewManager(stmgr, blkValid, chn, nodeChainSelector, - blockstore.Blockstore, discovery.ExchangeClient, config.ChainClock(), chn.Fork) + blockstore.Blockstore, network.ExchangeClient, config.ChainClock(), chn.Fork) if err != nil { return nil, err } - discovery.PeerDiscoveryCallbacks = append(discovery.PeerDiscoveryCallbacks, func(ci *types.ChainInfo) { - err := chainSyncManager.BlockProposer().SendHello(ci) - if err != nil { - log.Errorf("error receiving chain info from hello %s: %s", ci, err) - return - } - }) - var ( slashFilter slashfilter.ISlashFilter ) @@ -158,19 +147,26 @@ func NewSyncerSubmodule(ctx context.Context, return nil, err } } + + network.HelloHandler.Register(func(ci *types.ChainInfo) { + err := chainSyncManager.BlockProposer().SendHello(ci) + if err != nil { + log.Errorf("error receiving chain info from hello %s: %s", ci, err) + return + } + }) return &SyncerSubmodule{ - Stmgr: stmgr, - BlockstoreModule: blockstore, - ChainModule: chn, - NetworkModule: network, - DiscoverySubmodule: discovery, - SlashFilter: slashFilter, - ChainSelector: nodeChainSelector, - ChainSyncManager: &chainSyncManager, - Drand: chn.Drand, - SyncProvider: *NewChainSyncProvider(&chainSyncManager), - BlockValidator: blkValid, + Stmgr: stmgr, + BlockstoreModule: blockstore, + ChainModule: chn, + NetworkModule: network, + SlashFilter: slashFilter, + ChainSelector: nodeChainSelector, + ChainSyncManager: &chainSyncManager, + Drand: chn.Drand, + SyncProvider: *NewChainSyncProvider(&chainSyncManager), + BlockValidator: blkValid, }, nil } @@ -216,7 +212,7 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub bm.Header.Height, bm.Header.Cid(), delay.String()) } - blkSvc := blockservice.New(syncer.NetworkModule.Blockstore, syncer.NetworkModule.Bitswap) + blkSvc := blockservice.New(syncer.BlockstoreModule.Blockstore, syncer.NetworkModule.Bitswap) if _, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages); err != nil { log.Errorf("fetch block bls messages failed:%s", err.Error()) diff --git a/go.mod b/go.mod index 9e5ae5ea4a..929c78d969 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/ahmetb/go-linq/v3 v3.2.0 github.com/awnumar/memguard v0.22.2 github.com/bluele/gcache v0.0.0-20190518031135-bc40bd653833 + github.com/dchest/blake2b v1.0.0 github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e github.com/dgraph-io/badger/v2 v2.2007.3 github.com/docker/go-units v0.4.0 @@ -72,7 +73,6 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.1.0 github.com/ipfs/go-ipfs-exchange-offline v0.2.0 github.com/ipfs/go-ipfs-files v0.0.9 - github.com/ipfs/go-ipfs-routing v0.2.1 github.com/ipfs/go-ipld-cbor v0.0.6 github.com/ipfs/go-ipld-format v0.4.0 github.com/ipfs/go-log v1.0.5 diff --git a/go.sum b/go.sum index df54e36b88..98d8371957 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= +github.com/dchest/blake2b v1.0.0 h1:KK9LimVmE0MjRl9095XJmKqZ+iLxWATvlcpVFRtaw6s= +github.com/dchest/blake2b v1.0.0/go.mod h1:U034kXgbJpCle2wSk5ybGIVhOSHCVLMDqOzcPEA0F7s= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4= github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo= diff --git a/pkg/chain/testing.go b/pkg/chain/testing.go index a2b68835fe..74f708b9a0 100644 --- a/pkg/chain/testing.go +++ b/pkg/chain/testing.go @@ -9,6 +9,7 @@ import ( "sync" "testing" + "github.com/filecoin-project/venus/pkg/net/exchange" "github.com/filecoin-project/venus/pkg/util/blockstoreutil" "github.com/ipld/go-car" @@ -26,7 +27,6 @@ import ( cbor "github.com/ipfs/go-ipld-cbor" "github.com/filecoin-project/venus/fixtures/assets" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" "github.com/filecoin-project/venus/pkg/clock" "github.com/filecoin-project/venus/pkg/config" "github.com/filecoin-project/venus/pkg/crypto" diff --git a/pkg/chainsync/chainsync.go b/pkg/chainsync/chainsync.go index 257dd8c251..68d613d1a3 100644 --- a/pkg/chainsync/chainsync.go +++ b/pkg/chainsync/chainsync.go @@ -4,9 +4,9 @@ import ( "context" chain2 "github.com/filecoin-project/venus/app/submodule/chain" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" "github.com/filecoin-project/venus/pkg/chainsync/types" "github.com/filecoin-project/venus/pkg/consensus" + "github.com/filecoin-project/venus/pkg/net/exchange" "github.com/filecoin-project/venus/pkg/statemanger" "github.com/filecoin-project/venus/pkg/util/blockstoreutil" types2 "github.com/filecoin-project/venus/venus-shared/types" diff --git a/pkg/chainsync/syncer/syncer.go b/pkg/chainsync/syncer/syncer.go index 1bc45ba6e7..8d97826a28 100644 --- a/pkg/chainsync/syncer/syncer.go +++ b/pkg/chainsync/syncer/syncer.go @@ -18,12 +18,12 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/venus/pkg/chain" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" "github.com/filecoin-project/venus/pkg/clock" "github.com/filecoin-project/venus/pkg/constants" "github.com/filecoin-project/venus/pkg/fork" "github.com/filecoin-project/venus/pkg/metrics" "github.com/filecoin-project/venus/pkg/metrics/tracing" + "github.com/filecoin-project/venus/pkg/net/exchange" "github.com/filecoin-project/venus/pkg/util/blockstoreutil" "github.com/filecoin-project/venus/venus-shared/actors/policy" "github.com/filecoin-project/venus/venus-shared/types" diff --git a/pkg/discovery/bootstrap.go b/pkg/discovery/bootstrap.go deleted file mode 100644 index e39c548c53..0000000000 --- a/pkg/discovery/bootstrap.go +++ /dev/null @@ -1,138 +0,0 @@ -package discovery - -import ( - "context" - "sync" - "time" - - "github.com/filecoin-project/venus/pkg/util/moresync" - logging "github.com/ipfs/go-log/v2" - host "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" - peer "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" -) - -var logBootstrap = logging.Logger("net.bootstrap") - -// Bootstrapper attempts to keep the p2p host connected to the filecoin network -// by keeping a minimum threshold of connections. If the threshold isn't met it -// connects to a random subset of the bootstrap peers. It does not use peer routing -// to discover new peers. To stop a Bootstrapper cancel the context passed in Start() -// or call Stop(). -type Bootstrapper struct { - // Config - // MinPeerThreshold is the number of connections it attempts to maintain. - MinPeerThreshold int - // Peers to connect to if we fall below the threshold. - bootstrapPeers []peer.AddrInfo - // Period is the interval at which it periodically checks to see - // if the threshold is maintained. - Period time.Duration - // ConnectionTimeout is how long to wait before timing out a connection attempt. - ConnectionTimeout time.Duration - - // Dependencies - h host.Host - d inet.Dialer - r routing.Routing - // Does the work. Usually Bootstrapper.bootstrap. Argument is a slice of - // currently-connected peers (so it won't attempt to reconnect). - Bootstrap func([]peer.ID) - - // Bookkeeping - /* never use `ticker` again */ - // ticker *time.Ticker - ctx context.Context - cancel context.CancelFunc - filecoinPeers *moresync.Latch -} - -// NewBootstrapper returns a new Bootstrapper that will attempt to keep connected -// to the filecoin network by connecting to the given bootstrap peers. -func NewBootstrapper(bootstrapPeers []peer.AddrInfo, h host.Host, d inet.Dialer, r routing.Routing, minPeer int, period time.Duration) *Bootstrapper { - b := &Bootstrapper{ - MinPeerThreshold: minPeer, - bootstrapPeers: bootstrapPeers, - Period: period, - ConnectionTimeout: 20 * time.Second, - - h: h, - d: d, - r: r, - - filecoinPeers: moresync.NewLatch(uint(minPeer)), - } - b.Bootstrap = b.bootstrap - return b -} - -// Start starts the Bootstrapper bootstrapping. Cancel `ctx` or call Stop() to stop it. -func (b *Bootstrapper) Start(ctx context.Context) { - b.ctx, b.cancel = context.WithCancel(ctx) - b.Bootstrap(nil) // boot first - - // does't need a ticker anymore - // b.ticker = time.NewTicker(b.Period) - - /* following commented logical was replaced by `PeerManager` */ - // go func() { - // defer b.ticker.Stop() - // - // for { - // select { - // case <-b.ctx.Done(): - // return - // case <-b.ticker.C: - // b.Bootstrap(b.d.Peers()) - // } - // } - // }() -} - -// Stop stops the Bootstrapper. -func (b *Bootstrapper) Stop() { - if b.cancel != nil { - b.cancel() - } -} - -// bootstrap does the actual work. If the number of connected peers -// has fallen below b.MinPeerThreshold it will attempt to connect to -// a random subset of its bootstrap peers. -func (b *Bootstrapper) bootstrap(currentPeers []peer.ID) { - ctx, cancel := context.WithTimeout(b.ctx, b.ConnectionTimeout) - var wg sync.WaitGroup - defer func() { - wg.Wait() - // After connecting to bootstrap peers, bootstrap the DHT. - // DHT Bootstrap is a persistent process so only do this once. - cancel() - }() - - for _, bootstrappPeer := range b.bootstrapPeers { - pinfo := bootstrappPeer - // Don't try to connect to an already connected peer. - if hasPID(currentPeers, pinfo.ID) { - continue - } - - wg.Add(1) - go func() { - if err := b.h.Connect(ctx, pinfo); err != nil { - logBootstrap.Debugf("got error trying to connect to bootstrap node %+v: %s", pinfo, err.Error()) - } - b.h.ConnManager().TagPeer(pinfo.ID, "boot-strap", 1000) - wg.Done() - }() - } -} - -func hasPID(pids []peer.ID, pid peer.ID) bool { - for _, p := range pids { - if p == pid { - return true - } - } - return false -} diff --git a/pkg/discovery/bootstrap_test.go b/pkg/discovery/bootstrap_test.go deleted file mode 100644 index 47b3a3d29b..0000000000 --- a/pkg/discovery/bootstrap_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package discovery - -import ( - "context" - "sync" - "testing" - "time" - - offroute "github.com/ipfs/go-ipfs-routing/offline" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/assert" - - "github.com/filecoin-project/venus/pkg/repo" - th "github.com/filecoin-project/venus/pkg/testhelpers" - tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" -) - -func panicConnect(_ context.Context, _ peer.AddrInfo) error { panic("shouldn't be called") } -func nopPeers() []peer.ID { return []peer.ID{} } -func panicPeers() []peer.ID { panic("shouldn't be called") } - -type blankValidator struct{} - -func (blankValidator) Validate(_ string, _ []byte) error { return nil } -func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } - -func TestBootstrapperStartAndStop(t *testing.T) { - tf.UnitTest(t) - - fakeHost := th.NewFakeHost() - fakeDialer := &th.FakeDialer{PeersImpl: nopPeers} - fakeRouter := offroute.NewOfflineRouter(repo.NewInMemoryRepo().ChainDatastore(), blankValidator{}) - - // Check that Start() causes Bootstrap() to be periodically called and - // that canceling the context causes it to stop being called. Do this - // by stubbing out Bootstrap to keep a count of the number of times it - // is called and to cancel its context after several calls. - b := NewBootstrapper([]peer.AddrInfo{}, fakeHost, fakeDialer, fakeRouter, 0, 200*time.Millisecond) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // protects callCount - var lk sync.Mutex - callCount := 0 - b.Bootstrap = func([]peer.ID) { - lk.Lock() - defer lk.Unlock() - callCount++ - if callCount == 1 { - - // If b.Period is configured to be a too small, b.ticker will tick - // again before the context's done-channel sees a value. This - // results in a callCount of 4 instead of 3. - cancel() - } - } - - b.Start(ctx) - time.Sleep(1000 * time.Millisecond) - - lk.Lock() - defer lk.Unlock() - assert.Equal(t, 1, callCount) -} - -func TestBootstrapperBootstrap(t *testing.T) { - tf.UnitTest(t) - - t.Run("Doesn't connect if already have enough peers", func(t *testing.T) { - fakeHost := &th.FakeHost{ConnectImpl: panicConnect} - fakeDialer := &th.FakeDialer{PeersImpl: panicPeers} - fakeRouter := offroute.NewOfflineRouter(repo.NewInMemoryRepo().ChainDatastore(), blankValidator{}) - ctx := context.Background() - - b := NewBootstrapper([]peer.AddrInfo{}, fakeHost, fakeDialer, fakeRouter, 1, time.Minute) - currentPeers := []peer.ID{th.RequireRandomPeerID(t)} // Have 1 - b.ctx = ctx - assert.NotPanics(t, func() { b.bootstrap(currentPeers) }) - }) - - var lk sync.Mutex - var connectCount int - countingConnect := func(context.Context, peer.AddrInfo) error { - lk.Lock() - defer lk.Unlock() - connectCount++ - return nil - } - - t.Run("Connects if don't have enough peers", func(t *testing.T) { - fakeHost := &th.FakeHost{ConnectImpl: countingConnect} - lk.Lock() - connectCount = 0 - lk.Unlock() - fakeDialer := &th.FakeDialer{PeersImpl: panicPeers} - fakeRouter := offroute.NewOfflineRouter(repo.NewInMemoryRepo().ChainDatastore(), blankValidator{}) - - bootstrapPeers := []peer.AddrInfo{ - {ID: th.RequireRandomPeerID(t)}, - {ID: th.RequireRandomPeerID(t)}, - } - b := NewBootstrapper(bootstrapPeers, fakeHost, fakeDialer, fakeRouter, 3, time.Minute) - b.ctx = context.Background() - currentPeers := []peer.ID{th.RequireRandomPeerID(t)} // Have 1 - b.bootstrap(currentPeers) - time.Sleep(20 * time.Millisecond) - lk.Lock() - assert.Equal(t, 2, connectCount) - lk.Unlock() - }) - - t.Run("Doesn't try to connect to an already connected peer", func(t *testing.T) { - fakeHost := &th.FakeHost{ConnectImpl: countingConnect} - lk.Lock() - connectCount = 0 - lk.Unlock() - fakeDialer := &th.FakeDialer{PeersImpl: panicPeers} - fakeRouter := offroute.NewOfflineRouter(repo.NewInMemoryRepo().ChainDatastore(), blankValidator{}) - - connectedPeerID := th.RequireRandomPeerID(t) - bootstrapPeers := []peer.AddrInfo{ - {ID: connectedPeerID}, - } - - b := NewBootstrapper(bootstrapPeers, fakeHost, fakeDialer, fakeRouter, 2, time.Minute) // Need 2 bootstrap peers. - b.ctx = context.Background() - currentPeers := []peer.ID{connectedPeerID} // Have 1, which is the bootstrap peer. - b.bootstrap(currentPeers) - time.Sleep(20 * time.Millisecond) - lk.Lock() - assert.Equal(t, 0, connectCount) - lk.Unlock() - }) -} diff --git a/pkg/discovery/noop_discovery.go b/pkg/discovery/noop_discovery.go deleted file mode 100644 index 162c309111..0000000000 --- a/pkg/discovery/noop_discovery.go +++ /dev/null @@ -1,29 +0,0 @@ -package discovery - -import ( - "context" - "time" - - libp2pdisc "github.com/libp2p/go-libp2p-core/discovery" - "github.com/libp2p/go-libp2p-core/peer" -) - -// NoopDiscovery satisfies the discovery interface without doing anything -type NoopDiscovery struct{} - -// FindPeers returns a dead channel that is always closed -func (sd *NoopDiscovery) FindPeers(ctx context.Context, ns string, opts ...libp2pdisc.Option) (<-chan peer.AddrInfo, error) { // nolint: staticcheck - closedCh := make(chan peer.AddrInfo) // nolint: staticcheck - // the output is immediately closed, discovery requests end immediately - // Callstack: - // https://github.com/libp2p/go-libp2p-pubsub/blob/55f4ad6eb98b9e617e46641e7078944781abb54c/discovery.go#L157 - // https://github.com/libp2p/go-libp2p-pubsub/blob/55f4ad6eb98b9e617e46641e7078944781abb54c/discovery.go#L287 - // https://github.com/libp2p/go-libp2p-discovery/blob/master/backoffconnector.go#L52 - close(closedCh) - return closedCh, nil -} - -// Advertise does nothing and returns 1 hour. -func (sd *NoopDiscovery) Advertise(ctx context.Context, ns string, opts ...libp2pdisc.Option) (time.Duration, error) { // nolint: staticcheck - return time.Hour, nil -} diff --git a/pkg/discovery/peer_tracker.go b/pkg/discovery/peer_tracker.go deleted file mode 100644 index 76b0ce96b7..0000000000 --- a/pkg/discovery/peer_tracker.go +++ /dev/null @@ -1,142 +0,0 @@ -package discovery - -import ( - "sort" - "sync" - - "github.com/filecoin-project/venus/venus-shared/types" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" -) - -var logPeerTracker = logging.Logger("peer-tracker") - -// PeerTracker is used to record a subset of peers. Its methods are thread safe. -// It is designed to plug directly into libp2p disconnect notifications to -// automatically register dropped connections. -type PeerTracker struct { - // mu protects peers - mu sync.RWMutex - - // self tracks the ID of the peer tracker's owner - self peer.ID - - // peers maps peer.IDs to info about their chains - peers map[peer.ID]*types.ChainInfo - trusted map[peer.ID]struct{} -} - -// NewPeerTracker creates a peer tracker. -func NewPeerTracker(self peer.ID, trust ...peer.ID) *PeerTracker { - trustedSet := make(map[peer.ID]struct{}, len(trust)) - for _, t := range trust { - trustedSet[t] = struct{}{} - } - return &PeerTracker{ - peers: make(map[peer.ID]*types.ChainInfo), - trusted: trustedSet, - self: self, - } -} - -// SelectHead returns the chain info from trusted peers with the greatest height. -// An error is returned if no peers are in the tracker. -func (tracker *PeerTracker) SelectHead() (*types.ChainInfo, error) { - heads := tracker.listTrusted() - if len(heads) == 0 { - return nil, errors.New("no peers tracked") - } - sort.Slice(heads, func(i, j int) bool { return heads[i].Head.Height() > heads[j].Head.Height() }) - return heads[0], nil -} - -// Track adds information about a given peer.ID -func (tracker *PeerTracker) Track(ci *types.ChainInfo) { - tracker.mu.Lock() - defer tracker.mu.Unlock() - - _, tracking := tracker.peers[ci.Sender] - _, trusted := tracker.trusted[ci.Sender] - tracker.peers[ci.Sender] = ci - logPeerTracker.Infow("Track peer", "chainInfo", ci, "new", !tracking, "count", len(tracker.peers), "trusted", trusted) -} - -// Self returns the peer tracker's owner ID -func (tracker *PeerTracker) Self() peer.ID { - return tracker.self -} - -// List returns the chain info of the currently tracked peers (both trusted and untrusted). -// The info tracked by the tracker can change arbitrarily after this is called -- there is no -// guarantee that the peers returned will be tracked when they are used by the caller and no -// guarantee that the chain info is up to date. -func (tracker *PeerTracker) List() []*types.ChainInfo { - tracker.mu.Lock() - defer tracker.mu.Unlock() - - var tracked []*types.ChainInfo - for _, ci := range tracker.peers { - tracked = append(tracked, ci) - } - out := make([]*types.ChainInfo, len(tracked)) - copy(out, tracked) - return out -} - -// Remove removes a peer ID from the tracker. -func (tracker *PeerTracker) Remove(pid peer.ID) { - tracker.mu.Lock() - defer tracker.mu.Unlock() - - _, trusted := tracker.trusted[pid] - if _, tracking := tracker.peers[pid]; tracking { - delete(tracker.peers, pid) - if trusted { - logPeerTracker.Warnw("Dropping peer", "peer", pid.Pretty(), "trusted", trusted) - } else { - logPeerTracker.Infow("Dropping peer", "peer", pid.Pretty(), "trusted", trusted) - } - } -} - -// RegisterDisconnect registers a tracker remove operation as a libp2p -// "Disconnected" network event callback. -func (tracker *PeerTracker) RegisterDisconnect(ntwk network.Network) { - notifee := &network.NotifyBundle{} - notifee.DisconnectedF = func(network network.Network, conn network.Conn) { - pid := conn.RemotePeer() - tracker.Remove(pid) - } - ntwk.Notify(notifee) -} - -// trustedPeers returns a slice of peers trusted by the PeerTracker. trustedPeers remain constant after -// the PeerTracker has been initialized. -// nolint -func (tracker *PeerTracker) trustedPeers() []peer.ID { - var peers []peer.ID - for p := range tracker.trusted { - peers = append(peers, p) - } - return peers -} - -// listTrusted returns the chain info of the trusted tracked peers. The info tracked by the tracker can -// change arbitrarily after this is called -- there is no guarantee that the peers returned will be -// tracked when they are used by the caller and no guarantee that the chain info is up to date. -func (tracker *PeerTracker) listTrusted() []*types.ChainInfo { - tracker.mu.Lock() - defer tracker.mu.Unlock() - - var tracked []*types.ChainInfo - for p, ci := range tracker.peers { - if _, trusted := tracker.trusted[p]; trusted { - tracked = append(tracked, ci) - } - } - out := make([]*types.ChainInfo, len(tracked)) - copy(out, tracked) - return out -} diff --git a/pkg/discovery/peer_tracker_test.go b/pkg/discovery/peer_tracker_test.go deleted file mode 100644 index dd7278b8f4..0000000000 --- a/pkg/discovery/peer_tracker_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package discovery_test - -import ( - "testing" - "time" - - "github.com/filecoin-project/venus/pkg/util/test" - "github.com/filecoin-project/venus/venus-shared/types" - "github.com/libp2p/go-libp2p-core/network" - - "github.com/filecoin-project/venus/pkg/discovery" - "github.com/libp2p/go-libp2p-core/peer" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - th "github.com/filecoin-project/venus/pkg/testhelpers" - tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" -) - -func TestPeerTrackerSelectHead(t *testing.T) { - tf.UnitTest(t) - - pid0 := th.RequireIntPeerID(t, 0) - pid1 := th.RequireIntPeerID(t, 1) - pid2 := th.RequireIntPeerID(t, 2) - pid3 := th.RequireIntPeerID(t, 3) - - ci0 := types.NewChainInfo(pid0, pid0, th.RequireTipsetWithHeight(t, 6)) - ci1 := types.NewChainInfo(pid1, pid1, th.RequireTipsetWithHeight(t, 10)) - ci2 := types.NewChainInfo(pid2, pid2, th.RequireTipsetWithHeight(t, 7)) - ci3 := types.NewChainInfo(pid3, pid3, th.RequireTipsetWithHeight(t, 9)) - - // trusting pid2 and pid3 - tracker := discovery.NewPeerTracker(pid2, pid3) - tracker.Track(ci0) - tracker.Track(ci1) - tracker.Track(ci2) - tracker.Track(ci3) - - // select the highest head - head, err := tracker.SelectHead() - assert.NoError(t, err) - assert.Equal(t, head.Head, ci3.Head) -} - -func TestPeerTrackerRemove(t *testing.T) { - tf.UnitTest(t) - - tracker := discovery.NewPeerTracker(peer.ID("")) - pid0 := th.RequireIntPeerID(t, 0) - pid1 := th.RequireIntPeerID(t, 1) - pid3 := th.RequireIntPeerID(t, 3) - pid7 := th.RequireIntPeerID(t, 7) - - ci0 := types.NewChainInfo(pid0, pid0, th.RequireTipsetWithHeight(t, 6)) - ci1 := types.NewChainInfo(pid1, pid1, th.RequireTipsetWithHeight(t, 0)) - ci3 := types.NewChainInfo(pid3, pid3, th.RequireTipsetWithHeight(t, 0)) - ci7 := types.NewChainInfo(pid7, pid7, th.RequireTipsetWithHeight(t, 0)) - - tracker.Track(ci0) - tracker.Track(ci1) - tracker.Track(ci3) - tracker.Track(ci7) - - tracker.Remove(pid1) - tracker.Remove(pid3) - tracker.Remove(pid7) - - tracked := tracker.List() - expected := []*types.ChainInfo{ci0} - assert.Equal(t, expected, tracked) -} - -func TestPeerTrackerNetworkDisconnect(t *testing.T) { - tf.UnitTest(t) - - mn, err := mocknet.FullMeshConnected(4) - require.NoError(t, err) - - self := mn.Hosts()[0] - a := mn.Hosts()[1] - b := mn.Hosts()[2] - c := mn.Hosts()[3] - - selfID := self.ID() - aID := a.ID() - bID := b.ID() - cID := c.ID() - - aCI := types.NewChainInfo(aID, aID, th.RequireTipsetWithHeight(t, 0)) - bCI := types.NewChainInfo(bID, bID, th.RequireTipsetWithHeight(t, 0)) - - // self is the tracking node - // self tracks peers a and b - // self does not track peer c - tracker := discovery.NewPeerTracker(peer.ID("")) - tracker.Track(aCI) - tracker.Track(bCI) - - // register tracker OnDisconnect callback in self's network - disconnect := make(chan error) - notifee := &network.NotifyBundle{} - notifee.DisconnectedF = func(network network.Network, conn network.Conn) { - disconnect <- nil - } - self.Network().Notify(notifee) - - tracker.RegisterDisconnect(self.Network()) - - waitForDisConnect := func() { - select { - case <-time.After(time.Second * 5): - t.Errorf("time out for wait disconnect notify") - case <-disconnect: - } - } - - // disconnect from tracked a and untracked c - require.NoError(t, mn.DisconnectPeers(selfID, aID)) - waitForDisConnect() - require.NoError(t, mn.DisconnectPeers(selfID, cID)) - waitForDisConnect() - - time.Sleep(time.Second) - tracked := tracker.List() - test.Equal(t, []*types.ChainInfo{bCI}, tracked) -} diff --git a/pkg/net/address.go b/pkg/net/address.go deleted file mode 100644 index cd4eb1ba3e..0000000000 --- a/pkg/net/address.go +++ /dev/null @@ -1,34 +0,0 @@ -package net - -import ( - "github.com/libp2p/go-libp2p-core/peer" - ma "github.com/multiformats/go-multiaddr" -) - -// PeerAddrsToAddrInfo converts a slice of string peer addresses -// (multiaddr + ipfs peerid) to PeerInfos. -func PeerAddrsToAddrInfo(addrs []string) ([]peer.AddrInfo, error) { - var pis []peer.AddrInfo - for _, addr := range addrs { - a, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - - pinfo, err := peer.AddrInfoFromP2pAddr(a) - if err != nil { - return nil, err - } - pis = append(pis, *pinfo) - } - return pis, nil -} - -// AddrInfoToPeerIDs converts a slice of AddrInfo to a slice of peerID's. -func AddrInfoToPeerIDs(ai []peer.AddrInfo) []peer.ID { - var pis []peer.ID - for _, a := range ai { - pis = append(pis, a.ID) - } - return pis -} diff --git a/pkg/net/address_test.go b/pkg/net/address_test.go deleted file mode 100644 index 1cc87824b3..0000000000 --- a/pkg/net/address_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package net - -import ( - "testing" - - tf "github.com/filecoin-project/venus/pkg/testhelpers/testflags" - "github.com/stretchr/testify/assert" -) - -func TestPeerAddrsToPeerInfosSuccess(t *testing.T) { - tf.UnitTest(t) - - addrs := []string{ - "/ip4/127.0.0.1/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", - "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", - } - pis, err := PeerAddrsToAddrInfo(addrs) - assert.NoError(t, err) - assert.Equal(t, "QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", pis[0].ID.Pretty()) - assert.Equal(t, "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", pis[1].ID.Pretty()) -} - -func TestPeerAddrsToPeerInfosFailure(t *testing.T) { - tf.UnitTest(t) - - addrs := []string{ - "/ipv4/no/such/address/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", - } - _, err := PeerAddrsToAddrInfo(addrs) - assert.Error(t, err) -} diff --git a/pkg/net/protocols.go b/pkg/net/dht.go similarity index 100% rename from pkg/net/protocols.go rename to pkg/net/dht.go diff --git a/pkg/chainsync/exchange/cbor_gen.go b/pkg/net/exchange/cbor_gen.go similarity index 96% rename from pkg/chainsync/exchange/cbor_gen.go rename to pkg/net/exchange/cbor_gen.go index 02af41f3f1..ee0b338526 100644 --- a/pkg/chainsync/exchange/cbor_gen.go +++ b/pkg/net/exchange/cbor_gen.go @@ -7,7 +7,7 @@ import ( "io" "sort" - chain "github.com/filecoin-project/venus/venus-shared/types" + "github.com/filecoin-project/venus/venus-shared/types" cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" @@ -270,7 +270,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { scratch := make([]byte, 9) - // t.Bls ([]*chain.Message) (slice) + // t.Bls ([]*types.Message) (slice) if len(t.Bls) > cbg.MaxLength { return xerrors.Errorf("Slice value in field t.Bls was too long") } @@ -307,7 +307,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { } } - // t.Secpk ([]*chain.SignedMessage) (slice) + // t.Secpk ([]*types.SignedMessage) (slice) if len(t.Secpk) > cbg.MaxLength { return xerrors.Errorf("Slice value in field t.Secpk was too long") } @@ -364,7 +364,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Bls ([]*chain.Message) (slice) + // t.Bls ([]*types.Message) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -380,12 +380,12 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.Bls = make([]*chain.Message, extra) + t.Bls = make([]*types.Message, extra) } for i := 0; i < int(extra); i++ { - var v chain.Message + var v types.Message if err := v.UnmarshalCBOR(br); err != nil { return err } @@ -452,7 +452,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) error { } } - // t.Secpk ([]*chain.SignedMessage) (slice) + // t.Secpk ([]*types.SignedMessage) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -468,12 +468,12 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.Secpk = make([]*chain.SignedMessage, extra) + t.Secpk = make([]*types.SignedMessage, extra) } for i := 0; i < int(extra); i++ { - var v chain.SignedMessage + var v types.SignedMessage if err := v.UnmarshalCBOR(br); err != nil { return err } @@ -556,7 +556,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { scratch := make([]byte, 9) - // t.Blocks ([]*chain.BlockHeader) (slice) + // t.Blocks ([]*types.BlockHeader) (slice) if len(t.Blocks) > cbg.MaxLength { return xerrors.Errorf("Slice value in field t.Blocks was too long") } @@ -595,7 +595,7 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Blocks ([]*chain.BlockHeader) (slice) + // t.Blocks ([]*types.BlockHeader) (slice) maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) if err != nil { @@ -611,12 +611,12 @@ func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { } if extra > 0 { - t.Blocks = make([]*chain.BlockHeader, extra) + t.Blocks = make([]*types.BlockHeader, extra) } for i := 0; i < int(extra); i++ { - var v chain.BlockHeader + var v types.BlockHeader if err := v.UnmarshalCBOR(br); err != nil { return err } diff --git a/pkg/chainsync/exchange/client.go b/pkg/net/exchange/client.go similarity index 99% rename from pkg/chainsync/exchange/client.go rename to pkg/net/exchange/client.go index 2676c2acd7..04da85909c 100644 --- a/pkg/chainsync/exchange/client.go +++ b/pkg/net/exchange/client.go @@ -19,7 +19,7 @@ import ( "go.opencensus.io/trace" - "github.com/filecoin-project/venus/pkg/net" + "github.com/filecoin-project/venus/pkg/net/peermgr" "github.com/filecoin-project/venus/venus-shared/types" ) @@ -41,7 +41,7 @@ var _ Client = (*client)(nil) // NewClient creates a new libp2p-based exchange.Client that uses the libp2p // ChainExhange protocol as the fetching mechanism. -func NewClient(host host.Host, pmgr net.IPeerMgr) Client { +func NewClient(host host.Host, pmgr peermgr.IPeerMgr) Client { return &client{ host: host, peerTracker: newPeerTracker(host, pmgr), diff --git a/pkg/chainsync/exchange/doc.go b/pkg/net/exchange/doc.go similarity index 100% rename from pkg/chainsync/exchange/doc.go rename to pkg/net/exchange/doc.go diff --git a/pkg/chainsync/exchange/inct.go b/pkg/net/exchange/inct.go similarity index 100% rename from pkg/chainsync/exchange/inct.go rename to pkg/net/exchange/inct.go diff --git a/pkg/chainsync/exchange/interfaces.go b/pkg/net/exchange/interfaces.go similarity index 100% rename from pkg/chainsync/exchange/interfaces.go rename to pkg/net/exchange/interfaces.go diff --git a/pkg/chainsync/exchange/peer_tracker.go b/pkg/net/exchange/peer_tracker.go similarity index 93% rename from pkg/chainsync/exchange/peer_tracker.go rename to pkg/net/exchange/peer_tracker.go index 2d14e022cc..24d74e99fd 100644 --- a/pkg/chainsync/exchange/peer_tracker.go +++ b/pkg/net/exchange/peer_tracker.go @@ -10,7 +10,7 @@ import ( host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "github.com/filecoin-project/venus/pkg/net" + "github.com/filecoin-project/venus/pkg/net/peermgr" ) type peerStats struct { @@ -26,23 +26,23 @@ type bsPeerTracker struct { peers map[peer.ID]*peerStats avgGlobalTime time.Duration - pmgr net.IPeerMgr + pmgr peermgr.IPeerMgr } -func newPeerTracker(h host.Host, pmgr net.IPeerMgr) *bsPeerTracker { +func newPeerTracker(h host.Host, pmgr peermgr.IPeerMgr) *bsPeerTracker { bsPt := &bsPeerTracker{ peers: make(map[peer.ID]*peerStats), pmgr: pmgr, } - sub, err := h.EventBus().Subscribe(new(net.NewFilPeer)) + sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) if err != nil { panic(err) } go func() { for newPeer := range sub.Out() { - bsPt.addPeer(newPeer.(net.NewFilPeer).Id) + bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) } }() return bsPt diff --git a/pkg/chainsync/exchange/protocol.go b/pkg/net/exchange/protocol.go similarity index 100% rename from pkg/chainsync/exchange/protocol.go rename to pkg/net/exchange/protocol.go diff --git a/pkg/chainsync/exchange/server.go b/pkg/net/exchange/server.go similarity index 100% rename from pkg/chainsync/exchange/server.go rename to pkg/net/exchange/server.go diff --git a/pkg/discovery/cbor_gen.go b/pkg/net/helloprotocol/cbor_gen.go similarity index 99% rename from pkg/discovery/cbor_gen.go rename to pkg/net/helloprotocol/cbor_gen.go index 17579c33b3..9f20ab9b6d 100644 --- a/pkg/discovery/cbor_gen.go +++ b/pkg/net/helloprotocol/cbor_gen.go @@ -1,6 +1,6 @@ // Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. -package discovery +package helloprotocol import ( "fmt" diff --git a/pkg/discovery/hello_protocol.go b/pkg/net/helloprotocol/hello_protocol.go similarity index 95% rename from pkg/discovery/hello_protocol.go rename to pkg/net/helloprotocol/hello_protocol.go index 16e7be1585..950511a08c 100644 --- a/pkg/discovery/hello_protocol.go +++ b/pkg/net/helloprotocol/hello_protocol.go @@ -1,4 +1,4 @@ -package discovery +package helloprotocol import ( "bytes" @@ -7,13 +7,13 @@ import ( "time" "github.com/filecoin-project/venus/pkg/chain" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" + "github.com/filecoin-project/venus/pkg/net/exchange" + "github.com/filecoin-project/venus/pkg/net/peermgr" "github.com/filecoin-project/venus/venus-shared/types" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-state-types/abi" fbig "github.com/filecoin-project/go-state-types/big" - fnet "github.com/filecoin-project/venus/pkg/net" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" @@ -60,14 +60,10 @@ type HelloProtocolHandler struct { // peerDiscovered is called when new peers tell us about their chain peerDiscovered PeerDiscoveredCallback - // is used to retrieve the current heaviest tipset - // for filling out our hello messages. - getHeaviestTipSet GetTipSetFunc - //helloTimeOut is block delay helloTimeOut time.Duration - peerMgr fnet.IPeerMgr + peerMgr peermgr.IPeerMgr exchange exchange.Client chainStore *chain.Store messageStore *chain.MessageStore @@ -80,7 +76,7 @@ type GetTipSetFunc func() (*types.TipSet, error) // NewHelloProtocolHandler creates a new instance of the hello protocol `Handler` and registers it to // the given `host.Host`. func NewHelloProtocolHandler(h host.Host, - peerMgr fnet.IPeerMgr, + peerMgr peermgr.IPeerMgr, exchange exchange.Client, chainStore *chain.Store, messageStore *chain.MessageStore, @@ -99,10 +95,9 @@ func NewHelloProtocolHandler(h host.Host, } // Register registers the handler with the network. -func (h *HelloProtocolHandler) Register(peerDiscoveredCallback PeerDiscoveredCallback, getHeaviestTipSet GetTipSetFunc) { +func (h *HelloProtocolHandler) Register(peerDiscoveredCallback PeerDiscoveredCallback) { // register callbacks h.peerDiscovered = peerDiscoveredCallback - h.getHeaviestTipSet = getHeaviestTipSet // register a handle for when a new connection against someone is created h.host.SetStreamHandler(helloProtocolID, h.handleNewStream) @@ -217,10 +212,7 @@ func (h *HelloProtocolHandler) loadLocalFullTipset(ctx context.Context, tsk type var ErrBadGenesis = fmt.Errorf("bad genesis block") func (h *HelloProtocolHandler) getOurHelloMessage() (*HelloMessage, error) { - heaviest, err := h.getHeaviestTipSet() - if err != nil { - return nil, err - } + heaviest := h.chainStore.GetHead() height := heaviest.Height() weight := heaviest.ParentWeight() diff --git a/pkg/discovery/hello_protocol_test.go b/pkg/net/helloprotocol/hello_protocol_test.go similarity index 73% rename from pkg/discovery/hello_protocol_test.go rename to pkg/net/helloprotocol/hello_protocol_test.go index f4e5b396e6..df38d37ba8 100644 --- a/pkg/discovery/hello_protocol_test.go +++ b/pkg/net/helloprotocol/hello_protocol_test.go @@ -1,7 +1,9 @@ -package discovery_test +package helloprotocol_test import ( "context" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" + "github.com/filecoin-project/venus/pkg/net/peermgr" "testing" "time" @@ -17,7 +19,6 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/venus/pkg/chain" - "github.com/filecoin-project/venus/pkg/discovery" "github.com/filecoin-project/venus/pkg/net" "github.com/filecoin-project/venus/pkg/repo" th "github.com/filecoin-project/venus/pkg/testhelpers" @@ -59,17 +60,20 @@ func TestHelloHandshake(t *testing.T) { store := builder.Store() mstore := builder.Mstore() heavy1 := builder.AppendOn(ctx, genesisA, 1) - heavy2 := builder.AppendOn(ctx, heavy1, 1) + oldStore := copyStoreAndSetHead(ctx, store, heavy1) + heavy2 := builder.AppendOn(ctx, heavy1, 1) + _ = store.SetHead(ctx, heavy2) msc1, msc2 := new(mockHelloCallback), new(mockHelloCallback) - hg1, hg2 := &mockHeaviestGetter{heavy1}, &mockHeaviestGetter{heavy2} + + //hg1, hg2 := &mockHeaviestGetter{heavy1}, &mockHeaviestGetter{heavy2} //peer manager aPeerMgr, err := mockPeerMgr(ctx, t, a) require.NoError(t, err) - discovery.NewHelloProtocolHandler(a, aPeerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback, hg1.getHeaviestTipSet) - discovery.NewHelloProtocolHandler(b, aPeerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback, hg2.getHeaviestTipSet) + helloprotocol.NewHelloProtocolHandler(a, aPeerMgr, nil, oldStore, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback) + helloprotocol.NewHelloProtocolHandler(b, aPeerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback) msc1.On("HelloCallback", b.ID(), heavy2.Key()).Return() msc2.On("HelloCallback", a.ID(), heavy1.Key()).Return() @@ -121,17 +125,19 @@ func TestHelloBadGenesis(t *testing.T) { genesisB := builder.AppendOn(ctx, types.UndefTipSet, 1) heavy1 := builder.AppendOn(ctx, genesisA, 1) + oldStore := copyStoreAndSetHead(ctx, store, heavy1) + heavy2 := builder.AppendOn(ctx, heavy1, 1) + _ = store.SetHead(ctx, heavy2) msc1, msc2 := new(mockHelloCallback), new(mockHelloCallback) - hg1, hg2 := &mockHeaviestGetter{heavy1}, &mockHeaviestGetter{heavy2} //peer manager peerMgr, err := mockPeerMgr(ctx, t, a) require.NoError(t, err) - discovery.NewHelloProtocolHandler(a, peerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback, hg1.getHeaviestTipSet) - discovery.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisB.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback, hg2.getHeaviestTipSet) + helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, oldStore, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback) + helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisB.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback) msc1.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return() msc2.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return() @@ -166,17 +172,18 @@ func TestHelloMultiBlock(t *testing.T) { heavy1 := builder.AppendOn(ctx, genesisTipset, 3) heavy1 = builder.AppendOn(ctx, heavy1, 3) - heavy2 := builder.AppendOn(ctx, heavy1, 3) + oldStore := copyStoreAndSetHead(ctx, store, heavy1) + heavy2 := builder.AppendOn(ctx, heavy1, 3) + _ = store.SetHead(ctx, heavy2) msc1, msc2 := new(mockHelloCallback), new(mockHelloCallback) - hg1, hg2 := &mockHeaviestGetter{heavy1}, &mockHeaviestGetter{heavy2} //peer manager peerMgr, err := mockPeerMgr(ctx, t, a) require.NoError(t, err) - discovery.NewHelloProtocolHandler(a, peerMgr, nil, store, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc1.HelloCallback, hg1.getHeaviestTipSet) - discovery.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc2.HelloCallback, hg2.getHeaviestTipSet) + helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, oldStore, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc1.HelloCallback) + helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc2.HelloCallback) msc1.On("HelloCallback", b.ID(), heavy2.Key()).Return() msc2.On("HelloCallback", a.ID(), heavy1.Key()).Return() @@ -190,9 +197,15 @@ func TestHelloMultiBlock(t *testing.T) { msc2.AssertExpectations(t) } -func mockPeerMgr(ctx context.Context, t *testing.T, h host.Host) (*net.PeerMgr, error) { +func mockPeerMgr(ctx context.Context, t *testing.T, h host.Host) (*peermgr.PeerMgr, error) { addrInfo, err := net.ParseAddresses(ctx, repo.NewInMemoryRepo().Config().Bootstrap.Addresses) require.NoError(t, err) - return net.NewPeerMgr(h, dht.NewDHT(ctx, h, ds.NewMapDatastore()), 10, addrInfo) + return peermgr.NewPeerMgr(h, dht.NewDHT(ctx, h, ds.NewMapDatastore()), 10, addrInfo) +} + +func copyStoreAndSetHead(ctx context.Context, store *chain.Store, ts *types.TipSet) *chain.Store { + storeCopy := *store + storeCopy.SetHead(ctx, ts) + return &storeCopy } diff --git a/pkg/net/peermgr.go b/pkg/net/peermgr/peermgr.go similarity index 99% rename from pkg/net/peermgr.go rename to pkg/net/peermgr/peermgr.go index 99ce2ce532..cce83686fb 100644 --- a/pkg/net/peermgr.go +++ b/pkg/net/peermgr/peermgr.go @@ -1,4 +1,4 @@ -package net +package peermgr import ( "context" diff --git a/pkg/testhelpers/net.go b/pkg/testhelpers/net.go index fb7af50b25..7a1f091426 100644 --- a/pkg/testhelpers/net.go +++ b/pkg/testhelpers/net.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" + "github.com/filecoin-project/venus/pkg/net/exchange" "github.com/filecoin-project/venus/venus-shared/types" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/connmgr" diff --git a/tools/gen_cbor/main.go b/tools/gen_cbor/main.go index 76ffb474e3..76d1cef8f8 100644 --- a/tools/gen_cbor/main.go +++ b/tools/gen_cbor/main.go @@ -2,10 +2,10 @@ package main import ( "github.com/filecoin-project/venus/pkg/chain" - "github.com/filecoin-project/venus/pkg/chainsync/exchange" - "github.com/filecoin-project/venus/pkg/discovery" "github.com/filecoin-project/venus/pkg/fvm" "github.com/filecoin-project/venus/pkg/market" + "github.com/filecoin-project/venus/pkg/net" + "github.com/filecoin-project/venus/pkg/net/exchange" "github.com/filecoin-project/venus/pkg/paychmgr" "github.com/filecoin-project/venus/pkg/state/tree" "github.com/filecoin-project/venus/pkg/vm/dispatch" @@ -28,8 +28,8 @@ func main() { } if err := gen.WriteTupleEncodersToFile("./pkg/discovery/cbor_gen.go", "discovery", - discovery.HelloMessage{}, - discovery.LatencyMessage{}, + net.HelloMessage{}, + net.LatencyMessage{}, ); err != nil { panic(err) } From c08b4c74ed0d67e9cb14d3d28a8143b1977b61ed Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Fri, 5 Aug 2022 17:10:10 +0800 Subject: [PATCH 2/2] fix: fix test error in venus-shared --- .github/workflows/test.yml | 5 ++- Makefile | 4 +- app/node/builder.go | 4 +- app/submodule/network/network_submodule.go | 5 ++- app/submodule/syncer/syncer_submodule.go | 2 +- pkg/net/helloprotocol/hello_protocol.go | 2 +- pkg/net/helloprotocol/hello_protocol_test.go | 41 +++++++++----------- tools/gen_cbor/main.go | 6 +-- 8 files changed, 34 insertions(+), 35 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 37dd689f0d..e444f80b8f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -59,6 +59,9 @@ jobs: ./gengen --keypath ./fixtures/live --out-car ./fixtures/live/genesis.car --out-json ./fixtures/live/gen.json --config ./fixtures/setup.json ./gengen --keypath ./fixtures/test --out-car ./fixtures/test/genesis.car --out-json ./fixtures/test/gen.json --config ./fixtures/setup.json + - name: Integration Test + run: go test -coverpkg=./... -coverprofile=coverage_venus_shared.txt -covermode=atomic -timeout=30m -parallel=4 -v ./venus-shared/... + - name: Unit Test run: go test -coverpkg=./... -coverprofile=coverage_unit.txt -covermode=atomic -timeout=30m -parallel=4 -v ./... -integration=false -unit=true @@ -69,7 +72,7 @@ jobs: uses: codecov/codecov-action@v2 with: token: - files: ./coverage_unit.txt,./coverage_integration.txt + files: ./coverage_unit.txt,./coverage_integration.txt,./coverage_venus_shared.txt flags: unittests,integration name: venus fail_ci_if_error: true diff --git a/Makefile b/Makefile index 8e8ef4f62d..622db30489 100644 --- a/Makefile +++ b/Makefile @@ -95,12 +95,12 @@ actor-render: actor-replica: cd venus-devtool && go run ./compatible/actors/*.go replica --dst ../venus-shared/actors/ -test: +test:test-venus-shared go build -o genesis-file-server ./tools/genesis-file-server go build -o gengen ./tools/gengen ./gengen --keypath ./fixtures/live --out-car ./fixtures/live/genesis.car --out-json ./fixtures/live/gen.json --config ./fixtures/setup.json ./gengen --keypath ./fixtures/test --out-car ./fixtures/test/genesis.car --out-json ./fixtures/test/gen.json --config ./fixtures/setup.json - go test -v ./... -integration=true -unit=false + go test $(go list ./... | grep -v /venus-shared/) -unit=false lint: $(BUILD_DEPS) golangci-lint run diff --git a/app/node/builder.go b/app/node/builder.go index eb2527c328..8d01c42cb8 100644 --- a/app/node/builder.go +++ b/app/node/builder.go @@ -3,9 +3,10 @@ package node import ( "context" "fmt" + "time" + "github.com/filecoin-project/venus/app/submodule/dagservice" "github.com/filecoin-project/venus/app/submodule/network" - "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -127,7 +128,6 @@ func (b *Builder) build(ctx context.Context) (*Node, error) { return nil, errors.Wrap(err, "failed to build node.dagservice") } - nd.syncer, err = syncer.NewSyncerSubmodule(ctx, (*builder)(b), nd.blockstore, nd.network, nd.chain, nd.circulatiingSupplyCalculator) if err != nil { return nil, errors.Wrap(err, "failed to build node.Syncer") diff --git a/app/submodule/network/network_submodule.go b/app/submodule/network/network_submodule.go index 035a92855d..6d7bd80ce9 100644 --- a/app/submodule/network/network_submodule.go +++ b/app/submodule/network/network_submodule.go @@ -4,11 +4,12 @@ import ( "bytes" "context" "fmt" - "github.com/filecoin-project/venus/pkg/net/helloprotocol" "os" "runtime" "time" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" + "github.com/dchest/blake2b" "github.com/ipfs/go-bitswap" bsnet "github.com/ipfs/go-bitswap/network" @@ -232,7 +233,7 @@ func NewNetworkSubmodule(ctx context.Context, chainStore *chain.Store, DataTransferHost: dtNet, PeerMgr: peerMgr, HelloHandler: helloHandler, - cfg: config, + cfg: config, }, nil } diff --git a/app/submodule/syncer/syncer_submodule.go b/app/submodule/syncer/syncer_submodule.go index 7b20b3491b..e593b6d757 100644 --- a/app/submodule/syncer/syncer_submodule.go +++ b/app/submodule/syncer/syncer_submodule.go @@ -147,7 +147,7 @@ func NewSyncerSubmodule(ctx context.Context, return nil, err } } - + network.HelloHandler.Register(func(ci *types.ChainInfo) { err := chainSyncManager.BlockProposer().SendHello(ci) if err != nil { diff --git a/pkg/net/helloprotocol/hello_protocol.go b/pkg/net/helloprotocol/hello_protocol.go index 950511a08c..9e32760862 100644 --- a/pkg/net/helloprotocol/hello_protocol.go +++ b/pkg/net/helloprotocol/hello_protocol.go @@ -52,7 +52,7 @@ type LatencyMessage struct { // containing some information about the state of our chain, // and receive the same information from them. This is used to // initiate a chainsync and detect connections to forks. -type HelloProtocolHandler struct { +type HelloProtocolHandler struct { //nolint host host.Host genesis cid.Cid diff --git a/pkg/net/helloprotocol/hello_protocol_test.go b/pkg/net/helloprotocol/hello_protocol_test.go index df38d37ba8..4985f5a776 100644 --- a/pkg/net/helloprotocol/hello_protocol_test.go +++ b/pkg/net/helloprotocol/hello_protocol_test.go @@ -2,11 +2,13 @@ package helloprotocol_test import ( "context" - "github.com/filecoin-project/venus/pkg/net/helloprotocol" - "github.com/filecoin-project/venus/pkg/net/peermgr" + "fmt" "testing" "time" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" + "github.com/filecoin-project/venus/pkg/net/peermgr" + ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-core/host" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -34,14 +36,6 @@ func (msb *mockHelloCallback) HelloCallback(ci *types.ChainInfo) { msb.Called(ci.Sender, ci.Head.Key()) } -type mockHeaviestGetter struct { - heaviest *types.TipSet -} - -func (mhg *mockHeaviestGetter) getHeaviestTipSet() (*types.TipSet, error) { - return mhg.heaviest, nil -} - func TestHelloHandshake(t *testing.T) { tf.UnitTest(t) @@ -60,14 +54,12 @@ func TestHelloHandshake(t *testing.T) { store := builder.Store() mstore := builder.Mstore() heavy1 := builder.AppendOn(ctx, genesisA, 1) - oldStore := copyStoreAndSetHead(ctx, store, heavy1) + oldStore := copyStoreAndSetHead(ctx, t, store, heavy1) heavy2 := builder.AppendOn(ctx, heavy1, 1) _ = store.SetHead(ctx, heavy2) msc1, msc2 := new(mockHelloCallback), new(mockHelloCallback) - //hg1, hg2 := &mockHeaviestGetter{heavy1}, &mockHeaviestGetter{heavy2} - //peer manager aPeerMgr, err := mockPeerMgr(ctx, t, a) require.NoError(t, err) @@ -122,22 +114,24 @@ func TestHelloBadGenesis(t *testing.T) { mstore := builder.Mstore() genesisA := builder.AppendOn(ctx, types.UndefTipSet, 1) - genesisB := builder.AppendOn(ctx, types.UndefTipSet, 1) - heavy1 := builder.AppendOn(ctx, genesisA, 1) - oldStore := copyStoreAndSetHead(ctx, store, heavy1) - heavy2 := builder.AppendOn(ctx, heavy1, 1) _ = store.SetHead(ctx, heavy2) + builder2 := chain.NewBuilder(t, address.Undef) + genesisB := builder2.Build(ctx, types.UndefTipSet, 1, func(b *chain.BlockBuilder, i int) { + b.SetTicket([]byte{1, 3, 4, 5, 6, 1, 3, 6, 7, 8}) + }) + + fmt.Println(genesisB, genesisA) msc1, msc2 := new(mockHelloCallback), new(mockHelloCallback) //peer manager peerMgr, err := mockPeerMgr(ctx, t, a) require.NoError(t, err) - helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, oldStore, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback) - helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisB.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback) + helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback) + helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, builder2.Store(), builder2.Mstore(), genesisB.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback) msc1.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return() msc2.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return() @@ -172,7 +166,7 @@ func TestHelloMultiBlock(t *testing.T) { heavy1 := builder.AppendOn(ctx, genesisTipset, 3) heavy1 = builder.AppendOn(ctx, heavy1, 3) - oldStore := copyStoreAndSetHead(ctx, store, heavy1) + oldStore := copyStoreAndSetHead(ctx, t, store, heavy1) heavy2 := builder.AppendOn(ctx, heavy1, 3) _ = store.SetHead(ctx, heavy2) @@ -204,8 +198,9 @@ func mockPeerMgr(ctx context.Context, t *testing.T, h host.Host) (*peermgr.PeerM return peermgr.NewPeerMgr(h, dht.NewDHT(ctx, h, ds.NewMapDatastore()), 10, addrInfo) } -func copyStoreAndSetHead(ctx context.Context, store *chain.Store, ts *types.TipSet) *chain.Store { - storeCopy := *store - storeCopy.SetHead(ctx, ts) +func copyStoreAndSetHead(ctx context.Context, t *testing.T, store *chain.Store, ts *types.TipSet) *chain.Store { + storeCopy := *store //nolint + err := storeCopy.SetHead(ctx, ts) + require.NoError(t, err) return &storeCopy } diff --git a/tools/gen_cbor/main.go b/tools/gen_cbor/main.go index 76d1cef8f8..71fed0d130 100644 --- a/tools/gen_cbor/main.go +++ b/tools/gen_cbor/main.go @@ -4,8 +4,8 @@ import ( "github.com/filecoin-project/venus/pkg/chain" "github.com/filecoin-project/venus/pkg/fvm" "github.com/filecoin-project/venus/pkg/market" - "github.com/filecoin-project/venus/pkg/net" "github.com/filecoin-project/venus/pkg/net/exchange" + "github.com/filecoin-project/venus/pkg/net/helloprotocol" "github.com/filecoin-project/venus/pkg/paychmgr" "github.com/filecoin-project/venus/pkg/state/tree" "github.com/filecoin-project/venus/pkg/vm/dispatch" @@ -28,8 +28,8 @@ func main() { } if err := gen.WriteTupleEncodersToFile("./pkg/discovery/cbor_gen.go", "discovery", - net.HelloMessage{}, - net.LatencyMessage{}, + helloprotocol.HelloMessage{}, + helloprotocol.LatencyMessage{}, ); err != nil { panic(err) }