Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/run-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ runs:
cd /workspace/isaaclab
mkdir -p tests
echo 'Starting pytest with path: $test_path'
/isaac-sim/python.sh -m pytest --ignore=tools/conftest.py $test_path $pytest_options -v --junitxml=tests/$result_file || echo 'Pytest completed with exit code: $?'
/isaac-sim/python.sh -m pytest --ignore=tools/conftest.py source/isaaclab_tasks/test/test_environments.py $pytest_options -v -s --junitxml=tests/$result_file || echo 'Pytest completed with exit code: $?'
"; then
echo "✅ Docker container completed successfully"
else
Expand Down
3 changes: 1 addition & 2 deletions docs/source/overview/environments.rst
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ We provide environments for both disassembly and assembly.

.. attention::

CUDA is required for running the AutoMate environments.
Follow the below steps to install CUDA 12.8:
CUDA is required for running the AutoMate environments with 570 drivers. If running with Nvidia driver 570, we follow the below steps to install CUDA 12.8 and compute rewards in AutoMate environments with CUDA. With 580 drivers and CUDA 13, we are currently unable to enable CUDA for computing the rewards and will fallback to CPU, resulting in slower performance.

.. code-block:: bash

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ def __init__(self, cfg: AssemblyEnvCfg, render_mode: str | None = None, **kwargs
)

# Create criterion for dynamic time warping (later used for imitation reward)
self.soft_dtw_criterion = SoftDTW(use_cuda=True, device=self.device, gamma=self.cfg_task.soft_dtw_gamma)
cuda_version = automate_algo.get_cuda_version()
print("!!!!!!", cuda_version)
if cuda_version < "13.0":
self.soft_dtw_criterion = SoftDTW(use_cuda=True, device=self.device, gamma=self.cfg_task.soft_dtw_gamma)
else:
self.soft_dtw_criterion = SoftDTW(use_cuda=False, device=self.device, gamma=self.cfg_task.soft_dtw_gamma)

# Evaluate
if self.cfg_task.if_logging_eval:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#
# SPDX-License-Identifier: BSD-3-Clause

import numpy as np
import os
import re
import subprocess
import sys
import torch
import trimesh
Expand All @@ -14,248 +15,34 @@
print("Python Executable:", sys.executable)
print("Python Path:", sys.path)

from scipy.stats import norm

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.mixture import GaussianMixture

base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "."))
sys.path.append(base_dir)

from isaaclab.utils.assets import retrieve_file_path

"""
Initialization / Sampling
Util Functions
"""


def get_prev_success_init(held_asset_pose, fixed_asset_pose, success, N, device):
"""
Randomly selects N held_asset_pose and corresponding fixed_asset_pose
at indices where success is 1 and returns them as torch tensors.

Args:
held_asset_pose (np.ndarray): Numpy array of held asset poses.
fixed_asset_pose (np.ndarray): Numpy array of fixed asset poses.
success (np.ndarray): Numpy array of success values (1 for success, 0 for failure).
N (int): Number of successful indices to select.
device: torch device.

Returns:
tuple: (held_asset_poses, fixed_asset_poses) as torch tensors, or None if no success found.
"""
# Get indices where success is 1
success_indices = np.where(success == 1)[0]

if success_indices.size == 0:
return None # No successful entries found

# Select up to N random indices from successful indices
selected_indices = np.random.choice(success_indices, min(N, len(success_indices)), replace=False)

return torch.tensor(held_asset_pose[selected_indices], device=device), torch.tensor(
fixed_asset_pose[selected_indices], device=device
)


def model_succ_w_gmm(held_asset_pose, fixed_asset_pose, success):
"""
Models the success rate distribution as a function of the relative position between the held and fixed assets
using a Gaussian Mixture Model (GMM).

Parameters:
held_asset_pose (np.ndarray): Array of shape (N, 7) representing the positions of the held asset.
fixed_asset_pose (np.ndarray): Array of shape (N, 7) representing the positions of the fixed asset.
success (np.ndarray): Array of shape (N, 1) representing the success.

Returns:
GaussianMixture: The fitted GMM.

Example:
gmm = model_succ_dist_w_gmm(held_asset_pose, fixed_asset_pose, success)
relative_pose = held_asset_pose - fixed_asset_pose
# To compute the probability of each component for the given relative positions:
probabilities = gmm.predict_proba(relative_pose)
"""
# Compute the relative positions (held asset relative to fixed asset)
relative_pos = held_asset_pose[:, :3] - fixed_asset_pose[:, :3]

