Featured image of post Linux下使用iouring实现一个tcp服务

Linux下使用iouring实现一个tcp服务

探索 Linux io_uring 异步 I/O 接口,通过 liburing 库实现高效 TCP 服务。本文深入 io_uring 的基础概念,包括 Submission Queue 和 Completion Queue,并通过示例代码演示如何初始化、提交 I/O 请求和处理完成事件。了解 io_uring 如何提升 I/O 密集型应用的性能,以及在实际部署中需要注意的错误处理和连接管理。

今年一直没怎么写东西,看了一下上一篇文章,都过去半年了。前段时间一直想写一点redis的东西,但是一直没有时间去研读源码,所以就一直没有写。 最近有时间,看了一点 iouring 的东西,就写了一篇文章,介绍了一下 iouring 的基本使用,实现了一个简单的TCP server

很早就听说过 iouring ,但是一直没有时间学习。最近有时间就找了一些资料,学习了一下,写了一个简单的例子,实现了一个简单的tcp服务。

这篇文章主要介绍了如何使用 iouring 实现一个简单的 tcp 服务,只是介绍了iouring的基本使用,没有涉及底层实现。 后面有时间再去学一下 liburing 的源码,看看底层是如何实现的。

先解释一下两个名词:iouringliburingiouringLinux 内核在 5.1 版本引入的一个新的异步I/O接口。 liburingiouring 的一个用户态库,封装了 iouring 的接口,使得用户可以更方便的使用 iouring

简单说就是 io_uring 是linux内核的功能,对外提供了一套异步I/O的接口。因为直接使用 linux内核的 io_uring 接口太麻烦了, 所以在就写了 liburing 这个库,对内核的 io_uring 封装,提供了一套更加友好的接口,使得用户可以更方便的使用 io_uring

什么是iouring

io_uring是Linux内核在 5.1 版本引入的一个新的异步I/O接口。 io_uring的目标是提供一个高效的、统一的异步I/O接口,以替代现有的异步I/O接口(如aio、epoll、eventfd等)。 io_uring的设计目标是提供一个简单的、高效的、统一的异步I/O接口,以便应用程序可以更容易地利用异步I/O的优势。

虽然linux内核提供了很多异步I/O的接口,比如aio。 在linux上可以使用

fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK);

来设置 socket 为非阻塞模式,后续在读写 socketfd 时,使用 readwrite 函数 就是非阻塞的了。

在调用 readwrite 函数时,如果 fd 没有数据,或者 fd 的缓冲区满了,那么 readwrite 函数会立即返回, 不会阻塞

但是这种方式有一个问题,就是 readwrite 函数的调用是阻塞的,虽然 readwrite 函数会立即返回,但是 readwrite 函数的调用是阻塞的,会占用 CPU 的资源。

因为之前的异步I/O接口不够好用,所以 io_uring 就诞生了。

io_uring 的设计目标是提供一个简单的、高效的、统一的异步I/O接口,以便应用程序可以更容易地利用异步I/O的优势。 现在的 io_uring 支持 文件 I/O网络 I/O定时器信号 等。真正的统一了 I/O 异步操作。

说到异步I/O,就不得不提一下 epollepollLinux 内核提供的一种 I/O 多路复用机制,可以同时监控多个 fdI/O 事件。 epoll 只是这个 多路复用机制,只是用来监控 fdI/O 事件,当 fdI/O 事件时(fd 可以被读写时), epoll 会通知应用程序。 真正的 I/O 操作还是由 readwrite 函数来完成的。 所以说 epoll 并不是异步 I/O,只是 I/O 多路复用机制。

io_uring的基础概念

io_uring 有两个重要的概念:Submission QueueCompletion Queue。 简称 sqcq

Submission Queueio_uring 的提交队列,用来提交 I/O 请求。

Completion Queueio_uring 的完成队列,用来存放 I/O 请求的完成状态。

sqcq 相对应的是 sqecqe

sqeSubmission Queue 的元素,用来描述一个 I/O 请求。

cqeCompletion Queue 的元素,用来描述一个 I/O 请求的完成状态。

放一张iouring的工作流程图:

io_uring中非常重要的两个数据结构 sqcq。是用户空间和内核空间之间的通信桥梁。

io_uring 的工作流程如下:

  1. 应用程序向 Submission Queue 中提交 I/O 请求。
  2. io_uring 内核模块从 Submission Queue 中取出 I/O 请求,执行 I/O 操作。
  3. io_uring 内核模块将 I/O 请求的完成状态写入 Completion Queue
  4. 应用程序从 Completion Queue 中读取 I/O 请求的完成状态。
  5. 应用程序处理 I/O 请求的完成状态。

