Merge pull request #37 from CityOfPhiladelphia/oracle-write-4-2024
2 new Oracle functions: reload a table from a CSV in one transaction, or append to an existing table from a CSV.
floptical authored Apr 23, 2024
2 parents 3557358 + 54f1e2a commit 92155f4
Showing 4 changed files with 159 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_pr_build.yml
@@ -1,4 +1,4 @@
-ame: Build and test docker image only for PRs
name: Build and test docker image only for PRs

on:
  pull_request:
31 changes: 31 additions & 0 deletions databridge_etl_tools/oracle/_s3.py
@@ -0,0 +1,31 @@
import boto3

def _interact_with_s3(self, method: str, path: str, s3_key: str):
    '''
    - method should be one of "get", "load"
    '''
    self.logger.info(f"{method.upper()}-ing file: s3://{self.s3_bucket}/{s3_key}")

    s3 = boto3.resource('s3')
    if method == 'get':
        s3.Object(self.s3_bucket, s3_key).download_file(path)
        self.logger.info(f'File successfully downloaded from S3 to {path}\n')
    elif method == 'load':
        # Use a context manager so the file handle is closed after the upload
        with open(path, 'rb') as f:
            s3.Object(self.s3_bucket, s3_key).put(Body=f)
        self.logger.info(f'File successfully uploaded from {path} to S3\n')

def get_json_schema_from_s3(self):
    _interact_with_s3(self, 'get', self.json_schema_path, self.json_schema_s3_key)

def get_csv_from_s3(self):
    _interact_with_s3(self, 'get', self.csv_path, self.s3_key)

def load_json_schema_to_s3(self):
    with open(self.json_schema_path, 'w') as f:
        f.write(self.export_json_schema)

    _interact_with_s3(self, 'load', self.json_schema_path, self.json_schema_s3_key)

def load_csv_to_s3(self, path):
    '''Path of the file to load - generally should be self.csv_path'''
    _interact_with_s3(self, 'load', path, self.s3_key)
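
These helpers are defined at module level but take self: oracle.py (next file) binds them onto the Oracle class with an in-class import (from ._s3 import (get_csv_from_s3)), so they behave like ordinary methods. A minimal, self-contained sketch of that pattern, with illustrative names that are not from the repo:

def upload(self, path):
    # Module-level function that expects `self`, like the helpers in _s3.py
    print(f'uploading {path} for {self.name}')

class Client:
    # Assigning (or importing) a module-level function inside the class body
    # attaches it as a regular method -- the same trick Oracle uses with
    # `from ._s3 import (get_csv_from_s3)`.
    upload = upload

    def __init__(self, name):
        self.name = name

Client('demo').upload('file.csv')  # -> uploading file.csv for demo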
117 changes: 114 additions & 3 deletions databridge_etl_tools/oracle/oracle.py
@@ -7,6 +7,8 @@
import petl as etl
import geopetl
import json
import hashlib



class Oracle():
@@ -16,8 +18,9 @@ class Oracle():
    _json_schema_path = None
    _fields = None
    _row_count = None
    from ._s3 import (get_csv_from_s3)

-    def __init__(self, connection_string, table_name, table_schema, s3_bucket, s3_key):
    def __init__(self, connection_string, table_name, table_schema, s3_bucket, s3_key, **kwargs):
        self.connection_string = connection_string
        self.table_name = table_name
        self.table_schema = table_schema
@@ -264,5 +267,113 @@ def extract(self):

        self.logger.info('Successfully extracted from {}'.format(self.schema_table_name))

-    def write(self):
-        raise NotImplementedError

    def append(self):
        '''Append a CSV into a table.'''
        self.get_csv_from_s3()
        print('Loading CSV into geopetl..')
        rows = etl.fromcsv(self.csv_path)
        num_rows_in_csv = rows.nrows()
        if num_rows_in_csv == 0:
            raise AssertionError('Error: dataset is empty? CSV row count is 0.')
        print(f'Rows: {num_rows_in_csv}')
        # Interval to print progress; at least 1 so small files don't break progress()
        interval = max(int(num_rows_in_csv / 10), 1)

        print(f"Loading CSV into Oracle table '{self.table_schema.upper()}.{self.table_name.upper()}'..")
        rows.progress(interval).appendoraclesde(self.conn, f'{self.table_schema.upper()}.{self.table_name.upper()}')

    def load(self):
        '''Copy a CSV into a table by first inserting into a temp table (TMP_ prefix)
        and then deleting from and inserting into the final table in one transaction.'''
        self.get_csv_from_s3()
        print('Loading CSV into geopetl..')
        rows = etl.fromcsv(self.csv_path)
        num_rows_in_csv = rows.nrows()
        if num_rows_in_csv == 0:
            raise AssertionError('Error: dataset is empty? CSV row count is 0.')
        print(f'Rows: {num_rows_in_csv}')
        # Interval to print progress; at least 1 so small files don't break progress()
        interval = max(int(num_rows_in_csv / 10), 1)

        # Get columns from the prod Oracle table
        cursor = self.conn.cursor()
        cols_stmt = f'''SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
            FROM all_tab_cols
            WHERE table_name = '{self.table_name.upper()}'
            AND owner = '{self.table_schema.upper()}'
            AND column_name NOT LIKE 'SYS_%'
            '''
        cursor.execute(cols_stmt)
        cols = cursor.fetchall()[0][0]
        # Detect SDE registration through the existence of an OBJECTID column
        sde_registered = False
        if 'OBJECTID_' in cols:
            raise AssertionError('Nonstandard OBJECTID column detected! Please rename your objectid column to just "OBJECTID"!')
        if 'OBJECTID' in cols:
            sde_registered = True
            print('OBJECTID found, assuming the table is SDE-registered.')
            cols = cols.replace('OBJECTID,', '')
            cols = cols.replace(', OBJECTID', '')

        # Make a temp table name exactly 30 characters long so we don't exceed Oracle
        # 11g's table name limit, hashing so it's unique to our table name. Create it
        # as the user we're logged in as, so we have sufficient perms to make the table.
        hashed = hashlib.sha256(self.table_name.encode()).hexdigest()
        cursor.execute('SELECT user FROM dual')
        running_user = cursor.fetchone()[0]
        temp_table_name = running_user + '.TMP_' + hashed[:26].upper()

        if running_user != 'SDE' and self.table_schema.upper() != running_user:
            raise Exception(f'Must run this as the schema owner or as the SDE user, please adjust your connection string! Running user: {running_user}')

        try:
            # Create a temp table to hold our columns, minus any possible OBJECTID column
            tmp_table_made = False
            tmp_tbl_stmt = f'''CREATE TABLE {temp_table_name} AS
                SELECT {cols}
                FROM {self.table_schema.upper()}.{self.table_name.upper()}
                '''
            print(tmp_tbl_stmt)
            cursor.execute(tmp_tbl_stmt)
            cursor.execute('COMMIT')
            tmp_table_made = True

            print(f'Loading CSV into {temp_table_name} (note that the first printed progress rows are just loading the CSV into a petl object)..')
            # Remove objectid because we're loading into the temp table first, minus objectid
            if sde_registered:
                rows_mod = etl.cutout(rows, 'objectid')
                rows_mod.progress(interval).tooraclesde(self.conn, temp_table_name)
            else:
                rows.progress(interval).tooraclesde(self.conn, temp_table_name)

            if sde_registered:
                copy_into_cols = 'OBJECTID, ' + cols
                copy_stmt = f'''
                    INSERT INTO {self.table_schema.upper()}.{self.table_name.upper()} ({copy_into_cols})
                    SELECT SDE.GDB_UTIL.NEXT_ROWID('{self.table_schema.upper()}', '{self.table_name.upper()}'), {cols}
                    FROM {temp_table_name}
                    '''
            else:
                copy_into_cols = cols
                copy_stmt = f'''
                    INSERT INTO {self.table_schema.upper()}.{self.table_name.upper()} ({copy_into_cols})
                    SELECT {cols}
                    FROM {temp_table_name}
                    '''
            print('Begin copying from temp into the final table..')
            print(copy_stmt)
            cursor.execute(f'DELETE FROM {self.table_schema.upper()}.{self.table_name.upper()}')
            cursor.execute(copy_stmt)
            cursor.execute('COMMIT')
            cursor.execute(f'DROP TABLE {temp_table_name}')
            cursor.execute('COMMIT')
            cursor.execute(f'SELECT COUNT(*) FROM {self.table_schema.upper()}.{self.table_name.upper()}')
            oracle_rows = cursor.fetchone()[0]
            print(f'assert {num_rows_in_csv} == {oracle_rows}')
            assert num_rows_in_csv == oracle_rows
            print('Done.')
        except (Exception, KeyboardInterrupt) as e:
            cursor.execute('ROLLBACK')
            # Clean up the temp table if it got created (or already existed)
            if tmp_table_made or 'name is already used by an existing object' in str(e):
                cursor.execute(f'DROP TABLE {temp_table_name}')
                cursor.execute('COMMIT')
            raise e
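
A minimal usage sketch of the two new entry points. The connection string and S3 values below are placeholders, and the sketch assumes the class wiring not shown in this hunk (conn, logger, csv_path) is provided elsewhere in oracle.py:

from databridge_etl_tools.oracle.oracle import Oracle

oracle = Oracle(
    connection_string='user/password@db-host:1521/service',  # placeholder
    table_name='my_table',                                   # placeholder
    table_schema='my_schema',
    s3_bucket='my-etl-bucket',
    s3_key='staging/my_table.csv',  # CSV previously extracted to S3
)

oracle.append()  # add the CSV's rows to the existing table
# or:
oracle.load()    # full reload: temp table, then DELETE + INSERT in one transaction

Note that for SDE-registered tables, load() strips the incoming OBJECTID column and reassigns fresh ids via SDE.GDB_UTIL.NEXT_ROWID during the copy from the temp table.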
13 changes: 13 additions & 0 deletions databridge_etl_tools/oracle/oracle_commands.py
@@ -1,4 +1,5 @@
from .oracle import Oracle
from .. import utils
import click

@click.group()
@@ -23,3 +24,15 @@ def extract(ctx):
def extract_json_schema(ctx):
    """Extracts a dataset's schema in Oracle into a JSON file in S3"""
    ctx.obj.load_json_schema_to_s3()

@oracle.command()
@click.pass_context
def append(ctx):
    """Appends a CSV file from S3 into an Oracle table"""
    ctx.obj.append()

@oracle.command()
@click.pass_context
def load(ctx):
    """Loads a CSV file from S3 into a temp Oracle table and overwrites the final table in one transaction (should have no downtime)"""
    ctx.obj.load()
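
Since these are click commands, they can be exercised without a console script via click's test runner. The group-level options below are hypothetical; the real flags are defined on the @click.group() earlier in oracle_commands.py and are not shown in this hunk:

from click.testing import CliRunner
from databridge_etl_tools.oracle.oracle_commands import oracle

runner = CliRunner()
result = runner.invoke(oracle, [
    # Hypothetical group options that build ctx.obj; check the group definition
    '--table_name', 'my_table',
    '--table_schema', 'my_schema',
    '--connection_string', 'user/password@db-host:1521/service',
    '--s3_bucket', 'my-etl-bucket',
    '--s3_key', 'staging/my_table.csv',
    'load',
])
print(result.output)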
