关于pulsar:pulsar-mop-mqtt二次开发贡献开源项目代码

50次阅读

共计 6465 个字符,预计需要花费 17 分钟才能阅读完成。

pulsar mop mqtt 二次开发奉献开源我的项目代码

Linux MacBook 单机部署 Pulsar 并开启认证性能

pulsar 集群搭建_亲测胜利

pulsar 开启 mqtt 和认证

pulsar 自定义认证插件开发

pulsar 自定义创立公布和订阅主题权限插件开发

Fork 代码

源代码地址
https://github.com/streamnati…

拜访原始仓库,点击 fork,将原始仓库代码 fork 到本人的 GitHub 账号下,成为正本仓库。

点击 Fork 到本人的仓库

Clone 正本仓库到本地

把 fork 后的正本仓库 clone 到本地。

git clone https://github.com/tw-iot/mop.git   

配置上游我的项目地址

配置上游我的项目地址的目标未来如果原来那个我的项目 streamnative/mop 有代码更新时,咱们须要把它最新代码合并到我本人的 Fork 的我的项目中,这样能力放弃代码同步,否则你的我的项目永远停留在 Fork 时候的版本。

cd mop

git remote add upstream https://github.com/streamnative/mop.git

获取上游我的项目更新
git fetch upstream

合并到本地分支
git merge upstream/master

提交推送
git push origin master

这样你的代码就和原我的项目的代码放弃同步了。

这里的 upstream 就是咱们上游我的项目地址的别名,待会儿就是从这个我的项目中去拉最新的代码。

创立本地分支

进入仓库目录,应用如下命令创立并切换到 authorization 分支(本人的本地分支)。

git checkout -b authorization # 创立并切换到 authorization 分支 

本地仓库批改提交

基于本地分支 authorization 进行代码批改,而后进行本地提交。

git add -A

git commit -m "first commit"

git push

创立一个 Pull Request

当初假如你在本地我的项目中批改了代码,新增了文件,当咱们把代码 push 到 Github 之后,你就能够在 GitHub 发动一个 Pull Request 了,告知原我的项目,我修复了一些 Bug,更新了某些个性,请把我的代码合并过来吧。

新建一个 Pull Request,如果 GitHub 发现你的代码和原我的项目差别,那么就能够胜利 Create Pull Request。这样原我的项目的负责人就能够收到你的 Pull Reuqest 了。而后就等着他审核、合并代码,审核通过之后,你的代码将被正式合并到他的原我的项目中去。

批改 mop mqtt 插件源代码

MQTT 代理 5682 端口

 切换对应的分支
git checkout branch-2.8.1 

批改类
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor


    processConnect 办法
    
    MqttConnectPayload payload = msg.payload();
    String clientId = payload.clientIdentifier();
    
    // 截取 clientId
    String clientIdentifier = payload.clientIdentifier();
    if (clientIdentifier != null && clientIdentifier.length() != 0) {clientId = clientIdentifier.split("\\|")[0];
    }

    String userRole = null;
    // Authenticate the client
    
    userRole = authResult.getUserRole();

    // 连贯胜利上后, 马上发一条上线音讯, 与 1883 建设连贯
    NettyUtils.setUserRole(channel, userRole);
    pulsarMsg("login", channel);
    
    // 心跳办法, 每订阅一个 topic, 多一个心跳 (这里是个 bug, 一个连贯一个心跳, 已解决)
    @Override
    public void processPingReq(Channel channel) {//channel.writeAndFlush(pingResp());
        //topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {//    exchanger.writeAndFlush(pingReq());
        //}));
        
        channel.writeAndFlush(pingResp());
        brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));
    }
            
    // 发送 pusar 音讯办法 proxy 代理
    // 此办法会依据 topic,lookup 找到 broker, 而后和 broker 的 mqtt1883 建设连贯,
    // 这样, 客户端给 5682 发心跳, 5682 回一个心跳包, 而后给 brokerPool 发心跳, 就是给 1883 发心跳
    // 代理 5682 发送心跳的办法 processPingReq(), 会给 1883 发送心跳包
    //******** 所以, 连贯代理 5682 端口时, 只须要连贯胜利后, 发送一次在线音讯, 就和 1883 建设连贯, 会维持心跳, 断开连接也会有事件
    public void pulsarMsg(String type, Channel channel) {String userRole = NettyUtils.getUserRole(channel);
        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {return;}
        String[] devArr = userRole.split("$");
        String name;
        String key;
        if (devArr.length == 1) {
            // 用户连贯
            key = "user";
            name = userRole;
        } else {name = devArr[0];
            key = devArr[1];
        }

        try {// 设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
            // 设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
            // 用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
            Map<String, Object> map = new HashMap();
            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
            map.put("v", "1.0");
            map.put("t", System.currentTimeMillis());
            map.put("method", "sys." + type);
            CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(TopicName.get(pulsarTopic));
            lookupResult.whenComplete((brokerAddress, throwable) -> {if (null != throwable) {log.error("pulsarMsg========11111+++++++topic:{}, error:{}", pulsarTopic, throwable);
                    return;
                }
                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);
                MqttMessage msg = new MqttMessage(pingHeader, null , JSON.toString(map));
                // 这样发送收不到音讯, 只是与 mqtt1883 端口建设了连贯, 就算用 mqtt 订阅这个 topic 也收不到数据
                writeToMqttBroker(channel, msg, pulsarTopic, brokerAddress);
            });
        } catch (Exception e) {e.printStackTrace();
            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
        }
    }
                    
                    

