From aa3e8f2b1b3cbb715659f0e7f27523eec309ce6e Mon Sep 17 00:00:00 2001 From: Joseph Hickey Date: Fri, 17 Nov 2023 11:59:29 -0500 Subject: [PATCH] Fix #2465, add multi threaded SB perf tests Add multi-thread variants of the SB bulk transfer tests. The intent is to create some contention on the SB shared data mutex between the threads. This more closely resembles real use cases where different threads are doing sending and receving simultaneously. --- .../cfe_testcase/src/sb_performance_test.c | 396 +++++++++++++++++- 1 file changed, 391 insertions(+), 5 deletions(-) diff --git a/modules/cfe_testcase/src/sb_performance_test.c b/modules/cfe_testcase/src/sb_performance_test.c index 25f86ef44..e0ccab708 100644 --- a/modules/cfe_testcase/src/sb_performance_test.c +++ b/modules/cfe_testcase/src/sb_performance_test.c @@ -33,6 +33,10 @@ #include "cfe_msgids.h" #include "cfe_test_msgids.h" +/* Number of messages to send during test */ +#define UT_BULK_SEND_COUNT 1000000 + + /* A simple command message */ typedef struct { @@ -47,6 +51,28 @@ typedef struct uint32 TlmPayload; } CFE_FT_TestTlmMessage_t; +/* State structure for multicore test - shared between threads */ +typedef struct UT_BulkMultiCoreSharedState +{ + volatile bool XmitFinished; + volatile bool RecvFinished; + + CFE_ES_TaskId_t TaskIdXmit; + CFE_ES_TaskId_t TaskIdRecv; + CFE_SB_PipeId_t PipeId; + osal_id_t SyncSem; + uint32 SendCount; + uint32 RecvCount; + OS_time_t StartTime; + OS_time_t EndTime; + +} UT_BulkMultiCoreSharedState_t; + +UT_BulkMultiCoreSharedState_t BulkCmd; +UT_BulkMultiCoreSharedState_t BulkTlm; + + + /* * This test procedure should be agnostic to specific MID values, but it should * not overlap/interfere with real MIDs used by other apps. @@ -54,7 +80,7 @@ typedef struct static const CFE_SB_MsgId_t CFE_FT_CMD_MSGID = CFE_SB_MSGID_WRAP_VALUE(CFE_TEST_CMD_MID); static const CFE_SB_MsgId_t CFE_FT_TLM_MSGID = CFE_SB_MSGID_WRAP_VALUE(CFE_TEST_HK_TLM_MID); -void TestBulkTransmitRecv(void) +void TestBulkTransferSingle(void) { CFE_SB_PipeId_t PipeId1 = CFE_SB_INVALID_PIPE; CFE_SB_PipeId_t PipeId2 = CFE_SB_INVALID_PIPE; @@ -66,12 +92,12 @@ void TestBulkTransmitRecv(void) uint32 SendCount; OS_time_t StartTime; OS_time_t ElapsedTime; + int64 AvgRate; memset(&CmdMsg, 0, sizeof(CmdMsg)); memset(&TlmMsg, 0, sizeof(TlmMsg)); - UtPrintf("Testing: Bulk SB Transmit/Receive"); - CFE_PSP_GetTime(&StartTime); + UtPrintf("Testing: Single Threaded Bulk SB Transmit/Receive"); /* Setup, create a pipe and subscribe (one cmd, one tlm) */ UtAssert_INT32_EQ(CFE_SB_CreatePipe(&PipeId1, 5, "TestPipe1"), CFE_SUCCESS); @@ -83,7 +109,9 @@ void TestBulkTransmitRecv(void) UtAssert_INT32_EQ(CFE_MSG_Init(CFE_MSG_PTR(CmdMsg.CommandHeader), CFE_FT_CMD_MSGID, sizeof(CmdMsg)), CFE_SUCCESS); UtAssert_INT32_EQ(CFE_MSG_Init(CFE_MSG_PTR(TlmMsg.TelemetryHeader), CFE_FT_TLM_MSGID, sizeof(TlmMsg)), CFE_SUCCESS); - for (SendCount = 0; SendCount < 1000000; ++SendCount) + CFE_PSP_GetTime(&StartTime); + + for (SendCount = 0; SendCount < UT_BULK_SEND_COUNT; ++SendCount) { CmdMsg.CmdPayload = SendCount; TlmMsg.TlmPayload = ~SendCount; @@ -140,9 +168,367 @@ void TestBulkTransmitRecv(void) UtAssert_MIR("Elapsed time for SB bulk message test: %lu usec", (unsigned long)OS_TimeGetTotalMicroseconds(ElapsedTime)); + + AvgRate = OS_TimeGetTotalMilliseconds(ElapsedTime); + AvgRate = ((int64)SendCount * 10000) / AvgRate; + + UtAssert_MIR("Message Rate for single-thread: %ld.%01ld messages/sec", + (long)(AvgRate / 10), (long)(AvgRate % 10)); + + UtAssert_INT32_EQ(CFE_SB_DeletePipe(PipeId1), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_DeletePipe(PipeId2), CFE_SUCCESS); +} + +void RunSingleCmdSendRecv(void) +{ + CFE_FT_TestCmdMessage_t CmdMsg; + CFE_SB_Buffer_t * MsgBuf; + const CFE_FT_TestCmdMessage_t *CmdPtr; + + UtAssert_INT32_EQ(CFE_MSG_Init(CFE_MSG_PTR(CmdMsg.CommandHeader), CFE_FT_CMD_MSGID, sizeof(CmdMsg)), CFE_SUCCESS); + + CFE_PSP_GetTime(&BulkCmd.StartTime); + + while (BulkCmd.SendCount < UT_BULK_SEND_COUNT) + { + CmdMsg.CmdPayload = BulkCmd.SendCount; + + /* In order to not "flood" with test results, this should be silent unless a failure occurs */ + CFE_Assert_STATUS_STORE(CFE_SB_TransmitMsg(CFE_MSG_PTR(CmdMsg.CommandHeader), true)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + break; + } + + ++BulkCmd.SendCount; + + CFE_Assert_STATUS_STORE(CFE_SB_ReceiveBuffer(&MsgBuf, BulkCmd.PipeId, CFE_SB_POLL)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + break; + } + + ++BulkCmd.RecvCount; + + /* As above, to avoid flooding of test cases, only report mismatch here */ + CmdPtr = (const void *)MsgBuf; + if (CmdPtr->CmdPayload != CmdMsg.CmdPayload) + { + UtAssert_UINT32_EQ(CmdPtr->CmdPayload, CmdMsg.CmdPayload); + break; + } + } + + CFE_PSP_GetTime(&BulkCmd.EndTime); + + BulkCmd.XmitFinished = true; + BulkCmd.RecvFinished = true; +} + +void RunSingleTlmSendRecv(void) +{ + CFE_FT_TestTlmMessage_t TlmMsg; + CFE_SB_Buffer_t * MsgBuf; + const CFE_FT_TestTlmMessage_t *TlmPtr; + + UtAssert_INT32_EQ(CFE_MSG_Init(CFE_MSG_PTR(TlmMsg.TelemetryHeader), CFE_FT_TLM_MSGID, sizeof(TlmMsg)), CFE_SUCCESS); + + CFE_PSP_GetTime(&BulkTlm.StartTime); + + while (BulkTlm.SendCount < UT_BULK_SEND_COUNT) + { + TlmMsg.TlmPayload = BulkTlm.SendCount; + + /* In order to not "flood" with test results, this should be silent unless a failure occurs */ + CFE_Assert_STATUS_STORE(CFE_SB_TransmitMsg(CFE_MSG_PTR(TlmMsg.TelemetryHeader), true)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + break; + } + + ++BulkTlm.SendCount; + + CFE_Assert_STATUS_STORE(CFE_SB_ReceiveBuffer(&MsgBuf, BulkTlm.PipeId, CFE_SB_POLL)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + break; + } + + ++BulkTlm.RecvCount; + + /* As above, to avoid flooding of test cases, only report mismatch here */ + TlmPtr = (const void *)MsgBuf; + if (TlmPtr->TlmPayload != TlmMsg.TlmPayload) + { + UtAssert_UINT32_EQ(TlmPtr->TlmPayload, TlmMsg.TlmPayload); + break; + } + } + + CFE_PSP_GetTime(&BulkTlm.EndTime); + + BulkTlm.XmitFinished = true; + BulkTlm.RecvFinished = true; +} + +void TestBulkTransferMulti2(void) +{ + OS_time_t ElapsedCmdTime; + OS_time_t ElapsedTlmTime; + int64 AvgRate; + + UtPrintf("Testing: 2 Thread Bulk SB Transmit/Receive without Sync Sem"); + memset(&BulkCmd, 0, sizeof(BulkCmd)); + memset(&BulkTlm, 0, sizeof(BulkCmd)); + + /* Setup, create a pipe and subscribe (one cmd, one tlm) */ + UtAssert_INT32_EQ(CFE_SB_CreatePipe(&BulkCmd.PipeId, 5, "CmdPipe"), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_CreatePipe(&BulkTlm.PipeId, 5, "TlmPipe"), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_SubscribeEx(CFE_FT_CMD_MSGID, BulkCmd.PipeId, CFE_SB_DEFAULT_QOS, 3), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_SubscribeEx(CFE_FT_TLM_MSGID, BulkTlm.PipeId, CFE_SB_DEFAULT_QOS, 3), CFE_SUCCESS); + + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkCmd.TaskIdXmit, "CmdXmit", RunSingleCmdSendRecv, NULL, 32768, 150, 0), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkTlm.TaskIdXmit, "TlmXmit", RunSingleTlmSendRecv, NULL, 32768, 150, 0), CFE_SUCCESS); + + do + { + OS_TaskDelay(1000); + UtPrintf("Counts => %lu/%lu CMD, %lu/%lu TLM", (unsigned long)BulkCmd.SendCount, (unsigned long)BulkCmd.RecvCount, + (unsigned long)BulkTlm.SendCount, (unsigned long)BulkTlm.RecvCount); + } + while (!BulkCmd.XmitFinished || !BulkCmd.RecvFinished || !BulkTlm.XmitFinished || !BulkTlm.RecvFinished); + + ElapsedCmdTime = OS_TimeSubtract(BulkCmd.EndTime, BulkCmd.StartTime); + UtAssert_MIR("Elapsed time for SB bulk CMD thread: %lu usec", + (unsigned long)OS_TimeGetTotalMicroseconds(ElapsedCmdTime)); + + ElapsedTlmTime = OS_TimeSubtract(BulkTlm.EndTime, BulkCmd.StartTime); + UtAssert_MIR("Elapsed time for SB bulk TLM thread: %lu usec", + (unsigned long)OS_TimeGetTotalMicroseconds(ElapsedTlmTime)); + + /* Take the average time of the two parallel activities */ + AvgRate = OS_TimeGetTotalMilliseconds(OS_TimeAdd(ElapsedCmdTime, ElapsedTlmTime)) / 2; + AvgRate = ((int64)(BulkCmd.RecvCount + BulkTlm.RecvCount) * 10000) / AvgRate; + + UtAssert_MIR("Message Rate for multi-thread 1: %ld.%01ld messages/sec", + (long)(AvgRate / 10), (long)(AvgRate % 10)); + + UtAssert_INT32_EQ(CFE_SB_DeletePipe(BulkCmd.PipeId), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_DeletePipe(BulkTlm.PipeId), CFE_SUCCESS); +} + +void UT_CommandTransmitterTask(void) +{ + CFE_SB_Buffer_t *BufPtr; + CFE_FT_TestCmdMessage_t *CmdMsgPtr; + + for (BulkCmd.SendCount = 0; BulkCmd.SendCount < UT_BULK_SEND_COUNT; ++BulkCmd.SendCount) + { + CFE_Assert_STATUS_STORE(OS_CountSemTake(BulkCmd.SyncSem)); + if (!CFE_Assert_STATUS_SILENTCHECK(OS_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(OS_SUCCESS); + break; + } + + BufPtr = CFE_SB_AllocateMessageBuffer(sizeof(CFE_FT_TestCmdMessage_t)); + + CmdMsgPtr = (void *)&BufPtr->Msg; + + /* Initialize the message content */ + CFE_MSG_Init(CFE_MSG_PTR(CmdMsgPtr->CommandHeader), CFE_FT_CMD_MSGID, sizeof(*CmdMsgPtr)); + + CmdMsgPtr->CmdPayload = BulkCmd.SendCount; + + CFE_Assert_STATUS_STORE(CFE_SB_TransmitBuffer(BufPtr, true)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(CFE_SUCCESS); + break; + } + } + + BulkCmd.XmitFinished = true; +} + +void UT_TelemtryTransmitterTask(void) +{ + CFE_SB_Buffer_t *BufPtr; + CFE_FT_TestTlmMessage_t *TlmMsgPtr; + + for (BulkTlm.SendCount = 0; BulkTlm.SendCount < UT_BULK_SEND_COUNT; ++BulkTlm.SendCount) + { + CFE_Assert_STATUS_STORE(OS_CountSemTake(BulkTlm.SyncSem)); + if (!CFE_Assert_STATUS_SILENTCHECK(OS_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(OS_SUCCESS); + break; + } + + BufPtr = CFE_SB_AllocateMessageBuffer(sizeof(CFE_FT_TestTlmMessage_t)); + + TlmMsgPtr = (void *)&BufPtr->Msg; + + /* Initialize the message content */ + CFE_MSG_Init(CFE_MSG_PTR(TlmMsgPtr->TelemetryHeader), CFE_FT_TLM_MSGID, sizeof(*TlmMsgPtr)); + + TlmMsgPtr->TlmPayload = BulkTlm.SendCount; + + CFE_Assert_STATUS_STORE(CFE_SB_TransmitBuffer(BufPtr, true)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(CFE_SUCCESS); + break; + } + + } + + BulkTlm.XmitFinished = true; +} + +void UT_CommandReceiverTask(void) +{ + CFE_SB_Buffer_t * MsgBuf; + const CFE_FT_TestCmdMessage_t *CmdPtr; + + for (BulkCmd.RecvCount = 0; BulkCmd.RecvCount < UT_BULK_SEND_COUNT; ++BulkCmd.RecvCount) + { + CFE_Assert_STATUS_STORE(CFE_SB_ReceiveBuffer(&MsgBuf, BulkCmd.PipeId, 5000)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(CFE_SUCCESS); + break; + } + + /* As above, to avoid flooding of test cases, only report mismatch here */ + CmdPtr = (const void *)MsgBuf; + if (CmdPtr->CmdPayload != BulkCmd.RecvCount) + { + UtAssert_UINT32_EQ(CmdPtr->CmdPayload, BulkCmd.RecvCount); + break; + } + + CFE_Assert_STATUS_STORE(OS_CountSemGive(BulkCmd.SyncSem)); + if (!CFE_Assert_STATUS_SILENTCHECK(OS_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(OS_SUCCESS); + break; + } + } + + CFE_PSP_GetTime(&BulkCmd.EndTime); + BulkCmd.RecvFinished = true; +} + +void UT_TelemetryReceiverTask(void) +{ + CFE_SB_Buffer_t * MsgBuf; + const CFE_FT_TestTlmMessage_t *TlmPtr; + + for (BulkTlm.RecvCount = 0; BulkTlm.RecvCount < UT_BULK_SEND_COUNT; ++BulkTlm.RecvCount) + { + CFE_Assert_STATUS_STORE(CFE_SB_ReceiveBuffer(&MsgBuf, BulkTlm.PipeId, 5000)); + if (!CFE_Assert_STATUS_SILENTCHECK(CFE_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(CFE_SUCCESS); + break; + } + + /* As above, to avoid flooding of test cases, only report mismatch here */ + TlmPtr = (const void *)MsgBuf; + if (TlmPtr->TlmPayload != BulkTlm.RecvCount) + { + UtAssert_UINT32_EQ(TlmPtr->TlmPayload, BulkTlm.RecvCount); + break; + } + + CFE_Assert_STATUS_STORE(OS_CountSemGive(BulkTlm.SyncSem)); + if (!CFE_Assert_STATUS_SILENTCHECK(OS_SUCCESS)) + { + CFE_Assert_STATUS_MUST_BE(OS_SUCCESS); + break; + } + } + + CFE_PSP_GetTime(&BulkTlm.EndTime); + BulkTlm.RecvFinished = true; } + +void TestBulkTransferMulti4(void) +{ + uint32 i; + OS_time_t StartTime; + OS_time_t ElapsedTime1; + OS_time_t ElapsedTime2; + int64 AvgRate; + + UtPrintf("Testing: 4 Thread Bulk SB Transmit/Receive with Sync Sem"); + memset(&BulkCmd, 0, sizeof(BulkCmd)); + memset(&BulkTlm, 0, sizeof(BulkCmd)); + + /* Setup, create a pipe and subscribe (one cmd, one tlm) */ + UtAssert_INT32_EQ(CFE_SB_CreatePipe(&BulkCmd.PipeId, 10, "TestPipe1"), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_CreatePipe(&BulkTlm.PipeId, 10, "TestPipe2"), CFE_SUCCESS); + + UtAssert_INT32_EQ(CFE_SB_SubscribeEx(CFE_FT_CMD_MSGID, BulkCmd.PipeId, CFE_SB_DEFAULT_QOS, 8), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_SubscribeEx(CFE_FT_TLM_MSGID, BulkTlm.PipeId, CFE_SB_DEFAULT_QOS, 8), CFE_SUCCESS); + + UtAssert_INT32_EQ(OS_CountSemCreate(&BulkCmd.SyncSem, "CmdSem", 0, 0), OS_SUCCESS); + UtAssert_INT32_EQ(OS_CountSemCreate(&BulkTlm.SyncSem, "TlmSem", 0, 0), OS_SUCCESS); + + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkCmd.TaskIdXmit, "CmdXmit", UT_CommandTransmitterTask, NULL, 32768, 150, 0), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkTlm.TaskIdXmit, "TlmXmit", UT_TelemtryTransmitterTask, NULL, 32768, 150, 0), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkCmd.TaskIdRecv, "CmdRecv", UT_CommandReceiverTask, NULL, 32768, 100, 0), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_ES_CreateChildTask(&BulkTlm.TaskIdRecv, "TlmRecv", UT_TelemetryReceiverTask, NULL, 32768, 100, 0), CFE_SUCCESS); + + /* Let all tasks start and pend on sem */ + OS_TaskDelay(500); + + CFE_PSP_GetTime(&StartTime); + + /* Give sem several times each to get a pipeline going, but do not exceed msglim of 8 */ + for (i=0; i < 8; ++i) + { + UtAssert_INT32_EQ(OS_CountSemGive(BulkCmd.SyncSem), OS_SUCCESS); + UtAssert_INT32_EQ(OS_CountSemGive(BulkTlm.SyncSem), OS_SUCCESS); + } + + do + { + OS_TaskDelay(1000); + UtPrintf("Counts => %lu/%lu CMD, %lu/%lu TLM", (unsigned long)BulkCmd.SendCount, (unsigned long)BulkCmd.RecvCount, + (unsigned long)BulkTlm.SendCount, (unsigned long)BulkTlm.RecvCount); + } + while (!BulkCmd.XmitFinished || !BulkCmd.RecvFinished || !BulkTlm.XmitFinished || !BulkTlm.RecvFinished); + + ElapsedTime1 = OS_TimeSubtract(BulkCmd.EndTime, StartTime); + UtAssert_MIR("Elapsed time for SB bulk CMD thread: %lu usec", + (unsigned long)OS_TimeGetTotalMicroseconds(ElapsedTime1)); + + ElapsedTime2 = OS_TimeSubtract(BulkTlm.EndTime, StartTime); + UtAssert_MIR("Elapsed time for SB bulk TLM thread: %lu usec", + (unsigned long)OS_TimeGetTotalMicroseconds(ElapsedTime2)); + + /* Take the average time of the two parallel activities */ + AvgRate = OS_TimeGetTotalMilliseconds(OS_TimeAdd(ElapsedTime1, ElapsedTime2)) / 2; + AvgRate = ((int64)(BulkCmd.RecvCount + BulkTlm.RecvCount) * 10000) / AvgRate; + + UtAssert_MIR("Message Rate for multi-thread: %ld.%01ld messages/sec", + (long)(AvgRate / 10), (long)(AvgRate % 10)); + + + /* Child tasks should have self-exited... */ + UtAssert_INT32_EQ(CFE_SB_DeletePipe(BulkCmd.PipeId), CFE_SUCCESS); + UtAssert_INT32_EQ(CFE_SB_DeletePipe(BulkTlm.PipeId), CFE_SUCCESS); + UtAssert_INT32_EQ(OS_CountSemDelete(BulkCmd.SyncSem), OS_SUCCESS); + UtAssert_INT32_EQ(OS_CountSemDelete(BulkTlm.SyncSem), OS_SUCCESS); +} + + void SBPerformanceTestSetup(void) { - UtTest_Add(TestBulkTransmitRecv, NULL, NULL, "Test Bulk SB Transmit/Receive"); + UtTest_Add(TestBulkTransferSingle, NULL, NULL, "Single Thread Bulk Transfer"); + UtTest_Add(TestBulkTransferMulti2, NULL, NULL, "2 Thread Bulk Transfer"); + UtTest_Add(TestBulkTransferMulti4, NULL, NULL, "4 Thread Bulk Transfer"); }