Primary key constraint enforcement via SQLalchemy eventing? #1127

Open
jlynchMicron opened this issue Oct 9, 2024 · 0 comments
Labels
api: bigquery Issues related to the googleapis/python-bigquery-sqlalchemy API.
Is your feature request related to a problem? Please describe.
No.

Describe the solution you'd like
BigQuery does NOT enforce primary key constraints ("Value constraints for primary keys and foreign keys are not enforced", https://cloud.google.com/bigquery/docs/information-schema-table-constraints). Many users may wish it did, for a variety of reasons, and may also route all of their interactions with BigQuery through SQLAlchemy. With that in mind, what if primary key constraints were enforced on a "best effort" basis via the SQLAlchemy event system?
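As a sketch of what the event-system hook could look like (the listener name is hypothetical and its body is reduced to a log statement here), the check would be registered once against the `Session` class and would then run before every commit:

```python
import logging

from sqlalchemy import event
from sqlalchemy.orm import Session


def pkey_uniqueness_listener(session: Session) -> None:
    # Placeholder body: a real implementation would inspect session.new
    # and raise if a primary key repeats.
    logging.debug("Checking %d pending object(s)", len(session.new))


# "before_commit" fires before the flush that commit() performs, so
# session.new still contains the pending inserts to inspect.
event.listen(Session, "before_commit", pkey_uniqueness_listener)
```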

Below is a function I wrote as a 'before_commit' event listener for an application I am working on. It may serve as inspiration for a more robust primary key constraint system that could be implemented as a feature of the python-bigquery-sqlalchemy codebase:

import logging
from typing import Dict, List

import sqlalchemy as sa
import sqlalchemy.orm as sa_orm


def commit_pkey_uniqueness_check(session: sa_orm.Session):
    """
    Verify that a new database item's (row's) _pkey does not already exist in the database.
    This function is intended to be used as an SQLAlchemy event listener.
    It is mostly used for enforcing primary key uniqueness in GCP BigQuery.

    Example usage:
        from sqlalchemy import event
        from sqlalchemy.orm import Session

        class tgt_cls:
            _pkey: int
            ...

        event.listen(Session, 'before_commit', commit_pkey_uniqueness_check)
    """

    logging.debug("Running commit_pkey_uniqueness_check for new database item(s).")

    cls_pkey_tracker: Dict[type, List[int]] = {}
    new_items = list(session.new)
    logging.debug(f"Item(s) to check: {len(new_items)}")

    # Make sure there are no repeated pkeys within the pending items list.
    for item in new_items:
        if item.__class__ not in cls_pkey_tracker:
            cls_pkey_tracker[item.__class__] = []

        # Sometimes SA event ordering can cause the pkey to not be generated yet
        # for an item, via my 'before_insert' event.
        if not item._pkey:
            generate_primary_key(None, None, item)

        if item._pkey not in cls_pkey_tracker[item.__class__]:
            cls_pkey_tracker[item.__class__].append(item._pkey)
        else:
            msg = f"Duplicate primary key detected! There is a repeated primary key in the list of instances being uploaded! pkey: {item._pkey}"
            logging.error(msg)
            raise ValueError(msg)

    # Make sure none of the incoming pkeys are already in the database.
    for cls, pkeys in cls_pkey_tracker.items():
        logging.debug(f"Running pkey uniqueness query on ORM class: {cls}")

        # For robustness, could I switch "tbl.c['_pkey']" to "tbl.primary_key"?
        # Could commit this to sqlalchemy_bigquery as a way to enforce primary
        # key constraints for the community.
        pkey_col = sa_orm.class_mapper(cls).columns['_pkey']
        stmt = sa.select(cls).where(pkey_col.in_(pkeys))
        result = session.execute(stmt).all()

        if result:
            msg = f"Duplicate primary key detected! There are repeated primary key(s) in the list of instances being uploaded. pkey(s): {result}"
            logging.error(msg)
            raise ValueError(msg)

Caveats: This protection only works if all database inserts/updates are run through SQLAlchemy, and it does not protect against duplicate simultaneous inserts/updates from multiple processes. For my main use case of daily ETL jobs and single-process bulk uploads, this works.
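On the question raised in the code comment about switching to the table's declared primary key: a generalized sketch (names hypothetical, not tested against BigQuery) could read each model's primary-key columns through the SQLAlchemy mapper, so the check works for any mapped class instead of assuming a `_pkey` attribute. Composite keys are skipped here to keep the database query simple:

```python
from typing import Dict, List, Tuple

import sqlalchemy as sa
from sqlalchemy.orm import Session


def generic_pkey_uniqueness_check(session: Session) -> None:
    """Best-effort PK uniqueness check that reads each model's declared
    primary key via the mapper, instead of a hard-coded `_pkey` attribute."""
    tracker: Dict[type, List[Tuple]] = {}

    # Pass 1: no repeated primary keys among the pending inserts themselves.
    for item in session.new:
        cls = type(item)
        mapper = sa.inspect(cls)
        key = tuple(mapper.primary_key_from_instance(item))
        if key in tracker.setdefault(cls, []):
            raise ValueError(f"Duplicate primary key in pending inserts: {key}")
        tracker[cls].append(key)

    # Pass 2: none of the pending primary keys already exist in the database.
    for cls, keys in tracker.items():
        pk_cols = sa.inspect(cls).primary_key
        if len(pk_cols) != 1:
            continue  # composite keys omitted in this sketch
        col = pk_cols[0]
        stmt = sa.select(col).where(col.in_([k[0] for k in keys]))
        clashes = session.execute(stmt).all()
        if clashes:
            raise ValueError(f"Primary key(s) already in database: {clashes}")
```

Registered the same way (`event.listen(Session, 'before_commit', generic_pkey_uniqueness_check)`), this carries the same caveats as above: it only helps when all writes go through SQLAlchemy and within a single process.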

@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery-sqlalchemy API. label Oct 9, 2024