uloop是OpenWrt基础库libubox的核心模块,提供事件驱动功能。

其主体框架由uloop_init、uloop_run_timeout和uloop_done三个函数构成,分别完成初始化、事件处理循环和清理工作。

uloop支持五大核心功能:

  1. 文件描述符监控基于epoll实现,通过epoll_wait处理就绪事件;
  2. 定时器事件在循环中检测超时;
  3. 子进程管理响应SIGCHLD信号;
  4. 信号处理通过管道实现;
  5. 间隔定时器依赖文件描述符机制。

主体框架

uloop循环的主体框架有三个函数构成,如下所示:

int uloop_init(void);
int uloop_run_timeout(int timeout);
static inline int uloop_run(void)
{
	return uloop_run_timeout(-1);
}
void uloop_done(void);

在这个框架中,文件描述符事件、定时器事件以及子进程管理事件均在uloop_run_timeout函数中处理,而信号处理事件、间隔定时器事件依赖于文件描述符事件。

int uloop_init(void)
{
	if (uloop_init_pollfd() < 0)
		return -1;

	if (waker_init() < 0) {
		uloop_done();
		return -1;
	}

	uloop_setup_signals(true);

	return 0;
}

从实现上看,uloop_init函数主要实现了三点功能:

  1. epoll的初始化
  2. 信号处理管道的初始化
  3. 设置SIGINTSIGTERMSIGCHLD等内置处理
int uloop_run_timeout(int timeout)
{
	int next_time = 0;

	uloop_run_depth++;

    // 收到SIGINT或SIGTERM信号时会设置uloop_status=signo uloop_cancelled=true
	uloop_status = 0;
	uloop_cancelled = false;
	do {
		// 处理定时器事件
		// 每次都会检测是否有定时器事件超时
		uloop_process_timeouts();

		// do_sigchld表示是否收到SIGCHLD信号,然后处理进程事件
		if (do_sigchld)
			// 只有收到SIGCHLD信号才会处理进程事件
			uloop_handle_processes();

		if (uloop_cancelled)
			break;

		next_time = uloop_get_next_timeout();
		if (timeout >= 0 && (next_time < 0 || timeout < next_time))
				next_time = timeout;
		// 处理文件描述符事件
		// 依靠定时器事件中的所需的事件作为epoll_wait的等待事件,如果没有定时器事件则会传入-1永远等待,而不是一直循环消耗CPU
		uloop_run_events(next_time);
	} while (!uloop_cancelled && timeout < 0);

	--uloop_run_depth;

	return uloop_status;
}

uloop_run_timeout函数中可以看到该函数是在do while循环中依次处理超时定时器事件、子进程管理事件以及文件描述符事件。

uloop_run_depth用于表示循环是否在执行中,uloop_status会在收到SIGINTSIGTERM信号时被设置为signo,与此同时uloop_cancelled会被设置为true,表示要终止循环。

void uloop_done(void)
{
	uloop_setup_signals(false);

	if (poll_fd >= 0) {
		close(poll_fd);
		poll_fd = -1;
	}

	if (waker_pipe >= 0) {
		uloop_fd_delete(&waker_fd);
		close(waker_pipe);
		close(waker_fd.fd);
		waker_pipe = -1;
	}

	uloop_clear_timeouts();
	uloop_clear_processes();
}

uloop_done函数中,则是进行一些清理动作

  1. uloop_setup_signals函数传入false,恢复原来的信号处理逻辑
  2. 删除信号处理管道
  3. 清理超时定时器事件、子进程处理事件链表。

文件描述符事件

uloop的文件描述符事件是基于epoll的,通过epoll_wait返回就绪的文件描述符数目和事件,然后在进行遍历处理。

文件描述符事件相关的接口如下:

struct uloop_fd
{
	uloop_fd_handler cb;
	int fd;
	bool eof;
	bool error;
	// registered字段用来标识是否已经注册到poll中
	bool registered;
	uint8_t flags;
};

// 回调函数原型
typedef void (*uloop_fd_handler)(struct uloop_fd *u, unsigned int events);

// 添加或删除文件描述符事件
int uloop_fd_add(struct uloop_fd *sock, unsigned int flags);
int uloop_fd_delete(struct uloop_fd *sock);

初始化

首先是在uloop_init中初始化epoll,函数实现如下所示:

static int uloop_init_pollfd(void)
{
	if (poll_fd >= 0)
		return 0;

	poll_fd = epoll_create(32);
	if (poll_fd < 0)
		return -1;

	fcntl(poll_fd, F_SETFD, fcntl(poll_fd, F_GETFD) | FD_CLOEXEC);
	return 0;
}

uloop_init_pollfd中执行了两个操作

  1. 创建epoll文件描述符,最多监控32个文件描述符
  2. 设置close-on-exec标志

添加描述符事件

epoll初始化完成之后,接下来就是用户添加文件描述符事件了,uloop对外暴露的函数是uloop_fd_add,其实现逻辑如下:

int uloop_fd_add(struct uloop_fd *sock, unsigned int flags)
{
	unsigned int fl;
	int ret;

	if (!(flags & (ULOOP_READ | ULOOP_WRITE)))
		// 非读非写,删除描述符
		return uloop_fd_delete(sock);

	if (!sock->registered && !(flags & ULOOP_BLOCKING)) {
		// 未注册并且非阻塞
		fl = fcntl(sock->fd, F_GETFL, 0);
		fl |= O_NONBLOCK; // 设置非阻塞
		fcntl(sock->fd, F_SETFL, fl);
	}

	// register_poll中使用epoll_ctl将文件描述符加入到epoll等待队列
	ret = register_poll(sock, flags);
	if (ret < 0)
		goto out;

	// 设置用于添加、删除文件描述事件的回调
	if (uloop_fd_set_cb)
		uloop_fd_set_cb(sock, flags);

	sock->flags = flags;
	sock->registered = true;
	sock->eof = false;
	sock->error = false;

out:
	return ret;
}

