-
Notifications
You must be signed in to change notification settings - Fork 0
/
pg2pgtap.py
168 lines (130 loc) · 5.61 KB
/
pg2pgtap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import sys, os, yaml, psycopg2, logging, sql, Db_object
def exec_sql(sql, cur, return_table):
    '''Execute one SQL statement on a cursor.

    Parameters:
        sql          -- the SQL text to run. NOTE: inside this function the
                        name shadows the imported ``sql`` module; it is kept
                        because it is part of the existing signature.
        cur          -- a DB-API cursor (must provide execute() and fetchall()).
        return_table -- when truthy, return the fetched rows instead of True.

    Returns:
        False                 if the statement raised psycopg2.Error,
        the cur.fetchall() result  if it succeeded and return_table is truthy,
        True                  if it succeeded and return_table is falsy.

    Callers that iterate over the result must check for False first.
    '''
    try:
        cur.execute(sql)
    except psycopg2.Error as err:
        # Record why the statement failed instead of discarding the error;
        # the False return value is all the caller gets otherwise.
        logging.error('SQL statement failed: %s\nStatement was: %s', err, sql)
        return False
    return cur.fetchall() if return_table else True
def _config_error(message):
    '''Print a fatal configuration error and terminate with exit status -1.'''
    print('CONFIG ERROR: {0}'.format(message))
    sys.exit(-1)


def _load_config(path):
    '''Parse the YAML file at `path`; an empty file yields an empty dict.'''
    with open(path, 'r') as config_file:
        # safe_load only builds plain Python types. yaml.load() without an
        # explicit Loader can construct arbitrary objects and is rejected
        # outright by current PyYAML releases.
        config = yaml.safe_load(config_file)
    return config or {}


def _fetch_rows(query, cur):
    '''Run `query` through exec_sql and return its rows, exiting on failure.'''
    rows = exec_sql(query, cur, True)
    if rows is False:
        # exec_sql signals a database error with False; iterating over it
        # would only raise an unrelated TypeError further down.
        print('DB ERROR: query failed: {0}'.format(query))
        sys.exit(-1)
    return rows


def _collect_db_objects(cur, schema_list):
    '''Build Schema, Table and Column objects for every schema in `schema_list`.

    Returns a (schema_objs, table_objs, column_objs) tuple of lists.
    '''
    schema_objs, table_objs, column_objs = [], [], []
    for schema in schema_list:
        schema_objs.append(Db_object.Schema(schema))

        # table_row = (table_name, table_type, ...)
        for table_row in _fetch_rows(sql.sql_queries('get_tables', schema), cur):
            table = Db_object.Table(schema, table_row[0], table_row[1])
            table_objs.append(table)

            # Columns are looked up per table, with the schema that table
            # belongs to, so tables from an earlier schema are never
            # re-queried under a later schema name.
            column_query = sql.sql_queries('get_columns', schema,
                                           table.table_name())
            # column = (column_name, column_type, ordinal_pos, is_nullable,
            #           column_default)
            for column in _fetch_rows(column_query, cur):
                column_objs.append(Db_object.Column(
                    schema, table.table_name(), table.table_type(),
                    column[0], column[1], column[2], column[3], column[4]))
    return schema_objs, table_objs, column_objs


def _write_header(text_file):
    '''Write the psql/pgTAP preamble, one directive per line.'''
    header_lines = [
        r'\set ECHO',
        r'\set QUIET 1',
        '-- Turn off echo and keep things quiet.',
        '',
        '-- Format the output for nice TAP.',
        r'\pset format unaligned',
        r'\pset tuples_only true',
        r'\pset pager',
        '',
        '-- Revert all changes on failure.',
        r'\set ON_ERROR_ROLLBACK 1',
        r'--\set ON_ERROR_STOP true',
        r'\set QUIET 1',
        '',
        '-- Load the TAP functions',
        'BEGIN;',
        r'\i pgtap.sql',
        '',
        '',
    ]
    # psql meta-commands must each sit on their own line; writing them back
    # to back would merge them and hide BEGIN; inside a comment.
    text_file.write('\n'.join(header_lines))


def _write_tests(text_file, tests, db_objects):
    '''Write every test in `tests` for every object; return how many were written.'''
    written = 0
    for obj in db_objects:
        for test in tests:
            # Each test name is a method on the object; its return value is
            # written as one line of the output script.
            test_method = getattr(obj, test)
            text_file.write('{0}\n'.format(test_method()))
            written += 1
    return written


def main():
    '''Main entrypoint for the pg database > pgtap script.

    Reads config.yaml from the current directory, queries the configured
    database for the tables and columns of each listed schema, and writes
    the tests named in the "Tests" section to the configured output file.

    Required config sections: Schema, Connection (Host, Database, User,
    Password), Output (Destination) and Tests (Schema, Table, Column).
    A missing section ends the program with exit status -1.
    '''
    # --------------------------------------------
    # -- Configuration
    # --------------------------------------------
    config = _load_config('config.yaml')

    # Schema to evaluate
    if 'Schema' not in config:
        _config_error('No "Schema" section')
    schema_list = config['Schema']

    if 'Connection' not in config:
        _config_error('No database "Connection" section')
    connection = config['Connection']
    host = connection['Host']
    db = connection['Database']
    user = connection['User']
    pwd = connection['Password']

    # Output: without a destination there is nowhere to write the tests.
    if 'Output' not in config:
        _config_error('No "Output" section')
    output_dest = config['Output']['Destination']

    if 'Tests' not in config:
        _config_error('No "Tests" section')
    tests_schema = tuple(config['Tests']['Schema'])
    tests_tables = tuple(config['Tests']['Table'])
    tests_columns = tuple(config['Tests']['Column'])

    # --------------------------------------------
    # -- DB Queries & Python objects rep of pg db
    # --------------------------------------------
    # Keyword arguments let the driver quote the values, so a password
    # containing a quote or a space cannot corrupt the connection string.
    conn = psycopg2.connect(host=host, dbname=db, user=user, password=pwd)
    try:
        cur = conn.cursor()
        try:
            schema_objs, table_objs, column_objs = _collect_db_objects(
                cur, schema_list)
        finally:
            cur.close()
    finally:
        conn.close()

    # --------------------------------------------
    # -- Write out PGTAP tests
    # --------------------------------------------
    # An ordered list keeps the output in schema, table, column order and
    # avoids key collisions when two levels are configured with the same
    # tuple of tests.
    suites = [
        ('Schema', tests_schema, schema_objs),
        ('Table', tests_tables, table_objs),
        ('Column', tests_columns, column_objs),
    ]
    with open(output_dest, 'w') as text_file:
        _write_header(text_file)
        counts = [(label, _write_tests(text_file, tests, db_objects))
                  for label, tests, db_objects in suites]
        # The summary is written as SQL comments so the file stays a valid
        # psql script.
        for label, count in counts:
            text_file.write('\n-- {0} {1} tests'.format(count, label))
        text_file.write('\n')
    # TODO: the script opens a transaction with BEGIN; but never emits
    #SELECT * FROM finish();
    #ROLLBACK;
# Run the generator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()