-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsetlist.py
321 lines (268 loc) · 9.33 KB
/
setlist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import os
import sys
import time
import random
import cPickle as pickle
from multiprocessing import Pool
from sh import find
from util import load_level
from parser_common import UnknownFileHeader
# Version tag for the pickled level database.  build_level_database() stores
# this value under the "version" key and load_level_database() refuses any
# database whose stored value differs from it.
# Generate a new UUID to invalidate an old level database.
DATABASE_VERSION = "37590c47-f652-4cb8-864e-db18c0e5e5e7"
def process_level(path):
    """Summarise one level file as a plain dict for the level database.

    Returns None when `path` is not a regular file or when load_level()
    raises UnknownFileHeader.  Otherwise returns a dict holding the level
    and pallet hashes, the path, the door targets, the edge targets (a
    (north, east, south, west) tuple, or None when no edge link was found)
    and the number of signs, actors, baddies and treasures.
    """
    if not os.path.isfile(path):
        return None

    try:
        level = load_level(path, fast_mode=True)
    except UnknownFileHeader:
        return None

    # Links spanning (nearly) a full row or column on the border of the
    # level are treated as edges to a neighbouring level; everything else
    # that is not that wide or tall is a door.
    # NOTE(review): 60/63 presumably relate to a 64x64 level grid - confirm.
    row_to_side = {0: "north", 63: "south"}
    column_to_side = {0: "west", 63: "east"}

    neighbours = {"north": None, "east": None, "south": None, "west": None}
    doors = []
    for link in level.links:
        x, y, w, h = link.area
        if w > 60:
            side = row_to_side.get(y)
        elif h > 60:
            side = column_to_side.get(x)
        else:
            doors.append(link.target)
            continue
        # Wide/tall links that do not sit on a border are ignored.
        if side is not None:
            neighbours[side] = link.target

    edges = (
        neighbours["north"],
        neighbours["east"],
        neighbours["south"],
        neighbours["west"],
    )
    if not any(edges):
        edges = None

    summary = {}
    summary["level_hash"] = level.level_hash()
    summary["pallet_hash"] = level.pallet_hash()
    summary["path"] = path
    summary["doors"] = doors
    summary["edges"] = edges
    summary["signs"] = len(level.signs)
    summary["actors"] = len(level.actors)
    summary["baddies"] = len(level.baddies)
    summary["treasures"] = len(level.treasures)
    return summary
def build_level_database(input_path):
    """Scan every file below `input_path` and write the level database.

    Each path reported by `find` is handed to process_level() in a pool of
    four worker processes.  Paths for which process_level() returns nothing
    are skipped.  The collected summaries are pickled, together with
    DATABASE_VERSION, to "level_db.pickle" in the current directory.
    Progress is printed each time the integer percentage increases.
    """
    print("Generating or regenerating level database.")
    print("This may take a long time if a lot of files need to be scanned.")

    paths = [p.strip() for p in find(input_path)]
    levels = []

    # Guard against an empty listing: there is nothing to scan and the
    # progress ratio below would otherwise divide by zero.
    if paths:
        ratio = 100.0 / len(paths)
        processed = 0
        last_percent = 0

        pool = Pool(processes=4)
        try:
            for data in pool.imap_unordered(process_level, paths):
                processed += 1
                percent = int(processed * ratio)
                if percent > last_percent:
                    last_percent = percent
                    print("... {}%".format(percent))
                if not data:
                    continue
                levels.append(data)
            pool.close()
        except BaseException:
            # Do not wait for outstanding work if the scan was aborted.
            pool.terminate()
            raise
        finally:
            # Always reap the worker processes so none are left behind.
            pool.join()

    db = {
        "levels": levels,
        "version": DATABASE_VERSION,
    }
    # Pickle data is a byte stream, so write it in binary mode.
    with open("level_db.pickle", "wb") as outfile:
        pickle.dump(db, outfile)
def load_level_database():
    """Load and return the level database from "level_db.pickle".

    The database is only accepted when it unpickles cleanly into a dict
    whose "version" entry equals DATABASE_VERSION.  If the file is missing,
    unreadable as a pickle, or carries a different version, a hint about
    re-running with "--scan" is printed and the process exits with status 1
    (SystemExit).
    """
    db = None
    failed = False
    try:
        # NOTE: unpickling can execute arbitrary code; this file must only
        # ever be one that was produced locally by the "--scan" mode.
        with open("level_db.pickle", "rb") as infile:
            db = pickle.load(infile)
    except IOError:
        failed = True
        print("No level database found!")
    except (pickle.UnpicklingError, EOFError, ValueError, IndexError,
            KeyError, AttributeError, ImportError):
        # A truncated or corrupt file is treated like a stale database
        # instead of crashing with a traceback.
        failed = True
        print("Level database needs to be regenerated!")
    else:
        if not isinstance(db, dict) or db.get("version") != DATABASE_VERSION:
            failed = True
            print("Level database needs to be regenerated!")

    if failed:
        print("""
Re-run this script with the argument "--scan" followed by a path to
a folder containing all of the levels you wish to scan.
""".strip())
        # sys.exit() rather than the interactive-only exit() helper, which
        # the site module is not guaranteed to provide.
        sys.exit(1)
    return db
