从源码角度看Handler

简介

Handler这套线程异步通信框架在Android中的地位是不亚于Binder的,因为其基础设计简单、涉及的知识面广、业务使用场景多等原因,十分适合应用层的初中级的工程师进行深入学习

这篇文章中我将分析Handler核心功能的源码,分析将贯穿着framework, native和kernel的知识点:

  1. Handler发送异步消息原理
  2. Looper派发消息原理
  3. 消息分割栏的原理与视图绘制的运用
  4. epoll_create, epoll_ctl, epoll_wait三部曲的源码分析
  5. epoll中生产者与消费者模型的运用

同时按惯例,会在开篇给出总括全局的类图与架构图,方面阅读中定位理解与阅读后的回顾

设计图

类图

  • Message: 消息的抽象
  • Handler: 发送消息的工具类
  • MessageQueue: 主要维护了消息队列,同时也是与native通信的中枢
  • Looper: 循环获取消息并进行派发
  • Messenger: 可以跨进程传输的消息抽象

架构图

  1. 一个APP中运行着多个线程,不同线程间可以互相拿到对方的Handler对象
  2. MessageQueue和native直接通信,native中又和kernel通信,这样的调用链赋予了APP使用系统内核资源的能力
  3. epoll机制在kernel中维护了一个链表与一颗红黑树是它效率优于poll与select的基础

发送跨线程异步消息: Handler.post()

使用Handler的前提是获取到它的引用对象,然后才能够在对应的MessageQueue的消息队列插入消息。能够这样做的根本原因在于,线程之间的内存是可以相互访问的,这也是Handler能够实现跨线程通信的基本原理之一

下面从最常用的Handler.post()方法入手,看看消息发送的实现原理,这里需要说明的一点是,使用Handler发送消息的花样很多,但最终都需要调用到MessageQueue.enqueueMessage()方法来实现,这里就不一一介绍API的使用了

frameworks/base/core/java/android/os/Handler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean postAtTime(Runnable r, long uptimeMillis)
{
return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
// 这里获取到队列,队列是该线程唯一的,在Handler初始化时获取
MessageQueue queue = mQueue;
...
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
// 判断是否为异步消息,异步消息将不会受到分割栏的影响
if (mAsynchronous) {
msg.setAsynchronous(true);
}
// 最终调用到MessageQueue去插入消息
return queue.enqueueMessage(msg, uptimeMillis);
}

可以看到Handler只是一个类似于工具的类,最终的消息管理方面的操作还是需要委托给MessageQueue去做

frameworks/base/core/java/android/os/MessageQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
boolean enqueueMessage(Message msg, long when) {
// 这里会强制target成员的设置,分割栏的插入不是调用这个方法实现的
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
// 因为不同的线程都可以调用这个方法,所以使用类锁保证消息队列的异步安全
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 遍历消息队列,插入到合适的位置
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// 调用到native尝试唤醒对端设备
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
}

frameworks/base/core/jni/android_os_MessageQueue.cpp

1
2
3
4
5
6
7
8
9
10
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 通过ptr获取到NativeMessageQueue对象
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
// 委托给native层的Looper进行处理
mLooper->wake();
}

system/core/libutils/Looper.cpp

1
2
3
4
5
6
void Looper::wake() {
uint64_t inc = 1;
// 向文件描述符为mWakeEventFd设备写入,以此来唤醒监听设备的epoll
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
...
}

总结一下消息发送的主要工作:

  1. 在java层的消息队列中根据时间插入消息
  2. 在native层,向对应的fd设备写入数据,以此来唤醒监听该设备的epoll

处理消息: Handler.handleMessage()

handleMessage完全是处于被动调用的状态,每当消息到来时,会从kernel依次调用的native,再到java层的Looper

这里直接看Looper是如何获取到消息,并调用handleMessage的,省去了Looper.prepare()代码的分析

frameworks/base/core/java/android/os/Looper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void loop() {
// 从ThreadLocal中取出之前放入的Looper对象
final Looper me = myLooper();
// 获取到队列
final MessageQueue queue = me.mQueue;
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
// 获取下一个消息,这个方法在没有消息时会产生阻塞,只有在消息到来时才会触发
Message msg = queue.next(); // might block
...
// 派发消息,最终会调用到Handler.handleMessage方法
try {
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
...
// 回收Message
msg.recycleUnchecked();
}
}

