Refactor copy_to_enterprise to remove locks in an intelligent way before doing alter/rename work
floptical committed Apr 2, 2024
1 parent e55980d commit 5995995
Showing 1 changed file with 87 additions and 37 deletions.
124 changes: 87 additions & 37 deletions databridge_etl_tools/db2/db2.py
@@ -308,6 +308,42 @@ def run_ddl(self):
        except Exception as e:
            raise Exception("DEBUG: " + str(e) + " RETURN: " + str(return_val) + " DDL: " + self.ddl + " check query: " + check_stmt)

    def remove_locks(self, table, schema, lock_type=None):
        # Update 4-2-2024:
        # Because we're running ALTER statements, we need to take an AccessExclusiveLock on the table.
        # We cannot get that lock while anyone else has the table open in something like DBeaver
        # (though not ArcGIS Pro), because DBeaver holds an AccessShareLock on the table while viewing
        # it, which blocks our ALTER. In fact, we cannot get an AccessExclusiveLock while ANY lock
        # exists on the table, so find and kill them.
        # (A parameterized variant of this query is sketched just after this method.)

        lock_stmt = f'''
        SELECT pg_locks.pid,
               pg_locks.mode,
               pg_locks.granted,
               pg_class.relname AS table_name,
               pg_namespace.nspname AS schema_name,
               pg_stat_activity.query AS current_query
        FROM pg_locks
        JOIN pg_class ON pg_locks.relation = pg_class.oid
        JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
        JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
        WHERE pg_class.relname = '{table}' AND pg_namespace.nspname = '{schema}'
        '''
        if lock_type:
            lock_stmt += f" AND pg_locks.mode = '{lock_type}'"
        lock_stmt += ';'

        self.pg_cursor.execute(lock_stmt)
        locks = self.pg_cursor.fetchall()
        if locks:
            print('Locks on table found!!')
            for p in locks:
                pid = p[0]
                lock = p[1]
                query = p[5]
                print(f'Killing query: "{query}" with lock type "{lock}"')
                self.pg_cursor.execute(f'SELECT pg_terminate_backend({pid});')
                self.pg_cursor.execute('COMMIT')
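
Editor's aside: the lock query above interpolates table, schema, and lock_type into the SQL with f-strings. Since all three are compared as plain string values (not used as SQL identifiers), psycopg2 can bind them server-side instead. The following is an illustrative sketch only, not part of the commit; it assumes the same self.pg_cursor used throughout this file:

        # Illustrative sketch (not in the commit): same lock lookup with bind
        # parameters; relname/nspname/mode are compared as string values, so
        # plain %s placeholders work and no manual quoting is needed.
        lock_stmt = '''
        SELECT pg_locks.pid, pg_locks.mode, pg_stat_activity.query AS current_query
        FROM pg_locks
        JOIN pg_class ON pg_locks.relation = pg_class.oid
        JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
        JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
        WHERE pg_class.relname = %s AND pg_namespace.nspname = %s
        '''
        params = [table, schema]
        if lock_type:
            lock_stmt += ' AND pg_locks.mode = %s'
            params.append(lock_type)
        self.pg_cursor.execute(lock_stmt, params)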

    def create_staging_from_enterprise(self):
        self.confirm_table_existence()
        self.get_table_column_info_from_enterprise()
@@ -385,6 +421,8 @@ def copy_to_enterprise(self):
        reg_id = reg_id_return

        # Get the enterprise row count, needed for resetting the objectid counter in our delta table.
        # Pending alters can block this count from running, so remove the highest-level locks only,
        # which will be AccessExclusiveLock.
        self.remove_locks(self.enterprise_dataset_name, self.enterprise_schema, lock_type='AccessExclusiveLock')
        row_count_stmt = f'select count(*) from {self.enterprise_schema}.{self.enterprise_dataset_name}'
        self.pg_cursor.execute(row_count_stmt)
        row_count = self.pg_cursor.fetchone()[0]
@@ -424,21 +462,24 @@
        else:
            stage_table = f'{self.copy_from_source_schema}.{self.table_name}'

        #################################
        # objectid update method
        if oid_column and reg_id:
            # Update 4-1-2024: My code repeatedly drops the objectid column, but the dropped columns are actually
            # still there, just hidden. With a frequently updating dataset, this can eventually result in this error:
            #   psycopg2.errors.TooManyColumns: tables can have at most 1600 columns
            # https://stackoverflow.com/questions/29387569/table-can-have-at-most-1600-columns-in-postgres-openerp/39130447#39130447
            # To get around this, we'll copy to a temporary table and then rename it at the end.
            # (A small pg_attribute query after this excerpt shows how to confirm the hidden columns.)

            # If the table is registered and has an objectid column, remove that column so the insert happens WAY faster.
            # Then recreate the column as a serial so that it populates, then set it back to int4 for SDE to work.
            # Update: changing the serial back to int4 apparently doesn't work, but leaving it as serial doesn't
            # prevent Pro from opening and viewing tables, so never mind.
            temp_final_table = prod_table + '_aflw_temp'
            update_stmt_1 = f'''
            BEGIN;
            CREATE TABLE {temp_final_table} (LIKE {prod_table} INCLUDING CONSTRAINTS INCLUDING DEFAULTS);
            -- Drop our ESRI objectid column so we can insert without any overhead from that column
            ALTER TABLE {temp_final_table} DROP COLUMN {oid_column};
@@ -448,15 +489,36 @@
            -- Recreate it as an auto-incrementing SERIAL column; that is much, much faster,
            -- and the values will get populated automagically.
            ALTER TABLE {temp_final_table} ADD {oid_column} serial NOT NULL;
            END;
            '''
            update_stmt_2 = f'''
            -- We're done with the work, now replace the table.
            BEGIN;
            DROP TABLE {prod_table} CASCADE;
            ALTER TABLE {temp_final_table} RENAME TO {self.enterprise_dataset_name};
            END;
            '''
            # If we're actually fully registered, set the delta insert table to our row count.
            update_stmt_3 = f'''
            -- Set these vals to our row_count so ESRI's next_rowid() increments without collisions
            UPDATE {self.enterprise_schema}.i{reg_id} SET base_id={row_count + 1}, last_id={row_count} WHERE id_type = 2;
            '''
            try:
                self.pg_cursor.execute(update_stmt_1)
                # Remove locks just before we perform the rename.
                self.remove_locks(self.enterprise_dataset_name, self.enterprise_schema)
                self.pg_cursor.execute(update_stmt_2)
                self.pg_cursor.execute(update_stmt_3)
                self.pg_cursor.execute('COMMIT')
            except psycopg2.Error as e:
                self.logger.error(f'Error truncating and inserting into enterprise! Error: {str(e)}')
                self.pg_cursor.execute('ROLLBACK')
                raise e
        ###############################
        # non-objectid update method
        else:
            update_stmt = f'''
            BEGIN;
            -- Truncate our table (won't show until commit)
            DELETE FROM {prod_table};
@@ -465,27 +527,15 @@
            FROM {stage_table}
            END;
            '''

            self.logger.info("Running update_stmt: " + str(update_stmt))
            try:
                self.pg_cursor.execute(update_stmt)
                self.pg_cursor.execute('COMMIT')
            except psycopg2.Error as e:
                self.logger.error(f'Error truncating and inserting into enterprise! Error: {str(e)}')
                self.pg_cursor.execute('ROLLBACK')
                raise e

        # If successful, drop the etl_staging and old table when we're done, to save space.
        # NOTE: don't do this for now, to make task migration easier. -Roland 9/25/2023
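
Editor's aside on the TooManyColumns comment in the objectid branch above: PostgreSQL never physically reclaims a dropped column; it remains in pg_attribute with attisdropped = true and still counts toward the 1600-column cap, which is why the commit switches to a copy-and-rename instead of repeated DROP COLUMN. A minimal sketch for confirming this, assuming the same psycopg2 cursor; the schema and table names are hypothetical placeholders:

        # Illustrative sketch only: list live and dropped column slots for a table.
        # Dropped columns show attisdropped = True and still count toward the
        # 1600-column cap behind psycopg2.errors.TooManyColumns.
        check_stmt = '''
        SELECT attname, attisdropped
        FROM pg_attribute
        WHERE attrelid = 'my_schema.my_table'::regclass  -- hypothetical names
          AND attnum > 0
        ORDER BY attnum;
        '''
        self.pg_cursor.execute(check_stmt)
        for attname, attisdropped in self.pg_cursor.fetchall():
            print(attname, attisdropped)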
