Skip to content

Commit

Permalink
better documentation; mispells; unexport methods
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed May 31, 2023
1 parent c6952aa commit e738d0f
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 82 deletions.
39 changes: 18 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ var (
errDisconnected = errors.New("natiu-mqtt: disconnected")
)

// Client is a asynchronous MQTT v3.1.1 client implementation.
// The first field of the Client type will always be the RxTx non-pointer type.
// Client is a asynchronous MQTT v3.1.1 client implementation which is
// safe for concurrent use.
type Client struct {
cs clientState
txlock sync.Mutex
Expand All @@ -23,14 +23,18 @@ type Client struct {
tx Tx
}

// ClientConfig is used to configure a new Client.
type ClientConfig struct {
// If a Decoder is not set one will automatically be picked.
Decoder Decoder
// OnPub is executed on every PUBLISH message received. Do not call
// HandleNext or other client methods from within this function.
OnPub func(pubHead Header, varPub VariablesPublish, r io.Reader) error
// TODO: add a backoff algorithm callback here so clients can roll their own.
}

// NewClient creates a new MQTT client with the configuration parameters provided.
// If no Decoder is provided a DecoderNoAlloc will be used.
func NewClient(cfg ClientConfig) *Client {
var onPub func(rx *Rx, varPub VariablesPublish, r io.Reader) error
if cfg.OnPub != nil {
Expand Down Expand Up @@ -94,7 +98,7 @@ func (c *Client) StartConnect(rwc io.ReadWriteCloser, vc *VariablesConnect) erro
}

// Connect sends a CONNECT packet over the transport and waits for a
// CONNACK response.
// CONNACK response from the server. The client is connected if the returned error is nil.
func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *VariablesConnect) error {
err := c.StartConnect(rwc, vc)
if err != nil {
Expand All @@ -108,6 +112,9 @@ func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *Variab
return err
}
}
if c.IsConnected() {
return nil
}
return ctx.Err()
}

Expand Down Expand Up @@ -150,15 +157,15 @@ func (c *Client) StartSubscribe(vsub VariablesSubscribe) error {
return errDisconnected
}
if c.AwaitingSuback() {
// TODO(soypat): Allow multiple subscriptions to be queued.
return errors.New("tried to subscribe while still awaiting suback")
}
c.cs.pendingSubs = vsub.Copy()
return c.tx.WriteSubscribe(vsub)
}

