From c2c2958886cc3c8a51c4f5fc1a8c36b65921edd9 Mon Sep 17 00:00:00 2001
From: Nicolas Lenepveu
Date: Wed, 10 Jan 2024 18:19:24 +0100
Subject: [PATCH] feat: allow to set clustering and time partitioning options
 at table creation (#928)

* refactor: standardize bigquery options handling to manage more options

* feat: handle table partitioning, table clustering and more table options
  (expiration_timestamp, require_partition_filter, default_rounding_mode)
  via create_table dialect options

* fix: having clustering fields and partitioning exposed as table indexes
  leads to a bad autogenerated version file

    def upgrade() -> None:
        # ### commands auto generated by Alembic - please adjust! ###
        op.drop_index('clustering', table_name='dataset.some_table')
        op.drop_index('partition', table_name='dataset.some_table')
        # ### end Alembic commands ###

    def downgrade() -> None:
        # ### commands auto generated by Alembic - please adjust! ###
        op.create_index('partition', 'dataset.some_table', ['createdAt'], unique=False)
        op.create_index('clustering', 'dataset.some_table', ['id', 'createdAt'], unique=False)
        # ### end Alembic commands ###

* docs: update README to describe how to create clustered and partitioned
  tables as well as other newly supported table options

* test: adjust system tests since indexes are no longer populated from table
  partitions and clustering info

* test: alembic now supports creating partitioned tables

* test: run integration tests with all the new create_table options

* chore: rename variables to represent what they are a bit more clearly

* fix: assertions should not be used to validate user inputs

* refactor: extract process_option_value() from post_create_table() for
  improved readability

* docs: add docstring to post_create_table() and _process_option_value()

* test: increase code coverage by testing error cases

* refactor: better represent the distinction between the option value data
  type check and the transformation into a SQL literal

* test: add test cases for _validate_option_value_type() and
  _process_option_value()

* chore: coding style

* chore: reformat files with black

* test: fix typo in tests

* feat: change the option name for partitioning to leverage the
  TimePartitioning interface of the Python Client for Google BigQuery

* fix: TimePartitioning.field is optional

* chore: coding style

* test: fix system test with table option bigquery_require_partition_filter

* feat: add support for experimental range_partitioning option

* test: fix system test with new bigquery_time_partitioning table option

* docs: update README with time_partitioning and range_partitioning

* test: relevant comments in unit tests

* test: cover all error cases

* chore: no magic numbers

* chore: consistency in docstrings

* chore: no magic number

* chore: better error types

* chore: fix W605 invalid escape sequence
---
 README.rst                               |  53 ++-
 sqlalchemy_bigquery/base.py              | 280 +++++++++++--
 tests/system/test_alembic.py             |  17 +-
 tests/system/test_sqlalchemy_bigquery.py |  22 +-
 tests/unit/conftest.py                   |   7 +
 tests/unit/test_catalog_functions.py     |  13 +-
 tests/unit/test_table_options.py         | 474 +++++++++++++++++++++++
 7 files changed, 799 insertions(+), 67 deletions(-)
 create mode 100644 tests/unit/test_table_options.py

diff --git a/README.rst b/README.rst
index a2036289..17534886 100644
--- a/README.rst
+++ b/README.rst
@@ -292,7 +292,12 @@ To add metadata to a table:

 .. code-block:: python

-    table = Table('mytable', ..., bigquery_description='my table description', bigquery_friendly_name='my table friendly name')
+    table = Table('mytable', ...,
+        bigquery_description='my table description',
+        bigquery_friendly_name='my table friendly name',
+        bigquery_default_rounding_mode="ROUND_HALF_EVEN",
+        bigquery_expiration_timestamp=datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"),
+    )

 To add metadata to a column:

@@ -300,6 +305,52 @@ To add metadata to a column:

     Column('mycolumn', doc='my column description')

+To create a clustered table:
+
+.. code-block:: python
+
+    table = Table('mytable', ..., bigquery_clustering_fields=["a", "b", "c"])
+
+To create a time-unit column-partitioned table:
+
+.. code-block:: python
+
+    from google.cloud import bigquery
+
+    table = Table('mytable', ...,
+        bigquery_time_partitioning=bigquery.TimePartitioning(
+            field="mytimestamp",
+            type_="MONTH",
+            expiration_ms=1000 * 60 * 60 * 24 * 30 * 6,  # 6 months
+        ),
+        bigquery_require_partition_filter=True,
+    )
+
+To create an ingestion-time partitioned table:
+
+.. code-block:: python
+
+    from google.cloud import bigquery
+
+    table = Table('mytable', ...,
+        bigquery_time_partitioning=bigquery.TimePartitioning(),
+        bigquery_require_partition_filter=True,
+    )
+
+To create an integer-range partitioned table:
+
+.. code-block:: python
+
+    from google.cloud import bigquery
+
+    table = Table('mytable', ...,
+        bigquery_range_partitioning=bigquery.RangePartitioning(
+            field="zipcode",
+            range_=bigquery.PartitionRange(start=0, end=100000, interval=10),
+        ),
+        bigquery_require_partition_filter=True,
+    )
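+
+These options can be combined. As a sketch (mirroring this change's own
+system and unit tests), a table that is both time-partitioned and clustered:
+
+.. code-block:: python
+
+    from google.cloud import bigquery
+
+    table = Table('mytable', ...,
+        bigquery_time_partitioning=bigquery.TimePartitioning(field="createdAt"),
+        bigquery_clustering_fields=["country", "town"],
+    )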
+
 Threading and Multiprocessing
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py
index 03488250..f4266f13 100644
--- a/sqlalchemy_bigquery/base.py
+++ b/sqlalchemy_bigquery/base.py
@@ -19,6 +19,7 @@

 """Integration between SQLAlchemy and BigQuery."""

+import datetime
 from decimal import Decimal
 import random
 import operator
@@ -27,7 +28,11 @@
 from google import auth
 import google.api_core.exceptions
 from google.cloud.bigquery import dbapi
-from google.cloud.bigquery.table import TableReference
+from google.cloud.bigquery.table import (
+    RangePartitioning,
+    TableReference,
+    TimePartitioning,
+)
 from google.api_core.exceptions import NotFound
 import packaging.version
 import sqlalchemy
@@ -35,7 +40,7 @@
 import sqlalchemy.sql.functions
 import sqlalchemy.sql.sqltypes
 import sqlalchemy.sql.type_api
-from sqlalchemy.exc import NoSuchTableError
+from sqlalchemy.exc import NoSuchTableError, NoSuchColumnError
 from sqlalchemy import util
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql.compiler import (
@@ -631,6 +636,13 @@ def visit_NUMERIC(self, type_, **kw):


 class BigQueryDDLCompiler(DDLCompiler):
+    option_datatype_mapping = {
+        "friendly_name": str,
+        "expiration_timestamp": datetime.datetime,
+        "require_partition_filter": bool,
+        "default_rounding_mode": str,
+    }
+
     # BigQuery has no support for foreign keys.
     def visit_foreign_key_constraint(self, constraint):
         return None
@@ -654,26 +666,99 @@ def get_column_specification(self, column, **kwargs):
         return colspec

     def post_create_table(self, table):
+        """
+        Constructs additional SQL clauses for table creation in BigQuery.
+
+        This function processes the BigQuery dialect-specific options and generates SQL
+        clauses for partitioning, clustering, and other table options.
+
+        Args:
+            table (Table): The SQLAlchemy Table object for which the SQL is being generated.
+
+        Returns:
+            str: A string composed of SQL clauses for time partitioning, clustering, and
+                other BigQuery-specific options, each separated by a newline. Returns an
+                empty string if no such options are specified.
+
+        Raises:
+            ValueError: If both the time_partitioning and range_partitioning options are
+                specified; they are mutually exclusive.
+            TypeError: If the time_partitioning option is not a `TimePartitioning` object
+                or if the clustering_fields option is not a list.
+            NoSuchColumnError: If any field specified in clustering_fields does not exist
+                in the table.
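+
+        Example:
+            For a table clustered on ``country`` and ``town``, this returns (matching the
+            unit tests in tests/unit/test_table_options.py):
+            " CLUSTER BY country, town"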
+        """
         bq_opts = table.dialect_options["bigquery"]

-        opts = []
-        if ("description" in bq_opts) or table.comment:
-            description = process_string_literal(
-                bq_opts.get("description", table.comment)
+        options = {}
+        clauses = []
+
+        if (
+            bq_opts.get("time_partitioning") is not None
+            and bq_opts.get("range_partitioning") is not None
+        ):
+            raise ValueError(
+                "bigquery_time_partitioning and bigquery_range_partitioning"
+                " dialect options are mutually exclusive."
+            )
+
+        if (time_partitioning := bq_opts.get("time_partitioning")) is not None:
+            self._raise_for_type(
+                "time_partitioning",
+                time_partitioning,
+                TimePartitioning,
             )
-            opts.append(f"description={description}")

-        if "friendly_name" in bq_opts:
-            opts.append(
-                "friendly_name={}".format(
-                    process_string_literal(bq_opts["friendly_name"])
+            if time_partitioning.expiration_ms:
+                _24hours = 1000 * 60 * 60 * 24
+                options["partition_expiration_days"] = (
+                    time_partitioning.expiration_ms / _24hours
                 )
+
+            partition_by_clause = self._process_time_partitioning(
+                table,
+                time_partitioning,
             )

-        if opts:
-            return "\nOPTIONS({})".format(", ".join(opts))
+            clauses.append(partition_by_clause)

-        return ""
+        if (range_partitioning := bq_opts.get("range_partitioning")) is not None:
+            self._raise_for_type(
+                "range_partitioning",
+                range_partitioning,
+                RangePartitioning,
+            )
+
+            partition_by_clause = self._process_range_partitioning(
+                table,
+                range_partitioning,
+            )
+
+            clauses.append(partition_by_clause)
+
+        if (clustering_fields := bq_opts.get("clustering_fields")) is not None:
+            self._raise_for_type("clustering_fields", clustering_fields, list)
+
+            for field in clustering_fields:
+                if field not in table.c:
+                    raise NoSuchColumnError(field)
+
+            clauses.append(f"CLUSTER BY {', '.join(clustering_fields)}")
+
+        if ("description" in bq_opts) or table.comment:
+            description = bq_opts.get("description", table.comment)
+            self._validate_option_value_type("description", description)
+            options["description"] = description
+
+        for option in self.option_datatype_mapping:
+            if option in bq_opts:
+                options[option] = bq_opts.get(option)
+
+        if options:
+            individual_option_statements = [
+                "{}={}".format(k, self._process_option_value(v))
+                for (k, v) in options.items()
+                if self._validate_option_value_type(k, v)
+            ]
+            clauses.append(f"OPTIONS({', '.join(individual_option_statements)})")
+
+        return " " + "\n".join(clauses)

     def visit_set_table_comment(self, create):
         table_name = self.preparer.format_table(create.element)
@@ -686,6 +771,152 @@ def visit_drop_table_comment(self, drop):
         table_name = self.preparer.format_table(drop.element)
         return f"ALTER TABLE {table_name} SET OPTIONS(description=null)"

+    def _validate_option_value_type(self, option: str, value):
+        """
+        Validates the type of the given option value against the expected data type.
+
+        Args:
+            option (str): The name of the option to be validated.
+            value: The value of the dialect option whose type is to be checked. The type of
+                this parameter is dynamic and is verified against the expected type in
+                `self.option_datatype_mapping`.
+
+        Returns:
+            bool: True if the type of the value matches the expected type, or if the option
+                is not found in `self.option_datatype_mapping`.
+
+        Raises:
+            TypeError: If the type of the provided value does not match the expected type
+                as defined in `self.option_datatype_mapping`.
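+
+        Example:
+            A sketch of the behavior, mirroring the unit tests:
+            _validate_option_value_type("require_partition_filter", True) returns True,
+            while passing the string "true" for that option raises TypeError.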
+        """
+        if option in self.option_datatype_mapping:
+            self._raise_for_type(
+                option,
+                value,
+                self.option_datatype_mapping[option],
+            )
+
+        return True
+
+    def _raise_for_type(self, option, value, expected_type):
+        if type(value) is not expected_type:
+            raise TypeError(
+                f"bigquery_{option} dialect option accepts only {expected_type},"
+                f" provided {repr(value)}"
+            )
+
+    def _process_time_partitioning(
+        self, table: Table, time_partitioning: TimePartitioning
+    ):
+        """
+        Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp.
+
+        Args:
+        - table (Table): The SQLAlchemy table object representing the BigQuery table to be
+          partitioned.
+        - time_partitioning (TimePartitioning): The time partitioning details,
+          including the field to be used for partitioning.
+
+        Returns:
+        - str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to
+          partition data on the specified field.
+
+        Example:
+        - Given a table with a TIMESTAMP type column 'event_timestamp' and setting
+          'time_partitioning.field' to 'event_timestamp', the function returns
+          "PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".
+        """
+        field = "_PARTITIONDATE"
+        trunc_fn = "DATE_TRUNC"
+
+        if time_partitioning.field is not None:
+            field = time_partitioning.field
+            if isinstance(
+                table.columns[time_partitioning.field].type,
+                sqlalchemy.sql.sqltypes.TIMESTAMP,
+            ):
+                trunc_fn = "TIMESTAMP_TRUNC"
+
+        return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})"
+
+    def _process_range_partitioning(
+        self, table: Table, range_partitioning: RangePartitioning
+    ):
+        """
+        Generates a SQL 'PARTITION BY' clause for partitioning a table by a range of integers.
+
+        Args:
+        - table (Table): The SQLAlchemy table object representing the BigQuery table to be
+          partitioned.
+        - range_partitioning (RangePartitioning): The RangePartitioning object containing the
+          partitioning field, range start, range end, and interval.
+
+        Returns:
+        - str: A SQL string for range partitioning using RANGE_BUCKET and GENERATE_ARRAY
+          functions.
+
+        Raises:
+        - AttributeError: If the partitioning field is not defined.
+        - ValueError: If the partitioning field (i.e. column) data type is not an integer.
+        - TypeError: If the partitioning range start/end values are not integers.
+
+        Example:
+        - "PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 10))"
+        """
+        if range_partitioning.field is None:
+            raise AttributeError(
+                "bigquery_range_partitioning expects field to be defined"
+            )
+
+        if not isinstance(
+            table.columns[range_partitioning.field].type,
+            sqlalchemy.sql.sqltypes.INT,
+        ):
+            raise ValueError(
+                "bigquery_range_partitioning expects field (i.e. column) data type to be INTEGER"
+            )
+
+        range_ = range_partitioning.range_
+
+        if not isinstance(range_.start, int):
+            raise TypeError(
+                "bigquery_range_partitioning expects range_.start to be an int,"
+                f" provided {repr(range_.start)}"
+            )
+
+        if not isinstance(range_.end, int):
+            raise TypeError(
+                "bigquery_range_partitioning expects range_.end to be an int,"
+                f" provided {repr(range_.end)}"
+            )
+
+        default_interval = 1
+
+        return f"PARTITION BY RANGE_BUCKET({range_partitioning.field}, GENERATE_ARRAY({range_.start}, {range_.end}, {range_.interval or default_interval}))"
+
+    def _process_option_value(self, value):
+        """
+        Transforms the given option value into a literal representation suitable for SQL
+        queries in BigQuery.
+
+        Args:
+            value: The value to be transformed.
+
+        Returns:
+            The processed value in a format suitable for inclusion in a SQL query.
+
+        Raises:
+            NotImplementedError: When there is no transformation registered for a data type.
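+
+        Example:
+            A sketch of the mapping, mirroring the unit tests: "Some text" becomes
+            "'Some text'", True becomes "true", and a datetime becomes
+            "TIMESTAMP '2038-01-01 00:00:00+00:00'".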
+        """
+        option_casting = {
+            # Mapping from option type to its casting method
+            str: lambda x: process_string_literal(x),
+            int: lambda x: x,
+            float: lambda x: x,
+            bool: lambda x: "true" if x else "false",
+            datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x),
+        }
+
+        if (option_cast := option_casting.get(type(value))) is not None:
+            return option_cast(value)
+
+        raise NotImplementedError(f"No transformation registered for {repr(value)}")
+

 def process_string_literal(value):
     return repr(value.replace("%", "%%"))

@@ -997,25 +1228,8 @@ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         return {"constrained_columns": []}

     def get_indexes(self, connection, table_name, schema=None, **kw):
-        table = self._get_table(connection, table_name, schema)
-        indexes = []
-        if table.time_partitioning:
-            indexes.append(
-                {
-                    "name": "partition",
-                    "column_names": [table.time_partitioning.field],
-                    "unique": False,
-                }
-            )
-        if table.clustering_fields:
-            indexes.append(
-                {
-                    "name": "clustering",
-                    "column_names": table.clustering_fields,
-                    "unique": False,
-                }
-            )
-        return indexes
+        # BigQuery has no support for indexes.
+        return []

     def get_schema_names(self, connection, **kw):
         if isinstance(connection, Engine):
diff --git a/tests/system/test_alembic.py b/tests/system/test_alembic.py
index 1948a19a..30308c68 100644
--- a/tests/system/test_alembic.py
+++ b/tests/system/test_alembic.py
@@ -23,7 +23,7 @@
 from sqlalchemy import Column, DateTime, Integer, String, Numeric
 import google.api_core.exceptions

-from google.cloud.bigquery import SchemaField
+from google.cloud.bigquery import SchemaField, TimePartitioning

 alembic = pytest.importorskip("alembic")

@@ -138,15 +138,12 @@ def test_alembic_scenario(alembic_table):
     op.drop_table("accounts")
     assert alembic_table("accounts") is None

-    op.execute(
-        """
-        create table transactions(
-            account INT64 NOT NULL,
-            transaction_time DATETIME NOT NULL,
-            amount NUMERIC(11, 2) NOT NULL
-        )
-        partition by DATE(transaction_time)
-        """
+    op.create_table(
+        "transactions",
+        Column("account", Integer, nullable=False),
+        Column("transaction_time", DateTime(), nullable=False),
+        Column("amount", Numeric(11, 2), nullable=False),
+        bigquery_time_partitioning=TimePartitioning(field="transaction_time"),
     )
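+    # Per the dialect's _process_time_partitioning, this should emit a daily
+    # partitioning clause, PARTITION BY DATE_TRUNC(transaction_time, DAY),
+    # comparable to the raw "partition by DATE(transaction_time)" it replaces.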

     op.alter_column("transactions", "amount", nullable=True)
diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py
index 62b534ff..cccbd4bb 100644
--- a/tests/system/test_sqlalchemy_bigquery.py
+++ b/tests/system/test_sqlalchemy_bigquery.py
@@ -22,6 +22,8 @@
 import datetime
 import decimal

+from google.cloud.bigquery import TimePartitioning
+
 from sqlalchemy.engine import create_engine
 from sqlalchemy.schema import Table, MetaData, Column
 from sqlalchemy.ext.declarative import declarative_base
@@ -539,6 +541,14 @@ def test_create_table(engine, bigquery_dataset):
         Column("binary_c", sqlalchemy.BINARY),
         bigquery_description="test table description",
         bigquery_friendly_name="test table name",
+        bigquery_expiration_timestamp=datetime.datetime(2183, 3, 26, 8, 30, 0),
+        bigquery_time_partitioning=TimePartitioning(
+            field="timestamp_c",
+            expiration_ms=1000 * 60 * 60 * 24 * 30,  # 30 days
+        ),
+        bigquery_require_partition_filter=True,
+        bigquery_default_rounding_mode="ROUND_HALF_EVEN",
+        bigquery_clustering_fields=["integer_c", "decimal_c"],
     )
     meta.create_all(engine)
     meta.drop_all(engine)
@@ -594,17 +604,7 @@ def test_view_names(inspector, inspector_using_test_dataset, bigquery_dataset):
 def test_get_indexes(inspector, inspector_using_test_dataset, bigquery_dataset):
     for _ in [f"{bigquery_dataset}.sample", f"{bigquery_dataset}.sample_one_row"]:
         indexes = inspector.get_indexes(f"{bigquery_dataset}.sample")
-        assert len(indexes) == 2
-        assert indexes[0] == {
-            "name": "partition",
-            "column_names": ["timestamp"],
-            "unique": False,
-        }
-        assert indexes[1] == {
-            "name": "clustering",
-            "column_names": ["integer", "string"],
-            "unique": False,
-        }
+        assert len(indexes) == 0

 def test_get_columns(inspector, inspector_using_test_dataset, bigquery_dataset):
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index f808b380..6f197196 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -25,6 +25,8 @@
 import pytest
 import sqlalchemy

+from sqlalchemy_bigquery.base import BigQueryDDLCompiler, BigQueryDialect
+
 from . import fauxdbi

 sqlalchemy_version = packaging.version.parse(sqlalchemy.__version__)
@@ -91,6 +93,11 @@ def metadata():
     return sqlalchemy.MetaData()

+@pytest.fixture()
+def ddl_compiler():
+    return BigQueryDDLCompiler(BigQueryDialect(), None)
+
+
 def setup_table(connection, name, *columns, initial_data=(), **kw):
     metadata = sqlalchemy.MetaData()
     table = sqlalchemy.Table(name, metadata, *columns, **kw)
diff --git a/tests/unit/test_catalog_functions.py b/tests/unit/test_catalog_functions.py
index 78614c9f..7eab7b7b 100644
--- a/tests/unit/test_catalog_functions.py
+++ b/tests/unit/test_catalog_functions.py
@@ -126,18 +126,7 @@ def test_get_indexes(faux_conn):
     client.tables.foo.time_partitioning = TimePartitioning(field="tm")
     client.tables.foo.clustering_fields = ["user_email", "store_code"]

-    assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [
-        dict(
-            name="partition",
-            column_names=["tm"],
-            unique=False,
-        ),
-        dict(
-            name="clustering",
-            column_names=["user_email", "store_code"],
-            unique=False,
-        ),
-    ]
+    assert faux_conn.dialect.get_indexes(faux_conn, "foo") == []

 def test_no_table_pk_constraint(faux_conn):
diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py
new file mode 100644
index 00000000..2147fb1d
--- /dev/null
+++ b/tests/unit/test_table_options.py
@@ -0,0 +1,474 @@
+# Copyright (c) 2021 The sqlalchemy-bigquery Authors
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import datetime
+import sqlite3
+
+import pytest
+import sqlalchemy
+
+from google.cloud.bigquery import (
+    PartitionRange,
+    RangePartitioning,
+    TimePartitioning,
+    TimePartitioningType,
+)
+
+from .conftest import setup_table
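+
+# These tests render BigQuery DDL through the faux (SQLite-backed) connection.
+# Clauses such as PARTITION BY and CLUSTER BY are not valid SQLite, so table
+# creation is expected to raise sqlite3.OperationalError; the generated
+# statement is still captured in faux_conn.test_data["execute"] for assertion.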
+
+
+def test_table_expiration_timestamp_dialect_option(faux_conn):
+    setup_table(
+        faux_conn,
+        "some_table",
+        sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+        bigquery_expiration_timestamp=datetime.datetime.fromisoformat(
+            "2038-01-01T00:00:00+00:00"
+        ),
+    )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `createdAt` DATETIME )"
+        " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00')"
+    )
+
+
+def test_table_default_rounding_mode_dialect_option(faux_conn):
+    setup_table(
+        faux_conn,
+        "some_table",
+        sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+        bigquery_default_rounding_mode="ROUND_HALF_EVEN",
+    )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `createdAt` DATETIME )"
+        " OPTIONS(default_rounding_mode='ROUND_HALF_EVEN')"
+    )
+
+
+def test_table_clustering_fields_dialect_option_no_such_column(faux_conn):
+    with pytest.raises(sqlalchemy.exc.NoSuchColumnError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+            bigquery_clustering_fields=["country", "unknown"],
+        )
+
+
+def test_table_clustering_fields_dialect_option(faux_conn):
+    # expect table creation to fail as SQLite does not support clustering
+    with pytest.raises(sqlite3.OperationalError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("country", sqlalchemy.Text),
+            sqlalchemy.Column("town", sqlalchemy.Text),
+            bigquery_clustering_fields=["country", "town"],
+        )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING )"
+        " CLUSTER BY country, town"
+    )
+
+
+def test_table_clustering_fields_dialect_option_type_error(faux_conn):
+    # expect TypeError when bigquery_clustering_fields is not a list
+    with pytest.raises(TypeError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("country", sqlalchemy.Text),
+            sqlalchemy.Column("town", sqlalchemy.Text),
+            bigquery_clustering_fields="country, town",
+        )
+
+
+def test_table_time_partitioning_dialect_option(faux_conn):
+    # expect table creation to fail as SQLite does not support partitioned tables
+    with pytest.raises(sqlite3.OperationalError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+            bigquery_time_partitioning=TimePartitioning(),
+        )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
+        " PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)"
+    )
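+
+
+# Note: when TimePartitioning() is given no field (as above), the dialect falls
+# back to the _PARTITIONDATE pseudo-column, i.e. ingestion-time partitioning.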
".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(require_partition_filter=true)" + ) + + +def test_table_time_partitioning_with_field_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_by_month_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_=TimePartitioningType.MONTH, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, MONTH)" + ) + + +def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )" + " PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_="DAY", + expiration_ms=21600000, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(partition_expiration_days=0.25)" + ) + + +def test_table_partitioning_dialect_option_type_error(faux_conn): + # expect TypeError when bigquery_time_partitioning is not a TimePartitioning object + with pytest.raises(TypeError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning="DATE(createdAt)", + ) + + +def test_table_range_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + 
field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + interval=2, + ), + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )" + " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 2))" + ) + + +def test_table_range_partitioning_dialect_option_no_field(faux_conn): + # expect TypeError when bigquery_range_partitioning field is not defined + with pytest.raises( + AttributeError, + match="bigquery_range_partitioning expects field to be defined", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.FLOAT), + bigquery_range_partitioning=RangePartitioning( + range_=PartitionRange( + start=0, + end=100000, + interval=10, + ), + ), + ) + + +def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): + # expect ValueError when bigquery_range_partitioning field is not an INTEGER + with pytest.raises( + ValueError, + match=r"bigquery_range_partitioning expects field \(i\.e\. column\) data type to be INTEGER", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.FLOAT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + interval=10, + ), + ), + ) + + +def test_table_range_partitioning_dialect_option_range_missing(faux_conn): + # expect TypeError when bigquery_range_partitioning range start or end is missing + with pytest.raises( + TypeError, + match="bigquery_range_partitioning expects range_.start to be an int, provided None", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning(field="zipcode"), + ) + + with pytest.raises( + TypeError, + match="bigquery_range_partitioning expects range_.end to be an int, provided None", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange(start=1), + ), + ) + + +def test_table_range_partitioning_dialect_option_default_interval(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + ), + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )" + " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 1))" + ) + + +def test_time_and_range_partitioning_mutually_exclusive(faux_conn): + # expect ValueError when both bigquery_time_partitioning and bigquery_range_partitioning are provided + with pytest.raises(ValueError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_range_partitioning=RangePartitioning(), + bigquery_time_partitioning=TimePartitioning(), + ) + + +def test_table_all_dialect_option(faux_conn): + # expect table creation to fail as 
+
+
+def test_table_all_dialect_option(faux_conn):
+    # expect table creation to fail as SQLite does not support clustering
+    # and partitioned tables
+    with pytest.raises(sqlite3.OperationalError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("country", sqlalchemy.Text),
+            sqlalchemy.Column("town", sqlalchemy.Text),
+            sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+            bigquery_expiration_timestamp=datetime.datetime.fromisoformat(
+                "2038-01-01T00:00:00+00:00"
+            ),
+            bigquery_require_partition_filter=True,
+            bigquery_default_rounding_mode="ROUND_HALF_EVEN",
+            bigquery_clustering_fields=["country", "town"],
+            bigquery_time_partitioning=TimePartitioning(
+                field="createdAt",
+                type_="DAY",
+                expiration_ms=2592000000,
+            ),
+        )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )"
+        " PARTITION BY DATE_TRUNC(createdAt, DAY)"
+        " CLUSTER BY country, town"
+        " OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')"
+    )
+
+
+def test_validate_friendly_name_value_type(ddl_compiler):
+    # expect friendly_name to accept only str values
+
+    assert ddl_compiler._validate_option_value_type("friendly_name", "Friendly name")
+
+    with pytest.raises(TypeError):
+        ddl_compiler._validate_option_value_type("friendly_name", 1983)
+
+
+def test_validate_expiration_timestamp_value_type(ddl_compiler):
+    # expect expiration_timestamp to accept only datetime values
+
+    assert ddl_compiler._validate_option_value_type(
+        "expiration_timestamp",
+        datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"),
+    )
+
+    with pytest.raises(TypeError):
+        ddl_compiler._validate_option_value_type("expiration_timestamp", "2038-01-01")
+
+
+def test_validate_require_partition_filter_type(ddl_compiler):
+    # expect require_partition_filter to accept only bool values
+
+    assert ddl_compiler._validate_option_value_type("require_partition_filter", True)
+    assert ddl_compiler._validate_option_value_type("require_partition_filter", False)
+
+    with pytest.raises(TypeError):
+        ddl_compiler._validate_option_value_type("require_partition_filter", "true")
+
+    with pytest.raises(TypeError):
+        ddl_compiler._validate_option_value_type("require_partition_filter", "false")
+
+
+def test_validate_default_rounding_mode_type(ddl_compiler):
+    # expect default_rounding_mode to accept only str values
+
+    assert ddl_compiler._validate_option_value_type(
+        "default_rounding_mode", "ROUND_HALF_EVEN"
+    )
+
+    with pytest.raises(TypeError):
+        ddl_compiler._validate_option_value_type("default_rounding_mode", True)
+
+
+def test_validate_unmapped_option_type(ddl_compiler):
+    # expect an option with no type specified in the mapping to pass validation
+
+    assert ddl_compiler._validate_option_value_type("unknown", "DEFAULT_IS_STRING")
+
+
+def test_process_str_option_value(ddl_compiler):
+    # expect string to be transformed as a string expression
+    assert ddl_compiler._process_option_value("Some text") == "'Some text'"
+
+
+def test_process_datetime_value(ddl_compiler):
+    # expect datetime object to be transformed as a timestamp expression
+    assert (
+        ddl_compiler._process_option_value(
+            datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00")
+        )
+        == "TIMESTAMP '2038-01-01 00:00:00+00:00'"
+    )
+
+
+def test_process_int_option_value(ddl_compiler):
+    # expect int to be passed through unchanged
+    assert ddl_compiler._process_option_value(90) == 90
+
+
+def test_process_boolean_option_value(ddl_compiler):
+    # expect boolean to be transformed as a literal boolean expression
+
+    assert ddl_compiler._process_option_value(True) == "true"
+    assert ddl_compiler._process_option_value(False) == "false"
+
+
+def test_process_not_implemented_option_value(ddl_compiler):
+    # expect NotImplementedError for a type with no registered transformation
+    with pytest.raises(NotImplementedError):
+        ddl_compiler._process_option_value(float)
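+
+
+# A sketch of one more case (hypothetical test, not part of the original
+# change): friendly_name is in option_datatype_mapping, so it should be
+# rendered through OPTIONS() like the other str-typed options above.
+def test_table_friendly_name_dialect_option(faux_conn):
+    setup_table(
+        faux_conn,
+        "some_table",
+        sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
+        bigquery_friendly_name="my table name",
+    )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `createdAt` DATETIME )"
+        " OPTIONS(friendly_name='my table name')"
+    )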