heartbeat at task level to identify failed tasks ?
sort should be lazy
- perhaps even learn buckets while sorting ... ? (i.e. single pass sort?)
the rx/tx rates aren't updated on sp-dev
only start gui on driver by default
Enhance dashboard with info from https://pypi.python.org/pypi/psutil
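- a minimal sketch of the kind of per-node stats psutil exposes; the node_stats name and dict layout below are illustrative only:

    import psutil

    def node_stats():
        # cpu / memory / io counters as plain dicts, ready to render on the dashboard
        return {
            'cpu_percent': psutil.cpu_percent(interval=None),
            'memory': psutil.virtual_memory()._asdict(),
            'swap': psutil.swap_memory()._asdict(),
            'disk_io': psutil.disk_io_counters()._asdict(),
            'net_io': psutil.net_io_counters()._asdict(),
        }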
Perhaps display skipped tasks more clearly
Display task locality
Limit resource usage from supervisor with: https://docs.python.org/3.4/library/resource.html
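- a minimal sketch, assuming the supervisor forks its children and can apply limits before handing off work; limit_child and the limit values are only examples:

    import resource

    def limit_child(max_address_space_mb=2048, max_open_files=1024):
        # lower the soft address-space limit; leave the hard limit untouched
        _, hard_as = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS,
                           (max_address_space_mb * 1024 * 1024, hard_as))
        # cap open file descriptors (hard limits can only be lowered by an unprivileged process)
        _, hard_no = resource.getrlimit(resource.RLIMIT_NOFILE)
        resource.setrlimit(resource.RLIMIT_NOFILE, (min(max_open_files, hard_no), hard_no))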
check the dependency graph for dependencies that go down/up the package hierarchy in the wrong places
- e.g. bndl.compute.schedule to .base
move caching aspect of datasets to one module (not spread over .schedule and .base)
move caches, broadcast values etc down to execute layer
use hierarchical namespace for set / del
Possibly introduce a control and data layer
- i.e. to allow sending large volumes out of band from the control comms
- this allows the watchdog to operate better as well
- should improve stability
Consider not using 1 fixed worker per core, but 1 worker per node which forks per task
- implement in execute layer
- Use custom pickler to fetch broadcast value on receive (to ensure the data is available before forking)
- a lazy alternative to the above would still require the broadcasted data to be available per executing process
- caching could work through caching the data in pickled/marshalled form
- although not having to unpickle when using cache was one of the benefits of not using pyspark
- an advantage would be automatic cleanup after jobs have run (less leakage)
! a major hurdle would be communication with e.g. Cassandra:
- connections can't be inherited by a forked process and still work :(
- would require performing the IO in the main process, and processing in a forked worker
! from the python docs:
- Note that safely forking a multithreaded process is problematic.
IDEA DROPPED:
- the two points above are prohibitive
- figure out an alternative memory efficient broadcast
Consider a communication architecture with a core cross-node network and an on-node inter-process network
- perhaps supplemented by (temporary) direct connections on a data layer
Make entire compute.dataset api asynchronous
- i.e. some_action() returns a future
- use some_action().result() to get the results (may be an iterable)
- use some_action().cancel() to cancel the job
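- a hedged sketch of how that could read; collection()/map()/count() are used as elsewhere in bndl, the future-returning behaviour is the proposal:

    dset = ctx.collection(range(1000)).map(lambda i: i * i)
    job = dset.count()    # proposal: returns a future instead of blocking
    total = job.result()  # block for the result (may be an iterable for other actions)
    # job.cancel() would abort the job instead of waiting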
Allow stages to be composed of an undetermined number of tasks
- i.e. allow a generator / queue of tasks
Allow jobs to be composed of an undetermined number of stages
- or put differently, give jobs the ability to yield barriers
The API / data model might change into a job which yields 1+ tasks and 0+ barriers.
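- a rough sketch of that model; Task and BARRIER below are hypothetical names, not bndl's current classes:

    from collections import namedtuple

    Task = namedtuple('Task', ['func', 'args'])
    BARRIER = object()

    def job(partitions):
        for part in partitions:                 # stage 1: task count not known up front
            yield Task(sum, (part,))
        yield BARRIER                           # barrier: wait for all tasks yielded so far
        yield Task(print, ('stage 1 done',))    # stage 2 can depend on stage 1's results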
Consider changing the task execution model from push to pull.
- would make it easier to keep a task in flight
- might be difficult with a generator / queue of tasks in combination with worker preferences
Implement check pointing
- perform cleanup of stages before the stage of the checkpointed dset
add pickle options to broadcast_pickle
Prevent occurrences of:
27158 : OpenBLAS blas_thread_init: RLIMIT_NPROC 4096 current, 4127376 max
27158 : OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
This is triggered by the import of numpy
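- a possible mitigation (not verified on these machines): cap OpenBLAS's thread pool before numpy is imported, so blas_thread_init doesn't exhaust RLIMIT_NPROC:

    import os
    os.environ.setdefault('OPENBLAS_NUM_THREADS', '1')
    os.environ.setdefault('OMP_NUM_THREADS', '1')

    import numpy  # import only after the environment is set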
support cassandra.coscan with _asdicts
try to implement read retry in cassandra scan without materializing an entire token range
implement drop function to drop columns / fields
- like pluck?
check partition sizes for cassandra scan
cassandra scan .parts() is not stable
consider driving the tasks from another process / do something about liveness of the driver during jobs
support caching also for cancelled jobs / stages / tasks
nodes are 'in error' very quickly after starting a job
(unresponsive due to task deserialization?)
allow for easier debugging by reporting the failed partition (or more)
add psize for ctx.range
add max pcount / psize for ctx.range / ctx.collection
why stage.tasks.sort(key=lambda t: t.id)? it costs time for a large number of tasks
add protocol version in hello
Support ctx.files for tar files
Support for ctx.files(split=str) for gzipped files
- gzip has seek, but no rfind
Crash task on driver side exceptions (e.g. in sending files which are read protected)
ctx.files.cleanup creates its own garbage in node.hosted_values
broadcast actual files without loading in the driver
cache_loc not filled when using first() ???
Whoops:
In [86]: texts.uncache()
ERROR:bndl.rmi.node:unable to perform remote invocation
Traceback (most recent call last):
File "/home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/rmi/node.py", line 51, in _request
response = (yield from asyncio.wait_for(response_future, self._timeout, loop=self.peer.loop))
File "/usr/lib64/python3.4/asyncio/tasks.py", line 381, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
ERROR:bndl.rmi.node:unable to perform remote invocation
Traceback (most recent call last):
File "/home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/rmi/node.py", line 51, in _request
response = (yield from asyncio.wait_for(response_future, self._timeout, loop=self.peer.loop))
File "/usr/lib64/python3.4/asyncio/tasks.py", line 381, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
ERROR:bndl.rmi.node:unable to perform remote invocation
Traceback (most recent call last):
File "/home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/rmi/node.py", line 51, in _request
response = (yield from asyncio.wait_for(response_future, self._timeout, loop=self.peer.loop))
File "/usr/lib64/python3.4/asyncio/tasks.py", line 381, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
WARNING:bndl.rmi.node:Response <Response exception=None, req_id=48, value=None> received for unknown request id 48
WARNING:bndl.rmi.node:Response <Response exception=None, req_id=48, value=None> received for unknown request id 48
WARNING:bndl.rmi.node:Response <Response exception=None, req_id=45, value=None> received for unknown request id 45
and another while nodes were reconnecting to driver:
KeyError: 'nl.tgho.priv.sp-prod-adg02.worker.32226.0.4'
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<_serve() done, defined at /home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/net/peer.py:262> exception=KeyError('nl.tgho.priv.sp-prod-adg02.worker.32226.0.4',)>
Traceback (most recent call last):
File "/usr/lib64/python3.4/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "/home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/net/peer.py", line 270, in _serve
yield from self.local._peer_connected(self)
File "/home/frens.jan.rumph/venv/lib/python3.4/site-packages/bndl/net/node.py", line 224, in _peer_connected
del self.peers[known_peer.name]
KeyError: 'nl.tgho.priv.sp-prod-adg02.worker.32226.0.4'
WARNING:bndl.net.watchdog:<Peer:
support more syntactic sugar for creating accumulators, e.g.:
acc = ctx.accumulator(set(), 'add')
def task(...):
    nonlocal acc
    acc.add(1)
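- a minimal sketch of what such sugar could boil down to; the Accumulator class below is illustrative, not bndl's implementation:

    class Accumulator:
        def __init__(self, initial, method):
            self.value = initial
            self._method = method           # name of the update method, e.g. 'add'

        def add(self, element):
            getattr(self.value, self._method)(element)

    acc = Accumulator(set(), 'add')
    acc.add(1)
    assert acc.value == {1}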
use local read if ctx.files on same node
Support shuffle with only a subset of the workers / use task dependencies determined at runtime
Implement cassandra.limit as
docs = ctx.cassandra_table('adg', 'document', contact_points='sp-prod-adg01')
In [27]: 10000 / sum(len(p.token_ranges) for p in docs.parts())
Out[27]: 7.027406886858749
In [28]: docs.limit(7).count(push_down=False)
Out[28]: 9957
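- a sketch of the heuristic implied by the snippet above: spread the limit over the scan by capping each token range at roughly limit / total token ranges (parts and token_ranges as used above; the helper name is illustrative):

    def per_range_limit(limit, parts):
        total_ranges = sum(len(p.token_ranges) for p in parts)
        return max(1, limit // total_ranges)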
Is span_by working for spanning by part of the primary key?
ctx.cassandra_table('adg_prod', 'authorship_features').select('doc_id', 'authorship_seq_no', 'affiliation_seq_no').span_by('doc_id', 'authorship_seq_no').take(10)
yields
[((83683000013, 1), Empty DataFrame
Columns: []
Index: [(83683000013, 1, 1)]), ((83683000027, 1), Empty DataFrame
Columns: []
Index: [(83683000027, 1, 1)]), ((83708200003, 1), Empty DataFrame
Columns: []
Index: [(83708200003, 1, 1)]), ((83708200003, 2), Empty DataFrame
Columns: []
Index: [(83708200003, 2, -1)]), ((83819800006, 1), Empty DataFrame
Columns: []
Index: [(83819800006, 1, 1)]), ((83819800006, 2), Empty DataFrame
Columns: []
Index: [(83819800006, 2, -1)]), ((83889400015, 1), Empty DataFrame
Columns: []
Index: [(83889400015, 1, 1)]), ((83889400015, 2), Empty DataFrame
Columns: []
Index: [(83889400015, 2, -1)]), ((83889400015, 3), Empty DataFrame
Columns: []
Index: [(83889400015, 3, -1)]), ((83889400015, 4), Empty DataFrame
Columns: []
Index: [(83889400015, 4, -1)])]
Shouldn't group_by_key and friends yield key, [value, ...] instead of key, [(key, value), ...] ?
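- plain-Python illustration of the two shapes in question:

    pairs = [('a', 1), ('a', 2), ('b', 3)]
    current  = ('a', [('a', 1), ('a', 2)])   # key, [(key, value), ...]
    proposed = ('a', [1, 2])                 # key, [value, ...]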
Initializers / zero for reduce, sum, etc.
span_by can be a lot more efficient! DataFrame.groupby is expensive
strip key from values from group_by_key
use default dict in CassandraCoScanPartition._materialize for merged
allow distribution and sort key in shuffle (see the sketch after this list)
- e.g. distribute by block_id and sort by (block_id, person_id)
- just like select col1, col2, col3 from table group by 1, 2
this is also important for cogroup, which currently has to materialize a partition into a list!
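- plain-Python sketch of the intended semantics (block_id is the distribution key, (block_id, person_id) the sort key):

    from itertools import groupby

    records = [{'block_id': 1, 'person_id': 7},
               {'block_id': 2, 'person_id': 5},
               {'block_id': 1, 'person_id': 3}]

    ordered = sorted(records, key=lambda r: (r['block_id'], r['person_id']))
    partitions = {block: list(group)
                  for block, group in groupby(ordered, key=lambda r: r['block_id'])}
    # cogroup could then stream each sorted group instead of materializing a partition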
add operators such as set difference (using shuffle)
the select of cassandra_table could check whether the column exists
- but take udfs into account?
why is tree aggregate so slow?
- is it the new shuffle?
the cache of the cassandra partitioner must take bndl_cassandra.part_size_keys and friends into account
investigate if we can do something better than yielding entire groups (from e.g. group_by_key) in a tuple
Reconsider shuffle implementation for keywise aggregation:
- Spark uses create + merge + combine ...
- probably cheaper than sorting ...
join_with_cassandra with a single element key doesn't work
(getter should return an iterable)
remote cache: read raw and decode on the reading worker if it was serialized in memory or on disk
Caching on multiple nodes
Checking group.executing_on in
<tr class="clickable {{ ' active' if group.executing_on else '' }}" data-href='group/{{ group.grouper }}'>
Compatibility with yappi 0.94
collect_as_files and friends -> save_as ... on worker
callsite for k-means doesn't work ... everything is a k-means iteration
implement k-means ||
implement k-means for sparse matrices
collect_as_files should check if directory exists before launching
shuffle is too slow ... also add a plain shuffle(n, sort=False)
Implement versions of itake (take and first) for cassandra coscan
Add a namedtuple-like format for bndl_cassandra which supports __getitem__ to access fields.
Add config parameter for temp dir
Add pluck style to key_by
- think about with_value (consts _are_ the value here, not a key)
Use key_or_getter with filter (and others?)
Add filterfalse (and friends?)
Add .pluck(attr='xxx')
Use paging state in bndl_cassandra scan
test require_local_workers and friends
re-add prefer_workers?
Require workers at job / dataset level
Add dataset.cache context manager (with block)
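- a minimal sketch, reusing the cache()/uncache() calls used elsewhere in this file; the cached() helper itself is hypothetical:

    from contextlib import contextmanager

    @contextmanager
    def cached(dataset):
        dataset.cache()
        try:
            yield dataset
        finally:
            dataset.uncache()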
Take locality into account for dataset.coalesce_parts(...)