def load_corpus():
    """Load the level database and index its levels.

    Returns a tuple (boring, duplicates, by_hash, by_path):

    * boring     -- currently always an empty list (see the TODO below).
    * duplicates -- level hash -> list of the additional levels sharing
                    that hash, ordered by hash and then path.
    * by_hash    -- level hash -> the first level seen with that hash.
    * by_path    -- file name (last path component) -> level.  When several
                    levels share a file name the last one in sort order wins.
    """
    db = load_level_database()

    def sort_fn(level):
        return level["level_hash"] + ":" + level["path"]

    boring = []
    duplicates = {}
    by_hash = {}
    by_path = {}

    for level in sorted(db["levels"], key=sort_fn):
        short_path = os.path.split(level["path"])[-1]
        level_hash = level["level_hash"]
        by_path[short_path] = level

        things = level["signs"] + \
                 level["actors"] + \
                 level["baddies"] + \
                 level["treasures"]
        if things == 0:
            # TODO : Check the pallet hash to see if the level is
            # likely "boring" since it doesn't have anything else in
            # it.  If it is boring, we should still include it in the
            # by_path list, but not in by_hash.
            pass

        if level_hash in by_hash:
            # Levels are visited in sort_fn order, so each duplicate group
            # is built already sorted and needs no further sorting.
            duplicates.setdefault(level_hash, []).append(level)
        else:
            by_hash[level_hash] = level

    return boring, duplicates, by_hash, by_path
