Skip to content

Commit

Permalink
[WIP] transport refactor update
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Steven Allen <[email protected]>
  • Loading branch information
Stebalien committed Mar 29, 2018
1 parent 3f6519b commit 5d44fe7
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 176 deletions.
7 changes: 3 additions & 4 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"gx/ipfs/QmRK2LxanhK2gZq6k6R7vk5ZoYZk8ULSSTB7FzDsMUX6CB/go-multiaddr-net"
mprome "gx/ipfs/QmSTf3wJXBQk2fxdmXtodvyczrCPgJaK1B1maY78qeebNX/go-metrics-prometheus"
iconn "gx/ipfs/QmToCvh5eJtoDheMggre7b2zeFCJ6tAyB82YVs457cqoUE/go-libp2p-interface-conn"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
Expand Down Expand Up @@ -215,7 +214,6 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
if unencrypted {
log.Warningf(`Running with --%s: All connections are UNENCRYPTED.
You will not be able to connect to regular encrypted networks.`, unencryptTransportKwd)
iconn.EncryptConnections = false
}

// first, whether user has provided the initialization flag. we may be
Expand Down Expand Up @@ -292,6 +290,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
Repo: repo,
Permanent: true, // It is temporary way to signify that node is permanent
Online: !offline,
DisableEncryptedConnections: unencrypted,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
Expand Down Expand Up @@ -478,7 +477,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error

errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, apiLis.NetListener(), opts...)
errc <- corehttp.Serve(node, manet.NetListener(apiLis), opts...)
close(errc)
}()
return errc, nil
Expand Down Expand Up @@ -565,7 +564,7 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e

errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, gwLis.NetListener(), opts...)
errc <- corehttp.Serve(node, manet.NetListener(gwLis), opts...)
close(errc)
}()
return errc, nil
Expand Down
13 changes: 8 additions & 5 deletions cmd/seccat/seccat.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,20 @@ func connect(args args) error {
}

// log everything that goes through conn
rwc := &logRW{n: "conn", rw: conn}
rwc := &logConn{n: "conn", Conn: conn}

// OK, let's setup the channel.
sk := ps.PrivKey(p)
sg := secio.SessionGenerator{LocalID: p, PrivateKey: sk}
sess, err := sg.NewSession(context.TODO(), rwc)
sg, err := secio.New(sk)
if err != nil {
return err
}
out("remote peer id: %s", sess.RemotePeer())
netcat(sess.ReadWriter().(io.ReadWriteCloser))
sconn, err := sg.SecureInbound(context.TODO(), rwc)
if err != nil {
return err
}
out("remote peer id: %s", sconn.RemotePeer())
netcat(sconn)
return nil
}

Expand Down
24 changes: 10 additions & 14 deletions cmd/seccat/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"fmt"
"io"
"net"
"os"

logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
Expand All @@ -24,28 +24,24 @@ func out(format string, vals ...interface{}) {
}
}