static int register_poll(struct uloop_fd *fd, unsigned int flags)
{
	struct epoll_event ev;
	int op = fd->registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

	memset(&ev, 0, sizeof(struct epoll_event));

    // EPOLLRDHUP用于接收对端关闭的事件通知
	if (flags & ULOOP_READ)
		ev.events |= EPOLLIN | EPOLLRDHUP;

	if (flags & ULOOP_WRITE)
		ev.events |= EPOLLOUT;

	// 是否设置边沿触发
	// 设置边沿触发,收到事件但是未处理,之后事件不会再进行通知
	// 水平触发则是收到事件但是未处理,则会反复通知
	if (flags & ULOOP_EDGE_TRIGGER)
		ev.events |= EPOLLET;

	ev.data.ptr = fd;

	return epoll_ctl(poll_fd, op, fd->fd, &ev);
}

uloop_fd_add函数中,首先会判断添加文件描述符的标志是否为ULOOP_READ|ULOOP_WRITE,如果不是则会调用uloop_fd_delete删除该文件描述符事件。

其次改变文件描述符的默认阻塞状态,如果文件描述符未注册,并且也未传入阻塞标志,则设置为非阻塞状态。正常情况下,系统创建的文件描述符是阻塞的。

接下来调用register_poll将文件描述符注册到epoll事件中,如果之前已经注册,则对其进行修改。

最后判断是否有设置文件描述符事件添加、删除回调(通过接口uloop_fd_set_cb),如果有设置,则执行该回调。

执行逻辑

文件描述符事件和定时器超时事件、子进程管理事件均是在uloop_run_timeout函数中进行处理的。

下面是uloop_run_timeout函数的实现:

int uloop_run_timeout(int timeout)
{
	int next_time = 0;

	uloop_run_depth++;

    // 收到SIGINT或SIGTERM信号时会设置uloop_status=signo uloop_cancelled=true
	uloop_status = 0;
	uloop_cancelled = false;
	do {
		// 处理定时器事件
		// 每次都会检测是否有定时器事件超时
		uloop_process_timeouts();

		// do_sigchld表示是否收到SIGCHLD信号,然后处理进程事件
		if (do_sigchld)
			// 只有收到SIGCHLD信号才会处理进程事件
			uloop_handle_processes();

		if (uloop_cancelled)
			break;

		next_time = uloop_get_next_timeout();
		if (timeout >= 0 && (next_time < 0 || timeout < next_time))
				next_time = timeout;
		// 处理文件描述符事件
		// 依靠定时器事件中的所需的事件作为epoll_wait的等待事件,如果没有定时器事件则会传入-1永远等待,而不是一直循环消耗CPU
		uloop_run_events(next_time);
	} while (!uloop_cancelled && timeout < 0);

	--uloop_run_depth;

	return uloop_status;
}

// 该函数用于确定epoll_wait的等待时间
// 如果超时事件链表timeouts为空,则返回-1,会使epoll_wait阻塞,持续等待
// 如果超时事件链表不为空,则去获取第一个超时事件,判断是否超时:
// 1. 如果超时,则返回0,epoll_wait不阻塞直接返回
// 2. 如果未超时,则返回INT_MAX,用来设置epoll_wait的等待时间
int uloop_get_next_timeout(void)
{
	struct uloop_timeout *timeout;
	struct timeval tv;
	int64_t diff;

	if (list_empty(&timeouts))
		// 如果没有超时事件,则返回-1,导致epoll_wait持续等待
		return -1;

	uloop_gettime(&tv);

	// 如果有超时事件,并且已经超时,则返回0,如果没有超时则返回INT_MAX
	timeout = list_first_entry(&timeouts, struct uloop_timeout, list);
	diff = tv_diff(&timeout->time, &tv);
	if (diff < 0)
		// 当前时间大于定时器中设置的时间,定时器事件已经超时
		return 0;
	if (diff > INT_MAX)
		return INT_MAX;

	return diff;
}

uloop_run_timeout函数中有两个和文件描述符事件相关的操作:

  1. 通过uloop_get_next_timeout函数获取等待时间,该时间用于epoll_wait等待。
  2. 通过uloop_run_events函数具体获取就绪的文件描述符状态并处理执行回调。

uloop_get_next_timeout函数中执行以下操作:

  1. 判断超时定时器事件链表是否为空,如果为空则返回-1。
  2. 获取超时定时器链表中的第一个定时器事件的剩余时间,如果已超时则返回0,否则返回剩余时间。
// 保存文件描述符和对应的事件
struct uloop_fd_event {
	struct uloop_fd *fd;
	unsigned int events;
};

// 用来保存所有描述符事件的链表
struct uloop_fd_stack {
	struct uloop_fd_stack *next;
	struct uloop_fd *fd;
	unsigned int events;
};

static struct uloop_fd_stack *fd_stack = NULL;

#define ULOOP_MAX_EVENTS 10

// cur_fds中记录epoll返回的就绪的文件描述符事件
// cur_nfds记录就绪的文件描述符状态
// cur_fd记录正在处理的文件描述符索引
static struct uloop_fd_event cur_fds[ULOOP_MAX_EVENTS];
static int cur_fd, cur_nfds;

