Skip to content

Commit

Permalink
Fix zero delta count for first record
Browse files Browse the repository at this point in the history
IsPresent field of connection should be updated before having
IsConnectionDying check otherwise IsConnectionDying will always
return true, which makes existing connections cannot be updated and
octetDeltaCount always return 0.

This commit also changes the delta count of first record from zero
to its total delta count, modifies throughput calculation of first
record in logstash config and changes names of thoughput diagram
from 'throughput' to 'cumulative bytes'.
  • Loading branch information
zyiou committed Apr 15, 2021
1 parent b118f2d commit b2963d8
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 91 deletions.
2 changes: 1 addition & 1 deletion build/yamls/elk-flow-collector/elk-flow-collector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ spec:
- name: ELASTICSEARCH_URL
value: "http://elasticsearch:9200"
- name: KIBANA_DEFAULTAPPID
value: "dashboard/653cf1e0-2fd2-11e7-99ed-49759aed30f5"
value: "dashboard/3b331b30-b987-11ea-b16e-fb06687c3589"
- name: LOGGING_QUIET
value: "true"
ports:
Expand Down
120 changes: 60 additions & 60 deletions build/yamls/elk-flow-collector/kibana.ndjson

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions build/yamls/elk-flow-collector/logstash/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# register accepts the hashmap passed to "script_params"
# it runs once at startup
def register(params)
@interval = params["interval"]
@@time_map = Hash.new
end

Expand Down Expand Up @@ -93,9 +92,12 @@ def filter(event)
event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / duration.to_i)
@@time_map[key] = t
else
@@time_map[key] = DateTime.strptime(event.get("[ipfix][flowEndSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
event.set("[ipfix][throughput]", event.get("[ipfix][octetDeltaCountFromSourceNode]").to_i / @interval.to_i)
event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / @interval.to_i)
startTime = DateTime.strptime(event.get("[ipfix][flowStartSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
endTime = DateTime.strptime(event.get("[ipfix][flowEndSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
duration = endTime-startTime
event.set("[ipfix][throughput]", event.get("[ipfix][octetDeltaCountFromSourceNode]").to_i / duration.to_i)
event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / duration.to_i)
@@time_map[key] = endTime
end
return [event]
end
1 change: 0 additions & 1 deletion build/yamls/elk-flow-collector/logstash/logstash.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ input {
filter {
ruby {
path => "/usr/share/logstash/config/filter.rb"
script_params => { "interval" => 60 }
}
}

Expand Down
7 changes: 0 additions & 7 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,6 @@ or
svn export https://github.com/vmware-tanzu/antrea/trunk/build/yamls/elk-flow-collector/
```

To configure the export interval as `flowExportInterval` in [Configuration](#configuration),
modify the `interval` value (in seconds) in `elk-flow-collector/logstash/logstash.conf`

```conf
script_params => { "interval" => 60 }
```

To create the required K8s resources in the `elk-flow-collector` folder and get
everything up-and-running, run following commands:

Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
existingConn, exists := cs.connections[connKey]

if exists {
existingConn.IsPresent = true
// avoid updating stats of the existing connection that is about to close
if flowexporter.IsConnectionDying(existingConn) {
existingConn.IsPresent = true
return
}
// Update the necessary fields that are used in generating flow records.
Expand All @@ -133,7 +133,6 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
existingConn.ReverseBytes = conn.ReverseBytes
existingConn.ReversePackets = conn.ReversePackets
existingConn.TCPState = conn.TCPState
existingConn.IsPresent = true
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
Expand Down
20 changes: 4 additions & 16 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,19 +424,13 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "octetTotalCount":
ie.Value = record.Conn.OriginalBytes
case "packetDeltaCount":
deltaPkts := int64(0)
if record.PrevPackets != 0 {
deltaPkts = int64(record.Conn.OriginalPackets) - int64(record.PrevPackets)
}
deltaPkts := int64(record.Conn.OriginalPackets) - int64(record.PrevPackets)
if deltaPkts < 0 {
klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts)
}
ie.Value = uint64(deltaPkts)
case "octetDeltaCount":
deltaBytes := int64(0)
if record.PrevBytes != 0 {
deltaBytes = int64(record.Conn.OriginalBytes) - int64(record.PrevBytes)
}
deltaBytes := int64(record.Conn.OriginalBytes) - int64(record.PrevBytes)
if deltaBytes < 0 {
klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes)
}
Expand All @@ -446,19 +440,13 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "reverseOctetTotalCount":
ie.Value = record.Conn.ReverseBytes
case "reversePacketDeltaCount":
deltaPkts := int64(0)
if record.PrevReversePackets != 0 {
deltaPkts = int64(record.Conn.ReversePackets) - int64(record.PrevReversePackets)
}
deltaPkts := int64(record.Conn.ReversePackets) - int64(record.PrevReversePackets)
if deltaPkts < 0 {
klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts)
}
ie.Value = uint64(deltaPkts)
case "reverseOctetDeltaCount":
deltaBytes := int64(0)
if record.PrevReverseBytes != 0 {
deltaBytes = int64(record.Conn.ReverseBytes) - int64(record.PrevReverseBytes)
}
deltaBytes := int64(record.Conn.ReverseBytes) - int64(record.PrevReverseBytes)
if deltaBytes < 0 {
klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes)
}
Expand Down

0 comments on commit b2963d8

Please sign in to comment.