为什么要扩大流媒体服务器(kurento)

咱们从openvidu官网上拉下代码,启动后,默认是一个openvidu服务器对一个kurento服务器。别离去解决信令和流媒体。依据官网文档所指的,一个session,参与者有7个的会话条件下,不同配置的服务器可能接受的压力如下图:

是的,当咱们应用4c8g的服务器的时候,实践上只能解决7个session,也就是只有7个房间同时存在!这在生产上是基本不可能接受的。因为信令服务器压力自身不大,所以咱们必须扩大媒体服务器(kurento),让其承受能力减少。

怎么扩大kurento

openvidu 分为CE版本和Pro版本,咱们应用的是CE版本,也就是源代码外面不会将kurento的代码给咱们,那咱们该怎么扩大源代码,让其能够负载多个kurento呢?首先能够看一下openviduPro-kurento的架构

是的,如上图,一个openvidu服务器扩大了3台kurento服务器,因为瓶颈在kurento上,不思考openviduserver的话,实践上咱们的可承受能力会乘以3!接下来咱们剖析一下openvidu的源码。

KMS_URIS=["ws://116.196.10.***:8888/kurento"]
io.openvidu.server.config.OpenviduConfig:... public List<String> checkKmsUris() {        String property = "KMS_URIS";        return asKmsUris(property, getValue(property));  }...

从这里能够看到,代码的入口在这里。然而如果咱们在KMS_URIS外面增加多个kurento地址,他还是只会取地址外面的第一条kurento地址,也就是没有解决负载操作。起因在上面:

public class FixedOneKmsManager extends KmsManager {    @Override    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {        KmsProperties firstProps = kmsProperties.get(0);        KurentoClient kClient = null;        Kms kms = new Kms(firstProps, loadManager);        try {            kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));            this.addKms(kms);            kms.setKurentoClient(kClient);            // TODO: This should be done in KurentoClient connected event            kms.setKurentoClientConnected(true);            kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());        } catch (KurentoException e) {            log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri());            if (kClient != null) {                kClient.destroy();            }            throw new Exception();        }        return Arrays.asList(kms);    }    @Override    @PostConstruct    protected void postConstructInitKurentoClients() {        try {            List<KmsProperties> kmsProps = new ArrayList<>();            for (String kmsUri : this.openviduConfig.getKmsUris()) {                String kmsId = KmsManager.generateKmsId();                kmsProps.add(new KmsProperties(kmsId, kmsUri));            }            this.initializeKurentoClients(kmsProps, true);        } catch (Exception e) {            // Some KMS wasn't reachable            log.error("Shutting down OpenVidu Server");            System.exit(1);        }    }}

initializeKurentoClients 办法外面分明的写了,只会取KMS_URIS的第0条数据。那如果咱们想解决多条,这里就须要返回一个汇合,很简略,咱们定义一个本人的KmsManager类,重写initializeKurentoClients办法。如下:

public class FixedJDKmsManager extends KmsManager {    @Override    public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception {        ArrayList<Kms> results = new ArrayList<>();        for(KmsProperties kmsp:kmsProperties){            KurentoClient kClient = null;            Kms kms = new Kms(kmsp, loadManager);            try {                kClient = KurentoClient.create(kmsp.getUri(), this.generateKurentoConnectionListener(kms.getId()));                this.addKms(kms);                kms.setKurentoClient(kClient);                // TODO: This should be done in KurentoClient connected event                kms.setKurentoClientConnected(true);                kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());                results.add(kms);            } catch (KurentoException e) {                log.error("KMS in {} is not reachable by OpenVidu Server", kmsp.getUri());                if (kClient != null) {                    kClient.destroy();                }                throw new Exception();            }        }        return results;    }    @Override    @PostConstruct    protected void postConstructInitKurentoClients() {        try {            List<KmsProperties> kmsProps = new ArrayList<>();            for (String kmsUri : this.openviduConfig.getKmsUris()) {                String kmsId = KmsManager.generateKmsId();                kmsProps.add(new KmsProperties(kmsId, kmsUri));            }            this.initializeKurentoClients(kmsProps, true);        } catch (Exception e) {            // Some KMS wasn't reachable            log.error("Shutting down OpenVidu Server");            System.exit(1);        }    }}

配置应用本人定义的kmsManager

怎么实现加权轮询负载平衡

和kms_urls同理,咱们能够在配置文件增加kms_weights:

KMS_WEIGHT=[1]

并且,人为的管制kms_urls的size与kms_weight等同

而后在openviduConfig外面,将其与kms_urls相似的情理初始化。外围代码如下:

public class OpenviduConfig{    ...    private List<Integer> kmsWeights;    public List<Integer> getKmsWeights() {        return kmsWeights;    }    public void setKmsWeights(List<Integer> weights) {        this.kmsWeights = weights;        log.info("=====kms权重被重置为:{}", this.kmsUrisList);    }    public List<Integer> initWeight() {        String property = "KMS_WEIGHT";        return asKmsWeights(property, getValue(property));    }    protected void checkConfigurationProperties(boolean loadDotenv) {      ...      kmsUrisList = checkKmsUris();      kmsWeights = initWeight();      ...    }  ...}

到此,咱们将本人想要的加权参数放入容器了,然而openvidu还没有抉择kurento,接下来须要找到openvidu寻找对应kurento的中央,依据权值去取对应的url:

熟读源码后能够发现,在KurentoSessionManager.joinRoom办法外面,调用getLessLoadedConnectedAndRunningKms办法,这个办法就是抉择kms的办法。同样的情理,咱们能够重写一个办法去依照本人的逻辑抉择kms我写的如下:

public synchronized Kms getLoadBalanceConnectedAndRunningKms() throws NoSuchElementException {        List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected()                && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId()) ).collect(Collectors.toList());        if (kmsLoads.isEmpty()) {            throw new NoSuchElementException();        } else {            //todo 这里编写kms的负载平衡            this.openviduConfig.getKmsWeights();//            Kms kms = kmsLoads.get(Integer.parseInt(kmsNode)).kms;            //第一次:初始化数据            KmsWeightRobin.initKmsLoads(kmsLoads,this.openviduConfig.getKmsWeights());            KmsLoad kmsWeightRobin = KmsWeightRobin.getKmsWeightRobin();            log.info("=========>>>>>>加权轮询后,抉择kms:{}",kmsWeightRobin.kms.getUri());            return  kmsWeightRobin.getKms();        }    }
public class KmsWeightRobin {//    static Map<String,Integer> ipMap=new HashMap<>();    static Map<KmsManager.KmsLoad,Integer> kmsMap=new HashMap<>();    static List<KmsManager.KmsLoad> kmsLoads = null;    static List<Integer> weights = null;    public  static synchronized void initMap() {        kmsMap.clear();        for(int i = 0;i<kmsLoads.size();i++){            kmsMap.put(kmsLoads.get(i),weights.get(i));        }    }    static Integer pos=0;    public static KmsManager.KmsLoad getKmsWeightRobin(){        Map<KmsManager.KmsLoad,Integer> ipServerMap=new ConcurrentHashMap<>();        ipServerMap.putAll(kmsMap);        Set<KmsManager.KmsLoad> ipSet=ipServerMap.keySet();        Iterator<KmsManager.KmsLoad> ipIterator=ipSet.iterator();        //定义一个list放所有server        ArrayList<KmsManager.KmsLoad> ipArrayList=new ArrayList<KmsManager.KmsLoad>();        //循环set,依据set中的能够去得悉map中的value,给list中增加对应数字的server数量        while (ipIterator.hasNext()){            KmsManager.KmsLoad serverName=ipIterator.next();            Integer weight=ipServerMap.get(serverName);            for (int i = 0;i < weight ;i++){                ipArrayList.add(serverName);            }        }        KmsManager.KmsLoad serverName=null;        if (pos>=ipArrayList.size()){            pos=0;        }        serverName=ipArrayList.get(pos);        //轮询+1        pos ++;        return  serverName;    }//    public static Boolean initMapFlag = false;    public static void initKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){        if(ObjectUtils.isEmpty(KmsWeightRobin.kmsLoads)){            KmsWeightRobin.kmsLoads = kmsLoads;        }        if(ObjectUtils.isEmpty(KmsWeightRobin.weights)){            KmsWeightRobin.weights = weights;        }//        if(!initMapFlag){            initMap();//            initMapFlag = true;//        }    }    public static void updateKmsLoads(List<KmsManager.KmsLoad> kmsLoads,List<Integer> weights){        return;    }}

这样,在抉择kms的时候,就能够让容器依照weight去抉择了。