// 使用cur_nfds判断是否有上次未处理完的已就绪的文件描述符,如果有未处理完的则会继续处理,而非获取新的
// 调用uloop_fetch_events获取新的已就绪的文件描述符,cur_nfds用来标识已就绪的文件描述符数
// uloop_fetch_events中对于每个已就绪的文件描述符,其对应的事件保存在cur_fds[]数组中,最大能够保存10个就绪事件
// cur_fd是已就绪文件描述cur_fds[]索引,每次获取到已就绪文件描述符,会将其重置为0
// 在while循环中从cur_fds中以此取出就绪的文件描述符,在do while循环中以此取出文件描述符对应的每个事件,执行对应的fd->cb()
// uloop_fd_stack_event函数的作用在于避免递归调用过程中多次执行fd->cb(),当第一次调用之后就会将文件描述符的事件信息入栈,如果fd->cb()中再次调用uloop_run_events,第二次就会返回true然后跳过
static void uloop_run_events(int64_t timeout)
{
	struct uloop_fd_event *cur;
	struct uloop_fd *fd;

	if (!cur_nfds) {
		cur_fd = 0;
		// 获取就绪的文件描述符,timeout用于表示epoll_wait的阻塞事件,
		// 从该参数的来源可以看到,如果没有定时器事件就取-1,表示永远阻塞
		// 如果有定时器事件,就判断定时器事件列表中的第一个事件,是否超时,如果已超时就取0,表示立即返回不等待。如果未超时,就取所需时间作为epoll_wait的等待时间
		cur_nfds = uloop_fetch_events(timeout);
		if (cur_nfds < 0)
			cur_nfds = 0;
	}

	while (cur_nfds > 0) {
		struct uloop_fd_stack stack_cur;
		unsigned int events;

		cur = &cur_fds[cur_fd++];
		cur_nfds--;

		fd = cur->fd;
		events = cur->events;
		if (!fd)
			continue;

		if (!fd->cb)
			continue;

		// 第一次进入之后这个函数发现栈为空,返回false,然后下面将fd事件入栈
		// 如果在fd->cb()中再次调用uloop_run_events,第二次执行到uloop_fd_stack_event,会保留第二次的新事件(通过传入的cur->events)并标记事件为缓存状态,返回true,然后跳过下面的部分,使fd->cb()退出。
		// 但是如果第二次的事件中包含ULOOP_EVENT_MASK,那么在fd->cb()退出之后,会发现还存在待处理的events,会在do while中再次循环处理
		// 如果没有这个处理,则会再次执行fd->cb()
		if (uloop_fd_stack_event(fd, cur->events))
			continue;

		// fd_stack 入栈(链表头部插入)
		stack_cur.next = fd_stack;
		stack_cur.fd = fd;
		fd_stack = &stack_cur;
		do {
			stack_cur.events = 0;
			fd->cb(fd, events);
			// 如果第二次触发的事件中包含ULOOP_READ或ULOOP_WRITE,那么stack_cur.events则为ULOOP_READ|ULOOP_WRITE|ULOOP_EVENT_BUFFERED,
			// 这将会导致再次进入循环执行一次回调
			// 如果没有ULOOP_EVENT_MASK时则不会再次执行回调,满足避免递归执行的要求
			events = stack_cur.events & ULOOP_EVENT_MASK;
		} while (stack_cur.fd && events);
		// 出栈
		fd_stack = stack_cur.next;

		return;
	}
}
static int uloop_fetch_events(int timeout)
{
	int n, nfds;

	// 返回就绪的文件描述符数,超时返回0
	// timeout -1无限期阻塞,0立即返回
	nfds = epoll_wait(poll_fd, events, ARRAY_SIZE(events), timeout);
	for (n = 0; n < nfds; ++n) {
		struct uloop_fd_event *cur = &cur_fds[n];
		struct uloop_fd *u = events[n].data.ptr;
		unsigned int ev = 0;

		cur->fd = u;
		if (!u)
			continue;

		if (events[n].events & (EPOLLERR|EPOLLHUP)) {
			u->error = true;
			if (!(u->flags & ULOOP_ERROR_CB))
				uloop_fd_delete(u);
		}

		if(!(events[n].events & (EPOLLRDHUP|EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP))) {
			cur->fd = NULL;
			continue;
		}

		if(events[n].events & EPOLLRDHUP)
			u->eof = true;

		if(events[n].events & EPOLLIN)
			ev |= ULOOP_READ;

		if(events[n].events & EPOLLOUT)
			ev |= ULOOP_WRITE;

		cur->events = ev;
	}

	return nfds;
}
static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
{
	struct uloop_fd_stack *cur;

	/*
	 * Do not buffer events for level-triggered fds, they will keep firing.
	 * Caller needs to take care of recursion issues.
	 */
	
	// 如果是水平触发,则返回false,因为事件反复通知是正常情况
	// 如果是边沿触发,则需要缓存事件,防止多次调用事件描述符回调函数
	if (!(fd->flags & ULOOP_EDGE_TRIGGER))
		return false;

	// 从栈顶往下遍历
	for (cur = fd_stack; cur; cur = cur->next) {
		if (cur->fd != fd)
			continue;

		if (events < 0)
			cur->fd = NULL;
		else
			// 和传入的fd相等,标记事件为缓存状态
			// 保存触发的事件和缓存标志
			// 如果触发的事件events中包含读写事件(ULOOP_EVENT_MASK),则仍旧会执行回调
			cur->events |= events | ULOOP_EVENT_BUFFERED;

		return true;
	}

	return false;
}

uloop_run_events函数中接收一个timeout参数,该参数被用来传递给uloop_fetch_events函数,最终传递给epoll_wait函数,用于指示epoll_wait函数的动作。

uloop_get_next_timeout函数的执行逻辑可以看到

  1. timeout为-1时,表示超时定时器事件链表为空,此时epoll_wait就会永远等待,此时uloop_run_timeout中的循环就会阻塞,不会浪费CPU资源。
  2. 而当超时定时器事件列表不为空时,就会等待指定时间(这里的指定时间指的是超时定时器链表中第一个定时器事件的剩余时间)以便在uloop_run_timeout函数的循环中及时处理超时定时器事件。

uloop_run_events函数中可以看到

  1. 首先通过cur_nfds变量判断是否还有未处理的已就绪的文件描述符时间,如果有则直接执行循环先进行处理,否则就调用uloop_fetch_events函数获取新的就绪的文件描述符,并将其保存在cur_fds数组中,就绪的文件描述符数量则保存在cur_nfds变量中。
  2. 然后在while循环中逐个处理文件描述符事件,其中cur_fd表示文件描述符事件数组cur_fds数组的索引。
  3. 最后会调用fd->cb()执行文件描述符事件回调。
  4. 至于uloop_fd_stack结构体和uloop_fd_stack_event函数则是一个很有意思的操作,下面详细解释其作用。

uloop_fd_stack_event函数机制

下面是uloop_fd_stack结构体和uloop_fd_stack_event函数的相关提交:

commit b9ebdbcc648274cc630b6349374f9fb21e53f396
Author: Felix Fietkau <nbd@openwrt.org>
Date:   Tue Jun 18 12:01:08 2013 +0200

    uloop: fix corner cases with recursive uloop_run calls

    With multiple recursive calls to uloop_run, the callback for the same fd
    can be run multiple times from different levels in the stack.
    Prevent this by tracking the stack of uloop_fd callbacks and buffering new
    incoming events for fds already on the stack.

    Signed-off-by: Felix Fietkau <nbd@openwrt.org>