// Ping writes a ping packet over the network and blocks until it receives the ping
// response back. It uses an exponential backoff algorithm to time checks on the
// status of the ping.
// Subscribe writes a SUBSCRIBE packet over the network and waits for the server
// to respond with a SUBACK packet or until the context ends.
func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error {
session := c.ConnectedAt()
err := c.StartSubscribe(vsub)
Expand All @@ -177,7 +184,7 @@ func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error {
return ctx.Err()
}

// SubscribedTopics returns list of topics the client succesfully subscribed to.
// SubscribedTopics returns list of topics the client successfully subscribed to.
// Returns a copy of a slice so is safe for concurrent use.
func (c *Client) SubscribedTopics() []string {
c.cs.mu.Lock()
Expand Down Expand Up @@ -208,16 +215,6 @@ func (c *Client) Err() error {
return c.cs.Err()
}

// setTransport sets the underlying transport. This allows users to re-open
// failed/closed connections on [RxTx] side and resuming communication with server.
func (c *Client) setTransport(transport io.ReadWriteCloser) {
c.rxlock.Lock()
defer c.rxlock.Unlock()
c.txlock.Lock()
defer c.txlock.Unlock()

}

// StartPing writes a PINGREQ packet over the network without blocking waiting for response.
func (c *Client) StartPing() error {
c.txlock.Lock()
Expand All @@ -227,7 +224,7 @@ func (c *Client) StartPing() error {
}
err := c.tx.WriteSimple(PacketPingreq)
if err == nil {
c.cs.PingSent() // Flag the fact that a ping has been sent succesfully.
c.cs.PingSent() // Flag the fact that a ping has been sent successfully.
}
return err
}
Expand Down Expand Up @@ -260,7 +257,7 @@ func (c *Client) Ping(ctx context.Context) error {
// AwaitingPingresp checks if a ping sent over the wire had no response received back.
func (c *Client) AwaitingPingresp() bool { return c.cs.AwaitingPingresp() }

// ConnectedAt returns the time the client managed to succesfully connect. If
// ConnectedAt returns the time the client managed to successfully connect. If
// client is disconnected ConnectedAt returns the zero-value for time.Time.
func (c *Client) ConnectedAt() time.Time { return c.cs.ConnectedAt() }

Expand All @@ -272,8 +269,8 @@ func (c *Client) AwaitingSuback() bool { return c.cs.AwaitingSuback() }
// If Client is disconnected LastRx returns the zero value of time.Time.
func (c *Client) LastRx() time.Time { return c.cs.LastRx() }

// LastTx returns the time the last succesful packet transmission finished at.
// A "succesful" transmission does not necessarily mean the packet was received on the other end.
// LastTx returns the time the last successful packet transmission finished at.
// A "successful" transmission does not necessarily mean the packet was received on the other end.
// If Client is disconnected LastTx returns the zero value of time.Time.
func (c *Client) LastTx() time.Time { return c.cs.LastTx() }

Expand Down
3 changes: 1 addition & 2 deletions clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (cs *clientState) callbacks(onPub func(rx *Rx, varPub VariablesPublish, r i
return errors.New("connack received while connected")
}
if vc.ReturnCode != 0 {
return errors.New(vc.ReturnCode.String())
return vc.ReturnCode
}
cs.onConnect(connTime)
return nil
Expand Down Expand Up @@ -119,7 +119,6 @@ func (cs *clientState) callbacks(onPub func(rx *Rx, varPub VariablesPublish, r i
OnRxError: func(r *Rx, err error) {
cs.onDisconnect(err)
},
// OnOther: ,
}, TxCallbacks{
OnTxError: func(tx *Tx, err error) {
cs.onDisconnect(err)
Expand Down
63 changes: 31 additions & 32 deletions decoder_lowmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,6 @@ func (d DecoderNoAlloc) DecodeConnect(r io.Reader) (varConn VariablesConnect, n
return varConn, n, nil
}

// DecodeConnack implements [Decoder] interface. It is the responsibility of the caller
// to handle a non-zero [ConnectReturnCode].
func (d DecoderNoAlloc) DecodeConnack(r io.Reader) (VariablesConnack, int, error) {
var buf [2]byte
n, err := readFull(r, buf[:])
if err != nil {
return VariablesConnack{}, n, err
}
varConnack := VariablesConnack{AckFlags: buf[0], ReturnCode: ConnectReturnCode(buf[1])}
if err = varConnack.validate(); err != nil {
return VariablesConnack{}, n, err
}
return varConnack, n, nil
}

// DecodePublish implements [Decoder] interface.
func (d DecoderNoAlloc) DecodePublish(r io.Reader, qos QoSLevel) (_ VariablesPublish, n int, err error) {
topic, n, err := decodeMQTTString(r, d.UserBuffer)
Expand Down Expand Up @@ -152,23 +137,6 @@ func (d DecoderNoAlloc) DecodeSubscribe(r io.Reader, remainingLen uint32) (varSu
return varSub, n, nil
}

// DecodeSuback implements [Decoder] interface.
func (d DecoderNoAlloc) DecodeSuback(r io.Reader, remainingLen uint32) (varSuback VariablesSuback, n int, err error) {
varSuback.PacketIdentifier, n, err = decodeUint16(r)
if err != nil {
return VariablesSuback{}, n, err
}
for n < int(remainingLen) {
qos, err := decodeByte(r)
if err != nil {
return VariablesSuback{}, n, err
}
n++
varSuback.ReturnCodes = append(varSuback.ReturnCodes, QoSLevel(qos))
}
return varSuback, n, nil
}

// DecodeUnsubscribe implements [Decoder] interface.
func (d DecoderNoAlloc) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (varUnsub VariablesUnsubscribe, n int, err error) {
payloadDst := d.UserBuffer
Expand All @@ -188,6 +156,37 @@ func (d DecoderNoAlloc) DecodeUnsubscribe(r io.Reader, remainingLength uint32) (
return varUnsub, n, nil
}

// decodeConnack decodes a connack packet. It is the responsibility of the caller to handle a non-zero [ConnectReturnCode].
func decodeConnack(r io.Reader) (VariablesConnack, int, error) {
var buf [2]byte
n, err := readFull(r, buf[:])
if err != nil {
return VariablesConnack{}, n, err
}
varConnack := VariablesConnack{AckFlags: buf[0], ReturnCode: ConnectReturnCode(buf[1])}
if err = varConnack.validate(); err != nil {
return VariablesConnack{}, n, err
}
return varConnack, n, nil
}

// decodeSuback decodes a SUBACK packet.
func decodeSuback(r io.Reader, remainingLen uint32) (varSuback VariablesSuback, n int, err error) {
varSuback.PacketIdentifier, n, err = decodeUint16(r)
if err != nil {
return VariablesSuback{}, n, err
}
for n < int(remainingLen) {
qos, err := decodeByte(r)
if err != nil {
return VariablesSuback{}, n, err
}
n++
varSuback.ReturnCodes = append(varSuback.ReturnCodes, QoSLevel(qos))
}
return varSuback, n, nil
}

// decodeRemainingLength decodes the Remaining Length variable length integer
// in MQTT fixed headers. This value can range from 1 to 4 bytes in length and
func decodeRemainingLength(r io.Reader) (value uint32, n int, err error) {
Expand Down
15 changes: 3 additions & 12 deletions definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const (

// ConnectReturnCode represents the CONNACK return code, which is the second byte in the variable header.
// It indicates if the connection was successful (0 value) or if the connection attempt failed on the server side.
// ConnectReturnCode also implements the error interface and can be returned on a failed connection.
type ConnectReturnCode uint8

const (
Expand All @@ -119,15 +120,5 @@ const (
minInvalidReturnCode
)

type wrapErr struct {
msg string
suberr []error
}

func (w *wrapErr) Error() string {
return w.msg + " (wraps various errors)"
}

func (w *wrapErr) Unwrap() []error {
return w.suberr
}
// Error implements the error interface for a non-zero return code.
func (rc ConnectReturnCode) Error() string { return rc.String() }
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func ExampleClient_concurrent() {
}
message := <-txQueue
varPub.PacketIdentifier = uint16(rand.Int())
// Loop until message is sent succesfully. This guarantees
// Loop until message is sent successfully. This guarantees
// all messages are sent, even in events of disconnect.
for {
err := client.PublishPayload(pubFlags, varPub, message)
Expand Down
8 changes: 4 additions & 4 deletions rxtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// If there is an error after reading the first byte of a packet the transport is closed
// and a new transport must be set with [Rx.SetRxTransport].
// If OnRxError is set the underlying transport is not automatically closed and
// it becomes the callback's responsability to close the transport.
// it becomes the callback's responsibility to close the transport.
//
// Not safe for concurrent use.
type Rx struct {
Expand Down Expand Up @@ -122,7 +122,7 @@ func (rx *Rx) ReadNextPacket() (int, error) {
break
}
var vc VariablesConnack
vc, ngot, err = rx.dec.DecodeConnack(rx.rxTrp)
vc, ngot, err = decodeConnack(rx.rxTrp)
n += ngot
if err != nil {
break
Expand Down Expand Up @@ -152,7 +152,7 @@ func (rx *Rx) ReadNextPacket() (int, error) {
break
}
var vsbck VariablesSuback
vsbck, ngot, err = rx.dec.DecodeSuback(rx.rxTrp, hdr.RemainingLength)
vsbck, ngot, err = decodeSuback(rx.rxTrp, hdr.RemainingLength)
n += ngot
if err != nil {
break
Expand Down Expand Up @@ -248,7 +248,7 @@ func (rx *Rx) exhaustReader(r io.Reader) (err error) {
// and a new transport must be set with [Tx.SetTxTransport].
// A Tx will not validate data before encoding, that is up to the caller, Malformed packets
// will be rejected and the connection will be closed immediately. If OnTxError is
// set then the underlying transport is not closed and it becomes responsability
// set then the underlying transport is not closed and it becomes responsibility
// of the callback to close the transport.
type Tx struct {
txTrp io.WriteCloser
Expand Down
22 changes: 12 additions & 10 deletions subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"strings"
)

// Subscriptions provides clients and servers with a way to manage requested
// topic published messages. Subscriptions is an abstraction over state, not
// Subscriptions is a WIP.

// subscriptions provides clients and servers with a way to manage requested
// topic published messages. subscriptions is an abstraction over state, not
// input/output operations, so calls to Subscribe should not write bytes over a transport.
type Subscriptions interface {
type subscriptions interface {
// Subscribe takes a []byte slice to make it explicit and abundantly clear that
// Subscriptions is in charge of the memory corresponding to subscription topics.
// This is to say that Subscriptions should copy topic contents into its own memory
Expand All @@ -26,13 +28,13 @@ type Subscriptions interface {

// TODO(soypat): Add AVL tree implementation like the one in github.com/soypat/go-canard, supposedly is best data structure for this [citation needed].

var _ Subscriptions = SubscriptionsMap{}
var _ subscriptions = subscriptionsMap{}

// SubscriptionsMap implements Subscriptions interface with a map.
// subscriptionsMap implements Subscriptions interface with a map.
// It performs allocations.
type SubscriptionsMap map[string]struct{}
type subscriptionsMap map[string]struct{}

func (sm SubscriptionsMap) Subscribe(topic []byte) error {
func (sm subscriptionsMap) Subscribe(topic []byte) error {
tp := string(topic)
if _, ok := sm[tp]; ok {
return errors.New("topic already exists in subscriptions")
Expand All @@ -41,15 +43,15 @@ func (sm SubscriptionsMap) Subscribe(topic []byte) error {
return nil
}

func (sm SubscriptionsMap) Unsubscribe(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
func (sm subscriptionsMap) Unsubscribe(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
return sm.match(topicFilter, userBuffer, true)
}

func (sm SubscriptionsMap) Match(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
func (sm subscriptionsMap) Match(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
return sm.match(topicFilter, userBuffer, false)
}

func (sm SubscriptionsMap) match(topicFilter string, userBuffer []byte, deleteMatches bool) (matched [][]byte, err error) {
func (sm subscriptionsMap) match(topicFilter string, userBuffer []byte, deleteMatches bool) (matched [][]byte, err error) {
n := 0 // Bytes copied into userBuffer.
filterParts := strings.Split(topicFilter, "/")
if err := validateWildcards(filterParts); err != nil {
Expand Down

0 comments on commit e738d0f

Please sign in to comment.