乐趣区

关于webrtc:Openviduserver搭建kurento负载均衡机制

为什么要扩大流媒体服务器(kurento)

咱们从 openvidu 官网上拉下代码, 启动后,默认是一个 openvidu 服务器对一个 kurento 服务器。别离去解决信令和流媒体。依据官网文档所指的,一个 session, 参与者有 7 个的会话条件下,不同配置的服务器可能接受的压力如下图:

是的,当咱们应用 4c8g 的服务器的时候,实践上只能解决 7 个 session, 也就是只有 7 个房间同时存在!这在生产上是基本不可能接受的。因为信令服务器压力自身不大,所以咱们必须扩大媒体服务器(kurento),让其承受能力减少。

怎么扩大 kurento

openvidu 分为 CE 版本和 Pro 版本,咱们应用的是 CE 版本,也就是源代码外面不会将 kurento 的代码给咱们,那咱们该怎么扩大源代码,让其能够负载多个 kurento 呢?首先能够看一下 openviduPro-kurento 的架构

是的,如上图,一个 openvidu 服务器扩大了 3 台 kurento 服务器,因为瓶颈在 kurento 上,不思考 openviduserver 的话,实践上咱们的可承受能力会乘以 3!接下来咱们剖析一下 openvidu 的源码。

KMS_URIS=["ws://116.196.10.***:8888/kurento"]
io.openvidu.server.config.OpenviduConfig:

...
 public List<String> checkKmsUris() {

        String property = "KMS_URIS";

        return asKmsUris(property, getValue(property));

  }
...

从这里能够看到,代码的入口在这里。然而如果咱们在 KMS_URIS 外面增加多个 kurento 地址,他还是只会取地址外面的第一条 kurento 地址,也就是没有解决负载操作。起因在上面:

public class FixedOneKmsManager extends KmsManager {

    @Override
    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {KmsProperties firstProps = kmsProperties.get(0);
        KurentoClient kClient = null;
        Kms kms = new Kms(firstProps, loadManager);
        try {kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));
            this.addKms(kms);
            kms.setKurentoClient(kClient);

            // TODO: This should be done in KurentoClient connected event
            kms.setKurentoClientConnected(true);
            kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());

        } catch (KurentoException e) {log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri());
            if (kClient != null) {kClient.destroy();
            }
            throw new Exception();}
        return Arrays.asList(kms);
    }

    @Override
    @PostConstruct
    protected void postConstructInitKurentoClients() {
        try {List<KmsProperties> kmsProps = new ArrayList<>();
            for (String kmsUri : this.openviduConfig.getKmsUris()) {String kmsId = KmsManager.generateKmsId();
                kmsProps.add(new KmsProperties(kmsId, kmsUri));
            }
            this.initializeKurentoClients(kmsProps, true);
        } catch (Exception e) {
            // Some KMS wasn't reachable
            log.error("Shutting down OpenVidu Server");
            System.exit(1);
        }
    }

}

initializeKurentoClients 办法外面分明的写了,只会取 KMS_URIS 的第 0 条数据。那如果咱们想解决多条,这里就须要返回一个汇合,很简略,咱们定义一个本人的 KmsManager 类, 重写 initializeKurentoClients 办法。如下:

public class FixedJDKmsManager extends KmsManager {

    @Override
    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {ArrayList<Kms> results = new ArrayList<>();

        for(KmsProperties kmsp:kmsProperties){
            KurentoClient kClient = null;
            Kms kms = new Kms(kmsp, loadManager);
            try {kClient = KurentoClient.create(kmsp.getUri(), this.generateKurentoConnectionListener(kms.getId()));
                this.addKms(kms);
                kms.setKurentoClient(kClient);

                // TODO: This should be done in KurentoClient connected event
                kms.setKurentoClientConnected(true);
                kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());

                results.add(kms);
            } catch (KurentoException e) {log.error("KMS in {} is not reachable by OpenVidu Server", kmsp.getUri());
                if (kClient != null) {kClient.destroy();
                }
                throw new Exception();}
        }

        return results;
    }

    @Override
    @PostConstruct
    protected void postConstructInitKurentoClients() {
        try {List<KmsProperties> kmsProps = new ArrayList<>();
            for (String kmsUri : this.openviduConfig.getKmsUris()) {String kmsId = KmsManager.generateKmsId();
                kmsProps.add(new KmsProperties(kmsId, kmsUri));
            }
            this.initializeKurentoClients(kmsProps, true);
        } catch (Exception e) {
            // Some KMS wasn't reachable
            log.error("Shutting down OpenVidu Server");
            System.exit(1);
        }
    }

}