因为这次只是简单的介绍 iouring 的基本使用,所以就不深入讲 iouring 的原理了。

liburing的使用

liburingiouring 的一个用户态库,封装了 iouring 的接口,使得用户可以更方便的使用 iouring

liburing中封装了很多操作函数,这次也不会全部介绍,只介绍一些常用的函数。

liburing常用的函数

io_uring_queue_init

初始化 io_uring 对象。

1int io_uring_queue_init(unsigned entries, struct io_uring *ring,unsigned flags);

有三个参数:

  1. entriesSubmission QueueCompletion Queue 的大小。
  2. ringio_uring 对象。
  3. flags: 保留参数,传 0 即可。

io_uring_queue_exit

销毁 io_uring 对象。

1void io_uring_queue_exit(struct io_uring *ring);

传入 io_uring 对象即可。

io_uring_get_sqe

获取 Submission Queue 的元素。

1struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);

传入 io_uring 对象,返回一个 sqe 对象,用来描述一个 I/O 请求。

io_uring_submit

提交 I/O 请求。

1int io_uring_submit(struct io_uring *ring);

传入 io_uring 对象,提交 I/O 请求。

io_uring_wait_cqe

等待 I/O 请求的完成状态。

1int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);

传入 io_uring 对象,返回 cqe 对象,用来描述一个 I/O 请求的完成状态。

io_uring_submit_and_wait

提交 I/O 请求,并等待 I/O 请求的完成状态。

1int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr);

传入 io_uring 对象,提交 I/O 请求,并等待 I/O 请求的完成状态。

可以看做是 io_uring_submitio_uring_wait_cqe 的组合。

io_uring_prep_accept

Submission Queue 中添加一个 accept 请求。

1void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);

有五个参数:

  1. sqeSubmission Queue 的元素。
  2. fdsocketfd
  3. addrsockaddr 结构体。
  4. addrlensockaddr 结构体的长度。
  5. flags: 保留参数,传 0 即可。

io_uring_prep_recv

Submission Queue 中添加一个 recv 请求,也就是从fd中异步读取数据。

1void io_uring_prep_recv(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, unsigned flags);

有五个参数:

  1. sqeSubmission Queue 的元素。
  2. fdsocketfd
  3. buf: 接收数据的缓冲区,一般是 char 数组的地址。
  4. nbytes: 缓冲区的大小。
  5. flags: 保留参数,传 0 即可。

io_uring_prep_send

Submission Queue 中添加一个 send 请求,也就是向fd中异步写入数据。

1void io_uring_prep_send(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, unsigned flags);

有五个参数:

  1. sqeSubmission Queue 的元素。
  2. fdsocketfd
  3. buf: 发送数据的缓冲区,一般是 char 指针的地址。
  4. nbytes: 缓冲区的大小。
  5. flags: 保留参数,传 0 即可。

io_uring_sqe_set_data

设置 sqe 的数据,把用户态的数据和内核绑定。

1void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data);

传入 sqe 对象和用户态的数据,把用户态的数据和内核绑定。

io_uring_cqe_seen

标记 cqe 为已处理。

1void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe);

传入 io_uring 对象和 cqe 对象,标记 cqe 为已处理。

实现一个简单的tcp服务

io_uring 只能在 linux 内核 5.1 以上版本使用,所以在使用io_uring之前,需要先检查一下内核版本。 一般最新的 ubuntuDebian 都是支持 io_uring 的。

使用 liburing 需要安装 liburing 库。

1sudo apt-get install liburing-dev

