diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index e0ef36bf533..0cc40694138 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -48,6 +48,7 @@ const ( unrestrictedApiAccessKwd = "unrestricted-api" writableKwd = "writable" enableFloodSubKwd = "enable-pubsub-experiment" + enableMultiplexKwd = "enable-mplex-experiment" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" ) @@ -158,6 +159,7 @@ Headers. cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false), cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), + cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction."), // TODO: add way to override addresses. tricky part: updating the config if also --init. // cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), @@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { offline, _, _ := req.Option(offlineKwd).Bool() pubsub, _, _ := req.Option(enableFloodSubKwd).Bool() + mplex, _, _ := req.Option(enableMultiplexKwd).Bool() // Start assembling node config ncfg := &core.BuildCfg{ @@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { Online: !offline, ExtraOpts: map[string]bool{ "pubsub": pubsub, + "mplex": mplex, }, //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral } diff --git a/core/builder.go b/core/builder.go index a126a7b34c3..49a40440b5f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil { + if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil { return err } } else { diff --git a/core/core.go b/core/core.go index da795acd0f4..b79827cdbfc 100644 --- a/core/core.go +++ b/core/core.go @@ -14,7 +14,10 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" + "os" + "strings" "time" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -38,25 +41,30 @@ import ( ft "github.com/ipfs/go-ipfs/unixfs" dht "gx/ipfs/QmNQPjpcXrwwwgDErKzKUm2xxhXCB3cuFgTHsrcCJ5uGbu/go-libp2p-kad-dht" + mplex "gx/ipfs/QmP574KufTvaSPP1kRE7LkpMo8tUgDvw6h4Mdj4YqLUrSX/go-smux-multiplex" p2phost "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" + mssmux "gx/ipfs/QmTfjLsou9ic6L4KqCcmbLSZcdiFu8q1v6njKp121pbbXx/go-smux-multistream" ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub" addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util" + spdy "gx/ipfs/QmWUNsat6Jb19nC5CiJCDXepTkxjdxi3eZqeoB6mrmmaGu/go-smux-spdystream" swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing" + yamux "gx/ipfs/Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U/go-smux-yamux" discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery" p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed" ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" ) @@ -129,7 +137,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error { if n.PeerHost != nil { // already online. return errors.New("node already online") @@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.Reporter = metrics.NewBandwidthCounter() } - peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter) + tpt := makeSmuxTransport(mplex) + + peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter, tpt) if err != nil { return err } @@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func makeSmuxTransport(mplexExp bool) smux.Transport { + mstpt := mssmux.NewBlankTransport() + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 8192, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(1024 * 512), + LogOutput: ioutil.Discard, + } + + mstpt.AddTransport("/yamux/1.0.0", ymxtpt) + + mstpt.AddTransport("/spdy/3.1.0", spdy.Transport) + + if mplexExp { + mstpt.AddTransport("/mplex/1.0.0", mplex.DefaultTransport) + } + + // Allow muxer preference order overriding + if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { + mstpt.OrderPreference = strings.Fields(prefs) + } + + return mstpt +} + func setupDiscoveryOption(d config.Discovery) DiscoveryOption { if d.MDNS.Enabled { return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) { @@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { return listen, nil } -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) +type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) var DefaultHostOption HostOption = constructPeerHost // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) { +func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) { // no addresses to begin with. we'll start later. - network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr) + swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, nil, tpt, bwr) if err != nil { return nil, err } + network := (*swarm.Network)(swrm) + for _, f := range fs { network.Swarm().Filters.AddDialFilter(f) } diff --git a/core/mock/mock.go b/core/mock/mock.go index 2303afd3eb7..06e98b3463d 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -1,22 +1,23 @@ package coremock import ( + "context" "net" - context "context" - "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" - syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" - commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) { } func MockHostOption(mn mocknet.Mocknet) core.HostOption { - return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) { + return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } } diff --git a/package.json b/package.json index 50ff060d06c..e102b3661fa 100644 --- a/package.json +++ b/package.json @@ -287,6 +287,12 @@ "hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93", "name": "go-os-helper", "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmP574KufTvaSPP1kRE7LkpMo8tUgDvw6h4Mdj4YqLUrSX", + "name": "go-smux-multiplex", + "version": "1.1.2" } ], "gxVersion": "0.4.0",