继续看看MessageQueue.next的实现:

frameworks/base/core/java/android/os/MessageQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
Message next() {
// mPtr实际上是NativeMesssageQueue的引用,方便在native层查询到MessageQueue
final long ptr = mPtr;
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 调用native方法,之后调用到epoll_wait
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 消息分割栏,后面会做分析
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 如果当前时间没有已经就绪的message,那么重置poll事件的时间
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
// 如果该消息已经就绪,那么将它返回给Looper进行派发
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
}
// 如果此次没有处理消息,则用来处理IdleHandler的操作,把优先级不高的操作放到这里去执行,尽量不浪费事件片
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
}
}

核心操作nativePollOnce同样是在native层进行处理

frameworks/base/core/jni/android_os_MessageQueue.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 同样也是委托Looper进行处理
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
}

system/core/libutils/Looper.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 获取操作实际上是从reponse队列中拿取的
...
if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
// 本质上在循环的调用pollInner方法,直到获取到了结果
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
...
// Poll.
int result = POLL_WAKE;
// 清空response队列
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
// 初始化epoll_event
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 使用epoll_wait调用的kernel去请求事件,kernel获取到事件后会通过mmap将epoll_event信息返回到native层
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
...
// 获取到信息后,接下来就是读取了
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 插入到response队列中供后续获取
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
Done: ;
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
// 首先处理native层的Message
handler->handleMessage(message);
} // release handler
}
}
// Release lock.
mLock.unlock();
...
return result;
}

native层Looper.pollInner()方法是获取Message的主要操作:

  1. 调用epoll_wait在kernel中获取设备事件,该方法后续会做分析
  2. 获取到事件后进行解析并插入到response队列中
  3. 处理native事件

可以看到,该方法会首先处理native层的Message,也就是说Handler这套框架对于native的消息是优先派发的

设置同步分割栏: MessageQueue.postSyncBarrier()

同步分割栏的原理其实很简单,本质上就是通过创建一个target成员为NULL的Message并插入到消息队列中,这样在这个特殊的Message之后的消息就不会被处理了,只有当这个Message被移除后才会继续执行之后的Message

最经典的实现就是ViewRootImpl调用scheduleTraversals方法进行视图更新时的使用:

frameworks/base/core/java/android/view/ViewRootImpl.java

1
2
3
4
5
6
7
8
9
10
11
void scheduleTraversals() {
if (!mTraversalScheduled) {
mTraversalScheduled = true;
// 执行分割操作后会获取到分割令牌,使用它可以移除分割栏
mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
// 发出一个有异步标志的Message,避免被分割
mChoreographer.postCallback(
Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
...
}
}

在执行doTraversal方法后,才会移出分割栏:

1
2
3
4
5
6
7
8
9
10
void doTraversal() {
if (mTraversalScheduled) {
mTraversalScheduled = false;
mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
performTraversals();
...
}
}

这样做的原因是,doTraversal的操作是通过Handler进行处理的,然而这个消息队列却是整个主线程公用的,比如说四大组件的各个生命周期的调用,然而doTraversal的内容是更新视图UI,这个任务无疑是最高优先级的。所以在这之前,需要确保队列中其它同步消息不会影响到它的执行

这里继续跟一下MessageQueue.postSyncBarrier()的实现:

frameworks/base/core/java/android/os/MessageQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
// 注意这里,并没有为target成员进行初始化
Message prev = null;
Message p = mMessages;
// 插入到队列中
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}

可以看到,设置分割栏和普通的post Message是一样的,不同的是target是空的

下面接着来看看分割栏真正起作用的地方:

frameworks/base/core/java/android/os/MessageQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Message next() {
...
for (;;) {
...
// 进行队列遍历
Message msg = mMessages;
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
// 如果target为NULL,将会陷入这个循环,除非是有异步标志的消息才会跳出循环
} while (msg != null && !msg.isAsynchronous());
}
...
}
}

跨进程异步消息: Messenger

Messenger是Android开发中算是比较冷门的一种跨进程通信方式,它的实现是依托着Binder+Handler这两种技术的配合,使得跨进程调用后服务端任务能够在指定的线程中被执行

