Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenLineage method in BigQueryToBigQueryOperator #44248

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,47 @@ def execute(self, context: Context) -> None:
project_id=conf["projectId"],
table_id=conf["tableId"],
)

def get_openlineage_facets_on_complete(self, task_instance):
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
ExternalQueryRunFacet,
)
from airflow.providers.google.cloud.openlineage.utils import (
get_facets_from_bq_table,
get_identity_column_lineage_facet,
)
from airflow.providers.openlineage.extractors import OperatorLineage

if not self.hook:
self.hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)

input_datasets = []
for source_project_dataset_table in self.source_project_dataset_tables:
source_table_object = self.hook.get_client(self.hook.project_id).get_table(source_project_dataset_table)
input_dataset_facets = get_facets_from_bq_table(source_table_object)

input_datasets.append(Dataset(
namespace="bigquery",
name=str(source_table_object.reference),
facets=input_dataset_facets,
))

dest_table_object = self.hook.get_client(self.hook.project_id).get_table(self.destination_project_dataset_table)
output_dataset_facets = get_facets_from_bq_table(dest_table_object)

output_dataset_facets["columnLineage"] = get_identity_column_lineage_facet(
field_names=[field.name for field in dest_table_object.schema], input_datasets=input_datasets
)

output_dataset = Dataset(
namespace="bigquery",
name=str(dest_table_object.reference),
facets=output_dataset_facets,
)

return OperatorLineage(inputs=input_datasets, outputs=[output_dataset])
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from unittest import mock

from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
from google.cloud.bigquery.table import Table

BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook"
TASK_ID = "test-bq-create-table-operator"
Expand Down Expand Up @@ -50,6 +51,26 @@ def split_tablename_side_effect(*args, **kwargs):
)


def get_table_side_effect(*args, **kwargs):
dataset_id = ""
if args[0] == SOURCE_PROJECT_DATASET_TABLES:
dataset_id = TEST_DATASET
elif args[0] == DESTINATION_PROJECT_DATASET_TABLE:
dataset_id = TEST_DATASET + '_new'
return Table.from_api_repr(
{
"tableReference": {"projectId": TEST_GCP_PROJECT_ID, "datasetId": dataset_id, "tableId": TEST_TABLE_ID},
"description": "table description",
"schema": {
"fields": [
{"name": "field1", "type": "STRING", "description": "field1 description"},
{"name": "field2", "type": "INTEGER", "description": "field2 description"},
]
},
}
)


class TestBigQueryToBigQueryOperator:
@mock.patch(BQ_HOOK_PATH)
def test_execute_without_location_should_execute_successfully(self, mock_hook):
Expand Down Expand Up @@ -110,3 +131,23 @@ def test_execute_single_regional_location_should_execute_successfully(self, mock
job_id=mock_hook.return_value.insert_job.return_value.job_id,
location=location,
)

@mock.patch(BQ_HOOK_PATH)
def test_get_openlineage_facets_on_complete(self, mock_hook):
operator = BigQueryToBigQueryOperator(
task_id=TASK_ID,
source_project_dataset_tables=SOURCE_PROJECT_DATASET_TABLES,
destination_project_dataset_table=DESTINATION_PROJECT_DATASET_TABLE,
)
mock_hook.return_value.split_tablename.side_effect = split_tablename_side_effect
mock_hook.return_value.get_client.return_value.get_table.side_effect = get_table_side_effect

operator.execute(context=mock.MagicMock())

lineage = operator.get_openlineage_facets_on_complete(None)
assert len(lineage.inputs) == 1
assert lineage.inputs[0].name == SOURCE_PROJECT_DATASET_TABLES
assert lineage.inputs[0].namespace == "bigquery"
assert len(lineage.outputs) == 1
assert lineage.outputs[0].name == DESTINATION_PROJECT_DATASET_TABLE
assert lineage.outputs[0].namespace == "bigquery"