Featured image of post linux中io_uring和C++协程的结合,实现真正的异步I/O。简单的TCP echo server

C++协程+io_uring实现异步IO

介绍了如何在C++中结合使用协程和io_uring来实现异步I/O操作。通过一个简单的TCP echo服务器示例,展示了协程如何简化异步编程,提高代码的可读性和维护性。

前两天刚写了一篇文章,怎样在C++中使用协程 传送门。那个只是一个简单的例子,今天我们来聊聊如何将协程与 io_uring 结合起来使用,实现真正的异步 I/O 操作。 我个人觉得,C++协程最好的使用场景就是配合异步 I/O。

以前也写过一篇关于 io_uring 的文章 传送门,里面介绍了 io_uring 的基本概念和使用方法。今天我们就基于那个例子,来实现一个简单的 echo 服务器,使用协程来处理客户端的连接和数据收发。

简单画一个图,每一个 Awaitable 都是一个协程,在一个进程中,每一个连接都有两个协程(一个用于读,一个用于写)。还有一个特殊的协程 AwaitableAccept 用来处理接受连接的操作。 同一时刻,最多只能有一个协程在执行。其他的协程会被挂起,等待当前协程完成后再恢复执行。

因为协程可以让我们以同步的方式编写异步代码,避免传统的“回调函数地狱”,从而提高代码的可读性和可维护性。而 io_uring 则是 Linux 提供的一种高性能异步 I/O 接口,可以与协程结合使用,进一步提升性能。

因为代码量有点大,就不全贴在这里了,已经上传到GitHub上,可以去这里查看 传送门

不废话了,直接上代码

coroutine handle

 1#ifndef TASK_H
 2#define TASK_H
 3
 4#include <coroutine>
 5#include <exception>
 6#include <functional>
 7
 8template <bool initialSuspend> struct Task {
 9  struct promise_type {
10    std::coroutine_handle<> handle;
11    std::function<void()> onDone; // 协程结束时调用的清理回调
12
13    auto get_return_object() { return Task{*this}; }
14
15    auto initial_suspend() {
16      if constexpr (initialSuspend) {
17        return std::suspend_always{};
18      } else {
19        return std::suspend_never{};
20      }
21    }
22
23    auto final_suspend() noexcept {
24      struct Awaiter {
25        bool await_ready() noexcept { return false; }
26
27        void await_suspend(std::coroutine_handle<promise_type> h) noexcept {
28          if (h.promise().onDone) {
29            h.promise().onDone();
30          }
31        }
32
33        void await_resume() noexcept {}
34      };
35      return Awaiter{};
36    }
37
38    void unhandled_exception() { std::terminate(); }
39
40    void return_void() {}
41
42    promise_type() = default;
43    ~promise_type() = default;
44  };
45
46  explicit Task(promise_type &promise)
47      : handle_(std::coroutine_handle<promise_type>::from_promise(promise)) {}
48
49  Task(Task &&other) noexcept : handle_(other.handle_) {
50    other.handle_ = nullptr;
51  }
52
53  Task &operator=(Task &&other) noexcept {
54    if (this != &other) {
55      if (handle_ && !handle_.done())
56        handle_.destroy();
57      handle_ = other.handle_;
58      other.handle_ = nullptr;
59    }
60    return *this;
61  }
62
63  Task(const Task &) = delete;
64  Task &operator=(const Task &) = delete;
65
66  ~Task() {
67    if (handle_ && !handle_.done())
68      handle_.destroy();
69  }
70
71  void resume() { handle_.resume(); }
72
73  void setOnDone(std::function<void()> onDone) {
74    handle_.promise().onDone = std::move(onDone);
75  }
76
77private:
78  std::coroutine_handle<promise_type> handle_;
79};
80
81#endif // TASK_H

简单说一下几个关键点:

  1. 使用模板变量,来控制协程的初始挂起状态。

  2. 在协程结束时,可以通过 onDone 回调来执行清理操作。 在协程结束时,可以调用这个函数,完成一些自定义操作,后面会用到。

  3. 协程结束时会自动调用 final_suspend,可以在这里进行一些收尾工作。 final_suspend中定义了一个 Awaiter,用于在协程结束时执行清理操作。当协程结束时,Awaiter 会被唤醒,从而调用 onDone 回调。当 onDone 执行完成后,协程的资源会被释放。

