关于websocket:websocket-swoole-swoft

63次阅读

共计 11999 个字符,预计需要花费 30 分钟才能阅读完成。

1.Websocket

1.OSI 七层与 TCP/IP 五层模型

2.socket

Socket 实际上是对 TCP/IP 协定的封装,自身并不是协定,而是一个调用接口(API).

Socket 的呈现只是使得程序员更不便地应用 TCP/IP 协定栈而已,是对 TCP/IP 协定的形象,从而造成了咱们晓得的一些最根本的函数接口.

比方 create、listen、connect、accept、send、read 和 write.

3. 简介

WebSocket 是一种网络通信协定.

WebSocket 协定在 2008 年诞生,2011 年成为国际标准。所有浏览器都曾经反对了.

服务器能够被动向客户端推送信息,客户端也能够被动向服务器发送信息,属于服务器推送技术的一种。` 长链接 `
# 服务器推送技术
1    Webpush
2    HTTP server push
3    Pushlet
4    Long polling
5    Flash XMLSocket relays
6    Reliable Group Data Delivery (RGDD)
7    Push notification

4. 与 HTTP 的比照

5. 特点

(1)建设在 TCP 协定之上,服务器端的实现比拟容易。(2)与 HTTP 协定有着良好的兼容性。默认端口也是 `80` 和 `443`,并且握手阶段采纳 HTTP 协定,因而握手时不容易屏蔽,能通过各种 HTTP 代理服务器。(3)数据格式比拟轻量,性能开销小,通信高效。(4)能够发送文本,也能够发送二进制数据。(5)没有 ` 同源限度 `,客户端能够与任意服务器通信。(6)协定标识符是 `ws`(如果加密,则为 wss),服务器网址就是 URL。(scheme)
ws://example.com:80/uri
wss://example.com:80/uri

6. 示例

# 客户端
let ws = new WebSocket("wss://echo.websocket.org");

ws.onopen = function(evt) {console.log("Connection open ..."); 
  ws.send("Hello WebSockets!");
};

ws.onmessage = function(evt) {console.log( "Received Message:" + evt.data);
  ws.close();};

ws.onclose = function(evt) {console.log("Connection closed.");
};

# 服务端
php -> socket_create(),  new Socket
python -> socket,
go -> gorilla/websocket
node -> socket.io / socket.io -client
# 调试
https://jsbin.com/?js,console,output
# webSocket.readyState
readyState 属性返回实例对象的以后状态,共有四种。CONNECTING:值为 0,示意正在连接。OPEN:值为 1,示意连贯胜利,能够通信了。CLOSING:值为 2,示意连贯正在敞开。CLOSED:值为 3,示意连贯曾经敞开,或者关上连贯失败。

2.swoole

1. 简介

作者 韩天峰 
pecl 开发组成员

php 扩大

Swoole 是一个 PHP 的 ` 协程 ` ` 高性能 ` 网络通信引擎,应用 C/C++ 语言编写,提供了多种通信协议的网络服务器和客户端模块。能够不便疾速的实现 TCP/UDP 服务、高性能 Web、WebSocket 服务、物联网、实时通信、游戏、微服务等,使 PHP 不再局限于传统的 Web 畛域。4.4+ 之后, 全面协程化, PHP 协程框架 
# 文档
https://www.swoole.com/