配置应用本人定义的 kmsManager

怎么实现加权轮询负载平衡

和 kms_urls 同理,咱们能够在配置文件增加 kms_weights:

KMS_WEIGHT=[1]

并且,人为的管制 kms_urls 的 size 与 kms_weight 等同

而后在 openviduConfig 外面,将其与 kms_urls 相似的情理初始化。外围代码如下:

public class OpenviduConfig{

    ...

    private List<Integer> kmsWeights;


    public List<Integer> getKmsWeights() {return kmsWeights;}

    public void setKmsWeights(List<Integer> weights) {
        this.kmsWeights = weights;
        log.info("=====kms 权重被重置为:{}", this.kmsUrisList);
    }

    public List<Integer> initWeight() {

        String property = "KMS_WEIGHT";

        return asKmsWeights(property, getValue(property));
    }


    protected void checkConfigurationProperties(boolean loadDotenv) {

      ...
      kmsUrisList = checkKmsUris();

      kmsWeights = initWeight();
      ...

    }

  ...

}

到此,咱们将本人想要的加权参数放入容器了,然而 openvidu 还没有抉择 kurento,接下来须要找到 openvidu 寻找对应 kurento 的中央,依据权值去取对应的 url:

熟读源码后能够发现,在 KurentoSessionManager.joinRoom 办法外面,调用 getLessLoadedConnectedAndRunningKms 办法,这个办法就是抉择 kms 的办法。同样的情理,咱们能够重写一个办法去依照本人的逻辑抉择 kms 我写的如下:

public synchronized Kms getLoadBalanceConnectedAndRunningKms() throws NoSuchElementException {List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected()
                && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId()) ).collect(Collectors.toList());

        if (kmsLoads.isEmpty()) {throw new NoSuchElementException();
        } else {
            //todo 这里编写 kms 的负载平衡
            this.openviduConfig.getKmsWeights();
//            Kms kms = kmsLoads.get(Integer.parseInt(kmsNode)).kms;

            // 第一次: 初始化数据
            KmsWeightRobin.initKmsLoads(kmsLoads,this.openviduConfig.getKmsWeights());
            KmsLoad kmsWeightRobin = KmsWeightRobin.getKmsWeightRobin();
            log.info("=========>>>>>> 加权轮询后, 抉择 kms:{}",kmsWeightRobin.kms.getUri());
            return  kmsWeightRobin.getKms();}
    }

public class KmsWeightRobin {//    static Map<String,Integer> ipMap=new HashMap<>();

    static Map<KmsManager.KmsLoad,Integer> kmsMap=new HashMap<>();

    static List<KmsManager.KmsLoad> kmsLoads = null;

    static List<Integer> weights = null;

    public  static synchronized void initMap() {kmsMap.clear();

        for(int i = 0;i<kmsLoads.size();i++){kmsMap.put(kmsLoads.get(i),weights.get(i));
        }
    }



    static Integer pos=0;
    public static KmsManager.KmsLoad getKmsWeightRobin(){Map<KmsManager.KmsLoad,Integer> ipServerMap=new ConcurrentHashMap<>();
        ipServerMap.putAll(kmsMap);

        Set<KmsManager.KmsLoad> ipSet=ipServerMap.keySet();
        Iterator<KmsManager.KmsLoad> ipIterator=ipSet.iterator();

        // 定义一个 list 放所有 server
        ArrayList<KmsManager.KmsLoad> ipArrayList=new ArrayList<KmsManager.KmsLoad>();

        // 循环 set,依据 set 中的能够去得悉 map 中的 value,给 list 中增加对应数字的 server 数量
        while (ipIterator.hasNext()){KmsManager.KmsLoad serverName=ipIterator.next();
            Integer weight=ipServerMap.get(serverName);
            for (int i = 0;i < weight ;i++){ipArrayList.add(serverName);
            }
        }
        KmsManager.KmsLoad serverName=null;
        if (pos>=ipArrayList.size()){pos=0;}
        serverName=ipArrayList.get(pos);
        // 轮询 +1
        pos ++;
        return  serverName;
    }


//    public static Boolean initMapFlag = false;

    public static void initKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){if(ObjectUtils.isEmpty(KmsWeightRobin.kmsLoads)){KmsWeightRobin.kmsLoads = kmsLoads;}
        if(ObjectUtils.isEmpty(KmsWeightRobin.weights)){KmsWeightRobin.weights = weights;}
//        if(!initMapFlag){initMap();
//            initMapFlag = true;
//        }


    }


    public static void updateKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){return;}


}

这样,在抉择 kms 的时候,就能够让容器依照 weight 去抉择了。

退出移动版