Merge pull request #38 from CityOfPhiladelphia/test-no-alter-update-4-2024

Redo postgres upload to viewer method to not use any alters
floptical authored Apr 26, 2024
2 parents 92155f4 + 4d697d9 commit c27b657
Showing 1 changed file with 97 additions and 107 deletions.
204 changes: 97 additions & 107 deletions databridge_etl_tools/db2/db2.py
@@ -335,14 +335,14 @@ def remove_locks(self, table, schema, lock_type=None):
self.pg_cursor.execute(lock_stmt)
locks = self.pg_cursor.fetchall()
if locks:
print('Locks on table found!!')
self.logger.info('Locks on table found!!')
for p in locks:
pid = p[0]
lock = p[1]
# Note: Can't seem to reliably get what the actual query is, not trusting
# the pid matching between pg_locks and pg_stat_activity.
#query = p[5]
print(f'Killing pid: "{pid}" with lock type "{lock}"')
self.logger.info(f'Killing pid: "{pid}" with lock type "{lock}"')
self.pg_cursor.execute(f'SELECT pg_terminate_backend({pid});')
self.pg_cursor.execute(f'COMMIT')
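# Background on the two moving parts above: pg_locks reports one row per held
# or awaited lock (backend pid plus lock mode), and pg_terminate_backend(pid)
# ends the owning session server-side. A rough sketch of what lock_stmt is
# assumed to do (the real statement is defined earlier in this method):
#   SELECT l.pid, l.mode
#   FROM pg_locks l
#   JOIN pg_class c ON c.oid = l.relation
#   JOIN pg_namespace n ON n.oid = c.relnamespace
#   WHERE n.nspname = '<schema>' AND c.relname = '<table>';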

@@ -354,6 +354,8 @@ def create_staging_from_enterprise(self):
self.run_ddl()

def copy_to_enterprise(self):
'''Copy from either the department table or the etl_staging temp table, depending on the args passed.'''

get_enterprise_columns_stmt = f'''
SELECT array_agg(COLUMN_NAME::text order by COLUMN_NAME)
FROM information_schema.columns
@@ -363,9 +365,10 @@ def copy_to_enterprise(self):
self.logger.info("Executing get_enterprise_columns_stmt: " + str(get_enterprise_columns_stmt))
self.pg_cursor.execute(get_enterprise_columns_stmt)
enterprise_columns = [column for column in self.pg_cursor.fetchall()[0]][0]
# Assert we actually end up with a columns list, in case this table is messed up.
assert enterprise_columns
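# Shape note: array_agg collapses the result to a single row holding one
# Postgres array, which psycopg2 maps to a Python list. fetchall() therefore
# returns something like [(['geom', 'objectid', 'street_name'],)] (hypothetical
# column names), and the double indexing above unwraps it to a plain list.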

# Figure out what the official OBJECTID is (since there can be multiple like "OBJECTID_1")

get_oid_column_stmt = f'''
SELECT rowid_column FROM sde.sde_table_registry
WHERE table_name = '{self.enterprise_dataset_name}' AND schema = '{self.enterprise_schema}'
@@ -378,50 +381,80 @@ def copy_to_enterprise(self):
oid_column = oid_column_return[0]
else:
oid_column = None
if oid_column != 'objectid':
self.logger.info(f'Non-standard OID column detected, you should correct this!!: {oid_column}')
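# Background: sde.sde_table_registry is the ESRI geodatabase's registry of
# tables it manages, and rowid_column records which column SDE treats as the
# ObjectID. It is normally 'objectid', but re-registration can leave names
# like 'objectid_1', which is why this is looked up instead of hard-coded.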

print('enterprise_columns: ' + str(enterprise_columns))
print('oid_column: ' + str(oid_column))
# Don't include objectid in there because our new method does not use objectid for the insert.
if oid_column:
enterprise_columns.remove(oid_column)
if 'objectid' in enterprise_columns:
enterprise_columns.remove('objectid')

# Assert we actually end up with a columns list, in case this table is messed up.
assert enterprise_columns

# Metadata column added into postgres tables by arc programs, not needed.
# Metadata column added into postgres tables by arc programs, often empty, not needed.
if 'gdb_geomattr_data' in enterprise_columns:
enterprise_columns.remove('gdb_geomattr_data')

# Get our enterprise columns which we'll use for our insert statement below
enterprise_columns_str = ', '.join(enterprise_columns)
staging_columns = enterprise_columns
print('staging_columns: ' + str(staging_columns))
staging_columns_str = ', '.join(staging_columns)


###############
# First we need to reset the objectid number that next_rowid() pulls from
# so we're not making crazy objectids in the trillions.
# To do this we need to modify the insert delta table of our SDE table.
# Extra checks to see if we're registered
reg_stmt=f'''
SELECT registration_id FROM sde.sde_table_registry
WHERE schema = '{self.enterprise_schema}' AND table_name = '{self.enterprise_dataset_name}'
'''
self.logger.info("Running reg_stmt: " + str(reg_stmt))
self.pg_cursor.execute(reg_stmt)
reg_id_return = self.pg_cursor.fetchone()
# Can be NULL if programmatically registered so treat carefully
# If this table isn't "really" registered, then don't worry about resetting
# the insert delta table because it won't exist and this table is likely not meant to be edited.
if reg_id_return:
print('Table is fully registered, resetting objectid field in delta insert table.')
if isinstance(reg_id_return, list) or isinstance(reg_id_return, tuple):
reg_id = reg_id_return[0]
else:
reg_id = reg_id_return
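# Note: for a fully registered table, SDE tracks ObjectID allocation in a
# per-table delta table named i<registration_id> in the same schema; its
# base_id/last_id row with id_type = 2 (updated further down) is what
# next_rowid() consults when handing out new ObjectIDs.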

reg_stmt2 = f"select uuid from sde.gdb_items where name like '%{self.enterprise_schema}.{self.enterprise_dataset_name}';"
print(reg_stmt2)
self.logger.info("Running reg_stmt2: " + str(reg_stmt2))
self.pg_cursor.execute(reg_stmt2)
reg_uuid_return = self.pg_cursor.fetchone()
if isinstance(reg_uuid_return, list) or isinstance(reg_uuid_return, tuple):
reg_uuid = reg_uuid_return[0]
else:
reg_uuid = reg_uuid_return

# Get the objectid sequence associated with the table, also needed for resetting objectid column counter.
# Sometimes this may not exist somehow.
if oid_column:
seq_stmt = f'''
SELECT
S.relname AS sequence_name
FROM
pg_class S
JOIN pg_depend D ON (S.oid = D.objid)
JOIN pg_class T ON (D.refobjid = T.oid)
JOIN pg_namespace n ON (S.relnamespace = n.oid)
WHERE
S.relkind = 'S' AND
n.nspname = '{self.enterprise_schema}' AND
T.relname = '{self.enterprise_dataset_name}' AND
s.relname like '%objectid_seq';
'''
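# How this lookup works: a serial column owns its sequence, and Postgres
# records that ownership as a pg_depend row linking the sequence (objid) to
# its table (refobjid). Filtering on relkind = 'S' plus the '%objectid_seq'
# name pattern then narrows the match to the ObjectID sequence, assuming SDE
# created it with that naming convention.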

self.pg_cursor.execute(seq_stmt)
seq_return = self.pg_cursor.fetchone()
if not seq_return:
self.logger.warning(f'Could not find the objectid sequence! Ran statement: \n {seq_stmt}')
seq_name = False
if isinstance(seq_return, list) or isinstance(seq_return, tuple):
seq_name = seq_return[0]
else:
seq_name = seq_return
try:
assert seq_name
except AssertionError as e:
self.logger.warning(f'Could not find the objectid sequence! Ran statement: \n {seq_stmt}')
seq_name = False


if reg_id and reg_uuid and oid_column:
fully_registered = True
else:
fully_registered = False

if oid_column and reg_id:
self.logger.info('Resetting SDE delta insert table counter...')

# Get enterprise row_count, needed for resetting the objectid counter in our delta table
# Some pending alters will prevent this from happening. Remove highest level locks only, which will be AccessExclusiveLock.
self.remove_locks(self.enterprise_dataset_name, self.enterprise_schema, lock_type='AccessExclusiveLock')
@@ -437,22 +470,25 @@ def copy_to_enterprise(self):
self.logger.info("Running reset_stmt: " + str(reset_stmt))
self.pg_cursor.execute(reset_stmt)
self.pg_cursor.execute('COMMIT')
#############
else:
print("Table appears to not be registered (or was registered via Roland's hacky registration method). Not resetting objectid field in delta insert table.")
reg_id = None

if not oid_column and reg_id:
raise AssertionError('SDE Registration mismatch! We found an objectid column from sde.sde_table_registry, but could not find a registration id. This table is broken and should be remade!')

# Fields to select from staging
select_fields = staging_columns_str
#############################
# Prepare to insert

###############
# Truncate is not 'MVCC-safe', which means concurrent select transactions will not be able to
# view/select the data during the execution of the update_stmt.
#truncate_stmt = f'''TRUNCATE TABLE {self.enterprise_schema}.{self.enterprise_dataset_name}'''
# DELETE FROM is 'MVCC-safe'.
# Only remove objectid from the columns list if there is a sequence, because then the objectid column will be auto-populated.
# Otherwise we need to fill in the oid column ourselves.
if oid_column and seq_name:
enterprise_columns.remove(oid_column)
if 'objectid' in enterprise_columns and seq_name:
enterprise_columns.remove('objectid')
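# In other words: when the sequence exists, objectid is left out of the column
# list so its default (nextval on that sequence) numbers the inserted rows
# itself; without a sequence, the ObjectID values have to come across in the
# SELECT like any other column.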

# Get our enterprise columns which we'll use for our insert statement below
enterprise_columns_str = ', '.join(enterprise_columns)

self.logger.info('enterprise_columns: ' + str(enterprise_columns))
self.logger.info('oid_column: ' + str(oid_column))

prod_table = f'{self.enterprise_schema}.{self.enterprise_dataset_name}'
# If etl_staging, that means we got data uploaded from S3 or an ArcPy copy
@@ -464,83 +500,37 @@ def copy_to_enterprise(self):
else:
stage_table = f'{self.copy_from_source_schema}.{self.table_name}'

#################################
# objectid update method
if oid_column and reg_id:
# Update 4-1-2024: My code repeatedly drops the objectid column, but the dropped columns are actually still there and just hidden.
# With a frequently updating dataset, this can eventually result in this error:
# psycopg2.errors.TooManyColumns: tables can have at most 1600 columns
# https://stackoverflow.com/questions/29387569/table-can-have-at-most-1600-columns-in-postgres-openerp/39130447#39130447
# To get around this, we'll copy to a temporary table and then rename at the end.

# If registered and has an objectid column, remove that column so the insert happens WAY faster.
# Then recreate the column as a serial so that it populates, then set it back to int4 for SDE to work.
# Update: changing serial back to int4 apparently doesn't work, but it doesn't prevent Pro from opening
# and viewing tables so nvm.
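# Mechanics of that limit: DROP COLUMN only marks the column as dropped in
# pg_attribute (attisdropped) rather than physically removing it, and dropped
# columns still count toward Postgres's hard cap of 1600 columns per table,
# so repeated drop/re-add cycles eventually trigger TooManyColumns.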
temp_final_table = prod_table + '_aflw_temp'
update_stmt_1 = f'''
BEGIN;
CREATE TABLE {temp_final_table} (LIKE {prod_table} INCLUDING CONSTRAINTS INCLUDING DEFAULTS);
-- Drop our ESRI objectid column so we can insert without any per-row overhead from maintaining it
ALTER TABLE {temp_final_table} DROP COLUMN {oid_column};
if fully_registered:
# Reset these to our row_count so the next call to next_rowid() increments without collisions
if seq_name:
self.logger.info("Resetting oid sequence..")
reset_oid_sequence_1 = f"SELECT setval('{self.enterprise_schema}.{seq_name}', 1, false)"
self.logger.info(reset_oid_sequence_1)
self.pg_cursor.execute(reset_oid_sequence_1)
reset_oid_sequence_2 = f"UPDATE {self.enterprise_schema}.i{reg_id} SET base_id=1, last_id={row_count} WHERE id_type = 2;"
self.logger.info("Resetting oid column..")
self.logger.info(reset_oid_sequence_2)
self.pg_cursor.execute(reset_oid_sequence_2)
self.pg_cursor.execute('COMMIT;')
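# setval('<seq>', 1, false) positions the sequence so the next nextval()
# returns 1 (false meaning "not yet called"), and the i<reg_id> update resets
# the base_id/last_id pair that SDE's next_rowid() consults, so fresh loads
# do not inherit inflated counters from earlier runs.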

-- Insert from etl_staging (or dept schema) into temp final table.
INSERT INTO {temp_final_table} ({enterprise_columns_str}) SELECT {select_fields} FROM {stage_table};
-- Recreate it as an auto-incrementing SERIAL column, which is much faster,
-- and the values will get populated automagically.
ALTER TABLE {temp_final_table} ADD {oid_column} serial NOT NULL;
END;
'''
update_stmt_2 = f'''
-- We're done the work, now replace the table.
BEGIN;
DROP TABLE {prod_table} CASCADE;
ALTER TABLE {temp_final_table} RENAME TO {self.enterprise_dataset_name};
END;
'''
# If we're actually fully registered then set the delta insert table to our row count.
update_stmt_3 = f'''
-- Set these vals to our row_count so ESRI's next_rowid() increments without collisions
UPDATE {self.enterprise_schema}.i{reg_id} SET base_id={row_count + 1}, last_id={row_count} WHERE id_type = 2;
'''
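# Net effect of update_stmt_1/2/3: bulk-load into a LIKE-copy of the table
# with the objectid column dropped, let ADD COLUMN ... serial backfill
# ObjectIDs 1..N in a single rewrite, swap the copy in with DROP + RENAME,
# then point the delta table's counter past row_count.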
try:
self.logger.info(update_stmt_1)
self.pg_cursor.execute(update_stmt_1)
# Remove locks just before we perform a rename.
self.remove_locks(self.enterprise_dataset_name, self.enterprise_schema)
self.logger.info(update_stmt_2)
self.pg_cursor.execute(update_stmt_2)
self.logger.info(update_stmt_3)
self.pg_cursor.execute(update_stmt_3)
self.pg_cursor.execute('COMMIT')
#####################
except psycopg2.Error as e:
self.logger.error(f'Error truncating and inserting into enterprise! Error: {str(e)}')
self.pg_cursor.execute('ROLLBACK')
raise e
###############################
# non-objectid update method
else:
try:
update_stmt = f'''
BEGIN;
-- Truncate our table (won't show until commit)
DELETE FROM {prod_table};
INSERT INTO {prod_table} ({enterprise_columns_str})
SELECT {select_fields}
SELECT {enterprise_columns_str}
FROM {stage_table};
END;
'''
self.logger.info("Running update_stmt: " + str(update_stmt))
try:
self.pg_cursor.execute(update_stmt)
self.pg_cursor.execute('COMMIT')
#####################
except psycopg2.Error as e:
self.logger.error(f'Error truncating and inserting into enterprise! Error: {str(e)}')
self.pg_cursor.execute('ROLLBACK')
raise e
self.remove_locks(self.enterprise_dataset_name, self.enterprise_schema, lock_type='AccessExclusiveLock')
self.pg_cursor.execute(update_stmt)
self.pg_cursor.execute('COMMIT;')
except psycopg2.Error as e:
self.logger.error(f'Error truncating and inserting into enterprise! Error: {str(e)}')
self.pg_cursor.execute('ROLLBACK;')
raise e
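# Everything between BEGIN and END above runs in one transaction, so
# concurrent readers keep seeing the old rows until the COMMIT lands; the
# DELETE leaves dead tuples for autovacuum instead of taking TRUNCATE's
# ACCESS EXCLUSIVE lock and its non-MVCC-safe empty-table behavior.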

# If successful, drop the etl_staging and old table when we're done to save space.
# NOTE: don't do this for now to make task migration easier -Roland 9/25/2023
