Skip to content

Commit

Permalink
Splitting up connector code and tests (#5466)
Browse files Browse the repository at this point in the history
  • Loading branch information
galvana authored Dec 7, 2024
1 parent c08f5ee commit b0ef6f2
Show file tree
Hide file tree
Showing 57 changed files with 3,900 additions and 3,649 deletions.
48 changes: 25 additions & 23 deletions src/fides/api/service/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from fides.api.models.connectionconfig import ConnectionConfig as ConnectionConfig
from fides.api.models.connectionconfig import ConnectionType as ConnectionType
from fides.api.service.connectors.base_connector import BaseConnector as BaseConnector
from fides.api.service.connectors.bigquery_connector import (
BigQueryConnector as BigQueryConnector,
)
from fides.api.service.connectors.consent_email_connector import (
GenericConsentEmailConnector,
)
Expand All @@ -29,47 +32,46 @@
from fides.api.service.connectors.fides_connector import (
FidesConnector as FidesConnector,
)
from fides.api.service.connectors.google_cloud_mysql_connector import (
GoogleCloudSQLMySQLConnector as GoogleCloudSQLMySQLConnector,
)
from fides.api.service.connectors.google_cloud_postgres_connector import (
GoogleCloudSQLPostgresConnector as GoogleCloudSQLPostgresConnector,
)
from fides.api.service.connectors.http_connector import HTTPSConnector as HTTPSConnector
from fides.api.service.connectors.manual_webhook_connector import (
ManualWebhookConnector as ManualWebhookConnector,
)
from fides.api.service.connectors.mariadb_connector import (
MariaDBConnector as MariaDBConnector,
)
from fides.api.service.connectors.microsoft_sql_server_connector import (
MicrosoftSQLServerConnector as MicrosoftSQLServerConnector,
)
from fides.api.service.connectors.mongodb_connector import (
MongoDBConnector as MongoDBConnector,
)
from fides.api.service.connectors.mysql_connector import (
MySQLConnector as MySQLConnector,
)
from fides.api.service.connectors.postgres_connector import (
PostgreSQLConnector as PostgreSQLConnector,
)
from fides.api.service.connectors.rds_mysql_connector import (
RDSMySQLConnector as RDSMySQLConnector,
)
from fides.api.service.connectors.rds_postgres_connector import (
RDSPostgresConnector as RDSPostgresConnector,
)
from fides.api.service.connectors.redshift_connector import (
RedshiftConnector as RedshiftConnector,
)
from fides.api.service.connectors.s3_connector import S3Connector
from fides.api.service.connectors.saas_connector import SaaSConnector as SaaSConnector
from fides.api.service.connectors.scylla_connector import (
ScyllaConnector as ScyllaConnector,
)
from fides.api.service.connectors.sql_connector import (
BigQueryConnector as BigQueryConnector,
)
from fides.api.service.connectors.sql_connector import (
GoogleCloudSQLMySQLConnector as GoogleCloudSQLMySQLConnector,
)
from fides.api.service.connectors.sql_connector import (
GoogleCloudSQLPostgresConnector as GoogleCloudSQLPostgresConnector,
)
from fides.api.service.connectors.sql_connector import (
MariaDBConnector as MariaDBConnector,
)
from fides.api.service.connectors.sql_connector import (
MicrosoftSQLServerConnector as MicrosoftSQLServerConnector,
)
from fides.api.service.connectors.sql_connector import MySQLConnector as MySQLConnector
from fides.api.service.connectors.sql_connector import (
PostgreSQLConnector as PostgreSQLConnector,
)
from fides.api.service.connectors.sql_connector import (
RedshiftConnector as RedshiftConnector,
)
from fides.api.service.connectors.sql_connector import (
from fides.api.service.connectors.snowflake_connector import (
SnowflakeConnector as SnowflakeConnector,
)
from fides.api.service.connectors.timescale_connector import (
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/service/connectors/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fides.api.models.connectionconfig import ConnectionConfig, ConnectionTestStatus
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest, RequestTask
from fides.api.service.connectors.query_config import QueryConfig
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.collection_util import Row
from fides.config import CONFIG

Expand Down
158 changes: 158 additions & 0 deletions src/fides/api/service/connectors/bigquery_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from typing import List, Optional

from loguru import logger
from sqlalchemy import text
from sqlalchemy.engine import ( # type: ignore
Connection,
Engine,
LegacyCursorResult,
create_engine,
)
from sqlalchemy.orm import Session
from sqlalchemy.sql import Executable # type: ignore
from sqlalchemy.sql.elements import TextClause

from fides.api.common_exceptions import ConnectionException
from fides.api.graph.execution import ExecutionNode
from fides.api.models.connectionconfig import ConnectionTestStatus
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest, RequestTask
from fides.api.schemas.connection_configuration.connection_secrets_bigquery import (
BigQuerySchema,
)
from fides.api.service.connectors.query_configs.bigquery_query_config import (
BigQueryQueryConfig,
)
from fides.api.service.connectors.query_configs.query_config import SQLQueryConfig
from fides.api.service.connectors.sql_connector import SQLConnector
from fides.api.util.collection_util import Row


class BigQueryConnector(SQLConnector):
    """Connector specific to Google BigQuery.

    DSR execution (access/erasure queries) goes through SQLAlchemy; the
    connection test uses the native BigQuery python client instead.
    """

    secrets_schema = BigQuerySchema

    # Overrides BaseConnector.build_uri
    def build_uri(self) -> str:
        """Build a SQLAlchemy URI of the form ``bigquery://<project_id>[/<dataset>]``."""
        config = self.secrets_schema(**self.configuration.secrets or {})
        # Dataset is optional; omit the path segment entirely when absent.
        dataset = f"/{config.dataset}" if config.dataset else ""
        return f"bigquery://{config.keyfile_creds.project_id}{dataset}"  # pylint: disable=no-member

    # Overrides SQLConnector.create_client
    def create_client(self) -> Engine:
        """
        Returns a SQLAlchemy Engine that can be used to interact with Google BigQuery.
        Overrides to pass in credentials_info
        """
        secrets = self.configuration.secrets or {}
        # An explicit "url" secret takes precedence over the constructed URI.
        uri = secrets.get("url") or self.build_uri()

        keyfile_creds = secrets.get("keyfile_creds", {})
        credentials_info = dict(keyfile_creds) if keyfile_creds else {}

        return create_engine(
            uri,
            credentials_info=credentials_info,
            hide_parameters=self.hide_parameters,
            # Echo SQL only when parameters are not being hidden.
            echo=not self.hide_parameters,
        )

    # Overrides SQLConnector.query_config
    def query_config(self, node: ExecutionNode) -> BigQueryQueryConfig:
        """Query wrapper corresponding to the input execution_node."""

        db: Session = Session.object_session(self.configuration)
        return BigQueryQueryConfig(
            node, SQLConnector.get_namespace_meta(db, node.address.dataset)
        )

    def partitioned_retrieval(
        self,
        query_config: SQLQueryConfig,
        connection: Connection,
        stmt: TextClause,
    ) -> List[Row]:
        """
        Retrieve data against a partitioned table using the partitioning spec configured for this node to execute
        multiple queries against the partitioned table.

        This is only supported by the BigQueryConnector currently.

        NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL,
        we should be sure to still support the existing `where_clause` partition definition on
        any in-progress DSRs so that they can run through to completion.
        """
        if not isinstance(query_config, BigQueryQueryConfig):
            raise TypeError(
                f"Unexpected query config of type '{type(query_config)}' passed to BigQueryConnector's `partitioned_retrieval`"
            )

        partition_clauses = query_config.get_partition_clauses()
        # loguru lazy formatting: args are only interpolated if the level is enabled
        logger.info(
            "Executing {} partition queries for node '{}' in DSR execution",
            len(partition_clauses),
            query_config.node.address,
        )
        rows = []
        for partition_clause in partition_clauses:
            logger.debug(
                "Executing partition query with partition clause '{}'",
                partition_clause,
            )
            # Carry the base statement's bind params over to the partitioned statement.
            existing_bind_params = stmt.compile().params
            partitioned_stmt = text(f"{stmt} AND ({partition_clause})").params(
                existing_bind_params
            )
            results = connection.execute(partitioned_stmt)
            rows.extend(self.cursor_result_to_rows(results))
        return rows

    # Overrides SQLConnector.test_connection
    def test_connection(self) -> Optional[ConnectionTestStatus]:
        """
        Overrides SQLConnector.test_connection with a BigQuery-specific connection test.

        The connection is tested using the native python client for BigQuery, since that is what's used
        by the detection and discovery workflows/codepaths.
        TODO: migrate the rest of this class, used for DSR execution, to also make use of the native bigquery client.
        """
        try:
            bq_schema = BigQuerySchema(**self.configuration.secrets or {})
            client = bq_schema.get_client()
            all_projects = list(client.list_projects())
        except Exception as e:
            logger.exception(f"Error testing connection to remote BigQuery {str(e)}")
            # Chain the cause so the original traceback is preserved.
            raise ConnectionException(f"Connection error: {e}") from e
        if all_projects:
            return ConnectionTestStatus.succeeded
        # Raised outside the try block so this specific error is not re-caught
        # and re-wrapped as a generic "Connection error" by the handler above.
        logger.error("No Bigquery Projects found with the provided credentials.")
        raise ConnectionException(
            "No Bigquery Projects found with the provided credentials."
        )

    def mask_data(
        self,
        node: ExecutionNode,
        policy: Policy,
        privacy_request: PrivacyRequest,
        request_task: RequestTask,
        rows: List[Row],
    ) -> int:
        """Execute a masking request. Returns the number of records updated or deleted"""
        query_config = self.query_config(node)
        update_or_delete_ct = 0
        client = self.client()
        for row in rows:
            # One row may produce multiple update/delete statements (e.g. nested fields).
            update_or_delete_stmts: List[Executable] = (
                query_config.generate_masking_stmt(
                    node, row, policy, privacy_request, client
                )
            )
            if update_or_delete_stmts:
                with client.connect() as connection:
                    for update_or_delete_stmt in update_or_delete_stmts:
                        results: LegacyCursorResult = connection.execute(
                            update_or_delete_stmt
                        )
                        update_or_delete_ct += results.rowcount
        return update_or_delete_ct
5 changes: 4 additions & 1 deletion src/fides/api/service/connectors/dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
DynamoDBSchema,
)
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.query_config import DynamoDBQueryConfig, QueryConfig
from fides.api.service.connectors.query_configs.dynamodb_query_config import (
DynamoDBQueryConfig,
)
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.aws_util import get_aws_session
from fides.api.util.collection_util import Row
from fides.api.util.logger import Pii
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/service/connectors/fides_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from fides.api.schemas.redis_cache import Identity
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.fides.fides_client import FidesClient
from fides.api.service.connectors.query_config import QueryConfig
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.collection_util import Row
from fides.api.util.errors import FidesError

Expand Down
56 changes: 56 additions & 0 deletions src/fides/api/service/connectors/google_cloud_mysql_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import List

import pymysql
from google.cloud.sql.connector import Connector
from google.oauth2 import service_account
from sqlalchemy.engine import Engine, LegacyCursorResult, create_engine # type: ignore

from fides.api.schemas.connection_configuration.connection_secrets_google_cloud_sql_mysql import (
GoogleCloudSQLMySQLSchema,
)
from fides.api.service.connectors.sql_connector import SQLConnector
from fides.api.util.collection_util import Row
from fides.config import get_config

CONFIG = get_config()


class GoogleCloudSQLMySQLConnector(SQLConnector):
    """Connector specific to Google Cloud SQL for MySQL"""

    secrets_schema = GoogleCloudSQLMySQLSchema

    # Overrides SQLConnector.create_client
    def create_client(self) -> Engine:
        """Returns a SQLAlchemy Engine that can be used to interact with a database"""

        secrets = self.configuration.secrets or {}
        connection_config = self.secrets_schema(**secrets)

        # Build service-account credentials from the configured keyfile contents.
        sa_credentials = service_account.Credentials.from_service_account_info(
            dict(connection_config.keyfile_creds)
        )

        # initialize connector with the loaded credentials
        cloud_sql_connector = Connector(credentials=sa_credentials)

        def getconn() -> pymysql.connections.Connection:
            # Open a connection through the Cloud SQL connector using IAM auth.
            return cloud_sql_connector.connect(
                connection_config.instance_connection_name,
                "pymysql",
                user=connection_config.db_iam_user,
                db=connection_config.dbname,
                enable_iam_auth=True,
            )

        # SQLAlchemy delegates connection creation to the Cloud SQL connector.
        return create_engine("mysql+pymysql://", creator=getconn)

    @staticmethod
    def cursor_result_to_rows(results: LegacyCursorResult) -> List[Row]:
        """results to a list of dictionaries"""
        return SQLConnector.default_cursor_result_to_rows(results)

    def build_uri(self) -> None:
        """
        We need to override this method so it is not abstract anymore, and GoogleCloudSQLMySQLConnector is instantiable.
        """
Loading

0 comments on commit b0ef6f2

Please sign in to comment.