-
Notifications
You must be signed in to change notification settings - Fork 0
/
RadixSortLSD.chpl
380 lines (268 loc) · 15.8 KB
/
RadixSortLSD.chpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
/* !!! IMPORTANT !!!
This file is the intellectual property of Bears-R-Us and is taken from Arkouda, a data-analytics Python API for
supercomputing. None of the code below is my own work and I do not own any rights to it. The original file is
located within the src/ directory on the master branch of the Arkouda repository. The repository can be found at
https://github.com/Bears-R-Us/arkouda and the original file can be opened directly at
https://github.com/Bears-R-Us/arkouda/blob/master/src/RadixSortLSD.chpl
All block comments prefaced with "todo" are my analysis of the code.
*/
/* Radix Sort Least Significant Digit */
module RadixSortLSD
{
// Verbose-output flag; a `config const` can be overridden on the command
// line at program start, e.g. `--RSLSD_vv=true`.
config const RSLSD_vv = false;
const vv = RSLSD_vv; // these need to be const for comms/performance reasons

// Number of worker tasks per locale; defaults to the locale's maximum
// task parallelism. Overridable via `--RSLSD_numTasks=<n>`.
config const RSLSD_numTasks = here.maxTaskPar; // tasks per locale based on locale0
const numTasks = RSLSD_numTasks; // tasks per locale
// Task-id domain {0, 1, ..., numTasks-1} used by every coforall below.
const Tasks = {0..#numTasks}; // these need to be const for comms/performance reasons

// `param` values are fixed at compile time. RSLSD_bitsPerDigit is not
// declared in this file — presumably it comes from ServerConfig (TODO
// confirm). Each sort pass consumes bitsPerDigit bits of the key, so
// there are 2**bitsPerDigit counting buckets per pass.
private param bitsPerDigit = RSLSD_bitsPerDigit; // these need to be const for comms/performance reasons
private param numBuckets = 1 << bitsPerDigit; // these need to be const for comms/performance reasons

use BlockDist;
use BitOps;
use AryUtil;
use CommAggregation;
use IO;
use CTypes;
use Reflection;
use RangeChunk;
use Logging;
use ServerConfig;
use ArkoudaBlockCompat; // compatibility shim over BlockDist for Arkouda

// Logger configuration is inherited from the server-wide settings.
private config const logLevel = ServerConfig.logLevel;
private config const logChannel = ServerConfig.logChannel;
const rsLogger = new Logger(logLevel, logChannel);
/* todo: a record is the same thing as a class; methods and data fields encapsulated, there are differences though.
records DO NOT support inheritance and virtualization ("virtual dispatch"). a record variable must refer
to UNIQUE memory (multiple variables cannot map to the same record). records DO support copy initialization
and assignment, but classes DO NOT */
record KeysComparator {
  /* Identity key extractor: the array element itself is the sort key. */
  inline proc key(k) {
    return k;
  }
}
record KeysRanksComparator {
  /* Key extractor for (key, rank) tuples: the sort key is the first
     component; the rank (original index) is ignored for comparison. */
  inline proc key(kr) {
    const (k, _) = kr;
    return k;
  }
}
/* todo: proc keyword declares a function (proc == "procedure") */
// calculate sub-domain for task
// Compute the sub-range of indices [low..high] owned by `task` when the
// range is split as evenly as possible across numTasks tasks: the first
// (totalsize % numTasks) tasks each get one extra element.
inline proc calcBlock(task: int, low: int, high: int) {
  const n = high - low + 1;
  const chunkSize = n / numTasks;   // base elements per task
  const leftover = n % numTasks;    // tasks that carry one extra element
  // Start position: early tasks are spaced chunkSize+1 apart, the rest
  // are offset by the leftover already handed out.
  const start = if task < leftover
                  then low + task * (chunkSize + 1)
                  else low + task * chunkSize + leftover;
  const size = if task < leftover then chunkSize + 1 else chunkSize;
  return {start .. start + size - 1};
}
// calc global transposed index
// (bucket,loc,task) = (bucket * numLocales * numTasks) + (loc * numTasks) + task;
// Flatten a (bucket, locale, task) triple into an index of the
// transposed global counts array:
//   (bucket * numLocales * numTasks) + (loc * numTasks) + task
// written here in Horner form.
inline proc calcGlobalIndex(bucket: int, loc: int, task: int): int {
  return task + numTasks * (loc + numLocales * bucket);
}
/* Radix Sort Least Significant Digit
In-place radix sort a block distributed array
comparator is used to extract the key from array elements
*/
/*todo: the question mark operator queries the type of a variable. in the below line, it is getting the type of aD
then setting then casting the parameter 'a' to an array consisting of elements with the type of aD */
private proc radixSortLSDCore(a:[?aD] ?t, nBits, negs, comparator) {
  // Log the element type and key width for this invocation.
  try! rsLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                      "type = %s nBits = %t".format(t:string,nBits));
  // Scratch copy of the input: each pass reads from `temp` and scatters
  // into `a`, then the two are swapped for the next pass.
  var temp = a;

  // Global, block-distributed count array with one slot per
  // (bucket, locale, task) triple. It is laid out in transposed order
  // (bucket-major — see calcGlobalIndex) so a single `+ scan` produces
  // per-bucket start positions across all locales and tasks.
  var gD = Block.createDomain({0..#(numLocales * numTasks * numBuckets)});
  var globalCounts: [gD] int;

  // One counting pass per digit, least significant first.
  for rshift in {0..#nBits by bitsPerDigit} {
    const last = (rshift + bitsPerDigit) >= nBits; // is this the final digit?
    try! rsLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                        "rshift = %t".format(rshift));

    // Phase 1: count digit occurrences — one histogram per task per locale.
    coforall loc in Locales {
      on loc {
        // Per-task bucket histograms, local to this locale.
        var tasksBucketCounts: [Tasks] [0..#numBuckets] int;
        coforall task in Tasks {
          ref taskBucketCounts = tasksBucketCounts[task];
          // This task's slice of the locale's portion of the array.
          var lD = aD.localSubdomain();
          var tD = calcBlock(task, lD.low, lD.high);
          // Count digits in this task's part of the array.
          for i in tD {
            const key = comparator.key(temp.localAccess[i]);
            var bucket = getDigit(key, rshift, last, negs); // calc bucket from key
            taskBucketCounts[bucket] += 1;
          }
        }//coforall task
        // Write the local histograms into globalCounts in transposed
        // order, batching the remote puts with a destination aggregator.
        coforall tid in Tasks {
          var aggregator = newDstAggregator(int);
          for task in Tasks {
            ref taskBucketCounts = tasksBucketCounts[task];
            for bucket in chunk(0..#numBuckets, numTasks, tid) {
              aggregator.copy(globalCounts[calcGlobalIndex(bucket, loc.id, task)],
                              taskBucketCounts[bucket]);
            }
          }
          // Flush makes all buffered copies visible at the destination.
          aggregator.flush();
        }//coforall task
      }//on loc
    }//coforall loc

    // Phase 2: turn counts into start offsets. `+ scan` is an inclusive
    // prefix sum; subtracting the counts makes it exclusive, giving each
    // (bucket, locale, task) its first destination index.
    var globalStarts = + scan globalCounts;
    globalStarts -= globalCounts;
    if vv {printAry("globalCounts =",globalCounts);try! stdout.flush();}
    if vv {printAry("globalStarts =",globalStarts);try! stdout.flush();}

    // Phase 3: compute destination positions and permute temp -> a.
    coforall loc in Locales {
      on loc {
        // Per-task running write positions, seeded from globalStarts.
        var tasksBucketPos: [Tasks] [0..#numBuckets] int;
        // Read start positions back from the transposed global array,
        // batching the remote gets with a source aggregator.
        coforall tid in Tasks {
          var aggregator = newSrcAggregator(int);
          for task in Tasks {
            ref taskBucketPos = tasksBucketPos[task];
            for bucket in chunk(0..#numBuckets, numTasks, tid) {
              aggregator.copy(taskBucketPos[bucket],
                              globalStarts[calcGlobalIndex(bucket, loc.id, task)]);
            }
          }
          aggregator.flush();
        }//coforall task
        // Scatter each local element to its global destination in `a`.
        coforall task in Tasks {
          ref taskBucketPos = tasksBucketPos[task];
          var lD = aD.localSubdomain();
          var tD = calcBlock(task, lD.low, lD.high);
          {
            var aggregator = newDstAggregator(t);
            for i in tD {
              const ref tempi = temp.localAccess[i];
              const key = comparator.key(tempi);
              var bucket = getDigit(key, rshift, last, negs); // calc bucket from key
              var pos = taskBucketPos[bucket];
              // Advance so the bucket's next element lands one slot later.
              taskBucketPos[bucket] += 1;
              aggregator.copy(a[pos], tempi);
            }
            aggregator.flush();
          }
        }//coforall task
      }//on loc
    }//coforall loc

    // Swap so `temp` again holds the partially sorted data for the next
    // digit. Skipped on the last pass: `a` already holds the result.
    if !last {
      temp <=> a;
    }
  } // for rshift
}//proc radixSortLSDCore
// todo: all of these are different entry points into the sort. radixSortLSDCore is the main function
/* Radix sort a block-distributed array, returning the sorted
   (key, originalIndex) pairs as a block-distributed array. */
proc radixSortLSD(a:[?aD] ?t, checkSorted: bool = true): [aD] (t, int) {
  // Pair each key with its original index (its rank).
  var keyRank: [aD] (t, int) = [(k, r) in zip(a, aD)] (k, r);
  // Sort unless the caller asked us to check and the input is already
  // in order (in which case the pairing above is already the answer).
  if !(checkSorted && isSorted(a)) {
    const (nBits, negs) = getBitWidth(a);
    radixSortLSDCore(keyRank, nBits, negs, new KeysRanksComparator());
  }
  return keyRank;
}
/* Radix Sort Least Significant Digit
radix sort a block distributed array
returning a permutation vector as a block distributed array */
/* Radix Sort Least Significant Digit
   radix sort a block distributed array
   returning a permutation vector as a block distributed array */
proc radixSortLSD_ranks(a:[?aD] ?t, checkSorted: bool = true): [aD] int {
  // Fast path: already sorted, so the permutation is the identity.
  if checkSorted && isSorted(a) {
    const identity: [aD] int = [idx in aD] idx;
    return identity;
  }
  // Sort (key, originalIndex) pairs, then project out the indices.
  var keyRank: [aD] (t, int) = [(k, r) in zip(a, aD)] (k, r);
  const (nBits, negs) = getBitWidth(a);
  radixSortLSDCore(keyRank, nBits, negs, new KeysRanksComparator());
  const perm: [aD] int = [(_, r) in keyRank] r;
  return perm;
}
/* Radix Sort Least Significant Digit
radix sort a block distributed array
returning sorted keys as a block distributed array */
/* Radix Sort Least Significant Digit
   radix sort a block distributed array
   returning sorted keys as a block distributed array */
proc radixSortLSD_keys(a: [?aD] ?t, checkSorted: bool = true): [aD] t {
  // Work on a copy so the caller's array is left untouched.
  var sortedKeys = a;
  // Skip the sort entirely when the input is already in order.
  if !(checkSorted && isSorted(a)) {
    const (nBits, negs) = getBitWidth(a);
    radixSortLSDCore(sortedKeys, nBits, negs, new KeysComparator());
  }
  return sortedKeys;
}
// Estimate the peak memory (in bytes) used by a (key, rank) sort of
// `size` elements of `itemsize` bytes each.
proc radixSortLSD_memEst(size: int, itemsize: int) {
  // two temporary key+rank arrays (each element carries an int rank)
  const pairArrayBytes = 2 * size * (itemsize + numBytes(int));
  // plus the globalStarts/globalCounts scan arrays
  const scanArrayBytes = 2 * numLocales * numTasks * numBuckets * numBytes(int);
  return pairArrayBytes + scanArrayBytes;
}
// Estimate the peak memory (in bytes) used by a keys-only sort of
// `size` elements of `itemsize` bytes each.
proc radixSortLSD_keys_memEst(size: int, itemsize: int) {
  // two temporary key arrays
  const keyArrayBytes = 2 * size * itemsize;
  // plus the globalStarts/globalCounts scan arrays
  const scanArrayBytes = 2 * numLocales * numTasks * numBuckets * numBytes(int);
  return keyArrayBytes + scanArrayBytes;
}
}