共计 6465 个字符,预计需要花费 17 分钟才能阅读完成。
pulsar mop mqtt 二次开发奉献开源我的项目代码
Linux MacBook 单机部署 Pulsar 并开启认证性能
pulsar 集群搭建_亲测胜利
pulsar 开启 mqtt 和认证
pulsar 自定义认证插件开发
pulsar 自定义创立公布和订阅主题权限插件开发
Fork 代码
源代码地址
https://github.com/streamnati…
拜访原始仓库,点击 fork,将原始仓库代码 fork 到本人的 GitHub 账号下,成为正本仓库。
点击 Fork 到本人的仓库
Clone 正本仓库到本地
把 fork 后的正本仓库 clone 到本地。
git clone https://github.com/tw-iot/mop.git
配置上游我的项目地址
配置上游我的项目地址的目标未来如果原来那个我的项目 streamnative/mop 有代码更新时,咱们须要把它最新代码合并到我本人的 Fork 的我的项目中,这样能力放弃代码同步,否则你的我的项目永远停留在 Fork 时候的版本。
cd mop
git remote add upstream https://github.com/streamnative/mop.git
获取上游我的项目更新
git fetch upstream
合并到本地分支
git merge upstream/master
提交推送
git push origin master
这样你的代码就和原我的项目的代码放弃同步了。
这里的 upstream 就是咱们上游我的项目地址的别名,待会儿就是从这个我的项目中去拉最新的代码。
创立本地分支
进入仓库目录,应用如下命令创立并切换到 authorization 分支(本人的本地分支)。
git checkout -b authorization # 创立并切换到 authorization 分支
本地仓库批改提交
基于本地分支 authorization 进行代码批改,而后进行本地提交。
git add -A
git commit -m "first commit"
git push
创立一个 Pull Request
当初假如你在本地我的项目中批改了代码,新增了文件,当咱们把代码 push 到 Github 之后,你就能够在 GitHub 发动一个 Pull Request 了,告知原我的项目,我修复了一些 Bug,更新了某些个性,请把我的代码合并过来吧。
新建一个 Pull Request,如果 GitHub 发现你的代码和原我的项目差别,那么就能够胜利 Create Pull Request。这样原我的项目的负责人就能够收到你的 Pull Reuqest 了。而后就等着他审核、合并代码,审核通过之后,你的代码将被正式合并到他的原我的项目中去。
批改 mop mqtt 插件源代码
MQTT 代理 5682 端口
切换对应的分支
git checkout branch-2.8.1
批改类
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor
processConnect 办法
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
// 截取 clientId
String clientIdentifier = payload.clientIdentifier();
if (clientIdentifier != null && clientIdentifier.length() != 0) {clientId = clientIdentifier.split("\\|")[0];
}
String userRole = null;
// Authenticate the client
userRole = authResult.getUserRole();
// 连贯胜利上后, 马上发一条上线音讯, 与 1883 建设连贯
NettyUtils.setUserRole(channel, userRole);
pulsarMsg("login", channel);
// 心跳办法, 每订阅一个 topic, 多一个心跳 (这里是个 bug, 一个连贯一个心跳, 已解决)
@Override
public void processPingReq(Channel channel) {//channel.writeAndFlush(pingResp());
//topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {// exchanger.writeAndFlush(pingReq());
//}));
channel.writeAndFlush(pingResp());
brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));
}
// 发送 pusar 音讯办法 proxy 代理
// 此办法会依据 topic,lookup 找到 broker, 而后和 broker 的 mqtt1883 建设连贯,
// 这样, 客户端给 5682 发心跳, 5682 回一个心跳包, 而后给 brokerPool 发心跳, 就是给 1883 发心跳
// 代理 5682 发送心跳的办法 processPingReq(), 会给 1883 发送心跳包
//******** 所以, 连贯代理 5682 端口时, 只须要连贯胜利后, 发送一次在线音讯, 就和 1883 建设连贯, 会维持心跳, 断开连接也会有事件
public void pulsarMsg(String type, Channel channel) {String userRole = NettyUtils.getUserRole(channel);
if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {return;}
String[] devArr = userRole.split("$");
String name;
String key;
if (devArr.length == 1) {
// 用户连贯
key = "user";
name = userRole;
} else {name = devArr[0];
key = devArr[1];
}
try {// 设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
// 设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
// 用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
Map<String, Object> map = new HashMap();
map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
map.put("v", "1.0");
map.put("t", System.currentTimeMillis());
map.put("method", "sys." + type);
CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(TopicName.get(pulsarTopic));
lookupResult.whenComplete((brokerAddress, throwable) -> {if (null != throwable) {log.error("pulsarMsg========11111+++++++topic:{}, error:{}", pulsarTopic, throwable);
return;
}
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);
MqttMessage msg = new MqttMessage(pingHeader, null , JSON.toString(map));
// 这样发送收不到音讯, 只是与 mqtt1883 端口建设了连贯, 就算用 mqtt 订阅这个 topic 也收不到数据
writeToMqttBroker(channel, msg, pulsarTopic, brokerAddress);
});
} catch (Exception e) {e.printStackTrace();
log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
}
}
MQTT1883 端口
批改类
io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl
processConnect 办法
// 截取 clientId
String clientIdentifier = payload.clientIdentifier();
if (clientIdentifier != null && clientIdentifier.length() != 0) {clientId = clientIdentifier.split("\\|")[0];
}
// 连贯胜利上后, 马上发一条上线音讯
pulsarMsg("login", channel);
processDisconnect 办法
// 断开连接后, 发送离线音讯
pulsarMsg("logout", channel);
processConnectionLost 办法
pulsarMsg("logout", channel);
@Override
public void processPingReq(Channel channel) {channel.writeAndFlush(pingResp());
// 心跳,========== 留神看默认实现有没有变动
pulsarMsg("login", channel);
}
/**
* 保留最初在线的 topic 和工夫, 避免 5682 端口订阅多个 topic, 有多个心跳包
*/
public static Map<String, Long> onlineTopicMap = new ConcurrentHashMap<>();
// 发送 mqtt 音讯给 pusar 办法
public void pulsarMsg(String type, Channel channel) {String userRole = NettyUtils.getUserRole(channel);
if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {return;}
String[] devArr = userRole.split("$");
String ame;
String key;
if (devArr.length == 1) {
// 用户连贯
key = "user";
name = userRole;
} else {name = devArr[0];
key = devArr[1];
}
try {// 设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
// 设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
// 用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
// 是心跳包, 且 map 里存在 topic
/*long currentTime = System.currentTimeMillis();
if ("login".equals(type) && onlineTopicMap.containsKey(pulsarTopic)) {long time = onlineTopicMap.get(pulsarTopic);
// 小于 5 秒不反复发送
if (currentTime - time < 5000) {return;}
}
onlineTopicMap.put(pulsarTopic, currentTime);*/
Map<String, Object> map = new HashMap();
map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
map.put("v", "1.0");
map.put("t", System.currentTimeMillis());
map.put("method", "sys." + type);
ByteBuf buf = Unpooled.copiedBuffer(JSON.toString(map), CharsetUtil.UTF_8);
MqttPublishMessage msg = MessageBuilder.publish()
.payload(buf)
.topicName(pulsarTopic)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.build();
this.qosPublishHandlers.qos0().publish(channel, msg);
} catch (Exception e) {e.printStackTrace();
log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
}
}
减少配置
mqttAuthorizationEnabled=true
public static final String AUTH_MYSQL = "mysql";
public static final List<String> SUPPORTED_AUTH_METHODS = ImmutableList.of(AUTH_BASIC, AUTH_TOKEN, AUTH_MYSQL);
case AUTH_MYSQL:
return new AuthenticationDataCommand(payload.password() + ":" + payload.userName() + ":" + payload.clientIdentifier());
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyHandler
io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler
exceptionCaught 办法
// 异样断开连接时调用
processor.processConnectionLost(ctx.channel());
打包好后, 复制到 pulsar 的 protocols 目录, 没有就创立一个 protocols 目录
mvn install -DskipTests
cp mqtt-impl/target/pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar /Users/liang/software/apache-pulsar-2.8.0.8/protocols/
参考链接:
https://blog.csdn.net/baidu_2…
https://blog.csdn.net/wo54107…