Skip to content

Commit

Permalink
TCP layer drop connection state on gap
Browse files Browse the repository at this point in the history
Drop and reinitialize connection state on gap, so application layer parser
will restarted (after detecting gap).
  • Loading branch information
urso committed Dec 10, 2015
1 parent a1e5da1 commit 4f444b5
Showing 1 changed file with 87 additions and 68 deletions.
155 changes: 87 additions & 68 deletions packetbeat/protos/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (tcp *Tcp) decideProtocol(tuple *common.IpPortTuple) protos.Protocol {
return protos.UnknownProtocol
}

func (tcp *Tcp) getStream(k common.HashableIpPortTuple) *TcpStream {
func (tcp *Tcp) findStream(k common.HashableIpPortTuple) *TcpConnection {
v := tcp.streams.Get(k)
if v != nil {
return v.(*TcpStream)
return v.(*TcpConnection)
}
return nil
}

type TcpStream struct {
type TcpConnection struct {
id uint32
tuple *common.IpPortTuple
protocol protos.Protocol
Expand All @@ -75,117 +75,136 @@ type TcpStream struct {
data protos.ProtocolData
}

func (stream *TcpStream) String() string {
type TcpStream struct {
conn *TcpConnection
dir uint8
}

func (conn *TcpConnection) String() string {
return fmt.Sprintf("TcpStream id[%d] tuple[%s] protocol[%s] lastSeq[%d %d]",
stream.id, stream.tuple, stream.protocol, stream.lastSeq[0], stream.lastSeq[1])
conn.id, conn.tuple, conn.protocol, conn.lastSeq[0], conn.lastSeq[1])
}

func (stream *TcpStream) addPacket(pkt *protos.Packet, tcphdr *layers.TCP, original_dir uint8) {
mod := stream.tcp.protocols.GetTcp(stream.protocol)
func (stream *TcpStream) addPacket(pkt *protos.Packet, tcphdr *layers.TCP) {
conn := stream.conn
mod := conn.tcp.protocols.GetTcp(conn.protocol)
if mod == nil {
if isDebug {
protocol := conn.protocol
debugf("Ignoring protocol for which we have no module loaded: %s",
stream.protocol)
protocol)
}
return
}

if len(pkt.Payload) > 0 {
stream.data = mod.Parse(pkt, &stream.tcptuple, original_dir, stream.data)
conn.data = mod.Parse(pkt, &conn.tcptuple, stream.dir, conn.data)
}

if tcphdr.FIN {
stream.data = mod.ReceivedFin(&stream.tcptuple, original_dir, stream.data)
conn.data = mod.ReceivedFin(&conn.tcptuple, stream.dir, conn.data)
}
}

func (stream *TcpStream) gapInStream(original_dir uint8, nbytes int) (drop bool) {
mod := stream.tcp.protocols.GetTcp(stream.protocol)
stream.data, drop = mod.GapInStream(&stream.tcptuple, original_dir, nbytes, stream.data)
func (stream *TcpStream) gapInStream(nbytes int) (drop bool) {
conn := stream.conn
mod := conn.tcp.protocols.GetTcp(conn.protocol)
conn.data, drop = mod.GapInStream(&conn.tcptuple, stream.dir, nbytes, conn.data)
return drop
}

func tcpSeqBefore(seq1 uint32, seq2 uint32) bool {
return int32(seq1-seq2) < 0
}

func tcpSeqBeforeEq(seq1 uint32, seq2 uint32) bool {
return int32(seq1-seq2) <= 0
}

func (tcp *Tcp) Process(tcphdr *layers.TCP, pkt *protos.Packet) {

// This Recover should catch all exceptions in
// protocol modules.
defer logp.Recover("Process tcp exception")

stream := tcp.getStream(pkt.Tuple.Hashable())
var original_dir uint8 = TcpDirectionOriginal
created := false
if stream == nil {
stream = tcp.getStream(pkt.Tuple.RevHashable())
if stream == nil {
protocol := tcp.decideProtocol(&pkt.Tuple)
if protocol == protos.UnknownProtocol {
// don't follow
return
}

timeout := time.Duration(0)
mod := tcp.protocols.GetTcp(protocol)
if mod != nil {
timeout = mod.ConnectionTimeout()
}

if isDebug {
debugf("Stream doesn't exist, creating new")
}

// create
stream = &TcpStream{id: tcp.getId(), tuple: &pkt.Tuple, protocol: protocol, tcp: tcp}
stream.tcptuple = common.TcpTupleFromIpPort(stream.tuple, stream.id)
tcp.streams.PutWithTimeout(pkt.Tuple.Hashable(), stream, timeout)
created = true
} else {
original_dir = TcpDirectionReverse
}
stream, created := tcp.getStream(pkt)
if stream.conn == nil {
return
}
conn := stream.conn

tcp_start_seq := tcphdr.Seq
tcp_seq := tcp_start_seq + uint32(len(pkt.Payload))

lastSeq := conn.lastSeq[stream.dir]
if isDebug {
debugf("pkt.start_seq=%v pkt.last_seq=%v stream.last_seq=%v (len=%d)",
tcp_start_seq, tcp_seq, stream.lastSeq[original_dir], len(pkt.Payload))
tcp_start_seq, tcp_seq, lastSeq, len(pkt.Payload))
}

if len(pkt.Payload) > 0 &&
stream.lastSeq[original_dir] != 0 {

if tcpSeqBeforeEq(tcp_seq, stream.lastSeq[original_dir]) {
if len(pkt.Payload) > 0 && lastSeq != 0 {
if tcpSeqBeforeEq(tcp_seq, lastSeq) {
if isDebug {
debugf("Ignoring what looks like a retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
tcphdr.Seq, len(pkt.Payload), stream.lastSeq[original_dir])
debugf("Ignoring retransmitted segment. pkt.seq=%v len=%v stream.seq=%v",
tcphdr.Seq, len(pkt.Payload), lastSeq)
}
return
}

if tcpSeqBefore(stream.lastSeq[original_dir], tcp_start_seq) {
if tcpSeqBefore(lastSeq, tcp_start_seq) {
if !created {
logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d", stream.lastSeq[original_dir], tcp_start_seq)
drop := stream.gapInStream(original_dir,
int(tcp_start_seq-stream.lastSeq[original_dir]))
gap := int(tcp_start_seq - lastSeq)
logp.Warn("Gap in tcp stream. last_seq: %d, seq: %d, gap: %d", lastSeq, tcp_start_seq, gap)
drop := stream.gapInStream(gap)
if drop {
if isDebug {
debugf("Dropping stream because of gap")
debugf("Dropping connection state because of gap")
}
tcp.streams.Delete(stream.tuple.Hashable())

// drop application layer connection state and
// update stream_id for app layer analysers using stream_id for lookups
conn.id = tcp.getId()
conn.data = nil
}
}
}
}
stream.lastSeq[original_dir] = tcp_seq

stream.addPacket(pkt, tcphdr, original_dir)
conn.lastSeq[stream.dir] = tcp_seq
stream.addPacket(pkt, tcphdr)
}

func (tcp *Tcp) getStream(pkt *protos.Packet) (stream TcpStream, created bool) {
if conn := tcp.findStream(pkt.Tuple.Hashable()); conn != nil {
return TcpStream{conn: conn, dir: TcpDirectionOriginal}, false
}

if conn := tcp.findStream(pkt.Tuple.RevHashable()); conn != nil {
return TcpStream{conn: conn, dir: TcpDirectionReverse}, false
}

protocol := tcp.decideProtocol(&pkt.Tuple)
if protocol == protos.UnknownProtocol {
// don't follow
return TcpStream{}, false
}

timeout := time.Duration(0)
mod := tcp.protocols.GetTcp(protocol)
if mod != nil {
timeout = mod.ConnectionTimeout()
}

if isDebug {
debugf("Connection doesn't exist, creating new")
}

conn := &TcpConnection{
id: tcp.getId(),
tuple: &pkt.Tuple,
protocol: protocol,
tcp: tcp}
conn.tcptuple = common.TcpTupleFromIpPort(conn.tuple, conn.id)
tcp.streams.PutWithTimeout(pkt.Tuple.Hashable(), conn, timeout)
return TcpStream{conn: conn, dir: TcpDirectionOriginal}, true
}

func tcpSeqBefore(seq1 uint32, seq2 uint32) bool {
return int32(seq1-seq2) < 0
}

func tcpSeqBeforeEq(seq1 uint32, seq2 uint32) bool {
return int32(seq1-seq2) <= 0
}

func buildPortsMap(plugins map[protos.Protocol]protos.TcpProtocolPlugin) (map[uint16]protos.Protocol, error) {
Expand Down

0 comments on commit 4f444b5

Please sign in to comment.