diff --git a/dagger/dag_creator/airflow/operator_creators/soda_creator.py b/dagger/dag_creator/airflow/operator_creators/soda_creator.py index 5f145c8..8b1f68a 100644 --- a/dagger/dag_creator/airflow/operator_creators/soda_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/soda_creator.py @@ -15,6 +15,7 @@ def __init__(self, task, dag): self._table_name = task.table_name self._output_s3_path = task.output_s3_path self._output_table = task.output_table + self._is_critical_test = task.is_critical_test self._vars = task.vars def _generate_command(self): @@ -23,7 +24,8 @@ def _generate_command(self): command.append(f"--output_s3_path={self._output_s3_path}") command.append(f"--output_table={self._output_table}") - + if self._is_critical_test: + command.append(f"--is_critical_test={self._is_critical_test}") if self._table_name: command.append(f"--table_name={self._table_name}") if self._vars: diff --git a/dagger/pipeline/tasks/soda_task.py b/dagger/pipeline/tasks/soda_task.py index 443f98a..e4c0343 100644 --- a/dagger/pipeline/tasks/soda_task.py +++ b/dagger/pipeline/tasks/soda_task.py @@ -44,6 +44,14 @@ def init_attributes(cls, orig_cls): required=False, comment="Athena table that will contain the scan results.", ), + Attribute( + attribute_name="is_critical_test", + parent_fields=["task_parameters"], + validator=str, + required=False, + comment="True if test run is critical test. Defaults to False", + + ), Attribute( attribute_name="vars", parent_fields=["task_parameters"], @@ -64,6 +72,7 @@ def __init__(self, name, pipeline_name, pipeline, job_config): self._output_table = self.parse_attribute("output_table") or conf.SODA_DEFAULT_OUTPUT_TABLE self._output_s3_path = self.parse_attribute("output_s3_path") or conf.SODA_DEFAULT_OUTPUT_S3_PATH self._table_name = self.parse_attribute("table_name") + self._is_critical_test = self.parse_attribute("is_critical_test") self._vars = self.parse_attribute("vars") @@ -80,6 +89,9 @@ def output_s3_path(self): @property def table_name(self): return self._table_name + @property + def is_critical_test(self): + return self._is_critical_test @property def vars(self):