Convert to BeamSchema type from ReadfromBQ (#17159)
* BEAM-11587 Retrieve table schema and convert it to PCollection Element.

* Delete bigquery.py

* BEAM-11587 Retrieve table schema and convert it to PCollection Element.

* Revert "BEAM-11587 Retrieve table schema and convert it to PCollection Element."

This reverts commit 8e2863a.

* backout BQ changes

* bigquery_schema_tools.py and bigquery_schema_tools_test.py added

* lint issues

* removed extra var

* lint and tox fixes.

* added integration test + pipeline logic

* assertequals test

* docstring issue

* Imported bq to bigquery_schema_tools.py

* integration test with assert_that (integration test might fail)

* fixed integration test

* try to fix pickling error

* trying beam.Map instead of function call

* print(coder type and element type)

* trying to use default protocol rather than highest protocol in coder

* look into properties of rowcoder

* add self

* check annotations

* rowtype schema

* make sure coder and roundtripped are the same

* passing int tests

* The ValueError test should be passing

* use ParDo instead of beam.Map

* Redesign API with flag

* Redesign API with flag (different logic)

* Redesign API with flag refactored logic

* Redesign API with flag refactored logic, incorporated ParDo. Also modified all errors to be key errors.

* Added Public API Test.

* Hardcoded bigquery.py get_table call for integration test.

* Generalize get_table call

* tox fix

* e2e working with bug

* changed w/ semicolon

* passing integration test

* only necessary output_types

* Fixes

* Fixes (import table)

* Fixes bq_v2_messages import

* bigquery.py logic refactor, direct read test, move to bq_schema_tools, docstrings, changed imports.

* revert pre- bq refactor

* post- refactor

* passing integration tests

* adjusted int test export

* different impls for expand_output_types

* define table_details only for direct_read expanded output

* don't modify spacing

* added extra unit testing + removed mapping for timestamp + single implementation of expand.

* fixed bug in getting project name

* fix spacing

* unittested + int tested

* divided up error handling.

* renamed tests

Co-authored-by: svetakvsundhar <[email protected]>
svetakvsundhar and svetak authored Aug 3, 2022
1 parent d357d03 commit 5b1e152
Showing 4 changed files with 429 additions and 5 deletions.
49 changes: 44 additions & 5 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2683,7 +2683,13 @@ class ReadFromBigQuery(PTransform):
to run queries with INTERACTIVE priority. This option is ignored when
reading from a table rather than a query. To learn more about query
priority, see: https://cloud.google.com/bigquery/docs/running-queries
"""
output_type (str): By default, this source yields Python dictionaries
(`PYTHON_DICT`). There is experimental support for producing a
PCollection with a schema and yielding Beam Rows via the option
`BEAM_ROW`. For more information on schemas, see
https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
"""
class Method(object):
EXPORT = 'EXPORT' # This is currently the default.
DIRECT_READ = 'DIRECT_READ'
@@ -2695,10 +2701,14 @@ def __init__(
gcs_location=None,
method=None,
use_native_datetime=False,
output_type=None,
*args,
**kwargs):
self.method = method or ReadFromBigQuery.Method.EXPORT
self.use_native_datetime = use_native_datetime
self.output_type = output_type
self._args = args
self._kwargs = kwargs

if self.method is ReadFromBigQuery.Method.EXPORT \
and self.use_native_datetime is True:
@@ -2716,22 +2726,51 @@ def __init__(
if isinstance(gcs_location, str):
gcs_location = StaticValueProvider(str, gcs_location)

if self.output_type == 'BEAM_ROW' and self._kwargs.get('query',
None) is not None:
raise ValueError(
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries.")

self.gcs_location = gcs_location
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}

def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
output_pcollection = self._expand_export(pcoll)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
output_pcollection = self._expand_direct_read(pcoll)
else:
raise ValueError(
'The method to read from BigQuery must be either EXPORT '
'or DIRECT_READ.')
return self._expand_output_type(output_pcollection)

def _expand_output_type(self, output_pcollection):
if self.output_type == 'PYTHON_DICT' or self.output_type is None:
return output_pcollection
elif self.output_type == 'BEAM_ROW':
table_details = bigquery_tools.parse_table_reference(
table=self._kwargs.get("table", None),
dataset=self._kwargs.get("dataset", None),
project=self._kwargs.get("project", None))
if isinstance(self._kwargs['table'], ValueProvider):
raise TypeError(
'%s: table must be of type string'
'; got ValueProvider instead' % self.__class__.__name__)
elif callable(self._kwargs['table']):
raise TypeError(
'%s: table must be of type string'
'; got a callable instead' % self.__class__.__name__)
return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
convert_to_usertype(
beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
project_id=table_details.projectId,
dataset_id=table_details.datasetId,
table_id=table_details.tableId).schema)

def _expand_export(self, pcoll):
# TODO(https://github.com/apache/beam/issues/20683): Make ReadFromBQ rely
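Taken together, these changes let a pipeline opt into schema-aware output by passing output_type='BEAM_ROW'. A minimal usage sketch (the project, table, bucket, and column names below are hypothetical, not taken from this commit):

import apache_beam as beam

# Read a BigQuery table as Beam Rows rather than Python dictionaries.
# Rows support attribute access and carry a schema inferred from the table.
with beam.Pipeline() as p:
  rows = (
      p
      | beam.io.ReadFromBigQuery(
          table='my-project:my_dataset.my_table',  # hypothetical table
          gcs_location='gs://my-bucket/tmp',  # staging bucket for EXPORT
          output_type='BEAM_ROW')
      | beam.Map(lambda row: row.name))  # 'name' is a hypothetical column

# As the constructor check above enforces, combining a query with
# output_type='BEAM_ROW' raises a ValueError; only table reads are supported.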
80 changes: 80 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -33,6 +33,8 @@
import pytest

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
@@ -178,6 +180,84 @@ def test_iobase_source(self):
query=query, use_standard_sql=True, project=self.project))
assert_that(result, equal_to(self.TABLE_DATA))

@pytest.mark.it_postcommit
def test_table_schema_retrieve(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
with beam.Pipeline(argv=self.args) as p:
result = (
p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
gcs_location="gs://bqio_schema_test",
dataset="beam_bigquery_io_test",
table="dfsqltable_3c7d6fd5_16e0460dfd0",
project="apache-beam-testing",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))

@pytest.mark.it_postcommit
def test_table_schema_retrieve_specifying_only_table(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
with beam.Pipeline(argv=self.args) as p:
result = (
p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
gcs_location="gs://bqio_schema_test",
table="apache-beam-testing:"
"beam_bigquery_io_test."
"dfsqltable_3c7d6fd5_16e0460dfd0",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))

@pytest.mark.it_postcommit
def test_table_schema_retrieve_with_direct_read(self):
the_table = bigquery_tools.BigQueryWrapper().get_table(
project_id="apache-beam-testing",
dataset_id="beam_bigquery_io_test",
table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
table = the_table.schema
utype = bigquery_schema_tools.\
generate_user_type_from_bq_schema(table)
with beam.Pipeline(argv=self.args) as p:
result = (
p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
table="apache-beam-testing:"
"beam_bigquery_io_test."
"dfsqltable_3c7d6fd5_16e0460dfd0",
output_type='BEAM_ROW'))
assert_that(
result,
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))


class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
TABLE_DATA = [{
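The equal_to assertions in these tests work because the generated user type is a plain NamedTuple, so instances compare field-wise. A small sketch of that property, with a schema built by hand for illustration:

from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp.internal.clients import bigquery

# A hand-built two-column schema standing in for the test table's schema.
schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='id', type='INTEGER', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='name', type='STRING', mode='NULLABLE'),
])
utype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

# NamedTuple equality is field-wise, which is exactly what equal_to relies on.
assert utype(id=1, name='customer1') == utype(id=1, name='customer1')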
119 changes: 119 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -0,0 +1,119 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tools used tool work with Schema types in the context of BigQuery.
Classes, constants and functions in this file are experimental and have no
backwards compatibility guarantees.
NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
"""

from typing import Optional
from typing import Sequence

import numpy as np

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.portability.api import schema_pb2

# BigQuery types as listed in
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
BIG_QUERY_TO_PYTHON_TYPES = {
"STRING": str,
"INTEGER": np.int64,
"FLOAT64": np.float64,
"BOOLEAN": bool,
"BYTES": bytes,
# TODO(https://github.com/apache/beam/issues/20810):
# Finish mappings for all BQ types
}


def generate_user_type_from_bq_schema(the_table_schema):
# type: (bigquery.TableSchema) -> type

"""Convert a schema of type TableSchema into a pcollection element.
Args:
the_table_schema: A BQ schema of type TableSchema
Returns:
type: A NamedTuple type that can be used as a PCollection element type.
"""

the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
the_table_schema)
if the_schema == {}:
raise ValueError("Encountered an empty schema")
dict_of_tuples = []
for i in range(len(the_schema['fields'])):
if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
typ = bq_field_to_type(
the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
else:
raise ValueError(
f"Encountered "
f"an unsupported type: {the_schema['fields'][i]['type']!r}")
# TODO svetaksundhar@: Map remaining BQ types
dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
return usertype


def bq_field_to_type(field, mode):
if mode == 'NULLABLE':
return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
elif mode == 'REPEATED':
return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
elif mode is None or mode == '':
return BIG_QUERY_TO_PYTHON_TYPES[field]
else:
raise ValueError(f"Encountered an unsupported mode: {mode!r}")


def convert_to_usertype(table_schema):
usertype = beam.io.gcp.bigquery_schema_tools. \
generate_user_type_from_bq_schema(table_schema)
return beam.ParDo(
beam.io.gcp.bigquery_schema_tools.BeamSchemaConversionDoFn(usertype))


class BeamSchemaConversionDoFn(beam.DoFn):
# Converts each BigQuery row dictionary into an instance of the generated
# user type.
def __init__(self, pcoll_val_ctor):
self._pcoll_val_ctor = pcoll_val_ctor

def process(self, dict_of_tuples):
yield self._pcoll_val_ctor(**dict_of_tuples)

def infer_output_type(self, input_type):
return self._pcoll_val_ctor

@classmethod
def _from_serialized_schema(cls, schema_str):
return cls(
beam.typehints.schemas.named_tuple_from_schema(
beam.utils.proto_utils.parse_Bytes(schema_str, schema_pb2.Schema)))

def __reduce__(self):
# when pickling, use bytes representation of the schema.
return (
self._from_serialized_schema,
(
beam.typehints.schemas.named_tuple_to_schema(
self._pcoll_val_ctor).SerializeToString(), ))
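A short sketch of how the helpers above behave (illustrative values, not part of this commit): bq_field_to_type wraps the mapped Python type according to the field mode, and BeamSchemaConversionDoFn pickles by serializing the schema so the generated NamedTuple type survives the trip to a worker:

import pickle
from typing import Optional, Sequence

import numpy as np

from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp.internal.clients import bigquery

# NULLABLE wraps the base type in Optional; REPEATED wraps it in Sequence;
# an unset mode maps to the bare Python type.
assert (bigquery_schema_tools.bq_field_to_type('INTEGER', 'NULLABLE')
        == Optional[np.int64])
assert (bigquery_schema_tools.bq_field_to_type('STRING', 'REPEATED')
        == Sequence[str])

# __reduce__ turns the DoFn into (factory, serialized schema bytes), so
# unpickling reconstructs an equivalent user type on the worker.
schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='id', type='INTEGER', mode='NULLABLE')])
utype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
dofn = bigquery_schema_tools.BeamSchemaConversionDoFn(utype)
roundtripped = pickle.loads(pickle.dumps(dofn))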
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py (diff not loaded)