io_uring wrapper

 1class IoUring {
 2public:
 3  explicit IoUring(int port) : port_(port) {}
 4
 5  IoUring(const IoUring &) = delete;
 6  IoUring &operator=(const IoUring &) = delete;
 7  IoUring(IoUring &&) = delete;
 8  IoUring &operator=(IoUring &&) = delete;
 9
10  ~IoUring() = default;
11
12  void Stop() { running_.store(false); }
13
14  io_uring &Uring() { return ring_; }
15
16  std::expected<bool, std::string> Init();//初始化io_uring和网络
17  void run();//运行io_uring事件循环
18  Task<true> acceptServer();//接受客户端连接
19
20  Task<false> startSession(int fd, uint64_t connId);//处理客户端会话
21
22private:
23  std::expected<bool, std::string> createListenSocket();//创建监听socket
24
25  static int set_nonblocking(int fd) {//设置socket为非阻塞IO
26    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
27  }
28
29  uint64_t getConnId() { return ++connId; }
30
31  io_uring ring_{};
32  uint16_t port_;
33  int listenFd_ = -1;
34  const int entries_ = 256; // Default number of entries
35  std::atomic<bool> running_ = true;
36
37  uint64_t connId = 0; // Connection ID for tracking connections
38
39  std::unordered_map<uint64_t, Task<false>> sessions_;
40};

以上就是对 io_uring 的一个简单封装,下面挑几个关键的函数来聊一下

IoUring::run

 1void IoUring::run() {
 2  while (running_.load()) {
 3    io_uring_cqe *cqe = nullptr;
 4    int ret = io_uring_wait_cqe(&ring_, &cqe);
 5    if (ret < 0) {
 6      if (ret == -EINTR)
 7        continue;
 8      break;
 9    }
10    // user_data 保存着 Op 指针
11    auto *op = reinterpret_cast<AwaitableBaseOp *>(cqe->user_data);
12    if (!op) {
13      io_uring_cqe_seen(&ring_, cqe);
14      continue;
15    }
16    op->SetRes(cqe->res);//设置操作结果
17    io_uring_cqe_seen(&ring_, cqe);
18
19    op->resume();//恢复协程执行
20  }
21}

循环遍历 io_uring 的完成队列,处理每个完成的操作。

AwaitableBaseOp 是所有可等待操作的基类,负责管理协程的状态和生命周期,后面会用到。

op->SetRes(cqe->res); 把io_uring的结果传递给操作对象。

op->resume() 被调用时,协程会继续执行,直至下一个挂起点。

IoUring::startSession

 1Task<false> IoUring::startSession(int fd, uint64_t connId) {
 2  std::string buffer;
 3  buffer.resize(1024);
 4  while (true) {
 5    auto res = co_await AwaitableRead(this, fd, buffer);
 6    if (res <= 0) {
 7      break;
 8    }
 9    std::cout << "Received data: ";
10    std::cout.write(buffer.data(), res);
11    std::cout << std::endl;
12    res = co_await AwaitableWrite(this, fd, std::move(std::string(buffer)));
13    if (res <= 0) {
14      break;
15    }
16  }
17  close(fd);
18  co_return;
19}

启动一个客户端会话,处理数据的读取和写入。

使用 co_await 关键字来等待异步操作的完成。

co_await AwaitableRead 这里会把当前协程挂起,然后当读取操作完成时,协程会被唤醒,并且可以获取到读取的结果。

co_await AwaitableWrite 这里同样会把当前协程挂起,等待写入操作完成。

