Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support NATS #270

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

feat: support NATS #270

wants to merge 4 commits into from

Conversation

mannkafai
Copy link
Contributor

This will close #206

Copy link

vercel bot commented Jan 16, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
kyanos ✅ Ready (Inspect) Visit Preview 💬 Add feedback Jan 18, 2025 3:05pm

@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. enhancement New feature or request labels Jan 16, 2025
bpf/protocol_inference.h Outdated Show resolved Hide resolved
bpf/protocol_inference.h Outdated Show resolved Hide resolved
@mannkafai
Copy link
Contributor Author

@hengyoush All the tests are stuck in Test filter by l3/l4 info. There are multiple IPs returned when executing dig example.com +short. Can you help me take a look?

+ test_filter_by_remote_ip
++ dig example.com +short
+ remote_ip='23.192.228.84
23.215.0.136
23.215.0.138
96.7.128.175
96.7.128.198
23.192.228.80'

if !ok1 || !ok2 {
return []protocol.Record{}
}
return protocol.MatchByTimestamp(reqStream, respStream)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the NATS protocol, not all requests necessarily have responses (https://docs.nats.io/reference/reference-protocols/nats-protocol#ok-err). Is it possible that solely using MatchByTimestamp might result in losing records?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. Only [H]PUB or [H]MSG with reply-to and PING/PONG are matching request/response. Both NATS server and client could start with PING, I'm not sure it's possible to distinguish which side starts with PING.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's possible to distinguish which side starts with PING.

I wonder if it is really necessary to distinguish which side initiated the ping. For ping/pong, can we match them using the MatchByTimestamp logic?

For other types of messages, we can traverse both respStream and reqStream simultaneously. If the respMessage is ok/err, then it forms a record with the reqMessage.
If the respMessage is not ok/err, then we need to output a record based on the timestamps of the current reqMessage and respMessage. If reqMessage.Ts < respMessage.Ts, output a record with only the req (resp is empty); otherwise, output a record with only the resp.

}

func (parser *NatsStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int {
return 0
Copy link
Owner

@hengyoush hengyoush Jan 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can determine the boundary based on the message type (nats protocol messages) (actually what you did in protocol_inference.h)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will fix this

}
}
offset := index - len(method)
msg, prasedLen := natsParser.Parse(buffer[offset:], messageType)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo here: prasedLen => parsedLen

if packetLen < 0 {
return nil, -1
}
parts := splitFields(payload[:packetLen-len(_CRLF_)])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check slice bound here?(len(payload) < packetLen-len(_CRLF_))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's no necessary. packetLen means the first line length in payload(if contains \r\n), len(payload) >= packetLen always true.

@@ -127,6 +127,141 @@ static __always_inline enum message_type_t is_http_protocol(const char *old_buf,
return kUnknown;
}

static __always_inline bool is_nats_info(const char *old_buf, size_t count) {
if (count < 6)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check for less than 6 redundant, since 20 bytes will be read below?

}

static __always_inline bool is_nats_connect(const char *old_buf, size_t count) {
if (count < 8)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here

bpf_probe_read_user(buf, 20, old_buf);

#pragma unroll
for (size_t p = 5; p < 20; p++) {
Copy link
Owner

@hengyoush hengyoush Jan 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this loop check be omitted? If so, we can determine nats message type with just one bpf_probe_read_user call to reduce the num of instructions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure is other protocols start whit INFO or CONNECT as NATS did, if not I think this loop can be omitted.

@hengyoush
Copy link
Owner

hengyoush commented Jan 19, 2025

@hengyoush All the tests are stuck in Test filter by l3/l4 info. There are multiple IPs returned when executing dig example.com +short. Can you help me take a look?

Indeed, this need to be fixed with adding a tail/head -n 1, I have fixed in main branch.

Copy link

@ccoVeille ccoVeille left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I TBI k I could continue but it's a good start

Comment on lines +4 to +7
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
"slices"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
"slices"
"slices"
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"

Comment on lines +3 to +13
import (
"bytes"
"encoding/json"
"fmt"
"kyanos/agent/buffer"
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
"strconv"
"strings"
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix import orders

)

const (
_CRLF_ = "\r\n"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use uppercase constant in Go

Comment on lines +138 to +141
if msg.ProtocolCode == PING || msg.ProtocolCode == PONG {
return 1
}
return 0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a switch

Comment on lines +173 to +175
// if strings.ToUpper(string(parts[0])) != "INFO" {
// return nil, packetLen
// }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented code is always suspicious.

Either it could have been removed, or something was temporarily commented for debug and it was left commented while it shouldn't


func (m *Hmsg) Parse(payload []byte, messageType protocol.MessageType) (*NatsMessage, int) {
common.ProtocolParserLog.Debugf("NATS Parse Hmsg:%d, %x", len(payload), string(payload))
msg, prasedLen := m.ParseData(payload)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parsedLen

Comment on lines +642 to +644
// if !strings.HasPrefix(strings.ToUpper(string(payload[:4])), "PING") {
// return nil, packetLen
// }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again

}

func init() {
natsCmd.Flags().StringSlice("protocols", []string{}, "Specify the nats protocol to monitor(PUB, SUB, MSG), seperate by ','")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
natsCmd.Flags().StringSlice("protocols", []string{}, "Specify the nats protocol to monitor(PUB, SUB, MSG), seperate by ','")
natsCmd.Flags().StringSlice("protocols", []string{}, "Specify the NATS protocol to monitor (PUB, SUB, MSG), seperate by ','")


func init() {
natsCmd.Flags().StringSlice("protocols", []string{}, "Specify the nats protocol to monitor(PUB, SUB, MSG), seperate by ','")
natsCmd.Flags().StringSlice("subjects", []string{}, "Specify the nats subject to monitor, seperate by ','")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
natsCmd.Flags().StringSlice("subjects", []string{}, "Specify the nats subject to monitor, seperate by ','")
natsCmd.Flags().StringSlice("subjects", []string{}, "Specify the NATS subject to monitor, seperate by ','")

interval := flag.Duration("interval", 5*time.Second, "Interval between messages")
flag.Parse()

// 连接到NATS服务器

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use plain English comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request size:XXL This PR changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for NATS protocol parsing
3 participants