pulsar mop mqtt二次开发奉献开源我的项目代码

Linux MacBook单机部署Pulsar并开启认证性能

pulsar集群搭建_亲测胜利

pulsar开启mqtt和认证

pulsar自定义认证插件开发

pulsar自定义创立公布和订阅主题权限插件开发

Fork代码


源代码地址
https://github.com/streamnati...

拜访原始仓库,点击fork,将原始仓库代码fork到本人的GitHub账号下,成为正本仓库。

点击Fork到本人的仓库

Clone正本仓库到本地

把fork后的正本仓库 clone 到本地。

git clone https://github.com/tw-iot/mop.git   

配置上游我的项目地址

配置上游我的项目地址的目标未来如果原来那个我的项目streamnative/mop有代码更新时,咱们须要把它最新代码合并到我本人的Fork的我的项目中,这样能力放弃代码同步,否则你的我的项目永远停留在Fork时候的版本。

cd mopgit remote add upstream https://github.com/streamnative/mop.git获取上游我的项目更新git fetch upstream合并到本地分支git merge upstream/master提交推送git push origin master这样你的代码就和原我的项目的代码放弃同步了。

这里的 upstream 就是咱们上游我的项目地址的别名,待会儿就是从这个我的项目中去拉最新的代码。

创立本地分支

进入仓库目录,应用如下命令创立并切换到authorization分支(本人的本地分支)。

git checkout -b authorization # 创立并切换到authorization分支

本地仓库批改提交

基于本地分支authorization进行代码批改,而后进行本地提交。

git add -Agit commit -m "first commit"git push

创立一个 Pull Request

当初假如你在本地我的项目中批改了代码,新增了文件,当咱们把代码push到Github之后,你就能够在GitHub发动一个Pull Request了,告知原我的项目,我修复了一些Bug,更新了某些个性,请把我的代码合并过来吧。

新建一个 Pull Request,如果GitHub发现你的代码和原我的项目差别,那么就能够胜利 Create Pull Request。这样原我的项目的负责人就能够收到你的Pull Reuqest了。而后就等着他审核、合并代码,审核通过之后,你的代码将被正式合并到他的原我的项目中去。


批改mop mqtt插件源代码

MQTT代理5682端口

切换对应的分支git checkout branch-2.8.1 

批改类
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor

    processConnect 办法        MqttConnectPayload payload = msg.payload();    String clientId = payload.clientIdentifier();        //截取clientId    String clientIdentifier = payload.clientIdentifier();    if (clientIdentifier != null && clientIdentifier.length() != 0) {        clientId = clientIdentifier.split("\\|")[0];    }    String userRole = null;    // Authenticate the client        userRole = authResult.getUserRole();    //连贯胜利上后,马上发一条上线音讯,与1883建设连贯    NettyUtils.setUserRole(channel, userRole);    pulsarMsg("login", channel);        //心跳办法,每订阅一个topic,多一个心跳(这里是个bug,一个连贯一个心跳,已解决)    @Override    public void processPingReq(Channel channel) {        //channel.writeAndFlush(pingResp());        //topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {        //    exchanger.writeAndFlush(pingReq());        //}));                channel.writeAndFlush(pingResp());        brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));    }                //发送pusar音讯办法 proxy代理    //此办法会依据topic,lookup找到broker,而后和broker的mqtt1883建设连贯,    //这样,客户端给5682发心跳, 5682回一个心跳包,而后给brokerPool发心跳,就是给1883发心跳    //代理5682发送心跳的办法processPingReq(),会给1883发送心跳包    //********所以, 连贯代理5682端口时,只须要连贯胜利后,发送一次在线音讯,就和1883建设连贯,会维持心跳,断开连接也会有事件    public void pulsarMsg(String type, Channel channel) {        String userRole = NettyUtils.getUserRole(channel);        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {            return;        }        String[] devArr = userRole.split("$");        String name;        String key;        if (devArr.length == 1) {            //用户连贯            key = "user";            name = userRole;        } else {            name = devArr[0];            key = devArr[1];        }        try {            //设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login            //设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout            //用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;            Map<String, Object> map = new HashMap();            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));            map.put("v", "1.0");            map.put("t", System.currentTimeMillis());            map.put("method", "sys." + type);            CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(                    TopicName.get(pulsarTopic));            lookupResult.whenComplete((brokerAddress, throwable) -> {                if (null != throwable) {                    log.error("pulsarMsg========11111+++++++topic:{}, error:{}", pulsarTopic, throwable);                    return;                }                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);                MqttMessage msg = new MqttMessage(pingHeader, null , JSON.toString(map));                //这样发送收不到音讯,只是与mqtt1883端口建设了连贯,就算用mqtt订阅这个topic也收不到数据                writeToMqttBroker(channel, msg, pulsarTopic, brokerAddress);            });        } catch (Exception e) {            e.printStackTrace();            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());        }    }                                        

