Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simultaneous open extension #42

Merged
merged 10 commits into from
Feb 12, 2021
221 changes: 218 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package multistream

import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
"io"
"strconv"
"strings"
)

// ErrNotSupported is the error returned when the muxer does not support
Expand All @@ -14,6 +18,12 @@ var ErrNotSupported = errors.New("protocol not supported")
// specified.
var ErrNoProtocols = errors.New("no protocols specified")

const (
tieBreakerPrefix = "select:"
initiator = "initiator"
responder = "responder"
)

// SelectProtoOrFail performs the initial multistream handshake
// to inform the muxer of the protocol that will be used to communicate
// on this ReadWriteCloser. It returns an error if, for example,
Expand All @@ -22,8 +32,10 @@ func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error {
errCh := make(chan error, 1)
go func() {
var buf bytes.Buffer
delimWrite(&buf, []byte(ProtocolID))
delimWrite(&buf, []byte(proto))
if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte(proto)); err != nil {
errCh <- err
return
}
_, err := io.Copy(rwc, &buf)
errCh <- err
}()
Expand Down Expand Up @@ -61,7 +73,80 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) {
default:
return "", err
}
for _, p := range protos[1:] {
return selectProtosOrFail(protos[1:], rwc)
}

// Performs protocol negotiation with the simultaneous open extension; the returned boolean
// indicator will be true if we should act as a server.
func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (string, bool, error) {
if len(protos) == 0 {
return "", false, ErrNoProtocols
}

werrCh := make(chan error, 1)
go func() {
var buf bytes.Buffer
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte("iamclient"), []byte(protos[0])); err != nil {
werrCh <- err
return
}

_, err := io.Copy(rwc, &buf)
werrCh <- err
}()

err := readMultistreamHeader(rwc)
if err != nil {
return "", false, err
}

tok, err := ReadNextToken(rwc)
if err != nil {
return "", false, err
}

if err = <-werrCh; err != nil {
return "", false, err
}

switch tok {
case "iamclient":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: define a string constant.

// simultaneous open
return simOpen(protos, rwc)

case "na":
// client open
proto, err := clientOpen(protos, rwc)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", false, err
}
Comment on lines +120 to +122
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need this if err? Or can we just return proto, false, err.


return proto, false, nil

default:
return "", false, errors.New("unexpected response: " + tok)
}
}

func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) {
// check to see if we selected the pipelined protocol
tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}

switch tok {
case protos[0]:
return tok, nil
case "na":
return selectProtosOrFail(protos[1:], rwc)
default:
return "", errors.New("unexpected response: " + tok)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fmt.Errorf("unexpected response: %s", tok) is technically more idiomatic (although it really doesn't matter).

}
}

func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) {
for _, p := range protos {
err := trySelect(p, rwc)
switch err {
case nil:
Expand All @@ -74,6 +159,136 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) {
return "", ErrNotSupported
}

func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) {
randBytes := make([]byte, 8)
_, err := rand.Read(randBytes)
if err != nil {
return "", false, err
}
myNonce := binary.LittleEndian.Uint64(randBytes)
Comment on lines +163 to +168
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's probably slightly simpler to do:

var myNonce uint64
if err := binary.Read(rand.Reader, binary.LittleEndian, &myNonce); err != nil {
    ...
}
``


werrCh := make(chan error, 1)
go func() {
myselect := []byte(tieBreakerPrefix + strconv.FormatUint(myNonce, 10))
err := delimWriteBuffered(rwc, myselect)
werrCh <- err
}()

// skip exactly one protocol
// see https://github.com/multiformats/go-multistream/pull/42#discussion_r558757135
_, err = ReadNextToken(rwc)
if err != nil {
return "", false, err
}

// read the tie breaker nonce
tok, err := ReadNextToken(rwc)
if err != nil {
return "", false, err
}
if !strings.HasPrefix(tok, tieBreakerPrefix) {
return "", false, errors.New("tie breaker nonce not sent with the correct prefix")
}

if err = <-werrCh; err != nil {
return "", false, err
}

peerNone, err := strconv.ParseUint(tok[len(tieBreakerPrefix):], 10, 64)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? peerNonce?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if err != nil {
return "", false, err
}

var iamserver bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: drop this line.

if peerNone > myNonce {
// peer nonce bigger, he is client
iamserver = true
} else if peerNone < myNonce {
// my nonce bigger, i am client
iamserver = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably nicer:

if peerNonce == myNonce {
   return ...
}
iamserver := peerNonce > myNonce

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else {
return "", false, errors.New("failed client selection; identical nonces!")
}

var proto string
if iamserver {
proto, err = simOpenSelectServer(protos, rwc)
} else {
proto, err = simOpenSelectClient(protos, rwc)
}

return proto, iamserver, err
}

func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) {
werrCh := make(chan error, 1)
go func() {
err := delimWriteBuffered(rwc, []byte(responder))
werrCh <- err
}()

tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}
if tok != initiator {
return "", errors.New("unexpected response: " + tok)
}
if err = <-werrCh; err != nil {
return "", err
}

for {
tok, err = ReadNextToken(rwc)

if err == io.EOF {
return "", ErrNotSupported
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vyzo When can this EOF happen ? I mean, why do we treat this error separately ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EOF will happen when the remote side gives up because we have no protocols in common. In general, it's best to avoid returning EOF for actual error cases anyways (usually, you want to either return ErrUnexpectedEOF, or a better error like we're doing here).

}

if err != nil {
return "", err
}

for _, p := range protos {
if tok == p {
err = delimWriteBuffered(rwc, []byte(p))
if err != nil {
return "", err
}

return p, nil
}
}

err = delimWriteBuffered(rwc, []byte("na"))
if err != nil {
return "", err
}
}

}

func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) {
werrCh := make(chan error, 1)
go func() {
err := delimWriteBuffered(rwc, []byte(initiator))
werrCh <- err
}()

tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}
if tok != responder {
return "", errors.New("unexpected response: " + tok)
}
if err = <-werrCh; err != nil {
return "", err
}

return selectProtosOrFail(protos, rwc)
}

func handshake(rw io.ReadWriter) error {
errCh := make(chan error, 1)
go func() {
Expand Down
11 changes: 11 additions & 0 deletions multistream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"bytes"
"errors"
"fmt"

"io"
"sync"
Expand Down Expand Up @@ -81,6 +82,16 @@ func delimWriteBuffered(w io.Writer, mes []byte) error {
return bw.Flush()
}

func delitmWriteAll(w io.Writer, messages ...[]byte) error {
for _, mes := range messages {
if err := delimWrite(w, mes); err != nil {
return fmt.Errorf("failed to write messages %s, err: %v ", string(mes), err)
}
}

return nil
}

func delimWrite(w io.Writer, mes []byte) error {
err := writeUvarint(w, uint64(len(mes)+1))
if err != nil {
Expand Down
Loading