php-fpm` 解决申请

sapi—> 初始化 http 的环境变量

phpcore —> 初始化 php 拓展, 初始化上下文环境

执行 php 脚本

2. 示例

1.http server
        $http = new Swoole\Http\Server("127.0.0.1", 9501);

    $http->on("start", function ($server) {echo "Swoole http server is started at http://127.0.0.1:9501\n";});

    $http->on("request", function ($request, $response) {$response->header("Content-Type", "text/plain");
        $response->end("Hello World\n");
    });

    $http->start();
2.websocket server
        $server = new Swoole\Websocket\Server("127.0.0.1", 9502);

    $server->on('open', function($server, $req) {echo "connection open: {$req->fd}\n";
    });

    $server->on('message', function($server, $frame) {echo "received message: {$frame->data}\n";
        $server->push($frame->fd, json_encode(["hello", "world"]));
    });

    $server->on('close', function($server, $fd) {echo "connection close: {$fd}\n";
    });

    $server->start();
3.tcp server
        $server = new Swoole\Server("127.0.0.1", 9503);
    $server->on('connect', function ($server, $fd){echo "connection open: {$fd}\n";
    });
    $server->on('receive', function ($server, $fd, $reactor_id, $data) {$server->send($fd, "Swoole: {$data}");
        $server->close($fd);
    });
    $server->on('close', function ($server, $fd) {echo "connection close: {$fd}\n";
    });
    $server->start();
4.udp server
        $serv = new Swoole\Server("127.0.0.1", 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);

    // 监听数据接管事件
    $serv->on('Packet', function ($serv, $data, $clientInfo) {$serv->sendto($clientInfo['address'], $clientInfo['port'], "Server".$data);
        var_dump($clientInfo);
    });

    // 启动服务器
    $serv->start();
5.task
$server = new Swoole\Server("127.0.0.1", 9502);
    $server->set(array('task_worker_num' => 4));
    $server->on('receive', function($server, $fd, $reactor_id, $data) {$task_id = $server->task("Async");
        echo "Dispatch AsyncTask: [id=$task_id]\n";
    });
    $server->on('task', function ($server, $task_id, $reactor_id, $data) {echo "New AsyncTask[id=$task_id]\n";
        $server->finish("$data -> OK");
    });
    $server->on('finish', function ($server, $task_id, $data) {echo "AsyncTask[$task_id] finished: {$data}\n";
    });
    $server->start();
6.coroutine
// 睡眠 1 万次,读取,写入,检查和删除文件 1 万次,应用 PDO 和 MySQLi 与数据库通信 1 万次,创立 TCP 服务器和多个客户端互相通信 1 万次,// 创立 UDP 服务器和多个客户端到互相通信 1 万次...... 一切都在一个过程一秒内完满实现!Swoole\Runtime::enableCoroutine();// 此行代码后,文件操作,sleep,Mysqli,PDO,streams 等都变成异步 IO,见文档 '一键协程化' 章节
   $s = microtime(true);
    //Co/run()见文档 '协程容器' 章节
   Co\run(function() {
    // i just want to sleep...
    for ($c = 100; $c--;) {go(function () {for ($n = 100; $n--;) {usleep(1000);
            }
        });
    }

    // 10k file read and write
    for ($c = 100; $c--;) {go(function () use ($c) {$tmp_filename = "/tmp/test-{$c}.php";
            for ($n = 100; $n--;) {$self = file_get_contents(__FILE__);
                file_put_contents($tmp_filename, $self);
                assert(file_get_contents($tmp_filename) === $self);
            }
            unlink($tmp_filename);
        });
    }

    // 10k pdo and mysqli read
    for ($c = 50; $c--;) {go(function () {$pdo = new PDO('mysql:host=127.0.0.1;dbname=test;charset=utf8', 'root', 'root');
            $statement = $pdo->prepare('SELECT * FROM `user`');
            for ($n = 100; $n--;) {$statement->execute();
                assert(count($statement->fetchAll()) > 0);
            }
        });
    }
    for ($c = 50; $c--;) {go(function () {$mysqli = new Mysqli('127.0.0.1', 'root', 'root', 'test');
            $statement = $mysqli->prepare('SELECT `id` FROM `user`');
            for ($n = 100; $n--;) {$statement->bind_result($id);
                $statement->execute();
                $statement->fetch();
                assert($id > 0);
            }
        });
    }

    // php_stream tcp server & client with 12.8k requests in single process
    function tcp_pack(string $data): string
    {return pack('n', strlen($data)) . $data;
    }

    function tcp_length(string $head): int
    {return unpack('n', $head)[1];
    }

    go(function () {$ctx = stream_context_create(['socket' => ['so_reuseaddr' => true, 'backlog' => 128]]);
        $socket = stream_socket_server(
            'tcp://0.0.0.0:9502',
            $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $ctx
        );
        if (!$socket) {echo "$errstr ($errno)\n";
        } else {
            $i = 0;
            while ($conn = stream_socket_accept($socket, 1)) {stream_set_timeout($conn, 5);
                for ($n = 100; $n--;) {$data = fread($conn, tcp_length(fread($conn, 2)));
                    assert($data === "Hello Swoole Server #{$n}!");
                    fwrite($conn, tcp_pack("Hello Swoole Client #{$n}!"));
                }
                if (++$i === 128) {fclose($socket);
                    break;
                }
            }
        }
    });
    for ($c = 128; $c--;) {go(function () {$fp = stream_socket_client("tcp://127.0.0.1:9502", $errno, $errstr, 1);
            if (!$fp) {echo "$errstr ($errno)\n";
            } else {stream_set_timeout($fp, 5);
                for ($n = 100; $n--;) {fwrite($fp, tcp_pack("Hello Swoole Server #{$n}!"));
                    $data = fread($fp, tcp_length(fread($fp, 2)));
                    assert($data === "Hello Swoole Client #{$n}!");
                }
                fclose($fp);
            }
        });
    }

    // udp server & client with 12.8k requests in single process
    go(function () {$socket = new Swoole\Coroutine\Socket(AF_INET, SOCK_DGRAM, 0);
        $socket->bind('127.0.0.1', 9503);
        $client_map = [];
        for ($c = 128; $c--;) {for ($n = 0; $n < 100; $n++) {$recv = $socket->recvfrom($peer);
                $client_uid = "{$peer['address']}:{$peer['port']}";
                $id = $client_map[$client_uid] = ($client_map[$client_uid] ?? -1) + 1;
                assert($recv === "Client: Hello #{$id}!");
                $socket->sendto($peer['address'], $peer['port'], "Server: Hello #{$id}!");
            }
        }
        $socket->close();});
    for ($c = 128; $c--;) {go(function () {$fp = stream_socket_client("udp://127.0.0.1:9503", $errno, $errstr, 1);
            if (!$fp) {echo "$errstr ($errno)\n";
            } else {for ($n = 0; $n < 100; $n++) {fwrite($fp, "Client: Hello #{$n}!");
                    $recv = fread($fp, 1024);
                    list($address, $port) = explode(':', (stream_socket_get_name($fp, true)));
                    assert($address === '127.0.0.1' && (int)$port === 9503);
                    assert($recv === "Server: Hello #{$n}!");
                }
                fclose($fp);
            }
        });
    }
  });
  echo 'use' . (microtime(true) - $s) . 's';
7.Channel
 Co\run(function(){
        // 应用 Channel 进行协程间通信
        $chan = new Swoole\Coroutine\Channel(1);
        Swoole\Coroutine::create(function () use ($chan) {for($i = 0; $i < 100000; $i++) {co::sleep(1.0);
                $chan->push(['rand' => rand(1000, 9999), 'index' => $i]);
                echo "$i\n";
            }
        });
        Swoole\Coroutine::create(function () use ($chan) {while(1) {$data = $chan->pop();
                var_dump($data);
            }
        });
  });

3. 格调

服务端 + 客户端

1. 异步格调
2. 协程格调

过程 —> 线程 —> 协程

连接池 (server 的 manager 模块)

# 创立
`Coroutine::create` 或 `go` 办法创立协程

反对 `waitgroup`
# 通信问题
     过程
             高性能共享内存 `Table`
       `Process` 模块 (代替 php 自带的 `pcntl`)
   协程
       `Coroutine\Channel`
        并发编程: Coroutine::create() + setdefer()  ->   go() + channel


退出协程 `exit` 禁用
` 异样捕捉 ` 不能跨协程
在多个协程间不能 ` 共用 ` 一个连贯
禁止应用 ` 动态类 ` 或者 ` 全局变量 ` 保留上下文对象
`sleep` 不能用

4. 根本须知

1. 四种设置回调函数的形式
# 匿名函数
$server->on('Request', function ($req, $resp) use ($a, $b, $c) {echo "hello world";});
Copy to clipboardErrorCopied
可应用 use 向匿名函数传递参数
# 类静态方法
class A
{static function test($req, $resp)
    {echo "hello world";}
}
$server->on('Request', 'A::Test');
$server->on('Request', array('A', 'Test'));
Copy to clipboardErrorCopied
对应的静态方法必须为 public
# 函数
function my_onRequest($req, $resp)
{echo "hello world";}
$server->on('Request', 'my_onRequest');
# 对象办法
class A
{function test($req, $resp)
    {echo "hello world";}
}
$object = new A();
$server->on('Request', array($object, 'test'));
对应的办法必须为 public
2. 同步 IO / 异步 IO
# 网络 io 模型:
同步模型(synchronous IO)阻塞 IO(bloking IO)非阻塞 IO(non-blocking IO)多路复用 IO(multiplexing IO)`poll/select`  -> `epoll`
信号驱动式 IO(signal-driven IO)异步 IO(asynchronous IO)
3.EventLoop
所谓 EventLoop,即事件循环,能够简略的了解为 `epoll_wait`,咱们会把所有要产生事件的句柄(fd)退出到 epoll_wait 中,这些事件包含可读,可写,出错等。咱们的过程就阻塞在 epoll_wait 这个内核函数上,当产生了事件 (或超时) 后 epoll_wait 这个函数就会完结阻塞返回后果,就能够回调相应的 PHP 函数,例如,收到客户端发来的数据,回调 OnRecieve 回调函数。当有大量的 fd 放入到了 epoll_wait 中,并且同时产生了大量的事件,epoll_wait 函数返回的时候咱们就会挨个调用相应的回调函数,叫做一轮事件循环,即 IO 多路复用,而后再次阻塞调用 epoll_wait 进行下一轮事件循环。
4.TCP 粘包问题
tcp 封包 解包 粘包 
数据封包协定规定:整个数据包蕴含 2 字节长度信息 + 数据包体。2 字节长度信息蕴含自身着 2 字节。如:数据体是(abcdefg)7 个字节,整体封包就是 09abcdefg,总共是 9 个字节的协定
EOF 结束符协定
固定包头 + 包体协定
5.IPC
同一台主机上两个过程间通信 (`Inter-Process Communication`)
在 Swoole 下应用了 2 种形式 :

# Unix Socket :
全名 UNIX Domain Socket, 简称 UDS
SOCK_STREAM : 数据大用 (有粘包问题)
SOCK_DGRAM : 数据小用 (64k)

# sysvmsg :
Linux 提供的音讯队列,这种 IPC 形式通过一个文件名来作为 key 进行通信,这种形式十分的不灵便,理论我的项目应用的并不多

5. 装置

扩大抵触

xdebug
phptrace
aop
molten
xhprof
phalcon(Swoole 协程无奈运行在 phalcon 框架中)

必须

php-7.1 或更高版本
gcc-4.8 或更高版本
make
autoconf
1. 源码装置
#1. 下载 swoole 源码
https://github.com/swoole/swoole-src/releases
http://pecl.php.net/package/swoole
http://git.oschina.net/swoole/swoole

#2. 从源码编译装置
下载源代码包后,在终端进入源码目录,执行上面的命令进行编译和装置
cd swoole-src && \
phpize && \
./configure && \
--enable-openssl  \
--enable-http2 && \
make && sudo make install

#3. 启用扩大
编译装置到零碎胜利后,须要在 php.ini 中退出一行 extension=swoole.so 来启用 Swoole 扩大
2.pecl 装置
pecl install swoole

3.swoft

version:2.x

1. 环境筹备

# 必要局部
PHP,版本 >=7.1
PHP 包管理器 Composer
PCRE 库
PHP 扩大 Swoole,版本 >=4.3
额定扩大:PDO、Redis

#抵触局部
Xdebug
Xhprof
Blackfire
Zend
trace
Uopz

2. 装置形式

1.docker
docker run -p 18306:18306 --name swoft swoft/swoft
2.docker-compose
git clone https://github.com/swoft-cloud/swoft
cd swoft
docker-compose up

# sserver 上面有以下文件配置
docker-compose.yml
docker-sync -> rsync , native_osx
3.composer
composer create-project swoft/swoft Swoft
4. 手动装置
git clone https://github.com/swoft-cloud/swoft
cd swoft
composer install
cp .env.example .env
5.swoftcli
# 反对从不同模板我的项目中疾速创立一个洁净的 Swoft 利用
php swoftcli.phar create:app --type full Swoft-Full
php swoftcli.phar create:app --type ws Swoft-WebSocket
php swoftcli.phar create:app --type tcp Swoft-TCP

# 应用
cp swoftcli.phar /usr/local/bin/swoftcli && chmod a+x swoftcli

3. 目录构造

├── app/    ----- 利用代码目录
│   ├── Annotation/        ----- 定义注解相干   (`ReflectionCalss`)
│   ├── Aspect/            ----- AOP 切面      (`Aspect-oriented programming`)
│   ├── Common/            ----- 一些具备独立性能的 class bean
│   ├── Console/           ----- 命令行代码目录
│   ├── Exception/         ----- 定义异样类目录
│   │   └── Handler/           ----- 定义异样解决类目录
│   ├── Http/              ----- HTTP 服务代码目录
│   │   ├── Controller/
│   │   └── Middleware/
│   ├── Helper/            ----- 助手函数
│   ├── Listener/          ----- 事件监听器目录
│   ├── Model/             ----- 模型、逻辑等代码目录(这些层并不限定,依据须要应用)
│   │   ├── Dao/
│   │   ├── Data/
│   │   ├── Logic/
│   │   └── Entity/
│   ├── Rpc/               ----- RPC 服务代码目录
│   │   └── Service/
│   │   └── Middleware/
│   ├── WebSocket/         ----- WebSocket 服务代码目录
│   │   ├── Chat/
│   │   ├── Middleware/
│   │   └── ChatModule.php
│   ├── Tcp/               ----- TCP 服务代码目录
│   │   └── Controller/        ----- TCP 服务解决控制器目录
│   ├── Application.php    ----- 利用类文件继承自 swoft 外围
│   ├── AutoLoader.php     ----- 我的项目扫描等信息(利用自身也算是一个组件)
│   └── bean.php
├── bin/
│   ├── bootstrap.php
│   └── swoft              ----- Swoft 入口文件
├── config/                ----- 利用配置目录
│   ├── base.php               ----- 根底配置
│   └── db.php                 ----- 数据库配置
├── public/                ----- 公共目录
├── resource/              ----- 利用资源目录
│   ├── language/              ----- 语言资源目录  
│   └── view/                  ----- 视图资源目录  
├── runtime/               ----- 长期文件目录(日志、上传文件、文件缓存等)├── test/                  ----- 单元测试目录
│   └── bootstrap.php
├── composer.json
├── phar.build.inc
└── phpunit.xml.dist

4. 运行服务

如果在 .env 文件中开启了调试 SWOFT_DEBUG=1 将会在控制台中显示更多具体的信息。
1.http server
# 启动 HTTP 服务
$ php ./bin/swoft http:start

# 以守护过程模式启动
$ php ./bin/swoft http:start -d

# 重启 HTTP 服务
$ php ./bin/swoft http:restart

# 从新加载 HTTP 服务
$ php ./bin/swoft http:reload

# 进行 HTTP 服务
$ php ./bin/swoft http:stop

# swoftcli
swoftcli -h
swoftcli run -c ws:start
swoftcli run -c http:start
2.websocket server
# 启动 WS 服务
$ php ./bin/swoft ws:start

# 以守护过程模式启动
$ php ./bin/swoft ws:start -d

# 重启 WS 服务
$ php ./bin/swoft ws:restart

# 从新加载 WS 服务
$ php ./bin/swoft ws:reload

# 敞开 WS 服务
$ php ./bin/swoft ws:stop
3.rpc server
# 启动 RPC 服务
$ php ./bin/swoft rpc:start

# 以守护过程模式启动
$ php ./bin/swoft rpc:start -d

# 重启 RPC 服务
$ php ./bin/swoft rpc:restart

# 从新加载 RPC 服务
$ php ./bin/swoft rpc:reload

# 敞开 RPC 服务
$ php ./bin/swoft rpc:stop

5. 注解

use Swoft\Http\Message\Request;
use Swoft\Http\Server\Annotation\Mapping\Controller;
use Swoft\Http\Server\Annotation\Mapping\RequestMapping;

/**
 * Class Home
 *
 * @Controller(prefix="home")
 */
class Home
{
    /**
     * 该办法路由地址为 /home/index
     *
     * @RequestMapping(route="/index", method="post")
     *
     * @param Request $request
     */
    public function index(Request $request)
    {// TODO:}
}

6.IoC – DI

IoC: Inversion of Control
DI: Dependency Injection

Bean 容器

# 定义
@Bean
命名空间:\Swoft\Bean\Annotation\Bean

name 定义 Bean 别名,缺省默认类名
scope 注入 Bean 类型,默认单例,Scope::SINGLETON/Scope::PROTOTYPE(每次创立)
ref 指定援用 Bean,用于定义在接口下面,指定应用哪个接口实现。
# 注入
@Inject
命名空间:\Swoft\Bean\Annotation\Inject

name 定义属性注入的 bean 名称,缺省属性主动类型名称
# 操作
App::getBean("name");
ApplicationContext::getBean('name');
BeanFactory::getBean('name');
BeanFactory::hasBean("name");

正文完
 0