-
Notifications
You must be signed in to change notification settings - Fork 0
/
cdb2bmepd.py
179 lines (165 loc) · 5.67 KB
/
cdb2bmepd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
Script to get (clear) best moves for FENs from chessdb.cn.
"""
import argparse, asyncio, sys, time, cdblib
class cdb2bm:
    """Annotate FENs with a clear best move as reported by chessdb.cn (cdb).

    The input lines are read once in the constructor; ``parse_all`` then
    queries cdb for every non-comment line and prints an EPD-style line
    (``{fen} bm {bm}; c0 "{comment}";``) for each position whose best move
    leads the second-best move by a sufficient score gap.
    """

    def __init__(
        self, filename, output, gap, drawGap, quiet, concurrency, user, suppressErrors
    ):
        """Load the input file and set up output streams and the cdb client.

        Args:
            filename: path of the file with one FEN per line; blank lines are
                dropped and lines starting with "#" are kept as comments.
            output: destination filename, or a falsy value to write results to
                stdout (status messages then go to stderr).
            gap: minimum score gap required when the best score is non-zero.
            drawGap: minimum score gap required when the best score is 0.
            quiet: if true, no status messages are printed at all.
            concurrency: forwarded to ``cdblib.cdbAPI``; also shown in status
                messages.
            user: forwarded to ``cdblib.cdbAPI``.
            suppressErrors: its negation is forwarded to ``cdblib.cdbAPI``
                (presumably toggling cdblib's error output -- confirm there).
        """
        self.input = filename
        self.lines = []
        self.loaded = 0  # number of non-comment (i.e. FEN) lines
        with cdblib.open_file_rt(filename) as f:
            for line in f:
                line = line.strip()
                if line:
                    self.lines.append(line)
                    if not line.startswith("#"):  # ignore comments
                        self.loaded += 1
        if output:
            # Results go to the file, so status messages can use stdout.
            self.output = open(output, "w")
            self.display = sys.stdout
        else:
            # Results occupy stdout; keep status messages out of that stream.
            self.output = sys.stdout
            self.display = sys.stderr
        if quiet:
            self.display = None
        if self.display:
            print(
                f"Read {self.loaded} FENs from file {self.input}.",
                file=self.display,
                flush=True,
            )
        self.gap = gap
        self.drawGap = drawGap
        self.concurrency = concurrency
        self.cdb = cdblib.cdbAPI(concurrency, user, not suppressErrors)
        self.filtered = cdblib.AtomicInteger()

    @staticmethod
    def _has_int_score(move):
        """Return True if ``move`` carries a "score" entry that is exactly an int."""
        # Exact type match on purpose: bool is a subclass of int and must not
        # be accepted as a score.
        return "score" in move and type(move["score"]) is int

    def best_move(self, movelist):
        """Pick the clear best move from ``movelist``, if there is one.

        ``movelist`` is a sequence of dicts with at least a "san" entry and
        usually a "score" entry; the first element is treated as the best
        move and the second as the runner-up.

        Returns:
            A ``(san, comment)`` tuple. ``san`` is ``None`` (and ``comment``
            empty) when the list is empty, the best move has no integer
            score, or the gap to the second move is too small. If there is no
            second move with an integer score, the best move is accepted
            unconditionally.
        """
        if not movelist:
            return None, ""
        best = movelist[0]
        if not self._has_int_score(best):
            return None, ""
        score = best["score"]
        second = movelist[1] if len(movelist) >= 2 else None
        if second is None or not self._has_int_score(second):
            return best["san"], f"cdb eval: {score}"
        gap = score - second["score"]
        # A best score of exactly 0 uses the separate draw threshold.
        required = self.drawGap if score == 0 else self.gap
        if gap < required:
            return None, ""
        return (
            best["san"],
            f"cdb eval: {score} (sbm: {second['san']}, {second['score']})",
        )

    async def parse_all(self, batchSize=None):
        """Process all loaded lines and print the resulting EPD lines.

        Lines are handled in batches: every line of a batch is started as an
        asyncio task, then the results are awaited and printed in input
        order before the next batch begins.

        Args:
            batchSize: number of lines per batch; ``None`` means a single
                batch holding all lines.

        Raises:
            ValueError: if an explicit ``batchSize`` is smaller than 1.
        """
        if batchSize is not None and batchSize < 1:
            raise ValueError(f"batchSize must be at least 1, got {batchSize}")
        if self.display:
            print(
                f"Started parsing the FENs with concurrency {self.concurrency}"
                + (" ..." if batchSize is None else f" and batch size {batchSize} ..."),
                file=self.display,
                flush=True,
            )
        if batchSize is None:
            # max(..., 1) keeps the range() step non-zero for an empty input.
            batchSize = max(len(self.lines), 1)
        self.tic = time.time()
        for start in range(0, len(self.lines), batchSize):
            tasks = [
                asyncio.create_task(self.parse_single_line(line))
                for line in self.lines[start : start + batchSize]
            ]
            for task in tasks:
                result = await task
                if result:
                    print(result, file=self.output)
            # Make each finished batch visible even if output is buffered.
            self.output.flush()
        if self.display:
            elapsed = time.time() - self.tic
            print(
                f"Done. Processed {self.loaded} FENs in {elapsed:.1f}s.",
                file=self.display,
            )
            print(
                f"Filtered {self.filtered.get()} positions with bm output.",
                file=self.display,
            )

    async def parse_single_line(self, line):
        """Return the output line for one input line.

        Comment lines (starting with "#") are returned unchanged. For any
        other line the first four whitespace-separated fields are sent to
        cdb, and the result is either ``'{fen} bm {bm}; c0 "{comment}";'``
        or an empty string when no clear best move was found.
        """
        if line.startswith("#"):  # ignore comments
            return line
        fen = " ".join(line.split()[:4])  # cdb ignores move counters anyway
        r = await self.cdb.queryall(fen)
        bm, s = self.best_move(r["moves"]) if "moves" in r else (None, "")
        if not bm:
            return ""
        self.filtered.inc()
        return f'{fen} bm {bm}; c0 "{s}";'
async def main():
    """Parse the command line, build a ``cdb2bm`` worker and run it on the input."""
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='A simple script to request (clear) best moves from chessdb.cn for a list of FENs stored in a file. The script will output "{fen} bm {bm}; c0 {comment};" for every line containing a FEN with a clear best move on cdb. Lines beginning with "#" are ignored.',
    )

    # Positional arguments: mandatory source, optional destination.
    cli.add_argument(
        "input", help="source filename with FENs (w/ or w/o move counters)"
    )
    cli.add_argument("output", nargs="?", help="optional destination filename")

    # Thresholds deciding whether a best move counts as "clear".
    cli.add_argument(
        "--gap",
        type=int,
        default=20,
        help="Necessary gap between best move and second best move.",
    )
    cli.add_argument(
        "--drawGap",
        type=int,
        default=None,
        help="Necessary gap between 0cp best move and second best move. (Default: max(GAP // 2, 1))",
    )

    # Output verbosity and request scheduling.
    cli.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress all unnecessary output to the screen.",
    )
    cli.add_argument(
        "-c",
        "--concurrency",
        type=int,
        default=16,
        help="Maximum concurrency of requests to cdb.",
    )
    cli.add_argument(
        "-b",
        "--batchSize",
        type=int,
        default=None,
        help="Number of FENs processed in parallel. Small values guarantee more responsive output, large values give faster turnaround.",
    )
    cli.add_argument(
        "-u",
        "--user",
        help="Add this username to the http user-agent header.",
    )
    cli.add_argument(
        "-s",
        "--suppressErrors",
        action="store_true",
        help="Suppress error messages from cdblib.",
    )

    opts = cli.parse_args()

    # Without an explicit --drawGap, fall back to half of --gap (at least 1).
    draw_gap = opts.drawGap if opts.drawGap is not None else max(opts.gap // 2, 1)

    worker = cdb2bm(
        filename=opts.input,
        output=opts.output,
        gap=opts.gap,
        drawGap=draw_gap,
        quiet=opts.quiet,
        concurrency=opts.concurrency,
        user=opts.user,
        suppressErrors=opts.suppressErrors,
    )
    await worker.parse_all(opts.batchSize)
# Script entry point: run the async main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())