Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def patch_partial_parse_content(partial_parse_filepath: Path, project_path: Path
# it may be due a race condition of multiple processes trying to read/write this file
data = msgpack.unpack(f)
except ValueError as e:
logger.info("Unable to patch the partial_parse.msgpack file due to %s" % repr(e))
logger.info("Unable to patch the partial_parse.msgpack file due to %r", e)
else:
for node in data["nodes"].values():
expected_filepath = node.get("root_path")
Expand Down
3 changes: 2 additions & 1 deletion cosmos/dbt/parser/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> i
num = int(output.split(f"{DBT_WARN_MSG}=")[1].split()[0])
except ValueError:
logger.error(
f"Could not parse number of {DBT_WARN_MSG}s. Check your dbt/airflow version or if --quiet is not being used"
"Could not parse number of %ss. Check your dbt/airflow version or if --quiet is not being used",
DBT_WARN_MSG,
)
return num

Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/aws_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def build_and_run_cmd(
) -> Any:
self.invoke_interceptors(context)
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
self.log.info("Running command: %s", self.command)

result = EcsRunTaskOperator.execute(self, context)

Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/azure_container_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def build_and_run_cmd(
) -> Any:
self.invoke_interceptors(context)
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
self.log.info("Running command: %s", self.command)
result = AzureContainerInstancesOperator.execute(self, context)
self.log.info(result)

Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def build_and_run_cmd(
) -> Any:
self.invoke_interceptors(context)
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
self.log.info("Running command: %s", self.command)
result = DockerOperator.execute(self, context)
self.log.info(result)

Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def build_and_run_cmd(
) -> Any:
self.invoke_interceptors(context)
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
self.log.info("Running command: %s", self.command)
result = CloudRunExecuteJobOperator.execute(self, context)
logger.info(result)

Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def build_and_run_cmd(
) -> Any:
self.invoke_interceptors(context)
self.build_kube_args(context, cmd_flags)
self.log.info(f"Running command: {self.arguments}")
self.log.info("Running command: %s", self.arguments)
result = KubernetesPodOperator.execute(self, context)
self.log.info(result)

Expand Down
14 changes: 7 additions & 7 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def run_subprocess(
self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any
) -> FullOutputSubprocessResult:
if self._py_bin is not None:
self.log.info(f"Using Python binary from virtualenv: {self._py_bin}")
self.log.info("Using Python binary from virtualenv: %s", self._py_bin)
command[0] = str(Path(self._py_bin).parent / "dbt")
return super().run_subprocess(command, env, cwd, **kwargs)

Expand Down Expand Up @@ -128,7 +128,7 @@ def run_command(
)

try:
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
self.log.info("Checking if the virtualenv lock %s exists", self._lock_file)
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
Expand All @@ -154,7 +154,7 @@ def clean_dir_if_temporary(self) -> None:
Delete the virtualenv directory if it is temporary.
"""
if self.is_virtualenv_dir_temporary and self.virtualenv_dir and self.virtualenv_dir.exists():
self.log.info(f"Deleting the Python virtualenv {self.virtualenv_dir}")
self.log.info("Deleting the Python virtualenv %s", self.virtualenv_dir)
shutil.rmtree(str(self.virtualenv_dir), ignore_errors=True)

def execute(self, context: Context, **kwargs: Any) -> None:
Expand All @@ -168,7 +168,7 @@ def on_kill(self) -> None:
self.clean_dir_if_temporary()

def _prepare_virtualenv(self) -> Any:
self.log.info(f"Creating or updating the virtualenv at `{self.virtualenv_dir}")
self.log.info("Creating or updating the virtualenv at `%s`", self.virtualenv_dir)
py_bin = prepare_virtualenv(
venv_directory=str(self.virtualenv_dir),
python_bin=PY_INTERPRETER,
Expand All @@ -193,12 +193,12 @@ def _is_lock_available(self) -> bool:
if self._lock_file.is_file():
with open(self._lock_file) as lf:
pid = int(lf.read())
self.log.info(f"Checking for running process with PID {pid}")
self.log.info("Checking for running process with PID %s", pid)
try:
_process_running = psutil.Process(pid).is_running()
self.log.info(f"Process {pid} running: {_process_running} and has the lock {self._lock_file}.")
self.log.info("Process %s running: %s and has the lock %s.", pid, _process_running, self._lock_file)
except psutil.NoSuchProcess:
self.log.info(f"Process {pid} is not running. Lock {self._lock_file} was outdated.")
self.log.info("Process %s is not running. Lock %s was outdated.", pid, self._lock_file)
is_available = True
else:
is_available = not _process_running
Expand Down
17 changes: 14 additions & 3 deletions cosmos/plugin/airflow3.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ def dbt_docs_index(slug_alias: str = slug) -> Response: # type: ignore[no-redef
)
except (OSError, ValueError, RuntimeError, TimeoutError, PermissionError):
logger.exception(
f"Cosmos dbt docs error: index read failed for slug={slug_alias}, path={op.join(docs_dir_local, index_local)}, conn_id={conn_id_local}"
"Cosmos dbt docs error: index read failed for slug=%s, path=%s, conn_id=%s",
slug_alias,
op.join(docs_dir_local, index_local),
conn_id_local,
)
return HTMLResponse(
content=(
Expand Down Expand Up @@ -229,7 +232,11 @@ def manifest(slug_alias: str = slug) -> Response: # type: ignore[no-redef]
)
except (OSError, ValueError, RuntimeError, TimeoutError, PermissionError) as e:
logger.exception(
f"Error reading manifest for slug '{slug_alias}', path '{op.join(docs_dir_local, 'manifest.json')}', conn_id '{conn_id_local}': {e}"
"Error reading manifest for slug '%s', path '%s', conn_id '%s': %s",
slug_alias,
op.join(docs_dir_local, "manifest.json"),
conn_id_local,
e,
)
return JSONResponse(
content={
Expand Down Expand Up @@ -263,7 +270,11 @@ def catalog(slug_alias: str = slug) -> Response: # type: ignore[no-redef]
)
except (OSError, ValueError, RuntimeError, TimeoutError, PermissionError) as e:
logger.exception(
f"Error reading catalog for slug '{slug_alias}', path '{op.join(docs_dir_local, 'catalog.json')}', conn_id '{conn_id_local}': {e}"
"Error reading catalog for slug '%s', path '%s', conn_id '%s': %s",
slug_alias,
op.join(docs_dir_local, "catalog.json"),
conn_id_local,
e,
)
return JSONResponse(
content={
Expand Down
Loading