forked from web2py/web2py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextract_pgsql_models.py
286 lines (243 loc) · 9.98 KB
/
extract_pgsql_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Create web2py model (python code) to represent PostgreSQL tables.
Features:
* Uses ANSI Standard INFORMATION_SCHEMA (might work with other RDBMS)
* Detects legacy "keyed" tables (not having an "id" PK)
* Connects directly to running databases, no need to do a SQL dump
* Handles notnull, unique and referential constraints
* Detects most common datatypes and default values
* Support PostgreSQL columns comments (ie. for documentation)
Requeriments:
* Needs PostgreSQL pyscopg2 python connector (same as web2py)
* If used against other RDBMS, import and use proper connector (remove pg_ code)
Created by Mariano Reingart, based on a script to "generate schemas from dbs"
(mysql) by Alexandre Andrade
"""
HELP = """
USAGE: extract_pgsql_models db host port user passwd
Call with PostgreSQL database connection parameters,
web2py model will be printed on standard output.
EXAMPLE: python extract_pgsql_models.py mydb localhost 5432 reingart saraza
"""
# Config options
DEBUG = False # print debug messages to STDERR
SCHEMA = 'public' # change if not using default PostgreSQL schema
# Constant for Field keyword parameter order (and filter):
KWARGS = ('type', 'length', 'default', 'required', 'ondelete',
'notnull', 'unique', 'label', 'comment')
import sys
def query(conn, sql, *args):
"Execute a SQL query and return rows as a list of dicts"
cur = conn.cursor()
ret = []
try:
if DEBUG:
print >> sys.stderr, "QUERY: ", sql % args
cur.execute(sql, args)
for row in cur:
dic = {}
for i, value in enumerate(row):
field = cur.description[i][0]
dic[field] = value
if DEBUG:
print >> sys.stderr, "RET: ", dic
ret.append(dic)
return ret
finally:
cur.close()
def get_tables(conn, schema=SCHEMA):
"List table names in a given schema"
rows = query(conn, """SELECT table_name FROM information_schema.tables
WHERE table_schema = %s
ORDER BY table_name""", schema)
return [row['table_name'] for row in rows]
def get_fields(conn, table):
"Retrieve field list for a given table"
if DEBUG:
print >> sys.stderr, "Processing TABLE", table
rows = query(conn, """
SELECT column_name, data_type,
is_nullable,
character_maximum_length,
numeric_precision, numeric_precision_radix, numeric_scale,
column_default
FROM information_schema.columns
WHERE table_name=%s
ORDER BY ordinal_position""", table)
return rows
def define_field(conn, table, field, pks):
"Determine field type, default value, references, etc."
f = {}
ref = references(conn, table, field['column_name'])
if ref:
f.update(ref)
elif field['column_default'] and \
field['column_default'].startswith("nextval") and \
field['column_name'] in pks:
# postgresql sequence (SERIAL) and primary key!
f['type'] = "'id'"
elif field['data_type'].startswith('character'):
f['type'] = "'string'"
if field['character_maximum_length']:
f['length'] = field['character_maximum_length']
elif field['data_type'] in ('text', ):
f['type'] = "'text'"
elif field['data_type'] in ('boolean', 'bit'):
f['type'] = "'boolean'"
elif field['data_type'] in ('integer', 'smallint', 'bigint'):
f['type'] = "'integer'"
elif field['data_type'] in ('double precision', 'real'):
f['type'] = "'double'"
elif field['data_type'] in ('timestamp', 'timestamp without time zone'):
f['type'] = "'datetime'"
elif field['data_type'] in ('date', ):
f['type'] = "'date'"
elif field['data_type'] in ('time', 'time without time zone'):
f['type'] = "'time'"
elif field['data_type'] in ('numeric', 'currency'):
f['type'] = "'decimal'"
f['precision'] = field['numeric_precision']
f['scale'] = field['numeric_scale'] or 0
elif field['data_type'] in ('bytea', ):
f['type'] = "'blob'"
elif field['data_type'] in ('point', 'lseg', 'polygon', 'unknown', 'USER-DEFINED'):
f['type'] = "" # unsupported?
else:
raise RuntimeError("Data Type not supported: %s " % str(field))
try:
if field['column_default']:
if field['column_default'] == "now()":
d = "request.now"
elif field['column_default'] == "true":
d = "True"
elif field['column_default'] == "false":
d = "False"
else:
d = repr(eval(field['column_default']))
f['default'] = str(d)
except (ValueError, SyntaxError):
pass
except Exception, e:
raise RuntimeError(
"Default unsupported '%s'" % field['column_default'])
if not field['is_nullable']:
f['notnull'] = "True"
comment = get_comment(conn, table, field)
if comment is not None:
f['comment'] = repr(comment)
return f
def is_unique(conn, table, field):
"Find unique columns (incomplete support)"
rows = query(conn, """
SELECT information_schema.constraint_column_usage.column_name
FROM information_schema.table_constraints
NATURAL JOIN information_schema.constraint_column_usage
WHERE information_schema.table_constraints.table_name=%s
AND information_schema.constraint_column_usage.column_name=%s
AND information_schema.table_constraints.constraint_type='UNIQUE'
;""", table, field['column_name'])
return rows and True or False
def get_comment(conn, table, field):
"Find the column comment (postgres specific)"
rows = query(conn, """
SELECT d.description AS comment
FROM pg_class c
JOIN pg_description d ON c.oid=d.objoid
JOIN pg_attribute a ON c.oid = a.attrelid
WHERE c.relname=%s AND a.attname=%s
AND a.attnum = d.objsubid
;""", table, field['column_name'])
return rows and rows[0]['comment'] or None
def primarykeys(conn, table):
"Find primary keys"
rows = query(conn, """
SELECT information_schema.constraint_column_usage.column_name
FROM information_schema.table_constraints
NATURAL JOIN information_schema.constraint_column_usage
WHERE information_schema.table_constraints.table_name=%s
AND information_schema.table_constraints.constraint_type='PRIMARY KEY'
;""", table)
return [row['column_name'] for row in rows]
def references(conn, table, field):
"Find a FK (fails if multiple)"
rows1 = query(conn, """
SELECT table_name, column_name, constraint_name,
update_rule, delete_rule, ordinal_position
FROM information_schema.key_column_usage
NATURAL JOIN information_schema.referential_constraints
NATURAL JOIN information_schema.table_constraints
WHERE information_schema.key_column_usage.table_name=%s
AND information_schema.key_column_usage.column_name=%s
AND information_schema.table_constraints.constraint_type='FOREIGN KEY'
;""", table, field)
if len(rows1) == 1:
rows2 = query(conn, """
SELECT table_name, column_name, *
FROM information_schema.constraint_column_usage
WHERE constraint_name=%s
""", rows1[0]['constraint_name'])
row = None
if len(rows2) > 1:
row = rows2[int(rows1[0]['ordinal_position']) - 1]
keyed = True
if len(rows2) == 1:
row = rows2[0]
keyed = False
if row:
if keyed: # THIS IS BAD, DON'T MIX "id" and primarykey!!!
ref = {'type': "'reference %s.%s'" % (row['table_name'],
row['column_name'])}
else:
ref = {'type': "'reference %s'" % (row['table_name'],)}
if rows1[0]['delete_rule'] != "NO ACTION":
ref['ondelete'] = repr(rows1[0]['delete_rule'])
return ref
elif rows2:
raise RuntimeError("Unsupported foreign key reference: %s" %
str(rows2))
elif rows1:
raise RuntimeError("Unsupported referential constraint: %s" %
str(rows1))
def define_table(conn, table):
"Output single table definition"
fields = get_fields(conn, table)
pks = primarykeys(conn, table)
print "db.define_table('%s'," % (table, )
for field in fields:
fname = field['column_name']
fdef = define_field(conn, table, field, pks)
if fname not in pks and is_unique(conn, table, field):
fdef['unique'] = "True"
if fdef['type'] == "'id'" and fname in pks:
pks.pop(pks.index(fname))
print " Field('%s', %s)," % (fname,
', '.join(["%s=%s" % (k, fdef[k]) for k in KWARGS
if k in fdef and fdef[k]]))
if pks:
print " primarykey=[%s]," % ", ".join(["'%s'" % pk for pk in pks])
print " migrate=migrate)"
print
def define_db(conn, db, host, port, user, passwd):
"Output database definition (model)"
dal = 'db = DAL("postgres://%s:%s@%s:%s/%s", pool_size=10)'
print dal % (user, passwd, host, port, db)
print
print "migrate = False"
print
for table in get_tables(conn):
define_table(conn, table)
if __name__ == "__main__":
if len(sys.argv) < 6:
print HELP
else:
# Parse arguments from command line:
db, host, port, user, passwd = sys.argv[1:6]
# Make the database connection (change driver if required)
import psycopg2
cnn = psycopg2.connect(database=db, host=host, port=port,
user=user, password=passwd,
)
# Start model code generation:
define_db(cnn, db, host, port, user, passwd)