发布于 5年前
php 非阻塞处理多进程请求
<?php
$pid = pcntl_fork();
if($pid==-1){
throw new RuntimeException("fork error");
exit(0);
}else if($pid==0){
run();
}else{
exit("exit");
}
function run(){
$curChildPro = 0;
$maxChildPro = 5; // 同一时刻最多 5 个进程
$index = 0;
$ppid = posix_getpid();
$childs=[];
$readFds = [];
$writeFds= [];
$exceptions = [];
$context_option['socket']['backlog'] = 1024;
$context = stream_context_create($context_option);
stream_context_set_option($context, 'socket', 'so_reuseport', 1);
$socket = stream_socket_server("tcp://0.0.0.0:8081", $errno, $errstr);
stream_set_blocking($socket,0);
$readFds[]=$socket;
while (true) {
while (count($childs)>=$maxChildPro){
foreach($childs as $key => $pid) {
$res = pcntl_waitpid($pid, $status, WNOHANG);
// If the process has already exited
if($res == -1 || $res > 0)
unset($childs[$key]);
}
sleep(1);
}
$index ++;
$pid = pcntl_fork();//在此处代码会裂开两部分,一个父进程,一个子进程,可以共享$index变量
if ($pid == -1) {
}elseif ($pid > 0) {
$childs[]=$pid;
$curChildPro++;
//父进程会得到子进程号$pid,所以这里是父进程执行的逻辑
//echo "-------- current process\e[1;31m" . $curChildPro . "\e[0m--------.\r\n";
//echo "\e[1;31m我是父进程{$index},我的进程id是{$ppid}.我的子进程id{$pid}\e[0m".PHP_EOL;
cli_set_process_title("我是父进程{$index},我的进程id是{$ppid}.我的子进程id{$pid}");
}elseif($pid==0){
$cpid = posix_getpid();
//echo "我是{$ppid}的子进程,我的进程{$index}id是{$cpid}.".PHP_EOL;
cli_set_process_title("我是{$ppid}的子进程,我的进程id是{$cpid}.");
while (1) {
$reads = $readFds;
$writes = $writeFds;
$ret = @stream_select($reads, $writes, $exceptions, null);
if (!$ret){
continue;
}
if (!empty($reads)) {
foreach ($reads as $fd) {
if ($fd == $socket) {
set_error_handler(function () {
});
$connect = stream_socket_accept($socket, 0, $remoteAddr);
stream_set_blocking($connect, 0);
restore_error_handler();
$readFds[] = $connect;
$writeFds[] = $connect;
} else {
$data = fread($fd, 1024);
if ($data == '') {
$ret = array_search($fd, $readFds);
if ($ret !== false) {
unset($readFds[$ret]);
fclose($fd);
}
} else {
//request data
}
}
}
}
if(!empty($writes)){
foreach ($writes as $fd) {
if ($fd != $socket) {
set_error_handler(function () {
});
$res = "HTTP/1.1 200 ok\r\nAccept-Ranges: bytes\r\ncontent-type: text/html; charset=utf-8\r\n\r\nThe local time is " . date('Y-m-d H:i:s') . "\r\n";
fwrite($fd, $res);
restore_error_handler();
$wet = array_search($fd, $writeFds);
if ($wet !== false) {
unset($writeFds[$wet]);
}
fclose($fd);
exit();
}
}
}
}
}
}
}