本章节我们将学习如何使用 MQ
库.
MQ 库简介
MQ
库实现了各类消息代理中间件 (Message Broker) 的连接协议, 目前支持:redis
、mqtt
、stomp
协议.
MQ
库基于上述协议实现了: 生产者 -> 消费者
与订阅 -> 发布
模型, 可以在不依赖其它服务的情况下独立完成任务.
API 介绍
cf 框架提供了多种 MQ
的封装, 当我们需要使用的时候需要根据实际的协议进行选择:
-- local MQ = require "MQ.mqtt"
-- local MQ = require "MQ.redis"
-- local MQ = require "MQ.stomp"
MQ:new(opt)
此方法将会创建一个的 MQ 对象实例.
opt
是一个 table
类型的参数, 可以传递如下值:
- host – 字符串类型, 消息队列的域名或者 IP 地址.
- port – int 类型, 消息队列监听的端口.
- auth/db – 字符串类型, 仅在 redis 协议下用作登录认证或者 db 选择(没有可以不填写).
- username/password – 字符串类型, 仅在 stomp/mqtt 协议下用作登录认证(没有可以不填写).
- vhost – 字符串类型, 仅在使用某些特定消息队列 server 的时候填写(例如:rabbit).
- keepalive – int 类型, 仅在使用 mqtt 的时候用来出发客户端主动发出心跳包的时间.
以 redis broker 为示例:
local MQ = require "MQ.redis"
local mq = MQ:new {
host = "localhost",
port = 6379,
-- db = 0,
-- auth = "123456789",
}
MQ:on(pattern, function)
此方法用来订阅一个指定 pattern
. 当broker
将消息传递到 cf 后, function
将会被调用.
MQ
库会为 function
注入一个 table
类型的参数msg
, 此参数将在断开连接的时候为nil
.
msg
根据采用的协议的不同 msg
的内容也将有所不同. 具体内容以 logging
库的打印为准.
标准使用示例:
local Log = require("logging"):new()
mq:on("/notice", function(msg)
if not msg then
return Log:ERROR("['/notice'] SUBSCRIBE ERROR: 连接已断开.")
end
Log:DEBUG(msg)
end)
开发者可以同时订阅多个parttern
.
MQ:emit(pattern, msg)
此方法用来向指定 pattern
发送消息. msg 为字符串类型的消息.
使用示例:
mq:emit('/notice', '{"code":200,"data":[1,2,3,4,5,6,7,8,9,10]}')
单个 MQ
可以一直复用 emit, 内部会创建一个写入队列去完成消息的顺序发送. (在多个实例中无法保证消息先后)
MQ:start()
此方法在作为独立运行服务端时候调用.
使用示例:
mq:start()
MQ:clsoe()
此方法可以关闭不再使用的 MQ; 在任何情况下 MQ 使用完毕后都需要调用此方法来释放资源.
使用示例:
mq:close()
开始实践
为了演示更加直观, 这里仅使用 redis 作为 broker 中专消息.
1. 模拟生产者与消费者
我们模拟 100 个生产者向 redis 的 /queue
投递消息, 同时定义了一个消费者订阅 /queue
持续进行消费
代码如下:
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"
cf.fork(function ()
local consumer = MQ:new {
host = "localhost",
port = 6379
}
local count = 0
consumer:on("/queue", function (msg)
if not msg then
Log:ERROR("[/queue]连接失败", "已经消费了"..count.."个消息")
return
end
count = count + 1
Log:DEBUG("开始消费:", msg, "已经消费了"..count.."个消息")
end)
consumer:start() -- Websoket 内部无需使用这个方法
end)
for i = 1, 100 do
cf.fork(function()
local producer = MQ:new {
host = "localhost",
port = 6379
}
producer:emit("/queue", json.encode({
code = 200,
data = {id = math.random(1, 1 << 32)
},
}))
producer:close()
end)
end
输出如下:
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3912595079}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 1 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2938696189}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 2 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3499397173}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 3 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1711272453}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 4 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3968420025}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 5 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1887895479}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 6 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3687986737}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 7 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2823099353}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 8 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2528190121}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 9 个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":4107999865}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 10 个消息
.
..
...
....
.....
[2019-06-25 16:05:36,247] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3608578767}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了 100 个消息
为了方便阅读. 我们这里取出前 10 条与最后第 100 条并且将 msg 的数据结构打印出来方便阅读.
消费者的处理方式采用同步非阻塞处理的 (当前业务未处理完成是不会继续处理下个消息的), 如果不想阻塞当前消息队列事件循环可以考虑自行fork
一个协程来处理.
2. 推送消息给某个用户
用户通过认证后接入到 Server 后订阅自己专属的频道, 当有用户专属消息的时候任何服务都可以利用此方法进行业务消息推送.
我们
代码实现如下:
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"
for uid = 1, 10 do
cf.fork(function ()
local client = MQ:new {
host = "localhost",
port = 6379
}
client:on("/user/"..uid.."/*", function (msg)
if not msg then
Log:ERROR("[/user/9257]连接失败")
return
end
Log:DEBUG("UID:["..uid.."]接收到推送消息", msg)
end)
client:start() -- Websoket 内部无需使用这个方法
end)
end
local server = MQ:new {
host = "localhost",
port = 6379
}
cf.at(1, function (...)
server:emit("/user/"..math.random(1, 10).."/ad", json.encode({
code = 200,
data = {}}))
end)
server:start()
运行后终端输出如下所示:
^C[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:20:23,506] [@script/main.lua:18] [DEBUG] : UID:[9]接收到推送消息, {["source"]="/user/9/ad", ["pattern"]="/user/9/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:24,504] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:25,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:26,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:27,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:28,506] [@script/main.lua:18] [DEBUG] : UID:[2]接收到推送消息, {["source"]="/user/2/ad", ["pattern"]="/user/2/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:29,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:30,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:31,505] [@script/main.lua:18] [DEBUG] : UID:[3]接收到推送消息, {["source"]="/user/3/ad", ["pattern"]="/user/3/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:32,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:33,506] [@script/main.lua:18] [DEBUG] : UID:[5]接收到推送消息, {["source"]="/user/5/ad", ["pattern"]="/user/5/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:34,503] [@script/main.lua:18] [DEBUG] : UID:[7]接收到推送消息, {["source"]="/user/7/ad", ["pattern"]="/user/7/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:35,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:36,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:37,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
^C[candy@MacBookPro:~/Documents/core_framework] $
这里我们可以看到, 由消息发布到 /user/9527/*
下的 topic
的时候, 我们可以通过一次 通配符
订阅就可以接收到所有下属路由消息.
3. 消息广播
在各种领域内, 消息推送已经成为了一种最常见的业务. 我们现在来尝试利用 MQ 实现消息推送业务.
首先, 我们将 script/main.lua
的文件写入如下代码:
-- main.lua
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"
for i = 1, 3 do
cf.fork(function ()
local uid = math.random(1, 1 << 32)
local client_mq = MQ:new {
host = "localhost", -- 主机名
port = 6379, -- 端口号
-- db = nil, -- 默认数据库
-- auth = nil, -- 密码
}
client_mq:on("/system/notice", function (msg)
if not msg then
Log:ERROR("['/system/notice'] SUBSCRIBE ERROR: 连接已断开.")
return
end
Log:DEBUG("UID:["..uid.."]接收到消息:", msg)
end)
client_mq:start()
end)
end
local server_mq = MQ:new {
host = "localhost", -- 主机名
port = 6379, -- 端口号
-- db = nil, -- 默认数据库
-- auth = nil, -- 密码
}
cf.at(3, function (args)
server_mq:emit("/system/notice", json.encode({
code = 200,
msg = "系统即将关闭"
}))
end)
server_mq:start()
这里我们用启动了 3 个协程来模拟用户订阅消息, 并且每个协程都使用不同的 UID 来打印. 然后再启动一个定时器模拟每三秒的消息推送业务.
打开终端运行 ./cfadmin
后, 输出如下:
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":" 系统即将关闭 ","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[candy@MacBookPro:~/Documents/core_framework] $
从终端的输出内容中可以看到, 我们确实每隔 3 秒就收到了一次消息推送.
4. 对基于 Websocket 协议的客户端实现业务推送
首先, 我们需要建立一套基于 httpd
库的 Websocket
路由. 让我们打开 script/main.lua
文件并将下面的代码写入进去.
local httpd require "httpd"
local app = httpd:new("Web")
app:ws('/ws', require "ws")
app:listen("0.0.0.0", 8080)
app:run()
Websocket
必须在建立与客户端的连接完成的同时利用 MQ
库订阅 /chat
. 每当客户端发送消息过来触发on_message
的时候, 都将会消息直接发布到 /chat
内部通过中转后实现推送聊天.
然后我们利用前面章节所学的Websocket 指南
, 编写一段简单的 Websocket 路由处理代码. 由于示例代码没有 UID 生成机制. 为了方便调试, 我们随机生成 32 位整数作为唯一 ID 标识符.
script/ws.lua
具体代码如下所示:
local MQ = require "MQ.redis"
local class = require "class"
local websocket = class("websocket")
function websocket:ctor (opt)
self.ws = opt.ws
self.id = math.random(1, 1 << 32)
end
function websocket:on_open ()
self.mq = MQ:new {host = 'localhost', port = 6379}
self.mq:on("/chat", function (msg)
if not msg then
return
end
self.ws:send(msg.payload)
end)
end
function websocket:on_message (data, typ)
if self.mq then
self.mq:emit("/chat", data)
end
print("客户端 ["..self.id.."] 发送了消息:["..data.."]")
end
function websocket:on_error (error)
end
function websocket:on_close ()
if self.mq then
self.mq:close()
self.mq = nil
end
end
return websocket
注意: 我们需要记住当客户端连接断开的时候记得关闭订阅回收资源. 启动./cfadmin
, 查看是否正常运行.
让我们下载客户端工具, 并且安装到我们的 Chrome
浏览器上. 提取码:cgwr
现在, 我们运行客户端工具在地址栏输入 localhost:8080/ws
连接我们刚刚启动的 Websocket Server, 然后开始向服务器发送消息.
如果从终端中和客户端看到类似的输出内容, 说明我们的示例编写完成.
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019/06/25 20:11:59] [INFO] httpd 正在监听: 0.0.0.0:8080
[2019/06/25 20:11:59] [INFO] httpd 正在运行 Web Server 服务...
[2019/06/25 20:12:01] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000095/Sec
[2019/06/25 20:12:17] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000080/Sec
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
[2019/06/25 20:12:23] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000052/Sec
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [3363385555] 发送了消息:[hello! 我是 1]
客户端 [1693861773] 发送了消息:[hello! 我是 2]
最后
上述代码仅用 redis
协议进行模拟, 其它协议请参考 Wiki.
学习完成
至此 Lua Web 开发指南 已经编写完毕. 软件开发领域内不仅仅需要师傅领进门, 个人修行也是一种能力的体现.
cf 框架都内置库非常的多, 维护框架都同时还要编写使用教程. 作者不可能一个一个介绍完全. cf 框架已经有了专属的 QQ 讨论社区: 727531854, 点击加群.
目前内部就作者一个人在里面. 如果您也对它比较感兴趣, 欢迎您到群里来一起交流技术.