uv_loop_s是Libuv的核心数据结构,每一个事件循环对应一个uv_loop_s结构体。它记录了整个事件循环中的核心数据。我们来分析每一个字段的意义。
1 用户自定义数据的字段
void* data;
2活跃的handle个数,会影响使用循环的退出
unsigned int active_handles;
3 handle队列,包括活跃和非活跃的
void* handle_queue[2];
4 request个数,会影响事件循环的退出
union { void* unused[2]; unsigned int count; } active_reqs;
5事件循环是否结束的标记
unsigned int stop_flag;
6 Libuv运行的一些标记,目前只有UV_LOOP_BLOCK_SIGPROF,主要是用于epoll_wait的时候屏蔽SIGPROF信号,提高性能,SIGPROF是调操作系统settimer函数设置从而触发的信号
unsigned long flags;
7 epoll的fd
int backend_fd;
8 pending阶段的队列
void* pending_queue[2];
9指向需要在epoll中注册事件的uv__io_t结构体队列
void* watcher_queue[2];
10 watcher_queue队列的节点中有一个fd字段,watchers以fd为索引,记录fd所在的uv__io_t结构体
uv__io_t** watchers;
11 watchers相关的数量,在maybe_resize函数里设置
unsigned int nwatchers;
12 watchers里fd个数,一般为watcher_queue队列的节点数
unsigned int nfds;
13线程池的子线程处理完任务后把对应的结构体插入到wq队列
void* wq[2];
14控制wq队列互斥访问,否则多个子线程同时访问会有问题
uv_mutex_t wq_mutex;
15用于线程池的子线程和主线程通信
uv_async_t wq_async;
16用于读写锁的互斥变量
uv_rwlock_t cloexec_lock;
17 事件循环close阶段的队列,由uv_close产生
uv_handle_t* closing_handles;
18 fork出来的进程队列
void* process_handles[2];
19 事件循环的prepare阶段对应的任务队列
void* prepare_handles[2];
20 事件循环的check阶段对应的任务队列
void* check_handles[2];
21 事件循环的idle阶段对应的任务队列
void* idle_handles[2];
21 async_handles队列,Poll IO阶段执行uv__async_io中遍历async_handles队列处理里面pending为1的节点
void* async_handles[2];
22用于监听是否有async handle任务需要处理
uv__io_t async_io_watcher;
23用于保存子线程和主线程通信的写端fd
int async_wfd;
24保存定时器二叉堆结构
struct {
void* min;
unsigned int nelts;
} timer_heap;
25 管理定时器节点的id,不断叠加
uint64_t timer_counter;
26当前时间,Libuv会在每次事件循环的开始和Poll IO阶段更新当前时间,然后在后续的各个阶段使用,减少对系统调用
uint64_t time;
27用于fork出来的进程和主进程通信的管道,用于子进程收到信号的时候通知主进程,然后主进程执行子进程节点注册的回调
int signal_pipefd[2];
28类似async_io_watcher,signal_io_watcher保存了管道读端fd和回调,然后注册到epoll中,在子进程收到信号的时候,通过write写到管道,最后在Poll IO阶段执行回调
uv__io_t signal_io_watcher;
29 用于管理子进程退出信号的handle
uv_signal_t child_watcher;
30备用的fd
int emfile_fd;
在Libuv中,uv_handle_t类似C++中的基类,有很多子类继承于它,Libuv主要通过控制内存的布局得到继承的效果。handle代表生命周期比较长的对象。例如 1 一个处于active状态的prepare handle,它的回调会在每次事件循环化的时候被执行。 2 一个TCP handle在每次有连接到来时,执行它的回调。
我们看一下uv_handle_t的定义
1 自定义数据,用于关联一些上下文,Node.js中用于关联handle所属的C++对象
void* data;
2 所属的事件循环
uv_loop_t* loop;
3 handle类型
uv_handle_type type;
4 handle调用uv_close后,在closing阶段被执行的回调
uv_close_cb close_cb;
5 用于组织handle队列的前置后置指针
void* handle_queue[2];
6 文件描述符
union {
int fd;
void* reserved[4];
} u;
7 当handle在close队列时,该字段指向下一个close节点
uv_handle_t* next_closing;
8 handle的状态和标记
unsigned int flags;
uv_stream_s是表示流的结构体。除了继承uv_handle_t的字段外,它额外定义下面字段
1 等待发送的字节数
size_t write_queue_size;
2 分配内存的函数
uv_alloc_cb alloc_cb;
3 读取数据成功时执行的回调
uv_read_cb read_cb;
4 发起连接对应的结构体
uv_connect_t *connect_req;
5 关闭写端对应的结构体
uv_shutdown_t *shutdown_req;
6 用于插入epoll,注册读写事件
uv__io_t io_watcher;
7 待发送队列
void* write_queue[2];
8 发送完成的队列
void* write_completed_queue[2];
9 收到连接时执行的回调
uv_connection_cb connection_cb;
10 socket操作失败的错误码
int delayed_error;
11 accept返回的fd
int accepted_fd;
12 已经accept了一个fd,又有新的fd,暂存起来
void* queued_fds;
uv_async_s是Libuv中实现异步通信的结构体。继承于uv_handle_t,并额外定义了以下字段。
1 异步事件触发时执行的回调
uv_async_cb async_cb;
2 用于插入async-handles队列
void* queue[2];
3 async_handles队列中的节点pending字段为1说明对应的事件触发了
int pending;
uv_tcp_s继承uv_handle_s和uv_stream_s。
1 发送字节数
size_t send_queue_size;
2 写队列节点的个数
size_t send_queue_count;
3 分配接收数据的内存
uv_alloc_cb alloc_cb;
4 接收完数据后执行的回调
uv_udp_recv_cb recv_cb;
5 插入epoll里的IO观察者,实现数据读写
uv__io_t io_watcher;
6 待发送队列
void* write_queue[2];
7 发送完成的队列(发送成功或失败),和待发送队列相关
void* write_completed_queue[2];
uv_tty_s继承于uv_handle_t和uv_stream_t。额外定义了下面字段。
1 终端的参数
struct termios orig_termios;
2 终端的工作模式
int mode;
uv_pipe_s继承于uv_handle_t和uv_stream_t。额外定义了下面字段。
1 标记管道是否可用于传递文件描述符
int ipc;
2 用于Unix域通信的文件路径
const char* pipe_fname;
上面三个结构体定义是类似的,它们都继承uv_handle_t,额外定义了两个字段。
1 prepare、check、idle阶段回调
uv_xxx_cb xxx_cb;
2 用于插入prepare、check、idle队列
void* queue[2];
uv_timer_s继承uv_handle_t,额外定义了下面几个字段。
1 超时回调
uv_timer_cb timer_cb;
2 插入二叉堆的字段
void* heap_node[3];
3 超时时间
uint64_t timeout;
4 超时后是否继续开始重新计时,是的话重新插入二叉堆
uint64_t repeat;
5 id标记,用于插入二叉堆的时候对比
uint64_t start_id
uv_process_s继承uv_handle_t,额外定义了
1 进程退出时执行的回调
uv_exit_cb exit_cb;
2 进程id
int pid;
3 用于插入队列,进程队列或者pending队列
void* queue[2];
4 退出码,进程退出时设置
int status;
uv_fs_event_s用于监听文件改动。uv_fs_event_s继承uv_handle_t,额外定义了
1 监听的文件路径(文件或目录)
char* path;
2 文件改变时执行的回调
uv_fs_event_cb cb;
uv_fs_poll_s继承uv_handle_t,额外定义了
1 poll_ctx指向poll_ctx结构体
void* poll_ctx;
struct poll_ctx {
// 对应的handle
uv_fs_poll_t* parent_handle;
// 标记是否开始轮询和轮询时的失败原因
int busy_polling;
// 多久检测一次文件内容是否改变
unsigned int interval;
// 每一轮轮询时的开始时间
uint64_t start_time;
// 所属事件循环
uv_loop_t* loop;
// 文件改变时回调
uv_fs_poll_cb poll_cb;
// 定时器,用于定时超时后轮询
uv_timer_t timer_handle;
// 记录轮询的一下上下文信息,文件路径、回调等
uv_fs_t fs_req;
// 轮询时保存操作系统返回的文件信息
uv_stat_t statbuf;
// 监听的文件路径,字符串的值追加在结构体后面
char path[1]; /* variable length */
};
uv_poll_s继承于uv_handle_t,额外定义了下面字段。
1 监听的fd有感兴趣的事件时执行的回调
uv_poll_cb poll_cb;
2 保存了fd和回调的IO观察者,注册到epoll中
uv__io_t io_watcher;
uv_signal_s继承uv_handle_t,额外定义了以下字段
1 收到信号时的回调
uv_signal_cb signal_cb;
2 注册的信号
int signum;
3 用于插入红黑树,进程把感兴趣的信号和回调封装成uv_signal_s,然后插入到红黑树,信号到来时,进程在信号处理号中把通知写入管道,通知Libuv。Libuv在Poll IO阶段会执行进程对应的回调。红黑树节点的定义如下
struct {
struct uv_signal_s* rbe_left;
struct uv_signal_s* rbe_right;
struct uv_signal_s* rbe_parent;
int rbe_color;
} tree_entry;
4 收到的信号个数
unsigned int caught_signals;
5 已经处理的信号个数
unsigned int dispatched_signals;
在Libuv中,uv_req_s也类似C++基类的作用,有很多子类继承于它,request代表一次请求,比如读写一个文件,读写socket,查询DNS。任务完成后这个request就结束了。request可以和handle结合使用,比如在一个TCP服务器上(handle)写一个数据(request),也可以单独使用一个request,比如DNS查询或者文件读写。我们看一下uv_req_s的定义。
1 自定义数据
void* data;
2 request类型
uv_req_type type;
3 保留字段
void* reserved[6];
uv_shutdown_s用于关闭流的写端,额外定义的字段
1 要关闭的流,比如TCP
uv_stream_t* handle;
2 关闭流的写端后执行的回调
uv_shutdown_cb cb;
uv_write_s表示一次写请求,比如在TCP流上发送数据,额外定义的字段
1 写完后的回调
uv_write_cb cb;
2 需要传递的文件描述符,在send_handle中
uv_stream_t* send_handle;
3 关联的handle
uv_stream_t* handle;
4 用于插入队列
void* queue[2];
5 保存需要写的数据相关的字段(写入的buffer个数,当前写成功的位置等)
unsigned int write_index;
uv_buf_t* bufs;
unsigned int nbufs;
uv_buf_t bufsml[4];
6 写出错的错误码
int error;
uv_connect_s表示发起连接请求,比如TCP连接,额外定义的字段
1 连接成功后执行的回调
uv_connect_cb cb;
2 对应的流,比如tcp
uv_stream_t* handle;
3 用于插入队列
void* queue[2];
uv_udp_send_s表示一次发送UDP数据的请求
1 所属udp的handle,udp_send_s代表一次发送
uv_udp_t* handle;
2 回调
uv_udp_send_cb cb;
3 用于插入待发送队列
void* queue[2];
4 发送的目的地址
struct sockaddr_storage addr;
5 保存了发送数据的缓冲区和个数
unsigned int nbufs;
uv_buf_t* bufs;
uv_buf_t bufsml[4];
6 发送状态或成功发送的字节数
ssize_t status;
7 发送完执行的回调(发送成功或失败)
uv_udp_send_cb send_cb;
uv_getaddrinfo_s表示一次通过域名查询IP的DNS请求,额外定义的字段
1 所属事件循环
uv_loop_t* loop;
2 用于异步DNS解析时插入线程池任务队列的节点
struct uv__work work_req;
3 DNS解析完后执行的回调
uv_getaddrinfo_cb cb;
4 DNS查询的配置
struct addrinfo* hints;
char* hostname;
char* service;
5 DNS解析结果
struct addrinfo* addrinfo;
6 DNS解析的返回码
int retcode;
uv_getnameinfo_s表示一次通过IP查询域名的DNS查询请求,额外定义的字段
1 所属事件循环
uv_loop_t* loop;
2 用于异步DNS解析时插入线程池任务队列的节点
struct uv__work work_req;
3 socket转域名完成的回调
uv_getnameinfo_cb getnameinfo_cb;
4 需要转域名的socket结构体
struct sockaddr_storage storage;
5 指示查询返回的信息
int flags;
6 查询返回的信息
char host[NI_MAXHOST];
char service[NI_MAXSERV];
7 查询返回码
int retcode;
uv_work_s用于往线程池提交任务,额外定义的字段
1 所属事件循环
uv_loop_t* loop;
2 处理任务的函数
uv_work_cb work_cb;
3 处理完任务后执行的函数
uv_after_work_cb after_work_cb;
4封装一个work插入到线程池队列,work_req的work和done函数是对上面work_cb和after_work_cb的封装
struct uv__work work_req;
uv_fs_s表示一次文件操作请求,额外定义的字段
1 文件操作类型
uv_fs_type fs_type;
2 所属事件循环
uv_loop_t* loop;
3文件操作完成的回调
uv_fs_cb cb;
4 文件操作的返回码
ssize_t result;
5 文件操作返回的数据
void* ptr;
6 文件操作路径
const char* path;
7 文件的stat信息
uv_stat_t statbuf;
8 文件操作涉及到两个路径时,保存目的路径
const char *new_path;
9 文件描述符
uv_file file;
10 文件标记
int flags;
11 操作模式
mode_t mode;
12 写文件时传入的数据和个数
unsigned int nbufs;
uv_buf_t* bufs;
13 文件偏移
off_t off;
14 保存需要设置的uid和gid,例如chmod的时候
uv_uid_t uid;
uv_gid_t gid;
15 保存需要设置的文件修改、访问时间,例如fs.utimes的时候
double atime;
double mtime;
16 异步的时候用于插入任务队列,保存工作函数,回调函数
struct uv__work work_req;
17 保存读取数据或者长度。例如read和sendfile
uv_buf_t bufsml[4];
IO观察者是Libuv中的核心概念和数据结构。我们看一下它的定义
1. struct uv__io_s {
2. // 事件触发后的回调
3. uv__io_cb cb;
4. // 用于插入队列
5. void* pending_queue[2];
6. void* watcher_queue[2];
7. // 保存本次感兴趣的事件,在插入IO观察者队列时设置
8. unsigned int pevents;
9. // 保存当前感兴趣的事件
10. unsigned int events;
11. int fd;
12. };
IO观察者封装了文件描述符、事件和回调,然后插入到loop维护的IO观察者队列,在Poll IO阶段,Libuv会根据IO观察者描述的信息,往底层的事件驱动模块注册文件描述符感兴趣的事件。当注册的事件触发的时候,IO观察者的回调就会被执行。我们看如何初IO观察者的一些逻辑。
1. void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
2. // 初始化队列,回调,需要监听的fd
3. QUEUE_INIT(&w->pending_queue);
4. QUEUE_INIT(&w->watcher_queue);
5. w->cb = cb;
6. w->fd = fd;
7. // 上次加入epoll时感兴趣的事件,在执行完epoll操作函数后设置
8. w->events = 0;
9. // 当前感兴趣的事件,在再次执行epoll函数之前设置
10. w->pevents = 0;
11. }
1. void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
2. // 设置当前感兴趣的事件
3. w->pevents |= events;
4. // 可能需要扩容
5. maybe_resize(loop, w->fd + 1);
6. // 事件没有变化则直接返回
7. if (w->events == w->pevents)
8. return;
9. // IO观察者没有挂载在其它地方则插入Libuv的IO观察者队列
10. if (QUEUE_EMPTY(&w->watcher_queue))
11. QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
12. // 保存映射关系
13. if (loop->watchers[w->fd] == NULL) {
14. loop->watchers[w->fd] = w;
15. loop->nfds++;
16. }
17. }
uv__io_start函数就是把一个IO观察者插入到Libuv的观察者队列中,并且在watchers数组中保存一个映射关系。Libuv在Poll IO阶段会处理IO观察者队列。
uv__io_stop修改IO观察者感兴趣的事件,如果还有感兴趣的事件的话,IO观察者还会在队列里,否则移出
1. void uv__io_stop(uv_loop_t* loop,
2. uv__io_t* w,
3. unsigned int events) {
4. if (w->fd == -1)
5. return;
6. assert(w->fd >= 0);
7. if ((unsigned) w->fd >= loop->nwatchers)
8. return;
9. // 清除之前注册的事件,保存在pevents里,表示当前感兴趣的事件
10. w->pevents &= ~events;
11. // 对所有事件都不感兴趣了
12. if (w->pevents == 0) {
13. // 移出IO观察者队列
14. QUEUE_REMOVE(&w->watcher_queue);
15. // 重置
16. QUEUE_INIT(&w->watcher_queue);
17. // 重置
18. if (loop->watchers[w->fd] != NULL) {
19. assert(loop->watchers[w->fd] == w);
20. assert(loop->nfds > 0);
21. loop->watchers[w->fd] = NULL;
22. loop->nfds--;
23. w->events = 0;
24. }
25. }
26. /*
27. 之前还没有插入IO观察者队列,则插入,
28. 等到Poll IO时处理,否则不需要处理
29. */
30. else if (QUEUE_EMPTY(&w->watcher_queue))
31. QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
32. }
uv__handle_init初始化handle的类型,设置REF标记,插入handle队列。
1. #define uv__handle_init(loop_, h, type_)
2. do {
3. (h)->loop = (loop_);
4. (h)->type = (type_);
5. (h)->flags = UV_HANDLE_REF;
6. QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue);
7. (h)->next_closing = NULL
8. }
9. while (0)
uv__handle_start设置标记handle为ACTIVE,如果设置了REF标记,则active handle的个数加一,active handle数会影响事件循环的退出。
1. #define uv__handle_start(h)
2. do {
3. if (((h)->flags & UV_HANDLE_ACTIVE) != 0) break;
4. (h)->flags |= UV_HANDLE_ACTIVE;
5. if (((h)->flags & UV_HANDLE_REF) != 0)
6. (h)->loop->active_handles++;
7. }
8. while (0)
uv__handle_stop和uv__handle_start相反。
1. #define uv__handle_stop(h)
2. do {
3. if (((h)->flags & UV_HANDLE_ACTIVE) == 0) break;
4. (h)->flags &= ~UV_HANDLE_ACTIVE;
5. if (((h)->flags & UV_HANDLE_REF) != 0) uv__active_handle_rm(h);
6. }
7. while (0)
Libuv中handle有REF和ACTIVE两个状态。当一个handle调用xxx_init函数的时候,它首先被打上REF标记,并且插入loop->handle队列。当handle调用xxx_start函数的时候,它被打上ACTIVE标记,并且记录active handle的个数加一。只有REF并且ACTIVE状态的handle才会影响事件循环的退出。
uv__req_init初始化请求的类型,记录请求的个数,会影响事件循环的退出。
1. #define uv__req_init(loop, req, typ)
2. do {
3. (req)->type = (typ);
4. (loop)->active_reqs.count++;
5. }
6. while (0)
请求的个数加一
1. #define uv__req_register(loop, req)
2. do {
3. (loop)->active_reqs.count++;
4. }
5. while (0)
请求个数减一
1. #define uv__req_unregister(loop, req)
2. do {
3. assert(uv__has_active_reqs(loop));
4. (loop)->active_reqs.count--;
5. }
6. while (0)
uv__handle_ref标记handle为REF状态,如果handle是ACTIVE状态,则active handle数加一
1. #define uv__handle_ref(h)
2. do {
3. if (((h)->flags & UV_HANDLE_REF) != 0) break;
4. (h)->flags |= UV_HANDLE_REF;
5. if (((h)->flags & UV_HANDLE_CLOSING) != 0) break;
6. if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_add(h);
7. }
8. while (0)
9. uv__handle_unref
uv__handle_unref去掉handle的REF状态,如果handle是ACTIVE状态,则active handle数减一
1. #define uv__handle_unref(h)
2. do {
3. if (((h)->flags & UV_HANDLE_REF) == 0) break;
4. (h)->flags &= ~UV_HANDLE_REF;
5. if (((h)->flags & UV_HANDLE_CLOSING) != 0) break;
6. if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_rm(h);
7. }
8. while (0)