diff --git a/uloop.c b/uloop.c
index bf13199..54ebe8d 100644
--- a/uloop.c
+++ b/uloop.c
@@ -43,6 +43,14 @@ struct uloop_fd_event {
        unsigned int events;
 };

+struct uloop_fd_stack {
+       struct uloop_fd_stack *next;
+       struct uloop_fd *fd;
+       unsigned int events;
+};
+
+static struct uloop_fd_stack *fd_stack = NULL;
+
 #define ULOOP_MAX_EVENTS 10

 static struct list_head timeouts = LIST_HEAD_INIT(timeouts);
@@ -285,6 +293,32 @@ static int uloop_fetch_events(int timeout)

 #endif

+static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
+{
+       struct uloop_fd_stack *cur;
+
+       /*
+        * Do not buffer events for level-triggered fds, they will keep firing.
+        * Caller needs to take care of recursion issues.
+        */
+       if (!(fd->flags & ULOOP_EDGE_TRIGGER))
+               return false;
+
+       for (cur = fd_stack; cur; cur = cur->next) {
+               if (cur->fd != fd)
+                       continue;
+
+               if (events < 0)
+                       cur->fd = NULL;
+               else
+                       cur->events |= events | ULOOP_EVENT_BUFFERED;
+
+               return true;
+       }
+
+       return false;
+}
+
 static void uloop_run_events(int timeout)
 {
        struct uloop_fd_event *cur;
@@ -298,17 +332,33 @@ static void uloop_run_events(int timeout)
        }

        while (cur_nfds > 0) {
+               struct uloop_fd_stack stack_cur;
+               unsigned int events;
+
                cur = &cur_fds[cur_fd++];
                cur_nfds--;

                fd = cur->fd;
+               events = cur->events;
                if (!fd)
                        continue;

                if (!fd->cb)
                        continue;

-               fd->cb(fd, cur->events);
+               if (uloop_fd_stack_event(fd, cur->events))
+                       continue;
+
+               stack_cur.next = fd_stack;
+               stack_cur.fd = fd;
+               fd_stack = &stack_cur;
+               do {
+                       stack_cur.events = 0;
+                       fd->cb(fd, events);
+                       events = stack_cur.events & ULOOP_EVENT_MASK;
+               } while (stack_cur.fd && events);
+               fd_stack = stack_cur.next;
+
                return;
        }
 }
@@ -352,6 +402,7 @@ int uloop_fd_delete(struct uloop_fd *fd)
                cur_fds[cur_fd + i].fd = NULL;
        }
        fd->registered = false;
+       uloop_fd_stack_event(fd, -1);
        return __uloop_fd_delete(fd);
 }

diff --git a/uloop.h b/uloop.h
index 39b9b58..98dd818 100644
--- a/uloop.h
+++ b/uloop.h
@@ -44,8 +44,13 @@ typedef void (*uloop_process_handler)(struct uloop_process *c, int ret);
 #define ULOOP_WRITE            (1 << 1)
 #define ULOOP_EDGE_TRIGGER     (1 << 2)
 #define ULOOP_BLOCKING         (1 << 3)
+
+#define ULOOP_EVENT_MASK       (ULOOP_READ | ULOOP_WRITE)
+
+/* internal flags */
+#define ULOOP_EVENT_BUFFERED   (1 << 4)
 #ifdef USE_KQUEUE
-#define ULOOP_EDGE_DEFER       (1 << 4)
+#define ULOOP_EDGE_DEFER       (1 << 5)
 #endif

 struct uloop_fd

考虑一下这种场景,在没有引入这次提交之前,在uloop_run_events函数中会直接执行文件描述符事件的回调。如果仅注册了一个文件描述符事件并且该描述符事件回调中调用了uloop_run函数,假设该文件描述符已就绪(cur_nfds为1),在while循环中调用回调之前会执行cur_nfds--,此时cur_nfds为0(满足接收该文件描述符新就绪事件的条件),在执行的该文件描述符事件回调的过程中又执行到uloop_run_events函数,此时存在两种情况,一是收到该文件描述符上的新就绪事件,二是没有收到该文件描述符上的就绪事件。

  1. 对于情况一,在上次文件描述符事件的回调还没有执行完,这次又进入到回调中,很可能造成问题(取决于回调函数的处理逻辑)。
  2. 对于情况二,uloop_run_envents函数会直接返回,影响不大。

从提交记录中可以看到,引入的这个操作就是用来解决在递归调用uloop_run函数(例如在回调函数中调用)时,文件描述符事件回调会被多次调用的情况。

对于上面的情况一,由于第一次收到就绪事件时已经将该文件描述符入栈(不包含该文件描述符就绪状态的事件,并且在执行回调之前也已清空stack_cur.events = 0;),因此此次可以从栈中查询到并返回true,跳过这次循环。那么问题来了,如果这次收到的就绪事件是正常的需要处理的读写事件呢,跳过之后可能就会发生遗漏(例如EPOLL的边沿触发,无论是否处理事件,仅会通知一次)。

对于这种情况的处理方式则是在跳过这次循环,在当前文件描述符回调执行结束之后,再次判断该文件描述符的就绪状态中是否有待处理的读写事件(因为第二次执行uloop_fd_stack_event时将新的就绪事件events和缓存标志ULOOP_EVENT_BUFFERED均赋值给当前文件描述符,即cur->events |= events | ULOOP_EVENT_BUFFERED;),如果有则在do while循环中再次调用回调,并且在调用回调之前将栈中该文件描述符的事件清空,用来接收下次就绪状态的事件。

uloop_fd_stack_event函数中

  1. 判断文件描述符标志是否为水平触发,如果是水平触发则不能阻止执行回调,因为有正常的未处理的就绪文件描述符事件。epoll的水平触发逻辑是如果收到就绪的文件描述符事件但未处理,那么后续会一直进行事件通知。
  2. 接下来从栈中查找是否有正在执行的文件描述符事件,如果有则设置标志ULOOP_EVENT_BUFFERED,并返回true

销毁