IoUring::acceptServer

 1Task<true> IoUring::acceptServer() {
 2  while (true) {
 3    auto clientFd = co_await AwaitableAccept(this, listenFd_);
 4    std::cout << clientFd << std::endl;
 5    if (clientFd < 0)
 6      break;
 7
 8    set_nonblocking(clientFd);
 9    auto connId = getConnId();
10    auto t = startSession(clientFd, connId);//启动一个客户端会话,处理数据的读取和写入。
11    t.setOnDone([connId, this]() { sessions_.erase(connId); });
12    sessions_.emplace(connId, std::move(t));
13  }
14  close(listenFd_);
15  co_return;
16}

开始一个服务器端的会话,接受客户端连接。

co_await AwaitableAccept 这里会把当前协程挂起,当有客户端连接时,协程会被唤醒,并且可以获取到接受的结果。

auto t = startSession(clientFd, connId); 这里会启动一个协程,处理数据的读取和写入。 这里有个关键点,startSession 会返回一个协程的句柄,一定要妥善保管这个句柄,如果没有保存,当 while 循环结束时,协程会被销毁,导致异常。 所以要把协程的句柄保存在 sessions_ 这个 map 中,以便后续管理。

t.setOnDone([connId, this]() { sessions_.erase(connId); }); 在这里,通过设置回调函数,设置了协程完成后的清理工作。

AwaitableBaseOp

 1class AwaitableBaseOp {
 2public:
 3  explicit AwaitableBaseOp(std::coroutine_handle<> h) : coro_(h) {}
 4
 5  virtual ~AwaitableBaseOp() = default;
 6
 7  void resume() {
 8    if (coro_ && !coro_.done()) {
 9      coro_.resume();
10    }
11  }
12
13  void SetRes(int res) { res_ = res; }
14
15  int GetRes() const { return res_; }
16
17protected:
18  std::coroutine_handle<> coro_;
19  int res_ = 0;
20};

定义了协程的基本操作,包括恢复协程和设置结果。这个类的实例的指针会被传递给 io_uring_sqe 的 user_data 中。 当 io_uring 的异步操作完成时,io_uring 通过得到 io_uring_cqe AwaitableBaseOp 的指针,调用 SetRes 来设置结果,然后调用 resume 来恢复协程。

Awaitable

需要到 accept read write 的操作都可以封装成一个 Awaitable 类,方便管理协程的状态和生命周期。

accept

 1class AwaitableAccept {
 2  IoUring *uring_ = nullptr;
 3  sockaddr_storage addr_{};
 4  socklen_t addrlen_{};
 5  int serverFd_ = 0;
 6
 7  AwaitableBaseOp *op = nullptr;
 8
 9public:
10  //省略部分代码
11
12  ~AwaitableAccept() = default;
13
14  bool await_ready() const noexcept { return false; }
15
16  void await_suspend(std::coroutine_handle<> h) {
17    op = new AwaitableBaseOp(h);
18
19    io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
20    io_uring_prep_accept(sqe, serverFd_, reinterpret_cast<sockaddr *>(&addr_),
21                         &addrlen_, 0);
22    io_uring_sqe_set_data(sqe, op);
23    io_uring_submit(&uring_->Uring());
24  }
25
26  int await_resume() const noexcept {
27    int res = op->GetRes();
28    delete op;
29    return res;
30  }
31};

这是 AwaitableAccept 类的实现,封装了对 io_uring 的 accept 操作。通过使用协程,可以方便地管理异步 I/O 操作的状态和生命周期。

await_suspend 函数会在协程挂起时(co_await 操作)被调用,负责将协程的句柄与 io_uring 的请求关联起来。 在函数内部,创建一个 AwaitableBaseOp 对象,并将协程的句柄传递给它。然后,准备一个 io_uring 的提交请求,并将 AwaitableBaseOp 对象的指针设置为请求的用户数据。最后,提交请求到 io_uring

await_resume 函数会在协程恢复时(调用 resume)被调用,负责获取异步操作的结果并清理资源。 这里 return 最后返回结果。这里 return 的值会被 co_await 表达式的调用者获取到。

