From 042cc714b875738a4f224efe8b2c0f8ba9c5b7ef Mon Sep 17 00:00:00 2001 From: Shane Harter Date: Tue, 13 Nov 2012 11:41:18 -0800 Subject: [PATCH 01/12] Update README.md --- README.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 467f536..ccea70c 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,13 @@ Create solid, long-running PHP daemon processes by extending the Core_Daemon cla * A POSIX compatible operating system (Linux, OSX, BSD) * POSIX and PCNTL Extensions for PHP -#### Changes in 2.0: -* Create asynchronous background workers with the new Worker API. See the `PrimeNumbers` example application or the Wiki. -* Hook into a simple callback system using familiar `on(event, callable)` & `off()` syntax. Built-in events give you the ability to hook into application state changes, or make your own to create a simple message bus for your application. -* Create event loops that are not timer-based (build socket-servers, or use blocking system calls with ease) -* Dozens of additional enhancements and bug fixes. +#### [NEW] Version 2.1 Beta Testing +* Version 2.1 is currently being developed in the branch `feature_abstract_ipc`. It's currently at alpha and a formal beta will be announced soon. +* Exciting features in 2.1 include: + * A new Socket Server plugin that lets you create event-driven async servers in a few lines of code. + * A major refactoring of the Worker API includes pluggable IPC classes: Channel worker communication over any popular message queue or stick with the built-in SysV channel. * Improved, simpler worker debug shell with new, powerful commands and easier integration of custom breakpoints into your own worker code. + * Centralize all the process forking, reaping and management code spread around the `Core_Daemon` and `Core_Worker_Mediator` classes into a simple ProcessManager plugin. + * Dozens of other bug fixes and improvements towards simpler, clearer code in the core Daemon and Mediator classes. #### Support & Consulting * Commercial support & consulting is available, including on-site support in the San Francisco Bay Area. From 72c208ef269fd3e41bb76ae025ccb3e54d9453a2 Mon Sep 17 00:00:00 2001 From: Shane Harter Date: Tue, 13 Nov 2012 11:43:04 -0800 Subject: [PATCH 02/12] Update README.md --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ccea70c..4b26313 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,10 @@ Create solid, long-running PHP daemon processes by extending the Core_Daemon cla #### [NEW] Version 2.1 Beta Testing * Version 2.1 is currently being developed in the branch `feature_abstract_ipc`. It's currently at alpha and a formal beta will be announced soon. -* Exciting features in 2.1 include: +* Exciting features in 2.1 include: * A new Socket Server plugin that lets you create event-driven async servers in a few lines of code. - * A major refactoring of the Worker API includes pluggable IPC classes: Channel worker communication over any popular message queue or stick with the built-in SysV channel. * Improved, simpler worker debug shell with new, powerful commands and easier integration of custom breakpoints into your own worker code. + * A major refactoring of the Worker API includes pluggable IPC classes: Channel worker communication over any popular message queue or stick with the built-in SysV channel. + * Improved, simpler worker debug shell with new, powerful commands and easier integration of custom breakpoints into your own worker code. * Centralize all the process forking, reaping and management code spread around the `Core_Daemon` and `Core_Worker_Mediator` classes into a simple ProcessManager plugin. * Dozens of other bug fixes and improvements towards simpler, clearer code in the core Daemon and Mediator classes. From 5cfe5ffddb381ad38158f156496f10e66a860146 Mon Sep 17 00:00:00 2001 From: brunnels Date: Fri, 2 May 2014 05:41:10 -0500 Subject: [PATCH 03/12] Added fix for fatal error during worker cleanup Added running_count method to Mediator --- Core/Worker/Mediator.php | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Core/Worker/Mediator.php b/Core/Worker/Mediator.php index 70e40eb..900b7a5 100644 --- a/Core/Worker/Mediator.php +++ b/Core/Worker/Mediator.php @@ -721,7 +721,9 @@ public function run() { $this->log("Enforcing Timeout on Call $call_id in pid " . $call->pid); - $this->process($call->pid)->kill(); + if($this->process($call->pid) instanceof Core_Lib_Process) { + $this->process($call->pid)->kill(); + } $call->timeout(); unset($this->running_calls[$call_id]); @@ -997,6 +999,14 @@ public function process_count() { return 0; } + /** + * Retrieves the number of worker processes that are currently running + * + * @return number + */ + public function running_count() { + return count($this->running_calls); + } /** * Dump runtime stats in tabular fashion to the log. From 776750a5db500c2104713ebc31a2024138bb1508 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 00:05:19 +0300 Subject: [PATCH 04/12] Move getopt() out of object constructor to allow exceptions in it --- Core/Daemon.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Core/Daemon.php b/Core/Daemon.php index 86f8e7a..005d9a5 100755 --- a/Core/Daemon.php +++ b/Core/Daemon.php @@ -193,6 +193,7 @@ public static function getInstance() try { $o = new static; + $o->getopt(); $o->setup_plugins(); $o->setup_workers(); $o->check_environment(); @@ -249,7 +250,6 @@ protected function __construct() $this->set('filename', $argv[0]); $this->set('start_time', time()); $this->pid(getmypid()); - $this->getopt(); } /** From e7368d283a04a80995abcb9c497b58b95bf07db7 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 00:55:27 +0300 Subject: [PATCH 05/12] Remove Lock plugin TTL capabilities as outdated Fix Memcache lib Refactore Lock plugins code to be more eficient --- Core/Lib/Memcache.php | 24 ++++++++++---------- Core/Lock/File.php | 29 +++++------------------- Core/Lock/Lock.php | 49 +++++++++++++++++++++++------------------ Core/Lock/Memcached.php | 27 +++++++---------------- Core/Lock/Shm.php | 22 +++++------------- 5 files changed, 59 insertions(+), 92 deletions(-) diff --git a/Core/Lib/Memcache.php b/Core/Lib/Memcache.php index af3d8e3..8c93e48 100755 --- a/Core/Lib/Memcache.php +++ b/Core/Lib/Memcache.php @@ -64,7 +64,7 @@ public function ns($namespace = null) * @param integer $timeout_override The retry timeout in seconds * @return mixed The value from Memcache or False */ - public function getWithRetry($key, $flags = false, $timeout_override = false) + public function getWithRetry($key, $timeout_override = false) { if ($timeout_override) $max_tries = intval($timeout_override / 0.10); @@ -76,7 +76,7 @@ public function getWithRetry($key, $flags = false, $timeout_override = false) for ($i=0; $i<$max_tries; $i++) { - $value = $this->get($key, $flags); + $value = $this->get($key); if(false == empty($value)) return $value; @@ -94,7 +94,7 @@ public function getWithRetry($key, $flags = false, $timeout_override = false) * @param integer $expire * @return boolean */ - public function set($key, $var, $flags = null, $expire = null) + public function set($key, $var, $expire = 0) { if ($this->auto_retry) $max_tries = intval($this->auto_retry_timeout / 0.10); @@ -106,7 +106,7 @@ public function set($key, $var, $flags = null, $expire = null) for ($i=0; $i<$max_tries; $i++) { - if(parent::set($this->key($key), $var, $flags, $expire)) + if(parent::set($this->key($key), $var, $expire)) return true; usleep(100000); @@ -121,19 +121,19 @@ public function set($key, $var, $flags = null, $expire = null) * @param string $flags * @return mixed */ - public function get($key, $flags = null) + public function get($key) { - return parent::get($this->key($key), $flags); + return parent::get($this->key($key)); } - public function decrement($key, $value = 1) + public function decrement($key, $offset = 1) { - return parent::decrement($this->key($key), $value); + return parent::decrement($this->key($key), $offset); } - public function increment($key, $value = 1) + public function increment($key, $offset = 1) { - return parent::increment($this->key($key), $value); + return parent::increment($this->key($key), $offset); } public function delete($key) @@ -141,9 +141,9 @@ public function delete($key) return parent::delete($this->key($key)); } - public function replace($key, $var, $flags = null, $expire = null) + public function replace($key, $var, $expire = 0) { - return parent::replace($this->key($key), $var, $flags, $expire); + return parent::replace($this->key($key), $var, $expire); } /** diff --git a/Core/Lock/File.php b/Core/Lock/File.php index 61ffb24..0d1c1f0 100755 --- a/Core/Lock/File.php +++ b/Core/Lock/File.php @@ -20,7 +20,7 @@ class Core_Lock_File extends Core_Lock_Lock implements Core_IPlugin protected $filename; - public function __construct(Core_Daemon $daemon, Array $args = array()) + public function __construct(Core_Daemon $daemon, array $args = array()) { parent::__construct($daemon, $args); if (isset($args['path'])) @@ -40,13 +40,13 @@ public function setup() public function teardown() { // If the lockfile was set by this process, remove it. If filename is empty, this is being called before setup() - if (!empty($this->filename) && $this->pid == @file_get_contents($this->filename)) + if (!empty($this->filename) && $this->pid == $this->get()) @unlink($this->filename); } - public function check_environment(Array $errors = array()) + public function check_environment(array $errors = array()) { - if (is_writable($this->path) == false) + if (!is_writable($this->path)) $errors[] = 'Lock File Path ' . $this->path . ' Not Writable.'; return $errors; @@ -54,11 +54,6 @@ public function check_environment(Array $errors = array()) public function set() { - $lock = $this->check(); - - if ($lock) - throw new Exception('Core_Lock_File::set Failed. Additional Lock Detected. PID: ' . $lock); - // The lock value will contain the process PID file_put_contents($this->filename, $this->pid); @@ -67,25 +62,11 @@ public function set() protected function get() { - if (file_exists($this->filename) == false) + if (!file_exists($this->filename)) return false; $lock = file_get_contents($this->filename); - // If we're seeing our own lock.. - if ($lock == $this->pid) - return false; - - // If the process that wrote the lock is no longer running - $cmd_output = `ps -p $lock`; - if (strpos($cmd_output, $lock) === false) - return false; - - // If the lock is expired - clearstatcache(); - if ((filemtime($this->filename) + $this->ttl + Core_Lock_Lock::$LOCK_TTL_PADDING_SECONDS) < time()) - return false; - return $lock; } } \ No newline at end of file diff --git a/Core/Lock/Lock.php b/Core/Lock/Lock.php index 2b6d649..10b8e8e 100755 --- a/Core/Lock/Lock.php +++ b/Core/Lock/Lock.php @@ -8,7 +8,6 @@ */ abstract class Core_Lock_Lock implements Core_IPlugin { - public static $LOCK_TTL_PADDING_SECONDS = 2.0; public static $LOCK_UNIQUE_ID = 'daemon_lock'; /** @@ -24,30 +23,20 @@ abstract class Core_Lock_Lock implements Core_IPlugin */ public $daemon_name; - /** - * This is added to the const LOCK_TTL_SECONDS to determine how long the lock should last -- any lock provider should be - * self-expiring using these TTL's. This is done to minimize likelihood of errant locks being left behind after a kill or crash that - * would have to be manually removed. - * - * @var float Number of seconds the lock should be active -- padded with Core_Lock_Lock::LOCK_TTL_PADDING_SECONDS - */ - public $ttl = 0; - /** * The array of args passed-in at instantiation * @var Array */ protected $args = array(); - public function __construct(Core_Daemon $daemon, Array $args = array()) + public function __construct(Core_Daemon $daemon, array $args = array()) { $this->pid = getmypid(); $this->daemon_name = get_class($daemon); - $this->ttl = $daemon->loop_interval(); $this->args = $args; - $daemon->on(Core_Daemon::ON_INIT, array($this, 'set')); - $daemon->on(Core_Daemon::ON_PREEXECUTE, array($this, 'set')); + $daemon->on(Core_Daemon::ON_INIT, array($this, 'check')); + $daemon->on(Core_Daemon::ON_PREEXECUTE, array($this, 'check')); $that = $this; $daemon->on(Core_Daemon::ON_PIDCHANGE, function ($args) use ($that) { @@ -76,18 +65,36 @@ abstract protected function get(); /** * Check for the existence of a lock. - * Cache results of get() check for 1/10 a second. * * @return bool|int Either false or the PID of the process that has set the lock */ - public function check() + protected function exists() { - static $get = false; - static $get_time = false; + $pid = $this->get(); - $get = $this->get(); - $get_time = microtime(true); + // pid should be a positive number + if (!$pid) + return false; + + // If we're seeing our own lock.. + if ($pid == $this->pid) + return false; - return $get; + // If the process that wrote the lock is no longer running + $cmd_output = `ps -p $pid`; + if (strpos($cmd_output, $pid) === false) + return false; + + return $pid; + } + + + public function check() + { + $lock = $this->exists(); + if ($lock) + throw new Exception(get_class($this) . '::' . __FUNCTION__ . ' failed. Existing lock detected from PID: ' . $lock); + + $this->set(); } } \ No newline at end of file diff --git a/Core/Lock/Memcached.php b/Core/Lock/Memcached.php index 81ace0e..c136fc8 100755 --- a/Core/Lock/Memcached.php +++ b/Core/Lock/Memcached.php @@ -40,8 +40,7 @@ public function setup() public function teardown() { // If this PID set this lock, release it - $lock = $this->memcache->get(Core_Lock_Lock::$LOCK_UNIQUE_ID); - if ($lock == $this->pid) + if ($this->get() == $this->pid) $this->memcache->delete(Core_Lock_Lock::$LOCK_UNIQUE_ID); } @@ -51,35 +50,25 @@ public function check_environment(Array $errors = array()) if (false == (is_array($this->memcache_servers) && count($this->memcache_servers))) $errors[] = 'Memcache Plugin: Memcache Servers Are Not Set'; - - if (false == class_exists('Core_Memcache')) - $errors[] = 'Memcache Plugin: Dependant Class "Core_Memcache" Is Not Loaded'; - - if (false == class_exists('Memcached')) - $errors[] = 'Memcache Plugin: PHP Memcached Extension Is Not Loaded'; + + if (false == class_exists('Memcached')) + $errors[] = 'Memcache Plugin: PHP Memcached Extension Is Not Loaded'; + + if (false == class_exists('Core_Lib_Memcache')) + $errors[] = 'Memcache Plugin: Dependant Class "Core_Lib_Memcache" Is Not Loaded'; return $errors; } public function set() { - $lock = $this->check(); - if ($lock) - throw new Exception('Core_Lock_Memcached::set Failed. Existing Lock Detected from PID ' . $lock); - - $timeout = Core_Lock_Lock::$LOCK_TTL_PADDING_SECONDS + $this->ttl; - $this->memcache->set(Core_Lock_Lock::$LOCK_UNIQUE_ID, $this->pid, false, $timeout); + $this->memcache->set(Core_Lock_Lock::$LOCK_UNIQUE_ID, $this->pid); } protected function get() { $lock = $this->memcache->get(Core_Lock_Lock::$LOCK_UNIQUE_ID); - - // Ensure we're not seeing our own lock - if ($lock == $this->pid) - return false; - // If We're here, there's another lock... return the pid.. return $lock; } } \ No newline at end of file diff --git a/Core/Lock/Shm.php b/Core/Lock/Shm.php index bdc8ed5..981c26d 100755 --- a/Core/Lock/Shm.php +++ b/Core/Lock/Shm.php @@ -27,8 +27,7 @@ public function setup() public function teardown() { // If this PID set this lock, release it - $lock = shm_get_var($this->shm, self::ADDRESS); - if ($lock == $this->pid) { + if ($this->get() == $this->pid) { shm_remove($this->shm); shm_detach($this->shm); } @@ -42,25 +41,16 @@ public function check_environment(Array $errors = array()) public function set() { - $lock = $this->check(); - if ($lock) - throw new Exception('Core_Lock_Shm::set Failed. Existing Lock Detected from PID ' . $lock); - - shm_put_var($this->shm, self::ADDRESS, array('pid' => $this->pid, 'time' => time())); + shm_put_var($this->shm, self::ADDRESS, $this->pid); } protected function get() { + if (!shm_has_var($this->shm)) + return false; + $lock = shm_get_var($this->shm, self::ADDRESS); - - // Ensure we're not seeing our own lock - if ($lock['pid'] == $this->pid) - return false; - - // If it's expired... - if ($lock['time'] + $this->ttl + Core_Lock_Lock::$LOCK_TTL_PADDING_SECONDS >= time()) - return $lock; - return false; + return $lock; } } \ No newline at end of file From 5b52e987958f973f16cd094261bd529262b7a4a5 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 01:04:14 +0300 Subject: [PATCH 06/12] resolve conflict --- README.md | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/README.md b/README.md index e69e97f..9061db9 100644 --- a/README.md +++ b/README.md @@ -14,18 +14,11 @@ Create solid, long-running PHP daemon processes by extending the Core_Daemon cla * Namespace the code * Investigate updating the minimum version to PHP5.4. * More Details Here https://github.com/shaneharter/PHP-Daemon/wiki/Version-2.2 - -<<<<<<< HEAD -#### [NEW] Version 2.1 Beta Testing -* Version 2.1 is currently being developed in the branch `feature_abstract_ipc`. It's currently at alpha and a formal beta will be announced soon. -* Exciting features in 2.1 include: - * A new Socket Server plugin that lets you create event-driven async servers in a few lines of code. -======= + #### [NEW] Version 2.1 Released! * After a 9 month beta period, the code that was in the branch `feature_abstract_ipc` has been merged into master as v2.1 * Version 2.0 available in the v2.0 tag. * Exciting features in 2.1 include: ->>>>>>> 13c05c51beed2fd9609e030ef07d01f4f88db693 * A major refactoring of the Worker API includes pluggable IPC classes: Channel worker communication over any popular message queue or stick with the built-in SysV channel. * Improved, simpler worker debug shell with new, powerful commands and easier integration of custom breakpoints into your own worker code. * Centralize all the process forking, reaping and management code spread around the `Core_Daemon` and `Core_Worker_Mediator` classes into a simple ProcessManager plugin. From 23bb3757bdf36b6dd5aea4094f1379d1f14ce1a6 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 01:16:38 +0300 Subject: [PATCH 07/12] Fixed initialization --- Core/Lock/Memcached.php | 11 +++-------- Core/Lock/Shm.php | 5 ----- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/Core/Lock/Memcached.php b/Core/Lock/Memcached.php index c136fc8..0f32c0e 100755 --- a/Core/Lock/Memcached.php +++ b/Core/Lock/Memcached.php @@ -18,11 +18,6 @@ class Core_Lock_Memcached extends Core_Lock_Lock implements Core_IPlugin */ public $memcache_servers = array(); - public function __construct() - { - $this->pid = getmypid(); - } - public function setup() { // Connect to memcache @@ -48,13 +43,13 @@ public function check_environment(Array $errors = array()) { $errors = array(); - if (false == (is_array($this->memcache_servers) && count($this->memcache_servers))) + if (!(is_array($this->memcache_servers) && count($this->memcache_servers))) $errors[] = 'Memcache Plugin: Memcache Servers Are Not Set'; - if (false == class_exists('Memcached')) + if (!class_exists('Memcached')) $errors[] = 'Memcache Plugin: PHP Memcached Extension Is Not Loaded'; - if (false == class_exists('Core_Lib_Memcache')) + if (!class_exists('Core_Lib_Memcache')) $errors[] = 'Memcache Plugin: Dependant Class "Core_Lib_Memcache" Is Not Loaded'; return $errors; diff --git a/Core/Lock/Shm.php b/Core/Lock/Shm.php index 981c26d..d2ecb3b 100755 --- a/Core/Lock/Shm.php +++ b/Core/Lock/Shm.php @@ -12,11 +12,6 @@ class Core_Lock_Shm extends Core_Lock_Lock implements Core_IPlugin * @var Resource */ private $shm = false; - - public function __construct() - { - $this->pid = getmypid(); - } public function setup() { From 7e24a90bb79134ceb19beb3262f53170529bed59 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 10:41:32 +0300 Subject: [PATCH 08/12] Method visibility and compatibility fixes --- Core/Lock/File.php | 5 +---- Core/Lock/Lock.php | 17 ++++++++++------- Core/Lock/Memcached.php | 2 +- Core/Lock/Shm.php | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Core/Lock/File.php b/Core/Lock/File.php index 0d1c1f0..4d53642 100755 --- a/Core/Lock/File.php +++ b/Core/Lock/File.php @@ -52,12 +52,9 @@ public function check_environment(array $errors = array()) return $errors; } - public function set() + protected function set() { - // The lock value will contain the process PID file_put_contents($this->filename, $this->pid); - - touch($this->filename); } protected function get() diff --git a/Core/Lock/Lock.php b/Core/Lock/Lock.php index 10b8e8e..d6ad4f5 100755 --- a/Core/Lock/Lock.php +++ b/Core/Lock/Lock.php @@ -35,8 +35,7 @@ public function __construct(Core_Daemon $daemon, array $args = array()) $this->daemon_name = get_class($daemon); $this->args = $args; - $daemon->on(Core_Daemon::ON_INIT, array($this, 'check')); - $daemon->on(Core_Daemon::ON_PREEXECUTE, array($this, 'check')); + $daemon->on(Core_Daemon::ON_INIT, array($this, 'run')); $that = $this; $daemon->on(Core_Daemon::ON_PIDCHANGE, function ($args) use ($that) { @@ -50,7 +49,7 @@ public function __construct(Core_Daemon $daemon, array $args = array()) * @abstract * @return void */ - abstract public function set(); + abstract protected function set(); /** * Read the lock from whatever shared medium it's written to. @@ -68,7 +67,7 @@ abstract protected function get(); * * @return bool|int Either false or the PID of the process that has set the lock */ - protected function exists() + protected function check() { $pid = $this->get(); @@ -88,10 +87,14 @@ protected function exists() return $pid; } - - public function check() + /** + * Implements main plugin logic - die if lock exists or create it otherwise. + * + * @return null + */ + public function run() { - $lock = $this->exists(); + $lock = $this->check(); if ($lock) throw new Exception(get_class($this) . '::' . __FUNCTION__ . ' failed. Existing lock detected from PID: ' . $lock); diff --git a/Core/Lock/Memcached.php b/Core/Lock/Memcached.php index 0f32c0e..d57dd65 100755 --- a/Core/Lock/Memcached.php +++ b/Core/Lock/Memcached.php @@ -55,7 +55,7 @@ public function check_environment(Array $errors = array()) return $errors; } - public function set() + protected function set() { $this->memcache->set(Core_Lock_Lock::$LOCK_UNIQUE_ID, $this->pid); } diff --git a/Core/Lock/Shm.php b/Core/Lock/Shm.php index d2ecb3b..30dcb93 100755 --- a/Core/Lock/Shm.php +++ b/Core/Lock/Shm.php @@ -34,14 +34,14 @@ public function check_environment(Array $errors = array()) return $errors; } - public function set() + protected function set() { shm_put_var($this->shm, self::ADDRESS, $this->pid); } protected function get() { - if (!shm_has_var($this->shm)) + if (!shm_has_var($this->shm, self::ADDRESS)) return false; $lock = shm_get_var($this->shm, self::ADDRESS); From b95cb21ea1aa9172660ff10d7c03125f17b0f564 Mon Sep 17 00:00:00 2001 From: soulhunter1987 Date: Sun, 4 May 2014 10:51:46 +0300 Subject: [PATCH 09/12] Change unique id to use standard value (pid) --- Core/Lock/Lock.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Core/Lock/Lock.php b/Core/Lock/Lock.php index d6ad4f5..7b6fef7 100755 --- a/Core/Lock/Lock.php +++ b/Core/Lock/Lock.php @@ -8,7 +8,7 @@ */ abstract class Core_Lock_Lock implements Core_IPlugin { - public static $LOCK_UNIQUE_ID = 'daemon_lock'; + public static $LOCK_UNIQUE_ID = 'pid'; /** * The pid of the current daemon -- Set automatically by the constructor. From 3519f0cc4cfa1d4d90b8416c31a5cba0ef82c0b6 Mon Sep 17 00:00:00 2001 From: brunnels Date: Sat, 17 May 2014 06:43:08 -0500 Subject: [PATCH 10/12] Renamed all instances of memcache to memcached to remove any ambiguity or confusion about which php module is being utilized --- Core/Daemon.php | 2 +- Core/Lib/Memcache.php | 181 ---------------------------------------- Core/Lib/Memcached.php | 181 ++++++++++++++++++++++++++++++++++++++++ Core/Lock/Memcached.php | 102 +++++++++++----------- 4 files changed, 233 insertions(+), 233 deletions(-) delete mode 100755 Core/Lib/Memcache.php create mode 100755 Core/Lib/Memcached.php diff --git a/Core/Daemon.php b/Core/Daemon.php index 0d1b30e..e081427 100755 --- a/Core/Daemon.php +++ b/Core/Daemon.php @@ -1000,7 +1000,7 @@ protected function plugin($alias, Core_IPlugin $instance = null) * * @param String $alias The name of the worker -- Will be instantiated at $this->{$alias} * @param callable|Core_IWorker $worker An object of type Core_Worker OR a callable (function, callback, closure) - * @param Core_IWorkerVia $via A Core_IWorkerVia object that defines the medium for IPC (In theory could be any message queue, redis, memcache, etc) + * @param Core_IWorkerVia $via A Core_IWorkerVia object that defines the medium for IPC (In theory could be any message queue, redis, memcached, etc) * @return Core_Worker_ObjectMediator Returns a Core_Worker class that can be used to interact with the Worker * @todo Use 'callable' type hinting if/when we move to a php 5.4 requirement. */ diff --git a/Core/Lib/Memcache.php b/Core/Lib/Memcache.php deleted file mode 100755 index 8c93e48..0000000 --- a/Core/Lib/Memcache.php +++ /dev/null @@ -1,181 +0,0 @@ -auto_retry_timeout = max(0.10, $auto_retry_timeout); - $this->auto_retry = true; - return true; - } - - return false; - } - - /** - * Set a namespace that will be used on every set/get. Had to use abbreviation because 'namespace' is a reserved keyword. - * @param string $namespace Optional. If provided, it will set the namespace. - * @return boolean Returns true on Success - */ - public function ns($namespace = null) - { - if ($namespace !== null) - if (is_scalar($namespace)) - $this->namespace = $namespace; - - return $this->namespace; - } - - /** - * Return a value from Memcache with Retry functionality if a value is not returned. - * @param string|array $key - * @param string $flags - * @param integer $timeout_override The retry timeout in seconds - * @return mixed The value from Memcache or False - */ - public function getWithRetry($key, $timeout_override = false) - { - if ($timeout_override) - $max_tries = intval($timeout_override / 0.10); - else - $max_tries = intval($this->auto_retry_timeout / 0.10); - - if ($max_tries < 1) - $max_tries = 1; - - for ($i=0; $i<$max_tries; $i++) - { - $value = $this->get($key); - if(false == empty($value)) - return $value; - - usleep(100000); - } - - return $value; - } - - /** - * Set a value in Memcache with optional built-in Auto-Retry functionity - * @param string $key - * @param string $var - * @param string $flags - * @param integer $expire - * @return boolean - */ - public function set($key, $var, $expire = 0) - { - if ($this->auto_retry) - $max_tries = intval($this->auto_retry_timeout / 0.10); - else - $max_tries = 1; - - if ($max_tries < 1) - $max_tries = 1; - - for ($i=0; $i<$max_tries; $i++) - { - if(parent::set($this->key($key), $var, $expire)) - return true; - - usleep(100000); - } - - return false; - } - - /** - * Return a key or keys from Memcache - * @param string|array $key - * @param string $flags - * @return mixed - */ - public function get($key) - { - return parent::get($this->key($key)); - } - - public function decrement($key, $offset = 1) - { - return parent::decrement($this->key($key), $offset); - } - - public function increment($key, $offset = 1) - { - return parent::increment($this->key($key), $offset); - } - - public function delete($key) - { - return parent::delete($this->key($key)); - } - - public function replace($key, $var, $expire = 0) - { - return parent::replace($this->key($key), $var, $expire); - } - - /** - * Return the fully-qualified $key including namespace - * @param string|Array $key - * @return mixed - */ - public function key($key) - { - if (is_array($key)) - foreach($key as &$row) - $row = $this->key($key); - else - if (empty($this->namespace) == false) - $key = "{$this->namespace}_{$key}"; - - return $key; - } - - /** - * Connect to an array of Memcache servers - * @param array $connections - * @return Boolean Returns true only if all connections were made. - */ - public function connect_all(array $connections) - { - $connection_count = 0; - foreach ($connections as $connection) - if (is_array($connection) && isset($connection['host']) && isset($connection['port'])) - if ($this->addServer($connection['host'], $connection['port']) == true) - $connection_count++; - - return (count($connections) == $connection_count); - } -} \ No newline at end of file diff --git a/Core/Lib/Memcached.php b/Core/Lib/Memcached.php new file mode 100755 index 0000000..0432f37 --- /dev/null +++ b/Core/Lib/Memcached.php @@ -0,0 +1,181 @@ +auto_retry_timeout = max(0.10, $auto_retry_timeout); + $this->auto_retry = true; + return true; + } + + return false; + } + + /** + * Set a namespace that will be used on every set/get. Had to use abbreviation because 'namespace' is a reserved keyword. + * @param string $namespace Optional. If provided, it will set the namespace. + * @return boolean Returns true on Success + */ + public function ns($namespace = null) + { + if ($namespace !== null) + if (is_scalar($namespace)) + $this->namespace = $namespace; + + return $this->namespace; + } + + /** + * Return a value from memcached with Retry functionality if a value is not returned. + * @param string|array $key + * @param string $flags + * @param integer $timeout_override The retry timeout in seconds + * @return mixed The value from memcached or False + */ + public function getWithRetry($key, $timeout_override = false) + { + if ($timeout_override) + $max_tries = intval($timeout_override / 0.10); + else + $max_tries = intval($this->auto_retry_timeout / 0.10); + + if ($max_tries < 1) + $max_tries = 1; + + for ($i=0; $i<$max_tries; $i++) + { + $value = $this->get($key); + if(false == empty($value)) + return $value; + + usleep(100000); + } + + return $value; + } + + /** + * Set a value in memcached with optional built-in Auto-Retry functionity + * @param string $key + * @param string $var + * @param string $flags + * @param integer $expire + * @return boolean + */ + public function set($key, $var, $expire = 0) + { + if ($this->auto_retry) + $max_tries = intval($this->auto_retry_timeout / 0.10); + else + $max_tries = 1; + + if ($max_tries < 1) + $max_tries = 1; + + for ($i=0; $i<$max_tries; $i++) + { + if(parent::set($this->key($key), $var, $expire)) + return true; + + usleep(100000); + } + + return false; + } + + /** + * Return a key or keys from memcached + * @param string|array $key + * @param string $flags + * @return mixed + */ + public function get($key) + { + return parent::get($this->key($key)); + } + + public function decrement($key, $offset = 1) + { + return parent::decrement($this->key($key), $offset); + } + + public function increment($key, $offset = 1) + { + return parent::increment($this->key($key), $offset); + } + + public function delete($key) + { + return parent::delete($this->key($key)); + } + + public function replace($key, $var, $expire = 0) + { + return parent::replace($this->key($key), $var, $expire); + } + + /** + * Return the fully-qualified $key including namespace + * @param string|Array $key + * @return mixed + */ + public function key($key) + { + if (is_array($key)) + foreach($key as &$row) + $row = $this->key($key); + else + if (empty($this->namespace) == false) + $key = "{$this->namespace}_{$key}"; + + return $key; + } + + /** + * Connect to an array of memcached servers + * @param array $connections + * @return Boolean Returns true only if all connections were made. + */ + public function connect_all(array $connections) + { + $connection_count = 0; + foreach ($connections as $connection) + if (is_array($connection) && isset($connection['host']) && isset($connection['port'])) + if ($this->addServer($connection['host'], $connection['port']) == true) + $connection_count++; + + return (count($connections) == $connection_count); + } +} \ No newline at end of file diff --git a/Core/Lock/Memcached.php b/Core/Lock/Memcached.php index d57dd65..641904f 100755 --- a/Core/Lock/Memcached.php +++ b/Core/Lock/Memcached.php @@ -2,68 +2,68 @@ /** * Use a Memcached key. The value will be the PID and Memcached ttl will be used to implement lock expiration. - * + * * @author Shane Harter * @since 2011-07-28 */ class Core_Lock_Memcached extends Core_Lock_Lock implements Core_IPlugin { /** - * @var Core_Memcache + * @var Core_Lib_Memcached */ - private $memcache = false; + private $memcached = false; /** * @var array */ - public $memcache_servers = array(); - - public function setup() - { - // Connect to memcache - $this->memcache = new Core_Lib_Memcache(); - $this->memcache->ns($this->daemon_name); - - // We want to use the auto-retry feature built into our memcache wrapper. This will ensure that the occasional blocking operation on - // the memcache server doesn't crash the daemon. It'll retry every 1/10 of a second until it hits its limit. We're giving it a 1 second limit. - $this->memcache->auto_retry(1); - - if ($this->memcache->connect_all($this->memcache_servers) === false) - throw new Exception('Core_Lock_Memcached::setup failed: Memcached Connection Failed'); - } - - public function teardown() - { - // If this PID set this lock, release it - if ($this->get() == $this->pid) - $this->memcache->delete(Core_Lock_Lock::$LOCK_UNIQUE_ID); - } - - public function check_environment(Array $errors = array()) - { - $errors = array(); - - if (!(is_array($this->memcache_servers) && count($this->memcache_servers))) - $errors[] = 'Memcache Plugin: Memcache Servers Are Not Set'; - + public $memcached_servers = array(); + + public function setup() + { + // Connect to memcached + $this->memcached = new Core_Lib_Memcached(); + $this->memcached->ns($this->daemon_name); + + // We want to use the auto-retry feature built into our memcached wrapper. This will ensure that the occasional blocking operation on + // the memcached server doesn't crash the daemon. It'll retry every 1/10 of a second until it hits its limit. We're giving it a 1 second limit. + $this->memcached->auto_retry(1); + + if ($this->memcached->connect_all($this->memcached_servers) === false) + throw new Exception('Core_Lock_Memcached::setup failed: Memcached Connection Failed'); + } + + public function teardown() + { + // If this PID set this lock, release it + if ($this->get() == $this->pid) + $this->memcached->delete(Core_Lock_Lock::$LOCK_UNIQUE_ID); + } + + public function check_environment(Array $errors = array()) + { + $errors = array(); + + if (!(is_array($this->memcached_servers) && count($this->memcached_servers))) + $errors[] = 'Memcached Plugin: Memcached Servers Are Not Set'; + if (!class_exists('Memcached')) - $errors[] = 'Memcache Plugin: PHP Memcached Extension Is Not Loaded'; - - if (!class_exists('Core_Lib_Memcache')) - $errors[] = 'Memcache Plugin: Dependant Class "Core_Lib_Memcache" Is Not Loaded'; + $errors[] = 'Memcached Plugin: PHP Memcached Extension Is Not Loaded'; + + if (!class_exists('Core_Lib_Memcached')) + $errors[] = 'Memcached Plugin: Dependant Class "Core_Lib_Memcached" Is Not Loaded'; + + return $errors; + } + + protected function set() + { + $this->memcached->set(Core_Lock_Lock::$LOCK_UNIQUE_ID, $this->pid); + } + + protected function get() + { + $lock = $this->memcached->get(Core_Lock_Lock::$LOCK_UNIQUE_ID); - return $errors; - } - - protected function set() - { - $this->memcache->set(Core_Lock_Lock::$LOCK_UNIQUE_ID, $this->pid); - } - - protected function get() - { - $lock = $this->memcache->get(Core_Lock_Lock::$LOCK_UNIQUE_ID); - - return $lock; - } + return $lock; + } } \ No newline at end of file From d34f753c63e0091a1d2dbf3b1e48534b2cd24c7d Mon Sep 17 00:00:00 2001 From: brunnels Date: Sat, 17 May 2014 10:19:46 -0500 Subject: [PATCH 11/12] Fixed issues with workers shm and queue cleanup Fixed issue with Mediator reap() forking during daemon shutdown Added restart env to make it easier to restart via signal Changed daemon to always recover_workers on restart Cleaned up some residual comments and added some comments for IDE code completion to IWorker --- Core/Daemon.php | 33 ++++++++++++++++++++++++++++----- Core/IWorker.php | 5 +++++ Core/Lib/Memcached.php | 3 --- Core/Lib/Process.php | 13 +++++++++---- Core/Plugin/ProcessManager.php | 26 ++++++++++++++++++-------- Core/Worker/Mediator.php | 3 ++- Core/Worker/Via/SysV.php | 8 +++++++- 7 files changed, 69 insertions(+), 22 deletions(-) diff --git a/Core/Daemon.php b/Core/Daemon.php index e081427..6cc28b7 100755 --- a/Core/Daemon.php +++ b/Core/Daemon.php @@ -118,6 +118,7 @@ abstract class Core_Daemon */ private static $env = array( 'parent' => true, + 'restart' => false ); @@ -337,9 +338,27 @@ public function __destruct() { $this->set('shutdown', true); $this->dispatch(array(self::ON_SHUTDOWN)); - foreach(array_merge($this->workers, $this->plugins) as $object) { - $this->{$object}->teardown(); - unset($this->{$object}); + + foreach($this->workers as $key => $worker) { + if(!property_exists($this, $worker)) { + unset($this->workers[$key]); + continue; + } + + $this->$worker->teardown(); + unset($this->$worker); + unset($this->workers[$key]); + } + + foreach($this->plugins as $key => $plugin) { + if(!property_exists($this, $plugin)) { + unset($this->plugins[$key]); + continue; + } + + $this->$plugin->teardown(); + unset($this->$plugin); + unset($this->plugins[$key]); } } catch (Exception $e) @@ -890,7 +909,9 @@ private function auto_restart() if (!$this->is('parent') || !$this->is('daemonized')) return; - if ($this->runtime() < $this->auto_restart_interval || $this->auto_restart_interval < self::MIN_RESTART_SECONDS) + if (($this->runtime() < $this->auto_restart_interval || + $this->auto_restart_interval < self::MIN_RESTART_SECONDS) && + !$this->get('restart')) return false; $this->restart(); @@ -906,6 +927,8 @@ public function restart() if (!$this->is('parent') || !$this->is('daemonized')) return; + // recover workers when restarting + $this->set('recover_workers', true); $this->set('shutdown', true); $this->log('Restart Happening Now...'); @@ -918,7 +941,7 @@ public function restart() if (is_resource(STDOUT)) fclose(STDOUT); if (is_resource(STDERR)) fclose(STDERR); if (is_resource(STDIN)) fclose(STDIN); - exec($this->command()); + exec($this->command() . ' --recover_workers'); // A new daemon process has been created. This one will stick around just long enough to clean up the worker processes. exit(); diff --git a/Core/IWorker.php b/Core/IWorker.php index 13b2d6a..d966c32 100755 --- a/Core/IWorker.php +++ b/Core/IWorker.php @@ -7,6 +7,11 @@ * * You can use the Core_Daemon::on(ON_FORK) method to provide universal setup code that is run after every fork and * in every worker. The setup() method defined here can be used if you want specific setup code run in this forked process. + * + * @method boolean is_idle() Does the worker have at least one idle process? + * @method integer status(integer $call_id) Determine the status of a given call. Call ID's are returned when a job is called. + * @method number running_count() Retrieves the number of worker processes that are currently running + * */ interface Core_IWorker diff --git a/Core/Lib/Memcached.php b/Core/Lib/Memcached.php index 0432f37..e3ba9a5 100755 --- a/Core/Lib/Memcached.php +++ b/Core/Lib/Memcached.php @@ -60,7 +60,6 @@ public function ns($namespace = null) /** * Return a value from memcached with Retry functionality if a value is not returned. * @param string|array $key - * @param string $flags * @param integer $timeout_override The retry timeout in seconds * @return mixed The value from memcached or False */ @@ -90,7 +89,6 @@ public function getWithRetry($key, $timeout_override = false) * Set a value in memcached with optional built-in Auto-Retry functionity * @param string $key * @param string $var - * @param string $flags * @param integer $expire * @return boolean */ @@ -118,7 +116,6 @@ public function set($key, $var, $expire = 0) /** * Return a key or keys from memcached * @param string|array $key - * @param string $flags * @return mixed */ public function get($key) diff --git a/Core/Lib/Process.php b/Core/Lib/Process.php index b6bf27f..af0b146 100644 --- a/Core/Lib/Process.php +++ b/Core/Lib/Process.php @@ -34,7 +34,7 @@ public function timeout() { /** * Stop the process, using whatever means necessary, and possibly return a textual description - * @return bool|string + * @return int|array */ public function stop() { @@ -44,14 +44,19 @@ public function stop() { } if (time() > $this->stop_time + $this->timeout()) { - $this->kill(); - return "Worker Process '{$this->pid}' Shutdown Timeout: Killing..."; + return array('pid' => $this->pid, 'status' => $this->kill()); } - return null; + return false; } + /** + * + * @return int status from pcntl_waitpid + */ public function kill() { @posix_kill($this->pid, SIGKILL); + pcntl_waitpid($this->pid, $status, WNOHANG); + return $status; } } diff --git a/Core/Plugin/ProcessManager.php b/Core/Plugin/ProcessManager.php index dd360bd..f879598 100644 --- a/Core/Plugin/ProcessManager.php +++ b/Core/Plugin/ProcessManager.php @@ -23,7 +23,7 @@ class Core_Plugin_ProcessManager implements Core_IPlugin public $daemon; /** - * @var Core_Lib_Process[] + * @var [Core_Lib_Process] */ public $processes = array(); @@ -61,9 +61,14 @@ public function teardown() { return; while($this->count() > 0) { - foreach($this->processes() as $pid => $process) - if ($message = $process->stop()) - $this->daemon->log($message); + foreach($this->processes() as $pid => $process) { + if ($message = $process->stop()) { + // process got the SIGKILL treatment so pcntl_wait might not see it + // cleanup here instead of trying to reap + $this->daemon->log(sprintf("Worker Process '%s' Shutdown Timeout: Killing...", $message['pid'])); + $this->removePid($message['pid'], $message['status']); + } + } $this->reap(false); usleep(250000); @@ -188,10 +193,7 @@ public function reap($block = false) { if (!$pid || !isset($map[$pid])) break; - $alias = $map[$pid]->group; - $process = $this->processes[$alias][$pid]; - $this->daemon->dispatch(array(Core_Daemon::ON_REAP), array($process, $status)); - unset($this->processes[$alias][$pid]); + $this->removePid($pid, $status); // Keep track of process churn -- failures within a processes min_ttl // If too many failures of new processes occur inside a given interval, that's a problem. @@ -208,4 +210,12 @@ public function reap($block = false) { } } + private function removePid($pid, $status) { + $map = $this->processes(); + $alias = $map[$pid]->group; + $process = $this->processes[$alias][$pid]; + $this->daemon->dispatch(array(Core_Daemon::ON_REAP), array($process, $status)); + unset($this->processes[$alias][$pid]); + } + } diff --git a/Core/Worker/Mediator.php b/Core/Worker/Mediator.php index 0863265..e3464de 100644 --- a/Core/Worker/Mediator.php +++ b/Core/Worker/Mediator.php @@ -947,7 +947,8 @@ public function reap(Core_Lib_Process $process, $status) { break; } - $this->fork(); + if(!$this->daemon->get('shutdown')) + $this->fork(); } diff --git a/Core/Worker/Via/SysV.php b/Core/Worker/Via/SysV.php index 7d607f0..fe93ad8 100644 --- a/Core/Worker/Via/SysV.php +++ b/Core/Worker/Via/SysV.php @@ -55,8 +55,14 @@ public function __construct($malloc = null) { } public function __destruct() { + if(!$this->mediator->daemon->get('recover_workers')) { + $this->release(); + } + else { + @shm_detach($this->shm); + } + unset($this->mediator); - @shm_detach($this->shm); $this->shm = null; $this->queue = null; } From 94455edb5139c36ab7120b9094437dcd8f6f2961 Mon Sep 17 00:00:00 2001 From: brunnels Date: Sat, 17 May 2014 10:53:59 -0500 Subject: [PATCH 12/12] Added back missing $process variable --- Core/Plugin/ProcessManager.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Core/Plugin/ProcessManager.php b/Core/Plugin/ProcessManager.php index f879598..652c2dc 100644 --- a/Core/Plugin/ProcessManager.php +++ b/Core/Plugin/ProcessManager.php @@ -193,7 +193,7 @@ public function reap($block = false) { if (!$pid || !isset($map[$pid])) break; - $this->removePid($pid, $status); + $process = $this->removePid($pid, $status); // Keep track of process churn -- failures within a processes min_ttl // If too many failures of new processes occur inside a given interval, that's a problem. @@ -216,6 +216,7 @@ private function removePid($pid, $status) { $process = $this->processes[$alias][$pid]; $this->daemon->dispatch(array(Core_Daemon::ON_REAP), array($process, $status)); unset($this->processes[$alias][$pid]); + return $process; } }