Skip to content

Commit 1c3ff3e

Browse files
Add Scheduler class to manage retransmission timers and track ACKs
1 parent 1821abd commit 1c3ff3e

File tree

2 files changed

+164
-0
lines changed

2 files changed

+164
-0
lines changed

communication/include/scheduler.h

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#ifndef __SCHEDULER_H__
2+
#define __SCHEDULER_H__
3+
4+
#include <chrono>
5+
#include <condition_variable>
6+
#include <functional>
7+
#include <future>
8+
#include <mutex>
9+
#include <unordered_map>
10+
#include <vector>
11+
#include "global_clock.h"
12+
13+
// must be set
14+
#define MAX_ACK_TIMEOUT \
15+
(20 * TICK_DURATION) ///< Maximum time to wait for ACK
16+
17+
/**
18+
* @class Scheduler
19+
* @brief Manages retransmission timers and tracks acknowledgments for packets.
20+
*
21+
* This class provides functionality to manage retransmission timers for
22+
* packets, handle acknowledgments, and clean up packet data once
23+
* retransmissions are complete.
24+
*/
25+
class Scheduler
26+
{
27+
public:
28+
using Callback = std::function<void(int)>;
29+
30+
Scheduler();
31+
~Scheduler();
32+
33+
/**
34+
* @brief Stops all active timers and waits for their completion.
35+
*
36+
* This method ensures that all active timers (threads) complete their
37+
* execution before the program exits.
38+
*/
39+
void stopAllTimers();
40+
41+
/**
42+
* @brief Starts a retransmission timer for a given packet ID.
43+
*
44+
* The function initiates a timer to wait for an ACK (Acknowledgment) for the specified packet.
45+
* If the ACK is received within the MAX_ACK_TIMEOUT, it clears the packet data and sets the result
46+
* of the ackPromise to `true`, indicating success. If the timeout occurs and no ACK is received,
47+
* it triggers the provided callback function to retransmit the packet and increments the retry count.
48+
*
49+
* @param packetID The unique ID of the packet being transmitted.
50+
* @param callback The callback function to call when retransmitting the packet after a timeout.
51+
* @param ackPromise A shared promise that communicates whether the packet transmission was successful
52+
* (i.e., ACK was received).
53+
*/
54+
void startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise);
55+
56+
/**
57+
* @brief Receives an acknowledgment for a packet.
58+
*
59+
* @param packetID The ID of the packet that has been acknowledged.
60+
*/
61+
void receiveACK(int packetID);
62+
63+
/**
64+
* @brief Clears the data associated with a packet.
65+
*
66+
* @param packetID The ID of the packet whose data is to be cleared.
67+
*/
68+
void clearPacketData(int packetID);
69+
70+
private:
71+
std::unordered_map<int, bool>
72+
ackReceived; ///< Map to track received acknowledgments
73+
std::unordered_map<int, int>
74+
retryCounts; ///< Map to track retry counts for packets
75+
std::mutex mutex; ///< Mutex for thread safety
76+
std::condition_variable cv; ///< Condition variable for synchronization
77+
std::vector<std::future<void>>
78+
futures; ///< Vector to store futures of active threads
79+
};
80+
81+
#endif // __SCHEDULER_H__

communication/src/scheduler.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#include "../include/scheduler.h"
2+
3+
Scheduler::Scheduler()
4+
{
5+
}
6+
7+
Scheduler::~Scheduler()
8+
{
9+
stopAllTimers();
10+
}
11+
12+
void Scheduler::stopAllTimers()
13+
{
14+
for (auto &future : futures)
15+
{
16+
if (future.valid())
17+
{
18+
future.wait(); // Wait for all threads to finish
19+
}
20+
}
21+
}
22+
23+
void Scheduler::startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr<std::promise<bool>> ackPromise)
24+
{
25+
// Promise to manage the lifecycle of the thread itself
26+
std::promise<void> threadCompletionPromise;
27+
28+
// Future object to track the thread's completion
29+
std::future<void> future = threadCompletionPromise.get_future();
30+
31+
// Store the future to ensure we can wait for the thread to finish later
32+
futures.push_back(std::move(future));
33+
34+
// Start a new thread for handling retransmission and ACK wait
35+
std::thread([this, packetID, callback, threadCompletionPromise = std::move(threadCompletionPromise), ackPromise]() mutable
36+
{
37+
int retryCount = 0;
38+
39+
{
40+
// Lock the mutex to synchronize access to shared data (ackReceived)
41+
std::unique_lock<std::mutex> lock(mutex);
42+
43+
// Wait for an ACK or timeout
44+
if (cv.wait_for(lock, MAX_ACK_TIMEOUT, [this, packetID]()
45+
{ return ackReceived[packetID]; }))
46+
{
47+
// ACK received within the timeout period
48+
clearPacketData(packetID); // Clear packet data
49+
50+
// Set both promises to indicate success and thread completion
51+
threadCompletionPromise.set_value();
52+
ackPromise->set_value(true); // ACK was received, set to true
53+
return; // Exit the thread
54+
}
55+
else
56+
{
57+
// Timeout occurred, retransmit the packet
58+
retryCounts[packetID]++;
59+
retryCount = retryCounts[packetID];
60+
}
61+
}
62+
63+
// Call the callback function with the updated retry count
64+
callback(retryCount);
65+
66+
// Set the promise to indicate the thread has finished
67+
threadCompletionPromise.set_value();
68+
})
69+
.detach(); // Detach the thread to allow it to run independently
70+
}
71+
72+
void Scheduler::receiveACK(int packetID)
73+
{
74+
std::unique_lock<std::mutex> lock(mutex);
75+
ackReceived[packetID] = true;
76+
cv.notify_all(); // Notify all waiting threads
77+
}
78+
79+
void Scheduler::clearPacketData(int packetID)
80+
{
81+
ackReceived.erase(packetID);
82+
retryCounts.erase(packetID);
83+
}

0 commit comments

Comments
 (0)