乐趣区

PHP物联网开发利器之Actor并发模型

PHP 不适合做物联网服务端吗?

在传统的思维中,经常会有人告诉你,php 不适合用来做物联网服务端,让你换 java,node,go 等其他语言,是的,没错传统意义上的 php,确实很难做物联网服务器,因为它实在太蹩脚了,当然,这也不是意味着彻底就不能做。举个例子,当你想实现一个 TCP 服务器的时候,你可能需要写出原理大约如下的代码:

for ($i = 0;$i <= 1;$i++){$pid = pcntl_fork();
    if($pid){if($i == 0){$server = stream_socket_server("tcp://127.0.0.1:9501", $errno, $errstr, STREAM_SERVER_BIND);
        }else if($i == 1){$tickTime = time()+3600;
            while (1){usleep(1);
                if($tickTime == time()){//do my tick func}
            }
        }
    }
}

以上代码的意义等于在一个进程中创建一个 TCP 服务端,另外一个进程中死循环来做时间检测,从而实现定时器逻辑。这样看起来,确实很蹩脚,而且对于编程基础普遍比较薄弱的 PHPer 来说,这真的很难维护。当然这个时候,就会有人说,这不是还有 Workerman 吗,是的,确实还有 Workerman,Workerman 就是高度封装了上述代码原理,帮助你专心于实现代码逻辑的一个 PHP 多进程框架,因此说 PHP 不时候做物联网,其实这是谬论。当然这个时候可能又会有人说,go 语言有协程,你用 Workerman 当出现阻塞数据库调用的时候,那效率就非常的差,很难出现高并发,这么说没错,但是实际上,我们可以尽可能的用多进程去弥补这个不足,也就是堆机器。当然,如果你真的想锱铢必较,没关系,这个时候我们就可以拿出我们的杀器,那就是 Swoole4.x 的协程。

Swoole 做 TCP 服务器

举个例子,如下代码:

$server = new swoole_server("127.0.0.1", 9501);
$server->on('workerstart',function ($ser,$workerId){if($workerId == 0){swoole_timer_tick(1000,function (){});
    }
});
$server->on('connect', function ($server, $fd){echo "connection open: {$fd}\n";
});
$server->on('receive', function ($server, $fd, $reactor_id, $data) {$server->send($fd, "Swoole: {$data}");
    $server->close($fd);
});
$server->on('close', function ($server, $fd) {echo "connection close: {$fd}\n";
});
$server->start();

我们就可以很快的创建出一个多进程的协程 TCP 服务器,而且在各个回调函数内,均自动创建协程环境,我们可以在协程回调内,去调用协程的数据库 API,这样就避免了因为阻塞数据库调用而导致无法处理其他客户端请求的问题。然而尽管如此,很多人可能都没有思考过,如何优雅的写出自己的物联网服务器。举个例子,我们常见的互联网设备管理服务中,大约可能出现如下代码:

swoole_timer_tick(5000,function (){$deviceList = $db->getAll();
    foreach ($deviceList as $device){
        //do your check
        /*
         * 例如设备状态处于 1,那么需要处理流程 1
         * 例如设备状态处于 2,那么需要处理流程 2
         * 例如设备状态处于 3,那么需要处理流程 3
         */
    }
});

定时遍历检查设备状态以及广播
这样乍一看好像无伤大雅,但是当出现多种设备,且每种设备逻辑都不一致的时候,那么这样的编写模式就很容易写出一大坨代码出来,而且在协程下,如果不注意变量访问安全与协程上下文隔离,那么就很容易出现 bug,导致很难维护。

Actor 模型

什么是 Actor,简单来说,Actor 就是一种高度抽象化的并发模型,每个 Actor 实例的内存空间都是互相隔离的,用于降低用户编程与维护难度。关于 Swoole4.x 如何实现协程版本的 Actor,我们之前已经在文章 https://segmentfault.com/a/11… 中讲解了如何用 Swoole 实现协程的原理。

Actor 模型库实战

我们依旧用 easyswoole/actor 库来讲解,例如,我们有一种型号的设备,那么我们可以定义一个设备 Actor,并把该设备的全部逻辑,写在该 actor 模型内,例子代码如下:

namespace App\Device;


use EasySwoole\Actor\AbstractActor;
use EasySwoole\Actor\ActorConfig;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\EasySwoole\ServerManager;
use EasySwoole\EasySwoole\Trigger;

class DeviceActor extends AbstractActor
{
    private $fd;
    private $deviceId;
    private $lastHeartBeat;
    public static function configure(ActorConfig $actorConfig)
    {$actorConfig->setActorName('Device');
    }

    protected function onStart()
    {$this->lastHeartBeat = time();
        /*
         * 该参数是创建的时候传递的
         */
        $this->fd = $this->getArg()['fd'];
        $this->deviceId = $this->getArg()['deviceId'];
        // 记录到 table manager 中
        DeviceManager::addDevice(new DeviceBean([
            'deviceId'=>$this->deviceId,
            'actorId'=>$this->actorId(),
            'fd'=>$this->fd
        ]));
        // 推送消息
        ServerManager::getInstance()->getSwooleServer()->push($this->fd,"connect to server success,your actorId is {$this->actorId()}");
        // 创建一个定时器,如果一个设备 20s 没有收到消息,自动下线
        $this->tick(20*2000,function (){if(time() - $this->lastHeartBeat > 20){$this->exit(-1);
            }
        });
    }

    protected function onMessage($msg)
    {if($msg instanceof Command){switch ($msg->getCommand()){
                case $msg::RECONNECT:{
                    DeviceManager::updateDeviceInfo($this->deviceId,['fd'=>$msg->getArg()
                    ]);
                    $this->fd = $msg->getArg();
                    Logger::getInstance()->console("deviceId {$this->deviceId}  at actorId {$this->actorId()} reconnect success");
                    ServerManager::getInstance()->getSwooleServer()->push($this->fd,"deviceId {$this->deviceId}  at actorId {$this->actorId()} reconnect success");
                    break;
                }
                case $msg::WS_MSG:{$recv = $msg->getArg();
                    Logger::getInstance()->console("deviceId {$this->deviceId}  at actorId {$this->actorId()} recv ws msg: {$recv}");
                    ServerManager::getInstance()->getSwooleServer()->push($this->fd,'actor recv msg for hash'.md5($recv));
                    break;
                }
                case $msg::REPLY_MSG:{$recv = $msg->getArg();
                    Logger::getInstance()->console("deviceId {$this->deviceId}  at actorId {$this->actorId()} recv reply msg: {$recv}");
                    ServerManager::getInstance()->getSwooleServer()->push($this->fd,'actor recv reply msg'.$recv);
                    // 此处 return 一个数据,会返回给客户端
                    return "actorId {$this->actorId()} recv {$recv}";
                    break;
                }
            }
        }
    }

    protected function onExit($arg)
    {if($arg == -1){if(ServerManager::getInstance()->getSwooleServer()->exist($this->fd)){ServerManager::getInstance()->getSwooleServer()->push($this->fd,"heartbeat lost,actor exit");
                ServerManager::getInstance()->getSwooleServer()->close($this->fd);
            }
        }
        DeviceManager::deleteDevice($this->deviceId);
        Logger::getInstance()->console("deviceId {$this->deviceId} at actorId {$this->actorId()} exit");
    }

    protected function onException(\Throwable $throwable)
    {Trigger::getInstance()->throwable($throwable);
    }
}

在该 Actor 内,我们定义了这个设备的生命周期行为。

  • 设备上线,记录设备 id 与 fd 信息,并创建心跳周期检查
  • 收到消息,可以对该 Actor 投递数据,处理对应的消息行为
  • 设备下线,当设备下线,可以自动的清理定时器与其他的一些通知与清理逻辑

我们可以很清楚的看到,Actor 模型下,允许我们对一种设备模型进行高度自治的管理。当然,我们本章节主要在讲解如何优雅的利用 Swoole 协程来实现 Actor 模型,从而更好的开发管理我们的设备,因此我不再贴过多的代码,有兴趣的同学可以在 Easyswoole 框架 demo 中查看完整的示例代码 https://github.com/easy-swool…

Easyswoole 项目主页:http://easyswoole.com/
Easyswoole github 主仓库 https://github.com/easy-swool… , 如果你觉得我们的努力有对你起到帮助作用,记得给个 star

退出移动版