Skip to content

Commit

Permalink
Implement #501. Add -X <extlist> option for sfcapd
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Jan 22, 2024
1 parent 4b961bd commit e325de6
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 80 deletions.
12 changes: 11 additions & 1 deletion man/sfcapd.1
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
\" Copyright (c) 2023, Peter Haag
\" Copyright (c) 2024, Peter Haag
.\" All rights reserved.
.\"
.\" Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -59,6 +59,7 @@
.Op Fl m Ar metricpath
.Op Fl e
.Op Fl x Ar command
.Op Fl X Ar extensionList
.Op Fl W Ar workers
.Op Fl E
.Op Fl v
Expand Down Expand Up @@ -265,6 +266,15 @@ The full path of the new file is: %d/%f
string supplied by
.Fl I
.El
.It Fl X Ar extensionList
.Ar extensionList
is a ',' separated list of extensions to be stored by
.Nm .
The numbers correspond to the extension list in nfxV3.h. By default extensions are added
dynamically to store all data sent by the exporter. If
.Ar extensionList
is given, only those elements matching the extension are processed and stored. Usually this
option is not needed, unless for specific requirements.
.It Fl m Ar metricpath
Enables the flow metric exporter. Flow metric information is sent to the UNIX socket
.Ar metricpath
Expand Down
5 changes: 4 additions & 1 deletion src/sflow/sfcapd.c
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,10 @@ int main(int argc, char **argv) {

SetPriv(userid, groupid);

Init_sflow(verbose, extensionList);
if (!Init_sflow(verbose, extensionList)) {
LogError("Init_sflow() failed");
exit(EXIT_FAILURE);
}

if (subdir_index && !InitHierPath(subdir_index)) {
close(sock);
Expand Down
211 changes: 138 additions & 73 deletions src/sflow/sflow_nfdump.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2009-2023, Peter Haag
* Copyright (c) 2009-2024, Peter Haag
* Copyright (c) 2004-2008, SWITCH - Teleinformatikdienste fuer Lehre und Forschung
* All rights reserved.
*
Expand Down Expand Up @@ -84,20 +84,67 @@ typedef struct exporter_sflow_s {

} exporter_sflow_t;

static int printRecord = 0;
static uint32_t recordBaseSize;
static uint32_t numBaseElements;
static int PrintRecord = 0;

static int ExtensionsEnabled[MAXEXTENSIONS];
uint32_t BaseRecordSize = EXgenericFlowSize;

static exporter_sflow_t *GetExporter(FlowSource_t *fs, uint32_t agentSubId, uint32_t meanSkipCount);

#include "inline.c"
#include "nffile_inline.c"

void Init_sflow(int verbose, char *extensionList) {
printRecord = verbose;
recordBaseSize = EXgenericFlowSize + EXflowMiscSize + EXasRoutingSize + EXvLanSize + EXmacAddrSize;
numBaseElements = 5;
int Init_sflow(int verbose, char *extensionList) {
PrintRecord = verbose;

if (extensionList) {
// Disable all extensions
for (int i = 0; i < MAXEXTENSIONS; i++) {
ExtensionsEnabled[i] = 0;
}

// get enabled extensions from string
int extID = ScanExtension(extensionList);
while (extID > 0) {
dbg_printf("Enable extension %d\n", extID);
ExtensionsEnabled[extID] = 1;
extID = ScanExtension(NULL);
}

if (extID == -1) {
LogError("Failed to scan extension list.");
return 0;
}

// make sure extension 1 is enabled
ExtensionsEnabled[1] = 1;

} else {
// Enable all extensions
dbg_printf("Enable all extensions\n");
for (int i = 0; i < MAXEXTENSIONS; i++) {
ExtensionsEnabled[i] = 1;
}
}

// extension available in all flows
if (ExtensionsEnabled[EXflowMiscID]) {
BaseRecordSize += EXflowMiscSize;
}
if (ExtensionsEnabled[EXvLanID]) {
BaseRecordSize += EXvLanSize;
}
if (ExtensionsEnabled[EXasRoutingID]) {
BaseRecordSize += EXasRoutingSize;
}
if (ExtensionsEnabled[EXmacAddrID]) {
BaseRecordSize += EXmacAddrSize;
}
if (ExtensionsEnabled[EXmplsLabelID]) {
BaseRecordSize += EXmplsLabelSize;
}

return 1;
} // End of Init_sflow

// called by sfcapd for each packet
Expand All @@ -117,7 +164,7 @@ void Process_sflow(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs) {
// TRY
sample.datap = (uint32_t *)sample.rawSample;
sample.endp = (u_char *)sample.rawSample + sample.rawSampleLen;
readSFlowDatagram(&sample, fs, printRecord);
readSFlowDatagram(&sample, fs, PrintRecord);
} else {
// CATCH
dbg_printf("SFLOW: caught exception: %d\n", exceptionVal);
Expand Down Expand Up @@ -220,43 +267,49 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
sample->dcd_dport = 0;
}

uint32_t recordSize = recordBaseSize;
if (sample->gotIPV6) {
uint32_t recordSize = BaseRecordSize;
if (sample->gotIPV6 && ExtensionsEnabled[EXipv6FlowID]) {
recordSize += EXipv6FlowSize;
} else {
}

if (sample->gotIPV4 && ExtensionsEnabled[EXipv4FlowID]) {
recordSize += EXipv4FlowSize;
}

if (sample->mpls_num_labels > 0) {
if (sample->mpls_num_labels > 0 && ExtensionsEnabled[EXmplsLabelID]) {
recordSize += EXmplsLabelSize;
}

if (sample->nextHop.type == SFLADDRESSTYPE_IP_V4) {
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXipNextHopV4ID]) {
recordSize += EXipNextHopV4Size;
}
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V6) {
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXipNextHopV6ID]) {
recordSize += EXipNextHopV6Size;
}

if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V4) {
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXbgpNextHopV4ID]) {
recordSize += EXbgpNextHopV4Size;
}
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V6) {
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXbgpNextHopV6ID]) {
recordSize += EXbgpNextHopV6Size;
}

