Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions shared/relay/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
const (
bufferSize = 8820
serverResponseTimeout = 8 * time.Second
connChannelSize = 100
)

var (
Expand Down Expand Up @@ -69,15 +70,37 @@ type connContainer struct {
cancel context.CancelFunc
}

func newConnContainer(log *log.Entry, conn *Conn, messages chan Msg) *connContainer {
func newConnContainer(log *log.Entry, c *Client, peerID messages.PeerID, instanceURL *RelayAddr) *connContainer {
ctx, cancel := context.WithCancel(context.Background())
return &connContainer{
msgChan := make(chan Msg, connChannelSize)
cn := &Conn{
dstID: peerID,
messageChan: msgChan,
instanceURL: instanceURL,
}
cc := &connContainer{
log: log,
conn: conn,
messages: messages,
conn: cn,
messages: msgChan,
ctx: ctx,
cancel: cancel,
}

// bind conn to client
cn.writeFn = func(dstID messages.PeerID, payload []byte) (int, error) {
return c.writeTo(cc, dstID, payload)
}
cn.closeFn = func(dstID messages.PeerID) error {
return c.closeConn(cc, dstID)
}
cn.localAddrFn = func() net.Addr {
return c.relayConn.LocalAddr()
}
return cc
}

func (cc *connContainer) netConn() net.Conn {
return cc.conn
}

func (cc *connContainer) writeMsg(msg Msg) {
Expand Down Expand Up @@ -232,9 +255,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
instanceURL := c.instanceURL
c.muInstanceURL.Unlock()

msgChannel := make(chan Msg, 100)
conn := NewConn(c, peerID, msgChannel, instanceURL)
container := newConnContainer(c.log, conn, msgChannel)
container := newConnContainer(c.log, c, peerID, instanceURL)
c.conns[peerID] = container
c.mu.Unlock()

Expand All @@ -261,7 +282,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
c.mu.Unlock()

c.log.Infof("remote peer is available: %s", peerID)
return conn, nil
return container.netConn(), nil
}

// ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection.
Expand Down Expand Up @@ -481,15 +502,15 @@ func (c *Client) handleTransportMsg(buf []byte, bufPtr *[]byte, internallyStoppe
return true
}

func (c *Client) writeTo(connReference *Conn, dstID messages.PeerID, payload []byte) (int, error) {
func (c *Client) writeTo(containerRef *connContainer, dstID messages.PeerID, payload []byte) (int, error) {
c.mu.Lock()
conn, ok := c.conns[dstID]
current, ok := c.conns[dstID]
c.mu.Unlock()
if !ok {
return 0, net.ErrClosed
}

if conn.conn != connReference {
if current != containerRef {
return 0, net.ErrClosed
}

Expand Down Expand Up @@ -560,26 +581,26 @@ func (c *Client) closeConnsByPeerID(peerIDs []messages.PeerID) {
}
}

func (c *Client) closeConn(connReference *Conn, id messages.PeerID) error {
func (c *Client) closeConn(containerRef *connContainer, id messages.PeerID) error {
c.mu.Lock()
defer c.mu.Unlock()

container, ok := c.conns[id]
current, ok := c.conns[id]
if !ok {
return net.ErrClosed
}

if container.conn != connReference {
if current != containerRef {
return fmt.Errorf("conn reference mismatch")
}

if err := c.stateSubscription.UnsubscribeStateChange([]messages.PeerID{id}); err != nil {
container.log.Errorf("failed to unsubscribe from peer state change: %s", err)
current.log.Errorf("failed to unsubscribe from peer state change: %s", err)
}

c.log.Infof("free up connection to peer: %s", id)
delete(c.conns, id)
container.close()
current.close()

return nil
}
Expand Down
32 changes: 9 additions & 23 deletions shared/relay/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,35 @@ import (

// Conn represent a connection to a relayed remote peer.
type Conn struct {
client *Client
dstID messages.PeerID
messageChan chan Msg
instanceURL *RelayAddr
}

// NewConn creates a new connection to a relayed remote peer.
// client: the client instance, it used to send messages to the destination peer
// dstID: the destination peer ID
// messageChan: the channel where the messages will be received
// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer
func NewConn(client *Client, dstID messages.PeerID, messageChan chan Msg, instanceURL *RelayAddr) *Conn {
c := &Conn{
client: client,
dstID: dstID,
messageChan: messageChan,
instanceURL: instanceURL,
}

return c
writeFn func(messages.PeerID, []byte) (int, error)
closeFn func(messages.PeerID) error
localAddrFn func() net.Addr
}

func (c *Conn) Write(p []byte) (n int, err error) {
return c.client.writeTo(c, c.dstID, p)
return c.writeFn(c.dstID, p)
}

func (c *Conn) Read(b []byte) (n int, err error) {
msg, ok := <-c.messageChan
m, ok := <-c.messageChan
if !ok {
return 0, net.ErrClosed
}

n = copy(b, msg.Payload)
msg.Free()
n = copy(b, m.Payload)
m.Free()
return n, nil
}

func (c *Conn) Close() error {
return c.client.closeConn(c, c.dstID)
return c.closeFn(c.dstID)
}

func (c *Conn) LocalAddr() net.Addr {
return c.client.relayConn.LocalAddr()
return c.localAddrFn()
}

func (c *Conn) RemoteAddr() net.Addr {
Expand Down
Loading