MQTT1883端口

批改类
io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl

    processConnect 办法         //截取clientId    String clientIdentifier = payload.clientIdentifier();    if (clientIdentifier != null && clientIdentifier.length() != 0) {        clientId = clientIdentifier.split("\\|")[0];    }        //连贯胜利上后,马上发一条上线音讯    pulsarMsg("login", channel);       processDisconnect办法   //断开连接后,发送离线音讯   pulsarMsg("logout", channel);       processConnectionLost办法    pulsarMsg("logout", channel);        @Override    public void processPingReq(Channel channel) {        channel.writeAndFlush(pingResp());        //心跳,==========留神看默认实现有没有变动        pulsarMsg("login", channel);    }        /**     * 保留最初在线的topic和工夫,避免5682端口订阅多个topic,有多个心跳包     */    public static Map<String, Long> onlineTopicMap = new ConcurrentHashMap<>();        //发送mqtt音讯给pusar办法    public void pulsarMsg(String type, Channel channel) {        String userRole = NettyUtils.getUserRole(channel);        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {            return;        }        String[] devArr = userRole.split("$");        String ame;        String key;        if (devArr.length == 1) {            //用户连贯            key = "user";            name = userRole;        } else {            name = devArr[0];            key = devArr[1];        }        try {            //设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login            //设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout            //用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;            //是心跳包,且map里存在topic            /*long currentTime = System.currentTimeMillis();            if ("login".equals(type) && onlineTopicMap.containsKey(pulsarTopic)) {                long time = onlineTopicMap.get(pulsarTopic);                //小于5秒不反复发送                if (currentTime - time < 5000) {                    return;                }            }            onlineTopicMap.put(pulsarTopic, currentTime);*/            Map<String, Object> map = new HashMap();            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));            map.put("v", "1.0");            map.put("t", System.currentTimeMillis());            map.put("method", "sys." + type);            ByteBuf buf = Unpooled.copiedBuffer(JSON.toString(map), CharsetUtil.UTF_8);            MqttPublishMessage msg = MessageBuilder.publish()                    .payload(buf)                    .topicName(pulsarTopic)                    .qos(MqttQoS.AT_MOST_ONCE)                    .retained(false)                    .build();            this.qosPublishHandlers.qos0().publish(channel, msg);        } catch (Exception e) {            e.printStackTrace();            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());        }    }

减少配置

mqttAuthorizationEnabled=true
public static final String AUTH_MYSQL = "mysql";public static final List<String> SUPPORTED_AUTH_METHODS = ImmutableList.of(AUTH_BASIC, AUTH_TOKEN, AUTH_MYSQL); case AUTH_MYSQL:        return new AuthenticationDataCommand(payload.password() + ":" + payload.userName() + ":" + payload.clientIdentifier());

io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyHandler
io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler

exceptionCaught 办法//异样断开连接时调用processor.processConnectionLost(ctx.channel());

打包好后,复制到pulsar的protocols目录,没有就创立一个protocols目录

mvn install -DskipTests  cp mqtt-impl/target/pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar /Users/liang/software/apache-pulsar-2.8.0.8/protocols/

参考链接:
https://blog.csdn.net/baidu_2...
https://blog.csdn.net/wo54107...