From 10ddd40f7b47a78cd28743c6dcd11cf97f3748ae Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 29 Nov 2016 17:43:51 -0800 Subject: [PATCH] add option to enable go-multiplex experiment License: MIT Signed-off-by: Jeromy --- cmd/ipfs/daemon.go | 4 ++++ core/builder.go | 2 +- core/core.go | 50 +++++++++++++++++++++++++++++++++++++++++----- core/mock/mock.go | 11 +++++----- package.json | 6 ++++++ 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index e0ef36bf533..0cc40694138 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -48,6 +48,7 @@ const ( unrestrictedApiAccessKwd = "unrestricted-api" writableKwd = "writable" enableFloodSubKwd = "enable-pubsub-experiment" + enableMultiplexKwd = "enable-mplex-experiment" // apiAddrKwd = "address-api" // swarmAddrKwd = "address-swarm" ) @@ -158,6 +159,7 @@ Headers. cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false), cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), + cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction."), // TODO: add way to override addresses. tricky part: updating the config if also --init. // cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), @@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { offline, _, _ := req.Option(offlineKwd).Bool() pubsub, _, _ := req.Option(enableFloodSubKwd).Bool() + mplex, _, _ := req.Option(enableMultiplexKwd).Bool() // Start assembling node config ncfg := &core.BuildCfg{ @@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { Online: !offline, ExtraOpts: map[string]bool{ "pubsub": pubsub, + "mplex": mplex, }, //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral } diff --git a/core/builder.go b/core/builder.go index a126a7b34c3..49a40440b5f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if cfg.Online { do := setupDiscoveryOption(rcfg.Discovery) - if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil { + if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil { return err } } else { diff --git a/core/core.go b/core/core.go index da795acd0f4..6635166715b 100644 --- a/core/core.go +++ b/core/core.go @@ -14,7 +14,10 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" + "os" + "strings" "time" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -44,19 +47,24 @@ import ( mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" + mssmux "gx/ipfs/QmTfjLsou9ic6L4KqCcmbLSZcdiFu8q1v6njKp121pbbXx/go-smux-multistream" ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub" addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util" + spdy "gx/ipfs/QmWUNsat6Jb19nC5CiJCDXepTkxjdxi3eZqeoB6mrmmaGu/go-smux-spdystream" swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" + mplex "gx/ipfs/QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on/go-smux-multiplex" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing" + yamux "gx/ipfs/Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U/go-smux-yamux" discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery" p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed" ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" ) @@ -129,7 +137,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error { if n.PeerHost != nil { // already online. return errors.New("node already online") @@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin n.Reporter = metrics.NewBandwidthCounter() } - peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter) + tpt := makeSmuxTransport(mplex) + + peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter, tpt) if err != nil { return err } @@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func makeSmuxTransport(mplexExp bool) smux.Transport { + mstpt := mssmux.NewBlankTransport() + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 8192, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(1024 * 512), + LogOutput: ioutil.Discard, + } + + mstpt.AddTransport("/yamux/1.0.0", ymxtpt) + + mstpt.AddTransport("/spdy/3.1.0", spdy.Transport) + + if mplexExp { + mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport) + } + + // Allow muxer preference order overriding + if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { + mstpt.OrderPreference = strings.Fields(prefs) + } + + return mstpt +} + func setupDiscoveryOption(d config.Discovery) DiscoveryOption { if d.MDNS.Enabled { return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) { @@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { return listen, nil } -type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) +type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) var DefaultHostOption HostOption = constructPeerHost // isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) { +func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) { // no addresses to begin with. we'll start later. - network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr) + swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, nil, tpt, bwr) if err != nil { return nil, err } + network := (*swarm.Network)(swrm) + for _, f := range fs { network.Swarm().Filters.AddDialFilter(f) } diff --git a/core/mock/mock.go b/core/mock/mock.go index 2303afd3eb7..06e98b3463d 100644 --- a/core/mock/mock.go +++ b/core/mock/mock.go @@ -1,22 +1,23 @@ package coremock import ( + "context" "net" - context "context" - "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" - syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" - commands "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" + host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" + "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" + smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) { } func MockHostOption(mn mocknet.Mocknet) core.HostOption { - return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) { + return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport) (host.Host, error) { return mn.AddPeerWithPeerstore(id, ps) } } diff --git a/package.json b/package.json index 50ff060d06c..20e3e5a06b3 100644 --- a/package.json +++ b/package.json @@ -287,6 +287,12 @@ "hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93", "name": "go-os-helper", "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on", + "name": "go-smux-multiplex", + "version": "1.1.4" } ], "gxVersion": "0.4.0",