diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/error_generation/__init__.py b/error_generation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/error_generation/add_code.py b/error_generation/add_code.py new file mode 100644 index 00000000..81fe7c1e --- /dev/null +++ b/error_generation/add_code.py @@ -0,0 +1,93 @@ +# Copyright (C) 2021 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import redbaron as rb +from misc_utils import ( + get_random_list_sample, + load_json, + load_data, + write_csv, + get_random_int, + get_random_float, + get_valid_code_trace, +) + +VARIABLE_NAMES = ["tmp{}".format(i) for i in range(10)] + [ + chr(ord("a") + i) for i in range(26) +] +NUM_RANGE = [1, 100] + + +def get_perturb_line_step(code_trace_org, err_suffx): + # Not all the variable types are valid for all the errors. + # For instance for index out of range an int var is not valid. + code_trace = get_valid_code_trace(code_trace_org, err_suffx) + if not code_trace: + return None, None, None + perturb_line = get_random_list_sample(code_trace.keys(), 1)[0] + perturb_step = get_random_list_sample(code_trace[perturb_line], 1)[0] + perturb_var = get_random_list_sample(perturb_step.keys(), 1)[0] + perturb_val = perturb_step[perturb_var] + return int(perturb_line), perturb_var, perturb_val + + +def perturb_program(red, code_trace, err_suffx, error_expr_factory_obj): + perturb_line, perturb_var, perturb_val = get_perturb_line_step( + code_trace, err_suffx + ) + if perturb_line is None: + return 0 + perturb_expression, is_err_present = error_expr_factory_obj.add_err( + err_suffx, perturb_var, perturb_val + ) + # TODO(rishab): Need to be careful to ensure that that the insertion + # line is not an AssignmentNode in RedBaron. + if err_suffx == "math_domain_err": + # The sqrt function needs to be imported so that sqrt function + # can be called. I am not sure if we can just add the expression + # without proper imports. + import_statement, perturb_expression = perturb_expression.split(";") + red.at(perturb_line).insert_before(import_statement, offset=perturb_line - 1) + red.at(perturb_line + 1).insert_after(perturb_expression) + else: + red.at(perturb_line).insert_after(perturb_expression) + return is_err_present + + +def add_error( + org_code_fp, code_trace_fp, err_code_fp, err_suffx, error_expr_factory_obj +): + # We can optimize the code by passing the read file. + # But for now to ensure isolation, I am doing it + # explicitly. + code_trace = load_json(code_trace_fp) + # To keep this function generic the name of the output + # code file has the error type and indicator whether the + # the error is present or not as suffix. + err_code_fp = err_code_fp.replace(".txt", "-" + err_suffx + ".txt") + program = load_data(org_code_fp).strip() + red = rb.RedBaron(program) + try: + is_err_present = perturb_program( + red, code_trace, err_suffx, error_expr_factory_obj + ) + err_code_fp = err_code_fp.replace(".txt", "-" + str(is_err_present) + ".txt") + except Exception as e: + # We can handle the exception as we want. + # But for the time being we can return False. + # import pdb;pdb.set_trace() + return False + + write_csv(red.dumps(), err_code_fp) + return True diff --git a/error_generation/config.yaml b/error_generation/config.yaml new file mode 100644 index 00000000..4591a2a0 --- /dev/null +++ b/error_generation/config.yaml @@ -0,0 +1,10 @@ +base_path: /Users/rishabgoel/Documents/compressive-ipagnn/data/codeforces +trace_code_path: error_generation/trace_code.py +process_suffix: processed +errors: + - zero_err + - assert_err + - not_subscriptable_err + - idx_out_range_err + - math_domain_err + - not_iterable_err \ No newline at end of file diff --git a/error_generation/error_expression_factory.py b/error_generation/error_expression_factory.py new file mode 100644 index 00000000..f4aed201 --- /dev/null +++ b/error_generation/error_expression_factory.py @@ -0,0 +1,231 @@ +# Copyright (C) 2021 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from misc_utils import ( + get_random_list_sample, + load_json, + load_data, + write_csv, + get_random_int, + get_random_float, +) + + +class ErrorFactory: + """TODO (rishab): + 1. implement methods for var name not defined and + operand mismatch. + 2. make the expressions more complex. + """ + + VARIABLE_NAMES = ["tmp{}".format(i) for i in range(10)] + [ + chr(ord("a") + i) for i in range(26) + ] + NUM_RANGE = [1, 100] + + def __init__(self): + self._builders = { + "zero_err": self.get_zero_perturb_expression, + "assert_err": self.get_assert_perturb_expression, + "not_subscriptable_err": self.get_not_subscriptable_perturb_expression, + "idx_out_range_err": self.get_index_range_perturb_expression, + "undef_var_err": self.get_undef_name_perturb_expression, # Caution: not implemented properly. + "math_domain_err": self.get_math_domain_perturb_expression, + "not_iterable_err": self.get_int_not_iterable_perturb_expression, + } + + def get_zero_perturb_expression(self, perturb_var, perturb_val): + assign_var = get_random_list_sample(self.VARIABLE_NAMES, 1)[0] + is_zerro_err = get_random_int(0, 1) + # is_zerro_err = 1 + if is_zerro_err: + numerator = get_random_float(*self.NUM_RANGE, size=1)[0] + return ( + assign_var + + "=" + + str(int(numerator)) + + "/(" + + str(perturb_val) + + "-" + + perturb_var + + ")", + is_zerro_err, + ) + else: + perturb_val_offset, numerator = get_random_float(*self.NUM_RANGE, size=2) + perturb_val = perturb_val + int(perturb_val_offset) + return ( + assign_var + + "=" + + str(int(numerator)) + + "/(" + + str(perturb_val) + + "-" + + perturb_var + + ")", + is_zerro_err, + ) + + def get_assert_perturb_expression(self, perturb_var, perturb_val): + is_assert_err = get_random_int(0, 1) + # is_assert_err = 1 + if is_assert_err: + perturb_val_offset = get_random_float(*self.NUM_RANGE, size=1)[0] + perturb_val = perturb_val + int(perturb_val_offset) + return ( + "assert " + perturb_var + "==" + str(perturb_val), + is_assert_err, + ) + else: + return ( + "assert " + perturb_var + "==" + str(perturb_val), + is_assert_err, + ) + + def get_not_subscriptable_perturb_expression(self, perturb_var, perturb_val): + is_not_subscriptable_err = get_random_int(0, 1) + # is_not_subscriptable_err = 1 + if is_not_subscriptable_err: + random_val, numerator = get_random_float(*self.NUM_RANGE, size=2) + return ( + perturb_var + "[" + str(int(numerator)) + "] = " + str(int(random_val)), + is_not_subscriptable_err, + ) + else: + return ( + "", + is_not_subscriptable_err, + ) + + def get_index_range_perturb_expression(self, perturb_var, perturb_val): + """This will occur very less frequently and hence we perhaps + need to rethink how to handle generate the error. + """ + is_index_range_err = get_random_int(0, 1) + # is_index_range_err = 1 + if is_index_range_err: + random_ass = get_random_float(*self.NUM_RANGE, size=1)[0] + return ( + perturb_var + + "[" + + str(len(perturb_val)) + + "] = " + + str(int(random_ass)), + is_index_range_err, + ) + else: + valid_idx = int(get_random_float(*[0, len(perturb_val) - 1], size=1)[0]) + random_ass = get_random_float(*self.NUM_RANGE, size=1)[0] + return ( + perturb_var + "[" + str(valid_idx) + "] = " + str(random_ass), + is_index_range_err, + ) + + def get_undef_name_perturb_expression(self, perturb_var, perturb_val): + """Not implemented as per our requirements.""" + is_undef_name_err = get_random_int(0, 1) + # is_undef_name_err = 1 + if is_undef_name_err: + undef_var = get_random_list_sample(self.VARIABLE_NAMES, 1)[0] + return ( + perturb_var + "=" + undef_var + "+" + str(perturb_val), + is_undef_name_err, + ) + else: + return ( + "", + is_undef_name_err, + ) + + def get_math_domain_perturb_expression(self, perturb_var, perturb_val): + """The current implementation may cause unforeseen issues when the + is_math_domain_err is 0 as the assign_var can be a part of the program. Also, we may + perhaps need to refine how we import math module.""" + is_math_domain_err = get_random_int(0, 1) + # is_math_domain_err = 1 + if is_math_domain_err: + assign_var = get_random_list_sample(self.VARIABLE_NAMES, 1)[0] + if perturb_val >= 0: + random_ass = ( + str(-1 * int(get_random_float(*self.NUM_RANGE, size=1)[0])) + + "*" + + perturb_var + ) + else: + random_ass = ( + str(int(get_random_float(*self.NUM_RANGE, size=1)[0])) + + "*" + + perturb_var + ) + return ( + "import math;" + + assign_var + + "=" + + "math.sqrt(" + + str(random_ass) + + ")", + is_math_domain_err, + ) + else: + assign_var = get_random_list_sample(self.VARIABLE_NAMES, 1)[0] + if perturb_val >= 0: + random_ass = ( + str(int(get_random_float(*self.NUM_RANGE, size=1)[0])) + + "*" + + perturb_var + ) + else: + random_ass = ( + str(-1 * int(get_random_float(*self.NUM_RANGE, size=1)[0])) + + "*" + + perturb_var + ) + return ( + "import math;" + + assign_var + + "=" + + "math.sqrt(" + + str(random_ass) + + ")", + is_math_domain_err, + ) + + def _relevant_operand_val_type(self, val, is_same): + pass + + def get_operand_type_mismatch_perturb_expression(self, perturb_var, perturb_val): + pass + + def get_int_not_iterable_perturb_expression(self, perturb_var, perturb_val): + """TODO: 1. Add more variants of the for loop. + 2. Add logic to include the while loop. + """ + is_int_not_iterable_err = get_random_int(0, 1) + # is_int_not_iterable_err = 1 + if is_int_not_iterable_err: + assign_var = get_random_list_sample(self.VARIABLE_NAMES, 1)[0] + random_ass = int(get_random_float(*self.NUM_RANGE, size=1)[0]) + return ( + "{}=[{}+val for val in {}]".format(assign_var, random_ass, perturb_var), + is_int_not_iterable_err, + ) + else: + return "", is_int_not_iterable_err + + def add_err(self, err_type, perturb_var, perturb_val): + expr_builder = self._builders.get(err_type.lower(), None) + if not expr_builder: + raise ValueError(err_type + " is not a valid error generation function.") + return expr_builder(perturb_var, perturb_val) diff --git a/error_generation/get_trace.py b/error_generation/get_trace.py new file mode 100644 index 00000000..36c71d69 --- /dev/null +++ b/error_generation/get_trace.py @@ -0,0 +1,77 @@ +# Copyright (C) 2021 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import sys +from collections import defaultdict +import subprocess + + +def postprocess_and_save(json_fp, offset, processed_suffix): + """Here we offset the lines of the trace to take into account + additional lines added to get the trace of the function. + """ + data = json.load(open(json_fp, "rb")) + processed_data = {} + for key, val in data.items(): + val = [v for v in val if v] + if val: + processed_data[int(key) - offset] = val + out_path = json_fp.replace(".json", "_" + str(processed_suffix) + ".json") + open(out_path, "w").write(json.dumps(processed_data)) + + +def run_for_errors( + python_filepath, + data_path, + trace_path, + stdin_file, + stdout_file, + stderr_file, + processed_suffix="processed", + offset=3, +): + # Assumes the input is stdin when called. + # import pdb;pdb.set_trace() + trace_source = open(trace_path, "r").read() + python_source = open(python_filepath, "r").read() + python_source = python_source.replace('__name__ == "__main__"', "True") + python_source = python_source.replace("__name__ == '__main__'", "True") + # TODO(rishab): Clean the python_source variable. + python_source = ( + "import json\n" + + "import sys\n" + + "def main__errorchecker__():\n" + + "\n".join(" " + line for line in python_source.split("\n")) + + "\n" + + trace_source + + "\nsys.settrace(trace_calls)\n" + + "main__errorchecker__()\n" + + 'open("' + + data_path + + '","w").write(json.dumps(data, indent=4, sort_keys=True))\n' + ) + try: + subprocess_call = subprocess.check_call( + ["python", "-c", python_source], + stdin=open(stdin_file, "rb"), + stdout=open(stdout_file, "wb"), + stderr=open(stderr_file, "wb"), + ) + except Exception as e: + # raise e + return False + postprocess_and_save(data_path, offset, processed_suffix) + return True diff --git a/error_generation/main.py b/error_generation/main.py new file mode 100644 index 00000000..90adecbf --- /dev/null +++ b/error_generation/main.py @@ -0,0 +1,82 @@ +# Copyright (C) 2021 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from error_generation.misc_utils import get_codeforeces_paths, load_yaml, set_seeds +from error_generation.get_trace import run_for_errors +from error_generation.add_code import add_error +from error_generation.error_expression_factory import ErrorFactory + +""" +TODO(rishab): Setup code to include codechef as well. + +Run instructions: +In the compressive-ipagnn folder run the following command: +python -m error_generation.main +""" + + +def main(config_fp): + config = load_yaml(config_fp) + set_seeds() + code_inp_data_paths = get_codeforeces_paths(config["base_path"]) + error_expr_factory_obj = ErrorFactory() + for ( + code_path, + inp_paths, + perturbed_code_path, + trace_data_path, + sol_err_out_path, + ) in code_inp_data_paths: + for idx, inp_path in enumerate(inp_paths): + # print inp_path + err_path = sol_err_out_path.replace( + ".txt", "_error" + "_" + str(idx) + ".txt" + ) + out_path = sol_err_out_path.replace( + ".txt", "_out" + "_" + str(idx) + ".txt" + ) + out_code_path = perturbed_code_path.replace(".txt", "_" + str(idx) + ".txt") + data_trace_path = trace_data_path.replace( + ".json", "_trace_" + str(idx) + ".json" + ) + is_trace_successful = run_for_errors( + code_path, + data_trace_path, + config["trace_code_path"], + inp_path, + out_path, + err_path, + config["process_suffix"], + ) + if is_trace_successful: + data_trace_path = data_trace_path.replace( + ".json", "_" + config["process_suffix"] + ".json" + ) + for err_suffix in config["errors"]: + # import pdb;pdb.set_trace() + _ = add_error( + code_path, + data_trace_path, + out_code_path, + err_suffix, + error_expr_factory_obj, + ) + # break + # break + + +if __name__ == "__main__": + main("error_generation/config.yaml") diff --git a/error_generation/misc_utils.py b/error_generation/misc_utils.py new file mode 100644 index 00000000..7b899d32 --- /dev/null +++ b/error_generation/misc_utils.py @@ -0,0 +1,150 @@ +# Copyright (C) 2021 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import numpy as np +import os +import json +import yaml +import copy +from collections import defaultdict + +ALLOWED_TYPE = { + "zero_err": [int, float], + "assert_err": [int, float], + "not_subscriptable_err": [int, float], + "idx_out_range_err": [list], + "undef_var_err": [int, float, str, list], + "math_domain_err": [int, float], + "not_iterable_err": [int, float], +} + + +def get_codeforces_inp_data_paths(base_path): + inp_data_directory = os.path.join(base_path, "samples") + inp_data_paths = [ + inp_data_directory + "/" + path + for path in os.listdir(inp_data_directory) + if "input" in path + ] + return inp_data_paths + + +def create_dirs(dir): + if not os.path.exists(dir): + os.makedirs(dir) + + +def get_codeforeces_paths(base_path): + problem_paths = [ + os.path.join(base_path, problem_name_dir) + for problem_name_dir in os.listdir(base_path) + if os.path.isdir(os.path.join(base_path, problem_name_dir)) + ] + # print(len(problem_paths)) + data_paths = [] + for problem_path in problem_paths: + prob_solutions_path = os.path.join(problem_path, "solutions_python") + # print prob_solutions_path + perturbed_prob_solutions_path = os.path.join( + problem_path, "perturbed_solutions_python" + ) + create_dirs(perturbed_prob_solutions_path) + trace_path = os.path.join(problem_path, "trace") + create_dirs(trace_path) + err_out_path = os.path.join(problem_path, "err_out") + create_dirs(err_out_path) + inp_data_paths = get_codeforces_inp_data_paths(problem_path) + # import pdb;pdb.set_trace() + if os.path.exists(prob_solutions_path): + solution_paths = [] + for sol_name in os.listdir(prob_solutions_path): + code_path = os.path.join(prob_solutions_path, sol_name) + sol_name_json = sol_name.replace(".txt", ".json") + perturbed_code_path = os.path.join( + perturbed_prob_solutions_path, sol_name + ) + trace_code_path = os.path.join(trace_path, sol_name_json) + sol_err_out_path = os.path.join(err_out_path, sol_name) + solution_paths.append( + ( + code_path, + inp_data_paths, + perturbed_code_path, + trace_code_path, + sol_err_out_path, + ) + ) + data_paths.append(solution_paths) + + data_paths = [sol_path for path in data_paths for sol_path in path] + return data_paths + + +def is_valid_type(val, type): + return isinstance(val, type) + + +def get_valid_code_trace(code_trace, err_suffx): + code_trace_filtered = defaultdict(list) + for line in code_trace: + for step_idx in range(len(code_trace[line])): + new_step_dict = {} + for var in code_trace[line][step_idx]: + if any( + is_valid_type(code_trace[line][step_idx][var], typ) + for typ in ALLOWED_TYPE[err_suffx] + ): + new_step_dict[var] = code_trace[line][step_idx][var] + if new_step_dict: + code_trace_filtered[line].append(new_step_dict) + return code_trace_filtered + + +def set_seeds(seed=10): + random.seed(seed) + np.random.seed(seed) + + +def load_data(fp): + with open(fp, "r") as file: + data = file.read().strip() + return data + + +def load_json(fp): + with open(fp, "r") as file: + return json.load(file) + + +def load_yaml(fp): + with open(fp, "r") as file: + return yaml.load(file, Loader=yaml.FullLoader) + + +def write_csv(data, fp): + with open(fp, "w") as file: + file.write(data) + + +def get_random_int(lower_limit, upper_limit): + return random.randint(lower_limit, upper_limit) + + +def get_random_float(lower_limit, upper_limit, size=None): + return np.random.uniform(lower_limit, upper_limit, size=size) + + +def get_random_list_sample(lst, num_samples): + return random.sample(lst, num_samples) diff --git a/error_generation/trace_code.py b/error_generation/trace_code.py new file mode 100644 index 00000000..11c31e79 --- /dev/null +++ b/error_generation/trace_code.py @@ -0,0 +1,33 @@ +# Get line by line trace for variables in a program. +# It is adapted from the examples in the following link: +# https://pymotw.com/2/sys/tracing.html + +from collections import defaultdict + +data = defaultdict(list) + + +def trace_lines(frame, event, arg): + if event != "line": + return + co = frame.f_code + # func_name = co.co_name + line_no = frame.f_lineno + filename = co.co_filename + if filename == "": + local_data_dict = {} + for key, value in frame.f_locals.items(): + try: + json.dumps(value) + local_data_dict[key] = value + except Exception as e: + continue + data[line_no].append(local_data_dict) + + +def trace_calls(frame, event, arg): + if event != "call": + return + # co = frame.f_code + # func_name = co.co_name + return trace_lines diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_err_expr_factory.py b/tests/test_err_expr_factory.py new file mode 100644 index 00000000..4205e616 --- /dev/null +++ b/tests/test_err_expr_factory.py @@ -0,0 +1,189 @@ +import pytest +import subprocess +from error_generation.error_expression_factory import ErrorFactory + + +class TestErrorFactory: + """The test suite is not complete but will test some obvious + issues in code.""" + + def setup(self): + self.error_factory = ErrorFactory() + self.test_var_name = "test_var" + self.test_var_val = 23 + self.test_lst_var_name = "test_lst_var" + self.test_lst_var_val = [1, 2, 3, 4] + self.test_var_assign = self.test_var_name + "=" + str(self.test_var_val) + "\n" + self.test_lst_var_assign = ( + self.test_lst_var_name + "=" + str(self.test_lst_var_val) + "\n" + ) + + def test_get_zero_perturb_expression(self): + + expr, is_err_present = self.error_factory.add_err( + "zero_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + assert subprocess_call == 0 + expr, is_err_present = self.error_factory.add_err( + "zero_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if "ZeroDivisionError" in exception.output: + raise exception + + def test_get_assert_perturb_expression(self): + expr, is_err_present = self.error_factory.add_err( + "assert_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + assert subprocess_call == 0 + expr, is_err_present = self.error_factory.add_err( + "assert_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if "AssertionError" in exception.output: + raise exception + + def test_get_not_subscriptable_perturb_expression(self): + expr, is_err_present = self.error_factory.add_err( + "not_subscriptable_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + assert subprocess_call == 0 + expr, is_err_present = self.error_factory.add_err( + "not_subscriptable_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if ( + "TypeError" in exception.output + and "not support item assignment" in exception.output + ): + raise exception + + def test_get_index_range_perturb_expression(self): + expr, is_err_present = self.error_factory.add_err( + "idx_out_range_err", self.test_lst_var_name, self.test_lst_var_val + ) + expr = self.test_lst_var_assign + expr + + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + if subprocess_call != 0: + import pdb + + pdb.set_trace() + assert subprocess_call == 0 + + expr, is_err_present = self.error_factory.add_err( + "idx_out_range_err", self.test_lst_var_name, self.test_lst_var_val + ) + expr = self.test_lst_var_assign + expr + + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if ( + "IndexError" in exception.output + and "index out of range" in exception.output + ): + raise exception + + def test_get_math_domain_perturb_expression(self): + expr, is_err_present = self.error_factory.add_err( + "math_domain_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + assert subprocess_call == 0 + expr, is_err_present = self.error_factory.add_err( + "math_domain_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if ( + "ValueError" in exception.output + and "math domain error" in exception.output + ): + raise exception + + def test_get_int_not_iterable_perturb_expression(self): + expr, is_err_present = self.error_factory.add_err( + "not_iterable_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + while not is_err_present: + subprocess_call = subprocess.call( + ["python", "-c", expr], stderr=subprocess.PIPE + ) + assert subprocess_call == 0 + expr, is_err_present = self.error_factory.add_err( + "not_iterable_err", self.test_var_name, self.test_var_val + ) + expr = self.test_var_assign + expr + + with pytest.raises(subprocess.CalledProcessError) as exc: + try: + subprocess_call = subprocess.check_output( + ["python", "-c", expr], stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as exception: + if ( + "TypeError" in exception.output + and "object is not iterable" in exception.output + ): + raise exception + + def test_get_operand_type_mismatch_perturb_expression(self): + pass + + def test_get_undef_name_perturb_expression(self): + pass