# Flatten the success array to serve as sample weights.
# This way, samples with higher success contribute more to the model.
sample_weights = success.flatten()

# Initialize the Gaussian Mixture Model with the specified number of components.
gmm = GaussianMixture(n_components=2, random_state=0)

# Fit the GMM on the relative positions, using sample weights from the success metric.
gmm.fit(relative_pos, sample_weight=sample_weights)

return gmm


def sample_rel_pos_from_gmm(gmm, batch_size, device):
"""
Samples a batch of relative poses (held_asset relative to fixed_asset)
from a fitted GaussianMixture model.

Parameters:
gmm (GaussianMixture): A GaussianMixture model fitted on relative pose data.
batch_size (int): The number of samples to generate.

Returns:
torch.Tensor: A tensor of shape (batch_size, 3) containing the sampled relative poses.
"""
# Sample batch_size samples from the Gaussian Mixture Model.
samples, _ = gmm.sample(batch_size)

# Convert the numpy array to a torch tensor.
samples_tensor = torch.from_numpy(samples).to(device)

return samples_tensor

def get_cuda_version():
try:
# Execute nvcc --version command
result = subprocess.run(["nvcc", "--version"], capture_output=True, text=True, check=True)
output = result.stdout

def model_succ_w_gp(held_asset_pose, fixed_asset_pose, success):
"""
Models the success rate distribution given the relative position of the held asset
from the fixed asset using a Gaussian Process classifier.

Parameters:
held_asset_pose (np.ndarray): Array of shape (N, 7) representing the held asset pose.
Assumes the first 3 columns are the (x, y, z) positions.
fixed_asset_pose (np.ndarray): Array of shape (N, 7) representing the fixed asset pose.
Assumes the first 3 columns are the (x, y, z) positions.
success (np.ndarray): Array of shape (N, 1) representing the success outcome (e.g., 0 for failure,
1 for success).

Returns:
GaussianProcessClassifier: A trained GP classifier that models the success rate.
"""
# Compute the relative position (using only the translation components)
relative_position = held_asset_pose[:, :3] - fixed_asset_pose[:, :3]

# Flatten success array from (N, 1) to (N,)
y = success.ravel()

# Create and fit the Gaussian Process Classifier
# gp = GaussianProcessClassifier(kernel=kernel, random_state=42)
gp = GaussianProcessRegressor(random_state=42)
gp.fit(relative_position, y)

return gp


def propose_failure_samples_batch_from_gp(
gp_model, candidate_points, batch_size, device, method="ucb", kappa=2.0, xi=0.01
):
"""
Proposes a batch of candidate samples from failure-prone regions using one of three acquisition functions:
'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement), or 'ei' (Expected Improvement).

In this formulation, lower predicted success probability (closer to 0) is desired,
so we invert the typical acquisition formulations.

Parameters:
gp_model: A trained Gaussian Process model (e.g., GaussianProcessRegressor) that supports
predictions with uncertainties via the 'predict' method (with return_std=True).
candidate_points (np.ndarray): Array of shape (n_candidates, d) representing candidate relative positions.
batch_size (int): Number of candidate samples to propose.
method (str): Acquisition function to use: 'ucb', 'pi', or 'ei'. Default is 'ucb'.
kappa (float): Exploration parameter for UCB. Default is 2.0.
xi (float): Exploration parameter for PI and EI. Default is 0.01.

Returns:
best_candidates (np.ndarray): Array of shape (batch_size, d) containing the selected candidate points.
acquisition (np.ndarray): Acquisition values computed for each candidate point.
"""
# Obtain the predictive mean and standard deviation for each candidate point.
mu, sigma = gp_model.predict(candidate_points, return_std=True)
# mu, sigma = gp_model.predict(candidate_points)

