-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from patham9/MeTTaIntegration
Modularization and direct MeTTa code support
- Loading branch information
Showing
10 changed files
with
676 additions
and
557 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from collections import deque | ||
|
||
def BFS_for_nearest_unknown_cell(low_res_grid, new_width, new_height, start_x, start_y): | ||
directions = [(0, 1, 'right'), (0, -1, 'left'), (1, 0, 'down'), (-1, 0, 'up')] | ||
start_idx = start_y * new_width + start_x | ||
queue = deque([(start_x, start_y)]) | ||
visited = set() | ||
visited.add(start_idx) | ||
while queue: | ||
x, y = queue.popleft() | ||
for dx, dy, action in directions: | ||
nx, ny = x + dx, y + dy | ||
nidx = ny * new_width + nx | ||
if 0 <= nx < new_width and 0 <= ny < new_height: | ||
if low_res_grid[nidx] == -1: #found unknown | ||
return (nx, ny) | ||
if low_res_grid[nidx] == 0 and nidx not in visited: | ||
queue.append((nx, ny)) | ||
visited.add(nidx) | ||
return None |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
#!/usr/bin/env python3 | ||
# main.py | ||
import rclpy | ||
from rclpy.node import Node | ||
import tf2_ros | ||
|
||
from yolo import YOLODetector | ||
from robot import RobotLocalization | ||
from semanticslam import SemanticSLAM | ||
from nav import Navigation | ||
|
||
class MainNode(Node): | ||
def __init__(self): | ||
super().__init__('NARTECH_node') | ||
# Create a single TF2 buffer and listener to be shared across modules. | ||
self.tf_buffer = tf2_ros.Buffer() | ||
self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self) | ||
|
||
# Instantiate the helper modules. | ||
self.yolo_detector = YOLODetector(self, self.tf_buffer) | ||
self.robot_localization = RobotLocalization(self, self.tf_buffer) | ||
self.semantic_slam = SemanticSLAM(self, self.tf_buffer, self.robot_localization, self.yolo_detector) | ||
self.navigation = Navigation(self, self.semantic_slam, self.robot_localization) | ||
self.start_navigation_to_coordinate = self.navigation.start_navigation_to_coordinate | ||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
node = MainNode() | ||
try: | ||
rclpy.spin(node) | ||
except KeyboardInterrupt: | ||
pass | ||
finally: | ||
node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
from exploration import * | ||
from copy import deepcopy | ||
from hyperon.ext import register_atoms | ||
from hyperon import * | ||
import time | ||
|
||
NAV_STATE_FAIL = "FAIL" | ||
NAV_STATE_BUSY = "BUSY" | ||
NAV_STATE_SUCCESS = "SUCCESS" | ||
NAV_STATE = NAV_STATE_SUCCESS #"busy", "success" and "fail" (initially set to success) | ||
def NAV_STATE_SET(NAV_STATE_ARG): | ||
global NAV_STATE | ||
NAV_STATE = NAV_STATE_ARG | ||
return None | ||
|
||
class PatternOperation(OperationObject): | ||
def __init__(self, name, op, unwrap=False, rec=False): | ||
super().__init__(name, op, unwrap) | ||
self.rec = rec | ||
def execute(self, *args, res_typ=AtomType.UNDEFINED): | ||
return super().execute(*args, res_typ=res_typ) | ||
|
||
def wrapnpop(func): | ||
def wrapper(*args): | ||
a = [str("'"+arg) if arg is SymbolAtom else str(arg) for arg in args] | ||
res = func(*a) | ||
return [res] | ||
return wrapper | ||
|
||
MeTTaROS2Command = "" | ||
def call_bridgeinput(*a): | ||
global runner, MeTTaROS2Command | ||
tokenizer = runner.tokenizer() | ||
cmd = str(a[0]) | ||
parser = SExprParser(f"(MeTTaROS2Command {cmd})") | ||
MeTTaROS2Command = cmd | ||
return parser.parse(tokenizer) | ||
|
||
def space_init(): | ||
global runner | ||
with open("space.metta", "r") as f: | ||
metta_code = f.read() | ||
runner = MeTTa() | ||
call_bridgeinput_atom = G(PatternOperation('bridgeinput', wrapnpop(call_bridgeinput), unwrap=False)) | ||
runner.register_atom("bridgeinput", call_bridgeinput_atom) | ||
runner.run(metta_code) | ||
|
||
currentTime = 0 | ||
start_time = time.time() | ||
|
||
def space_tick(node): | ||
global currentTime, MeTTaROS2Command | ||
cmd = MeTTaROS2Command | ||
MeTTaROS2Command = "" | ||
if cmd == "right": | ||
node.start_navigation_by_moves("right") | ||
if cmd == "left": | ||
node.start_navigation_by_moves("left") | ||
if cmd == "up": | ||
node.start_navigation_by_moves("up") | ||
if cmd == "down": | ||
node.start_navigation_by_moves("down") | ||
if cmd.startswith("(goto (coordinates "): | ||
x_y = cmd.split("(goto (coordinates ")[1].split(")")[0] | ||
x = int(x_y.split(" ")[0]) | ||
y = int(x_y.split(" ")[1]) | ||
node.start_navigation_to_coordinate((x, y)) | ||
alldetections = deepcopy(node.semantic_slam.previous_detections) | ||
objects = "(" | ||
if "self" in node.semantic_slam.previous_detections: | ||
(t, object_grid_x, object_grid_y, origin_x, origin_y) = node.semantic_slam.previous_detections["self"] | ||
x_y_unknown = BFS_for_nearest_unknown_cell(node.semantic_slam.low_res_grid, node.semantic_slam.new_width, node.semantic_slam.new_height, object_grid_x, object_grid_y) | ||
if x_y_unknown: | ||
(x_unknown,y_unknown) = x_y_unknown | ||
alldetections["unknown"] = (time.time(), x_unknown, y_unknown, origin_x, origin_y) | ||
print(alldetections) | ||
for category in alldetections: | ||
(t, object_grid_x, object_grid_y, _, __) = alldetections[category] | ||
SEXP = f"(detection {category} (coordinates {object_grid_x} {object_grid_y}))" | ||
objects += SEXP | ||
if category == "self": | ||
SELF_position = (object_grid_x, object_grid_y) | ||
objects += ")" | ||
currentTime += 1 | ||
elapsedTime = round(time.time() - start_time, 2) | ||
print("NAV_STATE", NAV_STATE, runner.run(f"!(Step {currentTime} {elapsedTime} {NAV_STATE} {objects})")) | ||
|
||
space_init() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# nav.py | ||
import time | ||
import math | ||
import rclpy | ||
from rclpy.node import Node | ||
from std_msgs.msg import String | ||
from nav2_msgs.action import NavigateToPose | ||
from rclpy.action import ActionClient | ||
from geometry_msgs.msg import PoseStamped | ||
from action_msgs.msg import GoalStatus | ||
# Assume mettabridge defines these constants and functions: | ||
from mettabridge import NAV_STATE_SET, NAV_STATE_BUSY, NAV_STATE_SUCCESS, NAV_STATE_FAIL | ||
|
||
class Navigation: | ||
def __init__(self, node: Node, semantic_slam, robot_localization): | ||
self.node = node | ||
self.semantic_slam = semantic_slam | ||
self.robot_localization = robot_localization | ||
self.navigation_goal = None | ||
self.navigation_retries = 0 | ||
self.goal_handle = None | ||
|
||
qos_profile_str = rclpy.qos.QoSProfile(depth=1) | ||
qos_profile_str.history = rclpy.qos.QoSHistoryPolicy.KEEP_LAST | ||
qos_profile_str.durability = rclpy.qos.QoSDurabilityPolicy.TRANSIENT_LOCAL | ||
qos_profile_str.reliability = rclpy.qos.QoSReliabilityPolicy.RELIABLE | ||
|
||
self.naceop_sub = self.node.create_subscription( | ||
String, '/naceop', self.naceop_callback, qos_profile_str) | ||
self.nacedone_pub = self.node.create_publisher(String, '/nacedone', qos_profile_str) | ||
self.action_client = ActionClient(self.node, NavigateToPose, 'navigate_to_pose') | ||
|
||
def naceop_callback(self, msg): | ||
self.semantic_slam.goalstart = self.semantic_slam.mapupdate # capture current map update state | ||
command = msg.data.lower() | ||
self.node.get_logger().info(f"Received command: {command}") | ||
self.start_navigation_by_moves(command) | ||
|
||
def start_navigation_by_moves(self, command): | ||
if self.semantic_slam.robot_lowres_x is None: | ||
return | ||
target_cell = self.get_current_target_cell(command) | ||
if target_cell: | ||
self.start_navigation_to_coordinate(target_cell, command) | ||
|
||
def start_navigation_to_coordinate(self, target_cell, command=""): | ||
NAV_STATE_SET(NAV_STATE_BUSY) | ||
if self.semantic_slam.robot_lowres_x is None: | ||
return | ||
self.navigation_goal = (target_cell, command) | ||
self.navigation_retries = 0 | ||
self.send_navigation_goal(target_cell, command) | ||
|
||
def send_navigation_goal(self, target_cell, command): | ||
origin_x, origin_y = self.semantic_slam.origin.position.x, self.semantic_slam.origin.position.y | ||
import sys | ||
if "metta" not in sys.argv and self.check_collision(target_cell): | ||
if "," in self.navigation_goal[1]: | ||
self.node.get_logger().info("COLLISION, shortening command") | ||
newcommand = ",".join(self.navigation_goal[1].split(",")[1:]) | ||
self.start_navigation_by_moves(newcommand) | ||
return | ||
else: | ||
self.node.get_logger().info("COLLISION, aborting") | ||
self.publish_done(force_mapupdate=False) | ||
NAV_STATE_SET(NAV_STATE_FAIL) | ||
return | ||
cell_x, cell_y = target_cell | ||
goal_pose = PoseStamped() | ||
goal_pose.header.frame_id = 'map' | ||
goal_pose.header.stamp = self.node.get_clock().now().to_msg() | ||
goal_pose.pose.position.x = origin_x + (cell_x * self.semantic_slam.new_resolution) + self.semantic_slam.new_resolution / 2 | ||
goal_pose.pose.position.y = origin_y + (cell_y * self.semantic_slam.new_resolution) + self.semantic_slam.new_resolution / 2 | ||
goal_pose.pose.orientation = self.robot_localization.set_orientation(command) | ||
self.node.get_logger().info(f"Sending goal to ({goal_pose.pose.position.x}, {goal_pose.pose.position.y})") | ||
if not self.action_client.wait_for_server(timeout_sec=1.0): | ||
self.node.get_logger().error("Action server not available!") | ||
self.publish_done(force_mapupdate=False) | ||
return | ||
goal_msg = NavigateToPose.Goal() | ||
goal_msg.pose = goal_pose | ||
sent_goal = self.action_client.send_goal_async(goal_msg) | ||
sent_goal.add_done_callback(self._goal_response_callback) | ||
|
||
def _goal_response_callback(self, future): | ||
self.goal_handle = future.result() | ||
if not self.goal_handle.accepted: | ||
self.goal_handle = None | ||
self.node.get_logger().info("Goal rejected") | ||
self.publish_done(force_mapupdate=False) | ||
return | ||
self.node.get_logger().info("Goal accepted, waiting for result") | ||
result_future = self.goal_handle.get_result_async() | ||
result_future.add_done_callback(self._result_callback) | ||
|
||
def _result_callback(self, future): | ||
result = future.result() | ||
nav_state = NAV_STATE_SUCCESS | ||
if result.status == GoalStatus.STATUS_SUCCEEDED: | ||
self.node.get_logger().info("Goal succeeded!") | ||
else: | ||
if self.navigation_retries < 10 and "metta" not in __import__("sys").argv: | ||
self.node.get_logger().info("Goal failed with status: {0}, retrying".format(result.status)) | ||
self.navigation_retries += 1 | ||
self.send_navigation_goal(self.navigation_goal[0], self.navigation_goal[1]) | ||
return | ||
else: | ||
if "," in self.navigation_goal[1]: | ||
self.node.get_logger().info("Goal failed with status: {0}, exhausted retries, shortening command".format(result.status)) | ||
newcommand = ",".join(self.navigation_goal[1].split(",")[1:]) | ||
self.start_navigation_by_moves(newcommand) | ||
return | ||
else: | ||
self.node.get_logger().info("Goal failed with status: {0}, exhausted retries and shortenings".format(result.status)) | ||
self.publish_done(force_mapupdate=True) | ||
NAV_STATE_SET(nav_state) | ||
|
||
def publish_done(self, force_mapupdate): | ||
force_mapupdate = False # as in the original code, map updating is fast so we disable forced waiting | ||
if force_mapupdate: | ||
waittime = 0.1 | ||
while self.semantic_slam.mapupdate == self.semantic_slam.goalstart: | ||
if waittime == 0.1: | ||
self.node.get_logger().info("Waiting for map update!") | ||
miniwait = 0.1 | ||
time.sleep(miniwait) | ||
waittime += miniwait | ||
if waittime > 10.0: | ||
self.node.get_logger().warn("Wait time exceeded, finishing nevertheless!") | ||
break | ||
msg = String() | ||
msg.data = str(time.time()) | ||
self.nacedone_pub.publish(msg) | ||
self.node.get_logger().info("Published 'done' to /nacedone") | ||
|
||
def get_current_target_cell(self, dirs): | ||
current_x, current_y = self.semantic_slam.robot_lowres_x, self.semantic_slam.robot_lowres_y | ||
for direction in dirs.split(","): | ||
if direction == "left": | ||
current_x, current_y = (current_x - 1, current_y) | ||
elif direction == "right": | ||
current_x, current_y = (current_x + 1, current_y) | ||
elif direction == "up": | ||
current_x, current_y = (current_x, current_y + 1) | ||
elif direction == "down": | ||
current_x, current_y = (current_x, current_y - 1) | ||
return (current_x, current_y) | ||
|
||
def check_collision(self, target_cell): | ||
cell_x, cell_y = target_cell | ||
idx = cell_y * self.semantic_slam.new_width + cell_x | ||
if idx < len(self.semantic_slam.low_res_grid) and self.semantic_slam.low_res_grid[idx] not in [0, -1, 127]: | ||
self.node.get_logger().info("COLLISION!!!") | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# robot.py | ||
import math | ||
import rclpy | ||
from rclpy.duration import Duration | ||
from rclpy.time import Time | ||
import tf2_ros | ||
from geometry_msgs.msg import Quaternion | ||
|
||
class RobotLocalization: | ||
def __init__(self, node, tf_buffer): | ||
self.node = node | ||
self.tf_buffer = tf_buffer | ||
|
||
def get_robot_lowres_position(self, original_origin, original_resolution, downsample_factor): | ||
try: | ||
trans = self.tf_buffer.lookup_transform( | ||
'map', | ||
'base_link', | ||
rclpy.time.Time(), | ||
timeout=Duration(seconds=1.0) | ||
) | ||
robot_x = trans.transform.translation.x | ||
robot_y = trans.transform.translation.y | ||
map_origin_x = original_origin.position.x | ||
map_origin_y = original_origin.position.y | ||
robot_map_x = int((robot_x - map_origin_x) / original_resolution) | ||
robot_map_y = int((robot_y - map_origin_y) / original_resolution) | ||
robot_lowres_x = robot_map_x // downsample_factor | ||
robot_lowres_y = robot_map_y // downsample_factor | ||
return robot_lowres_x, robot_lowres_y | ||
except Exception as e: | ||
self.node.get_logger().error(f"Transform exception: {str(e)}") | ||
return 0, 0 # Default position if transform fails | ||
|
||
def set_orientation_with_angle(self, angle_radians): | ||
if angle_radians is None: | ||
return Quaternion(x=0.0, y=0.0, z=0.0, w=1.0) | ||
half_angle = angle_radians / 2.0 | ||
return Quaternion(x=0.0, y=0.0, z=math.sin(half_angle), w=math.cos(half_angle)) | ||
|
||
def set_orientation(self, command): | ||
# Use the last comma‐separated part of the command for orientation. | ||
command = command.split(",")[-1] | ||
if command == "right": | ||
angle = 0 # Facing up (0 radians) | ||
elif command == "left": | ||
angle = math.pi # Facing down (π radians) | ||
elif command == "up": | ||
angle = math.pi / 2 # Facing left (π/2 radians) | ||
elif command == "down": | ||
angle = -math.pi / 2 # Facing right (-π/2 radians) | ||
else: | ||
return self.set_orientation_with_angle(None) | ||
return self.set_orientation_with_angle(angle) |
Oops, something went wrong.