前两天刚写了一篇文章,怎样在C++中使用协程 传送门。那个只是一个简单的例子,今天我们来聊聊如何将协程与 io_uring 结合起来使用,实现真正的异步 I/O 操作。 我个人觉得,C++协程最好的使用场景就是配合异步 I/O。
以前也写过一篇关于 io_uring 的文章 传送门,里面介绍了 io_uring 的基本概念和使用方法。今天我们就基于那个例子,来实现一个简单的 echo 服务器,使用协程来处理客户端的连接和数据收发。
简单画一个图,每一个 Awaitable
都是一个协程,在一个进程中,每一个连接都有两个协程(一个用于读,一个用于写)。还有一个特殊的协程 AwaitableAccept
用来处理接受连接的操作。
同一时刻,最多只能有一个协程在执行。其他的协程会被挂起,等待当前协程完成后再恢复执行。
因为协程可以让我们以同步的方式编写异步代码,避免传统的“回调函数地狱”,从而提高代码的可读性和可维护性。而 io_uring 则是 Linux 提供的一种高性能异步 I/O 接口,可以与协程结合使用,进一步提升性能。
因为代码量有点大,就不全贴在这里了,已经上传到GitHub上,可以去这里查看 传送门。
不废话了,直接上代码
coroutine handle
1#ifndef TASK_H
2#define TASK_H
3
4#include <coroutine>
5#include <exception>
6#include <functional>
7
8template <bool initialSuspend> struct Task {
9 struct promise_type {
10 std::coroutine_handle<> handle;
11 std::function<void()> onDone; // 协程结束时调用的清理回调
12
13 auto get_return_object() { return Task{*this}; }
14
15 auto initial_suspend() {
16 if constexpr (initialSuspend) {
17 return std::suspend_always{};
18 } else {
19 return std::suspend_never{};
20 }
21 }
22
23 auto final_suspend() noexcept {
24 struct Awaiter {
25 bool await_ready() noexcept { return false; }
26
27 void await_suspend(std::coroutine_handle<promise_type> h) noexcept {
28 if (h.promise().onDone) {
29 h.promise().onDone();
30 }
31 }
32
33 void await_resume() noexcept {}
34 };
35 return Awaiter{};
36 }
37
38 void unhandled_exception() { std::terminate(); }
39
40 void return_void() {}
41
42 promise_type() = default;
43 ~promise_type() = default;
44 };
45
46 explicit Task(promise_type &promise)
47 : handle_(std::coroutine_handle<promise_type>::from_promise(promise)) {}
48
49 Task(Task &&other) noexcept : handle_(other.handle_) {
50 other.handle_ = nullptr;
51 }
52
53 Task &operator=(Task &&other) noexcept {
54 if (this != &other) {
55 if (handle_ && !handle_.done())
56 handle_.destroy();
57 handle_ = other.handle_;
58 other.handle_ = nullptr;
59 }
60 return *this;
61 }
62
63 Task(const Task &) = delete;
64 Task &operator=(const Task &) = delete;
65
66 ~Task() {
67 if (handle_ && !handle_.done())
68 handle_.destroy();
69 }
70
71 void resume() { handle_.resume(); }
72
73 void setOnDone(std::function<void()> onDone) {
74 handle_.promise().onDone = std::move(onDone);
75 }
76
77private:
78 std::coroutine_handle<promise_type> handle_;
79};
80
81#endif // TASK_H
简单说一下几个关键点:
使用模板变量,来控制协程的初始挂起状态。
在协程结束时,可以通过
onDone
回调来执行清理操作。 在协程结束时,可以调用这个函数,完成一些自定义操作,后面会用到。协程结束时会自动调用
final_suspend
,可以在这里进行一些收尾工作。 final_suspend中定义了一个Awaiter
,用于在协程结束时执行清理操作。当协程结束时,Awaiter
会被唤醒,从而调用onDone
回调。当onDone
执行完成后,协程的资源会被释放。
io_uring wrapper
1class IoUring {
2public:
3 explicit IoUring(int port) : port_(port) {}
4
5 IoUring(const IoUring &) = delete;
6 IoUring &operator=(const IoUring &) = delete;
7 IoUring(IoUring &&) = delete;
8 IoUring &operator=(IoUring &&) = delete;
9
10 ~IoUring() = default;
11
12 void Stop() { running_.store(false); }
13
14 io_uring &Uring() { return ring_; }
15
16 std::expected<bool, std::string> Init();//初始化io_uring和网络
17 void run();//运行io_uring事件循环
18 Task<true> acceptServer();//接受客户端连接
19
20 Task<false> startSession(int fd, uint64_t connId);//处理客户端会话
21
22private:
23 std::expected<bool, std::string> createListenSocket();//创建监听socket
24
25 static int set_nonblocking(int fd) {//设置socket为非阻塞IO
26 return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
27 }
28
29 uint64_t getConnId() { return ++connId; }
30
31 io_uring ring_{};
32 uint16_t port_;
33 int listenFd_ = -1;
34 const int entries_ = 256; // Default number of entries
35 std::atomic<bool> running_ = true;
36
37 uint64_t connId = 0; // Connection ID for tracking connections
38
39 std::unordered_map<uint64_t, Task<false>> sessions_;
40};
以上就是对 io_uring 的一个简单封装,下面挑几个关键的函数来聊一下
IoUring::run
1void IoUring::run() {
2 while (running_.load()) {
3 io_uring_cqe *cqe = nullptr;
4 int ret = io_uring_wait_cqe(&ring_, &cqe);
5 if (ret < 0) {
6 if (ret == -EINTR)
7 continue;
8 break;
9 }
10 // user_data 保存着 Op 指针
11 auto *op = reinterpret_cast<AwaitableBaseOp *>(cqe->user_data);
12 if (!op) {
13 io_uring_cqe_seen(&ring_, cqe);
14 continue;
15 }
16 op->SetRes(cqe->res);//设置操作结果
17 io_uring_cqe_seen(&ring_, cqe);
18
19 op->resume();//恢复协程执行
20 }
21}
循环遍历 io_uring
的完成队列,处理每个完成的操作。
AwaitableBaseOp
是所有可等待操作的基类,负责管理协程的状态和生命周期,后面会用到。
op->SetRes(cqe->res);
把io_uring的结果传递给操作对象。
在 op->resume()
被调用时,协程会继续执行,直至下一个挂起点。
IoUring::startSession
1Task<false> IoUring::startSession(int fd, uint64_t connId) {
2 std::string buffer;
3 buffer.resize(1024);
4 while (true) {
5 auto res = co_await AwaitableRead(this, fd, buffer);
6 if (res <= 0) {
7 break;
8 }
9 std::cout << "Received data: ";
10 std::cout.write(buffer.data(), res);
11 std::cout << std::endl;
12 res = co_await AwaitableWrite(this, fd, std::move(std::string(buffer)));
13 if (res <= 0) {
14 break;
15 }
16 }
17 close(fd);
18 co_return;
19}
启动一个客户端会话,处理数据的读取和写入。
使用 co_await
关键字来等待异步操作的完成。
co_await AwaitableRead
这里会把当前协程挂起,然后当读取操作完成时,协程会被唤醒,并且可以获取到读取的结果。
co_await AwaitableWrite
这里同样会把当前协程挂起,等待写入操作完成。
IoUring::acceptServer
1Task<true> IoUring::acceptServer() {
2 while (true) {
3 auto clientFd = co_await AwaitableAccept(this, listenFd_);
4 std::cout << clientFd << std::endl;
5 if (clientFd < 0)
6 break;
7
8 set_nonblocking(clientFd);
9 auto connId = getConnId();
10 auto t = startSession(clientFd, connId);//启动一个客户端会话,处理数据的读取和写入。
11 t.setOnDone([connId, this]() { sessions_.erase(connId); });
12 sessions_.emplace(connId, std::move(t));
13 }
14 close(listenFd_);
15 co_return;
16}
开始一个服务器端的会话,接受客户端连接。
co_await AwaitableAccept
这里会把当前协程挂起,当有客户端连接时,协程会被唤醒,并且可以获取到接受的结果。
auto t = startSession(clientFd, connId);
这里会启动一个协程,处理数据的读取和写入。
这里有个关键点,startSession
会返回一个协程的句柄,一定要妥善保管这个句柄,如果没有保存,当 while
循环结束时,协程会被销毁,导致异常。
所以要把协程的句柄保存在 sessions_
这个 map 中,以便后续管理。
t.setOnDone([connId, this]() { sessions_.erase(connId); });
在这里,通过设置回调函数,设置了协程完成后的清理工作。
AwaitableBaseOp
1class AwaitableBaseOp {
2public:
3 explicit AwaitableBaseOp(std::coroutine_handle<> h) : coro_(h) {}
4
5 virtual ~AwaitableBaseOp() = default;
6
7 void resume() {
8 if (coro_ && !coro_.done()) {
9 coro_.resume();
10 }
11 }
12
13 void SetRes(int res) { res_ = res; }
14
15 int GetRes() const { return res_; }
16
17protected:
18 std::coroutine_handle<> coro_;
19 int res_ = 0;
20};
定义了协程的基本操作,包括恢复协程和设置结果。这个类的实例的指针会被传递给 io_uring_sqe
的 user_data 中。
当 io_uring
的异步操作完成时,io_uring
通过得到 io_uring_cqe
AwaitableBaseOp
的指针,调用 SetRes
来设置结果,然后调用 resume
来恢复协程。
Awaitable
需要到 accept read write 的操作都可以封装成一个 Awaitable 类,方便管理协程的状态和生命周期。
accept
1class AwaitableAccept {
2 IoUring *uring_ = nullptr;
3 sockaddr_storage addr_{};
4 socklen_t addrlen_{};
5 int serverFd_ = 0;
6
7 AwaitableBaseOp *op = nullptr;
8
9public:
10 //省略部分代码
11
12 ~AwaitableAccept() = default;
13
14 bool await_ready() const noexcept { return false; }
15
16 void await_suspend(std::coroutine_handle<> h) {
17 op = new AwaitableBaseOp(h);
18
19 io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
20 io_uring_prep_accept(sqe, serverFd_, reinterpret_cast<sockaddr *>(&addr_),
21 &addrlen_, 0);
22 io_uring_sqe_set_data(sqe, op);
23 io_uring_submit(&uring_->Uring());
24 }
25
26 int await_resume() const noexcept {
27 int res = op->GetRes();
28 delete op;
29 return res;
30 }
31};
这是 AwaitableAccept
类的实现,封装了对 io_uring
的 accept 操作。通过使用协程,可以方便地管理异步 I/O 操作的状态和生命周期。
await_suspend
函数会在协程挂起时(co_await 操作)被调用,负责将协程的句柄与 io_uring
的请求关联起来。
在函数内部,创建一个 AwaitableBaseOp
对象,并将协程的句柄传递给它。然后,准备一个 io_uring
的提交请求,并将 AwaitableBaseOp
对象的指针设置为请求的用户数据。最后,提交请求到 io_uring
。
await_resume
函数会在协程恢复时(调用 resume
)被调用,负责获取异步操作的结果并清理资源。
这里 return
最后返回结果。这里 return
的值会被 co_await
表达式的调用者获取到。
read
1class AwaitableRead {
2 IoUring *uring_ = nullptr;
3 int fd_ = 0;
4 std::string &buffer_;
5
6 AwaitableBaseOp *op_ = nullptr;
7
8public:
9 //省略部分代码
10 void await_suspend(std::coroutine_handle<> h) {
11 op_ = new AwaitableBaseOp(h);
12
13 io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
14 io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0);
15 io_uring_sqe_set_data(sqe, op_);
16 io_uring_submit(&uring_->Uring());
17 }
18 //省略部分代码
19};
AwaitableRead
类的实现,封装了对 io_uring
的读取操作。
中间省略了一些代码细节,但整体思路就是通过 AwaitableRead
类来简化异步读取操作的实现。
await_suspend
函数内部,将协程的句柄与 io_uring
的请求关联起来。然后,准备一个 io_uring
的提交请求,并将 AwaitableBaseOp
对象的指针设置为请求的用户数据。最后,提交请求到 io_uring
。
await_resume
函数会在协程恢复时被调用,负责获取异步操作的结果并清理资源。这里 return
最后返回结果。这里 return
的值会被 co_await
表达式的调用者获取到。
write
1class AwaitableWrite {
2 IoUring *uring_ = nullptr;
3 int fd_ = 0;
4 std::string buffer_;
5 AwaitableBaseOp *op_ = nullptr;
6
7public:
8 //省略部分代码
9
10 void await_suspend(std::coroutine_handle<> h) {
11 op_ = new AwaitableBaseOp(h);
12
13 io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
14 io_uring_prep_write(sqe, fd_, buffer_.data(), buffer_.size(), 0);
15 io_uring_sqe_set_data(sqe, op_);
16 io_uring_submit(&uring_->Uring());
17 }
18 //省略部分代码
19};
AwaitableWrite
类的实现,封装了对 io_uring
的写入操作。基本和 AwaitableRead
类类似,通过协程的方式简化了异步写入的流程。await_suspend
函数负责将协程的句柄与 io_uring
的请求关联起来,并提交写入请求。await_resume
函数则负责获取写入操作的结果并清理资源。
run
1int main() {
2 IoUring ioUring(8088);
3 if (auto ret = ioUring.Init(); !ret) {
4 std::cout << "uring init fail" << ret.error() << std::endl;
5 return -1;
6 }
7 auto t = ioUring.acceptServer();
8 std::cout << "Server started on port 8088." << std::endl;
9 t.resume();
10 ioUring.run();
11 return 0;
12}
运行这个程序
- 创建一个
IoUring
对象,监听8088端口。 - 调用
IoUring::Init
初始化io_uring
。 - 调用
IoUring::acceptServer
开始接受连接。 - 调用
IoUring::run
进入事件循环。
到这,整个协程与 io_uring
的结合就完成了。通过这种方式,我们可以在 C++ 中优雅地处理异步 I/O 操作,充分利用协程的优势,提高代码的可读性和可维护性。
可能会有人问,怎么没看到有关线程 std::thread
的代码?因为这个例子是单线程内使用协程的,所有的操作都是在同一个线程中完成的。所谓的异步是通过协程的挂起和恢复来实现的,而不是通过多线程并发执行。通过异步 I/O 读写时,不会阻塞当前线程,当读写完成后,通知当前线程,来实现的异步。
那这样设计的好处是什么呢?首先,它避免了多线程编程中的许多复杂性,比如线程安全、锁竞争等问题。其次,协程的上下文切换比线程轻量得多,性能开销更小。最后,协程可以让异步代码看起来像同步代码,极大地提高了可读性和可维护性。
如果要使用多线程来充分利用多核 CPU 的性能,可以在每个线程中创建一个 IoUring
对象,并使用协程来处理每个线程中的异步 I/O 操作。这样可以在保持代码简洁的同时,充分利用多核 CPU 的优势。