Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def generate_or_convert_task(
conversion_function = node_converters.get(resource_type, None)
if conversion_function is not None:
task_id = task_meta.id
logger.debug(f"Converting node <{node.unique_id}> task <{task_id}> using <{conversion_function.__name__}>")
logger.debug("Converting node <%s> task <%s> using <%s>", node.unique_id, task_id, conversion_function.__name__)
# In Cosmos 2.0 we should review this implementation and use render_config or another simpler interface:
task = conversion_function( # type: ignore
dag=dag,
Expand All @@ -520,7 +520,7 @@ def generate_or_convert_task(
detached_from_parent=detached_from_parent,
)
if task is not None:
logger.debug(f"Conversion of node <{node.unique_id}> task <{task_id}> was successful!")
logger.debug("Conversion of node <%s> task <%s> was successful!", node.unique_id, task_id)
else:
task = create_airflow_task(task_meta, dag, task_group)
return task
Expand Down
23 changes: 15 additions & 8 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ def _calculate_yaml_selectors_cache_current_version(

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
"Cosmos performance: time to calculate cache identifier %s for current version: %s",
cache_identifier,
elapsed_time,
)
return f"{dbt_project_hash},{yaml_selector_hash},{cache_key_hash}"

Expand All @@ -331,7 +333,9 @@ def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir:

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
"Cosmos performance: time to calculate cache identifier %s for current version: %s",
cache_identifier,
elapsed_time,
)
return f"{dbt_project_hash},{hash_args}"

Expand Down Expand Up @@ -406,7 +410,7 @@ def delete_unused_dbt_cache(
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}")
logger.info("Delete the Cosmos cache stored in Airflow Variables that hasn't been used for %s", max_age_last_usage)
cosmos_dags_ids = defaultdict(list)
all_variables = session.scalars(select(Variable)).all()
total_cosmos_variables = 0
Expand All @@ -431,12 +435,14 @@ def delete_unused_dbt_cache(
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for var_key in vars_keys:
logger.info(f"Removing the {cache_type} cache {var_key}")
logger.info("Removing the %s cache %s", cache_type, var_key)
Variable.delete(var_key)
deleted_cosmos_variables += 1

logger.info(
f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
"Deleted %s/%s Airflow Variables used to store Cosmos cache.",
deleted_cosmos_variables,
total_cosmos_variables,
)
Comment thread
pankajastro marked this conversation as resolved.
return deleted_cosmos_variables

Expand All @@ -460,13 +466,14 @@ def delete_unused_dbt_remote_cache_files( # pragma: no cover
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}")
logger.info("Delete the Cosmos cache stored remotely that hasn't been used for %s", max_age_last_usage)
cosmos_dags_ids_remote_cache_files = defaultdict(list)

configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
f"No remote cache directory configured. Skipping the deletion of the {cache_type} cache files in remote storage."
"No remote cache directory configured. Skipping the deletion of the %s cache files in remote storage.",
cache_type,
)
return 0

Expand Down Expand Up @@ -495,7 +502,7 @@ def delete_unused_dbt_remote_cache_files( # pragma: no cover
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the {cache_type} cache remote file {file}")
logger.info("Removing the %s cache remote file %s", cache_type, file)
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
Expand Down
24 changes: 18 additions & 6 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ def __init__(
current_time = time.perf_counter()
elapsed_time = current_time - previous_time
logger.info(
f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to parse the dbt project for DAG using {self.dbt_graph.load_method}"
"Cosmos performance (%s) - [%s|%s]: It took %.3gs to parse the dbt project for DAG using %s",
cache_identifier,
platform.node(),
os.getpid(),
elapsed_time,
self.dbt_graph.load_method,
)
previous_time = current_time

Expand Down Expand Up @@ -385,7 +390,11 @@ def __init__(
current_time = time.perf_counter()
elapsed_time = current_time - previous_time
logger.info(
f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG."
"Cosmos performance (%s) - [%s|%s]: It took %.3gs to build the Airflow DAG.",
cache_identifier,
platform.node(),
os.getpid(),
elapsed_time,
)
logger.info("::endgroup::Cosmos DAG parsing logs")

Expand Down Expand Up @@ -414,9 +423,9 @@ def _add_dbt_project_hash_to_dag_docs(self, dag: DAG | None) -> None:
else:
dag.doc_md = f"**dbt project hash:** `{dbt_project_hash}`"

logger.debug(f"Appended dbt project hash {dbt_project_hash} to DAG {dag.dag_id} documentation")
logger.debug("Appended dbt project hash %s to DAG %s documentation", dbt_project_hash, dag.dag_id)
except Exception as e:
logger.warning(f"Failed to append dbt project hash to DAG documentation: {e}")
logger.warning("Failed to append dbt project hash to DAG documentation: %s", e)

def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901
self,
Expand Down Expand Up @@ -480,9 +489,12 @@ def _store_cosmos_telemetry_metadata_on_dag( # noqa: C901
)
stored_metadata = True
except ParamValidationError as e:
logger.warning(f"Failed to store compressed Cosmos telemetry metadata in DAG {dag.dag_id} params: {e}")
logger.warning("Failed to store compressed Cosmos telemetry metadata in DAG %s params: %s", dag.dag_id, e)

if stored_metadata:
logger.debug(
f"Stored compressed Cosmos telemetry metadata in DAG {dag.dag_id} params (original size: {len(str(metadata))} bytes, compressed: {len(compressed_metadata)} bytes)"
"Stored compressed Cosmos telemetry metadata in DAG %s params (original size: %s bytes, compressed: %s bytes)",
dag.dag_id,
len(str(metadata)),
len(compressed_metadata),
)
2 changes: 1 addition & 1 deletion cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def add_entity(self, entity: CosmosEntity) -> None:

:param entity: The entity to add
"""
logger.info(f"Adding entity {entity.id} to group {self.id}...")
logger.info("Adding entity %s to group %s...", entity.id, self.id)

self.entities.append(entity)

Expand Down
36 changes: 22 additions & 14 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def dbt_ls_cache_key_args(self) -> list[str]:
airflow_vars = [var_name, Variable.get(var_name, "")]
cache_args.extend(airflow_vars)

logger.debug(f"Value of `dbt_ls_cache_key_args` for <{self.cache_key}>: {cache_args}")
logger.debug("Value of `dbt_ls_cache_key_args` for <%s>: %s", self.cache_key, cache_args)
return cache_args

@cached_property
Expand All @@ -555,7 +555,7 @@ def get_dbt_yaml_selectors_cache_key_args(self, impl_version: str) -> list[str]:
will be reparsed and the new value will be stored.
"""
cache_args = [impl_version] + self._yaml_selectors_airflow_vars
logger.debug(f"Value of `dbt_yaml_selectors_cache_key` for <{self.cache_key}>: {cache_args}")
logger.debug("Value of `dbt_yaml_selectors_cache_key` for <%s>: %s", self.cache_key, cache_args)
return cache_args

def _save_cache_to_variable(self, cache_dict: dict[str, Any], cache_name: str) -> None:
Expand Down Expand Up @@ -783,13 +783,13 @@ def should_use_yaml_selectors_cache(self) -> bool:

def load_via_dbt_ls_cache(self) -> bool:
"""(Try to) load dbt ls cache from an Airflow Variable"""
logger.info(f"Trying to parse the dbt project using dbt ls cache {self.cache_key}...")
logger.info("Trying to parse the dbt project using dbt ls cache %s...", self.cache_key)
if self.should_use_dbt_ls_cache():
project_path = self.project_path

cache_dict = self.get_dbt_ls_cache()
if not cache_dict:
logger.info(f"Cosmos performance: Cache miss for {self.cache_key}")
logger.info("Cosmos performance: Cache miss for %s", self.cache_key)
return False

cache_version = cache_dict.get("version")
Expand All @@ -801,16 +801,20 @@ def load_via_dbt_ls_cache(self) -> bool:

if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version):
logger.info(
f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_key} is {len(dbt_ls_cache)}"
"Cosmos performance [%s|%s]: The cache size for %s is %s",
platform.node(),
os.getpid(),
self.cache_key,
len(dbt_ls_cache),
)
self.load_method = LoadMode.DBT_LS_CACHE

nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache)
self.nodes = nodes
self.filtered_nodes = nodes
logger.info(f"Cosmos performance: Cache hit for {self.cache_key} - {current_version}")
logger.info("Cosmos performance: Cache hit for %s - %s", self.cache_key, current_version)
return True
logger.info(f"Cosmos performance: Cache miss for {self.cache_key} - skipped")
logger.info("Cosmos performance: Cache miss for %s - skipped", self.cache_key)
return False

def should_use_partial_parse_cache(self) -> bool:
Expand Down Expand Up @@ -893,13 +897,13 @@ def load_via_dbt_ls_without_cache(self) -> None:
dbt_cmd = self.render_config.dbt_executable_path
dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd

logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...")
logger.info("Trying to parse the dbt project in `%s` using dbt ls...", self.render_config.project_path)
project_path = self.project_path
if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

with tempfile.TemporaryDirectory() as tmpdir:
logger.debug(f"Content of the dbt project dir {project_path}: `{os.listdir(project_path)}`")
logger.debug("Content of the dbt project dir %s: `%s`", project_path, os.listdir(project_path))
tmpdir_path = Path(tmpdir)

self._copy_or_create_symbolic_links(project_path, tmpdir_path)
Expand Down Expand Up @@ -1174,13 +1178,13 @@ def load_parsed_selectors(self, selector_definitions: dict[str, Any]) -> YamlSel
Returns:
YamlSelectors: A YamlSelectors instance
"""
logger.info(f"Trying to parse the dbt yaml selectors using {self.cache_key}...")
logger.info("Trying to parse the dbt yaml selectors using %s...", self.cache_key)

if self.should_use_yaml_selectors_cache():
cache_dict = self.get_yaml_selectors_cache()

if not cache_dict:
logger.info(f"Cosmos performance: Cache miss for {self.cache_key}")
logger.info("Cosmos performance: Cache miss for %s", self.cache_key)

return self.parse_yaml_selectors(selector_definitions)

Expand All @@ -1196,13 +1200,17 @@ def load_parsed_selectors(self, selector_definitions: dict[str, Any]) -> YamlSel

if cache_dict and not cache.were_yaml_selectors_modified(cache_version, current_version):
logger.info(
f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.cache_key} is {len(yaml_selectors.parsed)}"
"Cosmos performance [%s|%s]: The cache size for %s is %s",
platform.node(),
os.getpid(),
self.cache_key,
len(yaml_selectors.parsed),
)
logger.info(f"Cosmos performance: Cache hit for {self.cache_key} - {current_version}")
logger.info("Cosmos performance: Cache hit for %s - %s", self.cache_key, current_version)

return yaml_selectors

logger.info(f"Cosmos performance: Cache miss for {self.cache_key} - skipped")
logger.info("Cosmos performance: Cache miss for %s - skipped", self.cache_key)

return self.parse_yaml_selectors(selector_definitions)

Expand Down
4 changes: 2 additions & 2 deletions cosmos/dbt/parser/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def extract_sql_file_requirements(self, code: str) -> tuple[list[str], set[str]]
if base_node.node.name == "config":
config_selectors |= self._parse_jinja_config_node(base_node)
except KeyError as e:
logger.warning(f"Could not add upstream model for config in {self.path}: {e}")
logger.warning("Could not add upstream model for config in %s: %s", self.path, e)

return upstream_models, config_selectors

Expand Down Expand Up @@ -237,7 +237,7 @@ def _extract_config(self, kwarg: Any, config_name: str) -> Any:

except Exception as e:
# if we can't convert it to a constant, we can't do anything with it
logger.warning(f"Could not parse {config_name} from config in {self.path}: {e}")
logger.warning("Could not parse %s from config in %s: %s", config_name, self.path, e)
pass

def __repr__(self) -> str:
Expand Down
6 changes: 3 additions & 3 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool:
if filepath.exists() and filepath.stat().st_size > 0:
return True

logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
logger.info("Project %s does not have %s", project_path, DBT_DEPENDENCIES_FILE_NAMES)
return False


Expand Down Expand Up @@ -83,7 +83,7 @@ def get_dbt_packages_subpath(source_folder: Path) -> str:
try:
dbt_project_file_content = yaml.safe_load(fp)
except yaml.YAMLError:
logger.info(f"Unable to read the {DBT_PROJECT_FILENAME} file")
logger.info("Unable to read the %s file", DBT_PROJECT_FILENAME)
else:
if isinstance(dbt_project_file_content, dict):
subpath = dbt_project_file_content.get("packages-install-path", DBT_DEFAULT_PACKAGES_FOLDER)
Expand Down Expand Up @@ -126,7 +126,7 @@ def copy_manifest_file_if_exists(source_manifest: str | Path, dbt_project_folder
dbt_project_folder = Path(dbt_project_folder)
source_manifest = str(source_manifest)
if source_manifest and Path(source_manifest).exists():
logger.info(f"Copying the manifest from {source_manifest}...")
logger.info("Copying the manifest from %s...", source_manifest)
target_folder_path = dbt_project_folder / DBT_TARGET_DIR_NAME
tmp_manifest_filepath = target_folder_path / DBT_MANIFEST_FILE_NAME
Path(target_folder_path).mkdir(parents=True, exist_ok=True)
Expand Down
2 changes: 1 addition & 1 deletion cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901
root_id = node_by_name[node_name_patched]
root_nodes.add(root_id)
else:
logger.warning(f"Selector {self.node_name} not found.")
logger.warning("Selector %s not found.", self.node_name)
return selected_nodes

selected_nodes.update(root_nodes)
Expand Down
2 changes: 1 addition & 1 deletion cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def get_cosmos_telemetry_metadata(dag: DAG) -> dict[str, Any]:
try:
return _decompress_telemetry_metadata(compressed_metadata)
except (binascii.Error, zlib.error, json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Failed to decompress telemetry metadata: {type(e).__name__}: {e}")
logger.warning("Failed to decompress telemetry metadata: %s: %s", type(e).__name__, e)
return {}


Expand Down
6 changes: 4 additions & 2 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,18 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo
"""
if try_number > 1:
logger.info(
f"Retry attempt #%s – Running model '%s' from project '%s' using {self.__class__.__name__}",
"Retry attempt #%s – Running model '%s' from project '%s' using %s",
try_number - 1,
self.model_unique_id,
self.project_dir,
self.__class__.__name__,
)
else:
logger.info(
f"Falling back to running model '%s' from project '%s' using {self.__class__.__name__}",
"Falling back to running model '%s' from project '%s' using %s",
self.model_unique_id,
self.project_dir,
self.__class__.__name__,
)

upstream_task = context["ti"].task.dag.get_task(self.producer_task_id)
Expand Down
2 changes: 1 addition & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ def _handle_datasets(self, context: Context) -> None:

if settings.enable_uri_xcom and (uris := [outlet.uri for outlet in outlets]):
context["ti"].xcom_push(key="uri", value=uris)
logger.info(f"Pushed outlet URI(s) to XCom: {uris}")
logger.info("Pushed outlet URI(s) to XCom: %s", uris)

def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None:
if self.cache_dir is None:
Expand Down
6 changes: 3 additions & 3 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ def _acquire_venv_lock(self) -> None:
os.mkdir(str(self.virtualenv_dir))

with open(self._lock_file, "w") as lf:
logger.info(f"Acquiring lock at {self._lock_file} with pid {str(self._pid)}")
self.log.info("Acquiring lock at %s with pid %s", self._lock_file, self._pid)
lf.write(str(self._pid))
Comment thread
pankajastro marked this conversation as resolved.

@depends_on_virtualenv_dir
def _release_venv_lock(self) -> None:
if not self._lock_file.is_file():
logger.warning(f"Lockfile {self._lock_file} not found, perhaps deleted by other concurrent operator?")
self.log.warning("Lockfile %s not found, perhaps deleted by other concurrent operator?", self._lock_file)
return

with open(self._lock_file) as lf:
Expand All @@ -225,7 +225,7 @@ def _release_venv_lock(self) -> None:
if lock_file_pid == self._pid:
return self._lock_file.unlink()

logger.warning(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}")
self.log.warning("Lockfile owned by process of pid %s, while operator has pid %s", lock_file_pid, self._pid)


class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): # type: ignore[misc]
Expand Down
Loading
Loading