if ((sample->extended_data_tag & SASAMPLE_EXTENDED_DATA_NAT) != 0) {
if (sample->nat_src.type == SFLADDRESSTYPE_IP_V4) {
recordSize += (EXnselXlateIPv4Size + EXnselXlatePortSize);
if (sample->nat_src.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXnselXlateIPv4ID]) {
recordSize += EXnselXlateIPv4Size;
}
if (sample->nat_src.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXnselXlateIPv6ID]) {
recordSize += EXnselXlateIPv6Size;
}
if (sample->nat_src.type == SFLADDRESSTYPE_IP_V6) {
recordSize += (EXnselXlateIPv6Size + EXnselXlatePortSize);
if (ExtensionsEnabled[EXnselXlatePortID]) {
recordSize += EXnselXlatePortSize;
}
}

if (fs->sa_family == AF_INET6) {
if (fs->sa_family == AF_INET6 && ExtensionsEnabled[EXipReceivedV6ID]) {
recordSize += EXipReceivedV6Size;
} else {
}
if (fs->sa_family == AF_INET && ExtensionsEnabled[EXipReceivedV4ID]) {
recordSize += EXipReceivedV4Size;
}

Expand Down Expand Up @@ -287,7 +340,7 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
genericFlow->inBytes = sample->meanSkipCount * sample->sampledPacketSize;
genericFlow->srcTos = sample->dcd_ipTos;

if (sample->gotIPV6) {
if (sample->gotIPV6 && ExtensionsEnabled[EXipv6FlowID]) {
PushExtension(recordHeader, EXipv6Flow, ipv6Flow);
SetFlag(recordHeader->flags, V3_FLAG_IPV6_ADDR);

Expand All @@ -302,101 +355,113 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
ipv6Flow->dstAddr[0] = ntohll(*u);
u = (uint64_t *)&(b[8]);
ipv6Flow->dstAddr[1] = ntohll(*u);

} else {
}
if (sample->gotIPV4 && ExtensionsEnabled[EXipv4FlowID]) {
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
ipv4Flow->srcAddr = ntohl(sample->dcd_srcIP.s_addr);
ipv4Flow->dstAddr = ntohl(sample->dcd_dstIP.s_addr);
}

PushExtension(recordHeader, EXflowMisc, flowMisc);
flowMisc->input = sample->inputPort;
flowMisc->output = sample->outputPort;
flowMisc->srcMask = sample->srcMask;
flowMisc->dstMask = sample->dstMask;
if (ExtensionsEnabled[EXflowMiscID]) {
PushExtension(recordHeader, EXflowMisc, flowMisc);
flowMisc->input = sample->inputPort;
flowMisc->output = sample->outputPort;
flowMisc->srcMask = sample->srcMask;
flowMisc->dstMask = sample->dstMask;
}

PushExtension(recordHeader, EXvLan, vLan);
vLan->srcVlan = sample->in_vlan;
vLan->dstVlan = sample->out_vlan;
if (ExtensionsEnabled[EXvLanID]) {
PushExtension(recordHeader, EXvLan, vLan);
vLan->srcVlan = sample->in_vlan;
vLan->dstVlan = sample->out_vlan;
}

PushExtension(recordHeader, EXasRouting, asRouting);
asRouting->srcAS = sample->src_as;
asRouting->dstAS = sample->dst_as;
if (ExtensionsEnabled[EXasRoutingID]) {
PushExtension(recordHeader, EXasRouting, asRouting);
asRouting->srcAS = sample->src_as;
asRouting->dstAS = sample->dst_as;
}

if (sample->nextHop.type == SFLADDRESSTYPE_IP_V4) {
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXipNextHopV4ID]) {
PushExtension(recordHeader, EXipNextHopV4, ipNextHopV4);
ipNextHopV4->ip = ntohl(sample->nextHop.address.ip_v4.addr);
}
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V6) {
if (sample->nextHop.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXipNextHopV6ID]) {
uint64_t *addr = (uint64_t *)sample->nextHop.address.ip_v6.addr;
PushExtension(recordHeader, EXipNextHopV6, ipNextHopV6);
ipNextHopV6->ip[0] = ntohll(addr[0]);
ipNextHopV6->ip[1] = ntohll(addr[1]);
}