read

 1class AwaitableRead {
 2  IoUring *uring_ = nullptr;
 3  int fd_ = 0;
 4  std::string &buffer_;
 5
 6  AwaitableBaseOp *op_ = nullptr;
 7
 8public:
 9  //省略部分代码
10  void await_suspend(std::coroutine_handle<> h) {
11    op_ = new AwaitableBaseOp(h);
12
13    io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
14    io_uring_prep_recv(sqe, fd_, buffer_.data(), buffer_.size(), 0);
15    io_uring_sqe_set_data(sqe, op_);
16    io_uring_submit(&uring_->Uring());
17  }
18  //省略部分代码
19};

AwaitableRead 类的实现,封装了对 io_uring 的读取操作。

中间省略了一些代码细节,但整体思路就是通过 AwaitableRead 类来简化异步读取操作的实现。

await_suspend 函数内部,将协程的句柄与 io_uring 的请求关联起来。然后,准备一个 io_uring 的提交请求,并将 AwaitableBaseOp 对象的指针设置为请求的用户数据。最后,提交请求到 io_uring

await_resume 函数会在协程恢复时被调用,负责获取异步操作的结果并清理资源。这里 return 最后返回结果。这里 return 的值会被 co_await 表达式的调用者获取到。

write

 1class AwaitableWrite {
 2  IoUring *uring_ = nullptr;
 3  int fd_ = 0;
 4  std::string buffer_;
 5  AwaitableBaseOp *op_ = nullptr;
 6
 7public:
 8  //省略部分代码
 9
10  void await_suspend(std::coroutine_handle<> h) {
11    op_ = new AwaitableBaseOp(h);
12
13    io_uring_sqe *sqe = io_uring_get_sqe(&uring_->Uring());
14    io_uring_prep_write(sqe, fd_, buffer_.data(), buffer_.size(), 0);
15    io_uring_sqe_set_data(sqe, op_);
16    io_uring_submit(&uring_->Uring());
17  }
18  //省略部分代码
19};

AwaitableWrite 类的实现,封装了对 io_uring 的写入操作。基本和 AwaitableRead 类类似,通过协程的方式简化了异步写入的流程。await_suspend 函数负责将协程的句柄与 io_uring 的请求关联起来,并提交写入请求。await_resume 函数则负责获取写入操作的结果并清理资源。

run

 1int main() {
 2  IoUring ioUring(8088);
 3  if (auto ret = ioUring.Init(); !ret) {
 4    std::cout << "uring init fail" << ret.error() << std::endl;
 5    return -1;
 6  }
 7  auto t = ioUring.acceptServer();
 8  std::cout << "Server started on port 8088." << std::endl;
 9  t.resume();
10  ioUring.run();
11  return 0;
12}

运行这个程序

  1. 创建一个 IoUring 对象,监听8088端口。
  2. 调用 IoUring::Init 初始化 io_uring
  3. 调用 IoUring::acceptServer 开始接受连接。
  4. 调用 IoUring::run 进入事件循环。

到这,整个协程与 io_uring 的结合就完成了。通过这种方式,我们可以在 C++ 中优雅地处理异步 I/O 操作,充分利用协程的优势,提高代码的可读性和可维护性。 可能会有人问,怎么没看到有关线程 std::thread 的代码?因为这个例子是单线程内使用协程的,所有的操作都是在同一个线程中完成的。所谓的异步是通过协程的挂起和恢复来实现的,而不是通过多线程并发执行。通过异步 I/O 读写时,不会阻塞当前线程,当读写完成后,通知当前线程,来实现的异步。

那这样设计的好处是什么呢?首先,它避免了多线程编程中的许多复杂性,比如线程安全、锁竞争等问题。其次,协程的上下文切换比线程轻量得多,性能开销更小。最后,协程可以让异步代码看起来像同步代码,极大地提高了可读性和可维护性。

如果要使用多线程来充分利用多核 CPU 的性能,可以在每个线程中创建一个 IoUring 对象,并使用协程来处理每个线程中的异步 I/O 操作。这样可以在保持代码简洁的同时,充分利用多核 CPU 的优势。

发表了62篇文章 · 总计150.45k字
本博客已稳定运行
© QX
使用 Hugo 构建
主题 StackJimmy 设计