什么是惊群
简单说,惊群是因为多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),当事件发生时,就会唤醒所有等待的(休眠的)进程(线程)。但是事件只能被一个进程或线程处理,而其他进程(线程)获取失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。维基百科-惊群
产生惊群的条件
- 多个进程或者多个线程
- 同时等待处理一个事件
具体场景复现
测试环境 :
- debian11
- 内核5.10.0-8
- 编译器 clang11
多线程和多进程在惊群问题上差不多,为了少些一点,下文中没有特殊说明,多线程 也包含了多进程
在linux中,使用C/C++ 编写 tcp server时,会依次调用 socket()
bind()
listen()
accept()
这几个函数,这几个函数会打开socket,绑定ip和端口,开始监听端口,accept函数会阻塞当前进程,等待客户端连接。
如果在单线程中,只有一个accept函数在等待客户端连接,当客户端来连接的时候,只会有一个accept函数来处理,所以也不会存在惊群问题了。
在多线模型中,多个线程分别accept同一个socket,当有客户端连接时,内核会通知所有的线程来处理这个请求,但是呢,请求只能被一个线程处理,其他的线程的不到这个事件,只能白白被唤醒。
这是最简单的一种惊群,这种情况在linux2.6以后就不会产生了。因为在Linux 2.6 版本之后,通过引入一个标记位 WQ_FLAG_EXCLUSIVE,解决掉了 Accept 惊群效应。我原本还想在centos3.9(内核版本是2.5)中去复现这种情况,但是折腾了好久,也没能在centos上编译也运行C++程序,遂放弃。 不废话了,上代码,测试第一种情况
1#include <netinet/in.h>
2#include <iostream>
3#include <sys/epoll.h>
4#include <iostream>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8
9#define WORKER_THREAD 4
10//创建socket,并返回fd
11int createSocket() {
12 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
13 if (fd < 0) {
14 std::cout << "create socket error" << std::endl;
15 return 0;
16 }
17
18 sockaddr_in sockAddr{};
19 sockAddr.sin_port = htons(PORT);
20 sockAddr.sin_family = AF_INET;
21 sockAddr.sin_addr.s_addr = htons(INADDR_ANY);
22
23 if (bind(fd, (sockaddr *) &sockAddr, sizeof(sockAddr)) < 0) {
24 std::cout << "bind socket error, port:" << PORT << std::endl;
25 return 0;
26 }
27
28 if (listen(fd, 100) < 0) {
29 std::cout << "listen port error" << std::endl;
30 return 0;
31 }
32 return fd;
33}
34
35void Worker1(int socketFd, int k) {
36 std::cout << " Worker" << k << " run " << std::endl;
37 while (true) {
38 int tfd = 0;
39 sockaddr_in cli_addr{};
40 socklen_t length = sizeof(cli_addr);
41 tfd = accept(socketFd, (sockaddr *) &cli_addr, &length);
42 std::cout << "worker" << k << " in " << std::endl;
43 if (tfd <= 0) {
44 std::cout << "accept error" << std::endl;
45 return;
46 } else {
47 std::cout << "worker" << k << " accept " << std::endl;
48 }
49 }
50}
51
52int main() {
53 std::mutex mutex;
54 std::unique_lock<std::mutex> lck(mutex);
55 std::condition_variable cv;
56
57 int fd = createSocket();
58 //第一种,多个线程不使用多路复用,accept同一个socket
59 for (int i = 0; i < WORKER_THREAD; ++i) {
60 std::thread th(&Worker1, fd, i + 1);
61 th.detach();
62 }
63
64 cv.wait(lck);
65 return 0;
66}
这代码可以用C写,但是习惯用C++了,就用C++写吧。代码也比较简单,createSocket()
创建了一个socket,然后4个线程分别去accept这个socket。
下面是运行结果:
可以看到,4个线程都在运行,并且accept,但是当连接来的时候,只有个线程能得到事件。
既然linux内核已经帮我们处理了惊群,那我们还考虑这些干啥,直接用不就完了。
但是,我们在写代码的时候一般不会直接阻塞accept的,都是使用多路复用来帮我们处理连接阻塞的是多路复用函数。目前综合性能比较好的IO多路复用是epoll。当在多线程中使用epoll时,惊群问题就会出现了。 先代码和结果,然后再解释
1#include <netinet/in.h>
2#include <iostream>
3#include <sys/epoll.h>
4#include <iostream>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8
9#define WORKER_THREAD 4
10//创建socket,并返回fd
11int createSocket() {
12 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
13 if (fd < 0) {
14 std::cout << "create socket error" << std::endl;
15 return 0;
16 }
17
18 sockaddr_in sockAddr{};
19 sockAddr.sin_port = htons(PORT);
20 sockAddr.sin_family = AF_INET;
21 sockAddr.sin_addr.s_addr = htons(INADDR_ANY);
22
23 if (bind(fd, (sockaddr *) &sockAddr, sizeof(sockAddr)) < 0) {
24 std::cout << "bind socket error, port:" << PORT << std::endl;
25 return 0;
26 }
27
28 if (listen(fd, 100) < 0) {
29 std::cout << "listen port error" << std::endl;
30 return 0;
31 }
32 return fd;
33}
34
35void Worker2(int socketFd, int k) {
36 std::cout << " Worker" << k << " run " << std::endl;
37
38 int eFd = epoll_create(1);
39 if (eFd < 0) {
40 std::cout << "create epoll fail";
41 return;
42 }
43 epoll_event epev_{};
44 epev_.events = EPOLLIN;
45 epev_.data.fd = socketFd;
46 epoll_ctl(eFd, EPOLL_CTL_ADD, socketFd, &epev_);
47 epoll_event events[EVENT_NUM];
48
49 while (true) {
50 int eNum = epoll_wait(eFd, events, EVENT_NUM, -1);
51 if (eNum == -1) {
52 std::cout << "epoll error";
53 return;
54 }
55 //一定要加上这句,防止事件被瞬间处理,导致看不到结果
56 std::this_thread::sleep_for((std::chrono::seconds (1)));
57 std::cout << "worker" << k << " in " << std::endl;
58 for (int i = 0; i < eNum; ++i) {
59 if (events[i].data.fd == socketFd) {
60 int tfd = 0;
61 sockaddr_in cli_addr{};
62 socklen_t length = sizeof(cli_addr);
63 tfd = accept(socketFd, (sockaddr *) &cli_addr, &length);
64 if (tfd <= 0) {
65 std::cout << "accept error" << std::endl;
66 } else {
67 std::cout << "worker" << k << " accept " << std::endl;
68 }
69 } else {
70 //处理正常的socket读写事件,这里可以忽略,不是这次关注的点
71 }
72 }
73 }
74}
75
76int main() {
77 std::mutex mutex;
78 std::unique_lock<std::mutex> lck(mutex);
79 std::condition_variable cv;
80 int fd = createSocket();
81 //第二种,多个线程使用epoll多路复用,accept同一个socket
82 for (int i = 0; i < WORKER_THREAD; ++i) {
83 std::thread th(&Worker2, fd, i + 1);
84 th.detach();
85 }
86}
结果
这里可以看到,当有客户端来连接的时候,4个线程都被唤醒了,但是只有workr2 线程成功获取了事件,其余的3个线程都白白唤醒浪费了性能为啥内核已经处理了第一种情景下的惊群问题,第二种情景下的惊群问题为啥就不处理了呢?
我的猜想不一定正确,如果有错误,请指出:
accept 只能是被一个进程调用成功(连接事件只会处理一次嘛),所以内核就直接处理了(一个accept只会唤醒一个进程)。但 epoll 不一样,epoll中管理了很多连接,不止socket这一个,除了可能后续被 accept 调用外,还有可能是其他网络 IO 事件的,而其他 IO 事件是否只能由一个进程处理,是不一定的,这是一个由用户决定的事情,例如可能一个文件会由多个进程来读写。所以,对 epoll 默认对于多进程监听同一文件不会设置互斥,所以就导致了epoll惊群问题。
在linux4.5内核之后给epoll添加了一个 EPOLLEXCLUSIVE
的标志位,如果设置了这个标志位,那epoll将进程挂到等待队列时将会设置一下互斥标志位,这时实现跟内核原生accept一样的特性,只会唤醒队列中的一个进程。参考资料 感谢这位大神的文章
修改一下worker2函数:
1void Worker2(int socketFd, int k) {
2 std::cout << " Worker" << k << " run " << std::endl;
3
4 int eFd = epoll_create(1);
5 if (eFd < 0) {
6 std::cout << "create epoll fail";
7 return;
8 }
9 epoll_event epev_{};
10 //给epoll加上 互斥标志
11 epev_.events = EPOLLIN | EPOLLEXCLUSIVE;
12 epev_.data.fd = socketFd;
13 epoll_ctl(eFd, EPOLL_CTL_ADD, socketFd, &epev_);
14 epoll_event events[EVENT_NUM];
15
16 while (true) {
17 int eNum = epoll_wait(eFd, events, EVENT_NUM, -1);
18 if (eNum == -1) {
19 std::cout << "epoll error";
20 return;
21 }
22 //一定要加上这句,防止事件被瞬间处理,导致看不到结果
23 std::this_thread::sleep_for((std::chrono::seconds(1)));
24 std::cout << "worker" << k << " in " << std::endl;
25 for (int i = 0; i < eNum; ++i) {
26 if (events[i].data.fd == socketFd) {
27 int tfd = 0;
28 sockaddr_in cli_addr{};
29 socklen_t length = sizeof(cli_addr);
30 tfd = accept(socketFd, (sockaddr *) &cli_addr, &length);
31 if (tfd <= 0) {
32 std::cout << "accept error" << std::endl;
33 } else {
34 std::cout << "worker" << k << " accept " << std::endl;
35 }
36 } else {
37 //处理正常的socket读写事件,这里可以忽略,不是这次关注的点
38 }
39 }
40 }
41}
现在来测试一下
现在的epoll已经不会有惊群问题了另一种方式
其实解决多线程使用epoll等多路复用导致的惊群问题,还有一个更彻底解决方法,让每个线程分别打开一个socket,并且这些socket绑定在同一个端口,然后accept这个socket。这就像第一种情景那样,内核直接帮我们做了惊群处理。这里会使用到 linux 3.9后 socket提供SO_REUSEPORT
标志。使用这个标志后,会允许多个socket绑定和监听同一个端口。
代码如下
1#include <netinet/in.h>
2#include <iostream>
3#include <sys/epoll.h>
4#include <iostream>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8
9#define WORKER_THREAD 4
10//创建socket,并返回fd
11int createSocket2() {
12 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
13 if (fd == -1) {
14 std::cout << "create socket error" << std::endl;
15 return 0;
16 }
17
18 int on = 1;
19 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (const void *) &on, sizeof(on)) < 0) {
20 std::cout << "set opt error, ret:" << std::endl;
21 }
22
23 sockaddr_in sockAddr{};
24 sockAddr.sin_port = htons(PORT);
25 sockAddr.sin_family = AF_INET;
26 sockAddr.sin_addr.s_addr = htons(INADDR_ANY);
27
28 if (bind(fd, (sockaddr *) &sockAddr, sizeof(sockAddr)) < 0) {
29 std::cout << "bind socket error, port:" << PORT << std::endl;
30 return 0;
31 }
32
33 if (listen(fd, 100) < 0) {
34 std::cout << "listen port error" << std::endl;
35 return 0;
36 }
37 return fd;
38}
39
40void Worker3(int k) {
41 std::cout << " Worker" << k << " run " << std::endl;
42
43 int socketFd = createSocket2();
44 int eFd = epoll_create(1);
45 if (eFd == -1) {
46 std::cout << "create epoll fail" << std::endl;
47 return;
48 }
49
50 epoll_event epev_{};
51 epev_.events = EPOLLIN;
52 epev_.data.fd = socketFd;
53 epoll_ctl(eFd, EPOLL_CTL_ADD, socketFd, &epev_);
54 epoll_event events[EVENT_NUM];
55
56 while (true) {
57 int eNum = epoll_wait(eFd, events, EVENT_NUM, -1);
58 if (eNum == -1) {
59 std::cout << "epoll error" << std::endl;
60 return;
61 }
62 std::this_thread::sleep_for((std::chrono::seconds(1)));
63 std::cout << "worker" << k << " in " << std::endl;
64 for (int i = 0; i < eNum; ++i) {
65 if (events[i].data.fd == socketFd) {
66 int tfd = 0;
67 sockaddr_in cli_addr{};
68 socklen_t length = sizeof(cli_addr);
69 tfd = accept(socketFd, (sockaddr *) &cli_addr, &length);
70 if (tfd <= 0) {
71 std::cout << "accept error" << std::endl;
72 } else {
73 std::cout << "worker" << k << " accept " << std::endl;
74 }
75 } else {
76 //处理正常的socket读写事件
77 }
78 }
79 }
80}
81
82int main() {
83 std::mutex mutex;
84 std::unique_lock<std::mutex> lck(mutex);
85 std::condition_variable cv;
86
87 //第三种,多个线程使用epoll多路复用,每个线程分别bind,listen 同一个端口, accept各自的socket
88 for (int i = 0; i < WORKER_THREAD; ++i) {
89 std::thread th(&Worker3, i + 1);
90 th.detach();
91 }
92
93 cv.wait(lck);
94 return 0;
95}
结果
也是没有问题的,多个连接来的时候,只会有一个线程被唤醒,相当于在内核级别中实现了一个负载均衡
总结
简单总结一下,当多个线程或者进程同时阻塞同一个事件的时候,会出现惊群现象,如果不适用epoll等多路复用技术,在linux2.6 以后内核已经帮我们处理了惊群问题。
如果使用了epoll,就需要额外处理epoll导致的惊群问题,有两种方式
- linux4.5内核之后,epoll有一个
EPOLLEXCLUSIVE
特性,可以防止epoll惊群出现 - linux 3.9内核之后给 socket 提供
SO_REUSEPORT
特性,可以允许多个socket绑定在同一个端口上,相当于每个线程都有一个socket,在处理accept时,内核会自动处理惊群问题
1和2两种方式都能有效解决惊群问题,但是目前使用 socket的 SO_REUSEPORT
是最好的方式.。
我通过查资料得到 EPOLLEXCLUSIVE
标识会保证一个事件发生时候只有一个线程会被唤醒,来避免多惊群问题。不过任一时候只能有一个Worker调用 accept,限制了真正并行的吞吐量。
这个有待验证,等我有时间再去深入了解一下。