if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V4) {
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXbgpNextHopV4ID]) {
PushExtension(recordHeader, EXbgpNextHopV4, bgpNextHopV4);
bgpNextHopV4->ip = ntohl(sample->bgp_nextHop.address.ip_v4.addr);
}
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V6) {
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXbgpNextHopV6ID]) {
uint64_t *addr = (void *)sample->bgp_nextHop.address.ip_v6.addr;
PushExtension(recordHeader, EXipReceivedV6, ipNextHopV6);
ipNextHopV6->ip[0] = ntohll(addr[0]);
ipNextHopV6->ip[1] = ntohll(addr[1]);
}

PushExtension(recordHeader, EXmacAddr, macAddr);
macAddr->inSrcMac = Get_val48((void *)&sample->eth_src);
macAddr->outDstMac = Get_val48((void *)&sample->eth_dst);
macAddr->inDstMac = 0;
macAddr->outSrcMac = 0;
if (ExtensionsEnabled[EXmacAddrID]) {
PushExtension(recordHeader, EXmacAddr, macAddr);
macAddr->inSrcMac = Get_val48((void *)&sample->eth_src);
macAddr->outDstMac = Get_val48((void *)&sample->eth_dst);
macAddr->inDstMac = 0;
macAddr->outSrcMac = 0;
}

