Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ def get_profile_type(self) -> str:
target_type = profile["outputs"][self.target_name]["type"]
return str(target_type)

return "undefined"

Comment thread
tatiana marked this conversation as resolved.
def _get_profile_path(self, use_mock_values: bool = False) -> Path:
"""
Handle the profile caching mechanism.
Expand Down
8 changes: 4 additions & 4 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def validate_arguments(
Validate that mutually exclusive selectors filters have not been given.
Validate deprecated arguments.

:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param profile_config: ProfileConfig Object
:param render_config: Render configuration
:param profile_config: Profile configuration
:param task_args: Arguments to be used to instantiate an Airflow Task
:param execution_mode: the current execution mode
:param execution_config: Execution configuration
:param project_config: Project configuration
"""
for field in ("tags", "paths"):
select_items = retrieve_by_label(render_config.select, field)
Expand Down
2 changes: 2 additions & 0 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None)
Get the Airflow Operator class for a Task.

:param task: The Task to get the Operator for
:param dag: The DAG to get the Operator for
:param task_group: The TaskGroup to get the Operator for

:return: The Operator class
:rtype: BaseOperator
Expand Down
4 changes: 3 additions & 1 deletion cosmos/dbt/parser/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def clean_line(line: str) -> str:
details="Use the `cosmos.dbt.runner.extract_message_by_status` instead.",
) # type: ignore[untyped-decorator]
def extract_dbt_runner_issues(
result: dbtRunnerResult, status_levels: list[str] = ["warn"]
result: dbtRunnerResult, status_levels: list[str] | None = None
) -> tuple[list[str], list[str]]: # type: ignore[misc]
"""
Extracts messages from the dbt runner result and returns them as a formatted string.
Expand All @@ -136,6 +136,8 @@ def extract_dbt_runner_issues(
:return: two lists of strings, the first one containing the node names and the second one
containing the node result message.
"""
status_levels = ["warn"] if status_levels is None else status_levels

node_names = []
node_results = []

Expand Down
2 changes: 1 addition & 1 deletion cosmos/dbt/parser/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def _extract_model_tests(
type=DbtModelType.DBT_TEST,
path=path,
dbt_vars=self.dbt_vars,
config=DbtModelConfig(upstream_models=set({model_name})),
config=DbtModelConfig(upstream_models={model_name}),
)
tests[test_model.name] = test_model
return tests
Expand Down
4 changes: 3 additions & 1 deletion cosmos/dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def run_command(


def extract_message_by_status(
result: dbtRunnerResult, status_levels: list[str] = ["warn"]
result: dbtRunnerResult, status_levels: list[str] | None = None
) -> tuple[list[str], list[str]]:
"""
Extracts messages from the dbt runner result and returns them as a formatted string.
Expand All @@ -121,6 +121,8 @@ def extract_message_by_status(
:return: two lists of strings, the first one containing the node names and the second one
containing the node result message.
"""
status_levels = ["warn"] if status_levels is None else status_levels

node_names = []
node_results = []

Expand Down
2 changes: 2 additions & 0 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def run_command(
:param output_encoding: encoding to use for decoding stdout
:param cwd: Working directory to run the command in.
If None (default), the command is run in a temporary directory.
:param process_log_line: A callable to process log line

:return: :class:`namedtuple` containing:
``exit_code``
``output``: the last line from stderr or stdout
Expand Down
2 changes: 2 additions & 0 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def store_dbt_resource_status_from_log(
extracts node status information, and pushes it to XCom for consumption
by downstream watcher sensors.

:param line: A single line from dbt JSON logs.
:param extra_kwargs: Additional keywords arguments.
:param tests_per_model: Mapping of model unique_id to list of test unique_ids
associated with that model, as built by DbtGraph.update_node_dependency().
Empty dict when no tests exist.
Expand Down
5 changes: 1 addition & 4 deletions cosmos/operators/airflow_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ def __init__(
non_async_args |= set(inspect.signature(DbtLocalBaseOperator.__init__).parameters.keys())
non_async_args |= set(inspect.signature(AbstractDbtLocalBase.__init__).parameters.keys())

dbt_kwargs = {}

# Extract full_refresh from kwargs if present
dbt_kwargs["full_refresh"] = kwargs.pop("full_refresh", False)
dbt_kwargs = {"full_refresh": kwargs.pop("full_refresh", False)}

for arg_key, arg_value in kwargs.items():
if arg_key == "task_id":
Expand Down
9 changes: 5 additions & 4 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@

customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig((DBT_PROJECT_PATH).as_posix(), dbt_vars={"var": "2"}),
project_config=ProjectConfig(
dbt_project_path=DBT_PROJECT_PATH.as_posix(),
dbt_vars={"var": "2"},
),
render_config=RenderConfig(
select=["path:seeds/raw_customers.csv"],
enable_mock_profile=False,
Expand All @@ -64,9 +67,7 @@

orders = DbtTaskGroup(
group_id="orders",
project_config=ProjectConfig(
(DBT_PROJECT_PATH).as_posix(),
),
project_config=ProjectConfig(DBT_PROJECT_PATH.as_posix()),
render_config=RenderConfig(
select=["path:seeds/raw_orders.csv"],
enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping
Expand Down
2 changes: 1 addition & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ def test_profile_created_correctly_with_profile_mapping(
profile_config=profile_config,
)

assert dbt_graph.load_via_dbt_ls() == None
assert dbt_graph.load_via_dbt_ls() is None


@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False)
Expand Down
2 changes: 1 addition & 1 deletion tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_run_command_without_virtualenv_dir(
invocation_mode=InvocationMode.SUBPROCESS,
vars={"variable": "value"},
)
assert venv_operator.virtualenv_dir == None
assert venv_operator.virtualenv_dir is None
venv_operator.run_command(
cmd=["fake-dbt", "do-something"], env={}, context={"task_instance": MagicMock(), "run_id": "test_run_id"}
)
Expand Down
6 changes: 3 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ def test_old_dag(

def add_logger_if_needed(dag: DAG, ti: TaskInstance):
"""
Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
Add a formatted logger to the task instance so all logs are surfaced to the command line instead
of into a task file. Since this is a local test run, it is much better for the user to see logs
in the command line, rather than needing to search for a log file.
Args:
ti: The taskinstance that will receive a logger

:param dag: The DAG that will receive a logger
:param ti: The task instance that will receive a logger
"""
logging_format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
Expand Down