Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,8 @@ def run_extract(
field_delimiter: str = ',',
print_header: bool = True,
labels: Optional[Dict] = None,
) -> str:
return_full_job: bool = False,
) -> Union[str, BigQueryJob]:
"""
Executes a BigQuery extract command to copy data from BigQuery to
Google Cloud Storage. See here:
Expand All @@ -1915,6 +1916,7 @@ def run_extract(
:param print_header: Whether to print a header for a CSV file extract.
:param labels: a dictionary containing labels for the job/query,
passed to BigQuery
:param return_full_job: return full job instead of job id only
"""
warnings.warn(
"This method is deprecated. Please use `BigQueryHook.insert_job` method.", DeprecationWarning
Expand Down Expand Up @@ -1953,6 +1955,8 @@ def run_extract(

job = self.insert_job(configuration=configuration, project_id=self.project_id)
self.running_job_id = job.job_id
if return_full_job:
return job
return job.job_id

def run_query(
Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink

if TYPE_CHECKING:
Expand Down Expand Up @@ -115,17 +115,16 @@ def execute(self, context: 'Context'):
location=self.location,
impersonation_chain=self.impersonation_chain,
)
job_id = hook.run_extract(
job: BigQueryJob = hook.run_extract(
source_project_dataset_table=self.source_project_dataset_table,
destination_cloud_storage_uris=self.destination_cloud_storage_uris,
compression=self.compression,
export_format=self.export_format,
field_delimiter=self.field_delimiter,
print_header=self.print_header,
labels=self.labels,
return_full_job=True,
)

job = hook.get_job(job_id=job_id).to_api_repr()
conf = job["configuration"]["extract"]["sourceTable"]
dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
BigQueryTableLink.persist(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ def test_execute(self, mock_hook):
field_delimiter=field_delimiter,
print_header=print_header,
labels=labels,
return_full_job=True,
)