文件描述符事件的销毁则容易的多,是通过uloop_fd_delete函数来执行。

int uloop_fd_delete(struct uloop_fd *fd)
{
	int ret;
	int i;

	// 从描述符事件数组中删除对应的描述符指向的事件
	for (i = 0; i < cur_nfds; i++) {
		if (cur_fds[cur_fd + i].fd != fd)
			continue;

		cur_fds[cur_fd + i].fd = NULL;
	}

	if (!fd->registered)
		// 未注册的话直接返回
		return 0;

	if (uloop_fd_set_cb)
		uloop_fd_set_cb(fd, 0);

	fd->registered = false;
	// 从栈中清除文件描述符事件
	uloop_fd_stack_event(fd, -1);
	// 从epoll表中清除
	ret = __uloop_fd_delete(fd);
	fd->flags = 0;

	return ret;
}

static int __uloop_fd_delete(struct uloop_fd *sock)
{
	sock->flags = 0;
	return epoll_ctl(poll_fd, EPOLL_CTL_DEL, sock->fd, 0);
}

该函数的处理逻辑如下:

  1. 判断当前文件描述符是否处于就绪状态,如果是则从保存处于就绪状态的文件描述符事件数组cur_fds[]中删除。
  2. 如果该文件描述符没有注册到epoll中则直接返回,如果已经注册到epoll中,则下面通过__uloop_fd_delete函数从epoll中删除。
  3. 如果有设置文件描述符事件添加、删除回调,则执行回调。
  4. 通过向uloop_fd_stack_event函数传入-1参数,从保存文件描述符事件的栈中删除对应的条目。

定时器事件

现在uloop支持两种定时器事件,分别是超时定时器和间隔定时器,以下是这两种定时器事件的接口

struct uloop_timeout
{
	struct list_head list;
	// pending表示是否已经在等待中
	bool pending;

	uloop_timeout_handler cb;
	// 超时时间,会使用当前时间与这个时间对比,判断是否超时
	struct timeval time;
};
// 获取最近的定时器事件的剩余时间,如果不存在定时器事件则返回-1
int uloop_get_next_timeout(void);
int uloop_timeout_add(struct uloop_timeout *timeout);
int uloop_timeout_set(struct uloop_timeout *timeout, int msecs);
int uloop_timeout_cancel(struct uloop_timeout *timeout);
int uloop_timeout_remaining(struct uloop_timeout *timeout) __attribute__((deprecated("use uloop_timeout_remaining64")));
// 获取指定定时器事件的剩余时间
int64_t uloop_timeout_remaining64(struct uloop_timeout *timeout);


// 间隔定时器事件
struct uloop_interval
{
	uloop_interval_handler cb;
	// 表示定时器的累计超时次数
	uint64_t expirations;

	union {
		struct uloop_fd ufd;
		struct {
			int64_t fired;
			unsigned int msecs;
		} time;
	} priv;
};
int uloop_interval_set(struct uloop_interval *timer, unsigned int msecs);
int uloop_interval_cancel(struct uloop_interval *timer);
// 获取定时器距离下次超时的剩余时间
int64_t uloop_interval_remaining(struct uloop_interval *timer);

这两种定时器的实现方式很不一样,下面分别介绍一下这两种定时器。

超时定时器

这种定时器的描述结构为struct uloop_timeout,从结构中基本就可以看出,这种定时器是保存在一个全局链表static struct list_head timeouts = LIST_HEAD_INIT(timeouts);中。

下面给出添加、设置以及取消定时器事件的处理代码:

static int64_t tv_diff(struct timeval *t1, struct timeval *t2)
{
	return
		(t1->tv_sec - t2->tv_sec) * 1000 +
		(t1->tv_usec - t2->tv_usec) / 1000;
}

int uloop_timeout_add(struct uloop_timeout *timeout)
{
	struct uloop_timeout *tmp;
	struct list_head *h = &timeouts;

	if (timeout->pending)
		return -1;

	list_for_each_entry(tmp, &timeouts, list) {
		// 链表的超时时间按照从小到达排列,找出插入当前超时事件的位置
		// 当前节点的超时时间大于新插入超时事件的时间时退出循环
		if (tv_diff(&tmp->time, &timeout->time) > 0) {
			h = &tmp->list;
			break;
		}
	}

	// 将timeout事件插入到h指向节点的前面
	list_add_tail(&timeout->list, h);
	timeout->pending = true;

	return 0;
}

static void uloop_gettime(struct timeval *tv)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	tv->tv_sec = ts.tv_sec;
	tv->tv_usec = ts.tv_nsec / 1000;
}

int uloop_timeout_set(struct uloop_timeout *timeout, int msecs)
{
	struct timeval *time = &timeout->time;

	if (timeout->pending)
		uloop_timeout_cancel(timeout);

	// 获取当前时间
	uloop_gettime(time);

	// 设置超时时间
	time->tv_sec += msecs / 1000;
	time->tv_usec += (msecs % 1000) * 1000;

	if (time->tv_usec > 1000000) {
		time->tv_sec++;
		time->tv_usec -= 1000000;
	}

	// 添加超时事件
	return uloop_timeout_add(timeout);
}

int uloop_timeout_cancel(struct uloop_timeout *timeout)
{
	if (!timeout->pending)
		return -1;

	// 双向链表移除只需要当前节点即可
	list_del(&timeout->list);
	timeout->pending = false;

	return 0;
}

uloop_timeout_add函数用来添加定时器,其中定时器的超时时间已经保存在struct uloop_timeout结构体中并随参数传入,然后在定时器链表中按照剩余事件从小到大的顺序找到第一个大于新定时器事件的位置,并在该位置插入新定时器事件。

uloop_timeout_set函数也用于设置定时器,但是由于传入的是超时的毫秒数,因此需要先计算出超时时间,然后再调用uloop_timeout_add插入到定时器链表中。

uloop_timeout_cancel函数用于从定时器链表中删除指定的定时器事件。

uloop_timeout_remaining64函数用于获取指定定时器事件的剩余时间:

int64_t uloop_timeout_remaining64(struct uloop_timeout *timeout)
{
	struct timeval now;

	if (!timeout->pending)
		return -1;

	uloop_gettime(&now);

	return tv_diff(&timeout->time, &now);
}

