-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlibparallel
270 lines (228 loc) · 7.19 KB
/
libparallel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#!/usr/bin/env bash
# The number of jobs to run at once. Leave blank to auto detect.
PARALLEL_JOBS=
# Don't modify this list. It's tracking the PIDs of every job running in the
# background.
PARALLEL_JOB_LIST=()
# An array containing the list of status codes returned by the backgrounded
# jobs. This can be reset with `parallel::resetStatusCodes` and is
# automatically wiped out with `parallel::finish`.
PARALLEL_JOB_STATUS_LIST=()
# Preserves the old SIGINT trap. This library layers on top of the SIGINT
# trap. If you need to also layer your own functionality on top of this
# library, don't just use `trap your_handler SIGINT` but instead make sure
# to also call the previous SIGINT trap as well.
#
# Do not modify this variable yourself.
PARALLEL_OLD_INT_TRAP=
# The method to use when wating for jobs to finish. `wait -n` is faster but
# only introduced in Bash 4.3.
PARALLEL_WAIT_METHOD=
# Checks that all of the backgrounded jobs are still running. If they are
# not, this will remove them from the list.
#
# Examples
#
# parallel::curateJobList
#
# Returns nothing.
parallel::curateJobList() {
local alive oldSet pid
alive=()
oldSet=$-
set +eE
if [[ "${#PARALLEL_JOB_LIST[@]}" -gt 0 ]]; then
for pid in "${PARALLEL_JOB_LIST[@]}"; do
if kill -0 "$pid" > /dev/null 2>&1; then
alive[${#alive[@]}]=$pid
else
wait "$pid"
PARALLEL_JOB_STATUS_LIST[${#PARALLEL_JOB_STATUS_LIST[@]}]=$?
fi
done
fi
if [[ "${#alive[@]}" -gt 0 ]]; then
PARALLEL_JOB_LIST=( "${alive[@]}" )
else
PARALLEL_JOB_LIST=()
fi
set "-$oldSet"
}
# Wait for all jobs to complete. Resets the status code array. Returns
# the number of failures. Removes the SIGINT trap that was added in
# `parallel::run`.
#
# $1 - Destination variable for the status code array, optional.
#
# Returns the number of failures.
parallel::finish() {
local code failures
while [[ "${#PARALLEL_JOB_LIST[@]}" -gt 0 ]]; do
parallel::wait
done
if [[ "$(trap - SIGINT)" == "parallel::intTrap" ]]; then
# shellcheck disable=SC2064
trap "$PARALLEL_OLD_INT_TRAP" SIGINT
fi
failures=0
if [[ "${#PARALLEL_JOB_STATUS_LIST[@]}" -gt 0 ]]; then
for code in "${PARALLEL_JOB_STATUS_LIST[@]}"; do
if [[ "$code" -ne 0 ]]; then
failures=$((failures + 1))
fi
done
# Limit the return code to a maximum of 127
if [[ "$failures" -gt 126 ]]; then
failures=127
fi
fi
if [[ -n "${1:-}" ]]; then
local "$1" && assign::array "$1" "${PARALLEL_JOB_STATUS_LIST[@]}" && return "$failures"
else
return "$failures"
fi
}
# SIGINT trap handler for parallelized operations. Stop running jobs and
# send SIGINT to all children.
#
# Examples
#
# PARALLEL_OLD_INT_TRAP=$(trap - SIGINT)
# PARALLEL_OLD_INT_TRAP=${PARALLEL_OLD_INT_TRAP:--}
# trap parallel::intTrap SIGINT
#
# Returns nothing. Will resend SIGINT to current process after restoring
# the previous SIGINT trap.
parallel::intTrap() {
local pid
# Send signal to all children. Don't redeclare the pids variable
# shellcheck disable=SC2154
if [[ "${#PARALLEL_JOB_LIST[@]}" -gt 0 ]]; then
for pid in "${PARALLEL_JOB_LIST[@]}"; do
kill -SIGINT "$pid" > /dev/null 2>&1
done
fi
# Restore the trap and reissue the SIGINT. We want this to expand now
# instead of when signalled.
# shellcheck disable=SC2064
trap "${PARALLEL_OLD_INT_TRAP:--}" SIGINT
PARALLEL_OLD_INT_TRAP=""
kill -SIGINT $$
}
# Determines the number of parallel jobs that should be executed. This is
# executed automatically by the library. It returns the number of CPU
# processors, defaulting to at least 1.
#
# Example:
#
# PARALLEL_JOBS=$(parallel::getLimt)
#
# Returns nothing.
parallel::maxJobs() {
local maxJobs
# Detect number of processors.
if [[ -f /proc/cpuinfo ]]; then
maxJobs=$(grep -c "^processor"$'\t': /proc/cpuinfo)
else
maxJobs=$(sysctl -n hw.ncpu)
fi
# Run at least one job at a time.
if [[ "$maxJobs" -lt 1 ]]; then
maxJobs=1
fi
echo "$maxJobs"
}
# Run one or more jobs in subshells in parallel. Output from each job is
# currently intermingled, but that behavior may change in the future.
# The number of jobs that should execute at once is controlled by the
# PARALLEL_JOBS environment variable, which will default to the number of CPU
# cores detected.
#
# $@ - Command to run.
#
# All commands execute in a subshell and are unable to affect the parent's
# environment. SIGINT is caught and is sent to all executing commands. This
# helps Control-C abort everything as it should. The status code of the
# command is lost. This trap is removed when you use `parallel::finish`.
#
# If you intend to trap SIGINT, please set that trap up before you call
# `parallel::run` and only change it again after `parallel::finish`.
#
# Examples
#
# # Sample function
# sleepABit() {
# echo "About to sleep for $1 seconds"
# sleep "$1"
# echo "Done sleeping $1 seconds"
# }
#
# # Explicitly set a limit for this example
# PARALLEL_JOBS=1
#
# # Run a job. This executes in the background.
# parallel::run sleepABit 5
#
# # Run a third job. This stalls until any previous jobs finishes.
# parallel::run sleepABit 3
#
# # Wait for all jobs to be complete
# parallel::finish
#
# # At this point, all jobs have finished.
#
# Returns nothing.
parallel::run() {
local job
if [[ -z "$PARALLEL_OLD_INT_TRAP" ]]; then
PARALLEL_OLD_INT_TRAP="$(trap - SIGINT)"
#: This must set the trap to "-" if there was no previous trap.
PARALLEL_OLD_INT_TRAP="${PARALLEL_OLD_INT_TRAP:--}"
trap parallel::intTrap SIGINT
fi
if [[ "$PARALLEL_JOBS" -lt 1 ]]; then
PARALLEL_JOBS=$(parallel::maxJobs)
fi
while [[ "${#PARALLEL_JOB_LIST[@]}" -ge "$PARALLEL_JOBS" ]]; do
parallel::wait
done
# Quote so we can pass this to eval properly
printf -v job "%q " "$@"
# Start the job in a subshell so it is unable to affect the current shell.
(
eval "$job"
) < /dev/null &
PARALLEL_JOB_LIST[${#PARALLEL_JOB_LIST[@]}]=$!
}
# Waits for at least one job to complete. Used internally a lot. A user of
# this library would be better served by looking at `parallel::finish`.
#
# Examples
#
# parallel::run sleep 10
# parallel::wait
#
# Returns nothing.
parallel::wait() {
local newCount oldCount
if [[ -z "$PARALLEL_WAIT_METHOD" ]]; then
code=$(fun() { return 0; }; set +eE; fun & wait -n $! &> /dev/null; echo $?)
if [[ "$code" -eq 0 ]]; then
PARALLEL_WAIT_METHOD=wait
else
PARALLEL_WAIT_METHOD=sleep
fi
fi
oldCount=${#PARALLEL_JOB_LIST[@]}
parallel::curateJobList
newCount=${#PARALLEL_JOB_LIST[@]}
while [[ "$newCount" -gt 0 && "$newCount" -ge "$oldCount" ]]; do
if [[ "$PARALLEL_WAIT_METHOD" == wait ]]; then
wait -n "${PARALLEL_JOB_LIST[@]}" || :
else
sleep 1
fi
parallel::curateJobList
newCount=${#PARALLEL_JOB_LIST[@]}
done
}