先说一下遇到的问题,应用的是 beanstalk 队列,有两个 tube, 应用 supervisor 监控 beanstalk 生产队列(主过程 A),主过程 A 产生两个子过程(子过程 B,子过程 C),每个子过程解决一个 tube 的数据。
supervisor 配置如下:
[program:queue-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
解决生产队列的代码如下:
/**
* 启动生产队列
*
* @command php console.php queue worker
*/
public function wokerAction()
{
echo "------ worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
$beanstalk->addWorker(
'main',
function (BeanstalkJob $job) use ($config, $logger) {$taskId = $job->getBody();
try {$manager = new MainQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {$logger->error("tube:main, task:{$taskId} exception" . kg_json_encode(['file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),]));
}
exit(0);
}
);
$beanstalk->addWorker(
'notice',
function (BeanstalkJob $job) use ($config, $logger) {$taskId = $job->getBody();
try {$manager = new NoticeQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {$logger->error("tube:notice, task:{$taskId} exception" . kg_json_encode(['file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),]));
}
exit(0);
}
);
$beanstalk->doWork();}
常常会呈现上面的报错,子过程 B 或者 C 就退出了,然而主过程没事。
shmop_open(): unable to attach or create shared memory segment 'No such file or directory'
也查阅了 supervisor 的文档,外面有个针对组的配置,然而试了不起作用,这里的组配置应该是针对主过程的,主过程没事,子过程死活就不论了。
stopasgroup=true
killasgroup=true
这个事件折腾了好几天,子过程不定时的会死,总是在找为什么会呈现共享内存谬误的起因,过程的创立应用的是现成的类库,如果总找不到起因程序就不稳固了。
起初转变思路,反正只有两个 tube,罗唆搞两个独立过程算了,不纠结什么子过程了,这样就把监控的事件交给了 supervisor 治理了,其实不是 supervisor 的问题,是咱们应用的姿态不对。
supervisor 配置如下:
[program:queue-main-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue main_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
[program:queue-notice-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue notice_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
解决生产队列的代码如下:
(1)解决 main tube 队列
/**
* 启动 main 生产队列
*
* @command php console.php queue main_worker
*/
public function mainWorkerAction()
{
$tube = 'main';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {$taskId = $job->getBody();
try {$manager = new MainQueue();
$manager->handle($taskId);
$job->delete();} catch (\Throwable $e) {$logger->error("tube:{$tube}, task:{$taskId} exception" . kg_json_encode(['file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),]));
}
} else {sleep(1);
}
}
}
(2)解决 notice tube 队列
/**
* 启动 notice 生产队列
*
* @command php console.php queue notice_worker
*/
public function noticeWorkerAction()
{
$tube = 'notice';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {$taskId = $job->getBody();
try {$manager = new NoticeQueue();
$manager->handle($taskId);
$job->delete();} catch (\Throwable $e) {$logger->error("tube:{$tube}, task:{$taskId} exception" . kg_json_encode(['file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),]));
}
} else {sleep(1);
}
}
}
至于为什么当初会抉择应用 fork 子过程的形式,说到底还是图不便,因为有现成的货色,拿来就用,然而出了问题,又迟迟搞不定。吃过亏才发现,还是用点简略稳固的计划,哪怕看上去没有那么美。
我的项目组件
- 后盾框架:phalcon 3.4.5
- 前端框架:layui 2.8.2
- 全文检索:xunsearch 1.4.9
- 即时通讯:workerman 3.5.22
- 根底依赖:php7.3,mysql5.7,redis5.0
我的项目文档
- 运行环境搭建
- 零碎服务配置
- 客户终端配置
意见反馈
- 码云平台
- 官网社区