执行流程

超时定时器事件是在uloop_run_timeout函数的do while函数中通过调用uloop_process_timeouts函数来执行的,下面是uloop_process_timeouts函数的处理逻辑:

  1. 判断定时器事件链表是否为空,如果为空则直接返回。
  2. 从定时器事件链表中取出第一个定时器事件,并判断是否超时,如果超时则通过uloop_timeout_cancel函数从该链表中删除(这里可以看出如果需要再次调用定时器,则需要在回调中再次进行设置),并执行回调;如果未超时则直接返回,因为定时器链表是按照超时时间从小到大的顺序排列。
// 处理超时事件
static void uloop_process_timeouts(void)
{
	struct uloop_timeout *t;
	struct timeval tv;

	// 判断超时事件列表是否为空
	if (list_empty(&timeouts))
		return;

	// 获取当前事件
	uloop_gettime(&tv);
	while (!list_empty(&timeouts)) {
		t = list_first_entry(&timeouts, struct uloop_timeout, list);

		// 判断第一个超时事件的时间是否已到达
		if (tv_diff(&t->time, &tv) > 0)
			break;

		// 如果已到达,则取消该事件
		uloop_timeout_cancel(t);
		// 如果已设置回调,则调用回调
		if (t->cb)
			t->cb(t);
	}
}

间隔定时器

间隔定时器使用struct uloop_interval来描述,该定时器主要使用标准C提供的定时器接口。

#include <sys/timerfd.h>

int timerfd_create(int clockid, int flags);

int timerfd_settime(int fd, int flags,
                    const struct itimerspec *new_value,
                    struct itimerspec *_Nullable old_value);
int timerfd_gettime(int fd, struct itimerspec *curr_value);

该定时器的设置、取消以及获取指定定时器事件的剩余时间接口实现如下:

int uloop_interval_set(struct uloop_interval *timer, unsigned int msecs)
{
	return timer_register(timer, msecs);
}

int uloop_interval_cancel(struct uloop_interval *timer)
{
	return timer_remove(timer);
}

int64_t uloop_interval_remaining(struct uloop_interval *timer)
{
	return timer_next(timer);
}
static int timer_register(struct uloop_interval *tm, unsigned int msecs)
{
	// 判断是否已经注册在文件描述符事件中
	if (!tm->priv.ufd.registered) {
		// TFD_CLOEXEC和TFD_NONBLOCK用于对定时器文件描述符设置close-on-exec和非阻塞标志
		// 创建定时器文件描述符
		int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);

		if (fd == -1)
			return -1;

		tm->priv.ufd.fd = fd;
		tm->priv.ufd.cb = dispatch_timer;
	}

	// itmerspec用于定时器的结构体
	// 用于指定定时器的第一次启动时间和后续的周期触发间隔
	struct itimerspec spec = {
		// 第一次执行时间,为msec指定的时间,表示msec毫秒之后执行
		.it_value = {
			.tv_sec = msecs / 1000,
			.tv_nsec = (msecs % 1000) * 1000000
		},
		// 周期触发间隔,也为msec指定的时间,表示每隔msec毫秒执行一次
		.it_interval = {
			.tv_sec = msecs / 1000,
			.tv_nsec = (msecs % 1000) * 1000000
		}
	};

	// int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
	// timerfd_settime函数用于创建定时器,会在指定的时间之后通过文件描述符通知
	// 参数如下:
	// fd:使用timerfd_create创建的文件描述符
	// flag:0表示相对时间,1表示绝对时间
	// new_value:上面的itemrspec结构体,用于指定定时器的第一次启动时间和后续的周期触发间隔
	// old_value:如果不为NULL,则返回定时器之前设置的超时时间
	// 定时器超时后会向文件描述符中写入一个超时次数
	if (timerfd_settime(tm->priv.ufd.fd, 0, &spec, NULL) == -1)
		goto err;

	// 将该定时器事件添加到文件描述符事件中
	if (uloop_fd_add(&tm->priv.ufd, ULOOP_READ) == -1)
		goto err;

	return 0;

err:
	uloop_fd_delete(&tm->priv.ufd);
	close(tm->priv.ufd.fd);
	memset(&tm->priv.ufd, 0, sizeof(tm->priv.ufd));

	return -1;
}

static int timer_remove(struct uloop_interval *tm)
{
	// 先从epoll_wail等待文件描述中去除
	int ret = __uloop_fd_delete(&tm->priv.ufd);

	if (ret == 0) {
		close(tm->priv.ufd.fd);
		memset(&tm->priv.ufd, 0, sizeof(tm->priv.ufd));
	}

	return ret;
}

// 获取间隔定时器设置的超时时间
static int64_t timer_next(struct uloop_interval *tm)
{
	struct itimerspec spec;

	if (!tm->priv.ufd.registered)
		return -1;

	// timerfd_gettime用来获取定时器距离下次超时的剩余时间
	if (timerfd_gettime(tm->priv.ufd.fd, &spec) == -1)
		return -1;

	return spec.it_value.tv_sec * 1000 + spec.it_value.tv_nsec / 1000000;
}

在上面的uloop_interval_set函数中调用了timer_register函数,该函数中通过标准C提供的timerfd_create创建文件描述符,并将其与dispatch_timer函数一起赋值给文件描述符事件结构体(dispatch_timer作为文件描述符事件回调),随后通过timerfd_settime设置间隔提醒事件,最后添加到文件描述符事件处理流程中。

timer_remove函数在删除该间隔定时器事件是先从epoll中取消,随后关闭文件描述符。

time_next函数返回指定间隔定时器的剩余事件。

接下来了解以下,当间隔定时器超时被触发之后的处理逻辑:

// 通过文件描述符事件通知该定时器文件描述符是否准备就绪,如果就绪就会调用该回调
static void dispatch_timer(struct uloop_fd *u, unsigned int events)
{
	if (!(events & ULOOP_READ))
		return;

	uint64_t fired;

	// 定时器超时后会向对应文件描述符中写入超时次数,这里可以读取出来
	if (read(u->fd, &fired, sizeof(fired)) != sizeof(fired))
		return;

	// 通过传入的定时器结构体的元素u获取到结构体指针
	struct uloop_interval *tm = container_of(u, struct uloop_interval, priv.ufd);

	tm->expirations += fired;
	// 定时器超时执行回调
	tm->cb(tm);
}

