New client is skipping small events and errors on big events #7

Open
halsbox opened this issue Feb 9, 2024 · 10 comments

@halsbox

halsbox commented Feb 9, 2024

func readPack(conn net.Conn, buf []byte) (string, error) {
	var packet string
	for {
		n, err := conn.Read(buf)
		if err != nil {
			return "", fmt.Errorf("%w: failed read: %s", ErrConn, err)
		}
		packet += string(buf[:n])
		if len(packet) > len(buf) {
			return "", fmt.Errorf("%w: too long input: %d", ErrAMI, len(packet))
		}
		if strings.HasSuffix(packet, "\r\n\r\n") {
			return packet, nil
		}
	}
}

In this fragment, if an AMI event is larger than buf, you get a "too long input" error.
In my case AMI messages arrive continuously, and conn.Read(buf) often reads a whole AMI message together with the beginning of the next one. The packet value then does not end with "\r\n\r\n", and the function eventually fails with "too long input" on a later loop iteration.
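
The failure can be shown without a network connection. The following is a minimal sketch, not code from the library: the helper names and the sample events are hypothetical, and the string stands in for what a single conn.Read call might return when two events arrive back to back.

```go
package main

import (
	"fmt"
	"strings"
)

const terminator = "\r\n\r\n"

// endsWithTerminator mirrors the suffix check used in readPack above.
func endsWithTerminator(packet string) bool {
	return strings.HasSuffix(packet, terminator)
}

// containsTerminator reports whether a complete message is present
// anywhere in packet, not only at its end.
func containsTerminator(packet string) bool {
	return strings.Contains(packet, terminator)
}

func main() {
	// Hypothetical data: one full event followed by the first bytes of
	// the next event, as a single read may deliver them.
	coalesced := "Event: Hangup\r\nChannel: SIP/100\r\n\r\nEvent: Newch"

	fmt.Println(endsWithTerminator(coalesced)) // false
	fmt.Println(containsTerminator(coalesced)) // true
}
```

The suffix check never succeeds for such a read, so the loop keeps appending data until the length limit is exceeded.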

@staskobzar staskobzar self-assigned this Apr 12, 2024
@staskobzar staskobzar added the bug Something isn't working label Apr 12, 2024
@staskobzar
Owner

@halsbox thanks for reporting this, and sorry for the delay.
You are right, this is wrong behaviour.
I will try to fix it soon.

staskobzar added a commit that referenced this issue Apr 12, 2024

@halsbox
Author

halsbox commented Apr 12, 2024

@staskobzar Hi!
I've reviewed your fix and I'm not sure it actually fixes the problem. The point is that you are not guaranteed to have \r\n\r\n as the suffix of a packet assembled by packet += string(buf[:n]).
As a workaround, I do all the work in the main loop: I keep the packet variable outside the conn.Read loop, split the data on the \r\n\r\n separator, and parse each part as it arrives.
Like this:

func (c *Client) Run() {
  go func() {
    buf := make([]byte, bufSize)
    for {
      if c.conn != nil {
        break
      }
      time.Sleep(netTimeout)
    }
    c.ReadForever()
    var packet string
    for {
      n, err := c.conn.Read(buf)
      if err != nil {
        switch {
        case errors.Is(err, syscall.ECONNABORTED):
          fallthrough
        case errors.Is(err, syscall.ECONNRESET):
          fallthrough
        case errors.Is(err, syscall.ECONNREFUSED):
          fallthrough
        case err == io.EOF:
          c.emitNetErr(ErrEOF)
          <-c.waitNewConnection
        //Pause reading when connection is not available while
        //reconnecting and reconnection is triggered from network error
        //in separate AMI Action Ping goroutine.
        case errors.Is(err, os.ErrDeadlineExceeded):
          c.emitNetErr(ErrDead)
          <-c.waitNewConnection
          _ = c.conn.SetReadDeadline(time.Time{})
        default:
          c.emitErr(fmt.Errorf("%w: conn read error: %s", Error, err))
        }
        continue
      }
      // We are not guaranteed to have `\r\n\r\n` right at the end of the data that was read.
      // The buf may even hold the end of one message, one small but whole message,
      // and the beginning of a third message.
      parts := bytes.SplitAfter(buf[:n], []byte("\r\n\r\n"))
      // Append either one whole message to the empty packet, or the tail of a big
      // message to the packet that holds its previous parts.
      packet += string(parts[0])
      if len(parts) > 1 {
        // whole message is in the packet so we parse it
        msg, err := Parse(packet)
        if err != nil {
          c.emitErr(fmt.Errorf("%w: failed to parse AMI message: %s", ErrAMI, err))
        } else {
          c.emitMsg(msg)
        }
        // parse other parts except the last one
        for i := 1; i < len(parts)-1; i++ {
          msg, err := Parse(string(parts[i]))
          if err != nil {
            c.emitErr(fmt.Errorf("%w: failed to parse AMI message: %s", ErrAMI, err))
          } else {
            c.emitMsg(msg)
          }
        }
        // Assign last part to packet.
        // Last part will always be either empty string or first fragment
        // of upcoming message because of SplitAfter behavior.
        packet = string(parts[len(parts)-1])
      }
    }
  }()
}

@staskobzar
Owner

@halsbox
I think it is AMI that should guarantee the terminating "\r\n\r\n" of the packet.
We just need to read, and return an error if the packet is invalid or there is a connection problem.

The fix just reads until "\r\n\r\n" and then returns the packet.

@staskobzar staskobzar reopened this Apr 12, 2024
@halsbox
Author

halsbox commented Apr 12, 2024

@staskobzar
We have Asterisk 18.x under heavy load, and it generates a lot of AMI messages that we need to consume. It is common for the packet not to end with \r\n\r\n. It takes less than a second from the start of the connection to get an error on buf data not ending with \r\n\r\n. I even had to use my parsing code in the login function, because 8 times out of 10 the login reply arrived in buf together with the first part of the following Event message.

@halsbox
Author

halsbox commented Apr 12, 2024

I did a lot of experimenting on this, trying buf sizes from very small to huge and Asterisk versions from 14.x to 20.x, and came to the conclusion that you have to either read from the connection in portions and then split on \r\n\r\n, or read byte by byte until \r\n\r\n, as was done in goami2 1.6, where textproto.Reader does this under the hood in ReadMIMEHeader().
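
For reference, the textproto route can be sketched as follows. This is an illustration under stated assumptions, not the goami2 1.6 source: parseAll is a hypothetical helper, and the sample stream is made up. Note that textproto canonicalizes keys (for example, "ActionID" becomes "Actionid") and collects repeated keys into a slice, which may or may not suit AMI messages.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net/textproto"
	"strings"
)

// parseAll reads every blank-line-terminated header block from stream.
// It is a hypothetical helper used only for this illustration.
func parseAll(stream string) ([]textproto.MIMEHeader, error) {
	tp := textproto.NewReader(bufio.NewReader(strings.NewReader(stream)))
	var msgs []textproto.MIMEHeader
	for {
		h, err := tp.ReadMIMEHeader()
		if err != nil {
			if errors.Is(err, io.EOF) && len(h) == 0 {
				return msgs, nil // clean end of stream, nothing pending
			}
			return msgs, err
		}
		msgs = append(msgs, h)
	}
}

func main() {
	// Hypothetical stream: a login reply immediately followed by an event.
	stream := "Response: Success\r\nMessage: Authentication accepted\r\n\r\n" +
		"Event: FullyBooted\r\nPrivilege: system,all\r\n\r\n"

	msgs, err := parseAll(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Printf("response=%q event=%q\n", m.Get("Response"), m.Get("Event"))
	}
}
```

Because the reader consumes the stream up to each blank line, it does not matter how the bytes were grouped into network reads.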

@staskobzar
Owner

@halsbox I see what you mean.
I remember now that I had this problem when a network packet held two AMI packets with \r\n\r\n in the middle.
You are right, I will have to rework it.
Thank you.

@halsbox
Author

halsbox commented Apr 12, 2024

@halsbox I think this is AMI who should guaranty terminating "\r\n\r\n" to the packet.

To be clear, I mean that every AMI message is terminated with "\r\n\r\n", but when reading from the connection with Read([]byte) we can get any portion of the stream of AMI messages. It can be a whole message terminated by "\r\n\r\n" if there is enough delay between messages. Or it can be the ending part (with "\r\n\r\n") of a big message that did not fit in buf on the previous iteration, followed by a very small message with its own "\r\n\r\n", followed by the beginning of a third message that is cut in the middle and whose next part arrives on the next Read([]byte) call.
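
The scenario described here can be sketched with a small accumulator that is fed arbitrary chunks. The framer type and the sample chunks are hypothetical and only illustrate the idea. The sketch searches the accumulated bytes rather than each chunk on its own, which also covers a terminator that happens to straddle two reads.

```go
package main

import (
	"bytes"
	"fmt"
)

var terminator = []byte("\r\n\r\n")

// framer accumulates raw chunks and hands back complete messages.
type framer struct {
	pending []byte // bytes received so far that do not yet form a message
}

// feed appends chunk and returns every complete message now available,
// each with its terminator. Incomplete data stays in f.pending.
func (f *framer) feed(chunk []byte) []string {
	f.pending = append(f.pending, chunk...)
	var msgs []string
	for {
		i := bytes.Index(f.pending, terminator)
		if i < 0 {
			return msgs
		}
		end := i + len(terminator)
		msgs = append(msgs, string(f.pending[:end]))
		f.pending = f.pending[end:]
	}
}

func main() {
	f := &framer{}
	// Hypothetical reads: start of a big message; its tail plus a small
	// message plus the start of a third; then a terminator split in two.
	chunks := []string{
		"Event: Big\r\nData: aaaa",
		"bbbb\r\n\r\nEvent: Small\r\n\r\nEvent: Thi",
		"rd\r\n\r",
		"\n",
	}
	for _, c := range chunks {
		fmt.Println(len(f.feed([]byte(c)))) // prints 0, 2, 0, 1
	}
}
```

A real client would also cap the size of the pending data so that a peer that never sends a terminator cannot grow it without bound.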

@halsbox
Author

halsbox commented Apr 12, 2024

One more thing I'd like to share:
In my tests under heavy load, reading in portions and then splitting on the terminator strangely consumed less CPU than reading byte by byte until the terminator, whether with a custom function or with textproto.Reader. Unfortunately I had no time to investigate the reasons. Currently I read byte by byte until '\n' only before login, when verifying the AMI prompt prefix (like: bytes.Equal(label[:22], []byte("Asterisk Call Manager/"))).
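
A prompt check along these lines might look like the sketch below. The function names, the 256-byte limit, and the version string in the sample are assumptions made for illustration. Reading one byte at a time guarantees that nothing after the greeting line is consumed, so the login reply that follows stays in the stream.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

var promptPrefix = []byte("Asterisk Call Manager/")

// readLine reads single bytes from r until '\n'. maxLen is an assumed
// safety limit, not a value taken from the library.
func readLine(r io.Reader, maxLen int) ([]byte, error) {
	line := make([]byte, 0, 64)
	one := make([]byte, 1)
	for len(line) < maxLen {
		if _, err := io.ReadFull(r, one); err != nil {
			return nil, fmt.Errorf("read prompt: %w", err)
		}
		line = append(line, one[0])
		if one[0] == '\n' {
			return line, nil
		}
	}
	return nil, errors.New("prompt line too long")
}

// checkPrompt reads the greeting from stream and reports whether it has
// the expected prefix, plus the bytes left unread after the greeting.
func checkPrompt(stream string) (ok bool, rest string, err error) {
	r := strings.NewReader(stream)
	line, err := readLine(r, 256)
	if err != nil {
		return false, "", err
	}
	remaining, err := io.ReadAll(r)
	if err != nil {
		return false, "", err
	}
	return bytes.HasPrefix(line, promptPrefix), string(remaining), nil
}

func main() {
	// Hypothetical greeting followed at once by a login reply.
	ok, rest, err := checkPrompt("Asterisk Call Manager/7.0.3\r\nResponse: Success\r\n\r\n")
	fmt.Println(ok, err)
	fmt.Printf("%q\n", rest)
}
```

bytes.HasPrefix is used instead of slicing label[:22] so that a greeting shorter than the prefix yields false rather than a panic.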

@staskobzar
Owner

@halsbox thanks, I will keep that in mind.
I will share here when I have some results.

@staskobzar
Owner

There is a new v1.7.4 release where I have tried to fix everything we discussed here.
I used your idea of reading from the connection as a stream and scanning for the terminating \r\n\r\n, even if it is in the middle of a network packet.
Also, considering your last comment, I am reading by line, and it seems to give good performance.
Take a look when you have time and let me know if you have any comments.
Thanks for your help.
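
For readers following the thread, line-based reading of this kind could look roughly like the sketch below. It is not the v1.7.4 implementation: readMessage, messagesIn, and the size limit are hypothetical names and values. A buffered reader collects lines until the empty line that ends a message, so leftover bytes of the next message stay buffered for the following call.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

var errTooLong = errors.New("message too long")

// readMessage reads lines until the empty line that ends a message.
// maxSize is an assumed limit on the size of one message.
func readMessage(r *bufio.Reader, maxSize int) (string, error) {
	var sb strings.Builder
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return "", err
		}
		sb.WriteString(line)
		if sb.Len() > maxSize {
			return "", errTooLong
		}
		if line == "\r\n" {
			return sb.String(), nil
		}
	}
}

// messagesIn returns all complete messages found in stream. A trailing
// incomplete message is dropped when the stream ends.
func messagesIn(stream string, maxSize int) ([]string, error) {
	r := bufio.NewReader(strings.NewReader(stream))
	var msgs []string
	for {
		msg, err := readMessage(r, maxSize)
		if errors.Is(err, io.EOF) {
			return msgs, nil
		}
		if err != nil {
			return msgs, err
		}
		msgs = append(msgs, msg)
	}
}

func main() {
	msgs, err := messagesIn("Event: A\r\n\r\nEvent: B\r\nKey: v\r\n\r\nEvent: C", 1024)
	fmt.Println(len(msgs), err)
}
```

With a real connection the same bufio.Reader has to be kept for the lifetime of the connection, since it may already hold bytes that belong to the next message.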
