UDP具备是一种很好的封装协定,比方OpenUOM应用UDP封装会比TCP好很多,当初越来越多的业务采纳UDP传输,而后本人定义按序达到以及流控逻辑,然而就我集体的应用教训来看,UDP太难做并发,大多数状况下,应用UDP会让epoll等高性能event机制劣势全无。本文以OpenUOM为例,阐明一下我是怎么解决UDP并发问题的。

异步并发模型与epoll

和apache相比,nginx采纳异步的解决形式,也就是说,一个线程能够解决多个连贯,基于event模型,来了个数据包就读,可能顺次达到的数据不属于同一个连贯,然而没关系,只有能将可读的socket描述符和具体的连贯对应上即可。这样会使得在大并发场景下,让CPU迫近其极限运行,因为它简直没有工夫闲着,它会始终解决达到的数据包。apache的模型就不是这样,它会让一个连贯独自占有一个线程,如果有大量的连贯就会有大量的线程,然而对于每一个线程而言,其数据读写的压力并不是很大,这就会导致大量线程之间频繁切换,而切换会导致cache的刷新等副作用...因而在同样的硬件配置情景下,nginx的异步模型要比apache好很多。

咱们曾经晓得,异步解决是搞定大并发的基本,接下来的问题是,如何让一个就绪的socket和一个业务逻辑连贯对应起来,这个问题在同步模型下并不存在,因为一个线程只解决一个连贯。已经的event机制比方select,poll,它们只能通知你socket n就绪了,你不得不本人去通过数据结构来组织socket n和该连贯信息之间的关系,典型的如下:

struct conn {    int sd;    void *others;};list conns;

一个链表conns囊括了该线程负责的所有连贯,如果select/poll通知你socket n就绪了,你不得不遍历这个conns链表,比拟谁的sd是n,而后取出conn来解决,尽管能够用更加高效的数据结构,然而查找是必不可少的。然而epoll解决了这个问题。

在调用epoll_ctrl将一个socket退出到epoll中时,API会为你提供一个指针,让你间接绑定一个socket描述符和一个指针,一旦socket就绪,取出的是一个构造体,其中蕴含了与该socket对应的指针,因而你便能够这么做:

conn.sd = sd;conn.others = all;ev.events = EPOLLIN;ev.data.ptr = &conn;epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);while (1) {     nfds = epoll_wait(kdpfd, events, 10000, -1);    for (n = 0; n < nfds; ++n) {        conn = events[n].data.ptr;        recv(conn.sd, ....);        ....    }}

conn会一下子取出来。这是正当的形式。毕竟,内核中曾经通过socket查找了,一个5元组惟一代表了一个连贯,为何要在用户态程序再找一次呢?因而除了epoll不须要遍历所有的被监督socket之外,能够保留用户的指针也是其绝对于select/poll的一大劣势。nginx正是用的这种形式。咱们回到OpenUOM。

应用TCP的OpenUOM

应用TCP的OpenUOM跟nginx简直是截然不同,其外围解决逻辑如下:

/* 退出侦听socket */context.sd = listener;context.others = dont_care;listen_ev.events = EPOLLIN;listen_ev.data.ptr = context;epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);/* 退出TUN网卡 */tun.sd = tun;tun.others = dont_care;entry.ptr = tun;entry.type = TUN;tun_ev.events = EPOLLIN;tun_ev.data.ptr = entry;epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);while(1) {    nfds = epoll_wait(kdpfd, events, 10000, -1);    for (n = 0; n < nfds; ++n) {        if (events[n].data.ptr == context) {            child_sd = accept(context.sd, remote_addr....);            multi_instance *mi = create_mi(child_sd, remote_addr, ...);            entry.ptr = mi;            entry.type = SOCKET;            new_ev.events = EPOLLIN;            new_ev.data.ptr = entry;            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);            ....        } else if (events[n].data.ptr.type == SOCKET){            multi_instance *mi = events[n].data.ptr;            data = read_from_socket(mi);            // 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包            decrypt(mi, data);            write_to_tun(data);        } else {            tun *tun = events[n].data.ptr.ptr;            packet = read_from_tun(tun);            lock(mi_hashtable);            multi_instance *mi = lookup_multi_instance_from(packet);            unlock(mi_hashtable);            encrypt(packet);            write_to_socket(packet, mi);        }    }    ...}

以上就是TCP模式下的OpenUOM全副逻辑,能够看到,如果socket可读,那么就能够间接取到multi_instance,而后程序解决就是了。我记得去年我就把OpenUOM改成多线程了,然而当初看来那是个失败的做法。如果应用TCP,从上述逻辑能够看到,就算应用多线程,在socket-to-tun这个门路上也不必加锁,因而multi_instance间接通过epoll_wait就能够取的到。

须要C/C++ Linux服务器架构师学习材料加群812855908(材料包含C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),收费分享

应用UDP的OpenUOM

然而对于UDP而言,OpenUOM的解决逻辑根下面TCP的逻辑就截然不同了。因为全程只有一个UDP socket,承受所有客户端的连贯,此时基本不存在什么多路复用的问题,充其量也就是那惟一的UDP socket和tun网卡字符设施二者之间的两路复用,应用epoll齐全没有必要。为了定位了具体的multi_instance,你不得不先去read惟一的那个UDP socket,而后依据recvfrom返回参数中的sockaddr构造体来结构4元组,而后依据这4元组在全局的multi_instance hash表中去查找具体multi_instance实例。其逻辑如下所示:

/* 退出惟一的UDP socket */context.sd = udp_sd;context.others = dont_care;listen_ev.events = EPOLLIN;listen_ev.data.ptr = context;epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);/* 退出TUN网卡 */tun.sd = tun;tun.others = dont_care;entry.ptr = tun;entry.type = TUN;tun_ev.events = EPOLLIN;tun_ev.data.ptr = entry;epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);while(1) {    nfds = epoll_wait(kdpfd, events, 10000, -1);    for (n = 0; n < nfds; ++n) {  //实际上nfds最多也就是2        if (events[n].data.ptr == context) {            data = recvfrom(context.sd, remote_addr....);            lock(mi_hashtable);   //如果多线程,这个锁将会成为瓶颈,即使是RW锁也一样            multi_instance *mi = lookup_mi(child_sd, remote_addr, ...);  //再好的hash算法,也不是0老本的!            unlock(mi_hashtable);            // 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包            decrypt(mi, data);            write_to_tun(data);              ....        } else {            tun *tun = events[n].data.ptr.ptr;            packet = read_from_tun(tun);            lock(mi_hashtable);            multi_instance *mi = lookup_multi_instance_from(packet);            unlock(mi_hashtable);            encrypt(packet);            write_to_socket(packet, mi);        }    }    ...}

可见,TCP的OpenUOM和UDP的OpenUOM解决形式齐全不同,UDP的问题在于,齐全没有充分利用epoll的多路复用机制,不得不依据数据包的recvfrom返回地址来查找multi_instance...

让UDP socket也Listen起来

如果UDP也能像TCP一样,每一个用户接进来就为之创立一个独自的socket为其专门服务该多好,这样在大并发的时候,就能够充沛复用内核UDP层的socket查找论断加上epoll的告诉机制了。实践上这是可行的,因为UDP的4元组能够惟一辨认一个与之通信的客户端,尽管UDP生成无连贯,不牢靠,然而为每一个连贯的客户端创立一个socket并没有毁坏UDP的语义,只是扭转了UDP的编程模型而已,内核协定栈仍然不会去刻意保护一个UDP连贯,也不会进行任何的数据确认。
须要阐明的是,这种计划仅仅对“长连贯”的UDP有意义,比方OpenUOM这类。因为UDP是没有连贯的,那么你也就不晓得一个客户端什么时候会永远进行发送数据,因而必然要通过定时器来定时敞开那些在肯定时间段内没有数据的socket。
为了验证可行性,我先在用户态做试验,也就是说,承受一个客户端的“连贯申请”(其实就是一个数据包)时,我手工为其创立一个socket,而后bind本地地址,并且connect从recvfrom返回的对端地址,这样实践上对于后续的数据包,epoll都应该触发这个新的socket,毕竟它更准确。事实是不是这样呢?以下的程序能够证实:

#include <stdio.h>#include <stdlib.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/wait.h>#include <unistd.h>#include <arpa/inet.h>#include <fcntl.h>#include <sys/epoll.h>#include <sys/time.h>#include <sys/resource.h>#include <pthread.h>#include <assert.h>#define SO_REUSEPORT    15#define MAXBUF 10240#define MAXEPOLLSIZE 100int flag = 0;int read_data(int sd) {    char recvbuf[MAXBUF + 1];    int  ret;    struct sockaddr_in client_addr;    socklen_t cli_len=sizeof(client_addr);    bzero(recvbuf, MAXBUF + 1);      ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);    if (ret > 0) {        printf("read[%d]: %s  from  %dn", ret, recvbuf, sd);    } else {        printf("read err:%s  %dn", strerror(errno), ret);          }    fflush(stdout);}int udp_accept(int sd, struct sockaddr_in my_addr) {    int new_sd = -1;    int ret = 0;    int reuse = 1;    char buf[16];    struct sockaddr_in peer_addr;    socklen_t cli_len = sizeof(peer_addr);    ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);    if (ret > 0) {    }    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {        perror("child socket");        exit(1);    } else {        printf("parent:%d  new:%dn", sd, new_sd);    }    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));    if (ret) {        exit(1);    }    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));    if (ret) {        exit(1);    }    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));    if (ret){        perror("chid bind");        exit(1);    } else {    }    peer_addr.sin_family = PF_INET;    printf("aaa:%sn", inet_ntoa(peer_addr.sin_addr));    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {        perror("chid connect");        exit(1);    } else {    }out:    return new_sd;}int main(int argc, char **argv) {    int listener, kdpfd, nfds, n, curfds;    socklen_t len;    struct sockaddr_in my_addr, their_addr;    unsigned int port;    struct epoll_event ev;    struct epoll_event events[MAXEPOLLSIZE];    int opt = 1;;    int ret = 0;    port = 1234;      if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {        perror("socket");        exit(1);    } else {        printf("socket OKn");    }    ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));    if (ret) {        exit(1);    }    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));    if (ret) {        exit(1);    }      bzero(&my_addr, sizeof(my_addr));    my_addr.sin_family = PF_INET;    my_addr.sin_port = htons(port);    my_addr.sin_addr.s_addr = INADDR_ANY;    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {        perror("bind");        exit(1);    } else {        printf("IP bind OKn");    }      kdpfd = epoll_create(MAXEPOLLSIZE);    ev.events = EPOLLIN|EPOLLET;    ev.data.fd = listener;    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {        fprintf(stderr, "epoll set insertion error: fd=%dn", listener);        return -1;    } else {        printf("ep add OKn");    }     while (1) {              nfds = epoll_wait(kdpfd, events, 10000, -1);        if (nfds == -1) {            perror("epoll_wait");            break;        }              for (n = 0; n < nfds; ++n) {            if (events[n].data.fd == listener) {                printf("listener:%dn", n);                int new_sd;                               struct epoll_event child_ev;                new_sd = udp_accept(listener, my_addr);                child_ev.events = EPOLLIN;                child_ev.data.fd = new_sd;                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {                    fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);                    return -1;                }            } else {                read_data(events[n].data.fd);            }        }    }    close(listener);    return 0;}

须要阐明的是,REUSEPORT是必要的,因为在connect之前,你必须为新建的socket bind跟listener一样的IP地址和端口,因而就须要这个socket选项。
此时,如果你用多个udp客户端去给这个服务端发数据,会发现齐全实现了想要的成果。

内核中的UDP Listener

尽管在用户态能够实现成果,然而编程模型并不太好用,为了创立一个socket,你不得不先去recvfrom一下数据,好失去对端的地址,尽管应用PEEK标记能够让创立好child socket后再读一次,然而认真想想,最彻底的计划还是间接扩大内核,我基于3.9.6内核,对__udp4_lib_rcv这个UDP协定栈接管函数作了以下的批改:

int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,                   int proto){......................        sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);        if (sk != NULL) {                int ret;#if 1                // 这个UDP_LISTEN,通过setsockopt来设置                if (sk->sk_state == UDP_LISTEN) {                        // 如果是UDP的listener,创立一个数据socket                        struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);                        if (newsk) {                                struct inet_sock *newinet;                                // 为这个数据传输socket依据skb来填充4元组信息                                newinet               = inet_sk(newsk);                                newinet->inet_daddr   = ip_hdr(skb)->saddr;                                newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;                                newinet->inet_saddr           = ip_hdr(skb)->daddr;                                rcu_assign_pointer(newinet->inet_opt, NULL);                                newinet->mc_index     = inet_iif(skb);                                newinet->mc_ttl       = ip_hdr(skb)->ttl;                                newinet->rcv_tos      = ip_hdr(skb)->tos;                                newinet->inet_id = 0xffffffff ^ jiffies;                                inet_sk_rx_dst_set(newsk, skb);                                // sock构造体新增csk变量,相似TCP的accept queue,然而为了简略,目前每个Listen socket只能持有一个csk,即child sock。                                sk->csk = newsk;                                // 将新的数据传输socket排入全局的UDP socket hash表                                if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {                                        printk("[UDP listen] get port errorn");                                        release_sock(newsk);                                        err = -2;                                        goto out_go;                                }                                ret = udp_queue_rcv_skb(newsk, skb);                                // 唤醒epoll,让epoll返回UDP Listener                                sk->sk_data_ready(sk, 0);                                sock_put(newsk);                        } else {                                printk("[UDP listen] create new errorn");                                sock_put(sk);                                return -1;                        }out_go:                        sock_put(sk);                        if (ret > 0)                                return -ret;                        return 0;                }#endif                ret = udp_queue_rcv_skb(sk, skb);                sock_put(sk);......................}

我只是测试,因而并没有扩大UDP的accept办法,只是简略的用getsocketopt来取得这个新的socket描述符并为task装置该文件描述符,setsockopt能够设置一个UDP socket为listener。这样用户态的编程模型就很简略了。

应用新的Listen UDP来革新OpenUOM

有必要重构一下OpenUOM了,现如今它的逻辑变成了:

listen = 1;listener = socket(PF_INET, SOCK_DGRAM, 0);setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen));/* 退出侦听socket */context.sd = listener;context.others = dont_care;listen_ev.events = EPOLLIN;listen_ev.data.ptr = context;epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);/* 退出TUN网卡 */tun.sd = tun;tun.others = dont_care;entry.ptr = tun;entry.type = TUN;tun_ev.events = EPOLLIN;tun_ev.data.ptr = entry;epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);while(1) {    nfds = epoll_wait(kdpfd, events, 10000, -1);    for (n = 0; n < nfds; ++n) {        if (events[n].data.ptr == context) {            getsockopt(context.sd, SOL_SOCKET, &newsock_info....);            child_sd = newsock_info.sd;            multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);            entry.ptr = mi;            entry.type = SOCKET;            new_ev.events = EPOLLIN;            new_ev.data.ptr = entry;            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);            // 这是UDP,内核除了告诉Listener之外,还会将数据排入child_sd,因而须要去读取,能够参考TCP的Fastopen逻辑            data = recvfrom(child_sd, ....);            ....        } else if (events[n].data.ptr.type == SOCKET){            multi_instance *mi = events[n].data.ptr;            data = read_from_socket(mi);            // 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包            decrypt(mi, data);            write_to_tun(data);        } else {            tun *tun = events[n].data.ptr.ptr;            packet = read_from_tun(tun);            lock(mi_hashtable);            multi_instance *mi = lookup_multi_instance_from(packet);            unlock(mi_hashtable);            encrypt(packet);            write_to_socket(packet, mi);        }    }    ...}

除了把accept改成了getsockopt之外,别的简直和TCP的OpenUOM完全一致了。
如此一来,2014年革新的OpenUOM多线程版本就完满了,用户态基本不须要再应用recvfrom返回的address信息来定位multi_instance了,一个multi_instance惟一和一个socket绑定,而每一个socket都由epoll来治理,大大降低了用户态查找multi_instance的开销,同时也防止了锁定。