下面是一个简单的 tcp 服务,功能是接收客户端的连接,然后向客户端发送一条消息。

  1#include <iostream>
  2
  3#include <liburing.h>
  4#include <netinet/in.h>
  5#include <map>
  6
  7const int QUEUE_DEPTH = 128;
  8const int BUFFER_SIZE = 4096;
  9
 10enum ConnectionType {
 11    ACCEPT,
 12    READ,
 13    WRITE,
 14};
 15
 16struct Connection {
 17    int fd;
 18    int type{};
 19    char readBuf[BUFFER_SIZE]{};
 20    std::string writeBuf;
 21
 22    explicit Connection(int _fd) { fd = _fd; }
 23};
 24
 25std::map<int, Connection *> connections;
 26
 27int createListener(sockaddr_in *addr) {
 28    int listener = socket(AF_INET, SOCK_STREAM, 0);
 29    if (listener < 0) {
 30        std::cerr << "socket failed errno:" << errno << std::endl;
 31        return -1;
 32    }
 33
 34    int opt = 1;
 35    if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
 36        std::cerr << "setsockopt failed errno:" << errno << std::endl;
 37        close(listener);
 38        return -1;
 39    }
 40
 41    if (bind(listener, reinterpret_cast<sockaddr *>(addr), sizeof(*addr)) < 0) {
 42        std::cerr << "bind failed errno:" << errno << std::endl;
 43        close(listener);
 44        return -1;
 45    }
 46
 47    if (listen(listener, 10) < 0) {
 48        std::cerr << "listen failed errno:" << errno << std::endl;
 49        return -1;
 50    }
 51
 52    return listener;
 53}
 54
 55void acceptConnection(io_uring *ring, Connection *conn, sockaddr *addr, socklen_t *clientLen) {
 56    conn->type = ACCEPT;
 57    auto sqe = io_uring_get_sqe(ring);
 58    io_uring_prep_accept(sqe, conn->fd, addr, clientLen, 0);
 59    io_uring_sqe_set_data(sqe, conn);
 60}
 61
 62void addSocketRead(io_uring *ring, Connection *conn) {
 63    conn->type = READ;
 64    auto sqe = io_uring_get_sqe(ring);
 65    io_uring_prep_recv(sqe, conn->fd, &conn->readBuf, BUFFER_SIZE, 0);
 66    io_uring_sqe_set_data(sqe, conn);
 67}
 68
 69void addSocketWrite(io_uring *ring, Connection *conn) {
 70    conn->type = WRITE;
 71    auto sqe = io_uring_get_sqe(ring);
 72    io_uring_prep_send(sqe, conn->fd, conn->writeBuf.data(), conn->writeBuf.size(), 0);
 73    io_uring_sqe_set_data(sqe, conn);
 74}
 75
 76Connection *newConn(io_uring *ring, int fd) {
 77    auto conn = new Connection(fd);
 78    connections[fd] = conn;
 79    addSocketRead(ring, conn);
 80    return conn;
 81}
 82
 83int uringRun() {
 84    sockaddr_in addr{};
 85    addr.sin_family = AF_INET;
 86    addr.sin_port = htons(8088);
 87    addr.sin_addr.s_addr = INADDR_ANY;
 88
 89    int listenFd = createListener(&addr);
 90    if (listenFd < 0) {
 91        return 1;
 92    }
 93    socklen_t clientLen = sizeof(addr);
 94    io_uring ring{};
 95    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
 96        std::cerr << "io_uring_queue_init failed errno:" << errno << std::endl;
 97        return 1;
 98    }
 99    auto lConn = newConn(&ring, listenFd);
100    acceptConnection(&ring, lConn, reinterpret_cast<sockaddr *> (&addr), &clientLen);
101    if (io_uring_submit(&ring) < 0) {
102        std::cerr << "io_uring_submit failed errno:" << errno << std::endl;
103        return 1;
104    }
105
106    io_uring_cqe *cqes[QUEUE_DEPTH];
107    while (true) {
108        //等待事件完成
109        int ret = io_uring_submit_and_wait(&ring, 1);
110        if (ret < 0) {
111            std::cerr << "io_uring_wait_cqe failed errno:" << errno << std::endl;
112            break;
113        }
114
115        //获取完成的事件
116        auto num = io_uring_peek_batch_cqe(&ring, cqes, QUEUE_DEPTH);
117        for (int i = 0; i < num; ++i) {
118            auto conn = reinterpret_cast<Connection *> (cqes[i]->user_data);
119            if (conn->type == ACCEPT) {//新连接
120                int clientFd = cqes[i]->res;
121                auto newCLi = newConn(&ring, clientFd);
122                addSocketRead(&ring, newCLi);
123                acceptConnection(&ring, lConn, reinterpret_cast<sockaddr *> (&addr), &clientLen);
124            } else if (conn->type == READ) {
125                int readSize = cqes[i]->res;
126                if (readSize < 0) {//读取失败(比如客户端断开连接)
127                    shutdown(conn->fd, SHUT_RDWR);
128                    connections.erase(conn->fd);
129                    delete conn;
130                } else {
131                    std::cout << "read:" << conn->readBuf << std::endl;
132                    conn->writeBuf = "hello client";
133                    addSocketWrite(&ring, conn);//向客户端写数据
134                }
135            } else if (conn->type == WRITE) {
136                addSocketRead(&ring, conn);// 把这个连接加入读事件
137            }
138            io_uring_cqe_seen(&ring, cqes[i]);// mark the cqe as processed
139        }
140    }
141    io_uring_queue_exit(&ring);
142    return 0;
143}
144
145int main() {
146    uringRun();
147    return 0;
148}

编译这段代码的时候,需要链接 liburing 库。

1g++ -o server server.cpp -luring

createListener 函数用来创建一个 socket 监听 8088 端口。这个函数没什么特别的,就是创建一个 socket,然后绑定 8088 端口,然后监听。

