一个简单混合协议通讯列子,物联网和互联网通讯。

38次阅读

共计 6072 个字符,预计需要花费 16 分钟才能阅读完成。

这个列子主要讨论 Tcp,WebSocket 和 http 之间的通讯。长连接和长连接通讯,长连接和短连接通讯。其他协议同理可得
Tcp:代表硬件设备
WebSocket:代表客户端
http:代表网页

本列子是基于 one 框架(https://github.com/lizhichao/one)开发.
配置协议 监听端口
由于 swoole 的模型 WebSocket server 包含 http server , http server 包含 tcp server。
所以我们配置主服务为 WebSocket server , 添加两个 http 和 tcp 监听。配置文件如下:
return [
‘server’ => [
‘server_type’ => \One\Swoole\OneServer::SWOOLE_WEBSOCKET_SERVER,
‘port’ => 8082,
‘action’ => \App\Test\MixPro\Ws::class,
‘mode’ => SWOOLE_PROCESS,
‘sock_type’ => SWOOLE_SOCK_TCP,
‘ip’ => ‘0.0.0.0’,
‘set’ => [
‘worker_num’ => 5
]
],
‘add_listener’ => [
// http 监听
[
‘port’ => 8081,
‘action’ => \App\Server\AppHttpPort::class,
‘type’ => SWOOLE_SOCK_TCP,
‘ip’ => ‘0.0.0.0’,
‘set’ => [
‘open_http_protocol’ => true,
‘open_websocket_protocol’ => false
]
],
// tcp 监听
[
‘port’ => 8083,
‘pack_protocol’ => \One\Protocol\Text::class, // tcp 打包,解包协议,方便在终端调试 我们使用 text 协议. 换行符 表示一个包的结束
‘action’ => \App\Test\MixPro\TcpPort::class,
‘type’ => SWOOLE_SOCK_TCP,
‘ip’ => ‘0.0.0.0’,
‘set’ => [
‘open_http_protocol’ => false,
‘open_websocket_protocol’ => false
]
]
]
];

接下来去 \App\Test\MixPro\Ws 和 \App\Test\MixPro\TcpPort 实现各种事件处理。\App\Server\AppHttpPort 是框架内置的,通过路由处理 http 请求的,配置路由即可。
配置路由

// 首页
Router::get(‘/mix’, [
‘use’ => HttpController::class . ‘@index’,
‘middle’ => [\App\Test\MixPro\TestMiddle::class . ‘@isLogin’] // 中间件 如果用户登录了 直接跳转到相应的页面
]);

Router::group([
‘middle’ => [\App\Test\MixPro\TestMiddle::class . ‘@checkSession’] // 中间件 让用户登录后 才能进入聊天页面 http websocket 都能获取到这个 session
], function () {

// websocket 页面
Router::get(‘/mix/ws’, HttpController::class . ‘@ws’);

// http 页面
Router::get(‘/mix/http’, HttpController::class . ‘@http’);

// http 轮训消息接口
Router::post(‘/mix/http/loop’, HttpController::class . ‘@httpLoop’);

// http 发送消息接口
Router::post(‘/mix/http/send’, HttpController::class . ‘@httpSend’);

});

配置的都是 http 协议路由。websocket 和 tpc 我们直接在回调 action 处理。如果你的项目复杂也可以配置相应的路由。one 框架的路由支持任何协议,使用方法也是统一的。
处理 tcp 协议
其中__construct,onConnect,onClose 不是必须的。如果你想在服务器运行开始时最一些事情就写到 __construct 里面。onConnect 当有客户端连接时触发,每个客户端触发一次 onClose 当有客户端连接断开时触发,每个客户端触发一次
class TcpPort extends Tcp
{
use Funs;

private $users = [];

/**
* @var Ws
*/
protected $server;

/**
* @var Client
*/
protected $global_data;

public function __construct($server, $conf)
{
parent::__construct($server, $conf);
$this->global_data = $this->server->global_data;
}

// 终端连接上服务器时
public function onConnect(\swoole_server $server, $fd, $reactor_id)
{
$name = uuid();
$this->users[$fd] = $name;
$this->sendTo(‘all’, json_encode([‘v’ => 1, ‘n’ => $name]));
$this->sendToTcp($fd, json_encode([‘v’ => 4, ‘n’ => $this->getAllName()]));
$this->global_data->bindId($fd, $name);
$this->send($fd, “ 你的名字是:” . $name);
}

// 消息处理 像某个 name 发送消息
public function onReceive(\swoole_server $server, $fd, $reactor_id, $data)
{
$arr = explode(‘ ‘, $data);
if (count($arr) !== 3 || $arr[0] !== ‘send’) {
$this->send($fd, “ 格式不正确 ”);
return false;
}
$n = $arr[1];
$d = $arr[2];
$this->sendTo($n, json_encode([‘v’ => 3, ‘n’ => $d]));
}

// 下线 通知所有其他终端,解除与 fd 的关系绑定。
public function onClose(\swoole_server $server, $fd, $reactor_id)
{
echo “tcp close {$fd} \n”;
$this->global_data->unBindFd($fd);
$this->sendTo(‘all’, json_encode([‘v’ => 2, ‘n’ => $this->users[$fd]]));
unset($this->users[$fd]);
}

}

定义了一个公共的 traitFuns 主要实现两个方法,获取所有的终端(tcp,ws,http),和向某个用户发送消息。在 ws、http 都会用到这个 在构造函数我们初始化了一个 global_data 用来保存,名称和 fd 的关系。你也可以使用方式储存。因为 fd 没次连接都不同。global_data 是 one 框架内置的。终端连接上服务器时触发事件 onConnect,我们给这个终端取个名字,并把关系保存在 global_data。通知所有终端有个新终端加入,并告诉刚加入的终端当前有哪些终端在线。
处理 websocket 协议
其中__construct,onHandShake,onOpen,onClose 不是必须的。
onHandShake,onOpen 是配合使用的,如果 onOpen 返回 false 服务器会拒绝连接。在 onOpen,onMessage,onClose 可以拿到当前用户的 session 信息和 http 是相通的。
class Ws extends WsServer
{
use Funs;

private $users = [];

/**
* @var Client
*/
public $global_data = null;

public function __construct(\swoole_server $server, array $conf)
{
parent::__construct($server, $conf);
$this->global_data = new Client();
}

// 初始化 session
public function onHandShake(\swoole_http_request $request, \swoole_http_response $response)
{
return parent::onHandShake($request, $response);
}

// ws 发送消息
public function onMessage(\swoole_websocket_server $server, \swoole_websocket_frame $frame)
{
$data = $frame->data;
$arr = json_decode($data, true);
$n = $arr[‘n’];
$d = $arr[‘d’];
$this->sendTo($n, json_encode([‘v’ => 3, ‘n’ => $d]));

}

// 判断用户是否登录 如果没有登录拒绝连接
public function onOpen(\swoole_websocket_server $server, \swoole_http_request $request)
{
$name = $this->session[$request->fd]->get(‘name’);
if ($name) {
$this->users[$request->fd] = $name;
$this->sendTo(‘all’, json_encode([‘v’ => 1, ‘n’ => $name]));
$this->global_data->bindId($request->fd, $name);
return true;
} else {
return false;
}
}

// ws 断开清除信息
public function onClose(\swoole_server $server, $fd, $reactor_id)
{
echo “ws close {$fd} \n”;
$this->global_data->unBindFd($fd);
$this->sendTo(‘all’, json_encode([‘v’ => 2, ‘n’ => $this->users[$fd]]));
unset($this->users[$fd]);
}
}

处理 http 协议
主要是 httpLoop 方法,轮训获取消息。因为 http 是短连接,发给 http 的信息我们是先存放在 $global_data,然后直接这里读取。防止连接间隙丢信息。

class HttpController extends Controller
{

use Funs;

/**
* @var Ws
*/
protected $server;

/**
* @var Client
*/
protected $global_data;

public function __construct($request, $response, $server = null)
{
parent::__construct($request, $response, $server);
$this->global_data = $this->server->global_data;
}

/**
* 首页
*/
public function index()
{
$code = sha1(uuid());
$this->session()->set(‘code’, $code);
return $this->display(‘index’, [‘code’ => $code]);
}

/**
* ws 页面
*/
public function ws()
{
$name = $this->session()->get(‘name’);
if (!$name) {
$name = uuid();
$this->session()->set(‘name’, $name);
}
return $this->display(‘ws’,[‘users’ => $this->getAllName(),’name’ => $name]);
}

/**
* http 页面
*/
public function http()
{
$name = $this->session()->get(‘name’);
if (!$name) {
$name = uuid();
$this->session()->set(‘name’, $name);
}
$this->global_data->set(“http.{$name}”, 1, time() + 60);
$this->sendTo(‘all’, json_encode([‘v’ => 1, ‘n’ => $name]));
return $this->display(‘http’, [‘list’ => $this->getAllName(), ‘name’ => $name]);
}

/**
* http 轮训
*/
public function httpLoop()
{
$name = $this->session()->get(‘name’);
$this->global_data->set(“http.{$name}”, 1, time() + 60);
$i = 0;
do {
$data = $this->global_data->getAndDel(“data.{$name}”);
$i++;
\co::sleep(0.1);
} while ($data === null && $i < 300);
if ($data) {
foreach ($data as &$v) {
$v = json_decode($v, true);
}
} else {
$data = [];
}
return $this->json($data);
}

/**
* http 发送消息
*/
public function httpSend()
{
$n = $this->request->post(‘n’);
$d = $this->request->post(‘d’);
if ($n && $d) {
$this->sendTo($n, json_encode([‘v’ => 3, ‘n’ => $d]));
return ‘1’;
}
return ‘0’;
}

public function __destruct()
{

}

public function __call($name, $arguments)
{
return $this->server->$name(…$arguments);
}

}

到此基本就完成了。你可以去看完整的代码:点这里
其他的一些列子:https://github.com/lizhichao/…

正文完
 0