当间隔定时器超时之后,内核会向timerfd_settime传入的文件描述符中写入自上一次读取该文件描述符以来,定时器发生超时的总次数。而由于dispatch_timer是注册为文件描述符事件的回调中,因此触发之后则会被调用。

在该函数中,读取到数据是该间隔定时器的触发次数,并将其赋值叠加给expirations参数,用以表示定时器超时的总次数。然后执行注册该定时器事件时设置的回调。

子进程事件

子进程事件的处理方式与超时定时器的处理方式大同小异,均是在保存在一个链表中,但子进程事件的处理是由条件的,该条件就是收到SIGCHLD信号。

下面是子进程事件的相关数据结构

struct uloop_process
{
	struct list_head list;
	// 表示进程事件是否插入到对应的链表中,插入到链表之后收到SIGCHLD信号才会调用回调函数
	bool pending;

	uloop_process_handler cb;
	// 子进程的pid,收到SIGCHLD信号之后会判断信号来源是否与该pid相等,如果相等则执行回调
	pid_t pid;
};
int uloop_process_add(struct uloop_process *p);
int uloop_process_delete(struct uloop_process *p);

下面是添加、删除子进程的处理逻辑:

// 将参数p指定的进程事件插入到链表中,插入顺序按照pid从小到大的顺序
int uloop_process_add(struct uloop_process *p)
{
	struct uloop_process *tmp;
	struct list_head *h = &processes;

	if (p->pending)
		return -1;

	// 按照进程id从小达到的顺序插入
	list_for_each_entry(tmp, &processes, list) {
		if (tmp->pid > p->pid) {
			// 找到刚好大于传入进程事件中指定pid的节点后退出
			h = &tmp->list;
			break;
		}
	}

	list_add_tail(&p->list, h);
	// 表示进程事件正在处理中
	p->pending = true;

	return 0;
}

int uloop_process_delete(struct uloop_process *p)
{
	if (!p->pending)
		return -1;

	list_del(&p->list);
	p->pending = false;

	return 0;
}

可以看到,这和添加、删除超时定时器事件的操作基本一直,唯一不同的地方在于,链表排序的依据是子进程ID。

处理逻辑

uloop_run_timeout函数中,有以下两行代码是用来处理子进程事件的

// do_sigchld表示是否受到SIGCHLD信号,然后处理进程事件
if (do_sigchld)
    // 只有收到SIGCHLD信号才会处理进程事件
    uloop_handle_processes();

其中do_sigchld标志是否收到SIGCHLD信号,如果收到则为true

下面是SIGCHLD信号处理逻辑

static struct list_head processes = LIST_HEAD_INIT(processes);

static void uloop_signal_wake(int signo)
{
	uint8_t sigbyte = signo;

	// SIGCHLD 系统通知父进程其子进程终止,父进程此时需要处理回收动作
	if (signo == SIGCHLD)
		do_sigchld = true;
    //...
	} while (1);
}
static void uloop_install_handler(int signum, void (*handler)(int), struct sigaction* old, bool add)
{
	struct sigaction s;
	struct sigaction *act;

	act = NULL;
	// 获取备份旧的信号处理方式
	sigaction(signum, NULL, &s);

	if (add) {
		if (s.sa_handler == SIG_DFL) { /* Do not override existing custom signal handlers */
			// 旧的信号处理方式是默认,则将旧信号处理方式保存在old中
			memcpy(old, &s, sizeof(struct sigaction));
			// 设置新的信号处理方式
			s.sa_handler = handler;
			s.sa_flags = 0;
			act = &s;
		}
	}
	else if (s.sa_handler == handler) { /* Do not restore if someone modified our handler */
			// 如果不是添加信号处理方式,则恢复旧的
			act = old;
	}

	if (act != NULL)
		// 注册新的信号处理方式
		sigaction(signum, act, NULL);
}
// 所有信号事件均通过管道和文件描述符事件来处理,只是SIGINT和SIGTERM信号需要做额外的参数设置
// 该函数用来设置默认的信号处理
static void uloop_setup_signals(bool add)
{
	static struct sigaction old_sigint, old_sigchld, old_sigterm;

	uloop_install_handler(SIGINT, uloop_handle_sigint, &old_sigint, add);
	uloop_install_handler(SIGTERM, uloop_handle_sigint, &old_sigterm, add);

	if (uloop_handle_sigchld)
		uloop_install_handler(SIGCHLD, uloop_signal_wake, &old_sigchld, add);

	uloop_ignore_signal(SIGPIPE, add);
}

通过uloop_init->uloop_setup_signals->uloop_install_handler的调用链,可以看到uloop_signal_wake函数也被注册为SIGCHLD信号处理方式。

uloop_signal_wake函数中,如果收到SIGCHLD信号,则将do_sigchld设置为true。此时在uloop_run_timeout的下一次循环中就会执行uloop_handle_processes处理子进程事件了。

那具体是怎么处理子进程事件的呢

static void uloop_handle_processes(void)
{
	struct uloop_process *p, *tmp;
	pid_t pid;
	int ret;

	do_sigchld = false;

	while (1) {
		// 等待任何子进程的终止,WNOHANG表示没有子进程在等待时立即返回,不阻塞
		pid = waitpid(-1, &ret, WNOHANG);
		// <0表示出错
		if (pid < 0 && errno == EINTR)
			continue;

		// 出错或者没有子进程
		if (pid <= 0)
			return;

		list_for_each_entry_safe(p, tmp, &processes, list) {
			if (p->pid < pid)
				continue;

			if (p->pid > pid)
				break;

			// 从链表中删除当前进程事件,并执行回调
			uloop_process_delete(p);
			p->cb(p, ret);
		}
	}

}

在收到SIGCHLD信号之后,已经可以确认有子进程等待回收,因此会在while循环中调用waitpid回收子进程,回收完成之后会遍历子进程事件列表,从中删除对应pid的子进程事件,然后调用子进程事件的回调。

信号事件