# Compute the acquisition values based on the chosen method.
if method.lower() == "ucb":
# Inversion: we want low success (i.e. low mu) and high uncertainty (sigma) to be attractive.
acquisition = kappa * sigma - mu
elif method.lower() == "pi":
# Probability of Improvement: likelihood of the prediction falling below the target=0.0.
Z = (-mu - xi) / (sigma + 1e-9)
acquisition = norm.cdf(Z)
elif method.lower() == "ei":
# Expected Improvement
Z = (-mu - xi) / (sigma + 1e-9)
acquisition = (-mu - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
# Set acquisition to 0 where sigma is nearly zero.
acquisition[sigma < 1e-9] = 0.0
else:
raise ValueError("Unknown acquisition method. Please choose 'ucb', 'pi', or 'ei'.")

# Select the indices of the top batch_size candidates (highest acquisition values).
sorted_indices = np.argsort(acquisition)[::-1] # sort in descending order
best_indices = sorted_indices[:batch_size]
best_candidates = candidate_points[best_indices]

# Convert the numpy array to a torch tensor.
best_candidates_tensor = torch.from_numpy(best_candidates).to(device)

return best_candidates_tensor, acquisition


def propose_success_samples_batch_from_gp(
gp_model, candidate_points, batch_size, device, method="ucb", kappa=2.0, xi=0.01
):
"""
Proposes a batch of candidate samples from high success rate regions using one of three acquisition functions:
'ucb' (Upper Confidence Bound), 'pi' (Probability of Improvement), or 'ei' (Expected Improvement).

In this formulation, higher predicted success probability is desired.
The GP model is assumed to provide predictions with uncertainties via its 'predict' method (using return_std=True).

Parameters:
gp_model: A trained Gaussian Process model (e.g., GaussianProcessRegressor) that supports
predictions with uncertainties.
candidate_points (np.ndarray): Array of shape (n_candidates, d) representing candidate relative positions.
batch_size (int): Number of candidate samples to propose.
method (str): Acquisition function to use: 'ucb', 'pi', or 'ei'. Default is 'ucb'.
kappa (float): Exploration parameter for UCB. Default is 2.0.
xi (float): Exploration parameter for PI and EI. Default is 0.01.

Returns:
best_candidates (np.ndarray): Array of shape (batch_size, d) containing the selected candidate points.
acquisition (np.ndarray): Acquisition values computed for each candidate point.
"""
# Obtain the predictive mean and standard deviation for each candidate point.
mu, sigma = gp_model.predict(candidate_points, return_std=True)

# Compute the acquisition values based on the chosen method.
if method.lower() == "ucb":
# For maximization, UCB is defined as μ + kappa * σ.
acquisition = mu + kappa * sigma
elif method.lower() == "pi":
# Probability of Improvement (maximization formulation).
Z = (mu - 1.0 - xi) / (sigma + 1e-9)
acquisition = norm.cdf(Z)
elif method.lower() == "ei":
# Expected Improvement (maximization formulation).
Z = (mu - 1.0 - xi) / (sigma + 1e-9)
acquisition = (mu - 1.0 - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
# Handle nearly zero sigma values.
acquisition[sigma < 1e-9] = 0.0
else:
raise ValueError("Unknown acquisition method. Please choose 'ucb', 'pi', or 'ei'.")

# Sort candidates by acquisition value in descending order and select the top batch_size.
sorted_indices = np.argsort(acquisition)[::-1]
best_indices = sorted_indices[:batch_size]
best_candidates = candidate_points[best_indices]

# Convert the numpy array to a torch tensor.
best_candidates_tensor = torch.from_numpy(best_candidates).to(device)

return best_candidates_tensor, acquisition


"""
Util Functions
"""
# Use regex to find the CUDA version (e.g., V11.2.67)
match = re.search(r"V(\d+\.\d+(\.\d+)?)", output)
if match:
return match.group(1)
else:
return "CUDA version not found in output."
except FileNotFoundError:
return "nvcc command not found. Is CUDA installed and in your PATH?"
except subprocess.CalledProcessError as e:
return f"Error executing nvcc: {e.stderr}"
except Exception as e:
return f"An unexpected error occurred: {e}"


def get_gripper_open_width(obj_filepath):
Expand Down
4 changes: 0 additions & 4 deletions source/isaaclab_tasks/test/env_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ def _run_environments(
if "Visuomotor" in task_name and num_envs == 32:
return

# skip automate environments as they require cuda installation
if task_name in ["Isaac-AutoMate-Assembly-Direct-v0", "Isaac-AutoMate-Disassembly-Direct-v0"]:
return

# Check if this is the teddy bear environment and if it's being called from the right test file
if task_name == "Isaac-Lift-Teddy-Bear-Franka-IK-Abs-v0":
# Get the calling frame to check which test file is calling this function
Expand Down
Loading