diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 39e4c2b7f4..992e05a192 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -499,7 +499,7 @@ def generate_or_convert_task( conversion_function = node_converters.get(resource_type, None) if conversion_function is not None: task_id = task_meta.id - logger.debug(f"Converting node <{node.unique_id}> task <{task_id}> using <{conversion_function.__name__}>") + logger.debug("Converting node <%s> task <%s> using <%s>", node.unique_id, task_id, conversion_function.__name__) # In Cosmos 2.0 we should review this implementation and use render_config or another simpler interface: task = conversion_function( # type: ignore dag=dag, @@ -520,7 +520,7 @@ def generate_or_convert_task( detached_from_parent=detached_from_parent, ) if task is not None: - logger.debug(f"Conversion of node <{node.unique_id}> task <{task_id}> was successful!") + logger.debug("Conversion of node <%s> task <%s> was successful!", node.unique_id, task_id) else: task = create_airflow_task(task_meta, dag, task_group) return task diff --git a/cosmos/cache.py b/cosmos/cache.py index b898ed180c..2d3cb43c96 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -306,7 +306,9 @@ def _calculate_yaml_selectors_cache_current_version( elapsed_time = time.perf_counter() - start_time logger.info( - f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}" + "Cosmos performance: time to calculate cache identifier %s for current version: %s", + cache_identifier, + elapsed_time, ) return f"{dbt_project_hash},{yaml_selector_hash},{cache_key_hash}" @@ -331,7 +333,9 @@ def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: elapsed_time = time.perf_counter() - start_time logger.info( - f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}" + "Cosmos performance: time to calculate cache identifier %s for current version: %s", + cache_identifier, + elapsed_time, ) return f"{dbt_project_hash},{hash_args}" @@ -406,7 +410,7 @@ def delete_unused_dbt_cache( if session is None: return 0 - logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}") + logger.info("Delete the Cosmos cache stored in Airflow Variables that hasn't been used for %s", max_age_last_usage) cosmos_dags_ids = defaultdict(list) all_variables = session.scalars(select(Variable)).all() total_cosmos_variables = 0 @@ -431,12 +435,14 @@ def delete_unused_dbt_cache( ) if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): for var_key in vars_keys: - logger.info(f"Removing the {cache_type} cache {var_key}") + logger.info("Removing the %s cache %s", cache_type, var_key) Variable.delete(var_key) deleted_cosmos_variables += 1 logger.info( - f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. " + "Deleted %s/%s Airflow Variables used to store Cosmos cache.", + deleted_cosmos_variables, + total_cosmos_variables, ) return deleted_cosmos_variables @@ -460,13 +466,14 @@ def delete_unused_dbt_remote_cache_files( # pragma: no cover if session is None: return 0 - logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}") + logger.info("Delete the Cosmos cache stored remotely that hasn't been used for %s", max_age_last_usage) cosmos_dags_ids_remote_cache_files = defaultdict(list) configured_remote_cache_dir = _configure_remote_cache_dir() if not configured_remote_cache_dir: logger.info( - f"No remote cache directory configured. Skipping the deletion of the {cache_type} cache files in remote storage." + "No remote cache directory configured. Skipping the deletion of the %s cache files in remote storage.", + cache_type, ) return 0 @@ -495,7 +502,7 @@ def delete_unused_dbt_remote_cache_files( # pragma: no cover ) if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): for file in files: - logger.info(f"Removing the {cache_type} cache remote file {file}") + logger.info("Removing the %s cache remote file %s", cache_type, file) file.unlink() deleted_cosmos_remote_cache_files += 1 logger.info( diff --git a/cosmos/converter.py b/cosmos/converter.py index 10d788347e..514505957f 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -338,7 +338,12 @@ def __init__( current_time = time.perf_counter() elapsed_time = current_time - previous_time logger.info( - f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to parse the dbt project for DAG using {self.dbt_graph.load_method}" + "Cosmos performance (%s) - [%s|%s]: It took %.3gs to parse the dbt project for DAG using %s", + cache_identifier, + platform.node(), + os.getpid(), + elapsed_time, + self.dbt_graph.load_method, ) previous_time = current_time @@ -385,7 +390,11 @@ def __init__( current_time = time.perf_counter() elapsed_time = current_time - previous_time logger.info( - f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG." + "Cosmos performance (%s) - [%s|%s]: It took %.3gs to build the Airflow DAG.", + cache_identifier, + platform.node(), + os.getpid(), + elapsed_time, ) logger.info("::endgroup::Cosmos DAG parsing logs") @@ -414,9 +423,9 @@ def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None: else: dag.doc_md = f"**dbt project hash:** `{dbt_project_hash}`" - logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation") + logger.debug("Appended dbt project hash %s to DAG %s documentation", dbt_project_hash, dag.dag_id) except Exception as e: - logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}") + logger.warning("Failed to append dbt project hash to DAG documentation: %s", e) def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 self, @@ -480,9 +489,12 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901 ) stored_metadata = True except ParamValidationError as e: - logger.warning(f"Failed to store compressed Cosmos telemetry metadata in DAG {dag.dag_id} params: {e}") + logger.warning("Failed to store compressed Cosmos telemetry metadata in DAG %s params: %s", dag.dag_id, e) if stored_metadata: logger.debug( - f"Stored compressed Cosmos telemetry metadata in DAG {dag.dag_id} params (original size: {len(str(metadata))} bytes, compressed: {len(compressed_metadata)} bytes)" + "Stored compressed Cosmos telemetry metadata in DAG %s params (original size: %s bytes, compressed: %s bytes)", + dag.dag_id, + len(str(metadata)), + len(compressed_metadata), ) diff --git a/cosmos/core/graph/entities.py b/cosmos/core/graph/entities.py index 52aecbd371..d852192a9b 100644 --- a/cosmos/core/graph/entities.py +++ b/cosmos/core/graph/entities.py @@ -46,7 +46,7 @@ def add_entity(self, entity: CosmosEntity) -> None: :param entity: The entity to add """ - logger.info(f"Adding entity {entity.id} to group {self.id}...") + logger.info("Adding entity %s to group %s...", entity.id, self.id) self.entities.append(entity) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index cfea5b489f..37ee6daa25 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -533,7 +533,7 @@ def dbt_ls_cache_key_args(self) -> list[str]: airflow_vars = [var_name, Variable.get(var_name, "")] cache_args.extend(airflow_vars) - logger.debug(f"Value of `dbt_ls_cache_key_args` for <{self.cache_key}>: {cache_args}") + logger.debug("Value of `dbt_ls_cache_key_args` for <%s>: %s", self.cache_key, cache_args) return cache_args @cached_property @@ -555,7 +555,7 @@ def get_dbt_yaml_selectors_cache_key_args(self, impl_version: str) -> list[str]: will be reparsed and the new value will be stored. """ cache_args = [impl_version] + self._yaml_selectors_airflow_vars - logger.debug(f"Value of `dbt_yaml_selectors_cache_key` for <{self.cache_key}>: {cache_args}") + logger.debug("Value of `dbt_yaml_selectors_cache_key` for <%s>: %s", self.cache_key, cache_args) return cache_args def _save_cache_to_variable(self, cache_dict: dict[str, Any], cache_name: str) -> None: @@ -783,13 +783,13 @@ def should_use_yaml_selectors_cache(self) -> bool: def load_via_dbt_ls_cache(self) -> bool: """(Try to) load dbt ls cache from an Airflow Variable""" - logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_key}...") + logger.info("Trying to parse the dbt project using dbt ls cache %s...", self.cache_key) if self.should_use_dbt_ls_cache(): project_path = self.project_path cache_dict = self.get_dbt_ls_cache() if not cache_dict: - logger.info(f"Cosmos performance: Cache miss for {self.cache_key}") + logger.info("Cosmos performance: Cache miss for %s", self.cache_key) return False cache_version = cache_dict.get("version") @@ -801,16 +801,20 @@ def load_via_dbt_ls_cache(self) -> bool: if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version): logger.info( - f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_key} is {len(dbt_ls_cache)}" + "Cosmos performance [%s|%s]: The cache size for %s is %s", + platform.node(), + os.getpid(), + self.cache_key, + len(dbt_ls_cache), ) self.load_method = LoadMode.DBT_LS_CACHE nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache) self.nodes = nodes self.filtered_nodes = nodes - logger.info(f"Cosmos performance: Cache hit for {self.cache_key} - {current_version}") + logger.info("Cosmos performance: Cache hit for %s - %s", self.cache_key, current_version) return True - logger.info(f"Cosmos performance: Cache miss for {self.cache_key} - skipped") + logger.info("Cosmos performance: Cache miss for %s - skipped", self.cache_key) return False def should_use_partial_parse_cache(self) -> bool: @@ -893,13 +897,13 @@ def load_via_dbt_ls_without_cache(self) -> None: dbt_cmd = self.render_config.dbt_executable_path dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd - logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") + logger.info("Trying to parse the dbt project in `%s` using dbt ls...", self.render_config.project_path) project_path = self.project_path if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") with tempfile.TemporaryDirectory() as tmpdir: - logger.debug(f"Content of the dbt project dir {project_path}: `{os.listdir(project_path)}`") + logger.debug("Content of the dbt project dir %s: `%s`", project_path, os.listdir(project_path)) tmpdir_path = Path(tmpdir) self._copy_or_create_symbolic_links(project_path, tmpdir_path) @@ -1174,13 +1178,13 @@ def load_parsed_selectors(self, selector_definitions: dict[str, Any]) -> YamlSel Returns: YamlSelectors: A YamlSelectors instance """ - logger.info(f"Trying to parse the dbt yaml selectors using {self.cache_key}...") + logger.info("Trying to parse the dbt yaml selectors using %s...", self.cache_key) if self.should_use_yaml_selectors_cache(): cache_dict = self.get_yaml_selectors_cache() if not cache_dict: - logger.info(f"Cosmos performance: Cache miss for {self.cache_key}") + logger.info("Cosmos performance: Cache miss for %s", self.cache_key) return self.parse_yaml_selectors(selector_definitions) @@ -1196,13 +1200,17 @@ def load_parsed_selectors(self, selector_definitions: dict[str, Any]) -> YamlSel if cache_dict and not cache.were_yaml_selectors_modified(cache_version, current_version): logger.info( - f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_key} is {len(yaml_selectors.parsed)}" + "Cosmos performance [%s|%s]: The cache size for %s is %s", + platform.node(), + os.getpid(), + self.cache_key, + len(yaml_selectors.parsed), ) - logger.info(f"Cosmos performance: Cache hit for {self.cache_key} - {current_version}") + logger.info("Cosmos performance: Cache hit for %s - %s", self.cache_key, current_version) return yaml_selectors - logger.info(f"Cosmos performance: Cache miss for {self.cache_key} - skipped") + logger.info("Cosmos performance: Cache miss for %s - skipped", self.cache_key) return self.parse_yaml_selectors(selector_definitions) diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index 633c96618c..40a383317c 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -182,7 +182,7 @@ def extract_sql_file_requirements(self, code: str) -> tuple[list[str], set[str]] if base_node.node.name == "config": config_selectors |= self._parse_jinja_config_node(base_node) except KeyError as e: - logger.warning(f"Could not add upstream model for config in {self.path}: {e}") + logger.warning("Could not add upstream model for config in %s: %s", self.path, e) return upstream_models, config_selectors @@ -237,7 +237,7 @@ def _extract_config(self, kwarg: Any, config_name: str) -> Any: except Exception as e: # if we can't convert it to a constant, we can't do anything with it - logger.warning(f"Could not parse {config_name} from config in {self.path}: {e}") + logger.warning("Could not parse %s from config in %s: %s", config_name, self.path, e) pass def __repr__(self) -> str: diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index 3a9d12d2ae..1fdfaa1401 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -37,7 +37,7 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool: if filepath.exists() and filepath.stat().st_size > 0: return True - logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") + logger.info("Project %s does not have %s", project_path, DBT_DEPENDENCIES_FILE_NAMES) return False @@ -83,7 +83,7 @@ def get_dbt_packages_subpath(source_folder: Path) -> str: try: dbt_project_file_content = yaml.safe_load(fp) except yaml.YAMLError: - logger.info(f"Unable to read the {DBT_PROJECT_FILENAME} file") + logger.info("Unable to read the %s file", DBT_PROJECT_FILENAME) else: if isinstance(dbt_project_file_content, dict): subpath = dbt_project_file_content.get("packages-install-path", DBT_DEFAULT_PACKAGES_FOLDER) @@ -126,7 +126,7 @@ def copy_manifest_file_if_exists(source_manifest: str | Path, dbt_project_folder dbt_project_folder = Path(dbt_project_folder) source_manifest = str(source_manifest) if source_manifest and Path(source_manifest).exists(): - logger.info(f"Copying the manifest from {source_manifest}...") + logger.info("Copying the manifest from %s...", source_manifest) target_folder_path = dbt_project_folder / DBT_TARGET_DIR_NAME tmp_manifest_filepath = target_folder_path / DBT_MANIFEST_FILE_NAME Path(target_folder_path).mkdir(parents=True, exist_ok=True) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 9c583f71f7..cb30f3f63f 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -334,7 +334,7 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901 root_id = node_by_name[node_name_patched] root_nodes.add(root_id) else: - logger.warning(f"Selector {self.node_name} not found.") + logger.warning("Selector %s not found.", self.node_name) return selected_nodes selected_nodes.update(root_nodes) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index c16713191c..b722952a50 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -80,7 +80,7 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]: try: return _decompress_telemetry_metadata(compressed_metadata) except (binascii.Error, zlib.error, json.JSONDecodeError, UnicodeDecodeError) as e: - logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}") + logger.warning("Failed to decompress telemetry metadata: %s: %s", type(e).__name__, e) return {} diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 1e4a4b3d8c..6b7f7a158d 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -425,16 +425,18 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo """ if try_number > 1: logger.info( - f"Retry attempt #%s – Running model '%s' from project '%s' using {self.__class__.__name__}", + "Retry attempt #%s – Running model '%s' from project '%s' using %s", try_number - 1, self.model_unique_id, self.project_dir, + self.__class__.__name__, ) else: logger.info( - f"Falling back to running model '%s' from project '%s' using {self.__class__.__name__}", + "Falling back to running model '%s' from project '%s' using %s", self.model_unique_id, self.project_dir, + self.__class__.__name__, ) upstream_task = context["ti"].task.dag.get_task(self.producer_task_id) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 940dac27f4..12f57382e2 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -574,7 +574,7 @@ def _handle_datasets(self, context: Context) -> None: if settings.enable_uri_xcom and (uris := [outlet.uri for outlet in outlets]): context["ti"].xcom_push(key="uri", value=uris) - logger.info(f"Pushed outlet URI(s) to XCom: {uris}") + logger.info("Pushed outlet URI(s) to XCom: %s", uris) def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None: if self.cache_dir is None: diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index c41f9d3773..6198d3cde5 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -210,13 +210,13 @@ def _acquire_venv_lock(self) -> None: os.mkdir(str(self.virtualenv_dir)) with open(self._lock_file, "w") as lf: - logger.info(f"Acquiring lock at {self._lock_file} with pid {str(self._pid)}") + self.log.info("Acquiring lock at %s with pid %s", self._lock_file, self._pid) lf.write(str(self._pid)) @depends_on_virtualenv_dir def _release_venv_lock(self) -> None: if not self._lock_file.is_file(): - logger.warning(f"Lockfile {self._lock_file} not found, perhaps deleted by other concurrent operator?") + self.log.warning("Lockfile %s not found, perhaps deleted by other concurrent operator?", self._lock_file) return with open(self._lock_file) as lf: @@ -225,7 +225,7 @@ def _release_venv_lock(self) -> None: if lock_file_pid == self._pid: return self._lock_file.unlink() - logger.warning(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}") + self.log.warning("Lockfile owned by process of pid %s, while operator has pid %s", lock_file_pid, self._pid) class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): # type: ignore[misc] diff --git a/cosmos/plugin/cluster_policy.py b/cosmos/plugin/cluster_policy.py index 8e6cdaf57e..0b1c394884 100644 --- a/cosmos/plugin/cluster_policy.py +++ b/cosmos/plugin/cluster_policy.py @@ -45,6 +45,8 @@ def task_instance_mutation_hook(task_instance: TaskInstance) -> None: if watcher_dbt_execution_queue and task_instance.try_number and _is_watcher_sensor(task_instance): if task_instance.try_number >= retry_number: log.info( - f"Setting task {task_instance.task_id} to use watcher dbt execution queue: {watcher_dbt_execution_queue}", + "Setting task %s to use watcher dbt execution queue: %s", + task_instance.task_id, + watcher_dbt_execution_queue, ) task_instance.queue = watcher_dbt_execution_queue diff --git a/cosmos/versioning.py b/cosmos/versioning.py index 8c3a361703..e022bf5bb9 100644 --- a/cosmos/versioning.py +++ b/cosmos/versioning.py @@ -35,6 +35,6 @@ def _create_folder_version_hash(dir_path: Path) -> str: buf = fp.read() hasher.update(buf) except FileNotFoundError: - logger.warning(f"The dbt project folder contains a symbolic link to a non-existent file: {filepath}") + logger.warning("The dbt project folder contains a symbolic link to a non-existent file: %s", filepath) return hasher.hexdigest() diff --git a/tests/plugin/test_cluster_policy.py b/tests/plugin/test_cluster_policy.py index d33bc78841..1139aa666d 100644 --- a/tests/plugin/test_cluster_policy.py +++ b/tests/plugin/test_cluster_policy.py @@ -187,7 +187,9 @@ def test_logging_when_queue_is_set(self, mock_log): task_instance_mutation_hook(task_instance) mock_log.info.assert_called_once_with( - "Setting task my_test_task to use watcher dbt execution queue: custom_retry_queue", + "Setting task %s to use watcher dbt execution queue: %s", + "my_test_task", + "custom_retry_queue", ) @patch("cosmos.settings.watcher_dbt_execution_queue", "custom_retry_queue") diff --git a/tests/test_converter.py b/tests/test_converter.py index 8c3dba5da1..1f10da34f9 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1162,9 +1162,9 @@ def test_dag_versioning_hash_error_handling(mock_load_dbt_graph, mock_hash_func, # Error should be logged as warning mock_logger.warning.assert_called_once() - warning_call = mock_logger.warning.call_args[0][0] - assert "Failed to append dbt project hash to DAG documentation" in warning_call - assert "File system error" in warning_call + warning_args = mock_logger.warning.call_args[0] + assert "Failed to append dbt project hash to DAG documentation" in warning_args[0] + assert "File system error" in str(warning_args[1]) @skipif_airflow_lt_3_dag_doc_hash @@ -1220,7 +1220,7 @@ def test_dag_versioning_successful_logging(mock_load_dbt_graph, mock_hash_func, ) # Check that the hash logging call was made (there are multiple debug calls now) - debug_calls = [str(call) for call in mock_logger.debug.call_args_list] + debug_calls = [call.args[0] % call.args[1:] for call in mock_logger.debug.call_args_list if call.args] assert any( "Appended dbt project hash test_hash_123 to DAG test_dag_logging documentation" in call for call in debug_calls )