acceptConnection 函数用来接收客户端的连接。这个函数调用 io_uring_prep_accept 函数,向 Submission Queue 中添加一个 accept 请求。

addSocketRead 函数用来向 Submission Queue 中添加一个 recv 请求,也就是从 fd 中异步读取数据。

addSocketWrite 函数用来向 Submission Queue 中添加一个 send 请求,也就是向 fd 中异步写入数据。

newConn 函数用来创建一个新的连接。这个函数会调用 addSocketRead 函数,向 Submission Queue 中添加一个 recv 请求。

uringRun 函数是主函数。这个函数会创建一个 io_uring 对象,然后调用 createListener 函数创建一个监听 socket,然后调用 newConn 函数创建一个新的连接。

有一个知识点需要注意一下,在 io_uring中,每次获取的 cqe 在使用完之后,需要调用 io_uring_cqe_seen 函数,标记 cqe 为已处理。 同时处理的时候,也会把 这个 cqeCompletion Queue 中移除,对应的 fd 也会从 io_uring 中移除。 所以,为了保证后续能继续接收到 fdI/O 事件,需要在处理完 cqe 之后,再次调用 addSocketRead 函数,向 Submission Queue 中添加一个 recv 请求。

这些函数的内容都比较简单,就不再详细介绍了。主要看一下 while 循环的内容。

int ret = io_uring_submit_and_wait(&ring, 1); 这个函数会提交 I/O 请求,并等待 I/O 请求的完成状态。这个函数会阻塞,直到有 1个 I/O 请求完成。

这里可以使用 io_uring_submit 这个函数只提交 I/O 请求,不等待 I/O 请求的完成状态。但是这样的话,while循环会一直循环,不会阻塞,对应的表现就是 CPU 占用率会很高。 这种情况适用于 I/O 请求比较多的情况,比如 nginx 这种 web 服务器。

一般情况下,使用 io_uring_submit_and_wait 这个函数就可以了。

如果 返回的 ret 小于 0,说明出错了,这里简单的打印一下错误信息,然后退出。

auto num = io_uring_peek_batch_cqe(&ring, cqes, QUEUE_DEPTH); 这个函数会获取 Completion Queue 中的 cqe,一次最多获取 QUEUE_DEPTHcqe

这个函数有点类似于 epollepoll_wait 函数,会一次性获取多个 cqe

然后遍历 cqe,根据 cqetype 来处理 I/O 事件。在之前的submit的时候,指定了有 ACCEPTREADWRITE 三种 I/O 事件。

因为之前的 cqe 中的 user_dataConnection 对象的指针,所以可以根据 cqeuser_data 来获取 Connection 对象,然后根据 Connection 对象的 type 来处理 I/O 事件。 所以可以 reinterpret_cast<Connection *> (cqes[i]->user_data);强转来获取 Connection 对象。

如果 typeACCEPT,说明是新的连接,就调用 newConn 函数创建一个新的连接,然后调用 acceptConnection 函数接收新的连接。 在接收新的连接之后,需要再次调用 acceptConnection 函数,把listener的socket放回io_uring中,后面才能继续工作。

如果 typeREAD,说明是读事件,就读取 cqeres 字段,如果小于 0,说明读取失败,比如客户端断开连接,就关闭 fd,然后从 connections 中移除这个 fd,然后释放 Connection 对象。 如果读取成功,就打印读取的数据,然后向客户端发送一条消息。

如果 typeWRITE,说明是写事件,在这里就是向客户端写数据完成了。 调用 addSocketRead 函数,向 Submission Queue 中添加一个 读的请求,然后这个 fd 就会继续接收 I/O 事件。

最后调用 io_uring_cqe_seen 函数,标记 cqe 为已处理。

这样一个简单的 tcp 服务就实现了。这个服务只是一个简单的例子,没法用在生产环境中,只是用来学习 iouring 的基本使用。

像是中间的错误处理,连接的超时等问题都没有处理,这些问题需要根据实际情况来处理。

这个例子中,一个 socket 同时只能读或者写,不能同时读写,这个问题也需要根据实际情况来处理。

总结

这篇文章主要介绍了 iouring 的基本使用,实现了一个简单的 tcp 服务。只是介绍了 iouring 的基本使用,没有涉及底层实现。 后面有时间再去学一下 liburing 的源码,看看底层是如何实现的。 看很多人都说 iouring 性能很高,尤其是在 I/O 密集型的场景下,性能提升很明显。这部分内容还没有验证,后面有时间再去验证一下。

因为也是初学 iouring,所以文章中可能有错误,欢迎批评指正。

发表了55篇文章 · 总计119.18k字
本博客已稳定运行
© QX
使用 Hugo 构建
主题 StackJimmy 设计