3
3
namespace Upfor \ForkMan ;
4
4
5
5
/**
6
- * ForkMan
6
+ * ForkMan - A lightest process manager
7
7
*/
8
8
class ForkMan
9
9
{
@@ -69,7 +69,7 @@ public function __construct($procNum = 2, $name = '')
69
69
$ name = end ($ name );
70
70
}
71
71
72
- $ this ->name = $ name ;
72
+ $ this ->name = $ name ;
73
73
$ this ->procNum = $ procNum ;
74
74
75
75
if (!empty ($ _SERVER ['argv ' ]) && false !== array_search (static ::$ slaveLabel , $ _SERVER ['argv ' ])) {
@@ -80,7 +80,7 @@ public function __construct($procNum = 2, $name = '')
80
80
/**
81
81
* Execute only in master process
82
82
*
83
- * @param callable $masterHandler
83
+ * @param callable $masterHandler master process callback, which can be call_user_func() execute
84
84
* @return $this
85
85
*/
86
86
public function master (callable $ masterHandler )
@@ -100,7 +100,7 @@ public function master(callable $masterHandler)
100
100
*/
101
101
private function createMaster ($ limit )
102
102
{
103
- !$ this ->slaveCmd && $ this ->slaveCmd = $ this ->getCmd ();
103
+ !$ this ->slaveCmd && $ this ->slaveCmd = $ this ->currentCmd ();
104
104
105
105
@cli_set_process_title ($ this ->name . ': ' . 'master ' );
106
106
@@ -118,11 +118,11 @@ private function createMaster($limit)
118
118
*
119
119
* @return string
120
120
*/
121
- private function getCmd ()
121
+ private function currentCmd ()
122
122
{
123
123
$ prefix = empty ($ this ->prefix ) ? (!empty ($ _SERVER ['_ ' ]) ? realpath ($ _SERVER ['_ ' ]) : '/usr/bin/env php ' ) : $ this ->prefix ;
124
- $ mixed = array_merge ([$ prefix , $ _SERVER ['PHP_SELF ' ]], $ _SERVER ['argv ' ]);
125
- $ mixed = array_filter ($ mixed , function ($ item ) {
124
+ $ mixed = array_merge ([$ prefix , $ _SERVER ['PHP_SELF ' ]], $ _SERVER ['argv ' ]);
125
+ $ mixed = array_filter ($ mixed , function ($ item ) {
126
126
return strpos ($ item , './ ' ) !== 0 ;
127
127
});
128
128
@@ -141,20 +141,21 @@ private function createProcess()
141
141
['pipe ' , 'w ' ], // std output
142
142
['pipe ' , 'w ' ], // std error
143
143
];
144
- $ res = proc_open ($ this ->slaveCmd . ' ' . static ::$ slaveLabel , $ desc , $ pipes , getcwd ());
144
+ $ res = proc_open ($ this ->slaveCmd . ' ' . static ::$ slaveLabel , $ desc , $ pipes , getcwd ());
145
145
146
146
$ status = proc_get_status ($ res );
147
147
if (!isset ($ status ['pid ' ])) {
148
148
$ this ->log ('process create failed ' );
149
+
149
150
return $ this ->createProcess ();
150
151
}
151
152
152
- $ pid = $ status ['pid ' ];
153
+ $ pid = $ status ['pid ' ];
153
154
$ process = [
154
- 'res ' => $ res ,
155
- 'pipes ' => $ pipes ,
156
- 'idle ' => true , // process is idling
157
- 'pid ' => $ pid ,
155
+ 'res ' => $ res ,
156
+ 'pipes ' => $ pipes ,
157
+ 'idle ' => true , // process is idling
158
+ 'pid ' => $ pid ,
158
159
'callback ' => null , // call when the slave process finished
159
160
];
160
161
@@ -163,6 +164,7 @@ private function createProcess()
163
164
stream_set_blocking ($ pipes [2 ], 0 );
164
165
165
166
$ this ->log ('start ' . $ pid );
167
+
166
168
return $ process ;
167
169
}
168
170
@@ -189,7 +191,7 @@ public function log($info)
189
191
/**
190
192
* Execute only in slave process
191
193
*
192
- * @param callable $slaveHandler
194
+ * @param callable $slaveHandler slave process callback, which can be call_user_func() execute
193
195
* @return $this
194
196
*/
195
197
public function slave (callable $ slaveHandler )
@@ -211,7 +213,7 @@ private function createSlave()
211
213
212
214
while (true ) {
213
215
// listen input from master
214
- $ fp = @fopen ('php://stdin ' , 'r ' );
216
+ $ fp = @fopen ('php://stdin ' , 'r ' );
215
217
$ recv = @fread ($ fp , 8 ); // read content length
216
218
$ size = intval (rtrim ($ recv ));
217
219
$ data = @fread ($ fp , $ size );
@@ -239,14 +241,15 @@ private function createSlave()
239
241
public function submit ($ data , $ callback = null )
240
242
{
241
243
if (!$ this ->isSlave ) {
242
- $ process = &$ this ->getAvailableProcess ();
244
+ $ process = &$ this ->getAvailableProcess ();
243
245
$ process ['callback ' ] = $ callback ;
244
- $ data = json_encode ($ data );
245
- $ length = strlen ($ data );
246
- $ length = str_pad ($ length . '' , 8 , ' ' , STR_PAD_RIGHT );
246
+ $ data = json_encode ($ data );
247
+ $ length = strlen ($ data );
248
+ $ length = str_pad ($ length . '' , 8 , ' ' , STR_PAD_RIGHT );
247
249
248
250
// send to slave process, with length and content
249
251
fwrite ($ process ['pipes ' ][0 ], $ length . $ data );
252
+
250
253
return $ process ['pid ' ];
251
254
}
252
255
@@ -265,13 +268,12 @@ private function &getAvailableProcess()
265
268
if (isset ($ this ->procPool [$ index ])) {
266
269
$ this ->procPool [$ index ]['idle ' ] = false ;
267
270
$ this ->idleCount ++;
271
+
268
272
return $ this ->procPool [$ index ];
269
273
}
270
274
// sleep 50 ms
271
275
usleep (50000 );
272
276
}
273
-
274
- return null ;
275
277
}
276
278
277
279
/**
@@ -285,7 +287,7 @@ private function check()
285
287
foreach ($ this ->procPool as $ key => &$ process ) {
286
288
$ this ->checkProcessAlive ($ process );
287
289
if (!$ process ['idle ' ]) {
288
- echo stream_get_contents ($ process ['pipes ' ][2 ]); // std error
290
+ echo stream_get_contents ($ process ['pipes ' ][2 ]); // std error
289
291
$ result = stream_get_contents ($ process ['pipes ' ][1 ]); // std output
290
292
if (!empty ($ result )) {
291
293
$ process ['idle ' ] = true ;
@@ -304,6 +306,7 @@ private function check()
304
306
$ index = $ key ;
305
307
}
306
308
}
309
+
307
310
return $ index ;
308
311
}
309
312
@@ -358,6 +361,7 @@ public function loop($sleep = 0)
358
361
}
359
362
360
363
$ this ->check ();
364
+
361
365
return true ;
362
366
}
363
367
@@ -383,6 +387,7 @@ public function wait($timeout = 0)
383
387
$ killStatus = $ this ->killAllProcess ();
384
388
if ($ killStatus ) {
385
389
$ this ->log ('all slave processes exited( ' . ($ outed ? 'timeout ' : 'idle ' ) . ') ' );
390
+
386
391
return ;
387
392
}
388
393
}
@@ -399,7 +404,7 @@ public function wait($timeout = 0)
399
404
private function killAllProcess ()
400
405
{
401
406
$ killStatus = true ;
402
- foreach ($ this ->procPool as & $ process ) {
407
+ foreach ($ this ->procPool as $ process ) {
403
408
$ status = $ this ->killProcess ($ process );
404
409
if ($ status ) {
405
410
$ this ->log ('kill success: ' . $ process ['pid ' ]);
0 commit comments