信号事件类似于信号处理程序,但是比sigaction函数注册信号处理程序要简单一些,但比直接使用signal函数好像要复杂。

信号事件的数据结构如下:

// 信号事件
// SIGINT、SIGTERM、SIGCHLD信号的处理会内置
// 其余信号需要用户通过接口来添加
//
// 默认信号处理函数是一个写管道的函数
// 收到信号之后会将其写入管道,管道的另一侧注册在文件描述符事件中,通过文件描述符事件进行提醒,然后获取信号执行回调
struct uloop_signal
{
	struct list_head list;
	// 保存旧的信号处理动作
	struct sigaction orig;
	// 判断是否已经加入到信号事件列表
	bool pending;

	uloop_signal_handler cb;
	int signo;
};
int uloop_signal_add(struct uloop_signal *s);
int uloop_signal_delete(struct uloop_signal *s);

信号事件的处理并不在uloop_run_timeout函数中,而是在uloop_init中就已经注册。

信号事件的添加和删除逻辑如下:

static struct list_head signals = LIST_HEAD_INIT(signals);
// 该函数用于添加自定义的信号处理事件
int uloop_signal_add(struct uloop_signal *s)
{
	struct list_head *h = &signals;
	struct uloop_signal *tmp;
	struct sigaction sa;

	if (s->pending)
		return -1;

	list_for_each_entry(tmp, &signals, list) {
		if (tmp->signo > s->signo) {
			h = &tmp->list;
			break;
		}
	}

	list_add_tail(&s->list, h);
	s->pending = true;

	// 获取旧的信号处理函数
	sigaction(s->signo, NULL, &s->orig);

	// 判断旧的信号处理动作
	if (s->orig.sa_handler != uloop_signal_wake) {
		// 设置信号处理函数为写管道的函数,收到信号时写入管道,管道另一端执行信号事件
		sa.sa_handler = uloop_signal_wake;
		sa.sa_flags = 0;
		// 信号处理期间无阻塞信号
		sigemptyset(&sa.sa_mask);
		sigaction(s->signo, &sa, NULL);
	}

	return 0;
}

int uloop_signal_delete(struct uloop_signal *s)
{
	if (!s->pending)
		return -1;

	list_del(&s->list);
	s->pending = false;

	if (s->orig.sa_handler != uloop_signal_wake)
		sigaction(s->signo, &s->orig, NULL);

	return 0;
}

uloop_signal_add函数中,首先会按照信号值的从小到大的顺序将信号事件插入的对应的链表中,其次会判断当前信号的处理方式是否为uloop_signal_wake,如果不是则进行设置,并将旧的信号处理程序保存在orig中。

删除信号则很简单,在uloop_signal_delete中先从全局链表中删除对应的信号事件,随后恢复原有的信号处理函数。

当收到信号时,则会调用uloop_signal_wake函数,该函数的处理逻辑如下:

static void uloop_signal_wake(int signo)
{
	uint8_t sigbyte = signo;

	// SIGCHLD 系统通知父进程其子进程终止,父进程此时需要处理回收动作
	if (signo == SIGCHLD)
		do_sigchld = true;

	// 收到信号之后向管道写入信号值,然后通过文件描述符事件通知管道的另一侧
	do {
		if (write(waker_pipe, &sigbyte, 1) < 0) {
			if (errno == EINTR)
				continue;
		}
		break;
	} while (1);
}

在收到信号后,uloop_signal_wake函数会将信号值写入waker_pipe管道,接下来看看管道的另一侧怎么处理

管道的另一侧是在waker_init函数中处理,该函数也是在uloop_init中进行调用。

static int waker_pipe = -1;
static struct uloop_fd waker_fd = {
	.fd = -1,
	.cb = signal_consume,
};

static void waker_init_fd(int fd)
{
	fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
	fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
}

static int waker_init(void)
{
	int fds[2];

	if (waker_pipe >= 0)
		return 0;

	if (pipe(fds) < 0)
		return -1;

	waker_init_fd(fds[0]);
	waker_init_fd(fds[1]);
	waker_pipe = fds[1];

	// 管道的另一侧,接收文件描述符读事件
	waker_fd.fd = fds[0];
	waker_fd.cb = signal_consume;
	uloop_fd_add(&waker_fd, ULOOP_READ);

	return 0;
}

waker_init函数中可以看到,管道的另一侧即fds[0]被注册为文件描述符事件,事件回调为signal_consume

static void set_signo(uint64_t *signums, int signo)
{
	if (signo >= 1 && signo <= 64)
		*signums |= (1u << (signo - 1));
}

static bool get_signo(uint64_t signums, int signo)
{
	return (signo >= 1) && (signo <= 64) && (signums & (1u << (signo - 1)));
}

// 读取到信号后处理信号事件
static void signal_consume(struct uloop_fd *fd, unsigned int events)
{
	struct uloop_signal *usig, *usig_next;
	// signums中的每一位表示一个信号,比如收到信号9,则设置第9位为1
	uint64_t signums = 0;
	uint8_t buf[32];
	ssize_t nsigs;

	do {
		nsigs = read(fd->fd, buf, sizeof(buf));

		for (ssize_t i = 0; i < nsigs; i++)
			set_signo(&signums, buf[i]);
	}
	while (nsigs > 0);

	// 遍历信号事件列表,判断是否添加了该信号,如果以添加则执行信号回调
	list_for_each_entry_safe(usig, usig_next, &signals, list)
		if (get_signo(signums, usig->signo))
			usig->cb(usig);
}

而在signal_consume回调函数中会接收所有的信号,并将其保存在一个64位的无符号整数中,其中每位代表一个信号。然后遍历信号事件链表,找到对应的信号事件处理回调并执行。

总结一下

  1. 初始化信号处理管道,并将其注册到文件描述符事件中,设置文件描述符事件回调为signal_consume
  2. 通过struct uloop_signal添加信号处理事件,并添加到全局链表中
  3. 注册通用信号处理函数uloop_signal_wake
  4. 收到信号后,在uloop_signal_wake中将其写入管道,触发文件描述符事件,执行回调signal_consume
  5. 在文件描述符事件回调中接收信号,并执行全局链表中注册的信号事件回调