Integrate payload order algorithm and write up unit test (Not Ready to Merge) #170

Open · wants to merge 2 commits into base: main
97 changes: 90 additions & 7 deletions tests/imaging/integ_tests.py
@@ -4,7 +4,8 @@
from uavf_2024.imaging.image_processor import ImageProcessor
from uavf_2024.imaging.tracker import TargetTracker
from uavf_2024.imaging.imaging_types import FullBBoxPrediction, Image, ProbabilisticTargetDescriptor, Target3D, COLORS, SHAPES, LETTERS, CertainTargetDescriptor
from uavf_2024.imaging.utils import calc_match_score
from uavf_2024.imaging.utils import calc_match_score, sort_payload

import os
import numpy as np
import shutil
@@ -147,14 +148,56 @@ def sort_key(file_name: str):

ground_truth: list[Target3D] = all_ground_truth

closest_tracks = tracker.estimate_positions([t.descriptor.collapse_to_certain() for t in ground_truth])
# insert the sorting algorithm here; confusion-matrix names: ["shape", "letter", "shape_col", "letter_col"]

sim_root_folder = f"{CURRENT_FILE_PATH}/visualizations/test_metrics"
irl_root_folder = f"{CURRENT_FILE_PATH}/visualizations/test_irl"


shape_matrix = np.loadtxt(os.path.join(irl_root_folder, "shape_confusion_matrix.csv"), delimiter=",",
                          skiprows=1, usecols=range(1, 10))

Review comment (Member): Can you encapsulate the payload sorting algorithm more? I would rather that when we use it, we don't have to deal with loading all the confusion matrices ourselves.
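A minimal sketch of the encapsulation the comment asks for (the helper name `sort_payload_from_dir` is hypothetical and not part of this PR; it just wraps the matrix loading done in this test):

```python
import os
import numpy as np
from uavf_2024.imaging.utils import sort_payload

def sort_payload_from_dir(targets, matrices_dir: str, penalty: bool = True):
    # File names and column ranges mirror the test above; adjust if the CSV layouts differ.
    shape = np.loadtxt(os.path.join(matrices_dir, "shape_confusion_matrix.csv"),
                       delimiter=",", skiprows=1, usecols=range(1, 10))
    letter = np.genfromtxt(os.path.join(matrices_dir, "letter_confusion_matrix.csv"),
                           delimiter=",", dtype=str)[1:, 1:].astype(np.float32)
    color = np.loadtxt(os.path.join(matrices_dir, "shape_col_confusion_matrix.csv"),
                       delimiter=",", skiprows=1, usecols=range(1, 9))
    return sort_payload(list_payload_targets=targets, shape_confusion=shape,
                        letter_confusion=letter, color_confusion=color, penalty=penalty)
```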
letter_matrix = np.genfromtxt(os.path.join(irl_root_folder, "letter_confusion_matrix.csv"), delimiter=',', dtype=str)
letter_matrix = letter_matrix[1:, 1:].astype(np.float32)
#print(letter_matrix)

Review comment (Member): can you get rid of all the lines that are commented out like this?

color_matrix = np.loadtxt(os.path.join(irl_root_folder, "shape_col_confusion_matrix.csv"), delimiter=",",
                          skiprows=1, usecols=range(1, 9))

#print("content", dir(all_ground_truth[0]))
payload_truth = []
for each_truth in ground_truth:
    payload_truth.append(each_truth.descriptor)

Review comment (Member): nitpick on variable names: can you call this ground_truth_descriptors? Also, it would be easier to read if you initialize it in one line as a list comprehension.
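The one-liner the comment suggests (using the proposed name) would be:

```python
ground_truth_descriptors = [each_truth.descriptor for each_truth in ground_truth]
```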


resorted_payload = sort_payload(list_payload_targets=payload_truth, shape_confusion=shape_matrix,
                                letter_confusion=letter_matrix, color_confusion=color_matrix, penalty=True)

closest_resorted_tracks = tracker.estimate_positions([each_description.collapse_to_certain() for each_description in resorted_payload])

scores = []
distances = []
for gt_target, pred_track in zip(ground_truth, closest_tracks):
    is_close_enough = np.linalg.norm(pred_track.position - gt_target.position) < POSITION_ERROR_ACCEPTABLE_BOUND
payload_truth = []

# match each resorted descriptor back to its ground-truth target via the collapsed-descriptor string
for each_track in resorted_payload:
    for each_truth in ground_truth:
        if str(each_truth.descriptor.collapse_to_certain()) == str(each_track.collapse_to_certain()):
            payload_truth.append(each_truth)
            break

assert len(payload_truth) != 0, "Payload truth should not be empty"
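Aside: the nested matching loop above is quadratic in the number of targets. A dictionary keyed on the collapsed-descriptor string (a sketch, not part of the diff) does the same matching in linear time:

```python
# Build one lookup table, then match each resorted descriptor in O(1).
by_descriptor = {str(t.descriptor.collapse_to_certain()): t for t in ground_truth}
payload_truth = [by_descriptor[str(track.collapse_to_certain())]
                 for track in resorted_payload
                 if str(track.collapse_to_certain()) in by_descriptor]
```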
close_detection = []

for targ_num, (gt_target, pred_track) in enumerate(zip(payload_truth, closest_resorted_tracks)):
    # note to self: this determines whether "is close enough" is Yes or No, but if multiple targets
    # map to the same track the integration test may break; also need to verify whether the payload
    # order lines up with the number of times it's getting it right
    is_close_enough = np.linalg.norm(pred_track.position - gt_target.position) < POSITION_ERROR_ACCEPTABLE_BOUND
    scores.append(int(is_close_enough))

    close_detection.append(np.linalg.norm(pred_track.position - gt_target.position))

    if is_close_enough:
        distances.append(np.linalg.norm(pred_track.position - gt_target.position))

    if verbose: # we only want to print this extra info for the first one to not clog up the output
        print(f"Closest Match for {str(gt_target.descriptor.collapse_to_certain())}:")
        physically_closest_match = min(predictions_3d, key=lambda pred: np.linalg.norm(pred.position - gt_target.position))
@@ -178,8 +221,23 @@ def sort_key(file_name: str):
print(f"\tHighest descriptor match id: {closest_match.id}")
print(f"\tHigh descriptor match distance: {np.linalg.norm(closest_match.position-gt_target.position):.3f}")
print(f"\tClose enough? {is_close_enough}")

return np.sum(scores), np.mean(distances), np.std(distances)

avg_detection = np.zeros(len(close_detection))
# Reporting results of the payload algorithm plugged in
payload_order = [str(each_description.collapse_to_certain()) for each_description in resorted_payload]
print("Payload algorithm with penalty sorting results:", "\n".join(payload_order))

truth_order = sorted(close_detection, reverse=False)

#payload_results_dict = {"Average Detection" : np.zeros(len(truth_order)), "Average Distance" : np.zeros(len(truth_order)) }

for rank, detect_dist in enumerate(close_detection):
    if detect_dist == truth_order[rank]:
        avg_detection[rank] += 1

payload_results_dict = np.array([avg_detection, close_detection])

return np.sum(scores), np.mean(distances), np.std(distances), payload_results_dict
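To make the rank-agreement bookkeeping concrete (toy distances, not real output): avg_detection marks the ranks at which the payload order already matches the distance-sorted order.

```python
close_detection = [0.2, 0.9, 0.5]      # distances in payload order (made-up numbers)
truth_order = sorted(close_detection)  # [0.2, 0.5, 0.9]
agreement = [int(d == t) for d, t in zip(close_detection, truth_order)]
print(agreement)                       # [1, 0, 0] -> only rank 0 is already in place
```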

class TestPipeline(unittest.TestCase):
def test_with_sim_dataset(self, verbose: bool = False):
@@ -192,10 +250,35 @@ def test_with_sim_dataset(self, verbose: bool = False):
scores = []
dists = []
datasets_folder = f'{CURRENT_FILE_PATH}/2024_test_data/3d_datasets/'

# a total of 3 runs with different sets of payload targets is supposed to happen here

payload_dataset_results = []

for dataset_name in os.listdir(datasets_folder):
    score, dist_avg, dist_std = test_with_dataset(f'{datasets_folder}/{dataset_name}', f'integ_test_{dataset_name}')
    score, dist_avg, dist_std, payload_results = test_with_dataset(f'{datasets_folder}/{dataset_name}', f'integ_test_{dataset_name}')
    scores.append(score)
    dists.append(dist_avg)
    # append a payload numpy array
    payload_dataset_results.append(payload_results)
    # break  # temporary patch for the test breaking when it runs into converting a person test case into non-shape probabilities

payload_report = np.sum(payload_dataset_results, axis=0) / len(payload_dataset_results)

payload_report_keys = ["Rank", "Average Detection", "Average Distance"]
max_length = max(len(word) for word in payload_report_keys)

print(f"{payload_report_keys[0]:>{10}} {payload_report_keys[1]:>{20}} {payload_report_keys[2]:>{20}}")

# Print array with row labels
for label, row, data in zip(range(1, len(payload_report[0]) + 1), payload_report[0], payload_report[1]):
    print(f"{str(label):>{10}} {row:>{20}.{5}f} {data:>{20}.{5}f}")

#max_key_length = max(len(key) for key in my_dict.keys())
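With made-up numbers, the report block above prints something shaped like:

```
      Rank    Average Detection     Average Distance
         1              1.00000              0.23000
         2              0.66667              0.51000
```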
scores_hist = np.histogram(scores, bins=[0,1,2,3,4,5])
print(f"Imaging Avg Sim Score: {np.mean(scores)}/5")
print("Distribution of scores:")
2 changes: 1 addition & 1 deletion uavf_2024/imaging/imaging_types.py
@@ -147,7 +147,7 @@ def to_indices(self):
def as_probabilistic(self) -> ProbabilisticTargetDescriptor:
    err_message = '''Cannot convert to probabilistic if any of the values are None (probably trying
    to convert a ground truth label with missing data, which shouldn't be done)'''
    assert None not in [self.shape, self.letter, self.shape_col, self.letter_col], err_message
    #assert None not in [self.shape, self.letter, self.shape_col, self.letter_col], err_message
    shape_probs = np.zeros(len(SHAPES))
    shape_probs[SHAPES.index(self.shape)] = 1.0

39 changes: 25 additions & 14 deletions uavf_2024/imaging/utils.py
@@ -53,26 +53,37 @@ def sort_payload(list_payload_targets: List[ProbabilisticTargetDescriptor], shap
for target_position, payload_target in enumerate(list_payload_targets):
    payload_targets_order[target_position] = 1
    shape_class_index = np.where(payload_target.shape_probs == 1)[0][0]
    letter_class_index = np.where(payload_target.letter_probs == 1)[0][0]
    shape_col_class_index = np.where(payload_target.shape_col_probs == 1)[0][0]
    letter_col_class_index = np.where(payload_target.letter_col_probs == 1)[0][0]
    arr = payload_target.letter_probs
    # uniform letter probabilities (e.g. a "person" target with no letter) -> use -1 as a sentinel
    if np.all(arr.flatten() == arr.flatten()[0]):
        print("letter")
        letter_class_index = -1
        shape_col_class_index = -1
        letter_col_class_index = -1
    else:
        letter_class_index = np.where(payload_target.letter_probs == 1)[0][0]
        shape_col_class_index = np.where(payload_target.shape_col_probs == 1)[0][0]
        letter_col_class_index = np.where(payload_target.letter_col_probs == 1)[0][0]
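Aside: the uniformity test compares floats for exact equality. A tolerance-based variant (a sketch, not in the diff) behaves identically for the exact uniform vectors this check targets, but is robust to rounding:

```python
# Tolerance-based uniformity check over the probability vector.
is_uniform = np.allclose(arr, arr.flat[0])
```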

    # Iterate through confusion matrices and corresponding description indices
    for confusion_matrix, class_index in zip([shape_confusion, letter_confusion, color_confusion, color_confusion],
                                             [shape_class_index, letter_class_index, shape_col_class_index, letter_col_class_index]):
        descrp_truth_row = confusion_matrix[class_index]
        descrp_pos_truth = descrp_truth_row[class_index]

        # Computes the penalty by summing the squares of each negative truth probability
        descrp_neg_truth_penalty = (descrp_truth_row[descrp_truth_row != descrp_pos_truth])**2
        descrp_score = descrp_pos_truth - np.sum(descrp_neg_truth_penalty)
        descrp_score = max(descrp_score, 0.0001)  # Ensure non-negative score after the penalty

        if class_index == -1:
            print(" ")  # placeholder: not sure yet how to handle the "person" case against the confusion matrices
        else:
            descrp_truth_row = confusion_matrix[class_index]
            descrp_pos_truth = descrp_truth_row[class_index]
            # Computes the penalty by summing the squares of each negative truth probability
            descrp_neg_truth_penalty = (descrp_truth_row[descrp_truth_row != descrp_pos_truth])**2
            descrp_score = descrp_pos_truth - np.sum(descrp_neg_truth_penalty)
            descrp_score = max(descrp_score, 0.0001)  # Ensure non-negative score after the penalty

        # Computes the target's confidence score by multiplying with the confidence values for the four descriptions
        if penalty:
            payload_targets_order[target_position] *= descrp_score
        else:
            payload_targets_order[target_position] *= descrp_pos_truth
        if penalty:
            payload_targets_order[target_position] *= descrp_score
        else:
            payload_targets_order[target_position] *= descrp_pos_truth
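As a toy check of the scoring rule (made-up confusion row, not from the repo's matrices): for a true-class row [0.9, 0.06, 0.04], the score is 0.9 - (0.06^2 + 0.04^2) = 0.8948. Note that `row != pos` selects by value, so an off-diagonal entry exactly equal to the diagonal would also be excluded from the penalty.

```python
import numpy as np

row = np.array([0.9, 0.06, 0.04])      # toy confusion-matrix row for the true class (index 0)
pos = row[0]                            # probability mass on the correct class
penalty = np.sum(row[row != pos] ** 2)  # 0.06**2 + 0.04**2 = 0.0052
score = max(pos - penalty, 0.0001)      # 0.9 - 0.0052 = 0.8948
print(score)
```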

#Reorder the payload target list based on the confidence values
payload_targets_order = sorted(payload_targets_order.items(), key=lambda item: item[1], reverse=True)