Commit cba8d62

turbaszek and kaxil authored
Refactor list rendering in commands (#12704)
This commit unifies the mechanism of rendering the output of tabular data. This gives users the possibility to either display a tabular representation of the data or render it as a valid JSON or YAML payload.

Closes: #12699
Co-authored-by: Kaxil Naik <[email protected]>
1 parent ae0e8f4 commit cba8d62

23 files changed: +413, -272 lines

UPDATING.md (+34)

@@ -52,6 +52,40 @@ assists users migrating to a new version.

 ## Master

+### Changes to output argument in commands
+
+From Airflow 2.0, we are replacing [tabulate](https://pypi.org/project/tabulate/) with [rich](https://github.com/willmcgugan/rich) to render command output. Due to this change, the `--output` argument
+will no longer accept tabulate table formats. Instead, it now accepts:
+
+- `table` - will render the output as a predefined table
+- `json` - will render the output as JSON
+- `yaml` - will render the output as YAML
+
+By doing this we increased consistency and gave users the possibility to manipulate the
+output programmatically (when using json or yaml).
+
+Affected commands:
+
+- `airflow dags list`
+- `airflow dags report`
+- `airflow dags list-runs`
+- `airflow dags list-jobs`
+- `airflow connections list`
+- `airflow connections get`
+- `airflow pools list`
+- `airflow pools get`
+- `airflow pools set`
+- `airflow pools delete`
+- `airflow pools import`
+- `airflow pools export`
+- `airflow role list`
+- `airflow providers list`
+- `airflow providers get`
+- `airflow providers hooks`
+- `airflow tasks states-for-dag-run`
+- `airflow users list`
+- `airflow variables list`
+
 ### Azure Wasb Hook does not work together with Snowflake hook

 The WasbHook in Apache Airflow uses a legacy version of the Azure library. While the conflict is not
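
The note above highlights that json and yaml output can be consumed programmatically. As a minimal, hypothetical illustration (not part of this commit), a Python script could parse the payload of the new CLI output; this assumes an Airflow 2.0 installation with `airflow` on the PATH, and the `dag_id`/`owner` keys follow the mapper visible in the dag_command.py diff below:

# Hypothetical example, not part of this commit: parsing the new JSON output.
import json
import subprocess

# `--output json` makes the command emit a JSON payload instead of a table.
result = subprocess.run(
    ["airflow", "dags", "list", "--output", "json"],
    capture_output=True,
    text=True,
    check=True,
)
for dag in json.loads(result.stdout):
    print(dag["dag_id"], dag["owner"])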

airflow/cli/cli_parser.py (+6, -11)

@@ -26,8 +26,6 @@
 from functools import lru_cache
 from typing import Callable, Dict, Iterable, List, NamedTuple, Optional, Set, Union

-from tabulate import tabulate_formats
-
 from airflow import settings
 from airflow.cli.commands.legacy_commands import check_legacy_command
 from airflow.configuration import conf
@@ -176,13 +174,10 @@ def positive_int(value):
 )
 ARG_OUTPUT = Arg(
     ("--output",),
-    help=(
-        "Output table format. The specified value is passed to "
-        "the tabulate module (https://pypi.org/project/tabulate/). "
-    ),
-    metavar="FORMAT",
-    choices=tabulate_formats,
-    default="plain",
+    help=("Output format. Allowed values: json, yaml, table (default: table)"),
+    metavar="(table, json, yaml)",
+    choices=("table", "json", "yaml"),
+    default="table",
 )
 ARG_COLOR = Arg(
     ('--color',),
@@ -1033,7 +1028,7 @@ class GroupCommand(NamedTuple):
         name='list',
         help='List variables',
         func=lazy_load_command('airflow.cli.commands.variable_command.variables_list'),
-        args=(),
+        args=(ARG_OUTPUT,),
     ),
     ActionCommand(
         name='get',
@@ -1110,7 +1105,7 @@ class GroupCommand(NamedTuple):
         name='get',
         help='Get a connection',
         func=lazy_load_command('airflow.cli.commands.connection_command.connections_get'),
-        args=(ARG_CONN_ID, ARG_COLOR),
+        args=(ARG_CONN_ID, ARG_COLOR, ARG_OUTPUT),
     ),
     ActionCommand(
         name='list',
airflow/cli/commands/cheat_sheet_command.py (+2)

@@ -21,9 +21,11 @@

 from airflow.cli.cli_parser import ActionCommand, GroupCommand, airflow_commands
 from airflow.cli.simple_table import SimpleTable
+from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.helpers import partition


+@suppress_logs_and_warning()
 def cheat_sheet(args):
     """Display cheat-sheet."""
     display_commands_index()
airflow/cli/commands/connection_command.py (+30, -52)

@@ -19,78 +19,54 @@
 import json
 import os
 import sys
-from typing import List
+from typing import Any, Dict, List
 from urllib.parse import urlparse, urlunparse

-import pygments
 import yaml
-from pygments.lexers.data import YamlLexer
 from sqlalchemy.orm import exc
-from tabulate import tabulate

+from airflow.cli.simple_table import AirflowConsole
 from airflow.exceptions import AirflowNotFoundException
 from airflow.hooks.base_hook import BaseHook
 from airflow.models import Connection
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import should_use_colors
-from airflow.utils.code_utils import get_terminal_formatter
+from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.session import create_session


-def _tabulate_connection(conns: List[Connection], tablefmt: str):
-    tabulate_data = [
-        {
-            'Conn Id': conn.conn_id,
-            'Conn Type': conn.conn_type,
-            'Description': conn.description,
-            'Host': conn.host,
-            'Port': conn.port,
-            'Is Encrypted': conn.is_encrypted,
-            'Is Extra Encrypted': conn.is_encrypted,
-            'Extra': conn.extra,
-        }
-        for conn in conns
-    ]
-
-    msg = tabulate(tabulate_data, tablefmt=tablefmt, headers='keys')
-    return msg
-
-
-def _yamulate_connection(conn: Connection):
-    yaml_data = {
-        'Id': conn.id,
-        'Conn Id': conn.conn_id,
-        'Conn Type': conn.conn_type,
-        'Description': conn.description,
-        'Host': conn.host,
-        'Schema': conn.schema,
-        'Login': conn.login,
-        'Password': conn.password,
-        'Port': conn.port,
-        'Is Encrypted': conn.is_encrypted,
-        'Is Extra Encrypted': conn.is_encrypted,
-        'Extra': conn.extra_dejson,
-        'URI': conn.get_uri(),
+def _connection_mapper(conn: Connection) -> Dict[str, Any]:
+    return {
+        'id': conn.id,
+        'conn_id': conn.conn_id,
+        'conn_type': conn.conn_type,
+        'description': conn.description,
+        'host': conn.host,
+        'schema': conn.schema,
+        'login': conn.login,
+        'password': conn.password,
+        'port': conn.port,
+        'is_encrypted': conn.is_encrypted,
+        'is_extra_encrypted': conn.is_encrypted,
+        'extra_dejson': conn.extra_dejson,
+        'get_uri': conn.get_uri(),
     }
-    return yaml.safe_dump(yaml_data, sort_keys=False)


+@suppress_logs_and_warning()
 def connections_get(args):
     """Get a connection."""
     try:
         conn = BaseHook.get_connection(args.conn_id)
     except AirflowNotFoundException:
         raise SystemExit("Connection not found.")
-
-    yaml_content = _yamulate_connection(conn)
-    if should_use_colors(args):
-        yaml_content = pygments.highlight(
-            code=yaml_content, formatter=get_terminal_formatter(), lexer=YamlLexer()
-        )
-
-    print(yaml_content)
+    AirflowConsole().print_as(
+        data=[conn],
+        output=args.output,
+        mapper=_connection_mapper,
+    )


+@suppress_logs_and_warning()
 def connections_list(args):
     """Lists all connections at the command line"""
     with create_session() as session:
@@ -99,9 +75,11 @@ def connections_list(args):
         query = query.filter(Connection.conn_id == args.conn_id)
         conns = query.all()

-        tablefmt = args.output
-        msg = _tabulate_connection(conns, tablefmt)
-        print(msg)
+        AirflowConsole().print_as(
+            data=conns,
+            output=args.output,
+            mapper=_connection_mapper,
+        )


 def _format_connections(conns: List[Connection], fmt: str) -> str:
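
`AirflowConsole` lives in `airflow/cli/simple_table.py`, which is among the 23 changed files but not shown in this excerpt. A minimal sketch of the dispatch its `print_as` must perform, with rich's table rendering stubbed out by plain `print` so the example stays self-contained (the real class renders a rich table for the `table` case):

# Minimal, hypothetical sketch of the print_as dispatch, not the real class.
import json
from typing import Any, Callable, Dict, List, Optional

import yaml


def print_as(
    data: List[Any],
    output: str,
    mapper: Optional[Callable[[Any], Dict[str, Any]]] = None,
) -> None:
    # Normalize each item into a plain dict, then pick a renderer.
    rows = [mapper(item) for item in data] if mapper else list(data)
    if output == "json":
        print(json.dumps(rows, indent=4))
    elif output == "yaml":
        print(yaml.safe_dump(rows, sort_keys=False))
    else:  # "table" - stand-in for the rich table the real class builds
        for row in rows:
            print(row)
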

airflow/cli/commands/dag_command.py (+52, -46)

@@ -16,60 +16,38 @@
 # under the License.

 """Dag sub-commands"""
+import ast
 import errno
 import json
 import logging
 import signal
 import subprocess
 import sys
-from typing import List

 from graphviz.dot import Dot
-from tabulate import tabulate

 from airflow import settings
 from airflow.api.client import get_current_api_client
+from airflow.cli.simple_table import AirflowConsole
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, BackfillUnfinished
 from airflow.executors.debug_executor import DebugExecutor
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import get_dag, get_dag_by_file_location, process_subdir, sigint_handler
+from airflow.utils.cli import (
+    get_dag,
+    get_dag_by_file_location,
+    process_subdir,
+    sigint_handler,
+    suppress_logs_and_warning,
+)
 from airflow.utils.dot_renderer import render_dag
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State


-def _tabulate_dag_runs(dag_runs: List[DagRun], tablefmt: str = "fancy_grid") -> str:
-    tabulate_data = (
-        {
-            'ID': dag_run.id,
-            'Run ID': dag_run.run_id,
-            'State': dag_run.state,
-            'DAG ID': dag_run.dag_id,
-            'Execution date': dag_run.execution_date.isoformat(),
-            'Start date': dag_run.start_date.isoformat() if dag_run.start_date else '',
-            'End date': dag_run.end_date.isoformat() if dag_run.end_date else '',
-        }
-        for dag_run in dag_runs
-    )
-    return tabulate(tabular_data=tabulate_data, tablefmt=tablefmt)
-
-
-def _tabulate_dags(dags: List[DAG], tablefmt: str = "fancy_grid") -> str:
-    tabulate_data = (
-        {
-            'DAG ID': dag.dag_id,
-            'Filepath': dag.filepath,
-            'Owner': dag.owner,
-        }
-        for dag in sorted(dags, key=lambda d: d.dag_id)
-    )
-    return tabulate(tabular_data=tabulate_data, tablefmt=tablefmt, headers='keys')
-
-
 @cli_utils.action_logging
 def dag_backfill(args, dag=None):
     """Creates backfill job or dry run for a DAG"""
@@ -293,21 +271,41 @@ def dag_next_execution(args):


 @cli_utils.action_logging
+@suppress_logs_and_warning()
 def dag_list_dags(args):
     """Displays dags with or without stats at the command line"""
     dagbag = DagBag(process_subdir(args.subdir))
-    dags = dagbag.dags.values()
-    print(_tabulate_dags(dags, tablefmt=args.output))
+    AirflowConsole().print_as(
+        data=sorted(dagbag.dags.values(), key=lambda d: d.dag_id),
+        output=args.output,
+        mapper=lambda x: {
+            "dag_id": x.dag_id,
+            "filepath": x.filepath,
+            "owner": x.owner,
+        },
+    )


 @cli_utils.action_logging
+@suppress_logs_and_warning()
 def dag_report(args):
     """Displays dagbag stats at the command line"""
     dagbag = DagBag(process_subdir(args.subdir))
-    print(tabulate(dagbag.dagbag_stats, headers="keys", tablefmt=args.output))
+    AirflowConsole().print_as(
+        data=dagbag.dagbag_stats,
+        output=args.output,
+        mapper=lambda x: {
+            "file": x.file,
+            "duration": x.duration,
+            "dag_num": x.dag_num,
+            "task_num": x.task_num,
+            "dags": sorted(ast.literal_eval(x.dags)),
+        },
+    )


 @cli_utils.action_logging
+@suppress_logs_and_warning()
 def dag_list_jobs(args, dag=None):
     """Lists latest n jobs"""
     queries = []
@@ -324,6 +322,7 @@ def dag_list_jobs(args, dag=None):
     if args.state:
         queries.append(BaseJob.state == args.state)

+    fields = ['dag_id', 'state', 'job_type', 'start_date', 'end_date']
     with create_session() as session:
         all_jobs = (
             session.query(BaseJob)
@@ -332,15 +331,16 @@ def dag_list_jobs(args, dag=None):
             .limit(args.limit)
             .all()
         )
-        fields = ['dag_id', 'state', 'job_type', 'start_date', 'end_date']
-        all_jobs = [[job.__getattribute__(field) for field in fields] for job in all_jobs]
-        msg = tabulate(
-            all_jobs, [field.capitalize().replace('_', ' ') for field in fields], tablefmt=args.output
-        )
-        print(msg)
+        all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]
+
+    AirflowConsole().print_as(
+        data=all_jobs,
+        output=args.output,
+    )


 @cli_utils.action_logging
+@suppress_logs_and_warning()
 def dag_list_dag_runs(args, dag=None):
     """Lists dag runs for a given DAG"""
     if dag:
@@ -361,13 +361,19 @@ def dag_list_dag_runs(args, dag=None):
         execution_end_date=args.end_date,
     )

-    if not dag_runs:
-        print(f'No dag runs for {args.dag_id}')
-        return
-
     dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
-    table = _tabulate_dag_runs(dag_runs, tablefmt=args.output)
-    print(table)
+    AirflowConsole().print_as(
+        data=dag_runs,
+        output=args.output,
+        mapper=lambda dr: {
+            "dag_id": dr.dag_id,
+            "run_id": dr.run_id,
+            "state": dr.state,
+            "execution_date": dr.execution_date.isoformat(),
+            "start_date": dr.start_date.isoformat() if dr.start_date else '',
+            "end_date": dr.end_date.isoformat() if dr.end_date else '',
+        },
+    )


 @provide_session