这种方式其实就类似于ActivityThread的binder call接收后,使用指定Handler去post消息是一样的,不同的在于Messenger的对其进行了封装,使得开发者可以轻松的实现而不需要更多的代码

原生有个典型实现的MessengerService,可以参考它的思路:

frameworks/base/core/tests/coretests/src/android/os/MessengerService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MessengerService extends Service {
private final Handler mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
Message reply = Message.obtain();
reply.copyFrom(msg);
try {
// 这里的replyTo的mTarget实际上是对端的binder proxy,用以实现回调的功能
msg.replyTo.send(reply);
} catch (RemoteException e) {
}
}
};
// 创建一个Messenger,mTarget将会创建一个binder stub,服务方法运行在mHandler所在线程
private final Messenger mMessenger = new Messenger(mHandler);
public MessengerService() {
}
@Override
public IBinder onBind(Intent intent) {
// 返回的是被创建的Messenger服务binder句柄
return mMessenger.getBinder();
}
}
  1. 服务端可以通过继承MessengerService,定义自己的业务
  2. 客户端通过bind该service,可以获取Messenger的binder proxy句柄,从而实现通信。调用到服务端后,操作将会运行在mHandler所在线程
  3. replyTo是客户端指定的,用来接受服务端的回复

实现基础: epoll

epoll和select和poll一样都是I/O多路复用技术,和它们不同的是,epoll只关心”活跃”的链接,不需要遍历全部的描述符集合,能够处理大量的链接请求

以下是一个使用epoll的典型代码示例,主要分为三部:

  1. 使用epoll_create创建epoll的fd
  2. 使用epoll_ctl将需要监听的fd添加到epoll中
  3. 使用epoll_wait等待事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct epoll_event ev, events[10];
int listen_sock, conn_sock, nfds, epollfd;
// 创建epoll的文件描述符
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
// 将我们需要监听的文件描述符和事件通过epoll_ctl加入到epoll描述符中
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
// 等待事件的发生
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
// 监听到事件,处理事件
...
}

epoll的实现很简单,简单到所有的主要逻辑都在单个文件中实现,文件路径在kernel/msm-3.18/fs/eventpoll.c

下面接着分析epoll各个重要函数的源码

epoll 初始化

在使用epoll之前,系统需要在启动期间对epoll的基础设施进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int __init eventpoll_init(void)
{
struct sysinfo si;
// 获取系统内存状态
si_meminfo(&si);
// 根据系统内存配置设置单个用户可见监听的最大epoll个数
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) / EP_ITEM_COST;
// 初始化调用队列,方便检查是否超过最大嵌套次数
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// 创建用于分配epitem数据结构的的SLAB缓存
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
// 创建用于分配eppoll_entry数据结构的SLAB缓存
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
// 在系统启动过程中,init进程设备初始化时对epoll进行初始化
fs_initcall(eventpoll_init);

epoll 初始化期间主要完成的三个工作:

  1. 根据内存配置,初始化单个用户最大可见听epoll个数
  2. 初始化调用队列,以此来判断嵌套调用是超过上限
  3. 初始化SLAB内存缓存,以此来提升epoll使用内存的效率

epoll_create

在native Looper.cpp的实现中,epoll_create的调用函数用以创建一个epoll,但是在内核源码中并没有直接发现epoll_create之类的函数

查阅资料后发现,内核将epoll系列的函数都使用了SYSCALL_DEFINE宏调用来实现了,SYSCALL_DEFINE后的数字x代表了该函数拥有x个参数,之所以使用这种方式来声明函数是为了修复64位Linux系统上的CVE-2009-2009漏洞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 为了阅读源码的便捷性,可以直接将该宏调用看成 epoll_create(int flags);
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 初始化eventpoll结构
error = ep_alloc(&ep);
// 获取一个没有使用过的fd
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
// 创建eventpoll文件,file_operations为eventpoll_fops
// 同时将ep设置到file->private_data中
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
ep->file = file;
// 将fd与文件进行关联
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}

epoll_create的主要做了两个工作:

  1. 新建并初始化一个epollpoll结构
  2. 创建eventpoll文件、获取一个空闲fd,并将两者进行关联

epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
struct eventpoll *tep = NULL;
error = -EFAULT;
if (ep_op_has_event(op) &&
// 将epoll_event信息使用mmap从native拷贝到kernel
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
// 获取mEpollFd, mWakeEventFd对应文件
f = fdget(epfd);
tf = fdget(fd);
// 从文件的private_data中获取到eventpoll
ep = f.file->private_data;
mutex_lock_nested(&ep->mtx, 0);
...
// 通过file, fd在eventpoll的红黑树中进行查询,看是否存在已经保存的eventitem
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
// 进行op操作分流
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 如果未查询到,则创建并插入新的eventitem
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
// 如果查询到,则移除该eventitem
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
// 如果查询到,则进行替换
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (full_check)
mutex_unlock(&epmutex);
fdput(tf);
error_fput:
fdput(f);
error_return:
return error;
}

epoll_ctl主要做了两个工作:

  1. 将epoll_event信息使用mmap拷贝到内核中
  2. 通过fd获取到对应的file文件,并通过private_data获取到eventpoll
  3. 通过file,fd在eventpoll中进行查询,看是否存在对应的eventitem
  4. 根据查询到的eventitem进行对应的操作

这里详细分析下ep_insert方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
// 使用SLAB内存分配器创建eventitem
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
// 初始化链表节点
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
// 初始化基础属性
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 初始化poll表,并注册回调函数ep_ptable_queue_proc
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 调用被插入文件的poll方法,最终会将当前epollitem放入到ready list并唤醒eventpoll中的等待进程
revents = ep_item_poll(epi, &epq.pt);
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
// 将fllink加入到文件的f_ep_links列表中作为子项
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
// 将epollitem的rbn红黑树节点插入到eventpoll中的rbr红黑树中
// 注意一点的是,节点的是根据epoll_filefd来比对的
ep_rbtree_insert(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
// 唤醒该队列
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
// 唤醒并调用该队列中的事件
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
...
}

epoll_ctl主要做了三个工作:

  1. 创建并初始化eventitem
  2. 为poll_wait函数初始化,当设备唤醒时会执行回调函数
  3. 检查当前设备状态,如果已经就绪那么直接唤醒等待队列

epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// epoll_wait(int epfd, epoll_event *events, int maxevents, int timeout)
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
// 通过fd获取到文件信息
f = fdget(epfd);
// 通过private_data获取到eventpoll
ep = f.file->private_data;
// 调用ep_poll获取就绪的事件,并将它们派发到接收者的事件缓冲中
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fdput(f);
return error;
}

epoll_wait的操作很简单,通过fd检索到相关eventpoll后马上调用核心函数ep_poll,这里详细分析一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
long slack = 0;
wait_queue_t wait;
ktime_t expires, *to = NULL;
if (timeout > 0) {
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
// 如果未设置超时事件,则直接获取事件,不进行阻塞
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
if (!ep_events_available(ep)) {
// 使用队列进行睡眠,直到ep_poll_callback被调用,队列被唤醒,才会接着执行下去
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
// 如果事件到来则退出循环
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!freezable_schedule_hrtimeout_range(to, slack,
HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
check_events:
// 是否存在入队的事件
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
// 取出事件并发送到客户端进程
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}

ep_poll的主要操作在于从eventpoll中取出已经准备就绪的eventitem,随后调用ep_send_events,通过timeout参数判断是否需要阻塞以及需要阻塞的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
// ep_send_events_proc 为扫描回调函数
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
...
// 调用ep_send_events_proc进行回调
error = (*sproc)(ep, &txlist, priv);
...
return error;
}
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
// 从rdlink列表中获取到epollitem
epi = list_first_entry(head, struct epitem, rdllink);
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink);
// 从设备中读取事件
revents = ep_item_poll(epi, &pt);
if (revents) {
// 使用mmap将事件发送到客户端
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
...
}
}
return eventcnt;
}

epoll中的生产者消费者模型

  • 生产者: 由epoll_ctl最终调用的ep_insert触发,在驱动唤醒等待队列后会调用ep_poll_callback方法将epollitem添加到epollevent的rdllist
  • 消费者: 由epoll_wait最终调用的ep_poll触发,在等待队列响应后会继续执行,从rdllist中取出epoll_item,随后读取设备
扫码支持0.99元,您的支持将鼓励我继续创作!