我们知道php主要是用来做web应用的,而且平时使用的都是都是和其他的web服务器来结合使用,比如和apache,nginx和apache的时候,是作为apache的一个动态模块来加载,和nginx的时候主要是使用fpm的形式,现在其他的一些语言,比如python,nodejs,ruby,go都是单独作为http服务编程的,其实php也是可以的,php里面有一些扩展,我们平时做web应用的时候都是很少使用到的,比如fcntl,libevent,stream,posix等等,这些扩展主要是对unix的api的一些封装,使用它们其实和uinx的网络编程是一样的,只是更加的便捷和容易,还可以使用我们熟悉的php语言,而不是c。
workman其实就是把php的网络模块做一个整合,为php用户提供一个更加便捷的方式来创建网络编程的条件,比如实现tcp,udp服务器,实现http的web服务器等等。
主要的文件结构
Worker.php 是workman的主要入口文件,这个定义了workman作为一个服务器的相关的操作,变量初始化,参数解析,进程管理,请求接受,事件注册处理等等。
WebServer.php 继承了Worker.php主要还是使用Worker.php的逻辑,主要区别是使用Htpp的协议,以及实现对uri的简单路由分发。Events 是事件的相关类Lib 包含Timer.php 以及一些常数。Connection 是对连接的处理Protocols 是对协议的处理,主要是对Connection接收的数据的处理,比如http的时候添加header头等等。Worker.php的分析
内部函数 | 备注说明 |
---|---|
self::checkSapiEnv(); | //监测环境是否正确,workman只能运行在cli模式下面 |
self::init(); | //环境初始化 |
self::parseCommand(); | //解析参数,主要对应用start,stop,restart,status, -d的判断 |
self::daemonize(); | //master进程编程daemo守护进程 |
self::initWorkers(); | //初始化work进程 |
self::installSignal(); | // 安装信号量 |
self::saveMasterPid(); | // 保存master的进程id,一文件的形式 |
self::forkWorkers(); | //fork生成子进程,在event上面注册请求到达的事件 |
self::displayUI(); | //显示ui,用来在控制台打印输出一些内容 |
self::resetStd(); / | /重新设置标准输出和输入 |
self::monitorWorkers(); | // 监控子进程 |
selef::checkSapiEnv
//根据php_sapi_name来做判断if (php_sapi_name() != "cli") { exit("only run in command line mode \n");}
sapi接口是php为其他的应用提供的抽象层接口,不管是cli还是php-fpm,fastCgi都是调用这一层的接口。
在php源代码中main/SAPI.h 定义了_sapi_module_struct,每一个sapi的应用都是要传递该结构startup用来设置使用php的相关参数。其中php_sapi_name();函数返回的就是char *name对应的值。struct _sapi_module_struct { char *name; char *pretty_name; int (*startup)(struct _sapi_module_struct *sapi_module); int (*shutdown)(struct _sapi_module_struct *sapi_module); int (*activate)(TSRMLS_D); int (*deactivate)(TSRMLS_D); int (*ub_write)(const char *str, unsigned int str_length TSRMLS_DC); void (*flush)(void *server_context); struct stat *(*get_stat)(TSRMLS_D); char *(*getenv)(char *name, size_t name_len TSRMLS_DC); void (*sapi_error)(int type, const char *error_msg, ...);....};
self::init();
//更具debug_backtrace();返回最后一个调用该函数的文件,作为启动文件$backtrace = debug_backtrace();self::$_startFile = $backtrace[count($backtrace) - 1]['file'];// 设置pid的文件路名if (empty(self::$pidFile)) { self::$pidFile = __DIR__ . "/../" . str_replace('/', '_', self::$_startFile) . ".pid";}// 设置workman的日志文件,并且创建该文件if (empty(self::$logFile)) { self::$logFile = __DIR__ . '/../workerman.log';}$log_file = (string)self::$logFile;touch($log_file);chmod($log_file, 0622);// 初始化运行状态self::$_status = self::STATUS_STARTING;// 为了统计运行状态self::$_globalStatistics['start_timestamp'] = time();self::$_statisticsFile = sys_get_temp_dir() . '/workerman.status';// 设置进程的标题self::setProcessTitle('WorkerMan: master process start_file=' . self::$_startFile);// 初始化该进程的id,主要是初始化所有work进程的进程id为0self::initId();// Timer init.Timer::init();
parseCommand
这部分的逻辑比较简单,主要功能就是解析参数
global $argv;// 监测参数,最起码要设置一个参数start,stop等$start_file = $argv[0];if (!isset($argv[1])) { exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n");}// 获取参数,第一个是服务器控制,第二个是是否是daemon进程$command = trim($argv[1]);$command2 = isset($argv[2]) ? $argv[2] : '';// 启动的时候,判断是不是参数有-d的标志$mode = '';if ($command === 'start') { if ($command2 === '-d' || Worker::$daemonize) { $mode = 'in DAEMON mode'; } else { $mode = 'in DEBUG mode'; }}self::log("Workerman[$start_file] $command $mode");// 获取master进程ID,从本地文件里面获取$master_pid = @file_get_contents(self::$pidFile);$master_is_alive = $master_pid && @posix_kill($master_pid, 0);//posix_kill发送0的信号,用来检测该进程是不是存在if ($master_is_alive) { if ($command === 'start' && posix_getpid() != $master_pid) { self::log("Workerman[$start_file] already running"); exit; }} elseif ($command !== 'start' && $command !== 'restart') { self::log("Workerman[$start_file] not run"); exit;}switch ($command) { case 'start': if ($command2 === '-d') { Worker::$daemonize = true; } break; case 'status': if (is_file(self::$_statisticsFile)) { @unlink(self::$_statisticsFile); } // 发送status的信号到所有子进程 posix_kill($master_pid, SIGUSR2); // Waiting amoment. usleep(500000); // 显示数据收集的结果 @readfile(self::$_statisticsFile); exit(0); case 'restart': case 'stop': self::log("Workerman[$start_file] is stoping ..."); // Send stop signal to master process. $master_pid && posix_kill($master_pid, SIGINT); // Timeout. $timeout = 5; $start_time = time(); // Check master process is still alive? while (1) {$master_is_alive = $master_pid && posix_kill($master_pid, 0); if ($master_is_alive) { // Timeout? if (time() - $start_time >= $timeout) { self::log("Workerman[$start_file] stop fail"); exit; } // Waiting amoment. usleep(10000); continue; } // Stop success. self::log("Workerman[$start_file] stop success"); if ($command === 'stop') { exit(0); } if ($command2 === '-d') { Worker::$daemonize = true; } break; } break;case 'reload': posix_kill($master_pid, SIGUSR1); self::log("Workerman[$start_file] reload"); exit; default : exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n");}
daemonize
进程变为守护进程,主要是通过两次fork子进程(第一次主要是设置sid和原来的终端分离,第二主要失去重新获取控制终端的能力),关闭驻进程的方式。
if (!self::$daemonize) { return;}umask(0);$pid = pcntl_fork();if (-1 === $pid) { throw new Exception('fork fail');} elseif ($pid > 0) { exit(0);//退出主进程}//setsid()调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离 但是他还是可以申请重新打开一个控制终端,因为他是进程的组长if (-1 === posix_setsid()) { throw new Exception("setsid fail");}// 可以通过使进程不再成为会话组长来禁止进程重新打开控制终端$pid = pcntl_fork();if (-1 === $pid) { throw new Exception("fork fail");} elseif (0 !== $pid) { exit(0);}
initWorkers
初始化worker进程
主要就是worker进程开始监听端口,listen函数的内容如下:$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); if (!$this->_mainSocket) { throw new Exception($errmsg); } // 保持长连接,防止延迟 if (function_exists('socket_import_stream') && $this->transport === 'tcp') { $socket = socket_import_stream($this->_mainSocket); @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); } // 设置为非阻塞.stream_set_blocking($this->_mainSocket, 0);
installSignal
注册信号事件
pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false);//reload 重新加载pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false);//打印当前子进程状态pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false);// 管道关闭pcntl_signal(SIGPIPE, SIG_IGN, false);
saveMasterPid
把当前的子进程保存在文件中
self::$_masterPid = posix_getpid();if (false === @file_put_contents(self::$pidFile, self::$_masterPid)) { throw new Exception('can not save pid to ' . self::$pidFile);}
forkWorkers
主要是就是循环self::_workers,根据里面的count数量生成对应的子进程。
主要使用的方法是:forkOneWorker///获取空的pid的位置$id = self::getId($worker->workerId, 0);if ($id === false) { return;}//fork子进程$pid = pcntl_fork();// For mastif ($pid > 0) { self::$_pidMap[$worker->workerId][$pid] = $pid; self::$_idMap[$worker->workerId][$id] = $pid;} // For child processes.elseif (0 === $pid) { if ($worker->reusePort) { $worker->listen(); } if (self::$_status === self::STATUS_STARTING) { self::resetStd();}self::$_pidMap = array();self::$_workers = array($worker->workerId => $worker);Timer::delAll();self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());$worker->setUserAndGroup();$worker->id = $id;//work开始运行,接收请求$worker->run();exit(250);} else {throw new Exception("forkOneWorker fail");}
run函数
//注册进程结束时候的函数register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors'));Autoloader::setRootPath($this->_autoloadRootPath);// 创建event.根据eventLoopName来实例话eventif (!self::$globalEvent) { self::log("create globalEvent"); $eventLoopClass = "\\Workerman\\Events\\" . ucfirst(self::getEventLoopName()); self::$globalEvent = new $eventLoopClass; // 注册一个监听事件,当socket的连接是可读的时候. if ($this->_socketName) { if ($this->transport !== 'udp') { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } }}
acceptConnection函数
接收请求$new_socket = @stream_socket_accept($socket, 0, $remote_address);if (!$new_socket) { return;}// 实例化TcpConnection$connection = new TcpConnection($new_socket, $remote_address);$this->connections[$connection->id] = $connection;$connection->worker = $this;$connection->protocol = $this->protocol;//connection的onMessage指向当前类的onMessage方法$connection->onMessage = $this->onMessage;$connection->onClose = $this->onClose;$connection->onError = $this->onError; $connection->onBufferDrain = $this->onBufferDrain;$connection->onBufferFull = $this->onBufferFull;