UDP 具备是一种很好的封装协定,比方 OpenUOM 应用 UDP 封装会比 TCP 好很多,当初越来越多的业务采纳 UDP 传输,而后本人定义按序达到以及流控逻辑,然而就我集体的应用教训来看,UDP 太难做并发,大多数状况下,应用 UDP 会让 epoll 等高性能 event 机制劣势全无。本文以 OpenUOM 为例,阐明一下我是怎么解决 UDP 并发问题的。
异步并发模型与 epoll
和 apache 相比,nginx 采纳异步的解决形式,也就是说,一个线程能够解决多个连贯,基于 event 模型,来了个数据包就读,可能顺次达到的数据不属于同一个连贯,然而没关系,只有能将可读的 socket 描述符和具体的连贯对应上即可。这样会使得在大并发场景下,让 CPU 迫近其极限运行,因为它简直没有工夫闲着,它会始终解决达到的数据包。apache 的模型就不是这样,它会让一个连贯独自占有一个线程,如果有大量的连贯就会有大量的线程,然而对于每一个线程而言,其数据读写的压力并不是很大,这就会导致大量线程之间频繁切换,而切换会导致 cache 的刷新等副作用 … 因而在同样的硬件配置情景下,nginx 的异步模型要比 apache 好很多。
咱们曾经晓得,异步解决是搞定大并发的基本,接下来的问题是,如何让一个就绪的 socket 和一个业务逻辑连贯对应起来,这个问题在同步模型下并不存在,因为一个线程只解决一个连贯。已经的 event 机制比方 select,poll,它们只能通知你 socket n 就绪了,你不得不本人去通过数据结构来组织 socket n 和该连贯信息之间的关系,典型的如下:
struct conn {
int sd;
void *others;
};
list conns;
一个链表 conns 囊括了该线程负责的所有连贯,如果 select/poll 通知你 socket n 就绪了,你不得不遍历这个 conns 链表,比拟谁的 sd 是 n,而后取出 conn 来解决,尽管能够用更加高效的数据结构,然而查找是必不可少的。然而 epoll 解决了这个问题。
在调用 epoll_ctrl 将一个 socket 退出到 epoll 中时,API 会为你提供一个指针,让你间接绑定一个 socket 描述符和一个指针,一旦 socket 就绪,取出的是一个构造体,其中蕴含了与该 socket 对应的指针,因而你便能够这么做:
conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);
while (1) {nfds = epoll_wait(kdpfd, events, 10000, -1);
for (n = 0; n < nfds; ++n) {conn = events[n].data.ptr;
recv(conn.sd, ....);
....
}
}
conn 会一下子取出来。这是正当的形式。毕竟,内核中曾经通过 socket 查找了,一个 5 元组惟一代表了一个连贯,为何要在用户态程序再找一次呢?因而除了 epoll 不须要遍历所有的被监督 socket 之外,能够保留用户的指针也是其绝对于 select/poll 的一大劣势。nginx 正是用的这种形式。咱们回到 OpenUOM。
应用 TCP 的 OpenUOM
应用 TCP 的 OpenUOM 跟 nginx 简直是截然不同,其外围解决逻辑如下:
/* 退出侦听 socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);
/* 退出 TUN 网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);
while(1) {nfds = epoll_wait(kdpfd, events, 10000, -1);
for (n = 0; n < nfds; ++n) {if (events[n].data.ptr == context) {child_sd = accept(context.sd, remote_addr....);
multi_instance *mi = create_mi(child_sd, remote_addr, ...);
entry.ptr = mi;
entry.type = SOCKET;
new_ev.events = EPOLLIN;
new_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
....
} else if (events[n].data.ptr.type == SOCKET){multi_instance *mi = events[n].data.ptr;
data = read_from_socket(mi);
// 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包
decrypt(mi, data);
write_to_tun(data);
} else {tun *tun = events[n].data.ptr.ptr;
packet = read_from_tun(tun);
lock(mi_hashtable);
multi_instance *mi = lookup_multi_instance_from(packet);
unlock(mi_hashtable);
encrypt(packet);
write_to_socket(packet, mi);
}
}
...
}
以上就是 TCP 模式下的 OpenUOM 全副逻辑,能够看到,如果 socket 可读,那么就能够间接取到 multi_instance,而后程序解决就是了。我记得去年我就把 OpenUOM 改成多线程了,然而当初看来那是个失败的做法。如果应用 TCP,从上述逻辑能够看到,就算应用多线程,在 socket-to-tun 这个门路上也不必加锁,因而 multi_instance 间接通过 epoll_wait 就能够取的到。
须要 C /C++ Linux 服务器架构师学习材料加群 812855908(材料包含 C /C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),收费分享
应用 UDP 的 OpenUOM
然而对于 UDP 而言,OpenUOM 的解决逻辑根下面 TCP 的逻辑就截然不同了。因为全程只有一个 UDP socket,承受所有客户端的连贯,此时基本不存在什么多路复用的问题,充其量也就是那惟一的 UDP socket 和 tun 网卡字符设施二者之间的两路复用,应用 epoll 齐全没有必要。为了定位了具体的 multi_instance,你不得不先去 read 惟一的那个 UDP socket,而后依据 recvfrom 返回参数中的 sockaddr 构造体来结构 4 元组,而后依据这 4 元组在全局的 multi_instance hash 表中去查找具体 multi_instance 实例。其逻辑如下所示:
/* 退出惟一的 UDP socket */
context.sd = udp_sd;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);
/* 退出 TUN 网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);
while(1) {nfds = epoll_wait(kdpfd, events, 10000, -1);
for (n = 0; n < nfds; ++n) { // 实际上 nfds 最多也就是 2
if (events[n].data.ptr == context) {data = recvfrom(context.sd, remote_addr....);
lock(mi_hashtable); // 如果多线程,这个锁将会成为瓶颈,即使是 RW 锁也一样
multi_instance *mi = lookup_mi(child_sd, remote_addr, ...); // 再好的 hash 算法,也不是 0 老本的!unlock(mi_hashtable);
// 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包
decrypt(mi, data);
write_to_tun(data);
....
} else {tun *tun = events[n].data.ptr.ptr;
packet = read_from_tun(tun);
lock(mi_hashtable);
multi_instance *mi = lookup_multi_instance_from(packet);
unlock(mi_hashtable);
encrypt(packet);
write_to_socket(packet, mi);
}
}
...
}
可见,TCP 的 OpenUOM 和 UDP 的 OpenUOM 解决形式齐全不同,UDP 的问题在于,齐全没有充分利用 epoll 的多路复用机制,不得不依据数据包的 recvfrom 返回地址来查找 multi_instance…
让 UDP socket 也 Listen 起来
如果 UDP 也能像 TCP 一样,每一个用户接进来就为之创立一个独自的 socket 为其专门服务该多好,这样在大并发的时候,就能够充沛复用内核 UDP 层的 socket 查找论断加上 epoll 的告诉机制了。实践上这是可行的,因为 UDP 的 4 元组能够惟一辨认一个与之通信的客户端,尽管 UDP 生成无连贯,不牢靠,然而为每一个连贯的客户端创立一个 socket 并没有毁坏 UDP 的语义,只是扭转了 UDP 的编程模型而已,内核协定栈仍然不会去刻意保护一个 UDP 连贯,也不会进行任何的数据确认。
须要阐明的是,这种计划仅仅对“长连贯”的 UDP 有意义,比方 OpenUOM 这类。因为 UDP 是没有连贯的,那么你也就不晓得一个客户端什么时候会永远进行发送数据,因而必然要通过定时器来定时敞开那些在肯定时间段内没有数据的 socket。
为了验证可行性,我先在用户态做试验,也就是说,承受一个客户端的“连贯申请”(其实就是一个数据包) 时,我手工为其创立一个 socket,而后 bind 本地地址,并且 connect 从 recvfrom 返回的对端地址,这样实践上对于后续的数据包,epoll 都应该触发这个新的 socket,毕竟它更准确。事实是不是这样呢?以下的程序能够证实:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>
#define SO_REUSEPORT 15
#define MAXBUF 10240
#define MAXEPOLLSIZE 100
int flag = 0;
int read_data(int sd) {char recvbuf[MAXBUF + 1];
int ret;
struct sockaddr_in client_addr;
socklen_t cli_len=sizeof(client_addr);
bzero(recvbuf, MAXBUF + 1);
ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);
if (ret > 0) {printf("read[%d]: %s from %dn", ret, recvbuf, sd);
} else {printf("read err:%s %dn", strerror(errno), ret);
}
fflush(stdout);
}
int udp_accept(int sd, struct sockaddr_in my_addr) {
int new_sd = -1;
int ret = 0;
int reuse = 1;
char buf[16];
struct sockaddr_in peer_addr;
socklen_t cli_len = sizeof(peer_addr);
ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);
if (ret > 0) { }
if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {perror("child socket");
exit(1);
} else {printf("parent:%d new:%dn", sd, new_sd);
}
ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));
if (ret) {exit(1);
}
ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
if (ret) {exit(1);
}
ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));
if (ret){perror("chid bind");
exit(1);
} else { }
peer_addr.sin_family = PF_INET;
printf("aaa:%sn", inet_ntoa(peer_addr.sin_addr));
if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {perror("chid connect");
exit(1);
} else { }
out:
return new_sd;
}
int main(int argc, char **argv) {
int listener, kdpfd, nfds, n, curfds;
socklen_t len;
struct sockaddr_in my_addr, their_addr;
unsigned int port;
struct epoll_event ev;
struct epoll_event events[MAXEPOLLSIZE];
int opt = 1;;
int ret = 0;
port = 1234;
if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {perror("socket");
exit(1);
} else {printf("socket OKn");
}
ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if (ret) {exit(1);
}
ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
if (ret) {exit(1);
}
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {perror("bind");
exit(1);
} else {printf("IP bind OKn");
}
kdpfd = epoll_create(MAXEPOLLSIZE);
ev.events = EPOLLIN|EPOLLET;
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {fprintf(stderr, "epoll set insertion error: fd=%dn", listener);
return -1;
} else {printf("ep add OKn");
}
while (1) {nfds = epoll_wait(kdpfd, events, 10000, -1);
if (nfds == -1) {perror("epoll_wait");
break;
}
for (n = 0; n < nfds; ++n) {if (events[n].data.fd == listener) {printf("listener:%dn", n);
int new_sd;
struct epoll_event child_ev;
new_sd = udp_accept(listener, my_addr);
child_ev.events = EPOLLIN;
child_ev.data.fd = new_sd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);
return -1;
}
} else {read_data(events[n].data.fd);
}
}
}
close(listener);
return 0;
}
须要阐明的是,REUSEPORT 是必要的,因为在 connect 之前,你必须为新建的 socket bind 跟 listener 一样的 IP 地址和端口,因而就须要这个 socket 选项。
此时,如果你用多个 udp 客户端去给这个服务端发数据,会发现齐全实现了想要的成果。
内核中的 UDP Listener
尽管在用户态能够实现成果,然而编程模型并不太好用,为了创立一个 socket,你不得不先去 recvfrom 一下数据,好失去对端的地址,尽管应用 PEEK 标记能够让创立好 child socket 后再读一次,然而认真想想,最彻底的计划还是间接扩大内核,我基于 3.9.6 内核,对__udp4_lib_rcv 这个 UDP 协定栈接管函数作了以下的批改:
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
int proto)
{
......................
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);
if (sk != NULL) {
int ret;
#if 1
// 这个 UDP_LISTEN,通过 setsockopt 来设置
if (sk->sk_state == UDP_LISTEN) {
// 如果是 UDP 的 listener,创立一个数据 socket
struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);
if (newsk) {
struct inet_sock *newinet;
// 为这个数据传输 socket 依据 skb 来填充 4 元组信息
newinet = inet_sk(newsk);
newinet->inet_daddr = ip_hdr(skb)->saddr;
newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;
newinet->inet_saddr = ip_hdr(skb)->daddr;
rcu_assign_pointer(newinet->inet_opt, NULL);
newinet->mc_index = inet_iif(skb);
newinet->mc_ttl = ip_hdr(skb)->ttl;
newinet->rcv_tos = ip_hdr(skb)->tos;
newinet->inet_id = 0xffffffff ^ jiffies;
inet_sk_rx_dst_set(newsk, skb);
// sock 构造体新增 csk 变量,相似 TCP 的 accept queue,然而为了简略,目前每个 Listen socket 只能持有一个 csk,即 child sock。sk->csk = newsk;
// 将新的数据传输 socket 排入全局的 UDP socket hash 表
if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {printk("[UDP listen] get port errorn");
release_sock(newsk);
err = -2;
goto out_go;
}
ret = udp_queue_rcv_skb(newsk, skb);
// 唤醒 epoll,让 epoll 返回 UDP Listener
sk->sk_data_ready(sk, 0);
sock_put(newsk);
} else {printk("[UDP listen] create new errorn");
sock_put(sk);
return -1;
}
out_go:
sock_put(sk);
if (ret > 0)
return -ret;
return 0;
}
#endif
ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);
......................
}
我只是测试,因而并没有扩大 UDP 的 accept 办法,只是简略的用 getsocketopt 来取得这个新的 socket 描述符并为 task 装置该文件描述符,setsockopt 能够设置一个 UDP socket 为 listener。这样用户态的编程模型就很简略了。
应用新的 Listen UDP 来革新 OpenUOM
有必要重构一下 OpenUOM 了,现如今它的逻辑变成了:
listen = 1;
listener = socket(PF_INET, SOCK_DGRAM, 0);
setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen));
/* 退出侦听 socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);
/* 退出 TUN 网卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);
while(1) {nfds = epoll_wait(kdpfd, events, 10000, -1);
for (n = 0; n < nfds; ++n) {if (events[n].data.ptr == context) {getsockopt(context.sd, SOL_SOCKET, &newsock_info....);
child_sd = newsock_info.sd;
multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);
entry.ptr = mi;
entry.type = SOCKET;
new_ev.events = EPOLLIN;
new_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
// 这是 UDP,内核除了告诉 Listener 之外,还会将数据排入 child_sd,因而须要去读取,能够参考 TCP 的 Fastopen 逻辑
data = recvfrom(child_sd, ....);
....
} else if (events[n].data.ptr.type == SOCKET){multi_instance *mi = events[n].data.ptr;
data = read_from_socket(mi);
// 这里简化了解决,因为并不是每一个数据包都是须要加密解密的,还有管制通道的包
decrypt(mi, data);
write_to_tun(data);
} else {tun *tun = events[n].data.ptr.ptr;
packet = read_from_tun(tun);
lock(mi_hashtable);
multi_instance *mi = lookup_multi_instance_from(packet);
unlock(mi_hashtable);
encrypt(packet);
write_to_socket(packet, mi);
}
}
...
}
除了把 accept 改成了 getsockopt 之外,别的简直和 TCP 的 OpenUOM 完全一致了。
如此一来,2014 年革新的 OpenUOM 多线程版本就完满了,用户态基本不须要再应用 recvfrom 返回的 address 信息来定位 multi_instance 了,一个 multi_instance 惟一和一个 socket 绑定,而每一个 socket 都由 epoll 来治理,大大降低了用户态查找 multi_instance 的开销,同时也防止了锁定。