-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor<command_processor.py> wrap the functions in a class and remo…
…ve global variable I didn't test the code yet. I removed the logger message and implemented a better error handling for the zero division
- Loading branch information
1 parent
2529c7a
commit 6a351c2
Showing
1 changed file
with
93 additions
and
135 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,142 +1,100 @@ | ||
import logging | ||
from collections import deque | ||
import time | ||
|
||
|
||
# Declaring the logger | ||
logger = logging.getLogger(__name__) | ||
log_format = "%(levelname)s: %(filename)s %(funcName)s %(message)s" | ||
|
||
|
||
############################################################################################################################################ | ||
#################### Can be of inf size. Potentially dangerous if the user keeps steering => queue keeps appending data #################### | ||
steering_queue = deque() # A queue in which steering values other than 0 will be stored in | ||
############################################################################################################################################ | ||
|
||
time_now = 0.0 # Default values for the current time | ||
delay = 0.0 # Default values for the delay amount | ||
distance = 1.35 # Distance between the motors of the current segment and the LD segment in m | ||
use_steering_queue = False # Variable that tells the function when it is time to use the queue for steering values | ||
started_counting_down = False # Variable that tells the function to start counting down. We dont want the function to start counting down unless it receives steering first | ||
|
||
|
||
counter = 0 | ||
|
||
|
||
def process(movement_command): | ||
"""Use: | ||
This function is processing commands received from the LD segment. In general, the LD sends us commands that its executing now, | ||
and it is the FL's job to decide what to do based on those received commands. | ||
- Will reverse throttle, if the segment is mounted backwards, compared to its LD. All throttle values are applied right away\n | ||
- Will update the timestamp in the command received, so as not to keep the timestamp of the Head segment for all other segments. | ||
Each segment checks the timestamp in the command. If the command timestamp is too old, it wont get forwarded, so we need to update | ||
the timstamp value with the value of the time of retrieval from the LD.\n | ||
- Will apply the steering received from the LD. If steering is 0, then nothing to do. If steering is not 0, then it will start a special process. | ||
Firstly, it will calculate the delay that it needs to wait before applying the steering values. | ||
This delay is calculated as: 'Delay = Distance_between_segment_motors / Velocity of LD segment'. | ||
Secondly, the steering values that are not 0 are stored in a deque, to keep track of what the segment needs to apply after the delay. | ||
Thirdly, a countdown starts. After 'delay' seconds pass, then the segment will start applying the steering values stored in the queue, | ||
effectively turning the segment exactly the same way as the LD did 'delay' seconds ago. | ||
When the queue is empty, meaning that it has finished turning like its LD did, it will continue moving straight. | ||
Args: | ||
movement_command (Dictionary): {'steering': Float, 'throttle': Float, 'time': Int, 'navigator': {'route': None}, 'velocity': Float} | ||
Returns: | ||
Dictionary: {'steering': Float, 'throttle': Float, 'time': Int, 'navigator': {'route': None}, 'velocity': Float} | ||
class MovementProcessor: | ||
""" | ||
A class to process movement commands received from the LD segment. | ||
This class handles the processing of movement commands, including reversing | ||
throttle if necessary, updating timestamps, and applying steering values after | ||
a calculated delay to mimic the movement of the leading segment. | ||
""" | ||
|
||
global steering_queue | ||
global time_now | ||
global delay | ||
global use_steering_queue | ||
global started_counting_down | ||
global counter | ||
|
||
|
||
velocity = movement_command["velocity"] # Velocity of the LD segment in m/s | ||
# velocity = 1 # Testing the delay in m/s | ||
steering = movement_command["steering"] # Getting the steering value from the command we just got | ||
applied_steering = 0.0 # The steering value that the final command will include | ||
time_counter = time.perf_counter() # Getting the current time value | ||
|
||
counter = counter + 1 | ||
|
||
# Clearing the queue when the robot is standing still | ||
if movement_command["throttle"] == 0: | ||
steering_queue.clear() | ||
|
||
# If we detect that the LD is turning | ||
if steering != 0: | ||
|
||
# Calculating the delay for steering | ||
try: | ||
# Execution delay of steering in s | ||
# We want the absolute value, the delay is never negative | ||
delay = abs(distance / velocity) | ||
|
||
# If velocity == 0, meaning the robot is standing still, we do not move on with calculations | ||
except ZeroDivisionError: | ||
|
||
# We reverse throttle | ||
movement_command["throttle"] = -movement_command["throttle"] | ||
|
||
# We apply the steering. | ||
movement_command["steering"] = -movement_command["steering"] | ||
|
||
counter = 0 | ||
|
||
return movement_command | ||
|
||
# Place the non-zero steering in the deque | ||
steering_queue.append(steering) | ||
|
||
# Getting the time in which we start receiving steering other than 0 | ||
# Making sure we mark the time only once, and not keep updating it every time we receive non-zero steering | ||
# We will be able to run this again after the current queue with values is emptied. | ||
if not started_counting_down: | ||
time_now = time.perf_counter() | ||
|
||
# Telling the function that we received steering, and to start counting down so that the values we store are applied after "delay" seconds | ||
started_counting_down = True | ||
|
||
|
||
|
||
# If we need to start counting down | ||
# AND | ||
# If the time difference between the function time and the time in which we first received steering is "delay" seconds | ||
if counter == 50: | ||
print(f"Time counter: {time_counter - time_now}\nDelay: {delay}") | ||
counter = 0 | ||
if started_counting_down and time_counter - time_now >= delay: | ||
# We mark the time we start applying the values for the next iteration of the function | ||
time_now = time_counter | ||
|
||
# We turn the trigger into True, making the function use steering from the queue | ||
use_steering_queue = True | ||
|
||
# Trigger that tells the function to use steering from the queue | ||
if use_steering_queue: | ||
# We take the first item in the deque and we apply it in the processed movement command | ||
# Since we will first apply the steering that was first added to the deque (oldest steering first), | ||
# our data structure needs to be FIFO (First in First out) | ||
started_counting_down = False | ||
try: | ||
applied_steering = steering_queue.popleft() | ||
# print("popped") | ||
except IndexError: | ||
# If the queue is empty, we put steering = 0 and turn the triggers to False | ||
applied_steering = 0.0 | ||
use_steering_queue = False | ||
# started_counting_down = False | ||
|
||
|
||
# We reverse throttle | ||
movement_command["throttle"] = -movement_command["throttle"] | ||
|
||
# We apply the steering. | ||
movement_command["steering"] = -applied_steering | ||
|
||
return movement_command | ||
def __init__(self, distance=1.35): | ||
""" | ||
Initializes the MovementProcessor with default settings. | ||
Args: | ||
distance (float): The distance between the motors of the current segment | ||
and the LD segment in meters. Default is 1.35 meters. | ||
""" | ||
self.steering_queue = deque() # Queue to store steering values | ||
self.time_now = 0.0 # Timestamp when steering was first received | ||
self.delay = 0.0 # Delay before applying steering, calculated based on velocity | ||
self.distance = distance # Distance between segments | ||
self.use_steering_queue = False # Flag to indicate when to use the steering queue | ||
self.started_counting_down = False # Flag to indicate if the delay countdown has started | ||
|
||
def process(self, movement_command): | ||
""" | ||
Processes the movement command from the LD segment. | ||
This method is processing commands received from the LD segment. In general, the LD sends us commands that its executing now, | ||
and it is the FL's job to decide what to do based on those received commands. | ||
- Will reverse throttle, if the segment is mounted backwards, compared to its LD. All throttle values are applied right away\n | ||
- Will update the timestamp in the command received, so as not to keep the timestamp of the Head segment for all other segments. | ||
Each segment checks the timestamp in the command. If the command timestamp is too old, it wont get forwarded, so we need to update | ||
the timstamp value with the value of the time of retrieval from the LD.\n | ||
- Will apply the steering received from the LD. If steering is 0, then nothing to do. If steering is not 0, then it will start a special process. | ||
Firstly, it will calculate the delay that it needs to wait before applying the steering values. | ||
This delay is calculated as: 'Delay = Distance_between_segment_motors / Velocity of LD segment'. | ||
Secondly, the steering values that are not 0 are stored in a deque, to keep track of what the segment needs to apply after the delay. | ||
Thirdly, a countdown starts. After 'delay' seconds pass, then the segment will start applying the steering values stored in the queue, | ||
effectively turning the segment exactly the same way as the LD did 'delay' seconds ago. | ||
When the queue is empty, meaning that it has finished turning like its LD did, it will continue moving straight. | ||
Args: | ||
movement_command (Dictionary): {'steering': Float, 'throttle': Float, 'time': Int, 'navigator': {'route': None}, 'velocity': Float} | ||
Returns: | ||
Dictionary: {'steering': Float, 'throttle': Float, 'time': Int, 'navigator': {'route': None}, 'velocity': Float} | ||
""" | ||
velocity = movement_command["velocity"] | ||
steering = movement_command["steering"] | ||
throttle = movement_command["throttle"] | ||
applied_steering = 0.0 | ||
current_time = time.perf_counter() | ||
|
||
# Clear the steering queue when the robot is standing still | ||
if throttle == 0: | ||
self.steering_queue.clear() | ||
|
||
# If the LD is turning, process the steering command | ||
if steering != 0: | ||
if velocity != 0: | ||
# Calculate the delay based on distance and velocity | ||
self.delay = abs(self.distance / velocity) | ||
# Store the steering value for later application | ||
self.steering_queue.append(steering) | ||
# Start the delay countdown if not already started | ||
if not self.started_counting_down: | ||
self.time_now = current_time | ||
self.started_counting_down = True | ||
else: | ||
# If velocity is zero, reverse throttle and steering immediately | ||
movement_command["throttle"] = -throttle | ||
movement_command["steering"] = -steering | ||
return movement_command | ||
|
||
# Check if the delay period has passed to apply steering from the queue | ||
if self.started_counting_down and (current_time - self.time_now >= self.delay): | ||
self.use_steering_queue = True | ||
self.started_counting_down = False | ||
|
||
# Apply steering from the queue if it's time | ||
if self.use_steering_queue: | ||
try: | ||
applied_steering = self.steering_queue.popleft() | ||
except IndexError: | ||
# If the queue is empty, stop using the steering queue | ||
applied_steering = 0.0 | ||
self.use_steering_queue = False | ||
|
||
# Reverse throttle (if the segment is mounted backwards) | ||
movement_command["throttle"] = -throttle | ||
# Apply the delayed steering value | ||
movement_command["steering"] = -applied_steering | ||
|
||
return movement_command |