def generate_setlist(output_path):
    """Write a playlist of every unique level to `output_path`.

    Levels are taken from a queue seeded with 'onlinestartlocal.graal' or
    'level1.graal' (when present), falling back to the not yet processed
    levels once the queue is empty.  For each picked level an attempt is
    made to group it with adjacent levels: a group of four is written after
    a "*** quad ***" line, a group of two after a "*** pair ***" line, and
    anything else on its own.  Every level path is written on its own line.

    Door targets of the picked level are always queued; edge targets of the
    written levels are queued with a probability that depends on the group
    size.  The module-level `random` generator is used for this, so the
    result depends on how it was seeded.

    Afterwards the merged duplicate levels and some totals are printed.
    """
    print("Generating level playlist...")
    _boring, duplicates, by_hash, by_path = load_corpus()

    # Only ever used for membership tests, hence sets.
    visited = set()
    paths_written = set()
    # Ordered work lists of level hashes.
    unprocessed = list(by_hash)
    queue = []

    quad_count = 0
    pair_count = 0
    ungrouped_count = 0

    no_edges = (None, None, None, None)

    def discard(sequence, item):
        # list.remove() raises ValueError when the item is absent, which
        # is expected here and the only error worth ignoring.
        try:
            sequence.remove(item)
        except ValueError:
            pass

    def lookup(key):
        # Accepts either the level name or hash.  Returns the level only
        # if it has not been written yet.
        if key is None:
            return None
        assert isinstance(key, str)
        if key in by_path:
            found = by_path[key]
        else:
            found = by_hash.get(key)
        if found and found["level_hash"] not in visited:
            return found
        return None

    def take_level(setlist, level):
        # Mark the level as done and append its path to the playlist.
        level_hash = level["level_hash"]
        visited.add(level_hash)
        discard(queue, level_hash)
        discard(unprocessed, level_hash)
        assert level["path"] not in paths_written
        paths_written.add(level["path"])
        setlist.write(level["path"] + "\n")

    def enqueue(hash_or_path):
        level = lookup(hash_or_path)
        if level:
            level_hash = level["level_hash"]
            discard(unprocessed, level_hash)
            if level_hash not in queue and level_hash not in visited:
                queue.append(level_hash)

    def edge_search(level):
        # This attempts to find nearby levels to group together.  Returns
        # (nw, ne, sw, se), (west, east) or None.
        if not level["edges"]:
            return None
        own_hash = level["level_hash"]

        def neighbour(key):
            # A link that resolves back to the level itself (or to a
            # byte-identical copy of it) is not a real neighbour; taking it
            # would write the same level twice.
            found = lookup(key)
            if found and found["level_hash"] != own_hash:
                return found
            return None

        right_of = neighbour(level["edges"][1])
        left_of = neighbour(level["edges"][3])
        if right_of:
            east, west = right_of, level
        elif left_of:
            east, west = level, left_of
        else:
            return None

        # The neighbour may have no edge links of its own, in which case
        # its "edges" entry is None rather than a 4-tuple.
        east_edges = east["edges"] or no_edges
        west_edges = west["edges"] or no_edges

        ne_level, nw_level, se_level, sw_level = None, None, None, None
        if east_edges[2] and west_edges[2]:
            ne_level, nw_level = east, west
            se_level = lookup(east_edges[2])
            sw_level = lookup(west_edges[2])
        elif east_edges[0] and west_edges[0]:
            se_level, sw_level = east, west
            ne_level = lookup(east_edges[0])
            nw_level = lookup(west_edges[0])

        quad = (nw_level, ne_level, sw_level, se_level)
        if all(quad):
            # Only accept a quad made of four different levels.
            hashes = set(tile["level_hash"] for tile in quad)
            if len(hashes) == 4:
                return quad
        return (west, east)

    if 'onlinestartlocal.graal' in by_path:
        enqueue('onlinestartlocal.graal')
    elif 'level1.graal' in by_path:
        enqueue('level1.graal')

    with open(output_path, "w") as setlist:
        while unprocessed or queue:
            if queue:
                pick = queue.pop(0)
            else:
                pick = unprocessed.pop(0)
            level = lookup(pick)
            assert level is not None

            edges = []
            edge_odds = 0.30
            adjacent_group = edge_search(level)
            if adjacent_group:
                if len(adjacent_group) == 2:
                    pair_count += 1
                    setlist.write("*** pair ***\n")
                    edge_odds = 0.20
                elif len(adjacent_group) == 4:
                    quad_count += 1
                    setlist.write("*** quad ***\n")
                    edge_odds = 0.10
                for tile in adjacent_group:
                    take_level(setlist, tile)
                    edges += tile["edges"] or []
            else:
                ungrouped_count += 1
                edges = level["edges"] or []
                take_level(setlist, level)

            if level["doors"]:
                # add all levels linked by doors into the queue
                for target in level["doors"]:
                    enqueue(target)

            # small chance of adding neighbors into the queue
            for edge_path in set(edges):
                if random.random() <= edge_odds:
                    enqueue(edge_path)

    if duplicates:
        print("Duplicate levels which have been merged together:")
        # Pair each duplicated hash with the file name of the level that
        # was kept, instead of mapping the name back to a hash through
        # by_path (file names are not unique, hashes are).
        kept = sorted(
            (os.path.split(by_hash[key]["path"])[-1], key)
            for key in duplicates
        )
        for path, level_hash in kept:
            dupe_group = duplicates[level_hash]
            count = len(dupe_group) + 1
            print(" + {} ({}x, hash = {})".format(path, count, level_hash))
            for dupe in dupe_group:
                print(" - {}".format(os.path.split(dupe["path"])[-1]))

    print("quads found: {}".format(quad_count))
    print("pairs found: {}".format(pair_count))
    print("individuals: {}".format(ungrouped_count))
    print("total tweets: {}".format(
        quad_count + pair_count + ungrouped_count))
    print("total levels: {}".format(
        quad_count * 4 + pair_count * 2 + ungrouped_count))
if __name__ == "__main__":
    # Usage:
    #   setlist.py --scan <folder>   rebuild the level database
    #   setlist.py [output file]     write the playlist ("playlist.txt" by
    #                                default)

    # Seed the generator with a fixed value so the random choices made
    # while building the playlist are repeatable between runs.
    random.seed(12345)
    start = time.time()

    if len(sys.argv) > 1 and sys.argv[1] == "--scan":
        # Validate explicitly: an assert would vanish under "python -O" and
        # otherwise only produces a bare traceback.
        if len(sys.argv) != 3:
            sys.exit("usage: {} --scan <path to levels folder>".format(
                sys.argv[0]))
        build_level_database(sys.argv[2])
    else:
        output_path = sys.argv[1] if len(sys.argv) > 1 else "playlist.txt"
        generate_setlist(output_path)

    elapsed = time.time() - start
    print("Process complete.")
    if elapsed > 60:
        print("Elapsed time in minutes: {}".format(elapsed / 60.0))
    else:
        print("Elapsed time in seconds: {}".format(elapsed))