FIO线程模型

网友投稿 220 2022-10-13

FIO线程模型

目的

fio 本身支持测试多种引擎,为了提高准确度,其自身线程开销就要求比较小,线程模型就要求比较合适。为此,有必要了解FIO内部io 请求提交和完成的流程。

异步提交过程

fio.c main()

51 fio_time_init(); 1508 /* 52 1509 * Entry point for the thread based jobs. The process based jobs end up 53 if (nr_clients) { 1510 * here as well, after a little setup. 54 set_genesis_time(); 1511 */ 55 1512 static void *thread_main(void *data) 56 if (fio_start_all_clients()) 1513 { 57 goto done_key; 1514 struct fork_data *fd = data; 58 ret = fio_handle_clients(&fio_client_ops); 1515 unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, }; 59 } else 1516 struct thread_data *td = fd->td; 60 ret = fio_backend(NULL);

重点在fio_backend()函数里面,主要内容如下:

int fio_backend(struct sk_out *sk_out) { struct thread_data *td; int i; if (exec_profile) { if (load_profile(exec_profile)) return 1; free(exec_profile); exec_profile = NULL; } if (!thread_number) return 0; if (write_bw_log) { struct log_params p = { .log_type = IO_LOG_TYPE_BW, }; setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log"); setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log"); setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log"); } startup_sem = fio_sem_init(FIO_SEM_LOCKED); if (startup_sem == NULL) return 1; set_genesis_time(); stat_init(); helper_thread_create(startup_sem, sk_out); cgroup_list = smalloc(sizeof(*cgroup_list)); if (cgroup_list) INIT_FLIST_HEAD(cgroup_list); run_threads(sk_out); helper_thread_exit(); ......

上面主要的函数是 run_threads(),它会建立主要的IO线程:

建立IO 提交的线程 thread_main

td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED); td->update_rusage = 0; /* * Set state to created. Thread will transition * to TD_INITIALIZED when it's done setting up. */ td_set_runstate(td, TD_CREATED); map[this_jobs++] = td; nr_started++; fd = calloc(1, sizeof(*fd)); fd->td = td; fd->sk_out = sk_out; if (td->o.use_thread) { int ret; dprint(FD_PROCESS, "will pthread_create\n"); ret = pthread_create(&td->thread, NULL, thread_main, fd); if (ret) { log_err("pthread_create: %s\n", strerror(ret)); free(fd); nr_started--; break; }

thread_main里和IO相关的核心函数是:do_io

while (keep_running(td)) { uint64_t verify_bytes; fio_gettime(&td->start, NULL); memcpy(&td->ts_cache, &td->start, sizeof(td->start)); if (clear_state) { clear_io_state(td, 0); if (o->unlink_each_loop && unlink_all_files(td)) break; } prune_io_piece_log(td); if (td->o.verify_only && td_write(td)) verify_bytes = do_dry_run(td); else { do_io(td, bytes_done); if (!ddir_rw_sum(bytes_done)) { fio_mark_td_terminate(td); verify_bytes = 0; } else { verify_bytes = bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM]; } }

分析do_io函数可以看到IO 提交和收割的过程:

/* * Main IO worker function. It retrieves io_u's to process and queues * and reaps them, checking for rate and errors along the way. * * Returns number of bytes written and trimmed. */ static void do_io(struct thread_data *td, uint64_t *bytes_done) { unsigned int i; int ret = 0; uint64_t total_bytes, bytes_issued = 0; for (i = 0; i < DDIR_RWDIR_CNT; i++) bytes_done[i] = td->bytes_done[i]; if (in_ramp_time(td)) td_set_runstate(td, TD_RAMP); else td_set_runstate(td, TD_RUNNING); lat_target_init(td); total_bytes = td->o.size; /* * Allow random overwrite workloads to write up to io_size * before starting verification phase as 'size' doesn't apply. */ if (td_write(td) && td_random(td) && td->o.norandommap) total_bytes = max(total_bytes, (uint64_t) td->o.io_size); /*

重点关注下面的提交请求的函数:

ret = io_u_submit(td, io_u); if (should_check_rate(td)) td->rate_next_io_time[ddir] = usec_for_io(td, ddir); if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time)) break; /* * See if we need to complete some commands. Note that * we can get BUSY even without IO queued, if the * system is resource starved. */ reap: full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); if (full || io_in_polling(td)) ret = wait_for_completions(td, &comp_time);

其中上面io_u_submit()会调用实际存储引擎注册的io_submit 函数;

wait_for_completions 会调用后端实际存储引擎注册的getevents 函数。(wait_for_completions <--io_u_queued_complete() 循环<--ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete_max, tvp);)io_queue_event()判断当前是否还有没有处理完的io events;

特别要关注的是何时收割event的逻辑:当可用来提交io 请求的空闲槽位都占满了,或者前端有正在执行的polling 操作的时候,就调用注册的存储引擎的get_events函数。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Mybatis中特殊SQL的执行
下一篇:Win11环境下编译Theia源码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~