if (sample->mpls_num_labels > 0) {
PushExtension(recordHeader, EXmplsLabel, mplsLabel);
for (int i = 0; i < sample->mpls_num_labels; i++) {
mplsLabel->mplsLabel[i] = sample->mpls_label[i];
if (ExtensionsEnabled[EXmplsLabelID]) {
if (sample->mpls_num_labels > 0) {
PushExtension(recordHeader, EXmplsLabel, mplsLabel);
for (int i = 0; i < sample->mpls_num_labels; i++) {
mplsLabel->mplsLabel[i] = sample->mpls_label[i];
}
}
}

if ((sample->extended_data_tag & SASAMPLE_EXTENDED_DATA_NAT) != 0) {
switch (sample->nat_src.type) {
case SFLADDRESSTYPE_IP_V4:
dbg_printf("NAT v4 addr\n");
PushExtension(recordHeader, EXnselXlateIPv4, nselXlateIPv4);
nselXlateIPv4->xlateSrcAddr = ntohl(sample->nat_src.address.ip_v4.addr);
nselXlateIPv4->xlateDstAddr = ntohl(sample->nat_dst.address.ip_v4.addr);

PushExtension(recordHeader, EXnselXlatePort, nselXlatePort);
nselXlatePort->xlateSrcPort = sample->nat_src_port;
nselXlatePort->xlateDstPort = sample->nat_dst_port;
if (ExtensionsEnabled[EXnselXlateIPv4ID]) {
dbg_printf("NAT v4 addr\n");
PushExtension(recordHeader, EXnselXlateIPv4, nselXlateIPv4);
nselXlateIPv4->xlateSrcAddr = ntohl(sample->nat_src.address.ip_v4.addr);
nselXlateIPv4->xlateDstAddr = ntohl(sample->nat_dst.address.ip_v4.addr);
}
break;
case SFLADDRESSTYPE_IP_V6: {
dbg_printf("NAT v6 addr\n");
PushExtension(recordHeader, EXnselXlateIPv6, nselXlateIPv6);
uint64_t *addr = (void *)sample->nat_src.address.ip_v6.addr;
nselXlateIPv6->xlateSrcAddr[0] = ntohll(addr[0]);
nselXlateIPv6->xlateSrcAddr[1] = ntohll(addr[1]);
addr = (void *)sample->nat_dst.address.ip_v6.addr;
nselXlateIPv6->xlateDstAddr[0] = ntohll(addr[0]);
nselXlateIPv6->xlateDstAddr[1] = ntohll(addr[1]);

PushExtension(recordHeader, EXnselXlatePort, nselXlatePort);
nselXlatePort->xlateSrcPort = sample->nat_src_port;
nselXlatePort->xlateDstPort = sample->nat_dst_port;
if (ExtensionsEnabled[EXnselXlateIPv6ID]) {
dbg_printf("NAT v6 addr\n");
PushExtension(recordHeader, EXnselXlateIPv6, nselXlateIPv6);
uint64_t *addr = (void *)sample->nat_src.address.ip_v6.addr;
nselXlateIPv6->xlateSrcAddr[0] = ntohll(addr[0]);
nselXlateIPv6->xlateSrcAddr[1] = ntohll(addr[1]);
addr = (void *)sample->nat_dst.address.ip_v6.addr;
nselXlateIPv6->xlateDstAddr[0] = ntohll(addr[0]);
nselXlateIPv6->xlateDstAddr[1] = ntohll(addr[1]);
}
} break;
default:
/* undefined address type - bail out */
LogError("SFLOW: getAddress() unknown address type = %d\n", sample->nat_src.type);
}
if (ExtensionsEnabled[EXnselXlatePortID]) {
PushExtension(recordHeader, EXnselXlatePort, nselXlatePort);
nselXlatePort->xlateSrcPort = sample->nat_src_port;
nselXlatePort->xlateDstPort = sample->nat_dst_port;
}
}

// add router IP
if (fs->sa_family == PF_INET6) {
if (fs->sa_family == PF_INET6 && ExtensionsEnabled[EXipReceivedV6ID]) {
PushExtension(recordHeader, EXipReceivedV6, ipReceivedV6);
ipReceivedV6->ip[0] = fs->ip.V6[0];
ipReceivedV6->ip[1] = fs->ip.V6[1];
dbg_printf("Add IPv6 route IP extension\n");
} else {
}
if (fs->sa_family == PF_INET && ExtensionsEnabled[EXipReceivedV4ID]) {
PushExtension(recordHeader, EXipReceivedV4, ipReceivedV4);
ipReceivedV4->ip = fs->ip.V4;
dbg_printf("Add IPv4 route IP extension\n");
Expand Down Expand Up @@ -438,7 +503,7 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
uint32_t exporterIdent = MetricExpporterID(recordHeader);
UpdateMetric(fs->nffile->ident, exporterIdent, genericFlow);

if (printRecord) {
if (PrintRecord) {
flow_record_short(stdout, recordHeader);
}
#ifdef DEVEL
Expand Down
4 changes: 2 additions & 2 deletions src/sflow/sflow_nfdump.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023, Peter Haag
* Copyright (c) 2017-2024, Peter Haag
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -37,7 +37,7 @@
#include "collector.h"
#include "sflow_process.h"

void Init_sflow(int verbose, char *extensionList);
int Init_sflow(int verbose, char *extensionList);

void Process_sflow(void *in_buff, ssize_t in_buff_cnt, FlowSource_t *fs);

Expand Down
Loading

0 comments on commit e325de6

Please sign in to comment.