Skip to content

Commit

Permalink
Corenet API: Apply suggestions, cleanups
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed May 29, 2017
1 parent 1bb0143 commit 7fcb9d0
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 39 deletions.
118 changes: 82 additions & 36 deletions core/commands/corenet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"text/tabwriter"

cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
corenet "github.com/ipfs/go-ipfs/core/corenet"

peerstore "gx/ipfs/QmNUVzEjq3XWJ89hegahPvyfJbTXgTaom48pLb7YBD9gHQ/go-libp2p-peerstore"
Expand All @@ -18,14 +19,13 @@ import (
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)

// Command output types.
type AppInfoOutput struct {
type CorenetAppInfoOutput struct {
Protocol string
Address string
}

type StreamInfoOutput struct {
HandlerId string
type CorenetStreamInfoOutput struct {
HandlerID string
Protocol string
LocalPeer string
LocalAddress string
Expand All @@ -34,11 +34,11 @@ type StreamInfoOutput struct {
}

type CorenetLsOutput struct {
Apps []AppInfoOutput
Apps []CorenetAppInfoOutput
}

type CorenetStreamsOutput struct {
Streams []StreamInfoOutput
Streams []CorenetStreamInfoOutput
}

// cnAppInfo holds information on a local application protocol listener service.
Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *cnAppRegistry) Deregister(proto string) {

// cnStreamInfo holds information on active incoming and outgoing protocol app streams.
type cnStreamInfo struct {
handlerId uint64
handlerID uint64

protocol string

Expand All @@ -108,27 +108,27 @@ type cnStreamInfo struct {
func (c *cnStreamInfo) Close() error {
c.local.Close()
c.remote.Close()
streams.Deregister(c.handlerId)
streams.Deregister(c.handlerID)
return nil
}

// cnStreamRegistry is a collection of active incoming and outgoing protocol app streams.
type cnStreamRegistry struct {
streams []*cnStreamInfo

nextId uint64
nextID uint64
}

func (c *cnStreamRegistry) Register(streamInfo *cnStreamInfo) {
streamInfo.handlerId = c.nextId
streamInfo.handlerID = c.nextID
c.streams = append(c.streams, streamInfo)
c.nextId += 1
c.nextID++
}

func (c *cnStreamRegistry) Deregister(handlerId uint64) {
func (c *cnStreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.streams {
if s.handlerId == handlerId {
if s.handlerID == handlerID {
foundAt = i
break
}
Expand All @@ -145,7 +145,11 @@ var streams cnStreamRegistry

var CorenetCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Application network streams.",
Tagline: "Libp2p stream mounting.",
ShortDescription: `
Expose a local application to remote peers over libp2p
Note: this command is experimental and subject to change as usecases and APIs are refined`,
},

Subcommands: map[string]*cmds.Command{
Expand All @@ -162,7 +166,7 @@ var CorenetLsCmd = &cmds.Command{
Tagline: "List active application protocol listeners.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HandlerId, Protocol, Local, Remote).").Default(false),
cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Expand All @@ -171,6 +175,12 @@ var CorenetLsCmd = &cmds.Command{
return
}

err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
Expand All @@ -179,7 +189,7 @@ var CorenetLsCmd = &cmds.Command{
output := &CorenetLsOutput{}

for _, a := range apps.apps {
output.Apps = append(output.Apps, AppInfoOutput{
output.Apps = append(output.Apps, CorenetAppInfoOutput{
Protocol: a.protocol,
Address: a.address.String(),
})
Expand Down Expand Up @@ -210,10 +220,10 @@ var CorenetLsCmd = &cmds.Command{

var CorenetStreamsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active application protocol connections.",
Tagline: "List active application protocol streams.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HandlerId, Protocol, Local, Remote).").Default(false),
cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Expand All @@ -222,6 +232,12 @@ var CorenetStreamsCmd = &cmds.Command{
return
}

err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
Expand All @@ -230,8 +246,8 @@ var CorenetStreamsCmd = &cmds.Command{
output := &CorenetStreamsOutput{}

for _, s := range streams.streams {
output.Streams = append(output.Streams, StreamInfoOutput{
HandlerId: strconv.FormatUint(s.handlerId, 10),
output.Streams = append(output.Streams, CorenetStreamInfoOutput{
HandlerID: strconv.FormatUint(s.handlerID, 10),

Protocol: s.protocol,

Expand All @@ -254,10 +270,10 @@ var CorenetStreamsCmd = &cmds.Command{
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, stream := range list.Streams {
if headers {
fmt.Fprintln(w, "HandlerId\tProtocol\tLocal\tRemote")
fmt.Fprintln(w, "HandlerID\tProtocol\tLocal\tRemote")
}

fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerId, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
}
w.Flush()

Expand All @@ -281,14 +297,20 @@ var CorenetListenCmd = &cmds.Command{
return
}

err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}

proto := "/app/" + req.Arguments()[0]
if checkProtoExists(n.PeerHost.Mux().Protocols(), proto) {
res.SetError(errors.New("Protocol handler already registered."), cmds.ErrNormal)
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
return
}

Expand Down Expand Up @@ -317,7 +339,7 @@ var CorenetListenCmd = &cmds.Command{
apps.Register(&app)

// Successful response.
res.SetOutput(&AppInfoOutput{
res.SetOutput(&CorenetAppInfoOutput{
Protocol: proto,
Address: addr.String(),
})
Expand Down Expand Up @@ -395,6 +417,12 @@ var CorenetDialCmd = &cmds.Command{
return
}

err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
Expand Down Expand Up @@ -454,11 +482,11 @@ var CorenetDialCmd = &cmds.Command{
go doAccept(&app, remote, listener)

default:
res.SetError(errors.New("Unsupported protocol: "+lnet), cmds.ErrNormal)
res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal)
return
}

output := AppInfoOutput{
output := CorenetAppInfoOutput{
Protocol: app.protocol,
Address: app.address.String(),
}
Expand Down Expand Up @@ -497,8 +525,8 @@ var CorenetCloseCmd = &cmds.Command{
Tagline: "Closes an active stream listener or client.",
},
Arguments: []cmds.Argument{
cmds.StringArg("HandlerId", false, false, "Application listener or client HandlerId"),
cmds.StringArg("Protocol", false, false, "Application listener or client HandlerId"),
cmds.StringArg("HandlerID", false, false, "Application listener or client HandlerID"),
cmds.StringArg("Protocol", false, false, "Application listener or client HandlerID"),
},
Options: []cmds.Option{
cmds.BoolOption("all", "a", "Close all streams and listeners.").Default(false),
Expand All @@ -510,6 +538,12 @@ var CorenetCloseCmd = &cmds.Command{
return
}

err = checkEnabled(n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
Expand All @@ -518,27 +552,27 @@ var CorenetCloseCmd = &cmds.Command{
closeAll, _, _ := req.Option("all").Bool()

var proto string
var handlerId uint64
var handlerID uint64

useHandlerId := false
useHandlerID := false

if !closeAll && len(req.Arguments()) == 0 {
res.SetError(errors.New("You must supply a handlerId or stream protocol."), cmds.ErrNormal)
res.SetError(errors.New(" handlerID nor stream protocol"), cmds.ErrNormal)
return

} else if !closeAll {
handlerId, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
if err != nil {
proto = "/app/" + req.Arguments()[0]

} else {
useHandlerId = true
useHandlerID = true
}
}

if closeAll || useHandlerId {
if closeAll || useHandlerID {
for _, s := range streams.streams {
if !closeAll && handlerId != s.handlerId {
if !closeAll && handlerID != s.handlerID {
continue
}
s.Close()
Expand All @@ -548,7 +582,7 @@ var CorenetCloseCmd = &cmds.Command{
}
}

if closeAll || !useHandlerId {
if closeAll || !useHandlerID {
for _, a := range apps.apps {
if !closeAll && a.protocol != proto {
continue
Expand All @@ -564,3 +598,15 @@ var CorenetCloseCmd = &cmds.Command{
}
},
}

func checkEnabled(n *core.IpfsNode) error {
config, err := n.Repo.Config()
if err != nil {
return err
}

if !config.Experimental.Libp2pStreamMounting {
return errors.New("libp2p stream mounting not enabled")
}
return nil
}
5 changes: 3 additions & 2 deletions repo/config/experiments.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

type Experiments struct {
FilestoreEnabled bool
ShardingEnabled bool
FilestoreEnabled bool
ShardingEnabled bool
Libp2pStreamMounting bool
}
11 changes: 10 additions & 1 deletion test/sharness/t0180-corenet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,16 @@ test_expect_success "test ports are closed" '
(! (netstat -ln | grep "LISTEN" | grep ":10102 "))
'

test_expect_success 'start ipfs listener' '
test_must_fail 'fail without config option being enabled' '
ipfsi 0 exp corenet ls
'

test_expect_success "enable filestore config setting" '
ipfsi 0 config --json Experimental.Libp2pStreamMounting true
ipfsi 1 config --json Experimental.Libp2pStreamMounting true
'

test_expect_success 'start corenet listener' '
ipfsi 0 exp corenet listen corenet-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log
'

Expand Down

0 comments on commit 7fcb9d0

Please sign in to comment.