2 changes: 1 addition & 1 deletion airflow/api/__init__.py
@@ -40,6 +40,6 @@ def load_auth():
log.info("Loaded API auth backend: %s", backend)
backends.append(auth)
except ImportError as err:
log.critical("Cannot import %s for API authentication due to: %s", backend, err)
log.critical("Cannot import %s for API authentication.", backend)
raise AirflowException(err)
return backends
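
Reviewer note: the hunks in this PR share a pattern — the log call keeps only the contextual message, while the exception detail travels with the re-raised error instead of being interpolated into the log string. A minimal, self-contained sketch of that pattern (the function and exception names here are illustrative, not Airflow's):

```python
import importlib
import logging

log = logging.getLogger(__name__)


def load_backend(path: str):
    try:
        return importlib.import_module(path)
    except ImportError as err:
        # Log only the context; the detail is preserved by the re-raised error,
        # so it is not duplicated inside the log message.
        log.critical("Cannot import %s for API authentication.", path)
        raise RuntimeError(err) from err
```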
4 changes: 2 additions & 2 deletions airflow/api/auth/backend/kerberos_auth.py
@@ -86,8 +86,8 @@ def init_app(app):
try:
log.info("Kerberos init: %s %s", service, hostname)
principal = kerberos.getServerPrincipalDetails(service, hostname)
except kerberos.KrbError as err:
log.warning("Kerberos: %s", err)
except kerberos.KrbError:
log.warning("A Kerberos error has occurred.", exc_info=True)
else:
log.info("Kerberos API: server is %s", principal)

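For context on the `exc_info=True` form introduced above: when passed from inside an `except` block, `exc_info=True` makes the logging module attach the active exception and its traceback to the record, so dropping the `%s`-interpolated error loses no information and gains the stack trace. A small sketch using only the standard library (no Kerberos involved):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("kerberos-demo")

try:
    raise ValueError("bad principal")  # stand-in for kerberos.KrbError
except ValueError:
    # The active exception and its traceback are appended to the record,
    # even though the message itself no longer mentions the error.
    log.warning("A Kerberos error has occurred.", exc_info=True)
```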
4 changes: 2 additions & 2 deletions airflow/cli/commands/info_command.py
@@ -356,8 +356,8 @@ def _upload_text_to_fileio(content):
try:
return resp.json()["link"]
except ValueError as e:
log.debug(e)
raise FileIoException("Failed to send report to file.io service.")
log.debug("Failed to send report to file.io service.")
raise FileIoException(e)


def _send_report_to_fileio(info):
5 changes: 2 additions & 3 deletions airflow/cli/commands/webserver_command.py
@@ -213,9 +213,8 @@ def start(self) -> NoReturn:
# Throttle loop
sleep(1)

except (AirflowWebServerTimeout, OSError) as err:
self.log.error(err)
self.log.error("Shutting down webserver")
except (AirflowWebServerTimeout, OSError):
self.log.exception("Shutting down webserver")
try:
self.gunicorn_master_proc.terminate()
self.gunicorn_master_proc.wait()
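`self.log.exception(...)` as used above is shorthand for `error(..., exc_info=True)`: it logs at ERROR level and captures the exception currently being handled, which is why the separate `self.log.error(err)` line can be dropped. A minimal sketch with a plain standard-library logger (the error message is made up):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("webserver-demo")

try:
    raise OSError("gunicorn master process exited")
except OSError:
    # Equivalent to log.error("Shutting down webserver", exc_info=True);
    # valid only while an exception is being handled.
    log.exception("Shutting down webserver")
```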
11 changes: 7 additions & 4 deletions airflow/configuration.py
@@ -674,11 +674,14 @@ def getimport(self, section: str, key: str, **kwargs) -> Any:
try:
return import_string(full_qualified_path)
except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
f'Current value: "{full_qualified_path}".'
log.error(
'The object could not be loaded. Please check %r key in %r section. '
'Current value: %r.',
key,
section,
full_qualified_path,
)
raise AirflowConfigException(e)

def getjson(
self, section: str, key: str, fallback=_UNSET, **kwargs
4 changes: 2 additions & 2 deletions airflow/dag_processing/manager.py
@@ -674,8 +674,8 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
try:
self._add_callback_to_queue(callback.get_callback_request())
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s, %s", callback, e)
except Exception:
self.log.warning("Error adding %r callback for execution.", callback, exc_info=True)
guard.commit()

def _add_callback_to_queue(self, request: CallbackRequest):
13 changes: 7 additions & 6 deletions airflow/executors/celery_executor.py
@@ -127,8 +127,8 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: Optional[str]

args.func(args)
ret = 0
except Exception as e:
log.exception("[%s] Failed to execute task %s.", celery_task_id, str(e))
except Exception:
log.exception("[%s] Failed to execute task.", celery_task_id)
ret = 1
finally:
Sentry.flush()
@@ -291,7 +291,7 @@ def _process_tasks(self, task_tuples: List[TaskTuple]) -> None:
self.queued_tasks.pop(key)
self.task_publish_retries.pop(key, None)
if isinstance(result, ExceptionWithTraceback):
self.log.error(CELERY_SEND_ERR_MSG_HEADER + ": %s\n%s\n", result.exception, result.traceback)
self.log.error("%s: %s\n%s\n", CELERY_SEND_ERR_MSG_HEADER, result.exception, result.traceback)
self.event_buffer[key] = (State.FAILED, None)
elif result is not None:
result.backend = cached_celery_backend
@@ -416,8 +416,8 @@ def _send_stalled_tis_back_to_scheduler(
if celery_async_result:
try:
app.control.revoke(celery_async_result.task_id)
except Exception as ex:
self.log.error("Error revoking task instance %s from celery: %s", key, ex)
except Exception:
self.log.exception("Error revoking task instance %s from celery.", key)

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler"""
@@ -650,7 +650,8 @@ def _get_many_using_multiprocessing(self, async_results) -> Mapping[str, EventBu
for task_id, state_or_exception, info in task_id_to_states_and_info:
if isinstance(state_or_exception, ExceptionWithTraceback):
self.log.error(
CELERY_FETCH_ERR_MSG_HEADER + ":%s\n%s\n",
"%s:%s\n%s\n",
CELERY_FETCH_ERR_MSG_HEADER,
state_or_exception.exception,
state_or_exception.traceback,
)
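The Celery hunks above also stop building the format string by concatenating `CELERY_*_ERR_MSG_HEADER` and instead pass the header as another lazy `%s` argument. With printf-style logging arguments, interpolation is deferred until a handler actually formats the record, and linters such as flake8-logging-format flag concatenation (and f-strings) inside log calls. A hedged sketch of the difference, with made-up constants:

```python
import logging

log = logging.getLogger("celery-demo")
ERR_HEADER = "Error sending Celery task"  # illustrative stand-in for the real constant
exc, tb = RuntimeError("boom"), "Traceback (most recent call last): ..."

# Discouraged: the format string is assembled eagerly via concatenation.
log.error(ERR_HEADER + ": %s\n%s\n", exc, tb)

# Preferred: everything is a lazy argument; rendering happens only when the
# record is actually emitted.
log.error("%s: %s\n%s\n", ERR_HEADER, exc, tb)
```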
4 changes: 2 additions & 2 deletions airflow/executors/debug_executor.py
@@ -78,10 +78,10 @@ def _run_task(self, ti: TaskInstance) -> bool:
ti.run(job_id=ti.job_id, **params)
self.change_state(key, State.SUCCESS)
return True
except Exception as e:
except Exception:
ti.set_state(State.FAILED)
self.change_state(key, State.FAILED)
self.log.exception("Failed to execute task: %s.", str(e))
self.log.exception("Failed to execute task.")
return False

def queue_task_instance(
9 changes: 5 additions & 4 deletions airflow/executors/executor_loader.py
@@ -97,11 +97,12 @@ def load_executor(cls, executor_name: str) -> "BaseExecutor":
executor_cls, import_source = cls.import_executor_cls(executor_name)
log.debug("Loading executor %s from %s", executor_name, import_source.value)
except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
f'Current value: "{executor_name}".'
log.error(
'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
'Current value: %r.',
executor_name
)
raise AirflowConfigException(e)
log.info("Loaded executor: %s", executor_name)

return executor_cls()
21 changes: 9 additions & 12 deletions airflow/executors/kubernetes_executor.py
@@ -597,10 +597,9 @@ def sync(self) -> None:
self.log.info('Changing state of %s to %s', results, state)
try:
self._change_state(key, state, pod_id, namespace)
except Exception as e:
except Exception:
self.log.exception(
"Exception: %s when attempting to change state of %s to %s, re-queueing.",
e,
"Exception when attempting to change state of %s to %s, re-queueing.",
results,
state,
)
@@ -619,10 +618,9 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except PodReconciliationError as e:
self.log.error(
self.log.exception(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
"Try clearing the task to re-run.",
exc_info=True,
)
self.fail(task[0], e)
except ApiException as e:
@@ -741,8 +739,8 @@ def adopt_launched_task(
)
pod_ids.pop(pod_id)
self.running.add(pod_id)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
except ApiException:
self.log.info("Failed to adopt pod %s.", pod.metadata.name, exc_info=True)

def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
@@ -767,8 +765,8 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
namespace=pod.metadata.namespace,
body=PodGenerator.serialize_pod(pod),
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
except ApiException:
self.log.info("Failed to adopt pod %s.", pod.metadata.name, exc_info=True)

def _flush_task_queue(self) -> None:
if not self.task_queue:
@@ -798,10 +796,9 @@ def _flush_result_queue(self) -> None:
)
try:
self._change_state(key, state, pod_id, namespace)
except Exception as e:
except Exception:
self.log.exception(
'Ignoring exception: %s when attempting to change state of %s to %s.',
e,
'Ignoring the following exception when attempting to change state of %s to %s.',
results,
state,
)
8 changes: 4 additions & 4 deletions airflow/executors/local_executor.py
@@ -91,8 +91,8 @@ def _execute_work_in_subprocess(self, command: CommandType) -> str:
try:
subprocess.check_call(command, close_fds=True)
return State.SUCCESS
except subprocess.CalledProcessError as e:
self.log.error("Failed to execute task %s.", str(e))
except subprocess.CalledProcessError:
self.log.exception("Failed to execute task.")
return State.FAILED

def _execute_work_in_fork(self, command: CommandType) -> str:
@@ -124,8 +124,8 @@ def _execute_work_in_fork(self, command: CommandType) -> str:
args.func(args)
ret = 0
return State.SUCCESS
except Exception as e:
self.log.exception("Failed to execute task %s.", e)
except Exception:
self.log.exception("Failed to execute task.")
return State.FAILED
finally:
Sentry.flush()
4 changes: 2 additions & 2 deletions airflow/executors/sequential_executor.py
@@ -61,9 +61,9 @@ def sync(self) -> None:
try:
subprocess.check_call(command, close_fds=True)
self.change_state(key, State.SUCCESS)
except subprocess.CalledProcessError as e:
except subprocess.CalledProcessError:
self.change_state(key, State.FAILED)
self.log.error("Failed to execute task %s.", str(e))
self.log.exception("Failed to execute task.")

self.commands_to_run = []

4 changes: 2 additions & 2 deletions airflow/jobs/backfill_job.py
@@ -589,8 +589,8 @@ def _per_task_process(key, ti: TaskInstance, session=None):

_per_task_process(key, ti, session)
session.commit()
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
self.log.debug(e)
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached):
self.log.debug("Unable to schedule Task Instance.", exc_info=True)

self.heartbeat(only_if_necessary=is_unit_test)
# execute the tasks in the queue
4 changes: 2 additions & 2 deletions airflow/jobs/base_job.py
@@ -152,8 +152,8 @@ def kill(self, session=None):
job.end_date = timezone.utcnow()
try:
self.on_kill()
except Exception as e:
self.log.error('on_kill() method failed: %s', str(e))
except Exception:
self.log.exception('on_kill() method failed.')
session.merge(job)
session.commit()
raise AirflowException("Job shut down externally.")
4 changes: 2 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -797,8 +797,8 @@ def _update_dag_run_state_for_paused_dags(self):
if callback_to_run:
self._send_dag_callbacks_to_processor(dag, callback_to_run)
self._paused_dag_without_running_dagruns.add(dag_id)
except Exception as e: # should not fail the scheduler
self.log.exception('Failed to update dag run state for paused dags due to %s', str(e))
except Exception: # should not fail the scheduler
self.log.exception('Failed to update dag run state for paused dags.')

def _run_scheduler_loop(self) -> None:
"""
2 changes: 1 addition & 1 deletion airflow/jobs/triggerer_job.py
@@ -302,7 +302,7 @@ async def cleanup_finished_triggers(self):
continue
except BaseException as e:
# This is potentially bad, so log it.
self.log.exception("Trigger %s exited with error %s", details["name"], e)
self.log.exception("Trigger %s exited with error.", details["name"])
saved_exc = e
else:
# See if they foolishly returned a TriggerEvent
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
@@ -350,7 +350,7 @@ def __dir__(self):
return dir(self.warnings)

def warn(self, message, category=None, stacklevel=1, source=None):
self.warnings.warn(message, category, stacklevel + 2, source)
self.warnings.warn(message, category, stacklevel + 2, source) # noqa: G010

if func.__globals__.get('warnings') is sys.modules['warnings']:
# Yes, this is slightly hacky, but it _automatically_ sets the right
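The `# noqa: G010` added above hints that these changes are driven by the flake8-logging-format plugin; G010 normally flags `logger.warn(...)`, a deprecated alias of `warning()`, and here it appears to be a false positive because `self.warnings.warn` proxies the stdlib `warnings` module rather than a logger. A tiny sketch of what the rule itself targets (assuming G010 refers to that plugin's warn-vs-warning check):

```python
import logging

log = logging.getLogger("g010-demo")

log.warn("deprecated spelling")    # flagged: Logger.warn is a deprecated alias
log.warning("preferred spelling")  # what the linter expects
```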
4 changes: 2 additions & 2 deletions airflow/models/dag.py
@@ -2137,8 +2137,8 @@ def pickle_info(self):
pickled = pickle.dumps(self)
d['pickle_len'] = len(pickled)
d['pickling_duration'] = str(timezone.utcnow() - dttm)
except Exception as e:
self.log.debug(e)
except Exception:
self.log.debug("Error while getting pickle info.", exc_info=True)
d['is_picklable'] = False
d['stacktrace'] = traceback.format_exc()
return d
8 changes: 4 additions & 4 deletions airflow/models/dagbag.py
@@ -279,8 +279,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
and file_last_changed_on_disk == self.file_last_changed[filepath]
):
return []
except Exception as e:
self.log.exception(e)
except Exception:
self.log.exception("Exception occurred when processing filepath %r.", filepath)
return []

if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
@@ -532,8 +532,8 @@ def collect_dags(
dags=str([dag.dag_id for dag in found_dags]),
)
)
except Exception as e:
self.log.exception(e)
except Exception:
self.log.exception("Exception occurred when collecting DAGs from %s", dag_folder)

self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)

13 changes: 7 additions & 6 deletions airflow/models/taskinstance.py
@@ -1382,7 +1382,8 @@ def _log_state(self, lead_msg: str = ''):
params.append(self.map_index)
message += 'map_index=%d, '
self.log.info(
message + 'execution_date=%s, start_date=%s, end_date=%s',
'%sexecution_date=%s, start_date=%s, end_date=%s',
message,
*params,
self._date_or_empty('execution_date'),
self._date_or_empty('start_date'),
@@ -1457,14 +1458,14 @@ def _run_raw_task(
session.merge(self)
session.commit()
return
except AirflowSmartSensorException as e:
self.log.info(e)
except AirflowSmartSensorException:
self.log.info("Task successfully registered in smart sensor.", exc_info=True)
return
except AirflowSkipException as e:
# Recording SKIP
# log only if exception has any arguments to prevent log flooding
if e.args:
self.log.info(e)
self.log.info("Skipping task.", exc_info=True)
if not test_mode:
self.refresh_from_db(lock_for_update=True, session=session)
self.state = State.SKIPPED
@@ -1638,7 +1639,7 @@ def _run_finished_callback(self, callback, context, callback_type):
if callback:
callback(context)
except Exception: # pylint: disable=broad-except
self.log.exception(f"Error when executing {callback_type} callback")
self.log.exception("Error when executing %s callback", callback_type)

def _execute_task(self, context, task_orig):
"""Executes Task (optionally with a Timeout) and pushes Xcom results"""
@@ -1865,7 +1866,7 @@ def handle_failure(self, error, test_mode=None, context=None, force_fail=False,
if error:
if isinstance(error, BaseException):
tb = self.get_truncated_error_traceback(error, truncate_to=self._execute_task)
self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
self.log.error("Task failed with exception", exc_info=(type(error), error, tb)) # noqa: G201
else:
self.log.error("%s", error)
if not test_mode:
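In `handle_failure` above, the reported exception may not be the one currently being handled, so the code keeps `log.error(..., exc_info=(type(error), error, tb))` with an explicit tuple rather than switching to `log.exception`, and `# noqa: G201` silences the rule that would otherwise demand `.exception()`. A short sketch of the explicit-tuple form, using only the standard library:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("handle-failure-demo")


def report(error: BaseException) -> None:
    # Works even outside an `except` block: the traceback is taken from the
    # exception object itself instead of sys.exc_info().
    log.error("Task failed with exception", exc_info=(type(error), error, error.__traceback__))


report(ValueError("upstream task produced bad data"))
```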
7 changes: 5 additions & 2 deletions airflow/models/variable.py
@@ -239,10 +239,13 @@ def check_for_write_conflict(key: str) -> None:
var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
log.warning(
"The variable {key} is defined in the {cls} secrets backend, which takes "
"The variable %s is defined in the %s secrets backend, which takes "
"precedence over reading from the database. The value in the database will be "
"updated, but to read it you have to delete the conflicting variable "
"from {cls}".format(key=key, cls=secrets_backend.__class__.__name__)
"from %s",
key,
secrets_backend.__class__.__name__,
secrets_backend.__class__.__name__,
)
return
except Exception:
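The `variable.py` hunk above swaps `str.format`-style interpolation for lazy `%s` arguments; note that `secrets_backend.__class__.__name__` is now passed twice, because printf-style placeholders cannot be reused by name the way `{cls}` could. A short sketch of the same trade-off with made-up values:

```python
import logging

log = logging.getLogger("variable-demo")
key, cls = "my_var", "EnvironmentVariablesBackend"  # illustrative values only

# Discouraged: rendered eagerly, and .format() inside a log call is linted against.
log.warning("Variable {key} is defined in {cls}; delete it from {cls}".format(key=key, cls=cls))

# Preferred: lazy % arguments; a repeated placeholder needs a repeated argument.
log.warning("Variable %s is defined in %s; delete it from %s", key, cls, cls)
```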