Linux AIO: The Asynchronous I/O Interface with POSIX Semantics

Original by 晓兵XB, 云原生云, 2022-04-10 16:18

References

linux-aio: https://github.com/littledan/linux-aio

man aio: https://man7.org/linux/man-pages/man7/aio.7.html

Linux AIO implementation overview: https://blog.csdn.net/Morphad/article/details/14138477

Introduction

aio - POSIX asynchronous I/O overview. The POSIX asynchronous I/O (AIO) interface allows applications to initiate one or more I/O operations that are performed asynchronously (that is, in the background). The application can choose to be notified when an I/O operation completes in several ways: by delivery of a signal, by instantiation of a thread, or with no notification at all.
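
As a quick illustration of this POSIX-level interface (distinct from the libaio interface that the rest of this post covers), here is a minimal sketch using <aio.h> with polling and no completion notification. The file path and buffer size are arbitrary, error handling is abbreviated, and on older glibc you may need to link with -lrt.

#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
    char buf[4096];
    struct aiocb cb;
    int fd = open("/etc/hostname", O_RDONLY);    /* arbitrary readable file */
    if (fd < 0) { perror("open"); return 1; }

    memset(&cb, 0, sizeof(cb));
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = sizeof(buf);
    cb.aio_offset = 0;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;   /* "no notification at all" */

    if (aio_read(&cb) < 0) { perror("aio_read"); return 1; }

    /* The request now runs in the background; poll for completion. */
    while (aio_error(&cb) == EINPROGRESS)
        usleep(1000);

    ssize_t n = aio_return(&cb);                 /* final status, like read() */
    printf("read %zd bytes asynchronously\n", n);
    close(fd);
    return 0;
}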

Note: Linux AIO is now subsumed by the io_uring API (tutorial, LWN coverage). The explanation below is mostly useful for older kernels.

The Asynchronous Input/Output (AIO) interface allows many I/O requests to be submitted in parallel without the overhead of a thread per request. The purpose of this document is to explain how to use the Linux AIO interface, namely the function family io_setup, io_submit, io_getevents, io_destroy. Currently, the AIO interface is best for O_DIRECT access to a raw block device like a disk, flash drive or storage array.

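O_DIRECT generally requires the buffer address, file offset and transfer size to be aligned to the device's logical sector size. The sketch below queries that size with the BLKSSZGET ioctl and allocates an aligned buffer; /dev/sdX is a placeholder path and the 64 KiB size is arbitrary.

#include <fcntl.h>
#include <linux/fs.h>      /* BLKSSZGET */
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <unistd.h>

int main(void)
{
    int sector_size = 0;
    void *buf = NULL;
    int fd = open("/dev/sdX", O_RDONLY | O_DIRECT);   /* placeholder device path */
    if (fd < 0) { perror("open"); return 1; }

    /* Ask the block layer for the logical sector size of the device. */
    if (ioctl(fd, BLKSSZGET, &sector_size) < 0) { perror("BLKSSZGET"); return 1; }
    printf("logical sector size: %d bytes\n", sector_size);

    /* Buffers handed to io_prep_pread/io_prep_pwrite under O_DIRECT should be
       aligned to (at least) this size, e.g. via posix_memalign. */
    if (posix_memalign(&buf, sector_size, 64 * 1024) != 0) {
        fprintf(stderr, "posix_memalign failed\n");
        return 1;
    }

    free(buf);
    close(fd);
    return 0;
}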

What is AIO?

Input and output functions involve a device, like a disk or flash drive, which works much slower than the CPU. Consequently, the CPU can be doing other things while waiting for an operation on the device to complete. There are multiple ways to handle this; asynchronous I/O is one of them.

Asynchronous I/O can be considered "lower level" than synchronous I/O because it does not make use of a system-provided concept of threads to organize its computation. However, it is often more efficient to use AIO than synchronous I/O due to the nondeterministic overhead of threads.

The Linux AIO model

The Linux AIO model is used as follows; a minimal end-to-end sketch appears after the list:

  1. Open an I/O context from which to submit and reap I/O requests.
  2. Create one or more request objects and set them up to represent the desired operation.
  3. Submit these requests to the I/O context, which will send them down to the device driver to process on the device.
  4. Reap completions from the I/O context in the form of event completion objects.
  5. Return to step 2 as needed.
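
Below is a minimal end-to-end sketch of these five steps using libaio. It is an illustration, not a production implementation: the file name data.bin, the 4 KiB block size and the queue depth of 32 are arbitrary choices, and error handling is abbreviated. Build with gcc -laio.

#include <fcntl.h>
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
    io_context_t ctx = 0;
    struct iocb cb, *cbs[1] = { &cb };
    struct io_event ev;
    void *buf;
    int fd;

    /* Step 1: open an I/O context that can hold up to 32 in-flight requests */
    if (io_setup(32, &ctx) < 0) { fprintf(stderr, "io_setup failed\n"); return 1; }

    /* O_DIRECT requires sector-aligned buffers, offsets and transfer sizes */
    fd = open("data.bin", O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd < 0) { perror("open"); return 1; }
    if (posix_memalign(&buf, 4096, 4096) != 0) { fprintf(stderr, "posix_memalign failed\n"); return 1; }
    memset(buf, 'x', 4096);

    /* Step 2: build a request object describing a 4 KiB write at offset 0 */
    io_prep_pwrite(&cb, fd, buf, 4096, 0);
    cb.data = buf;                       /* passed back untouched in the completion */

    /* Step 3: submit the request; the return value is the number accepted */
    if (io_submit(ctx, 1, cbs) != 1) { fprintf(stderr, "io_submit failed\n"); return 1; }

    /* Step 4: reap exactly one completion, blocking with no timeout */
    if (io_getevents(ctx, 1, 1, &ev, NULL) != 1) { fprintf(stderr, "io_getevents failed\n"); return 1; }
    printf("write completed, res = %ld\n", (long)ev.res);

    /* Step 5 would loop back to step 2; here we just clean up */
    io_destroy(ctx);
    close(fd);
    free(buf);
    return 0;
}
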
I/O context

io_context_t is a pointer-sized opaque datatype that represents an “AIO context”. It can be safely passed around by value. Requests in the form of a struct iocb are submitted to an io_context_t and completions are read from the io_context_t. Internally, this structure contains a queue of completed requests. The length of the queue forms an upper bound on the number of concurrent requests which may be submitted to the io_context_t.

To create a new io_context_t, use the function

int io_setup(int maxevents, io_context_t *ctxp);

Here, ctxp is the output and maxevents is the input. The function creates an io_context_t with an internal queue of length maxevents. To deallocate an io_context_t, use

int io_destroy(io_context_t ctx);

There is a system-wide maximum number of allocated io_context_t objects, set at 65536.

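The 65536 figure matches the default of the fs.aio-max-nr sysctl found on many kernels (with fs.aio-nr reporting current usage); whether those files exist and exactly what the kernel accounts against them depends on your system, so the sketch below only reads the two /proc entries, using a local helper read_proc_value invented for this example.

#include <stdio.h>

/* Print the kernel's AIO accounting: /proc/sys/fs/aio-nr (currently allocated)
   and /proc/sys/fs/aio-max-nr (system-wide limit). */
static long read_proc_value(const char *path)
{
    long value = -1;
    FILE *f = fopen(path, "r");
    if (f) {
        if (fscanf(f, "%ld", &value) != 1)
            value = -1;
        fclose(f);
    }
    return value;
}

int main(void)
{
    printf("fs.aio-nr     = %ld\n", read_proc_value("/proc/sys/fs/aio-nr"));
    printf("fs.aio-max-nr = %ld\n", read_proc_value("/proc/sys/fs/aio-max-nr"));
    return 0;
}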

An io_context_t object can be shared between threads, both for submission and completion. No guarantees are provided about ordering of submission and completion with respect to interaction from multiple threads. There may be performance implications from sharing io_context_t objects between threads.

Submitting requests

struct iocb represents a single request for a read or write operation. The following shows a simplified version of the struct definition; the full definition is found in <libaio.h> within the libaio source code.

struct iocb {
    void     *data;
    short     aio_lio_opcode;
    int       aio_fildes;
    union {
        struct {
            void          *buf;
            unsigned long  nbytes;
            long long      offset;
        } c;
    } u;
};

The meaning of the fields is as follows: data is a free-form pointer for the caller and is returned unchanged in the corresponding completion event; aio_lio_opcode identifies the operation, e.g. IO_CMD_PREAD or IO_CMD_PWRITE; aio_fildes is the file descriptor to operate on; and u.c.buf, u.c.nbytes and u.c.offset give the buffer, the transfer length in bytes, and the file offset of the operation.

The convenience functions io_prep_pread and io_prep_pwrite can be used to initialize a struct iocb. New operations are sent to the device with io_submit.

int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);

io_submit allows an array of pointers to struct iocbs to be submitted all at once. In this function call, nr is the length of the ios array. If multiple operations are sent in one array, then no ordering guarantees are given between the iocbs. Submitting in larger batches sometimes results in a performance improvement due to a reduction in CPU usage. A performance improvement also sometimes results from keeping many I/Os ‘in flight’ simultaneously.

If the submission includes too many iocbs, such that the internal queue of the io_context_t would overfill on completion, then io_submit will not accept the entire batch: it either returns a count smaller than nr or fails with EAGAIN.

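Below is a sketch of batched submission that also copes with the partial-acceptance case just described. It assumes ctx, fd and the aligned buffers were set up as shown earlier; BATCH and submit_batch are names invented for this example, and completion handling inside the EAGAIN branch is deliberately omitted.

#include <errno.h>
#include <libaio.h>
#include <stdio.h>

#define BATCH 64          /* arbitrary batch size for this sketch */

/* Submit BATCH reads in one io_submit call, resubmitting whatever the kernel
   did not accept. The kernel copies each iocb at submit time, so the stack
   array is safe. Returns 0 on success, -1 on a hard error. */
int submit_batch(io_context_t ctx, int fd, void *bufs[BATCH], size_t len)
{
    struct iocb cbs[BATCH];
    struct iocb *ptrs[BATCH];

    for (int i = 0; i < BATCH; i++) {
        io_prep_pread(&cbs[i], fd, bufs[i], len, (long long)i * len);
        cbs[i].data = bufs[i];     /* comes back in io_event.data */
        ptrs[i] = &cbs[i];
    }

    int submitted = 0;
    while (submitted < BATCH) {
        int rc = io_submit(ctx, BATCH - submitted, &ptrs[submitted]);
        if (rc > 0) {
            submitted += rc;       /* partial acceptance: push the rest again */
        } else if (rc == -EAGAIN) {
            /* Queue full: reap at least one completion to make room, then retry.
               The reaped event is discarded here; a real program would handle it. */
            struct io_event ev;
            io_getevents(ctx, 1, 1, &ev, NULL);
        } else {
            fprintf(stderr, "io_submit error: %d\n", rc);
            return -1;
        }
    }
    return 0;
}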

When used under the right conditions, io_submit should not block. However, when used in certain ways, it may block, undermining the purpose of asynchronous I/O. If this is a problem for your application, be sure to use the O_DIRECT flag when opening a file, and operate on a raw block device. Work is ongoing to fix the problem.

Processing results

Completions read from an io_context_t are of the type struct io_event, which contains the following relevant fields.

struct io_event {
    void        *data;
    struct iocb *obj;
    long long    res;
};

Here, data is the same data pointer that was passed in with the struct iocb, and obj is the original struct iocb. res is the return value of the read or write.

Completions are reaped with io_getevents.

int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

This function has a good number of parameters, so an explanation is in order: ctx_id is the io_context_t to reap from; events is a caller-allocated output array with room for at least nr entries; min_nr and nr are the minimum and maximum number of completions to return from this call; and timeout bounds how long the call may block (NULL means wait until min_nr completions are available).

The return value represents how many completions were reported, i.e. how much of events was written. The return value will be between 0 and nr. The return value may be lower than min_nr if the timeout expires; if the timeout is NULL, then the return value will be between min_nr and nr.

The parameters give a broad range of flexibility in how AIO can be used.

Even if min_nr = 0 or 1, it is useful to make nr a bit bigger for performance reasons: more than one event may already be complete, and they can then be processed without multiple calls to io_getevents. The only cost of a larger nr value is that the user must allocate a larger array of events and be prepared to accept them.

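A small reaping-loop sketch along these lines: min_nr is 1 so the call returns as soon as anything completes, while nr (the arbitrary REAP_BATCH of 32) lets one call drain several completions at once. ctx is assumed to already have requests in flight, and handle_completion is a hypothetical callback supplied by the caller.

#include <libaio.h>
#include <stdio.h>

#define REAP_BATCH 32   /* arbitrary upper bound per io_getevents call */

/* Drain up to 'expected' completions from ctx, invoking a caller-supplied
   handler for each one. Returns the number of completions processed. */
long reap_loop(io_context_t ctx, long expected,
               void (*handle_completion)(struct io_event *ev))
{
    struct io_event events[REAP_BATCH];
    long done = 0;

    while (done < expected) {
        /* Wait for at least one completion, accept up to REAP_BATCH. */
        int n = io_getevents(ctx, 1, REAP_BATCH, events, NULL);
        if (n < 0) {
            fprintf(stderr, "io_getevents error: %d\n", n);
            break;
        }
        for (int i = 0; i < n; i++)
            handle_completion(&events[i]);
        done += n;
    }
    return done;
}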

Use with epoll

Any iocb can be set to notify an eventfd on completion using the libaio function io_set_eventfd. The eventfd can be put in an epoll object. When the eventfd is triggered, the io_getevents function can be called on the corresponding io_context_t.

There is no way to use this API to trigger an eventfd only when multiple operations are complete; the eventfd will always be triggered on the first operation. Consequently, as described in the previous section, it will often make sense to use min_nr = 1 when calling io_getevents after an epoll_wait call indicates activity on an eventfd involved in AIO.
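
The wiring itself is small. Here is a hedged sketch of just the registration step (the complete flow appears in the second example at the end of this post); arm_eventfd is a name invented for this sketch, and cb is assumed to be an iocb already prepared with io_prep_pread or io_prep_pwrite.

#include <libaio.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Attach a fresh eventfd to an already prepared iocb and register that eventfd
   with an existing epoll instance. Call this before io_submit. Returns the
   eventfd on success, -1 on failure. */
int arm_eventfd(struct iocb *cb, int epfd)
{
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd < 0) {
        perror("eventfd");
        return -1;
    }

    io_set_eventfd(cb, efd);    /* each completion increments the eventfd counter */

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = efd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) < 0) {
        perror("epoll_ctl");
        close(efd);
        return -1;
    }

    /* After epoll_wait reports EPOLLIN on efd, read an 8-byte counter from it
       to learn how many requests completed, then call io_getevents with
       min_nr = 1 on the corresponding io_context_t. */
    return efd;
}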

Performance considerations

Alternatives to Linux AIO

Sample code

Below is some example code which uses Linux AIO. I wrote it at Google, so it uses the Google glog logging library and the Google gflags command-line flags library, as well as a loose interpretation of Google's C++ coding conventions. When compiling it with gcc, pass -laio to dynamically link with libaio. (It isn't included in glibc, so it must be explicitly included.)

// Code written by Daniel Ehrenberg, released into the public domain
#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <libaio.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

DEFINE_string(path, "/tmp/testfile", "Path to the file to manipulate");
DEFINE_int32(file_size, 1000, "Length of file in 4k blocks");
DEFINE_int32(concurrent_requests, 100, "Number of concurrent requests");
DEFINE_int32(min_nr, 1, "min_nr");
DEFINE_int32(max_nr, 1, "max_nr");

// The size of operation that will occur on the device
static const int kPageSize = 4096;

class AIORequest {
 public:
  int* buffer_;

  virtual void Complete(int res) = 0;

  AIORequest() {
    int ret = posix_memalign(reinterpret_cast<void**>(&buffer_),
                             kPageSize, kPageSize);
    CHECK_EQ(ret, 0);
  }

  virtual ~AIORequest() {
    free(buffer_);
  }
};

class Adder {
 public:
  virtual void Add(int amount) = 0;
  virtual ~Adder() { };
};

class AIOReadRequest : public AIORequest {
 private:
  Adder* adder_;

 public:
  AIOReadRequest(Adder* adder) : AIORequest(), adder_(adder) { }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Read incomplete or error " << res;
    int value = buffer_[0];
    LOG(INFO) << "Read of " << value << " completed";
    adder_->Add(value);
  }
};

class AIOWriteRequest : public AIORequest {
 private:
  int value_;

 public:
  AIOWriteRequest(int value) : AIORequest(), value_(value) {
    buffer_[0] = value;
  }

  virtual void Complete(int res) {
    CHECK_EQ(res, kPageSize) << "Write incomplete or error " << res;
    LOG(INFO) << "Write of " << value_ << " completed";
  }
};

class AIOAdder : public Adder {
 public:
  int fd_;
  io_context_t ioctx_;
  int counter_;
  int reap_counter_;
  int sum_;
  int length_;

  AIOAdder(int length)
      : ioctx_(0), counter_(0), reap_counter_(0), sum_(0), length_(length) { }

  void Init() {
    LOG(INFO) << "Opening file";
    fd_ = open(FLAGS_path.c_str(), O_RDWR | O_DIRECT | O_CREAT, 0644);
    PCHECK(fd_ >= 0) << "Error opening file";
    LOG(INFO) << "Allocating enough space for the sum";
    PCHECK(fallocate(fd_, 0, 0, kPageSize * length_) >= 0) << "Error in fallocate";
    LOG(INFO) << "Setting up the io context";
    PCHECK(io_setup(100, &ioctx_) >= 0) << "Error in io_setup";
  }

  virtual void Add(int amount) {
    sum_ += amount;
    LOG(INFO) << "Adding " << amount << " for a total of " << sum_;
  }

  void SubmitWrite() {
    LOG(INFO) << "Submitting a write to " << counter_;
    // The kernel copies the iocb during io_submit, so a stack iocb is fine here;
    // the request object rides along in iocb.data and comes back in io_event.data.
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOWriteRequest(counter_);
    io_prep_pwrite(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void WriteFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
      SubmitWrite();
      Reap();
    }
    ReapRemaining();
  }

  void SubmitRead() {
    LOG(INFO) << "Submitting a read from " << counter_;
    struct iocb iocb;
    struct iocb* iocbs = &iocb;
    AIORequest *req = new AIOReadRequest(this);
    io_prep_pread(&iocb, fd_, req->buffer_, kPageSize, counter_ * kPageSize);
    iocb.data = req;
    int res = io_submit(ioctx_, 1, &iocbs);
    CHECK_EQ(res, 1);
  }

  void ReadFile() {
    reap_counter_ = 0;
    for (counter_ = 0; counter_ < length_; counter_++) {
      SubmitRead();
      Reap();
    }
    ReapRemaining();
  }

  int DoReap(int min_nr) {
    LOG(INFO) << "Reaping between " << min_nr << " and "
              << FLAGS_max_nr << " io_events";
    struct io_event* events = new io_event[FLAGS_max_nr];
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = 100000000;
    int num_events;
    LOG(INFO) << "Calling io_getevents";
    num_events = io_getevents(ioctx_, min_nr, FLAGS_max_nr, events, &timeout);
    LOG(INFO) << "Calling completion function on results";
    for (int i = 0; i < num_events; i++) {
      struct io_event event = events[i];
      AIORequest* req = static_cast<AIORequest*>(event.data);
      req->Complete(event.res);
      delete req;
    }
    delete[] events;
    LOG(INFO) << "Reaped " << num_events << " io_events";
    reap_counter_ += num_events;
    return num_events;
  }

  void Reap() {
    if (counter_ >= FLAGS_min_nr) {
      DoReap(FLAGS_min_nr);
    }
  }

  void ReapRemaining() {
    while (reap_counter_ < length_) {
      DoReap(1);
    }
  }

  ~AIOAdder() {
    LOG(INFO) << "Closing AIO context and file";
    io_destroy(ioctx_);
    close(fd_);
  }

  int Sum() {
    LOG(INFO) << "Writing consecutive integers to file";
    WriteFile();
    LOG(INFO) << "Reading consecutive integers from file";
    ReadFile();
    return sum_;
  }
};

int main(int argc, char* argv[]) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  AIOAdder adder(FLAGS_file_size);
  adder.Init();
  int sum = adder.Sum();
  int expected = (FLAGS_file_size * (FLAGS_file_size - 1)) / 2;
  LOG(INFO) << "AIO is complete";
  CHECK_EQ(sum, expected) << "Expected " << expected << " Got " << sum;
  printf("Successfully calculated that the sum of integers from 0"
         " to %d is %d\n", FLAGS_file_size - 1, sum);
  return 0;
}

Another example:

#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS

#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>

#define TEST_FILE      "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS     128
#define ALIGN_SIZE     512
#define RD_WR_SIZE     1024

/*
 * Environment: tested on CentOS 6.2 (libaio-devel 0.3.107-10)
 * Reference: linux异步IO编程实例分析 (https://zhuanlan.zhihu.com/p/258464210)
 * Build: gcc aio.c -laio
 */

struct custom_iocb {
    struct iocb iocb;
    int nth_request;
};

void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
           iocbp->nth_request,
           (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
           iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}

int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    io_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    struct epoll_event epevent;

    /*
     * The eventfd created here is produced by the eventfd system call, which was
     * added in Linux 2.6.22. The kernel uses it to tell the application how many
     * events have occurred, so the application does not have to poll the kernel:
     * the kernel writes the number of completed events to this fd, and once the
     * fd becomes readable the application reads the count and then fetches the
     * completions. With eventfd, libaio integrates nicely with an epoll event loop.
     */
    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 2;
    }

    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);

    ctx = 0;
    /* 1. Set up the I/O context: int io_setup(int maxevents, io_context_t *ctxp); */
    if (io_setup(8192, &ctx)) {
        perror("io_setup");
        return 4;
    }

    /* Read/write buffers must be sector-aligned; allocate them with posix_memalign */
    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %p\n", buf);

    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        iocbps[i] = &iocbp->iocb;
        /* Fill in the iocb: void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset) */
        io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        io_set_eventfd(&iocbp->iocb, efd);        /* attach the eventfd to this iocb */
        io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->nth_request = i + 1;
    }

    /* 2. Submit the I/O requests: long io_submit(aio_context_t ctx_id, long nr, struct iocb **iocbpp); */
    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }

    /* Create an epoll instance and add the eventfd to it */
    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }

    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;

        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }

        /* When the eventfd becomes readable, read the number of completed requests
           from it, then call io_getevents to collect those completions */
        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }
        printf("finished io number: %" PRIu64 "\n", finished_aio);

        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            /*
             * 3. Reap completed I/O: hand the kernel an io_event array to copy
             * completions into (its size is bounded by the maxevents given to
             * io_setup). timeout is how long to wait for completions; NULL means
             * wait until all requested I/O has completed.
             * long io_getevents(aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
             */
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t)(events[j].data))(ctx, events[j].obj,
                                                      events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            } else {
                finished_aio = 0;
                printf("finished_aio:%" PRIu64 "\n", finished_aio);
            }
        }
    }
    printf("end\n");

    close(epfd);
    free(buf);
    /* 4. Destroy the I/O context: int io_destroy(io_context_t ctx); */
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);
    return 0;
}

More:

https://github.com/ssbandjl/it