MQTT1883 端口

批改类
io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl


    processConnect 办法
     
    // 截取 clientId
    String clientIdentifier = payload.clientIdentifier();
    if (clientIdentifier != null && clientIdentifier.length() != 0) {clientId = clientIdentifier.split("\\|")[0];
    }
    
    // 连贯胜利上后, 马上发一条上线音讯
    pulsarMsg("login", channel);
    
   processDisconnect 办法
   // 断开连接后, 发送离线音讯
   pulsarMsg("logout", channel);
   
    processConnectionLost 办法
    pulsarMsg("logout", channel);
    
    @Override
    public void processPingReq(Channel channel) {channel.writeAndFlush(pingResp());

        // 心跳,========== 留神看默认实现有没有变动
        pulsarMsg("login", channel);
    }
    
    /**
     * 保留最初在线的 topic 和工夫, 避免 5682 端口订阅多个 topic, 有多个心跳包
     */
    public static Map<String, Long> onlineTopicMap = new ConcurrentHashMap<>();
    
    // 发送 mqtt 音讯给 pusar 办法
    public void pulsarMsg(String type, Channel channel) {String userRole = NettyUtils.getUserRole(channel);
        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {return;}
        String[] devArr = userRole.split("$");
        String ame;
        String key;
        if (devArr.length == 1) {
            // 用户连贯
            key = "user";
            name = userRole;
        } else {name = devArr[0];
            key = devArr[1];
        }

        try {// 设施上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
            // 设施离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
            // 用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;

            // 是心跳包, 且 map 里存在 topic
            /*long currentTime = System.currentTimeMillis();
            if ("login".equals(type) && onlineTopicMap.containsKey(pulsarTopic)) {long time = onlineTopicMap.get(pulsarTopic);
                // 小于 5 秒不反复发送
                if (currentTime - time < 5000) {return;}
            }
            onlineTopicMap.put(pulsarTopic, currentTime);*/

            Map<String, Object> map = new HashMap();
            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
            map.put("v", "1.0");
            map.put("t", System.currentTimeMillis());
            map.put("method", "sys." + type);
            ByteBuf buf = Unpooled.copiedBuffer(JSON.toString(map), CharsetUtil.UTF_8);
            MqttPublishMessage msg = MessageBuilder.publish()
                    .payload(buf)
                    .topicName(pulsarTopic)
                    .qos(MqttQoS.AT_MOST_ONCE)
                    .retained(false)
                    .build();
            this.qosPublishHandlers.qos0().publish(channel, msg);
        } catch (Exception e) {e.printStackTrace();
            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
        }
    }

减少配置

mqttAuthorizationEnabled=true
public static final String AUTH_MYSQL = "mysql";

public static final List<String> SUPPORTED_AUTH_METHODS = ImmutableList.of(AUTH_BASIC, AUTH_TOKEN, AUTH_MYSQL);

 case AUTH_MYSQL:
        return new AuthenticationDataCommand(payload.password() + ":" + payload.userName() + ":" + payload.clientIdentifier());

io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyHandler
io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler

exceptionCaught 办法
// 异样断开连接时调用
processor.processConnectionLost(ctx.channel());

打包好后, 复制到 pulsar 的 protocols 目录, 没有就创立一个 protocols 目录

mvn install -DskipTests  

cp mqtt-impl/target/pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar /Users/liang/software/apache-pulsar-2.8.0.8/protocols/

参考链接:
https://blog.csdn.net/baidu_2…
https://blog.csdn.net/wo54107…

正文完
 0