6 changes: 4 additions & 2 deletions airflow/providers/airbyte/hooks/airbyte.py
@@ -68,8 +68,10 @@ def wait_for_job(
try:
job = self.get_job(job_id=(int(job_id)))
state = job.json()["job"]["status"]
except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
except AirflowException:
self.log.info(
"Retrying. Airbyte API returned server error when waiting for job.", exc_info=True
)
continue

if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
36 changes: 18 additions & 18 deletions airflow/providers/alibaba/cloud/hooks/oss.py
@@ -200,8 +200,8 @@ def download_file(
"""
try:
self.get_bucket(bucket_name).get_object_to_file(key, local_file)
except Exception as e:
self.log.error(e)
except Exception:
self.log.exception("Unable to download file.")
return None
return local_file

@@ -221,8 +221,8 @@ def delete_object(
try:
self.get_bucket(bucket_name).delete_object(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when deleting: {key}")
self.log.error("Errors when deleting %s", key)
raise AirflowException(e)
Member

Since this is a provider, I wonder if we should just switch to re-raise the original exception. Coercing the error to AirflowException offers no benefits at all

Member

Agree :). We used to use that pattern in the past but unless you use dedicated AirflowSkip or AirflowFailException it's better to raise the original exception

Contributor Author

Ah yes, this totally makes sense. Agreed. I'll do a clean sweep of all the provider files I'm touching for this as well.
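
For illustration, a minimal sketch of the pattern this thread converges on, using a placeholder bucket object rather than the real OSSHook plumbing: log the failing key for context, then re-raise the original exception unchanged instead of wrapping it in AirflowException.

```python
import logging

logger = logging.getLogger(__name__)


def delete_object(bucket, key: str) -> None:
    """Delete one object; let the original exception type reach the caller."""
    try:
        bucket.delete_object(key)
    except Exception:
        # Log the failing key for context, then re-raise the original
        # exception unchanged rather than coercing it to AirflowException;
        # whatever finally handles the raise will log the traceback.
        logger.error("Errors when deleting %s", key)
        raise
```

Callers that need Airflow-specific flow control can still catch the original error and raise AirflowSkipException or AirflowFailException themselves, as the second comment notes.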


@provide_bucket_name
@unify_bucket_name_and_key
@@ -240,8 +240,8 @@ def delete_objects(
try:
self.get_bucket(bucket_name).batch_delete_objects(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when deleting: {key}")
self.log.error("Errors when deleting %s", key)
raise AirflowException(e)

@provide_bucket_name
def delete_bucket(
@@ -256,8 +256,8 @@ def delete_bucket(
try:
self.get_bucket(bucket_name).delete_bucket()
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when deleting: {bucket_name}")
self.log.error("Errors when deleting %s", bucket_name)
raise AirflowException(e)

@provide_bucket_name
def create_bucket(
@@ -272,8 +272,8 @@ def create_bucket(
try:
self.get_bucket(bucket_name).create_bucket()
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when create bucket: {bucket_name}")
self.log.error("Errors when creating bucket %s", bucket_name)
raise AirflowException(e)

@provide_bucket_name
@unify_bucket_name_and_key
@@ -290,8 +290,8 @@ def append_string(self, bucket_name: Optional[str], content: str, key: str, pos:
try:
self.get_bucket(bucket_name).append_object(key, pos, content)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when append string for object: {key}")
self.log.error("Errors when appending string for object %s", key)
raise AirflowException(e)

@provide_bucket_name
@unify_bucket_name_and_key
@@ -306,8 +306,8 @@ def read_key(self, bucket_name: Optional[str], key: str) -> str:
try:
return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8")
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when read bucket object: {key}")
self.log.error("Errors when reading bucket object %s", key)
raise AirflowException(e)

@provide_bucket_name
@unify_bucket_name_and_key
@@ -322,8 +322,8 @@ def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObje
try:
return self.get_bucket(bucket_name).head_object(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when head bucket object: {key}")
self.log.error("Errors when head bucket object %s", key)
raise AirflowException(e)

@provide_bucket_name
@unify_bucket_name_and_key
@@ -339,8 +339,8 @@ def key_exist(self, bucket_name: Optional[str], key: str) -> bool:
try:
return self.get_bucket(bucket_name).object_exists(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when check bucket object existence: {key}")
self.log.error("Errors when checking bucket object existence, %s", key)
raise AirflowException(e)

def get_credential(self) -> oss2.auth.Auth:
extra_config = self.oss_conn.extra_dejson
5 changes: 2 additions & 3 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -48,9 +48,8 @@ def hook(self):
self.log.info("remote_conn_id: %s", remote_conn_id)
try:
return OSSHook(oss_conn_id=remote_conn_id)
except Exception as e:
self.log.error(e, exc_info=True)
self.log.error(
except Exception:
self.log.exception(
'Could not create an OSSHook with connection id "%s". '
'Please make sure that airflow[oss] is installed and '
'the OSS connection exists.',
8 changes: 4 additions & 4 deletions airflow/providers/amazon/aws/hooks/athena.py
@@ -104,8 +104,8 @@ def check_query_status(self, query_execution_id: str) -> Optional[str]:
state = None
try:
state = response['QueryExecution']['Status']['State']
except Exception as ex:
self.log.error('Exception while getting query state %s', ex)
except Exception:
self.log.exception('Exception while getting query state.')
finally:
# The error is being absorbed here and is being handled by the caller.
# The error is being absorbed to implement retries.
@@ -122,8 +122,8 @@ def get_state_change_reason(self, query_execution_id: str) -> Optional[str]:
reason = None
try:
reason = response['QueryExecution']['Status']['StateChangeReason']
except Exception as ex:
self.log.error('Exception while getting query state change reason: %s', ex)
except Exception:
self.log.exception('Exception while getting query state change reason.')
finally:
# The error is being absorbed here and is being handled by the caller.
# The error is being absorbed to implement retries.
8 changes: 4 additions & 4 deletions airflow/providers/amazon/aws/hooks/emr.py
@@ -195,8 +195,8 @@ def get_job_failure_reason(self, job_id: str) -> Optional[str]:
reason = f"{failure_reason} - {state_details}"
except KeyError:
self.log.error('Could not get status of the EMR on EKS job')
except ClientError as ex:
self.log.error('AWS request failed, check logs for more info: %s', ex)
except ClientError:
self.log.exception('AWS request failed, check logs for more info.')

return reason

@@ -216,9 +216,9 @@ def check_query_status(self, job_id: str) -> Optional[str]:
except self.conn.exceptions.ResourceNotFoundException:
# If the job is not found, we raise an exception as something fatal has happened.
raise AirflowException(f'Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}')
except ClientError as ex:
except ClientError:
# If we receive a generic ClientError, we swallow the exception so that the
self.log.error('AWS request failed, check logs for more info: %s', ex)
self.log.exception('AWS request failed, check logs for more info.')
return None

def poll_query_status(
12 changes: 6 additions & 6 deletions airflow/providers/amazon/aws/hooks/glue.py
@@ -107,8 +107,8 @@ def get_iam_execution_role(self) -> Dict:
glue_execution_role = iam_client.get_role(RoleName=self.role_name)
self.log.info("Iam Role Name: %s", self.role_name)
return glue_execution_role
except Exception as general_error:
self.log.error("Failed to create aws glue job, error: %s", general_error)
except Exception:
self.log.error("Failed to create aws glue job.")
@uranusjr (Member), Aug 2, 2022

Do we want to use exception here? (same for many below)

@potiuk (Member), Aug 2, 2022

Yep. Sounds strange. We should catch specific exceptions that we know about and let all the rest bubble up directly; there is no point in extra logging here unless we want to provide "specific" information that helps the user react to some known exceptions. There is no point in logging a meaningless line here - the user already knows the Glue job failed to be created, and repeating it with no actual "help" and no instructions on what to do is borderline harassing the user ("Hello, you already know we failed, so let us repeat it here").

Contributor Author

> Do we want to use exception here? (same for many below)

Generally if there was a raise I stuck to using error so the traceback wasn't logged twice.

Oof yeah @potiuk there are a lot of generic exceptions in these files. I'll clean them up.
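
A rough sketch of that convention, using a placeholder `client` rather than the real Glue hook: use log.error when the exception is re-raised, since the traceback will be logged once by whatever finally handles it, and log.exception when the error is absorbed, since that call is the only record of the traceback.

```python
import logging

logger = logging.getLogger(__name__)


def initialize_job(client, job_name: str, **run_kwargs):
    """Start a job run; the caller handles (and logs) the failure."""
    try:
        return client.start_job_run(JobName=job_name, **run_kwargs)
    except Exception:
        # The exception is re-raised, so the traceback is logged once further
        # up the stack; log.exception here would print it a second time.
        logger.error("Failed to run aws glue job.")
        raise


def check_job_state(client, job_name: str, run_id: str):
    """Best-effort status check; errors are absorbed to allow retries."""
    try:
        return client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["JobRunState"]
    except Exception:
        # Nothing re-raises here, so log.exception is the only place the
        # traceback will be recorded.
        logger.exception("Exception while getting job state.")
        return None
```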

raise

def initialize_job(
@@ -129,8 +129,8 @@ def initialize_job(
job_name = self.get_or_create_glue_job()
return glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)

except Exception as general_error:
self.log.error("Failed to run aws glue job, error: %s", general_error)
except Exception:
self.log.error("Failed to run aws glue job.")
raise

def get_job_state(self, job_name: str, run_id: str) -> str:
@@ -280,8 +280,8 @@ def get_or_create_glue_job(self) -> str:
**self.create_job_kwargs,
)
return create_job_response['Name']
except Exception as general_error:
self.log.error("Failed to create aws glue job, error: %s", general_error)
except Exception:
self.log.error("Failed to create aws glue job.")
raise


8 changes: 4 additions & 4 deletions airflow/providers/amazon/aws/hooks/glue_catalog.py
@@ -149,8 +149,8 @@ def get_partition(self, database_name: str, table_name: str, partition_values: L
DatabaseName=database_name, TableName=table_name, PartitionValues=partition_values
)
return response["Partition"]
except ClientError as e:
self.log.error("Client error: %s", e)
except ClientError:
self.log.error("Client error.")
raise AirflowException("AWS request failed, check logs for more info")

def create_partition(self, database_name: str, table_name: str, partition_input: Dict) -> Dict:
@@ -175,8 +175,8 @@ def create_partition(self, database_name: str, table_name: str, partition_input:
return self.get_conn().create_partition(
DatabaseName=database_name, TableName=table_name, PartitionInput=partition_input
)
except ClientError as e:
self.log.error("Client error: %s", e)
except ClientError:
self.log.error("Client error.")
raise AirflowException("AWS request failed, check logs for more info")


4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/quicksight.py
@@ -87,8 +87,8 @@ def create_ingestion(
check_interval=check_interval,
)
return create_ingestion_response
except Exception as general_error:
self.log.error("Failed to run Amazon QuickSight create_ingestion API, error: %s", general_error)
except Exception:
self.log.error("Failed to run Amazon QuickSight create_ingestion API.")
raise

def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str):
12 changes: 6 additions & 6 deletions airflow/providers/amazon/aws/hooks/s3.py
@@ -934,9 +934,9 @@ def get_bucket_tagging(self, bucket_name: Optional[str] = None) -> Optional[List
result = s3_client.get_bucket_tagging(Bucket=bucket_name)['TagSet']
self.log.info("S3 Bucket Tag Info: %s", result)
return result
except ClientError as e:
self.log.error(e)
raise e
except ClientError:
self.log.error("Unable to retrieve tags.")
raise

@provide_bucket_name
def put_bucket_tagging(
@@ -969,9 +969,9 @@ def put_bucket_tagging(
try:
s3_client = self.get_conn()
s3_client.put_bucket_tagging(Bucket=bucket_name, Tagging={'TagSet': tag_set})
except ClientError as e:
self.log.error(e)
raise e
except ClientError:
self.log.error("Unable to apply tag(s) to bucket.")
raise

@provide_bucket_name
def delete_bucket_tagging(self, bucket_name: Optional[str] = None) -> None:
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -945,6 +945,6 @@ def delete_model(self, model_name: str):
"""
try:
self.get_conn().delete_model(ModelName=model_name)
except Exception as general_error:
self.log.error("Failed to delete model, error: %s", general_error)
except Exception:
self.log.error("Failed to delete model.")
raise
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/sts.py
@@ -35,6 +35,6 @@ def get_account_number(self) -> str:
"""Get the account Number"""
try:
return self.get_conn().get_caller_identity()['Account']
except Exception as general_error:
self.log.error("Failed to get the AWS Account Number, error: %s", general_error)
except Exception:
self.log.error("Failed to get the AWS Account Number.")
raise
7 changes: 3 additions & 4 deletions airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -55,13 +55,12 @@ def hook(self):
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook

return AwsLogsHook(aws_conn_id=remote_conn_id, region_name=self.region_name)
except Exception as e:
self.log.error(
except Exception:
self.log.exception(
'Could not create an AwsLogsHook with connection id "%s". '
'Please make sure that apache-airflow[aws] is installed and '
'the Cloudwatch logs connection exists. Exception: "%s"',
'the Cloudwatch logs connection exists.',
remote_conn_id,
e,
)
return None

5 changes: 2 additions & 3 deletions airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -48,13 +48,12 @@ def hook(self):
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

return S3Hook(remote_conn_id, transfer_config_args={"use_threads": False})
except Exception as e:
except Exception:
self.log.exception(
'Could not create an S3Hook with connection id "%s". '
'Please make sure that apache-airflow[aws] is installed and '
'the S3 connection exists. Exception : "%s"',
'the S3 connection exists.',
remote_conn_id,
e,
)
return None

4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/athena.py
@@ -121,8 +121,8 @@ def on_kill(self) -> None:
http_status_code = None
try:
http_status_code = response['ResponseMetadata']['HTTPStatusCode']
except Exception as ex:
self.log.error('Exception while cancelling query: %s', ex)
except Exception:
self.log.exception('Exception while cancelling query.')
finally:
if http_status_code is None or http_status_code != 200:
self.log.error('Unable to request query cancel on athena. Exiting')
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/datasync.py
@@ -356,8 +356,8 @@ def _execute_datasync_task(self) -> None:
self.log.info("Waiting for TaskExecutionArn %s", self.task_execution_arn)
try:
result = hook.wait_for_task_execution(self.task_execution_arn, max_iterations=self.max_iterations)
except (AirflowTaskTimeout, AirflowException) as e:
self.log.error('Cancelling TaskExecution after Exception: %s', e)
except (AirflowTaskTimeout, AirflowException):
self.log.error('Cancelling TaskExecution after Exception')
self._cancel_datasync_task_execution()
raise
self.log.info("Completed TaskExecutionArn %s", self.task_execution_arn)
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/operators/ecs.py
@@ -136,11 +136,11 @@ def _get_log_events(self, skip: int = 0) -> Generator:
yield from self.hook.get_log_events(self.log_group, self.log_stream_name, skip=skip)
except ClientError as error:
if error.response['Error']['Code'] != 'ResourceNotFoundException':
self.logger.warning('Error on retrieving Cloudwatch log events', error)
self.logger.warning('Error on retrieving Cloudwatch log events', exc_info=True)

yield from ()
except ConnectionClosedError as error:
self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events', error)
except ConnectionClosedError:
self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events', exc_info=True)
yield from ()

def _event_to_str(self, event: dict) -> str:
13 changes: 7 additions & 6 deletions airflow/providers/amazon/aws/operators/eks.py
@@ -40,9 +40,10 @@
DEFAULT_POD_NAME = 'pod'

ABORT_MSG = "{compute} are still active after the allocated time limit. Aborting."
CAN_NOT_DELETE_MSG = "A cluster can not be deleted with attached {compute}. Deleting {count} {compute}."
CAN_NOT_DELETE_MSG = "A cluster can not be deleted with attached %s. Deleting %d %s."
MISSING_ARN_MSG = "Creating an {compute} requires {requirement} to be passed in."
SUCCESS_MSG = "No {compute} remain, deleting cluster."
# SUCCESS_MSG = "No {compute} remain, deleting cluster."
SUCCESS_MSG = "No %s remain, deleting cluster."

SUPPORTED_COMPUTE_VALUES = frozenset({'nodegroup', 'fargate'})
NODEGROUP_FULL_NAME = 'Amazon EKS managed node groups'
@@ -438,7 +439,7 @@ def delete_any_nodegroups(self, eks_hook) -> None:
"""
nodegroups = eks_hook.list_nodegroups(clusterName=self.cluster_name)
if nodegroups:
self.log.info(CAN_NOT_DELETE_MSG.format(compute=NODEGROUP_FULL_NAME, count=len(nodegroups)))
self.log.info(CAN_NOT_DELETE_MSG, NODEGROUP_FULL_NAME, len(nodegroups), NODEGROUP_FULL_NAME)
for group in nodegroups:
eks_hook.delete_nodegroup(clusterName=self.cluster_name, nodegroupName=group)

@@ -457,7 +458,7 @@ def delete_any_nodegroups(self, eks_hook) -> None:
)
else:
raise RuntimeError(ABORT_MSG.format(compute=NODEGROUP_FULL_NAME))
self.log.info(SUCCESS_MSG.format(compute=NODEGROUP_FULL_NAME))
self.log.info(SUCCESS_MSG, NODEGROUP_FULL_NAME)

def delete_any_fargate_profiles(self, eks_hook) -> None:
"""
@@ -468,7 +469,7 @@ def delete_any_fargate_profiles(self, eks_hook) -> None:
"""
fargate_profiles = eks_hook.list_fargate_profiles(clusterName=self.cluster_name)
if fargate_profiles:
self.log.info(CAN_NOT_DELETE_MSG.format(compute=FARGATE_FULL_NAME, count=len(fargate_profiles)))
self.log.info(CAN_NOT_DELETE_MSG, FARGATE_FULL_NAME, len(fargate_profiles), FARGATE_FULL_NAME)
for profile in fargate_profiles:
# The API will return a (cluster) ResourceInUseException if you try
# to delete Fargate profiles in parallel the way we can with nodegroups,
@@ -493,7 +494,7 @@ def delete_any_fargate_profiles(self, eks_hook) -> None:
)
else:
raise RuntimeError(ABORT_MSG.format(compute=FARGATE_FULL_NAME))
self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME))
self.log.info(SUCCESS_MSG, FARGATE_FULL_NAME)


class EksDeleteNodegroupOperator(BaseOperator):