type logRW struct {
n string
rw io.ReadWriter
type logConn struct {
net.Conn
n string
}

func (r *logRW) Read(buf []byte) (int, error) {
n, err := r.rw.Read(buf)
func (r *logConn) Read(buf []byte) (int, error) {
n, err := r.Conn.Read(buf)
if n > 0 {
log.Debugf("%s read: %v", r.n, buf)
}
return n, err
}

func (r *logRW) Write(buf []byte) (int, error) {
func (r *logConn) Write(buf []byte) (int, error) {
log.Debugf("%s write: %v", r.n, buf)
return r.rw.Write(buf)
return r.Conn.Write(buf)
}

func (r *logRW) Close() error {
c, ok := r.rw.(io.Closer)
if ok {
return c.Close()
}
return nil
func (r *logConn) Close() error {
return r.Conn.Close()
}
19 changes: 18 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/ipfs/go-ipfs/thirdparty/verifbs"
uio "github.com/ipfs/go-ipfs/unixfs/io"

libp2p "gx/ipfs/QmNh1kGFFdsPu79KNSaL4NUKUPb4Eiz4KHdMtFY6664RDp/go-libp2p"
p2phost "gx/ipfs/QmNmJZL7FQySMtE2BQuLMuZg2EB2CLEunJJUSVSc9YnnbV/go-libp2p-host"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
goprocessctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
ds "gx/ipfs/QmXRKBQA4wXP7xWbFiZsR1GP4HV6wMDQ1aWFxZZ4uBcPX9/go-datastore"
Expand All @@ -42,6 +44,10 @@ type BuildCfg struct {
// that will improve performance in long run
Permanent bool

// DisableEncryptedConnections disables connection encryption *entirely*.
// DO NOT SET THIS UNLESS YOU'RE TESTING.
DisableEncryptedConnections bool

// If NilRepo is set, a repo backed by a nil datastore will be constructed
NilRepo bool

Expand Down Expand Up @@ -126,6 +132,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
if err != nil {
return nil, err
}

ctx = metrics.CtxScope(ctx, "ipfs")

n := &IpfsNode{
Expand Down Expand Up @@ -214,9 +221,19 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
bs.HashOnRead(true)
}

hostOption := cfg.Host
if cfg.DisableEncryptedConnections {
innerHostOption := hostOption
hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...)
}
log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
You will not be able to connect to any nodes configured to use encrypted connections`)
}

if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {
Expand Down
40 changes: 20 additions & 20 deletions core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ var swarmPeersCmd = &cmds.Command{
Peer: pid.Pretty(),
}

swcon, ok := c.(*swarm.Conn)
if ok {
ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn())
}
/*
// FIXME(steb):
swcon, ok := c.(*swarm.Conn)
if ok {
ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn())
}
*/

if verbose || latency {
lat := n.Peerstore.LatencyEWMA(pid)
Expand All @@ -104,11 +107,7 @@ var swarmPeersCmd = &cmds.Command{
}
}
if verbose || streams {
strs, err := c.GetStreams()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
strs := c.GetStreams()

for _, s := range strs {
ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s.Protocol())})
Expand Down Expand Up @@ -384,14 +383,13 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3
return
}

snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb): Nasty
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal)
return
}

swrm := snet.Swarm()

pis, err := peersWithAddresses(addrs)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand Down Expand Up @@ -574,14 +572,15 @@ Filters default to those specified under the "Swarm.AddrFilters" config key.
return
}

snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
}

var output []string
for _, f := range snet.Filters.Filters() {
for _, f := range swrm.Filters.Filters() {
s, err := mafilter.ConvertIPNet(f)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand Down Expand Up @@ -621,7 +620,8 @@ add your filters to the ipfs config file.
return
}

snet, ok := n.PeerHost.Network().(*swarm.Network)
// FIXME(steb)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -651,7 +651,7 @@ add your filters to the ipfs config file.
return
}

snet.Filters.AddDialFilter(mask)
swrm.Filters.AddDialFilter(mask)
}

added, err := filtersAdd(r, cfg, req.Arguments())
Expand Down Expand Up @@ -693,7 +693,7 @@ remove your filters from the ipfs config file.
return
}

snet, ok := n.PeerHost.Network().(*swarm.Network)
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal)
return
Expand All @@ -712,9 +712,9 @@ remove your filters from the ipfs config file.
}

if req.Arguments()[0] == "all" || req.Arguments()[0] == "*" {
fs := snet.Filters.Filters()
fs := swrm.Filters.Filters()
for _, f := range fs {
snet.Filters.Remove(f)
swrm.Filters.Remove(f)
}

removed, err := filtersRemoveAll(r, cfg)
Expand All @@ -735,7 +735,7 @@ remove your filters from the ipfs config file.
return
}

snet.Filters.Remove(mask)
swrm.Filters.Remove(mask)
}

removed, err := filtersRemove(r, cfg, req.Arguments())
Expand Down
Loading

0 comments on commit 5d44fe7

Please sign in to comment.