
Fix throughput/octetDeltaCount issue in flow visualization #2089

Merged 1 commit on Apr 15, 2021.
2 changes: 1 addition & 1 deletion build/yamls/elk-flow-collector/elk-flow-collector.yml
@@ -165,7 +165,7 @@ spec:
- name: ELASTICSEARCH_URL
value: "http://elasticsearch:9200"
- name: KIBANA_DEFAULTAPPID
-          value: "dashboard/653cf1e0-2fd2-11e7-99ed-49759aed30f5"
+          value: "dashboard/3b331b30-b987-11ea-b16e-fb06687c3589"
Member commented:

Same question as Antonin. Why is this required?

Contributor (author) replied:

Found that a "dashboard cannot be found" error pops up after importing the Kibana ndjson file.

- name: LOGGING_QUIET
value: "true"
ports:
120 changes: 60 additions & 60 deletions build/yamls/elk-flow-collector/kibana.ndjson

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions build/yamls/elk-flow-collector/logstash/filter.rb
@@ -2,7 +2,6 @@
# register accepts the hashmap passed to "script_params"
# it runs once at startup
def register(params)
-  @interval = params["interval"]
@@time_map = Hash.new
end

@@ -93,9 +92,12 @@ def filter(event)
event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / duration.to_i)
@@time_map[key] = t
else
-    @@time_map[key] = DateTime.strptime(event.get("[ipfix][flowEndSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
-    event.set("[ipfix][throughput]", event.get("[ipfix][octetDeltaCountFromSourceNode]").to_i / @interval.to_i)
-    event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / @interval.to_i)
+    startTime = DateTime.strptime(event.get("[ipfix][flowStartSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
+    endTime = DateTime.strptime(event.get("[ipfix][flowEndSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
+    duration = endTime - startTime
+    event.set("[ipfix][throughput]", event.get("[ipfix][octetDeltaCountFromSourceNode]").to_i / duration.to_i)
+    event.set("[ipfix][reverseThroughput]", event.get("[ipfix][reverseOctetDeltaCountFromSourceNode]").to_i / duration.to_i)
+    @@time_map[key] = endTime
end
return [event]
end
1 change: 0 additions & 1 deletion build/yamls/elk-flow-collector/logstash/logstash.conf
@@ -23,7 +23,6 @@ input {
filter {
ruby {
path => "/usr/share/logstash/config/filter.rb"
-    script_params => { "interval" => 60 }
}
}

7 changes: 0 additions & 7 deletions docs/network-flow-visibility.md
@@ -389,13 +389,6 @@ or
svn export https://github.com/vmware-tanzu/antrea/trunk/build/yamls/elk-flow-collector/
```

- To configure the export interval as `flowExportInterval` in [Configuration](#configuration),
- modify the `interval` value (in seconds) in `elk-flow-collector/logstash/logstash.conf`
-
- ```conf
- script_params => { "interval" => 60 }
- ```

To create the required K8s resources in the `elk-flow-collector` folder and get
everything up-and-running, run following commands:

3 changes: 1 addition & 2 deletions pkg/agent/flowexporter/connections/connections.go
@@ -120,9 +120,9 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
existingConn, exists := cs.connections[connKey]

if exists {
+		existingConn.IsPresent = true
// avoid updating stats of the existing connection that is about to close
if flowexporter.IsConnectionDying(existingConn) {
-			existingConn.IsPresent = true
return
}
// Update the necessary fields that are used in generating flow records.
@@ -133,7 +133,6 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
existingConn.ReverseBytes = conn.ReverseBytes
existingConn.ReversePackets = conn.ReversePackets
existingConn.TCPState = conn.TCPState
-		existingConn.IsPresent = true
klog.V(4).Infof("Antrea flow updated: %v", existingConn)
} else {
// sourceIP/destinationIP are mapped only to local pods and not remote pods.
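The connections.go change moves the `IsPresent` update ahead of the dying-connection early return, so a connection seen again in conntrack is always marked present even when its stats are frozen. A simplified Go sketch of the reordered logic — the types and the dying check are reduced assumptions, not the real Antrea definitions:

```go
package main

import "fmt"

// Connection is a stripped-down stand-in for flowexporter.Connection.
type Connection struct {
	IsPresent bool
	TCPState  string
	Bytes     uint64
}

// isConnectionDying is an assumed reduction of flowexporter.IsConnectionDying.
func isConnectionDying(c *Connection) bool {
	return c.TCPState == "TIME_WAIT" || c.TCPState == "CLOSE"
}

// addOrUpdateConn sketches the reordered update: IsPresent is set before
// the early return, so even a dying connection is marked present.
func addOrUpdateConn(existing, latest *Connection) {
	existing.IsPresent = true // moved ahead of the dying-connection check
	if isConnectionDying(existing) {
		// avoid updating stats of a connection that is about to close
		return
	}
	existing.Bytes = latest.Bytes
	existing.TCPState = latest.TCPState
}

func main() {
	dying := &Connection{TCPState: "TIME_WAIT", Bytes: 10}
	addOrUpdateConn(dying, &Connection{TCPState: "ESTABLISHED", Bytes: 99})
	fmt.Println(dying.IsPresent, dying.Bytes) // marked present, stats untouched
}
```

With the old ordering, a dying connection that appeared again in conntrack returned before `IsPresent` was set, so it could be wrongly treated as gone.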
20 changes: 4 additions & 16 deletions pkg/agent/flowexporter/exporter/exporter.go
@@ -424,19 +424,13 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "octetTotalCount":
ie.Value = record.Conn.OriginalBytes
case "packetDeltaCount":
-		deltaPkts := int64(0)
-		if record.PrevPackets != 0 {
-			deltaPkts = int64(record.Conn.OriginalPackets) - int64(record.PrevPackets)
-		}
+		deltaPkts := int64(record.Conn.OriginalPackets) - int64(record.PrevPackets)
if deltaPkts < 0 {
klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts)
}
ie.Value = uint64(deltaPkts)
case "octetDeltaCount":
-		deltaBytes := int64(0)
-		if record.PrevBytes != 0 {
-			deltaBytes = int64(record.Conn.OriginalBytes) - int64(record.PrevBytes)
-		}
+		deltaBytes := int64(record.Conn.OriginalBytes) - int64(record.PrevBytes)
if deltaBytes < 0 {
klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes)
}
@@ -446,19 +440,13 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "reverseOctetTotalCount":
ie.Value = record.Conn.ReverseBytes
case "reversePacketDeltaCount":
-		deltaPkts := int64(0)
-		if record.PrevReversePackets != 0 {
-			deltaPkts = int64(record.Conn.ReversePackets) - int64(record.PrevReversePackets)
-		}
+		deltaPkts := int64(record.Conn.ReversePackets) - int64(record.PrevReversePackets)
if deltaPkts < 0 {
klog.Warningf("Packet delta count for connection should not be negative: %d", deltaPkts)
}
ie.Value = uint64(deltaPkts)
case "reverseOctetDeltaCount":
-		deltaBytes := int64(0)
-		if record.PrevReverseBytes != 0 {
-			deltaBytes = int64(record.Conn.ReverseBytes) - int64(record.PrevReverseBytes)
-		}
+		deltaBytes := int64(record.Conn.ReverseBytes) - int64(record.PrevReverseBytes)
if deltaBytes < 0 {
klog.Warningf("Byte delta count for connection should not be negative: %d", deltaBytes)
}
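The exporter change drops the special case for a zero previous count: the delta is now always current minus previous, so the first export interval's traffic is no longer reported as zero. A minimal Go sketch of the simplified delta, with illustrative names:

```go
package main

import "fmt"

// deltaCount sketches the simplified delta logic: always current - previous,
// even when previous is zero (the old guard forced the first record's delta
// to 0, dropping the initial interval's traffic). As in the exporter, a
// negative delta only triggers a warning; the value is still returned.
func deltaCount(current, previous uint64) int64 {
	delta := int64(current) - int64(previous)
	if delta < 0 {
		fmt.Printf("warning: delta count should not be negative: %d\n", delta)
	}
	return delta
}

func main() {
	fmt.Println(deltaCount(500, 0))   // first export: 500, no longer forced to 0
	fmt.Println(deltaCount(800, 500)) // subsequent export: 300
}
```

The same pattern applies to all four cases in the diff: packet and byte deltas in both the forward and reverse directions.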