Skip to content

Commit 5ab5e49

Browse files
committed
RTO Mechanism for Retransmission
- Acknowledgment reception and handling - Error handling - Communication component states - Bus Off recovery mechanism
1 parent 6e7620a commit 5ab5e49

File tree

8 files changed

+1011
-160
lines changed

8 files changed

+1011
-160
lines changed

communication/src/communication.cpp

Lines changed: 409 additions & 100 deletions
Large diffs are not rendered by default.

communication/src/communication.h

Lines changed: 403 additions & 48 deletions
Large diffs are not rendered by default.

communication/src/global_clock.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,13 @@ void GlobalClock::runClock()
139139
pthread_mutex_lock(
140140
&shared_clock
141141
->tick_mutex); // Lock the mutex to safely increment the tick
142-
shared_clock->current_tick++; // Increment the tick
142+
// Increment the tick and check for overflow
143+
if (shared_clock->current_tick.load() >= MAX_TICK) {
144+
shared_clock->current_tick.store(0); // Reset to 0 if max is reached
145+
}
146+
else {
147+
shared_clock->current_tick++; // Increment the tick
148+
}
143149
pthread_cond_broadcast(
144150
&shared_clock
145151
->tick_cond); // Notify all waiting processes that the tick has changed

communication/src/global_clock.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
#include <unistd.h>
1111
#include <csignal>
1212
#include <pthread.h>
13+
#include <limits>
14+
1315
#define SHM_NAME "/clock__shm"
14-
#define TICK_DURATION std::chrono::milliseconds(500) // Define the tick duration
16+
#define TICK_DURATION std::chrono::milliseconds(50) // Define the tick duration
17+
#define MAX_TICK std::numeric_limits<int>::max() // Define the maximum value for the tick
18+
1519
/**
1620
* @struct SharedClock
1721
* @brief A structure representing the shared state of the global clock.
@@ -25,11 +29,9 @@
2529
*/
2630
struct SharedClock {
2731
std::atomic<int> current_tick; ///< The current tick value.
28-
std::atomic<bool> is_running; ///< Whether the clock is currently running.
29-
pthread_mutex_t
30-
tick_mutex; ///< Mutex to synchronize access to the tick count.
31-
pthread_cond_t
32-
tick_cond; ///< Condition variable to notify waiting threads on tick change.
32+
std::atomic<bool> is_running; ///< Whether the clock is currently running.
33+
pthread_mutex_t tick_mutex; ///< Mutex to synchronize access to the tick count.
34+
pthread_cond_t tick_cond; ///< Condition variable to notify waiting threads on tick change.
3335
};
3436
/**
3537
* @class GlobalClock

communication/src/message.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ uint32_t Message::generateMessageID(MessageType messageType, uint16_t srcID) {
2121

2222
auto now = std::chrono::system_clock::now().time_since_epoch();
2323
uint32_t timestamp = static_cast<uint32_t>(
24-
std::chrono::duration_cast<std::chrono::milliseconds>(now).count() & 0xFF
24+
std::chrono::duration_cast<std::chrono::nanoseconds>(now).count() & 0xFF
2525
);
2626

2727
return (messageTypeID << 24) |
@@ -37,9 +37,11 @@ Message::Message(uint32_t tps)
3737

3838
// Add a packet to the received message
3939
bool Message::addPacket(const Packet &p)
40-
{
41-
if (p.getPSN() != packets.size()) {
42-
return false;
40+
{
41+
if (!packets.empty()) {
42+
if (p.getPSN() <= packets.back().getPSN()) {
43+
return false;
44+
}
4345
}
4446
packets.push_back(p);
4547
return true;
@@ -48,7 +50,8 @@ bool Message::addPacket(const Packet &p)
4850
// Check if the message is complete
4951
bool Message::isComplete() const
5052
{
51-
return packets.size() == tps;
53+
if(packets.size() == tps)
54+
return packets.size() == tps;
5255
}
5356

5457
// Get the complete data of the message
@@ -67,3 +70,10 @@ std::vector<Packet> &Message::getPackets()
6770
{
6871
return packets;
6972
}
73+
74+
// Extract the MessageType from the message ID
75+
MessageType Message::getMessageTypeFromID(uint32_t messageID) {
76+
uint32_t messageTypeID = (messageID >> 24) & 0xFF; // Extract the message type
77+
return static_cast<MessageType>(messageTypeID); // Cast to MessageType
78+
}
79+

communication/src/message.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,7 @@ class Message {
4747

4848
// Get the packets of the message
4949
std::vector<Packet>& getPackets();
50+
51+
// Extract the MessageType from the message ID
52+
static MessageType getMessageTypeFromID(uint32_t messageID);
5053
};

communication/src/scheduler.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include "scheduler.h"
2+
3+
Scheduler::Scheduler()
4+
{
5+
}
6+
7+
Scheduler::~Scheduler()
8+
{
9+
stopAllTimers();
10+
}
11+
12+
void Scheduler::stopAllTimers()
13+
{
14+
for (auto &future : futures)
15+
{
16+
if (future.valid())
17+
{
18+
future.wait(); // Wait for all threads to finish
19+
}
20+
}
21+
}
22+
23+
void Scheduler::startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise)
24+
{
25+
// Promise to manage the lifecycle of the thread itself
26+
std::promise<void> threadCompletionPromise;
27+
28+
// Future object to track the thread's completion
29+
std::future<void> future = threadCompletionPromise.get_future();
30+
31+
// Store the future to ensure we can wait for the thread to finish later
32+
futures.push_back(std::move(future));
33+
34+
// Start a new thread for handling retransmission and ACK wait
35+
std::thread([this, packetID, callback, threadCompletionPromise = std::move(threadCompletionPromise), ackPromise]() mutable
36+
{
37+
int retryCount = 0;
38+
39+
{
40+
// Lock the mutex to synchronize access to shared data (ackReceived)
41+
std::unique_lock<std::mutex> lock(mutex);
42+
43+
// Wait for an ACK or timeout
44+
if (cv.wait_for(lock, MAX_ACK_TIMEOUT, [this, packetID]()
45+
{ return ackReceived[packetID]; }))
46+
{
47+
// ACK received within the timeout period
48+
std::cout << "ACK received for packet ID: " << packetID << std::endl;
49+
clearPacketData(packetID); // Clear packet data
50+
51+
// Set both promises to indicate success and thread completion
52+
threadCompletionPromise.set_value();
53+
ackPromise->set_value(true); // ACK was received, set to true
54+
return; // Exit the thread
55+
}
56+
else
57+
{
58+
// Timeout occurred, retransmit the packet
59+
retryCounts[packetID]++;
60+
retryCount = retryCounts[packetID];
61+
std::cout << "Timeout! Retransmitting packet ID: " << packetID << std::endl;
62+
}
63+
}
64+
65+
// Call the callback function with the updated retry count
66+
callback(retryCount);
67+
68+
// Set the promise to indicate the thread has finished
69+
threadCompletionPromise.set_value();
70+
})
71+
.detach(); // Detach the thread to allow it to run independently
72+
}
73+
74+
void Scheduler::receiveACK(int packetID)
75+
{
76+
std::unique_lock<std::mutex> lock(mutex);
77+
ackReceived[packetID] = true;
78+
cv.notify_all(); // Notify all waiting threads
79+
}
80+
81+
void Scheduler::clearPacketData(int packetID)
82+
{
83+
ackReceived.erase(packetID);
84+
retryCounts.erase(packetID);
85+
}

communication/src/scheduler.h

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#ifndef __SCHEDULER_H__
2+
#define __SCHEDULER_H__
3+
4+
#include <chrono>
5+
#include <condition_variable>
6+
#include <functional>
7+
#include <future>
8+
#include <mutex>
9+
#include <unordered_map>
10+
#include <vector>
11+
#include "global_clock.h"
12+
13+
// must be set
14+
#define MAX_ACK_TIMEOUT \
15+
(20 * TICK_DURATION) ///< Maximum time to wait for ACK
16+
17+
/**
18+
* @class Scheduler
19+
* @brief Manages retransmission timers and tracks acknowledgments for packets.
20+
*
21+
* This class provides functionality to manage retransmission timers for
22+
* packets, handle acknowledgments, and clean up packet data once
23+
* retransmissions are complete.
24+
*/
25+
class Scheduler
26+
{
27+
public:
28+
using Callback = std::function<void(int)>;
29+
30+
Scheduler();
31+
~Scheduler();
32+
33+
/**
34+
* @brief Stops all active timers and waits for their completion.
35+
*
36+
* This method ensures that all active timers (threads) complete their
37+
* execution before the program exits.
38+
*/
39+
void stopAllTimers();
40+
41+
/**
42+
* @brief Starts a retransmission timer for a given packet ID.
43+
*
44+
* The function initiates a timer to wait for an ACK (Acknowledgment) for the specified packet.
45+
* If the ACK is received within the MAX_ACK_TIMEOUT, it clears the packet data and sets the result
46+
* of the ackPromise to `true`, indicating success. If the timeout occurs and no ACK is received,
47+
* it triggers the provided callback function to retransmit the packet and increments the retry count.
48+
*
49+
* @param packetID The unique ID of the packet being transmitted.
50+
* @param callback The callback function to call when retransmitting the packet after a timeout.
51+
* @param ackPromise A shared promise that communicates whether the packet transmission was successful
52+
* (i.e., ACK was received).
53+
*/
54+
void startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise);
55+
56+
/**
57+
* @brief Receives an acknowledgment for a packet.
58+
*
59+
* @param packetID The ID of the packet that has been acknowledged.
60+
*/
61+
void receiveACK(int packetID);
62+
63+
/**
64+
* @brief Clears the data associated with a packet.
65+
*
66+
* @param packetID The ID of the packet whose data is to be cleared.
67+
*/
68+
void clearPacketData(int packetID);
69+
70+
private:
71+
std::unordered_map<int, bool>
72+
ackReceived; ///< Map to track received acknowledgments
73+
std::unordered_map<int, int>
74+
retryCounts; ///< Map to track retry counts for packets
75+
std::mutex mutex; ///< Mutex for thread safety
76+
std::condition_variable cv; ///< Condition variable for synchronization
77+
std::vector<std::future<void>>
78+
futures; ///< Vector to store futures of active threads
79+
};
80+
81+
#endif // __SCHEDULER_H__

0 commit comments

Comments
 (0)