Skip to content

Commit

Permalink
Fix aggregation bug with v4/v6 flows #536
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed May 31, 2024
1 parent 7b701c9 commit 1d25d32
Showing 1 changed file with 105 additions and 81 deletions.
186 changes: 105 additions & 81 deletions src/nfdump/nflowcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ static struct order_mode_s {
{"duration", 0, DESCENDING, order_duration},
{NULL, 0, 0, NULL}}; // terminating entry

// index list of elelemts to aggregate
// index = index into aggregationTable
#define MaxAggrStackSize 64
static int aggregateInfo[MaxAggrStackSize] = {0};

Expand Down Expand Up @@ -267,7 +269,7 @@ typedef struct FlowKeyV4_s {
uint32_t dstAddr;
} FlowKeyV4_t;

static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow);
static inline int New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow);

/*
* hash definition and implementation
Expand Down Expand Up @@ -535,11 +537,7 @@ static struct FlowList_s {
size_t NumRecords;
} FlowList = {0};

static void *keymemV4 = NULL;
static void *keymemV6 = NULL;
static size_t keymenV4Len = 0;
static size_t keymenV6Len = 0;

static size_t maxKeyLen = 0;
static uint32_t bidir_flows = 0;

#include "memhandle.c"
Expand Down Expand Up @@ -777,13 +775,15 @@ static inline void PreProcess(void *inPtr, preprocess_t process, recordHandle_t

/*
* generate dynamic hash value for hast table, depending on -s or -A parameters
* returns actual keylen for this record.
*/
static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow) {
static inline int New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow) {
EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID];
EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID];
EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID];

dbg_printf("NewHash: %d\n", aggregateInfo[0]);
int keyLen = 0;

if (aggregateInfo[0] >= 0) {
// custom user aggregation
for (int i = 0; aggregateInfo[i] >= 0; i++) {
Expand All @@ -803,6 +803,7 @@ static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int s
preprocess_t preprocess = aggregationTable[tableIndex].preprocess;
PreProcess(inPtr, preprocess, recordHandle);

keyLen += param->length;
switch (param->length) {
case 0:
break;
Expand Down Expand Up @@ -845,6 +846,7 @@ static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int s
keyptr->proto = genericFlow->proto;
keyptr->af = AF_INET;
keymem += sizeof(FlowKeyV4_t);
keyLen = sizeof(FlowKeyV4_t);
} else if (ipv6Flow) {
FlowKeyV6_t *keyptr = (FlowKeyV6_t *)keymem;
keyptr->srcAddr[0] = ipv6Flow->dstAddr[0];
Expand All @@ -856,6 +858,7 @@ static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int s
keyptr->proto = genericFlow->proto;
keyptr->af = AF_INET6;
keymem += sizeof(FlowKeyV6_t);
keyLen = sizeof(FlowKeyV6_t);
}
} else {
// default 5-tuple aggregation
Expand All @@ -868,6 +871,7 @@ static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int s
keyptr->proto = genericFlow->proto;
keyptr->af = AF_INET;
keymem += sizeof(FlowKeyV4_t);
keyLen = sizeof(FlowKeyV4_t);
} else if (ipv6Flow) {
FlowKeyV6_t *keyptr = (FlowKeyV6_t *)keymem;
keyptr->srcAddr[0] = ipv6Flow->srcAddr[0];
Expand All @@ -879,9 +883,12 @@ static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int s
keyptr->proto = genericFlow->proto;
keyptr->af = AF_INET6;
keymem += sizeof(FlowKeyV6_t);
keyLen = sizeof(FlowKeyV6_t);
}
}
dbg_printf("New_HashKey() size: %u\n", keyLen);

return keyLen;
} // End of New_HashKey

