先说一下遇到的问题,应用的是beanstalk队列,有两个tube, 应用 supervisor 监控 beanstalk 生产队列(主过程A),主过程A产生两个子过程(子过程B,子过程C),每个子过程解决一个tube的数据。
supervisor配置如下:
[program:queue-worker]command=/usr/local/bin/php /var/www/html/ctc/console.php queue workeruser=www-dataautostart=trueautorestart=trueredirect_stderr=truestartretries=30startsecs=10
解决生产队列的代码如下:
/** * 启动生产队列 * * @command php console.php queue worker */public function wokerAction(){ echo "------ worker start ------" . PHP_EOL; $beanstalk = $this->getBeanstalk(); $logger = $this->getLogger('queue'); $beanstalk->addWorker( 'main', function (BeanstalkJob $job) use ($config, $logger) { $taskId = $job->getBody(); try { $manager = new MainQueue(); $manager->handle($taskId); } catch (\Throwable $e) { $logger->error("tube:main, task:{$taskId} exception " . kg_json_encode([ 'file' => $e->getFile(), 'line' => $e->getLine(), 'message' => $e->getMessage(), ])); } exit(0); } ); $beanstalk->addWorker( 'notice', function (BeanstalkJob $job) use ($config, $logger) { $taskId = $job->getBody(); try { $manager = new NoticeQueue(); $manager->handle($taskId); } catch (\Throwable $e) { $logger->error("tube:notice, task:{$taskId} exception " . kg_json_encode([ 'file' => $e->getFile(), 'line' => $e->getLine(), 'message' => $e->getMessage(), ])); } exit(0); } ); $beanstalk->doWork();}
常常会呈现上面的报错,子过程B或者C就退出了,然而主过程没事。
shmop_open(): unable to attach or create shared memory segment 'No such file or directory'
也查阅了supervisor的文档,外面有个针对组的配置,然而试了不起作用,这里的组配置应该是针对主过程的,主过程没事,子过程死活就不论了。
stopasgroup=truekillasgroup=true
这个事件折腾了好几天,子过程不定时的会死,总是在找为什么会呈现共享内存谬误的起因,过程的创立应用的是现成的类库,如果总找不到起因程序就不稳固了。
起初转变思路,反正只有两个tube,罗唆搞两个独立过程算了,不纠结什么子过程了,这样就把监控的事件交给了supervisor治理了,其实不是supervisor的问题,是咱们应用的姿态不对。
supervisor 配置如下:
[program:queue-main-worker]command=/usr/local/bin/php /var/www/html/ctc/console.php queue main_workeruser=www-dataautostart=trueautorestart=trueredirect_stderr=truestartretries=30startsecs=10[program:queue-notice-worker]command=/usr/local/bin/php /var/www/html/ctc/console.php queue notice_workeruser=www-dataautostart=trueautorestart=trueredirect_stderr=truestartretries=30startsecs=10
解决生产队列的代码如下:
(1)解决 main tube 队列
/** * 启动main生产队列 * * @command php console.php queue main_worker */ public function mainWorkerAction() { $tube = 'main'; echo "------{$tube} worker start ------" . PHP_EOL; $beanstalk = $this->getBeanstalk(); $logger = $this->getLogger('queue'); while (true) { $job = $beanstalk->reserveFromTube($tube); if ($job instanceof BeanstalkJob) { $taskId = $job->getBody(); try { $manager = new MainQueue(); $manager->handle($taskId); $job->delete(); } catch (\Throwable $e) { $logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([ 'file' => $e->getFile(), 'line' => $e->getLine(), 'message' => $e->getMessage(), ])); } } else { sleep(1); } } }
(2)解决 notice tube 队列
/** * 启动notice生产队列 * * @command php console.php queue notice_worker */ public function noticeWorkerAction() { $tube = 'notice'; echo "------{$tube} worker start ------" . PHP_EOL; $beanstalk = $this->getBeanstalk(); $logger = $this->getLogger('queue'); while (true) { $job = $beanstalk->reserveFromTube($tube); if ($job instanceof BeanstalkJob) { $taskId = $job->getBody(); try { $manager = new NoticeQueue(); $manager->handle($taskId); $job->delete(); } catch (\Throwable $e) { $logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([ 'file' => $e->getFile(), 'line' => $e->getLine(), 'message' => $e->getMessage(), ])); } } else { sleep(1); } } }
至于为什么当初会抉择应用fork子过程的形式,说到底还是图不便,因为有现成的货色,拿来就用,然而出了问题,又迟迟搞不定。吃过亏才发现,还是用点简略稳固的计划,哪怕看上去没有那么美。
我的项目组件
- 后盾框架:phalcon 3.4.5
- 前端框架:layui 2.8.2
- 全文检索:xunsearch 1.4.9
- 即时通讯:workerman 3.5.22
- 根底依赖:php7.3, mysql5.7, redis5.0
我的项目文档
- 运行环境搭建
- 零碎服务配置
- 客户终端配置
意见反馈
- 码云平台
- 官网社区