Source for file PowerProcess.class.php

Documentation is available at PowerProcess.class.php

  1. <?php 
  2.  
  3. declare(ticks 1);
  4.  
  5. /**
  6.  * PowerProcess is an abstraction class for PHP's posix and pcntl extensions.
  7.  * 
  8.  * It enables easy process forking or threading to allow use of parallel
  9.  * processes for completing complicated tasks that would otherwise be
  10.  * inefficient for normal serial and procedural processing
  11.  * 
  12.  * If you like PowerProcess, please consider donating
  13.  *  - BTC: 1K2tvdYzdDDd8w6vNHQgvbNQnhcHqLEadx
  14.  *  - LTC: LfceD3QH2n1FqH8inqHdKxjBFV55QvuESv
  15.  * 
  16.  * @package PowerProcess
  17.  * 
  18.  * @author Don Bauer <lordgnu@me.com>
  19.  * @link https://github.com/lordgnu/PowerProcess
  20.  * @license MIT License
  21.  * @version 2.0
  22.  * 
  23.  * @copyright
  24.  *  Copyright (c) 2011 Don Bauer <lordgnu@me.com>
  25.  *  Permission is hereby granted, free of charge, to any person obtaining a copy
  26.  *  of this software and associated documentation files (the "Software"), to deal
  27.  *  in the Software without restriction, including without limitation the rights
  28.  *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  29.  *  copies of the Software, and to permit persons to whom the Software is
  30.  *  furnished to do so, subject to the following conditions:
  31.  * 
  32.  *  The above copyright notice and this permission notice shall be included in
  33.  *  all copies or substantial portions of the Software.
  34.  * 
  35.  *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  36.  *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  37.  *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  38.  *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  39.  *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  40.  *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  41.  *  SOFTWARE.
  42.  */
  43. class PowerProcess {
  44.     /**
  45.      * Current PowerProcess version
  46.      * 
  47.      * @var string 
  48.      */
  49.     public static $version '2.0';
  50.     
  51.     /**
  52.      * Data store for data that is to be passed to the child process which is to be spawned
  53.      * 
  54.      * @var mixed 
  55.      */
  56.     public $threadData;
  57.     
  58.     /**
  59.      * Boolean variable which determines whether or not to shutdown the control process (parent)
  60.      * 
  61.      * @var boolean 
  62.      */
  63.     public $complete;
  64.     
  65.     /**
  66.      * Callback array for setting callback functions based on signals that can be sent to the parent process
  67.      * 
  68.      * @var array 
  69.      */
  70.     private $callbacks;
  71.     
  72.     /**
  73.      * The name of the current thread.  Used by WhoAmI()
  74.      * 
  75.      * @var string 
  76.      */
  77.     private $currentThread;
  78.     
  79.     /** 
  80.      * Whether to log internal debug message
  81.      * 
  82.      * @var boolean 
  83.      */
  84.     private $debugLogging;
  85.     
  86.     /**
  87.      * The maximum number of concurrent threads that can be running at any given time.
  88.      * 
  89.      * This setting has an impact on performance for PowerProcess so play
  90.      * with it on the system you are on to determine a good value.
  91.      * 10 is a good place to start
  92.      * 
  93.      * @var integer 
  94.      */
  95.     private $maxThreads;
  96.     
  97.     /**
  98.      * Array which stores the thread data for the control process (parent) to manage running child threads
  99.      * 
  100.      * @var array 
  101.      */
  102.     private $myThreads;
  103.     
  104.     /**
  105.      * Session ID of parent session when process is daemonized
  106.      * 
  107.      * @var integer 
  108.      */
  109.     private $parentSID;
  110.     
  111.     /** 
  112.      * The pid of the parent process
  113.      * 
  114.      * Used after a process is forked to
  115.      * determine whether the new thread is to run the thread code
  116.      * 
  117.      * @var integer 
  118.      */
  119.     private $parentPID;
  120.     
  121.     /**
  122.      * Sleep timer in micro seconds for the parent process to sleep between status checks using Tick()
  123.      * 
  124.      * @var integer 
  125.      */
  126.     private $tickCount = 100;
  127.     
  128.     /**
  129.      * Whether to add a timestamp to log output
  130.      * 
  131.      * @var boolean 
  132.      */
  133.     private $timeStampLogs = true;
  134.     
  135.     /**
  136.      * The maximum number of seconds a thread will be allowed to run.
  137.      * 
  138.      * Set to 0 to disable a time limit (use with caution)
  139.      * 
  140.      * @var integer 
  141.      */
  142.     private $threadTimeLimit;
  143.     
  144.     /**
  145.      * Location to log information messages to.
  146.      * 
  147.      * Can be a file or php://stdout, php://stderr.
  148.      * Set to false to disable
  149.      * 
  150.      * @var mixed 
  151.      */
  152.     private $logTo;
  153.     
  154.     /**
  155.      * When logging is enabled, this points to the socket in which to write log messages.
  156.      * 
  157.      * @var resource 
  158.      */
  159.     private $logSocket;
  160.     
  161.     /**
  162.      * Signals to install for SignalDispatcher.
  163.      * 
  164.      * You can use any signal constant PNCTL supports
  165.      * @link http://us3.php.net/manual/en/pcntl.constants.php
  166.      * 
  167.      * @var array 
  168.      */
  169.     private $signalArray = array(
  170.         SIGUSR1,    // User-Defined 1
  171.         SIGUSR2        // User-Defined 2
  172.     );
  173.     
  174.     /**
  175.      * PowerProcess constructor.
  176.      * 
  177.      * Returns an instanced PowerProcess object or dies on failure
  178.      * 
  179.      * @param integer    $maxThreads            Max number of concurrent threads to allow at any given time
  180.      * @param integer    $threadTimeLimit    Maximum number of seconds a thread is allowed to live
  181.      * @param boolean    $daemon             Whether to start as a deamon or just a normal script
  182.      * @param string    $logTo                 What stream to log output to
  183.      * @param boolean    $debugLogging        Whether to enable debug logging
  184.      * 
  185.      * @return object    Instanced PowerProcess object
  186.      */
  187.     public function PowerProcess($maxThreads 10$threadTimeLimit 300$daemon false$logTo false$debugLogging false{
  188.         if (function_exists('pcntl_fork'&& function_exists('posix_getpid')) {
  189.             // Set the current thread name
  190.             $this->currentThread = 'CONTROL';
  191.             
  192.             // Set the max threads setting
  193.             $this->SetMaxThreads($maxThreads);
  194.             
  195.             // Set the thread time limit setting
  196.             $this->SetThreadTimeLimit($threadTimeLimit);
  197.             
  198.             // Init the logger
  199.             $this->InitializeLogger($logTo$debugLogging);
  200.             
  201.             if ($daemon{
  202.                 // Attempt to daemonize
  203.                 if (!$this->Daemonize()) {
  204.                     die("Could not daemonize");
  205.                 else {
  206.                     $this->Log("Daemonized successfully",true);
  207.                 }
  208.             else {
  209.                 // Register control process PID
  210.                 $this->parentPID = $this->GetPID();
  211.                 $this->parentSID = false;
  212.                 $this->Log("Parent PID detected as {$this->parentPID}",true);
  213.             }
  214.             
  215.             // The the complete flag to false
  216.             $this->complete = false;
  217.             
  218.             // Install the signal handler
  219.             $this->InstallSignalHandler();
  220.             
  221.             // Init the Thread Queue
  222.             $this->myThreads = array();
  223.             
  224.             // Log completion of startup
  225.             $this->Log("Startup process complete",true);
  226.         } else {
  227.             die("PowerProcess requires both the POSIX and PCNTL extensions to operate.\n");
  228.         }
  229.     }
  230.     
  231.     /**
  232.      * Frees up memory
  233.      */
  234.     public function __destruct() {
  235.         unset($this->callbacks);
  236.         unset($this->myThreads);
  237.         
  238.         // Handle any remaining signals
  239.         pcntl_signal_dispatch();
  240.         
  241.         $this->RemoveLogger();
  242.     }
  243.     
  244.     /**
  245.      * Executes specified program in the current process space
  246.      * 
  247.      * @param string $process    Path to the binary process to execute
  248.      * @param array $args        Array of argument strings to pass to the program
  249.      */
  250.     public function Exec($process, $args = null) {
  251.         if ($args == null) {
  252.             pcntl_exec($process);
  253.         } else {
  254.             pcntl_exec($process, $args);
  255.         }
  256.     }
  257.     
  258.     /**
  259.      * Returns the PID of the current process
  260.      * 
  261.      * @return integer
  262.      */
  263.     public function GetPID() {
  264.         return posix_getpid();
  265.     }
  266.     
  267.     /**
  268.      * Returns the PID of the process that spawned this one
  269.      * 
  270.      * @return integer
  271.      */
  272.     public function GetControlPID() {
  273.         return posix_getppid();
  274.     }
  275.     
  276.     /**
  277.      * Get the status of a running thread by name or PID
  278.      * 
  279.      * @param string|integer $name The name or PID of the process for which you want status information
  280.      * @return array|boolean
  281.      */
  282.     public function GetThreadStatus($name = false) {
  283.         if ($name === false) return false;
  284.         if (isset($this->myThreads[$name])) {
  285.             return $this->myThreads[$name];
  286.         } else {
  287.             return false;
  288.         }
  289.     }
  290.     
  291.     /**
  292.      * Determine whether the control process is daemonized
  293.      * 
  294.      * @return boolean
  295.      */
  296.     public function IsDaemon() {
  297.         return $this->parentSID !== false;
  298.     }
  299.     
  300.     /**
  301.      * Log a message
  302.      * 
  303.      * @param string $msg The message to log
  304.      * @param boolean $internal Whether this is an internal debug logging message
  305.      */
  306.     public function Log($msg, $internal = false) {
  307.         if ($this->logSocket !== false) {
  308.             if (!$internal || $this->debugLogging) {
  309.                 if ($this->timeStampLogs) {
  310.                     fwrite($this->logSocket, sprintf("[%s][%-12s] %s\n", date("Y-m-d H:i:s"), $this->WhoAmI(), $msg));
  311.                 } else {
  312.                     fwrite($this->logSocket, sprintf("[%-12s] %s\n", $this->WhoAmI(), $msg));
  313.                 }
  314.             }
  315.         }
  316.     }
  317.     
  318.     /**
  319.      * Restarts the control process
  320.      */
  321.     public function Restart() {
  322.         // Build Path of Script
  323.         if (isset($_SERVER['_'])) {
  324.             $cmd = $_SERVER['_'];
  325.             $this->Log("Attempting to restart using {$cmd}",true);
  326.         } else {
  327.             $this->Log("Can not restart - Shutting down", true);
  328.             return $this->Shutdown();
  329.         }
  330.         
  331.         // Wait for threads to complete
  332.         while ($this->ThreadCount()) {
  333.             $this->CheckThreads();
  334.             $this->Tick();
  335.         }
  336.         
  337.         // Remove the first arg if this is a stand-alone
  338.         if ($cmd == $_SERVER['argv'][0]) unset($_SERVER['argv'][0]);
  339.         
  340.         // Execute Restart
  341.         $this->Exec($cmd, $_SERVER['argv']);
  342.         $this->Shutdown(true);
  343.     }
  344.     
  345.     /**
  346.      * Registers a callback function for the signal dispatcher or for special signals used by PowerProcess
  347.      * 
  348.      * Special signals are:
  349.      *   - 'shutdown' : Triggered on completion of the Shutdown() method
  350.      *   - 'threadotl' : Triggered on killing a thread due to exceeding time limit
  351.      *   
  352.      * @param int|string $signal The signal to register a callback for
  353.      * @param callback $callback The callback function
  354.      */
  355.     public function RegisterCallback($signal, $callback = false) {
  356.         if ($callback !== false) $this->callbacks[$signal] = $callback;
  357.         
  358.         // Register with PCNTL
  359.         if (is_int($signal)) {
  360.             $this->Log("Registering signal {$signal} with dispatcher",true);
  361.             pcntl_signal($signal, array(&$this, 'SignalDispatch'));
  362.             
  363.             // Unblock the Signal
  364.             pcntl_sigprocmask(SIG_UNBLOCK,array($signal));
  365.         }
  366.     }
  367.     
  368.     /**
  369.      * Determines whether we should be running the control code or the thread code
  370.      * 
  371.      * @return boolean
  372.      */
  373.     public function RunControlCode() {
  374.         $this->Tick();
  375.         if (!$this->complete) {
  376.             return $this->ControlCheck();
  377.         } else {
  378.             $this->SignalDispatch('shutdown');
  379.             return false;
  380.         }
  381.     }
  382.     
  383.     /**
  384.      * Determines whether we should be running the child code
  385.      * 
  386.      * @return boolean
  387.      */
  388.     public function RunThreadCode() {
  389.         return !$this->ControlCheck();
  390.     }
  391.     
  392.     /**
  393.      * Send a signal to a process
  394.      * 
  395.      * @param integer $pid
  396.      * @param integer $signal
  397.      */
  398.     public function SendSignal($pid = 0, $signal = 0) {
  399.         if ($signal > 0 && $pid > 0) {
  400.             return posix_kill($pid, $signal) && pcntl_signal_dispatch();
  401.         } else {
  402.             return false;
  403.         }
  404.     }
  405.     
  406.     /**
  407.      * Set the max number of threads that can be running concurrently
  408.      * 
  409.      * @param integer $maxThreads The max number of threads to run concurrently
  410.      */
  411.     public function SetMaxThreads($maxThreads = 10) {
  412.         $this->maxThreads = $maxThreads;
  413.     }
  414.     
  415.     /**
  416.      * Set the max number of seconds a thread can run before being terminated
  417.      * 
  418.      * @param integer $threadTimeLimit The max number of seconds a thread can run
  419.      */
  420.     public function SetThreadTimeLimit($threadTimeLimit = 300) {
  421.         $this->threadTimeLimit = $threadTimeLimit;
  422.     }
  423.     
  424.     /**
  425.      * Initiates the shutdown procedure for PowerProcess
  426.      * 
  427.      * @param boolean $exit When set to true, Shutdown causes the script to exit
  428.      */
  429.     public function Shutdown($exit = false) {
  430.         $this->Log("Initiating shutdown",true);
  431.         
  432.         while ($this->ThreadCount()) {
  433.             $this->CheckThreads();
  434.             $this->Tick();
  435.         }
  436.     
  437.         $this->complete = true;
  438.         
  439.         $this->Log("Shutdown Complete");
  440.         if ($exit) exit;
  441.     }
  442.     
  443.     /**
  444.      * Determines if a new process can be spawned
  445.      * 
  446.      * @return boolean
  447.      */
  448.     public function SpawnReady() {
  449.         $this->Tick();
  450.         return ($this->ThreadCount() < $this->maxThreads);
  451.     }
  452.     
  453.     /**
  454.      * Spawn a new thread
  455.      * 
  456.      * @param $name The name of the thread to be spawned
  457.      * 
  458.      * @return boolean
  459.      */
  460.     public function SpawnThread($name = false) {
  461.         // Check to make sure we can spawn another thread
  462.         if (!$this->SpawnReady()) {
  463.             $this->Log("The maximum number of threads are already running",true);
  464.             $this->Tick();
  465.             return false;
  466.         }
  467.         
  468.         if ($name !== false) {
  469.             // Check to make sure there is not already a named thread with this name
  470.             if ($this->GetThreadStatus($name) !== false) {
  471.                 $this->Log("There is already a thread named '{$name}' running",true);
  472.                 $this->Tick();
  473.                 return false;
  474.             }
  475.         }
  476.         
  477.         $pid = pcntl_fork();
  478.         
  479.         if ($pid) {
  480.             // We are the control thread so log the child in a queue
  481.             $index = ($name === false) ? $pid : $name;
  482.             $name = ($name === false) ? "THREAD:{$pid}$name;
  483.             $this->myThreads[$index] = array(
  484.                 'pid'    =>    $pid,
  485.                 'time'    =>    time(),
  486.                 'name'    =>    $name
  487.             );
  488.             $this->Log("Spawned thread: {$name}",true);
  489.             $this->Tick();
  490.             return true;
  491.         } else {
  492.             // We are the child thread so change the current thread var
  493.             $this->currentThread = ($name === false) ? "THREAD:".$this->GetPID() : $name;
  494.             return true;
  495.         }
  496.     }
  497.     
  498.     /**
  499.      * Get the count of running threads
  500.      * 
  501.      * @return integer
  502.      */
  503.     public function ThreadCount() {
  504.         return count($this->myThreads);
  505.     }
  506.     
  507.     /**
  508.      * Process signals to be dispatched and sleep for a number of microseconds
  509.      */
  510.     public function Tick() {
  511.         // Dispatch Pending Signals
  512.         pcntl_signal_dispatch();
  513.         
  514.         // Check Running Threads
  515.         if ($this->parentPID == $this->GetPID()) $this->CheckThreads();
  516.         
  517.         // Tick
  518.         usleep($this->tickCount);
  519.     }
  520.     
  521.     /**
  522.      * Get the name of the current thread
  523.      * 
  524.      * @return string The name of the current thread
  525.      */
  526.     public function WhoAmI() {
  527.         return $this->currentThread;
  528.     }
  529.     
  530.     // All Private Functions Below Here
  531.     /**
  532.      * Checks all running threads to make sure they are still running and their time limit  has not been exceeded
  533.      * 
  534.      * If a thread has exceeded it's time limit, this method will kill that process
  535.      * and dispatch the special signal 'threadotl'
  536.      */
  537.     private function CheckThreads() {
  538.         foreach ($this->myThreads as $i => $thread) {
  539.             // Check to make sure the process is still running
  540.             if ($this->PIDDead($thread['pid']) != 0) {
  541.                 // Thread is Dead
  542.                 unset($this->myThreads[$i]);
  543.             } elseif ($this->threadTimeLimit > 0) {
  544.                 if (time() - $thread['time'] > $this->threadTimeLimit) {
  545.                     $this->KillThread($thread['pid']);
  546.                     $this->Log("Thread {$thread['name']} has exceeded the thread time limit",true);
  547.                     $this->SignalDispatch('threadotl');
  548.                     unset($this->myThreads[$i]);
  549.                 }
  550.             }
  551.         }
  552.     }
  553.     
  554.     /**
  555.      * Check if the current process is the control process
  556.      * 
  557.      * @return boolean
  558.      */
  559.     private function ControlCheck() {
  560.         return $this->parentPID == $this->GetPID();
  561.     }
  562.     
  563.     /**
  564.      * Attempts to daemonize the current process
  565.      * 
  566.      * @return integer
  567.      */
  568.     private function Daemonize() {
  569.         $this->Log("Attempting to Daemonize",true);
  570.         
  571.         // First need to fork
  572.         $pid = pcntl_fork();
  573.         
  574.         // Tick to catch signals
  575.         $this->Tick();
  576.         
  577.         if ($pid < 0) exit; // Error
  578.         if ($pid) exit;        // Parent
  579.         
  580.         $this->parentSID = posix_setsid();
  581.         
  582.         // Need to reset the parent PID
  583.         $this->parentPID = $this->GetPID();
  584.         $this->Log("Parent PID {$this->parentPID}",true);
  585.         $this->Log("Parent SID {$this->parentSID}",true);
  586.         
  587.         return ($this->parentSID > 0);
  588.     }
  589.     
  590.     /**
  591.      * Initialize the logging stream if enabled
  592.      * 
  593.      * @param string|boolean $logTo The path or stream to log to or false to disable
  594.      */
  595.     private function InitializeLogger($logTo, $debugLogging) {
  596.         if ($logTo !== false) {
  597.             $this->logSocket = @fopen($logTo, 'w');
  598.             $this->debugLogging = $debugLogging;
  599.         } else {
  600.             $this->logSocket = false;
  601.             $this->debugLogging = false;
  602.         }
  603.     }
  604.     
  605.     /**
  606.      * Installs the default signal handlers
  607.      */
  608.     private function InstallSignalHandler() {
  609.         // Register the callback for thread completion
  610.         $this->RegisterCallback(SIGCHLD, array($this,'CheckThreads'));
  611.         $this->Log("SIGCHLD callback registered",true);
  612.         
  613.         // Register the callback for restart requests
  614.         $this->RegisterCallback(SIGHUP, array($this, 'Restart'));
  615.         $this->Log("SIGHUP callback registered",true);
  616.         
  617.         // Register the callback for shutdown requests
  618.         $this->RegisterCallback(SIGTERM, array($this, 'Shutdown'));
  619.         $this->Log("SIGTERM callback registered",true);
  620.         
  621.         // Install the signal handler
  622.         foreach ($this->signalArray as $signal) $this->RegisterCallback($signal);
  623.         $this->Log("Signal Dispatcher installed",true);
  624.     }
  625.     
  626.     /**
  627.      * Kill a thread by PID
  628.      * 
  629.      * @param integer $pid The PID of the thread to kill
  630.      */
  631.     private function KillThread($pid = 0) {
  632.         $this->SendSignal($pid, SIGTERM);
  633.     }
  634.     
  635.     /**
  636.      * Determine whether a child pid has exited
  637.      * 
  638.      * Returns the PID of child which exited or 0
  639.      * 
  640.      * @param integer $pid The PID to check
  641.      * @return integer
  642.      */
  643.     private function PIDDead($pid = 0) {
  644.         if ($pid > 0) {
  645.             return pcntl_waitpid($pid, $status, WUNTRACED OR WNOHANG);
  646.         } else {
  647.             return 0;
  648.         }
  649.     }
  650.     
  651.     /**
  652.      * Closes the logging stream
  653.      */
  654.     private function RemoveLogger() {
  655.         if ($this->logSocket) {
  656.             @fclose($this->logSocket);
  657.         }
  658.     }
  659.     
  660.     /**
  661.      * Handles dispatching of signals to user-defined callbacks
  662.      * 
  663.      * @param integer|string $signal
  664.      */
  665.     private function SignalDispatch($signal) {
  666.         // Log Dispatch
  667.         $this->Log("Received signal: {$signal}",true);
  668.         
  669.         // Check the callback array for this signal number
  670.         if (isset($this->callbacks[$signal])) {
  671.             // Execute the callback
  672.             call_user_func($this->callbacks[$signal]);
  673.         } else {
  674.             // No callback registered
  675.             $this->Log("There is no callback registered for signal {$signal}",true);
  676.         }
  677.         
  678.         // Handle SIGTERM for threads
  679.         if ($signal == 15) exit(0);
  680.     }
  681.     
  682. }

Documentation generated on Wed, 11 Jan 2012 16:04:12 -0600 by phpDocumentor 1.4.3