static void ApplyAggregateMask(recordHandle_t *recordHandle, struct aggregationElement_s *aggregationElement) {
Expand Down Expand Up @@ -975,8 +982,8 @@ int Init_FlowCache(int hasGeoDB) {

flowHash = flowHash_init(InitFlowHashBits);
FlowList = (struct FlowList_s){.head = NULL, .tail = &FlowList.head, .NumRecords = 0};
keymenV4Len = sizeof(FlowKeyV4_t);
keymenV6Len = sizeof(FlowKeyV6_t);
// ipv4 fits into sizeof(FlowKeyV6_t)
maxKeyLen = sizeof(FlowKeyV6_t);

HasGeoDB = hasGeoDB;
aggregateInfo[0] = -1;
Expand Down Expand Up @@ -1090,8 +1097,7 @@ char *ParseAggregateMask(char *arg) {
uint32_t elementCount = 0;
aggregateInfo[0] = -1;

keymenV4Len = 0;
keymenV6Len = 0;
maxKeyLen = 0;
memset((void *)&aggregateInfo, 0, sizeof(aggregateInfo));

size_t fmt_len = 0;
Expand Down Expand Up @@ -1205,12 +1211,11 @@ char *ParseAggregateMask(char *arg) {
aggregateInfo[elementCount++] = index;
size_t len = aggregationTable[index].param.length;
if (aggregationTable[index].param.af == AF_INET) {
keymenV4Len += len;
// nothing
} else if (aggregationTable[index].param.af == AF_INET6) {
keymenV6Len += len;
maxKeyLen += len;
} else {
keymenV4Len += len;
keymenV6Len += len;
maxKeyLen += len;
}
index++;
} while (aggregationTable[index].aggrElement && (strcasecmp(p, aggregationTable[index].aggrElement) == 0));
Expand All @@ -1225,7 +1230,7 @@ char *ParseAggregateMask(char *arg) {
aggregateInfo[elementCount] = -1;

#ifdef DEVEL
printf("Aggregate key: v4len: %zu, v6len: %zu bytes\n", keymenV4Len, keymenV6Len);
printf("Aggregate key: maxKeyLen: %zu bytes\n", maxKeyLen);
printf("Aggregate format string: '%s'\n", aggr_fmt);

printf("Aggregate stack:\n");
Expand All @@ -1244,11 +1249,11 @@ char *ParseAggregateMask(char *arg) {
}

#endif

strncat(aggr_fmt, " ", fmt_len);
fmt_len--;
strncat(aggr_fmt, AggrAppendFmt, fmt_len);
return aggr_fmt;

} // End of ParseAggregateMask

void InsertFlow(recordHandle_t *recordHandle) {
Expand Down Expand Up @@ -1290,10 +1295,8 @@ void InsertFlow(recordHandle_t *recordHandle) {

static void AddBidirFlow(recordHandle_t *recordHandle) {
dbg_printf("Enter %s\n", __func__);
recordHeaderV3_t *record = recordHandle->recordHeaderV3;
EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID];
EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID];
EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID];

EXcntFlow_t *cntFlow = (EXcntFlow_t *)recordHandle->extensionList[EXcntFlowID];
uint64_t inPackets = genericFlow->inPackets;
uint64_t inBytes = genericFlow->inBytes;
Expand All @@ -1306,30 +1309,46 @@ static void AddBidirFlow(recordHandle_t *recordHandle) {
aggrFlows = cntFlow->flows;
}

size_t keyLen = 0;
void **keymem = NULL;
if (ipv4Flow) {
keymem = &keymemV4;
keyLen = keymenV4Len;
} else if (ipv6Flow) {
keymem = &keymemV6;
keyLen = keymenV6Len;
} else
return;
void *keymem = NULL;
static void *mem = NULL;
recordHeaderV3_t *record = recordHandle->recordHeaderV3;

hashValue_t hashValue = {0};
if (keyLen > 16) {
if (*keymem == NULL) *keymem = nfmalloc(keyLen);
hashValue.valPtr = *keymem;
hashValue.ptrSize = keyLen;
int keyLen = 0;
/*
* New_Hashkey fills mem with flow elements to aggregate
* returns actual length needed (different for ipv4/ipv6 elements)
* up to 16bytes go directly into the hashKey. Faster lookup for CPU cache
* otherwise use allocated nf-memory
*/
if (maxKeyLen > 16) {
if (mem == NULL) {
dbg_printf("Allocate: %zu\n", maxKeyLen);
mem = nfmalloc(maxKeyLen);
} else {
dbg_printf("Recycle: %zu\n", maxKeyLen);
}
keyLen = New_HashKey(mem, recordHandle, 0);
if (keyLen <= 16) {
dbg_printf("Copy to local: %u\n", keyLen);
memcpy(hashValue.val, mem, keyLen);
hashValue.ptrSize = 0;
keyLen = 16;
keymem = (void *)hashValue.val;
} else {
dbg_printf("Use keymen: %u\n", keyLen);
hashValue.valPtr = mem;
hashValue.ptrSize = keyLen;
keymem = mem;
}
} else {
*keymem = &hashValue.val;
dbg_printf("Use local val\n");
New_HashKey((void *)hashValue.val, recordHandle, 0);
keyLen = 16;
keymem = (void *)hashValue.val;
}

// generate hash value from selected -s -A parameters
New_HashKey(*keymem, recordHandle, 0);
// generate 32bit hash from hash value
hashValue.hash = SuperFastHash(*keymem, keyLen);
hashValue.hash = SuperFastHash(keymem, keyLen);

int index = flowHash_get(flowHash, hashValue);
if (index >= 0) {
Expand Down Expand Up @@ -1370,16 +1389,16 @@ static void AddBidirFlow(recordHandle_t *recordHandle) {
}
memcpy((void *)p, record, record->size);
flowHash->records[index].flowrecord = p;
flowHash->records[index].swap = NeedSwap(*keymem);
flowHash->records[index].swap = NeedSwap(keymem);

// keymen got part of the cache
*keymem = NULL;
mem = NULL;
} else {
// for bidir flows do

// generate reverse hash key to search for bidir flow
New_HashKey(*keymem, recordHandle, 1);
hashValue.hash = SuperFastHash(*keymem, keyLen);
New_HashKey(keymem, recordHandle, 1);
hashValue.hash = SuperFastHash(keymem, keyLen);

index = flowHash_get(flowHash, hashValue);
if (index >= 0) {
Expand All @@ -1401,8 +1420,8 @@ static void AddBidirFlow(recordHandle_t *recordHandle) {
} else {
// no bidir flow found
// insert original flow into the cache
New_HashKey(*keymem, recordHandle, 0);
hashValue.hash = SuperFastHash(*keymem, keyLen);
New_HashKey(keymem, recordHandle, 0);
hashValue.hash = SuperFastHash(keymem, keyLen);

int insert;
index = flowHash_add(flowHash, hashValue, &insert);
Expand All @@ -1424,22 +1443,20 @@ static void AddBidirFlow(recordHandle_t *recordHandle) {
}
memcpy((void *)p, record, record->size);
flowHash->records[index].flowrecord = p;
flowHash->records[index].swap = NeedSwap(*keymem);
flowHash->records[index].swap = NeedSwap(keymem);

// keymen got part of the cache
*keymem = NULL;
mem = NULL;
}
}

} // End of AddBidirFlow

void AddFlowCache(recordHandle_t *recordHandle) {
dbg_printf("Enter %s\n", __func__);
dbg_printf("\nEnter %s\n", __func__);
EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID];
if (!genericFlow) return;

EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID];
EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID];
EXcntFlow_t *cntFlow = (EXcntFlow_t *)recordHandle->extensionList[EXcntFlowID];
uint64_t inPackets = genericFlow->inPackets;
uint64_t inBytes = genericFlow->inBytes;
Expand All @@ -1452,41 +1469,48 @@ void AddFlowCache(recordHandle_t *recordHandle) {
aggrFlows = cntFlow->flows ? cntFlow->flows : 1;
}

recordHeaderV3_t *record = recordHandle->recordHeaderV3;

if (bidir_flows) return AddBidirFlow(recordHandle);

size_t keyLen = 0;
void **keymem = NULL;
if (ipv4Flow) {
keymem = &keymemV4;
keyLen = keymenV4Len;
} else if (ipv6Flow) {
keymem = &keymemV6;
keyLen = keymenV6Len;
} else {
// if neither ipv4 nor ipv6 but keymemV4 defined
// we aggregate elements outside a record with ip addresses
// so use either
if (keymenV4Len == 0) {
LogError("Could not determine a valid aggregation");
return;
}
keymem = &keymemV4;
keyLen = keymenV4Len;
}
void *keymem = NULL;
static void *mem = NULL;
recordHeaderV3_t *record = recordHandle->recordHeaderV3;

hashValue_t hashValue = {0};
if (keyLen > 16) {
if (*keymem == NULL) *keymem = nfmalloc(keyLen);
hashValue.valPtr = *keymem;
hashValue.ptrSize = keyLen;
int keyLen = 0;
/*
* New_Hashkey fills mem with flow elements to aggregate
* returns actual length needed (different for ipv4/ipv6 elements)
* up to 16bytes go directly into the hashKey. Faster lookup for CPU cache
* otherwise use allocated nf-memory
*/
if (maxKeyLen > 16) {
if (mem == NULL) {
dbg_printf("Allocate: %zu\n", maxKeyLen);
mem = nfmalloc(maxKeyLen);
} else {
dbg_printf("Recycle: %zu\n", maxKeyLen);
}
keyLen = New_HashKey(mem, recordHandle, 0);
if (keyLen <= 16) {
dbg_printf("Copy to local: %u\n", keyLen);
memcpy(hashValue.val, mem, keyLen);
hashValue.ptrSize = 0;
keyLen = 16;
keymem = (void *)hashValue.val;
} else {
dbg_printf("Use keymen: %u\n", keyLen);
hashValue.valPtr = mem;
hashValue.ptrSize = keyLen;
keymem = mem;
}
} else {
*keymem = &hashValue.val;
dbg_printf("Use local val\n");
New_HashKey((void *)hashValue.val, recordHandle, 0);
keyLen = 16;
keymem = (void *)hashValue.val;
}

New_HashKey(*keymem, recordHandle, 0);
hashValue.hash = SuperFastHash(*keymem, keyLen);
hashValue.hash = SuperFastHash(keymem, keyLen);

int insert;
int index = flowHash_add(flowHash, hashValue, &insert);
Expand Down Expand Up @@ -1518,11 +1542,11 @@ void AddFlowCache(recordHandle_t *recordHandle) {

flowHash->records[index].msecFirst = genericFlow->msecFirst;
flowHash->records[index].msecLast = genericFlow->msecLast;
flowHash->records[index].swap = NeedSwap(*keymem);
flowHash->records[index].swap = NeedSwap(keymem);
void *p = nfmalloc(record->size);
memcpy((void *)p, record, record->size);
flowHash->records[index].flowrecord = p;
*keymem = NULL;
mem = NULL;
}

} // End of AddFlowCache
Expand Down

0 comments on commit 1d25d32

Please sign in to comment.