Skip to content

Commit

Permalink
Optimise packet capture thread
Browse files Browse the repository at this point in the history
Ensure that thread which capture packets as fast as possible.
Packet parsing logic moved to different threads.
Additionally using os.LockOsThread to reduce CPU context switching
  • Loading branch information
buger committed Jul 26, 2021
1 parent 79ff882 commit 214edb4
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 223 deletions.
85 changes: 59 additions & 26 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"syscall"
"time"

"github.com/buger/goreplay/proto"
"github.com/buger/goreplay/size"
"github.com/buger/goreplay/tcp"

Expand Down Expand Up @@ -59,9 +60,13 @@ type Listener struct {
loopIndex int
Reading chan bool // this channel is closed when the listener has started reading packets
PcapOptions
Engine EngineType
ports []uint16 // src or/and dst ports
trackResponse bool
Engine EngineType
ports []uint16 // src or/and dst ports
trackResponse bool
expiry time.Duration
allowIncomplete bool
messages chan *tcp.Message
protocol tcp.TCPProtocol

host string // pcap file name or interface (name, hardware addr, index or ip address)

Expand Down Expand Up @@ -121,7 +126,7 @@ func (eng *EngineType) String() (e string) {
// NewListener creates and initialize a new Listener. if transport or/and engine are invalid/unsupported
// is "tcp" and "pcap", are assumed. l.Engine and l.Transport can help to get the values used.
// if there is an error it will be associated with getting network interfaces
func NewListener(host string, ports []uint16, transport string, engine EngineType, trackResponse bool) (l *Listener, err error) {
func NewListener(host string, ports []uint16, transport string, engine EngineType, protocol tcp.TCPProtocol, trackResponse bool, expiry time.Duration, allowIncomplete bool) (l *Listener, err error) {
l = &Listener{}

l.host = host
Expand All @@ -139,6 +144,11 @@ func NewListener(host string, ports []uint16, transport string, engine EngineTyp
l.closeDone = make(chan struct{})
l.quit = make(chan struct{})
l.Reading = make(chan bool)
l.expiry = expiry
l.allowIncomplete = allowIncomplete
l.protocol = protocol
l.messages = make(chan *tcp.Message, 10000)

switch engine {
default:
l.Engine = EnginePcap
Expand Down Expand Up @@ -171,8 +181,8 @@ func (l *Listener) SetPcapOptions(opts PcapOptions) {
// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context, handler PacketHandler) (err error) {
l.read(handler)
func (l *Listener) Listen(ctx context.Context) (err error) {
l.read()
done := ctx.Done()
select {
case <-done:
Expand All @@ -185,11 +195,11 @@ func (l *Listener) Listen(ctx context.Context, handler PacketHandler) (err error
}

// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context, handler PacketHandler) chan error {
func (l *Listener) ListenBackground(ctx context.Context) chan error {
err := make(chan error, 1)
go func() {
defer close(err)
if e := l.Listen(ctx, handler); err != nil {
if e := l.Listen(ctx); err != nil {
err <- e
}
}()
Expand Down Expand Up @@ -332,11 +342,34 @@ func (l *Listener) SocketHandle(ifi pcap.Interface) (handle Socket, err error) {
return
}

func (l *Listener) read(handler PacketHandler) {
func http1StartHint(pckt *tcp.Packet) (isRequest, isResponse bool) {
if proto.HasRequestTitle(pckt.Payload) {
return true, false
}

if proto.HasResponseTitle(pckt.Payload) {
return false, true
}

// No request or response detected
return false, false
}

func http1EndHint(m *tcp.Message) bool {
if m.MissingChunk() {
return false
}

return proto.HasFullPayload(m, m.PacketData()...)
}

func (l *Listener) read() {
l.Lock()
defer l.Unlock()
for key, handle := range l.Handles {
go func(key string, hndl packetHandle) {
runtime.LockOSThread()

defer l.closeHandles(key)
linkSize := 14
linkType := int(layers.LinkTypeEthernet)
Expand All @@ -351,6 +384,13 @@ func (l *Listener) read(handler PacketHandler) {
}
}

messageParser := tcp.NewMessageParser(l.messages, l.ports, hndl.ips, l.expiry, l.allowIncomplete)

if l.protocol == tcp.ProtocolHTTP {
messageParser.Start = http1StartHint
messageParser.End = http1EndHint
}

timer := time.NewTicker(1 * time.Second)

for {
Expand All @@ -371,23 +411,12 @@ func (l *Listener) read(handler PacketHandler) {
ci.Timestamp = time.Now()
}

pckt, err := tcp.ParsePacket(data, linkType, linkSize, &ci, false)

if err == nil {
for _, p := range l.ports {
if pckt.DstPort == p {
for _, ip := range hndl.ips {
if pckt.DstIP.Equal(ip) {
pckt.Direction = tcp.DirIncoming
break
}
}
break
}
}

handler(pckt)
}
messageParser.PacketHandler(&tcp.PcapPacket{
Data: data,
LType: linkType,
LTypeLen: linkSize,
Ci: &ci,
})
continue
}
if enext, ok := err.(pcap.NextError); ok && enext == pcap.NextErrorTimeoutExpired {
Expand All @@ -413,6 +442,10 @@ func (l *Listener) read(handler PacketHandler) {
close(l.Reading)
}

func (l *Listener) Messages() chan *tcp.Message {
return l.messages
}

func (l *Listener) closeHandles(key string) {
l.Lock()
defer l.Unlock()
Expand Down
69 changes: 4 additions & 65 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,14 @@ import (
"github.com/buger/goreplay/tcp"
)

// TCPProtocol is a number to indicate type of protocol
type TCPProtocol uint8

const (
// ProtocolHTTP ...
ProtocolHTTP TCPProtocol = iota
// ProtocolBinary ...
ProtocolBinary
)

// Set is here so that TCPProtocol can implement flag.Var
func (protocol *TCPProtocol) Set(v string) error {
switch v {
case "", "http":
*protocol = ProtocolHTTP
case "binary":
*protocol = ProtocolBinary
default:
return fmt.Errorf("unsupported protocol %s", v)
}
return nil
}

func (protocol *TCPProtocol) String() string {
switch *protocol {
case ProtocolBinary:
return "binary"
case ProtocolHTTP:
return "http"
default:
return ""
}
}

// RAWInputConfig represents configuration that can be applied on raw input
type RAWInputConfig struct {
capture.PcapOptions
Expire time.Duration `json:"input-raw-expire"`
CopyBufferSize size.Size `json:"copy-buffer-size"`
Engine capture.EngineType `json:"input-raw-engine"`
TrackResponse bool `json:"input-raw-track-response"`
Protocol TCPProtocol `json:"input-raw-protocol"`
Protocol tcp.TCPProtocol `json:"input-raw-protocol"`
RealIPHeader string `json:"input-raw-realip-header"`
Stats bool `json:"input-raw-stats"`
AllowIncomplete bool `json:"input-raw-allow-incomplete"`
Expand Down Expand Up @@ -117,7 +83,7 @@ func (i *RAWInput) PluginRead() (*Message, error) {
select {
case <-i.quit:
return nil, ErrorStopped
case msgTCP = <-i.messageParser.Messages():
case msgTCP = <-i.listener.Messages():
msg.Data = msgTCP.Data()
}

Expand All @@ -142,14 +108,13 @@ func (i *RAWInput) PluginRead() (*Message, error) {
stat := msgTCP.Stats
go i.addStats(stat)
}
msgTCP.Finalize()
msgTCP = nil
return &msg, nil
}

func (i *RAWInput) listen(address string) {
var err error
i.listener, err = capture.NewListener(i.host, i.ports, "", i.Engine, i.TrackResponse)
i.listener, err = capture.NewListener(i.host, i.ports, "", i.Engine, i.Protocol, i.TrackResponse, i.Expire, i.AllowIncomplete)
if err != nil {
log.Fatal(err)
}
Expand All @@ -158,15 +123,10 @@ func (i *RAWInput) listen(address string) {
if err != nil {
log.Fatal(err)
}
i.messageParser = tcp.NewMessageParser(i.CopyBufferSize, i.Expire, i.AllowIncomplete, Debug)

if i.Protocol == ProtocolHTTP {
i.messageParser.Start = http1StartHint
i.messageParser.End = http1EndHint
}
var ctx context.Context
ctx, i.cancelListener = context.WithCancel(context.Background())
errCh := i.listener.ListenBackground(ctx, i.messageParser.PacketHandler)
errCh := i.listener.ListenBackground(ctx)
<-i.listener.Reading
Debug(1, i)
go func() {
Expand Down Expand Up @@ -210,24 +170,3 @@ func (i *RAWInput) addStats(mStats tcp.Stats) {
i.messageStats = append(i.messageStats, mStats)
i.Unlock()
}

func http1StartHint(pckt *tcp.Packet) (isRequest, isResponse bool) {
if proto.HasRequestTitle(pckt.Payload) {
return true, false
}

if proto.HasResponseTitle(pckt.Payload) {
return false, true
}

// No request or response detected
return false, false
}

func http1EndHint(m *tcp.Message) bool {
if m.MissingChunk() {
return false
}

return proto.HasFullPayload(m, m.PacketData()...)
}
11 changes: 6 additions & 5 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/buger/goreplay/capture"
"github.com/buger/goreplay/proto"
"github.com/buger/goreplay/tcp"
)

const testRawExpire = time.Millisecond * 200
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestRAWInputIPv4(t *testing.T) {
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Expire: 0,
Protocol: ProtocolHTTP,
Protocol: tcp.ProtocolHTTP,
TrackResponse: true,
RealIPHeader: "X-Real-IP",
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestRAWInputNoKeepAlive(t *testing.T) {
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Expire: testRawExpire,
Protocol: ProtocolHTTP,
Protocol: tcp.ProtocolHTTP,
TrackResponse: true,
}
input := NewRAWInput(":"+port, conf)
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestRAWInputIPv6(t *testing.T) {
var respCounter, reqCounter int64
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Protocol: ProtocolHTTP,
Protocol: tcp.ProtocolHTTP,
TrackResponse: true,
}
input := NewRAWInput(originAddr, conf)
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestInputRAWChunkedEncoding(t *testing.T) {
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Expire: time.Second,
Protocol: ProtocolHTTP,
Protocol: tcp.ProtocolHTTP,
TrackResponse: true,
AllowIncomplete: true,
}
Expand Down Expand Up @@ -315,7 +316,7 @@ func BenchmarkRAWInputWithReplay(b *testing.B) {
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Expire: testRawExpire,
Protocol: ProtocolHTTP,
Protocol: tcp.ProtocolHTTP,
TrackResponse: true,
}
input := NewRAWInput(originAddr, conf)
Expand Down
Loading

0 comments on commit 214edb4

Please sign in to comment.