《Linux/UNIX系统编程手册》第63章 IO多路复用、信号驱动IO以及epoll

2019年11月06日 阅读数:58
这篇文章主要向大家介绍《Linux/UNIX系统编程手册》第63章 IO多路复用、信号驱动IO以及epoll,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

关键词:fasync_helper、kill_async、sigsuspend、sigaction、fcntl、F_SETOWN_EX、F_SETSIG、select()、poll()、poll_wait()等。html

 

《Linux/UNIX系统编程手册》第63章主要介绍了select()/poll()、信号驱动IO、epoll三方面,以及他们之间异同、优劣点。node

这里准备结合项目中遇到的问题,分两个方向进行概括总结。一是一个IO模型从测试程序、API、内核实现进行纵向分析;二是横向不一样IO模型的优缺点对比。linux

 

IO多路复用容许进程同时检查多个文件描述符以找出它们中的任何一个是否可执行IO操做。系统调用select()和poll()用来执行IO多路复用。shell

信号驱动IO是指当有输入或者数据能够写到指定的文件描述符上时,内核向请求数据的进程发送一个信号。进程能够处理其余的任务,当IO操做可执行时经过接收信号来得到通知。当同时检查大量的文件描述符时,信号驱动IO相比select()和poll()有显著的性能提高。编程

epoll API是Linux专有的特性,首次出如今Linux 2.6中。同IO多路复用API同样,epoll API容许进程同时检查多个文件描述符,看其中任意一个是否能执行IO操做。通讯号驱动IO同样,当同时检查大量文件描述符时,epoll能提供更好的性能。数组

 

在实际应用中使用那种技术,下面是一些要点:缓存

  • 系统调用select()和poll()在UNIX系统中已经存在了很长时间。优点在于可移植性强;缺点在于当同时检查大量文件描述符时性能延展性不佳。
  • epoll API的关键优点在于它能让应用程序高效地检查大量的文件描述符。主要缺点在于它是Linux专用API。
  • 同epoll同样,信号驱动IO可让应用程序高效地检查大量的文件描述符。可是epoll有一些信号驱动IO所没有的优势。
    • 避免了处理信号的复杂性。
    • 能够指定想要检查的事件类型,好比读就绪或写就绪。
    • 能够选择水平触发或边缘出发形式来通知进程。

 

1. 水平触发和边沿触发

讨论多种IO机制以前,首先区分两种文件描述符准备就绪的通知模式。数据结构

水平触发通知:若是文件描述符上能够非阻塞地执行IO系统调用,此时认为它已经就绪。多线程

边缘触发通知:若是文件描述符自上次状态检查以来有了新的IO活动,此时须要触发通知。app

IO模式 水平触发 边缘触发
select()/poll()  
信号驱动IO  
epoll

当采用水平触发通知时,能够在任意时刻检查文件描述符的就绪状态。表示当文件描述符处于就绪态时,就能够对其执行一些IO操做;而后重复检查文件描述符。看看是否仍然处于就绪态,此时能够执行更多的IO。因为水平触发模式容许咱们在任意时刻重复检查IO状态,没有必要每次当文件描述符就绪后须要尽量多地址性IO。

 

当采用边缘触发时,只有当IO事件发生时才会收到通知,所以:

  • 在接收到一个IO事件通知后,程序应该在相应的文件描述符上尽量多地执行IO。若是不那么作,可能失去执行IO的机会,致使数据丢失或者程序出现阻塞。
  • 若是程序采用循环来对文件描述符执行尽量多的IO,而文件描述符又被设置为可阻塞的,那么最终当没有更多的IO可执行时,IO系统调用就会阻塞。所以,每一个被检查的文件描述符都应该设置为非阻塞模式,在获得IO事件通知后重复执行IO操做。

 

2. 信号驱动IO

信号驱动IO中,当文件描述符上可执行IO操做时,进程请求内核为本身发送一个信号。

进程能够执行其余任何任务直到IO就绪位置,此时内核会发送信号给进程。

2.1 信号驱动IO步骤及相关API

信号驱动IO的使用遵循必定的步骤:

1.为内核发送通知信号安装一个信号处理例程;一般是SIGIO,但在多线程环境下能够指定自定义的实时信号。

2.设定文件描述符属主,也就是当文件描述符上可执行IO操做时会接收通知信号的进程或进程组。经过fcntl()命令F_SETOWN或者F_SETOWN_EX来制定。

3.经过设定O_NONBLOCK标志是能非阻塞IO。

4.经过O_ASYNC标志是能信号驱动IO。

5.而后进程能够处理其余任务,当有信号到达时对应的信号处理函数就会被调用

6.一般状况下,若是须要等待IO操做,能够经过sigsuspend()进行等待,此时进程会放弃

 

2.2 async IO测试分析

首先构造内核驱动和用户空间测试程序,而后针对测试程序进行详细的分析。

2.2.1 async IO测试程序

建立signal_kernel.c做为模组插入内核,signal_user.c做为用户空间测试程序。

建立/dev/signal0设备,经过wirte可让内核发送信号到用户空间;wirte以后当即进入sigsuspend()等待。

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/cdev.h>
#include <linux/device.h>
#include <linux/fs.h>
#include <linux/uaccess.h>

static struct class *signal_class;
static int signal_major;
#define SIGNAL_NAME "signal"

struct timer_list signal_timer;
static struct fasync_struct *signal_async;
int signal_count = 0;
static unsigned char signal_text[30];

static int signal_release(struct inode *inode, struct file *file)
{
    return 0;
}

static void dummy_timer(unsigned long data)
{
    //printk("Send SIGIO signal.\n");
    kill_fasync (&signal_async, SIGIO, POLL_IN);----------------------------------------发送异步信号给fa绑定的进程。
    signal_count++;
}

static int signal_open(struct inode *inode, struct file *file)
{
    return 0;
}

static ssize_t signal_read(struct file * file, char __user * buf,
               size_t count, loff_t *ppos)
{
    int size;

    size = sprintf(signal_text, "%d", signal_count);
    copy_to_user(buf, signal_text, size);
    return size;
}

static ssize_t signal_write(struct file * file, const char __user * buf,
                size_t count, loff_t *ppos)
{
    char str[30];
    copy_from_user(str, buf,count);
    printk("Receive %s from user.\n",str);
    dummy_timer(0);
    return 0;
}

static int signal_fasync (int fd, struct file *filp, int on)
{
    return fasync_helper(fd, filp, on, &signal_async);----------------------------------建立fasync_struct并插入到当前fasync链表中。
}

static const struct file_operations signal_fops = {
    .owner        = THIS_MODULE,
    .open        = signal_open,
    .release    = signal_release,
    .read    = signal_read,
    .write    = signal_write,
    .fasync    = signal_fasync,----------------------------------------------------------当用户空间设置FASYNC的时候调用fasync函数。
};

static int __init signal_test_init(void)
{
    struct device *signal_device;

    signal_major = register_chrdev(0, SIGNAL_NAME, &signal_fops);
    if (signal_major < 0) {
        pr_err("register_chrdev failed\n");
        goto err;
    }

    signal_class = class_create(THIS_MODULE, SIGNAL_NAME);
    if (IS_ERR(signal_class)) {
        pr_err("device class file already in use\n");
        goto err_class;
    }

    signal_device = device_create(signal_class, NULL, MKDEV(signal_major, 0),
                    NULL, "%s%d", SIGNAL_NAME, 0);
    if (IS_ERR(signal_device)) {
        pr_err("failed to create device\n");
        goto err_device;
    }
    return 0;

err_device:
    class_destroy(signal_class);
err_class:
    unregister_chrdev(signal_major, SIGNAL_NAME);
err:
    return 0;
}

static void __exit signal_test_exit(void)
{
    del_timer(&signal_timer);
    device_destroy(signal_class, MKDEV(signal_major, 0));
    class_destroy(signal_class);
    unregister_chrdev(signal_major, SIGNAL_NAME);
}

module_init(signal_test_init);
module_exit(signal_test_exit);
MODULE_LICENSE("GPL");

对应的Makefile以下:

CONFIG_MODULE_SIG=n
EXTRA_CFLAGS += -D_GNU_SOURCE---------------------------------------------针对F_SETOWN_EX等扩展命令须要定义此宏。

obj-m := signal_kernel.o
KERN_DIR := /lib/modules/$(shell uname -r)/build 
PWD := $(shell pwd)

all:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules
    gcc $(EXTRA_CFLAGS) signal_user.c -o signal_user -pthread
clean:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean
    rm signal_user
modules_install:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install

用户空间测试程序signal_user.c以下。

测试程序建立一个sigio线程专门处理信号,重点是进行信号处理前进行一系列设置;而后调用sigsuspend()进入睡眠,等待内核信号唤醒。

在实际应用中,有多是中断调用kill_fasync()发送信号。

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <poll.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <pthread.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
#include <memory.h>
 
int fd;
char *filename = "/dev/signal0";
char signal_response[] = "OK";
unsigned int handle_count = 0;
#define LOOP_COUNT 10

#define SIGTEST (SIGRTMIN+1)
 
void sig_handler(int sig, siginfo_t *si, void *uc)
{
    unsigned char signal_count[30];

    memset(signal_count, 30, 0);
    read(fd, &signal_count, 0);
    handle_count ++;

    printf(">>>>>PID %ld Receive sig=%d count=%d.\n", syscall(SYS_gettid), sig, handle_count);
    //Trigger a SIGTEST signal.
    if(handle_count < LOOP_COUNT)
        write(fd, signal_response, sizeof(signal_response));------------------重复触发内核发送SIGTEST信号。
    //if(atoi(signal_count) != handle_count)
        //printf("%s: receive_count=%d, handle_count=%d\n", __func__, atoi(signal_count), handle_count);
}

void *pthread_func(void *arg)
{
    int Oflags;
    struct timespec time1, time2;
    struct sigaction sa;
    sigset_t set, oldset;
    unsigned long int duration;
    struct f_owner_ex owner_ex;

    //Set thread name.
    prctl(PR_SET_NAME,"sigio");
    printf("main=%d, sigio=%ld.\n", getpid(), syscall(SYS_gettid));

    //Set SIGTEST actiong.
    memset(&sa, 0, sizeof(sa));
    sa.sa_sigaction = sig_handler;
    sa.sa_flags |= SA_RESTART | SA_SIGINFO;------------------------------------这里必须使用sa_sigaction和SA_SIGINFO。
    sigaction(SIGTEST, &sa, NULL);---------------------------------------------设置SIGTEST信号处理函数。
    //Set proc mask.
    sigemptyset(&set);
    sigprocmask(SIG_SETMASK, &set, NULL);
    sigaddset(&set, SIGTEST);

    fd = open(filename, O_RDWR);
    if (fd < 0)
    {
        printf("Can't open %s!\n", filename);
    }

    owner_ex.pid = syscall(SYS_gettid);
    owner_ex.type = F_OWNER_TID;
    fcntl(fd, F_SETOWN_EX, &owner_ex);------------------------------------------将fd文件句柄和当前线程绑定;若是设置F_SETOWN则是和线程组绑定,这二者区别后续重点介绍。
    Oflags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, Oflags | FASYNC);----------------------------------------对应内核驱动file_operations的fasync成员,给当前fd建立一个fasync_struct。
    fcntl(fd, F_SETSIG, SIGTEST);-----------------------------------------------设置SIGTEST替代SIGIO做为信号发送。

    clock_gettime(CLOCK_REALTIME, &time1);
    write(fd, signal_response, sizeof(signal_response));------------------------触发内核发送SIGTEST信号。
    //pthread_sigmask(SIG_BLOCK, &set, &oldset);
    sigprocmask(SIG_BLOCK, &set, &oldset);
    while(handle_count < LOOP_COUNT)
    {
        //Will suspend here, and later code will be executed after SIGTEST handler.
        sigsuspend(&oldset);----------------------------------------------------在此处睡眠,等待信号来唤醒。
    }
    //pthread_sigmask(SIG_UNBLOCK, &set, NULL);
    sigprocmask(SIG_UNBLOCK, &set, NULL);
    clock_gettime(CLOCK_REALTIME, &time2);
    duration =(time2.tv_sec-time1.tv_sec)*1000000000 + (time2.tv_nsec-time1.tv_nsec);
    printf("End time %ld.%ld\n", duration/1000000000, duration%1000000000);
    close(fd);
    pthread_exit(0);
}

void main(int argc, char **argv)
{
    pthread_t tidp;
    sigset_t set;

    sigemptyset(&set);
    sigaddset(&set, SIGTEST);
    sigprocmask(SIG_BLOCK, &set, NULL);

    if(pthread_create(&tidp, NULL, pthread_func, NULL) == -1)
    {
        printf("Create pthread error.\n");
        return;
    }

    if(pthread_join(tidp, NULL))
    {
        printf("Join pthread error.\n");
        return;
    }
    printf("Main exit.\n");

    return;
}

 

2.2.2 测试结果

将编译的module signal_kernel.ko插入内核,而后用户空间执行sudo ./signal_user结果以下。 

main=10485, sigio=10486.
>>>>>PID 10486 Receive sig=35 count=1.
>>>>>PID 10486 Receive sig=35 count=2.
>>>>>PID 10486 Receive sig=35 count=3.
>>>>>PID 10486 Receive sig=35 count=4.
>>>>>PID 10486 Receive sig=35 count=5.
>>>>>PID 10486 Receive sig=35 count=6.
>>>>>PID 10486 Receive sig=35 count=7.
>>>>>PID 10486 Receive sig=35 count=8.
>>>>>PID 10486 Receive sig=35 count=9.
>>>>>PID 10486 Receive sig=35 count=10.
End time 0.103711
Main exit.

 

2.3 信号驱动IO流程解读

首先整个流程在用户空间配置;而后由内核发起,用户空间处理。

2.3.1 sigaction()设置信号处置

除了signal(),sigaction()是另外一种设置信号处置的选择。sigaction()更加复杂,但也根据灵活性。

#include <signal.h>
int sigaction(int sig, const struct sigaction *act, struct sigaction *oldact);

sig参数标识想要获取或改变的信号编号,可使处SIGKILL和SIGSTOP以外任何信号。

act指向描述信号新处置数据结构,oldact用于返回以前信号处置相关信息。二者均可以设置为NULL。

struct sigaction {
    void (*sa_handler)(int); /* Address of handler */
    sigset_t sa_mask; /* Signals blocked during handler invocation */
    int sa_flags; /* Flags controlling handler invocation */
    void (*sa_restorer)(void); /* Not for application use */
};

sa_handler指向sig对应的处理函数。

sa_mask定义一组新号,不容许它们中断此处理程序的执行。调用信号处理程序以前,将该组信号中其余尚未处于进程掩码之列的信号添加到进程掩码中;在信号处理返回以后再将,以前添加的掩码移除。这就保证,在处理程序执行期间,sa_mask全部信号不会中断此处理程序。

引起处理程序自身的信号将自动添加到进程掩码中,这意味着信号处理程序不容许嵌套。当正在执行处理程序时,若是同一信号第二次抵达,将不会递归中断本身。

因为标准信号(sig < SIGRTMIN)信号处理程序不会被队列化,因此标准信号处理程序执行期间第二次达到的同一信号将会被忽略。

可是事实信号不会存在这种现象。

struct sighand_struct {
    atomic_t        count;
    struct k_sigaction    action[_NSIG];------------------------------------------------一共_NSIG,即64个信号1-31是标准信号,32-64是实时信号。
    spinlock_t        siglock;
    wait_queue_head_t    signalfd_wqh;
};

int do_sigaction(int sig, struct k_sigaction *act, struct k_sigaction *oact)
{
    struct task_struct *p = current, *t;
    struct k_sigaction *k;
    sigset_t mask;

    if (!valid_signal(sig) || sig < 1 || (act && sig_kernel_only(sig)))
        return -EINVAL;

    k = &p->sighand->action[sig-1];------------------------------------------------------将k指向当前进程sig对应的k_sigaction结构。

    spin_lock_irq(&p->sighand->siglock);
    if (oact)
        *oact = *k;

    sigaction_compat_abi(act, oact);

    if (act) {
        sigdelsetmask(&act->sa.sa_mask,
                  sigmask(SIGKILL) | sigmask(SIGSTOP));
        *k = *act;-----------------------------------------------------------------------将act和k关联。
...
    }

    spin_unlock_irq(&p->sighand->siglock);
    return 0;
}

 

2.3.2  F_SETOWN_EX和F_SETOWN

F_SETOWN是标准fcntl;F_SETOWN_EX是Linux扩展命令,若是要使用须要加上_D_GNU_SOURCE。

这两个命令都经过fcntl()系统调用,设置文件控制操做。

struct fown_struct是内核中关联文件句柄和其拥有者信息的结构体:

struct fown_struct {
    rwlock_t lock;          /* protects pid, uid, euid fields */
    struct pid *pid;    /* pid or -pgrp where SIGIO should be sent */----------------------拥有此文件SIGIO信号的进程。
    enum pid_type pid_type;    /* Kind of process group SIGIO should be sent to */---------进程类型。
    kuid_t uid, euid;    /* uid/euid of process setting the owner */
    int signum;        /* posix.1b rt signal to be delivered on IO */----------------------kill_fasync()发送的信号。
};

下面就来分析这两命令的区别,以及他们是如何影响信号处理行为的。

enum pid_type
{
    PIDTYPE_PID,
    PIDTYPE_PGID,
    PIDTYPE_SID,
    PIDTYPE_MAX,
    /* only valid to __task_pid_nr_ns() */
    __PIDTYPE_TGID
};


static long do_fcntl(int fd, unsigned int cmd, unsigned long arg,
        struct file *filp)
{
    long err = -EINVAL;

    switch (cmd) {
...
    case F_GETFL:------------------------------------------------获取、设置文件句柄的f_flags。
        err = filp->f_flags;
        break;
    case F_SETFL:
        err = setfl(fd, filp, arg);
        break;
...
case F_GETOWN:-----------------------------------------------获取、设置文件句柄的拥有者进程pid。 err = f_getown(filp); force_successful_syscall_return(); break; case F_SETOWN: f_setown(filp, arg, 1); err = 0; break; case F_GETOWN_EX:--------------------------------------------进阶获取、设置文件句柄拥有者进程pid。 err = f_getown_ex(filp, arg); break; case F_SETOWN_EX: err = f_setown_ex(filp, arg); break; ... case F_GETSIG:-----------------------------------------------设置文件句柄和其拥有者之间异步通讯的信号。 err = filp->f_owner.signum; break; case F_SETSIG: if (!valid_signal(arg)) { break; } err = 0; filp->f_owner.signum = arg; break; ... } return err; } pid_t f_getown(struct file *filp) { pid_t pid; read_lock(&filp->f_owner.lock); pid = pid_vnr(filp->f_owner.pid); if (filp->f_owner.pid_type == PIDTYPE_PGID)-----------------若是是进程组返回其负值。 pid = -pid; read_unlock(&filp->f_owner.lock); return pid; } void f_setown(struct file *filp, unsigned long arg, int force) { enum pid_type type; struct pid *pid; int who = arg; type = PIDTYPE_PID; if (who < 0) { type = PIDTYPE_PGID;-----------------------------------默认类型是进程,若是who为负,则是进程组。 who = -who; } rcu_read_lock(); pid = find_vpid(who); __f_setown(filp, pid, type, force); rcu_read_unlock(); } static int f_getown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; int ret = 0; read_lock(&filp->f_owner.lock); owner.pid = pid_vnr(filp->f_owner.pid); switch (filp->f_owner.pid_type) {-------------------------相对于F_SETOWN,多了pid_type类型设置。其中PIDTYPE_MAX表示该文件句柄被一个线程所拥有。而不是进程或者进程组。这点对于信号究竟发给谁,有着很是重要的做用。 case PIDTYPE_MAX: owner.type = F_OWNER_TID; break; case PIDTYPE_PID: owner.type = F_OWNER_PID; break; case PIDTYPE_PGID: owner.type = F_OWNER_PGRP; break; default: WARN_ON(1); ret = -EINVAL; break; } read_unlock(&filp->f_owner.lock); if (!ret) { ret = copy_to_user(owner_p, &owner, sizeof(owner)); if (ret) ret = -EFAULT; } return ret; } static int f_setown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; struct pid *pid; int type; int ret; ret = copy_from_user(&owner, owner_p, sizeof(owner)); if (ret) return -EFAULT; switch (owner.type) { case F_OWNER_TID: type = PIDTYPE_MAX; break; case F_OWNER_PID: type = PIDTYPE_PID; break; case F_OWNER_PGRP: type = PIDTYPE_PGID; break; default: return -EINVAL; } rcu_read_lock(); pid = find_vpid(owner.pid); if (owner.pid && !pid) ret = -ESRCH; else __f_setown(filp, pid, type, 1); rcu_read_unlock(); return ret; } static void f_modown(struct file *filp, struct pid *pid, enum pid_type type, int force) { write_lock_irq(&filp->f_owner.lock); if (force || !filp->f_owner.pid) { put_pid(filp->f_owner.pid); filp->f_owner.pid = get_pid(pid); filp->f_owner.pid_type = type; if (pid) { const struct cred *cred = current_cred(); filp->f_owner.uid = cred->uid; filp->f_owner.euid = cred->euid; } } write_unlock_irq(&filp->f_owner.lock); } void __f_setown(struct file *filp, struct pid *pid, enum pid_type type, int force)----------------------------------------------------------------不管是F_SETOWN仍是F_SETOWN_EX二者的force都是1。 { security_file_set_fowner(filp); f_modown(filp, pid, type, force); }

因此F_SETOWN和F_SETOWN_EX主要区别就在于,F_SETOWN_EX能够设置进程的类型。这对于后续信号的发送,有重要做用。

F_SETOWN_EX更加细致,能够指定只发送给某个线程;而F_SETOWN优先发送给线程。若是线程被阻塞,则选择同一进程中的其余线程接收。

相关问题调试详见:《sigsuspend()阻塞:异步信号SIGIO为何会被截胡?》。

 

2.3..3 F_SETSIG和SA_SIGINFO

若是要使用实时信号替代SIGIO做为kill_fasync()做为信号发送,能够设置F_SETSIG。

参照do_fcntl(),也即经过设置fown_struct的signum值。

若是须要在信号处理函数中获取更多信息,还须要在sa.sa_flags增长SA_SIGINFO标志。

struct sigaction {
    union {
        void (*sa_handler)(int);
        void (*sa_sigaction)(int, siginfo_t *, void *);
    } __sigaction_handler;
    sigset_t sa_mask;
    int sa_flags;
    void (*sa_restorer)(void);
};

在定义sa_handler是就须要定义以下类型函数:

void handler(int sig, siginfo_t *siginfo, void *ucontext);

其中第二个参数siginfo包含的字段标识出了在哪一个文件描述上发生了事件。

使用F_SETSIG有两个优势:

1.指定不一样信号做为信号驱动IO通知信号,解决了在同一进程范围内,多线程使用信号驱动IO冲突问题。由于默认都是发送SIGIO,而信号处理是进程范围的。

2.默认的SIGIO是非排队信号,若是有对个IO时间发送了信号,而SIGIO被阻塞了,除了第一个通知外,其余后续的通知都会丢失。使用实时信号就不会存在这个问题,信号的处理会被排队;除非信号队列溢出。

 

2.3.4 FASYNC以及kill_fasync()发送信号

此表示是经过fcntl进行设置的,因此看一些setfl()。

static int setfl(int fd, struct file * filp, unsigned long arg)
{
    struct inode * inode = file_inode(filp);
    int error = 0;
...
    /*
     * ->fasync() is responsible for setting the FASYNC bit.
     */
    if (((arg ^ filp->f_flags) & FASYNC) && filp->f_op->fasync) {----------------------能够看出主要当前文件具备FASYNC标志位,而且f_op->fasync()定义了就会执行相关函数。
        error = filp->f_op->fasync(fd, filp, (arg & FASYNC) != 0);
        if (error < 0)
            goto out;
        if (error > 0)
            error = 0;
    }
    spin_lock(&filp->f_lock);
    filp->f_flags = (arg & SETFL_MASK) | (filp->f_flags & ~SETFL_MASK);
    spin_unlock(&filp->f_lock);

 out:
    return error;
}

咱们看到对应的驱动中调用fasync_helper()进行fasync_struct建立,并插入到fasync列表中。

int fasync_helper(int fd, struct file * filp, int on, struct fasync_struct **fapp)
{
    if (!on)
        return fasync_remove_entry(filp, fapp);
    return fasync_add_entry(fd, filp, fapp);
}
static int fasync_add_entry(int fd, struct file *filp, struct fasync_struct **fapp)
{
    struct fasync_struct *new;

    new = fasync_alloc();
    if (!new)
        return -ENOMEM;

    if (fasync_insert_entry(fd, filp, fapp, new)) {
        fasync_free(new);
        return 0;
    }

    return 1;
}

当设备IO就需后,经过kill_fasync()发送信号。

void kill_fasync(struct fasync_struct **fp, int sig, int band)
{
    if (*fp) {
        rcu_read_lock();
        kill_fasync_rcu(rcu_dereference(*fp), sig, band);
        rcu_read_unlock();
    }
}

static void kill_fasync_rcu(struct fasync_struct *fa, int sig, int band)
{
    while (fa) {-------------------------------------------------------------------遍历当前文件句柄的全部fasync列表,并进行信号发送操做。
        struct fown_struct *fown;
        unsigned long flags;

        if (fa->magic != FASYNC_MAGIC) {
            printk(KERN_ERR "kill_fasync: bad magic number in "
                   "fasync_struct!\n");
            return;
        }
        spin_lock_irqsave(&fa->fa_lock, flags);
        if (fa->fa_file) {
            fown = &fa->fa_file->f_owner;
            if (!(sig == SIGURG && fown->signum == 0))-----------------------------只有在signum为0,并sig等于SIGURQ放弃发送信号,由于SIGURQ有本身的特殊处理。
                send_sigio(fown, fa->fa_fd, band);
        }
        spin_unlock_irqrestore(&fa->fa_lock, flags);
        fa = rcu_dereference(fa->fa_next);
    }
}

void send_sigio(struct fown_struct *fown, int fd, int band)
{
    struct task_struct *p;
    enum pid_type type;
    struct pid *pid;
    int group = 1;
    
    read_lock(&fown->lock);

    type = fown->pid_type;
    if (type == PIDTYPE_MAX) {----------------------------------------------------这里体现了F_SETOWN_EX因为F_SETOWN的地方,若是设置为PIDTYPE_MAX,那么group则为0,只发送给线程,不会选择其余线程做为替代。
        group = 0;
        type = PIDTYPE_PID;
    }

    pid = fown->pid;
    if (!pid)
        goto out_unlock_fown;
    
    read_lock(&tasklist_lock);
    do_each_pid_task(pid, type, p) {
        send_sigio_to_task(p, fown, fd, band, group);
    } while_each_pid_task(pid, type, p);
    read_unlock(&tasklist_lock);
 out_unlock_fown:
    read_unlock(&fown->lock);
}

static void send_sigio_to_task(struct task_struct *p,
                   struct fown_struct *fown,
                   int fd, int reason, int group)
{
    /*
     * F_SETSIG can change ->signum lockless in parallel, make
     * sure we read it once and use the same value throughout.
     */
    int signum = ACCESS_ONCE(fown->signum);--------------------------------------------------------在使用F_SETSIG设置信号后最好很差改变,不然可能形成发送和信号处理函数不匹配。

    if (!sigio_perm(p, fown, signum))
        return;

    switch (signum) {
        siginfo_t si;
        default:-----------------------------------------------------------------------------------其余状况发送一个RT信号,提供更加丰富的返回信息。
            si.si_signo = signum;
            si.si_errno = 0;
                si.si_code  = reason;
            BUG_ON((reason & __SI_MASK) != __SI_POLL);
            if (reason - POLL_IN >= NSIGPOLL)
                si.si_band  = ~0L;
            else
                si.si_band = band_table[reason - POLL_IN];
            si.si_fd    = fd;
            if (!do_send_sig_info(signum, &si, p, group))------------------------------------------使用signum代提SIGIO做为信号发送。
                break;
        case 0:------------------------------------------------------------------------------------在没有经过F_SETSIG设置signum的状况下,默认发送SIGIO信号。
            do_send_sig_info(SIGIO, SEND_SIG_PRIV, p, group);
    }
}

int do_send_sig_info(int sig, struct siginfo *info, struct task_struct *p,
            bool group)
{
    unsigned long flags;
    int ret = -ESRCH;

    if (lock_task_sighand(p, &flags)) {
        ret = send_signal(sig, info, p, group);
        unlock_task_sighand(p, &flags);
    }

    return ret;
}
static inline struct sighand_struct *lock_task_sighand(struct task_struct *tsk,
                               unsigned long *flags)
{
    struct sighand_struct *ret;

    ret = __lock_task_sighand(tsk, flags);
    (void)__cond_lock(&tsk->sighand->siglock, ret);
    return ret;
}

static inline void unlock_task_sighand(struct task_struct *tsk,
                        unsigned long *flags)
{
    spin_unlock_irqrestore(&tsk->sighand->siglock, *flags);
}

 

2.3.5 标准信号和实时信号区别

实时信号相对于标准信号有以下优点:

  • 实时信号扩大了自定义信号的范围,从SIGRTMIN~SIGRTMAX。
  • 对实时信号采起队列化管理。将一个实时信号多个实例发送给一个进程,那么将会屡次传递信号。若是一个标准信号已经在等待某一进程,即便再次向该进程发送信号,信号也只会被传递一次。
  • 发送一个实时信号能够传递更多信息。
  • 不一样实时信号传递顺序获得保障,若是多个实时信号处于等待状态,那么将优先传递具备最小编号的信号。信号编号越小,其优先级越高。同一类型的多个信号在排队,那么传递顺序与信号发送来的顺序保持一致。

标准信号和实时信号都经过seng_signal()发送。下面看一下二者是如何被区别对待的。

static int send_signal(int sig, struct siginfo *info, struct task_struct *t,
            int group)
{
    int from_ancestor_ns = 0;

#ifdef CONFIG_PID_NS
    from_ancestor_ns = si_fromuser(info) &&
               !task_pid_nr_ns(current, task_active_pid_ns(t));
#endif

    return __send_signal(sig, info, t, group, from_ancestor_ns);
}


static int __send_signal(int sig, struct siginfo *info, struct task_struct *t,
            int group, int from_ancestor_ns)
{
    struct sigpending *pending;
    struct sigqueue *q;
    int override_rlimit;
    int ret = 0, result;

    assert_spin_locked(&t->sighand->siglock);

    result = TRACE_SIGNAL_IGNORED;
    if (!prepare_signal(sig, t,
            from_ancestor_ns || (info == SEND_SIG_FORCED)))
        goto ret;

    pending = group ? &t->signal->shared_pending : &t->pending;----------------------------group起做用的地方,当group为0,则将信号放在当前线程私有的pending列表上;若是group为1,则将信号放在线程组共享的shared_pending列表上。
    result = TRACE_SIGNAL_ALREADY_PENDING;
    if (legacy_queue(pending, sig))--------------------------------------------------------若是sig<SIGRTMIN,而且sig已经被处于pending中。也即标准信号sig已经在pending队列中,则返回表示当前sig信号已经放入到队列中。这也说明标准信号没有被队列化。
        goto ret;

    result = TRACE_SIGNAL_DELIVERED;

    if (info == SEND_SIG_FORCED)
        goto out_set;

    if (sig < SIGRTMIN)
        override_rlimit = (is_si_special(info) || info->si_code >= 0);--------------------标准信号存在override_rlimit为1状况,那么即便当前信号队列达到了RLIMIT_SIGPENDING。仍然能够建立信号队列告诉缓存。
    else
        override_rlimit = 0;--------------------------------------------------------------实时信号是不容许超过RLIMIT_SIGPENDING限制的。

    q = __sigqueue_alloc(sig, t, GFP_ATOMIC | __GFP_NOTRACK_FALSE_POSITIVE,
        override_rlimit);-----------------------------------------------------------------建立一个高速缓存sigqueue_cachep。
    if (q) {
        list_add_tail(&q->list, &pending->list);------------------------------------------将新建立的信号加入到队列中。
        switch ((unsigned long) info) {
        case (unsigned long) SEND_SIG_NOINFO:
            q->info.si_signo = sig;
            q->info.si_errno = 0;
            q->info.si_code = SI_USER;
            q->info.si_pid = task_tgid_nr_ns(current,
                            task_active_pid_ns(t));
            q->info.si_uid = from_kuid_munged(current_user_ns(), current_uid());
            break;
        case (unsigned long) SEND_SIG_PRIV:
            q->info.si_signo = sig;
            q->info.si_errno = 0;
            q->info.si_code = SI_KERNEL;
            q->info.si_pid = 0;
            q->info.si_uid = 0;
            break;
        default:
            copy_siginfo(&q->info, info);
            if (from_ancestor_ns)
                q->info.si_pid = 0;
            break;
        }

        userns_fixup_signal_uid(&q->info, t);

    } else if (!is_si_special(info)) {
        if (sig >= SIGRTMIN && info->si_code != SI_USER) {
            result = TRACE_SIGNAL_OVERFLOW_FAIL;
            ret = -EAGAIN;
            goto ret;
        } else {
            result = TRACE_SIGNAL_LOSE_INFO;
        }
    }

out_set:
    signalfd_notify(t, sig);
    sigaddset(&pending->signal, sig);
    complete_signal(sig, t, group);
ret:
    trace_signal_generate(sig, info, t, group, result);
    return ret;
}

static struct sigqueue *
__sigqueue_alloc(int sig, struct task_struct *t, gfp_t flags, int override_rlimit)
{
  struct sigqueue *q = NULL;
  struct user_struct *user;

 
 

  rcu_read_lock();
  user = get_uid(__task_cred(t)->user);
  atomic_inc(&user->sigpending);
  rcu_read_unlock();

 
 

  if (override_rlimit ||
      atomic_read(&user->sigpending) <=
        task_rlimit(t, RLIMIT_SIGPENDING)) {----------------------------------这里面说明了override_rlimit为1状况能够超出RLIMIT_SIGPENDING限制。实时信号在队列超过RLIMIT_SIGPENDING后则不容许建立信号队列。
    q = kmem_cache_alloc(sigqueue_cachep, flags);
  } else {
    print_dropped_signal(sig);---------------------------------------------------当出现丢弃信号的状况,打印信息。
  }

 
 

  if (unlikely(q == NULL)) {
    atomic_dec(&user->sigpending);
    free_uid(user);
  } else {
    INIT_LIST_HEAD(&q->list);
    q->flags = 0;
    q->user = user;
  }

 
 

  return q;
}

这里详细解释了信号驱动IO的流程;但信号产生放入pending队列以后,什么时候被处理呢?能够参考《信号什么时候被处理》。 

3. IO多路复用select()/poll()

IO多路复用容许咱们同时检查多个文件描述符,看其中任意一个是否可执行IO操做。能够在普通文件、终端、管道等字符型设备上使用select()/poll()来检查文件描述符。

下面分别对select()/poll()的API、内核流程、测试程序进行分析,而后对二者进行对比。

3.1 select()详解

 

3.1.1 select() API

select()会一直阻塞,直到一个或多个文件描述符集合成为就绪态。

#include <sys/time.h> /* For portability */
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);

参数nfds、readfds、writefds、exceptfds指定了select()要检查的文件描述符集合。

参数timeout用来设定select()阻塞的时间上限。

文件描述符集合操做

#include <sys/select.h>
void FD_ZERO(fd_set *fdset);--------------------将fdset指向的集合初始化为空。
void FD_SET(int fd, fd_set *fdset);-------------将描述符fd添加到fdset所指向的集合中。
void FD_CLR(int fd, fd_set *fdset);-------------将描述符fd从fdset指向的集合中移除。
int FD_ISSET(int fd, fd_set *fdset);------------判断描述符fd是否在fdset结合中设置。

文件描述符集合有一个最大容量限制,由常量FD_SETSIZE来决定。Linux一般为1024。

timeout参数

参数timeout控制着select()阻塞行为,NULL时select()会一直阻塞。或者指向一个timeval结构体。

struct timeval {
    time_t tv_sec; /* Seconds */
    suseconds_t tv_usec; /* Microseconds (long int) */
};

当timeout为NULL或者指向结构体字段非0时,select()阻塞直到下列事件发生:

  • readfds、writefds、exceptfds中指定的文件描述符中至少有一个成为就绪态。
  • 该调用被信号处理例程中断。
  • timeout中指定的时间上限已超时。

若是timeout非空,且一个或多个文件描述符就绪返回时,timeout所指向的结构体表示剩余超时时间。

select()返回值

-1表示有错误发生。EBADF表示有一个文件描述符是非法的;EINTR表示该调用被信号处理例程中断了。

0表示在任何文件描述符成为就绪态前select()已经超时。

正整数表示一个或多个文件描述符已达到就绪态。返回值表示就绪态的文件描述符个数。

 

3.1.2 select()内核详解

select系统调用路径是select()->core_sys_select()->do_select(),最终是遍历每一个文件句柄的poll成员,根据poll()返回的参数判断当前文件状态。

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
        fd_set __user *, exp, struct timeval __user *, tvp)
{
    struct timespec64 end_time, *to = NULL;
    struct timeval tv;
    int ret;

    if (tvp) {
        if (copy_from_user(&tv, tvp, sizeof(tv)))
            return -EFAULT;

        to = &end_time;
        if (poll_select_set_timeout(to,
                tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
                (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))------------------------------------从内核拷贝信息并转换成内核数据结构struct timespec64。
            return -EINVAL;
    }

    ret = core_sys_select(n, inp, outp, exp, to);------------------------------------------------和系统调用相似,只是to已经转变成内核时间。
    ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);------------------------------------计算end_time和当前时间的差值,并转化成tvp返回给用户空间。

    return ret;
}


int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
               fd_set __user *exp, struct timespec64 *end_time)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    size_t size, alloc_size;
    struct fdtable *fdt;
    /* Allocate small arguments on the stack to save memory and be faster */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];----------------------------------------------SELECT_STACK_ALLOC默认是256,32位系统stack_fds一共64成员。

    ret = -EINVAL;
    if (n < 0)
        goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;

    size = FDS_BYTES(n);------------------------------------------------------------------------一个文件描述符占用一bit,size表示这些fd_set须要用掉多少个字节。
    bits = stack_fds;
    if (size > sizeof(stack_fds) / 6) {---------------------------------------------------------若是stack空间足够当前存放当前strcut fd_set_bits fds的时候,优先使用stack内存,好处是更加快而且节省内存。否则就须要kmalloc去申请内存。
        /* Not enough space in on-stack array; must use kmalloc */------------------------------stack_fds大小为256字节,因此要使用stack的最大size=256/6/sizeof(long)=10。
        ret = -ENOMEM;
        if (size > (SIZE_MAX / 6))
            goto out_nofds;

        alloc_size = 6 * size;
        bits = kmalloc(alloc_size, GFP_KERNEL|__GFP_NOWARN);-------------------------------------经过kmalloc来分配6个size大小的内存;大于一个页面使用vmalloc。
        if (!bits && alloc_size > PAGE_SIZE)
            bits = vmalloc(alloc_size);

        if (!bits)
            goto out_nofds;
    }
    fds.in      = bits;--------------------------------------------------------------------------此时bits指向的内存都已经分配完毕,而且是6个一样size大小的。
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;

    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);------------------------------------------------------------------将用户空间传入的fds拷贝到fds中,并清空res_in、res_out、res_ex。

    ret = do_select(n, &fds, end_time);

    if (ret < 0)
        goto out;
    if (!ret) {
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }

    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))----------------------------------------------------------res_int/res_out/res_ex中包含了状态变化的文件句柄,拷贝到in/out/ex返回给用户空间。
        ret = -EFAULT;

out:
    if (bits != stack_fds)
        kvfree(bits);
out_nofds:
    return ret;
}

int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
    ktime_t expire, *to = NULL;
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i, timed_out = 0;
    u64 slack = 0;
    unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;---------------------------这个参数影响下面一轮查询全部文件描述符以后,才作busy loop仍是睡眠。这个很影响CPU占用率,busy loop不会主动放弃CPU,睡眠则主动放弃CPU。
    unsigned long busy_end = 0;

    rcu_read_lock();
    retval = max_select_fd(n, fds);
    rcu_read_unlock();

    if (retval < 0)
        return retval;
    n = retval;

    poll_initwait(&table);-----------------------------------------------------------------------将当前进程放入本身的等待队列table,并将该等待队列加入到测试表wait。
    wait = &table.pt;
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {-----------------------------------这也是timeout参数为0时候的特殊状况,直接timed_out置1。
        wait->_qproc = NULL;
        timed_out = 1;
    }

    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);

    retval = 0;
    for (;;) {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        bool can_busy_loop = false;

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {--------------------------------------------遍历全部的fd。
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;

            in = *inp++; out = *outp++; ex = *exp++;---------------------------------------------先取出当前循环周期中的32个文件描述符对应的bitmaps。
            all_bits = in | out | ex;------------------------------------------------------------in/out/ex三者组合,有的fd可能监测读或写或异常,或者都检测。
            if (all_bits == 0) {-----------------------------------------------------------------32个文件描述符不检测任何状态,调到下一个循环。
                i += BITS_PER_LONG;
                continue;
            }

            for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
                struct fd f;
                if (i >= n)
                    break;
                if (!(bit & all_bits))-----------------------------------------------------------bit每次循环左移一位,若是没有监测当前为则跳过进入下一次循环。
                    continue;
                f = fdget(i);
                if (f.file) {
                    const struct file_operations *f_op;
                    f_op = f.file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op->poll) {
                        wait_key_set(wait, in, out,
                                 bit, busy_flag);-----------------------------------------------设置当前fd待检测的事件掩码。
                        mask = (*f_op->poll)(f.file, wait);-------------------------------------调用每一个文件句柄的poll成员,返回文件的状态mask。下面分别检查POLLIN_SET、POLLOUT_SET、POLLEX_SET三种状态。
                    }
                    fdput(f);
                    if ((mask & POLLIN_SET) && (in & bit)) {
                        res_in |= bit;
                        retval++;
                        wait->_qproc = NULL;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                        wait->_qproc = NULL;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                        wait->_qproc = NULL;
                    }
                    /* got something, stop busy polling */
                    if (retval) {
                        can_busy_loop = false;
                        busy_flag = 0;

                    /*
                     * only remember a returned
                     * POLL_BUSY_LOOP if we asked for it
                     */
                    } else if (busy_flag & mask)
                        can_busy_loop = true;

                }
            }
            if (res_in)--------------------------------------------------------------------------根据poll结果写回到输出位图里。
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            cond_resched();
        }
        wait->_qproc = NULL;
        if (retval || timed_out || signal_pending(current))-------------------------------------三种返回状况,retval表示有文件句柄知足条件;timed_out表示在有timeout参数的状况下超时;signal_pending()表示被信号中断。
            break;
        if (table.error) {
            retval = table.error;
            break;
        }

        if (can_busy_loop && !need_resched()) {--------------------------------------------------这里面会一直占用CPU。
            if (!busy_end) {
                busy_end = busy_loop_end_time();
                continue;
            }
            if (!busy_loop_timeout(busy_end))
                continue;
        }
        busy_flag = 0;

        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
                       to, slack))-------------------------------------------------------------进程进入睡眠等待唤醒,若是poll_schedule_timeout()返回0表示超时,下面timed_out被置1。to是根据end_time计算的超时时间。
            timed_out = 1;
    }

    poll_freewait(&table);

    return retval;
}

回顾一下select()大体流程以下:

1.进入系统调用。

2.进行时间转换;数据准备,分红in、out、exception三部分。

3.对全部文件句柄进行循环,调用对应文件句柄的poll()函数;分别查询是那种类型文件句柄有状态变化。

4.若是一轮循环下来没有变化,则进入休眠等待,直到超时。

5.若是有数据则唤醒进程,将变化句柄数返回给用户空间。返回值是三种状态的综合,具体哪一种状态哪一个文件句柄变化,须要查看分别查看返回的文件句柄。

 

3.1.3 select()测试程序

select()测试程序和poll()使用一样的内核驱动,区别就是设置文件句柄和API参数。

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <poll.h>
#include <memory.h>
#include <unistd.h>
#include <stdlib.h>

#define POLLTEST_NAME "/dev/polltest0"
#define LOOP_COUNT 3

int main(int argc, char **argv)
{
    int fd, loop_count = 0;
    unsigned char polltest_count[30];
    int ret;
    fd_set rds;
    
    memset(polltest_count, 30, 0);
    
    fd = open(POLLTEST_NAME, O_RDWR);
    if (fd < 0)
    {
        printf("can't open!\n");
    }


    FD_ZERO(&rds);
    FD_SET(fd, &rds);--------------------------------------------------------------将fd加入到rds句柄集中,在select()中对其进行监控。

    while (1)
    {
        ret = select(fd+1, &rds, NULL, NULL, NULL);
        if (ret == 0)
        {
            printf("time out\n");
        }
        else
        {
            if(FD_ISSET(fd, &rds))-------------------------------------------------判断句柄fd是否有POLLIN事件发生。
            {
                read(fd, &polltest_count, 0);
                loop_count = atoi(polltest_count);
                printf("key_val = %d\n", loop_count);
                if(loop_count >= LOOP_COUNT)
                    break;
            }
        }
    }
    
    return 0;
}

 

 

3.2 poll()详解

一样的从认识poll() API开始;而后详细分析poll系统调用内核是如何实现的;最后对poll()测试程序流程结合内核进行详细分析。

3.2.1 poll() API

poll()提供一列文件描述符,在每一个文件描述符上标明感兴趣的事件。

#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);

nfds指定了fds中的元素个数;参数fds列出了须要poll()来检查的文件描述符:

struct pollfd {
    int fd; /* File descriptor */
    short events; /* Requested events bit mask */-------指定须要为描述符fd作检查的事件。
    short revents; /* Returned events bit mask */-------表示fd上实际发生的事件。
};

 

timeout参数

timeout参数的单位是毫秒。

-1,poll()会一直阻塞直到fds数组中列出的有一个达到就绪态或者捕获到一个信号。

0,poll()不会阻塞,只是执行一次检查看看哪一个文件描述符处于就绪态。

>0,poll()至多阻塞timeout毫秒,直到fds列出的文件描述符中有一个达到就绪态,或者捕获到一个信号为止。

poll()返回值

-1标识有错误发生,一种多是被信号中断返回EINTR。

0表示调用在任意一个文件描述符就绪以前就超时了。

正整数表示一个或多个文件描述符处于就绪态。返回值表示数组fds中拥有非零revents字段的pollfd结构体数量。

 

3.2.2 poll()内核流程

 

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
        int, timeout_msecs)
{
    struct timespec64 end_time, *to = NULL;
    int ret;

    if (timeout_msecs >= 0) {-------------------------------------------从poll()参数timeout可知小于0表示poll()会一直阻塞;等于0表示只检查一次;大于0表示等待一个超时时间。
        to = &end_time;
        poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
            NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));------------根据timeout_msecs转换成内核超时数据结构to。
    }

    ret = do_sys_poll(ufds, nfds, to);

    if (ret == -EINTR) {
        struct restart_block *restart_block;

        restart_block = &current->restart_block;
        restart_block->fn = do_restart_poll;
        restart_block->poll.ufds = ufds;
        restart_block->poll.nfds = nfds;

        if (timeout_msecs >= 0) {
            restart_block->poll.tv_sec = end_time.tv_sec;
            restart_block->poll.tv_nsec = end_time.tv_nsec;
            restart_block->poll.has_timeout = 1;
        } else
            restart_block->poll.has_timeout = 0;

        ret = -ERESTART_RESTARTBLOCK;
    }
    return ret;
}

int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
        struct timespec64 *end_time)
{
    struct poll_wqueues table;
     int err = -EFAULT, fdcount, len, size;

    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    struct poll_list *const head = (struct poll_list *)stack_pps;
     struct poll_list *walk = head;
     unsigned long todo = nfds;

    if (nfds > rlimit(RLIMIT_NOFILE))-----------------------------------nfds不能超过系统对打开文件数目限制。
        return -EINVAL;

    len = min_t(unsigned int, nfds, N_STACK_PPS);-----------------------取nfds和栈能容纳的fds最小值。
    for (;;) {----------------------------------------------------------遍历全部的ufds[],分配对应的struct poll_list,而且连接起来;每一个struct poll_list里面又包含了若干个struct pollfd,他的大小经过len肯定。
        walk->next = NULL;
        walk->len = len;
        if (!len)
            break;

        if (copy_from_user(walk->entries, ufds + nfds-todo,
                    sizeof(struct pollfd) * walk->len))-----------------首先将使用栈提供的空间,将用户空间struct pollfd拷贝到内核空间。
            goto out_fds;

        todo -= walk->len;----------------------------------------------假设len==nfds,todo也等于walk->len;因此此处退出for(;;)。
        if (!todo)
            break;

        len = min(todo, POLLFD_PER_PAGE);
        size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
        walk = walk->next = kmalloc(size, GFP_KERNEL);------------------若是占空间不够存放struct pollfd,那么kmalloc()申请内存。
        if (!walk) {
            err = -ENOMEM;
            goto out_fds;
        }
    }

    poll_initwait(&table);----------------------------------------------初始化一个struct poll_wqueues。
    fdcount = do_poll(head, &table, end_time);
    poll_freewait(&table);

    for (walk = head; walk; walk = walk->next) {
        struct pollfd *fds = walk->entries;
        int j;

        for (j = 0; j < walk->len; j++, ufds++)
            if (__put_user(fds[j].revents, &ufds->revents))-------------将全部内核处理的revents返回给用户空间。
                goto out_fds;
      }

    err = fdcount;
out_fds:
    walk = head->next;
    while (walk) {
        struct poll_list *pos = walk;
        walk = walk->next;
        kfree(pos);-----------------------------------------------------释放申请的内存。
    }

    return err;
}

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
           struct timespec64 *end_time)
{
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    u64 slack = 0;
    unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;-----------------------决定下面是循环仍是睡眠等待超时。
    unsigned long busy_end = 0;

    /* Optimise the no-wait case */
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {------------------------------对timeout为0特殊状况的处理,只查询一次就退出。
        pt->_qproc = NULL;
        timed_out = 1;
    }

    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);

    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;

        for (walk = list; walk != NULL; walk = walk->next) {---------------------------------遍历全部的struct poll_list。
            struct pollfd * pfd, * pfd_end;

            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            for (; pfd != pfd_end; pfd++) {

                if (do_pollfd(pfd, pt, &can_busy_loop,
                          busy_flag)) {
                    count++;
                    pt->_qproc = NULL;
                    /* found something, stop busy polling */
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }

        pt->_qproc = NULL;
        if (!count) {
            count = wait->error;
            if (signal_pending(current))
                count = -EINTR;
        }
        if (count || timed_out)-----------------------------------------------------------------count是全部的有状态变化的描述符数量;timed_out表示是否超时。
            break;

        /* only if found POLL_BUSY_LOOP sockets && not out of time */
        if (can_busy_loop && !need_resched()) {
            if (!busy_end) {
                busy_end = busy_loop_end_time();
                continue;
            }
            if (!busy_loop_timeout(busy_end))
                continue;
        }
        busy_flag = 0;

        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,
                     bool *can_busy_poll,
                     unsigned int busy_flag)
{
    unsigned int mask;
    int fd;

    mask = 0;
    fd = pollfd->fd;------------------------------------------------------------------------------根据struct pollfd找到对应的fd,而后调用所属的poll()函数。
    if (fd >= 0) {
        struct fd f = fdget(fd);
        mask = POLLNVAL;
        if (f.file) {
            mask = DEFAULT_POLLMASK;
            if (f.file->f_op->poll) {
                pwait->_key = pollfd->events|POLLERR|POLLHUP;
                pwait->_key |= busy_flag;
                mask = f.file->f_op->poll(f.file, pwait);----------------------------------------调用具体文件的poll()函数。
                if (mask & busy_flag)
                    *can_busy_poll = true;
            }
            /* Mask out unneeded events. */
            mask &= pollfd->events | POLLERR | POLLHUP;
            fdput(f);
        }
    }
    pollfd->revents = mask;

    return mask;
}

 回顾一下,poll系统调用大体流程是:

1.进入poll系统调用。

2.进行timeout时间转换;准备每一个文件句柄对应的数据,并将这些数据串联起来。

3.对全部的struct pollfd进行循环,调用do_pollfd()查询状态。

4.do_pollfd()调用具体文件的file->f_op->poll()查询状态。

5.一次轮询以后,将当前线程挂起等待超时获唤醒。

6.有数据到达后退出循环,将数据返回给用户空间。

 

3.2.3 poll()测试程序

首先建立polltest_kernel.c和Makefile文件。

#define DEBUG
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/cdev.h>
#include <linux/device.h>
#include <linux/fs.h>
#include <linux/uaccess.h>
#include <linux/poll.h>

#define POLL_EXPRIES 1
#define POLLTEST_NAME "polltest"


static struct class *polltest_class;
static int polltest_major;


static struct hrtimer polltest_hrtimer;

static volatile int ev_press = 0;
int polltest_count = 0;
static unsigned char polltest_text[30];
DECLARE_WAIT_QUEUE_HEAD(polltest_waitq);

static enum hrtimer_restart hrtimer_func(struct hrtimer *timer)
{
    wake_up_interruptible(&polltest_waitq);--------------------------------------------------调用队列上全部等待项wait_queue_t->func()函数,这个函数主要用来唤醒等待的进程。
    printk("%s line=%d\n", __func__, __LINE__);
    ev_press = 1;----------------------------------------------------------------------------只有ev_press置位,poll()才会返回POLLIN状态。其余状况就会阻塞。
    polltest_count++;
    hrtimer_forward_now(&polltest_hrtimer, ktime_set(0, 10000000));
    return HRTIMER_RESTART;
}

static int polltest_open(struct inode *inode, struct file *file)
{
    hrtimer_init(&polltest_hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
    polltest_hrtimer.function = hrtimer_func;
    hrtimer_start(&polltest_hrtimer, ktime_set(0, 10000000), HRTIMER_MODE_REL);
    printk("%s line=%d\n", __func__, __LINE__);

    return 0;
}

static int polltest_release(struct inode *inode, struct file *file)
{
    hrtimer_cancel(&polltest_hrtimer);
    printk("%s line=%d\n", __func__, __LINE__);
    return 0;
}

static ssize_t polltest_read(struct file * file, char __user * buf,
               size_t count, loff_t *ppos)
{
    int size;

    size = snprintf(polltest_text, 30, "%d\n", polltest_count);
    copy_to_user(buf, polltest_text, size);
return size;
}

static ssize_t polltest_write(struct file * file, const char __user * buf,
                size_t count, loff_t *ppos)
{
    char str[30];
    copy_from_user(str, buf,count);
    return 0;
}

static unsigned int polltest_poll(struct file *file, struct poll_table_struct *wait)
{
    unsigned int mask = 0;

    poll_wait(file, &polltest_waitq, wait);-------------------------------------------------将当前struct poll_table_struct的wait加入到polltest_waitq队列头中。那么这个wait合适被从polltest_waitq队列头移出呢?在poll_freewait()中。
    printk("%s line=%d\n", __func__, __LINE__);
    if(ev_press)----------------------------------------------------------------------------在hrtimer超时以后,ev_press置位,才会发送POLLIN状态。
{
mask
|= POLLIN | POLLRDNORM;--------------------------------------------------------若是返回POLLIN状态,poll系统调用才会返回;不然会进入睡眠状态及poll_schedule_timeout()。
ev_press = 0;-----------------------------------------------------------------------恢复为0,避免下次poll()返回POLLIN。
}
return mask; } static const struct file_operations polltest_fops = { .owner = THIS_MODULE, .open = polltest_open, .release = polltest_release, .read = polltest_read, .write = polltest_write, .poll = polltest_poll, }; static int __init polltest_test_init(void) { struct device *polltest_device; polltest_major = register_chrdev(0, POLLTEST_NAME, &polltest_fops); if (polltest_major < 0) { pr_err("register_chrdev failed\n"); goto err; } polltest_class = class_create(THIS_MODULE, POLLTEST_NAME); if (IS_ERR(polltest_class)) { pr_err("device class file already in use\n"); goto err_class; } polltest_device = device_create(polltest_class, NULL, MKDEV(polltest_major, 0), NULL, "%s%d", POLLTEST_NAME, 0); if (IS_ERR(polltest_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(polltest_class); err_class: unregister_chrdev(polltest_major, POLLTEST_NAME); err: return 0; } static void __exit polltest_test_exit(void) { device_destroy(polltest_class, MKDEV(polltest_major, 0)); class_destroy(polltest_class); unregister_chrdev(polltest_major, POLLTEST_NAME); } module_init(polltest_test_init); module_exit(polltest_test_exit); MODULE_LICENSE("GPL");

Makefile:

CONFIG_MODULE_SIG=n

obj-m := polltest_kernel.o
KERN_DIR := /lib/modules/$(shell uname -r)/build 

PWD := $(shell pwd)
all:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules
    gcc polltest_user.c -o polltest_user
    gcc polltest_select.c -o polltest_select
clean:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean
    rm polltest_user
    rm polltest_select 
modules_install:
    $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install

 

而后建立用户空间测试程序:

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <poll.h>
#include <memory.h>
#include <unistd.h>
#include <stdlib.h>

#define POLLTEST_NAME "/dev/polltest0"
#define LOOP_COUNT 3

int main(int argc, char **argv)
{
    int fd, loop_count = 0;
    unsigned char polltest_count[30];
    int ret;
    struct pollfd fds[1];
    
    memset(polltest_count, 30, 0);
    
    fd = open(POLLTEST_NAME, O_RDWR);--------------------------------打开/dev/polltest0此时启动一个10ms的hrtimer。
    if (fd < 0)
    {
        printf("can't open!\n");
    }

    fds[0].fd = fd;
    fds[0].events = POLLIN;

    while (1)
    {
        ret = poll(fds, 1, 5000);------------------------------------调用/dev/polltest0的poll()函数。若是返回状态有POLLIN则ret表示知足状态的句柄数,应该为1;若是返回0表示超时。
        if (ret == 0)
        {
            printf("time out\n");
        }
        else
        {
            if(fds[0].revents == POLLIN)
            {
                read(fd, &polltest_count, 0);
                loop_count = atoi(polltest_count);
                printf("key_val = %d\n", loop_count);
                if(loop_count >= LOOP_COUNT)
                    break;
            }
        }
    }
    
    return 0;
}

结果以下:

[ 2366.904469] polltest_open line=41---------------------------------打开/dev/polltest0,启动10ms的hrtimer。
[ 2366.904475] polltest_poll line=78---------------------------------紧接着poll()系统调用,因为ev_press为0,没有知足POLLIN条件;因此进程进入睡眠状态。
[ 2366.914893] hrtimer_func line=29----------------------------------hrtimer 10ms超时,唤醒进程而且ev_press为1。
[ 2366.914951] polltest_poll line=78---------------------------------进程被唤醒以后,从新调用/dev/polltest0的poll()检查状态;由于此时ev_press为1,因此返回POLLIN。知足条件,poll()系统调用退出,用户空间read()且置ev_press为0。
[ 2366.915197] polltest_poll line=78---------------------------------再次调用poll()系统调用,此时ev_press为0,因此进入睡眠状态。
[ 2366.924575] hrtimer_func line=29
[ 2366.924584] polltest_poll line=78
[ 2366.924641] polltest_poll line=78
[ 2366.934632] hrtimer_func line=29
[ 2366.934683] polltest_poll line=78
[ 2366.934980] polltest_release line=49

 

 

3.3 select()/poll()流程详解

从上面的分析能够看出select()/poll()的内核实现其实大同小异,最终都是调用具体文见的poll()函数查询状态。

select()和poll()系统调用有以下几个重要的结构体:struct poll_wqueues、struct poll_table_struct、struct poll_table_page、struct poll_table_entry,其中poll()还有两个文件句柄相关结构体:struct poll_list、struct pollfd。

其中struct poll_wqueues是核心,用来统一辅助实现这个进程中全部监测fd的轮训工做。

struct poll_list {-------------------------------------------------一次poll()的poll_list可能有多个,经过next连接起来。一个poll_list里能够有多个pollfd。
    struct poll_list *next;
    int len;-------------------------------------------------------下面entries[]的数目。
    struct pollfd entries[0];
};

struct pollfd {---------------------------------------------------和用户空间一致的数据结构,表示一个poll句柄。
    int fd;
    short events;
    short revents;
};

struct poll_wqueues {
    poll_table pt;
    struct poll_table_page *table;----------------------------------指向poll_table_page类型页面,多个页面能够互相连接起来。
    struct task_struct *polling_task;-------------------------------保存当前调用select()/poll()的进程struct task_struct结构体。
    int triggered;--------------------------------------------------当前用户进程被唤醒后置位,以避免该进程接着睡眠。
    int error;------------------------------------------------------错误码。
    int inline_index;-----------------------------------------------数组inline_entries[]的下标。
    struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

  struct poll_table_page {------------------------------------------申请的物理页面都会将起始地址强制转换成该结构体指针。
      struct poll_table_page * next;---------------------------------指向下一个申请的物理页面地址。
      struct poll_table_entry * entry;-------------------------------指向entries[]中首个待分配poll_table_entry地址。
      struct poll_table_entry entries[0];----------------------------该页后面剩余的空间都是待分配的poll_table_entry结构体。
  };

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
    poll_queue_proc _qproc;
    unsigned long _key;
} poll_table;

struct poll_table_entry {
    struct file *filp;--------------------------------------------指向特定fd对应的file结构体。
    unsigned long key;--------------------------------------------等待特定fd对应硬件设备的事件掩码,如POLLIN、POLLOUT等。
    wait_queue_t wait;--------------------------------------------表明调用select()/poll()的应用程序,等待在fd对应设备的特定时间的等待队列头上的等待队列项。
    wait_queue_head_t *wait_address;------------------------------设备驱动中特定时间的等待队列头。
};

一次select()/poll()调用对应一个strcut poll_wqueue结构体;struct poll_table_struct就是一个函数和一个key值。

一次select()/poll()可能包含一个或者多个struct poll_table_page,这个可能根据文件句柄的数量而变化。

一个fd对应一个struct poll_table_entry。

因此fd、struct poll_fd、strcut poll_table_entry是一一对应的;一次select()/poll()和struct poll_wqueues、strcut poll_table_strcut是一一对应的。

 

这两个结构体都经过poll_initwait()初始化,而后在poll_wait()的时候调用poll_queue_proc函数。

 

poll_initwait()是select()/poll()中对相关结构体进行初始化的入口;poll_wait()是驱动中实现file_operations成员poll()函数的主要功能,它将这次调用转换成一个wait_queue_t放入到驱动的等待队列中。

而后wake_up_interruptible()是唤醒全部wait_queue_head_t上的等待项,这些等待项调用对应的func()函数;默认是用来唤醒default_wake_function()。

 

void poll_initwait(struct poll_wqueues *pwq)
{
    init_poll_funcptr(&pwq->pt, __pollwait);-----------------------------------------------------------------初始化struct poll_table,函数是__pollwait。在后面poll_wait()会被调用。
    pwq->polling_task = current;-----------------------------------------------------------------------------polling_task指向当前进程结构体。
    pwq->triggered = 0;
    pwq->error = 0;
    pwq->table = NULL;
    pwq->inline_index = 0;
}

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key   = ~0UL; /* all events enabled */
}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)-------------wait_address是驱动程序提供的等待队列头,来容纳后需等待该硬件设备就绪的进程对应的等待队列项。p是系统调用传下来的struct poll_table_strcut。
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);-----------------------------------------------------------------------调用__pollwait()函数。
}

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
                poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    struct poll_table_entry *entry = poll_get_entry(pwq);-------------------------------------------------------首先从struct poll_wqueues->inline_entries[]中获取poll_table_entry;若是不够用则经过盛情一个页面转换成struct poll_table_page类型指针。
    if (!entry)
        return;
    entry->filp = get_file(filp);
    entry->wait_address = wait_address;
    entry->key = p->_key;
    init_waitqueue_func_entry(&entry->wait, pollwake);----------------------------------------------------------等待队列项的操做函数指定为pollwake(),这个函数做用就是唤醒polling_task对应的进程。
    entry->wait.private = pwq;----------------------------------------------------------------------------------私有变量指向pwq,在__pollwake()中会使用到pwq->polling_task来唤醒对应进程。
    add_wait_queue(wait_address, &entry->wait);-----------------------------------------------------------------将poll_table_entry对应的wait加入到驱动的wait_queue_head_t上。
}

  static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
  {
      struct poll_table_page *table = p->table;

 
 

      if (p->inline_index < N_INLINE_POLL_ENTRIES)
          return p->inline_entries + p->inline_index++;

 
 

      if (!table || POLL_TABLE_FULL(table)) {
          struct poll_table_page *new_table;

 
 

          new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);--------------------------------------申请一个空闲页面,转换成struct poll_table_page类型指针;在poll_freewait()中被释放。
          if (!new_table) {
              p->error = -ENOMEM;
              return NULL;
          }
          new_table->entry = new_table->entries;
          new_table->next = table;
          p->table = new_table;
          table = new_table;
      }

 
 

      return table->entry++;
  }


static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    struct poll_table_entry *entry;

    entry = container_of(wait, struct poll_table_entry, wait);--------------------------------------------------根据wait找到struct poll_table_entry,进而得到关注的事件值key。
    if (key && !((unsigned long)key & entry->key))
        return 0;
    return __pollwake(wait, mode, sync, key);
}

static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    struct poll_wqueues *pwq = wait->private;------------------------------------------------------------------__pollwait()中设置变量,此处使用其polling_task成员。
    DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);----------------------------------------------------------定义一个临时的wait_queue_t,给下面的default_wake_functio作参数。

    smp_wmb();
    pwq->triggered = 1;

    return default_wake_function(&dummy_wait, mode, sync, key);------------------------------------------------唤醒pwq->polling_task对应的线程。
}

int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
              void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

释放poll_wqueues申请的内存,主要是struct poll_table_page对应的页面。

poll_schedule_timeout()实现超时功能。

void poll_freewait(struct poll_wqueues *pwq)
{
    struct poll_table_page * p = pwq->table;
    int i;
    for (i = 0; i < pwq->inline_index; i++)
        free_poll_entry(pwq->inline_entries + i);------------------------------------------这里将poll_table_entry->wait从其所在队列头移出。在poll_wait()中被添加进去。
    while (p) {
        struct poll_table_entry * entry;
        struct poll_table_page *old;

        entry = p->entry;
        do {
            entry--;
            free_poll_entry(entry);
        } while (entry > p->entries);
        old = p;
        p = p->next;
        free_page((unsigned long) old);
    }
}

  static void free_poll_entry(struct poll_table_entry *entry)
  {
      remove_wait_queue(entry->wait_address, &entry->wait);
      fput(entry->filp);
  }


int poll_schedule_timeout(struct poll_wqueues *pwq, int state,
              ktime_t *expires, unsigned long slack)
{
    int rc = -EINTR;

    set_current_state(state);
    if (!pwq->triggered)
        rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);-------------------调用schedule_hrtimeout_range()来完成timeout功能。
    __set_current_state(TASK_RUNNING);

    smp_store_mb(pwq->triggered, 0);

    return rc;
}

 

wake_up_interruptible()用于执行等待队列wake_queue_head_t上的全部wait_queue_t对应func函数,这里对应的就是pollwake(),即唤醒对应的进程。

#define wake_up_interruptible(x)    __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)

void __wake_up(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;

    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, 0, key);
    spin_unlock_irqrestore(&q->lock, flags);
}

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {------------------------q->task_list是全部wait_queue_head_t上wait_queue_t对应的进程链表,遍历链表上进程结构体,进而找到对应的wait_queue_t。而后调用wait_queue_t->func函数。
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)------------------------------
            break;
    }
}

  void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)------------------------------将wait_queue_t加入到wait_queue_head_t的时候经过将wait_queue_t->task_list加入到wait_queue_head_t->task_list链表上。
  {
      unsigned long flags;

 
 

      wait->flags &= ~WQ_FLAG_EXCLUSIVE;
      spin_lock_irqsave(&q->lock, flags);
      __add_wait_queue(q, wait);
      spin_unlock_irqrestore(&q->lock, flags);
  }


  static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
  {
      list_add(&new->task_list, &head->task_list);
  }

 

3.4 select()和poll()比较

select()和poll()基于一样的内核机制poll_wqueues,不一样点在于select()区分读写异常句柄等。

3.4.1 select()和poll()共同点

select()和poll()使用了相同的内核poll例程集合,

3.4.2 select()和poll()的区别

  • select()检查的文件描述符数量有一个限制(FD_SETSIZE),默认是1024;而poll()对于被检查的文件描述符则没有限制。
  • select()参数fd_set同时也保存结果,因此每次重复使用前必须从新初始化fd_set;poll()经过独立字段events和revents来区分,则不须要每次都从新初始化参数。
  • select()提供微秒级的超时精度,poll()只提供毫秒级的超时精度。
  • 历史上,select()比poll()移植性更好。
  • 当检查范围较小时或者待检查大量文件描述符分布很密集,select()和poll()二者性能类似。
  • 若是被检查文件描述符分布很稀疏,且只有一个或几个要被检查,poll()性能表现优于select()。

3.4.3 select()和poll()的局限性

当select()和poll()用来检查大量文件描述符时,可能会遇到一些问题。

  • 检查全部文件描述符耗时大:每次调用select()或poll()内核都必须检查全部被指定的文件描述符,当检查大量处于密集范围内的文件描述符时,该操做耗费的时间将大大超过接下来的操做。
  • 用户<->内核数据来回拷贝耗时大:每次调用select()或poll()程序都必须传递一个表示全部须要被检查文件描述符的数据结构到内核,内核检查事后再传回给程序。从用户空间和内核空间来回拷贝这些数据将占用大量的CPU时间。
  • 返回数据检查全部数据:select()或poll()调用完成后,必须检查返回数据结构中的每一个元素,以此查明哪一个文件描述符处于就绪态。
  • 程序重复使用这些文件描述符集合,可是内核并不会在每次调用成功后记录他们。

如上种种特性形成select()或poll()在性能延展性上要低于信号驱动IO和epoll()。

4. epoll

 epoll API有如下优势:

  • 当检查大量文件描述符时,epoll性能延展性比select()和poll()高不少。
  • epoll API既支持水平触发也支持边缘触发。select/poll只支持水平触发,而信号驱动IO只支持边缘触发。

性能表现上,epoll通讯号驱动IO类似。但epoll有一些赛过信号驱动IO:

  • 能够避免复杂的信号处理流程。
  • 灵活性高,能够指定咱们但愿检查的事件类型。

 

4.1 epoll APIs

epoll API由如下三个系统调用组成:

epoll_create()建立一个epoll实例,返回表明该实例的文件描述符。

epoll_ctl()操做同epoll实例相关联的兴趣列表。经过epoll_ctl()能够增长到新的描述符到列表中,将已有的文件描述符从该列表中移除,以及修改表明文件描述符上时间类型的掩码。

epoll_wait()返回与epoll实例相关联的就绪列表中的成员。

#include <sys/epoll.h>
int epoll_create(int size);

epoll_create()建立了一个新的epoll实例,其对应的兴趣列表初始化为空。

参数size指定了想要经过epoll实例老检查的文件描述符个数。Linux 2.6.8以来,size参数被忽略不用。

返回值表明新建立的epoll实例的文件描述符,这个文件描述符在其余几个epoll系统调用中用来表示epoll实例。

当文件描述符再也不须要时,能够经过close()关闭。

struct epoll_event {
    uint32_t events; /* epoll events (bit mask) */
    epoll_data_t data; /* User data */
};

typedef union epoll_data {
    void *ptr; /* Pointer to user-defined data */
    int fd; /* File descriptor */
    uint32_t u32; /* 32-bit integer */
    uint64_t u64; /* 64-bit integer */
} epoll_data_t;

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

参数epfd是epoll_create()建立的文件描述符。

参数fd指明了要修改兴趣列表中的哪个文件描述符的设定。

参数op用来指定须要执行的操做,能够是:

  • EPOLL_CTL_ADD:将描述符fd添加到epoll实例中的兴趣列表中去。
  • EPOLL_CTL_MOD:修改描述符fd上设定的事件,须要用到由ev所指向的结构体中的信息。
  • EPOLL_CTL_DEL:将描述符fd从epfd的兴趣列表中移除。

参数ev为文件描述符fd所作的设置以下:

  • 结构体epoll_event中的events字段是一个位掩码,指定了咱们为待检查的描述符fd上所感兴趣的事件集合。
  • data字段是一个联合体,当描述符fd稍后成为就绪态时,联合体成员可用来指定船会给调用进程的信息。

 max_user_watches上限

每一个注册到epoll实例上的文件描述符都占用一小段不能被交换的内核交换空间。

内核提供了一个接口用来定义每一个用户能够注册到epoll实例上的文件描述符总数,这个上限值能够经过max_user_watches来修改,这个文件在/proc/sys/fs/epoll/max_user_watches。

 

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

参数evlist指向的结构体数组中返回的是有关就绪态文件描述符的信息。数组evlist的空间由调用者负责申请,所包含的元素个数在参数maxevents中指定。

数组evlist中,每一个元素返回的都是单个就绪态文件描述符的信息。events字段返回了在该描述符上已经发生的事件掩码。data字段返回的是使用epoll_ctl()注册感兴趣的事件时在ev.data中所指定的值。

参数timeout用来肯定epoll_wait()的阻塞行为:

  • 若是timeout为-1,调用将一直阻塞,直到兴趣列表中的文件描述符有事件产生,或者知道捕获到一个信号为止。
  • 若是timeout为0,执行一次非阻塞式检查,看兴趣列表中文件描述符产生了哪一个事件。
  • 若是timeout大于0,调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止。

调用成功后epoll_wait()返回数组evlist中的元素个数,若是在timeout超时间隔内没有任何文件描述符处于就绪态,返回0.

出错时返回-1,并在errno总设定错误码以表示错误缘由。

epoll事件

调用epoll_ctl()时能够在ev.events中指定位掩码以及由epoll_wait()返回的evlist[].events中给出。

位掩码 epoll_ctl()输入 epoll_wait()返回 描述
EPOLLIN 可读取非高优先级的数据
EPOLLPRI 可读取高优先级的数据
EPOLLRDHUP  
EPOLLOUT 普通数据可写
EPOLLET   采用边缘触发事件通知
EPOLLONESHOT   在完成事件通知以后禁用检查
EPOLLERR   有错误发生
EPOLLHUP   出现挂断

 

EPOLLONESHOT标志

一旦经过epoll_ctl()的EPOLL_CTL_ADD将文件描述符添加到epoll实例兴趣列表中,它会保持激活状态,直到显式地经过EPOLL_CTL_DEL操做将其从列表中移除。

若是咱们但愿在某个特定的文件描述符上只获得一次通知,那么能够在ev.events中指定EPOLLONESHOT标志。

4.2 epoll测试程序

epoll API测试程序内核部分能够和select/poll共用,用户空间测试程序以下。

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <memory.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>

#define POLLTEST_NAME "/dev/polltest0"
#define LOOP_COUNT 3
#define MAX_EVENTS 5


void main(void)
{
    int epfd, i;
    struct epoll_event ev;
    struct epoll_event evlist[MAX_EVENTS];
    int fd, loop_count = 0;
    unsigned char polltest_count[30];
    int ret;
    

    fd = open(POLLTEST_NAME, O_RDWR);
    if (fd < 0)
    {
        printf("%s() %s %s\n", __func__, strerror(errno), POLLTEST_NAME);
        return;
    }

    epfd = epoll_create(MAX_EVENTS);---------------------------------------------里面的size其实已经不重要。
    if(epfd == -1)
    {
        printf("%s() %s\n", __func__, strerror(errno));
        return;
    }
    
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1)----------------------------增长监控fd文件句柄上EPOLLIN事件。
    {
        printf("%s() %s\n", __func__, strerror(errno));
        return;
    }

    while(1)
    {
        ret = epoll_wait(epfd, evlist, MAX_EVENTS, -1);--------------------------在epfd上等待,直到fd上有事件发生。
        if (ret == 0)
        {
            printf("time out\n");
        }
        else if(ret == -1)
        {
            printf("%s() %s\n", __func__, strerror(errno));
        }
        else
        {
            for(i = 0; i < ret; i++)
            {
                if(evlist[i].events & EPOLLIN)-----------------------------------检查事件类型,并从fd中读取信息。
                {
                    read(fd, &polltest_count, 0);
                    loop_count = atoi(polltest_count);
                    printf("epoll key_val = %d\n", loop_count);
                    if(loop_count >= LOOP_COUNT)
                        return;
                }
            }
        }
    }
}

 

4.3 epoll API内核解析

 

4.3.1 epoll_create()

 

struct eventpoll {
    spinlock_t lock;
    struct mutex mtx;
    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;-----------------------------后面epoll_wait()使用的等待队列头。
    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;----------------------file->poll()使用的等待队列头。
    /* List of ready file descriptors */
    struct list_head rdllist;-------------------------准备就绪的文件描述符链表,在epoll_wait()中返回给用户空间。
    /* RB tree root used to store monitored fd structs */
    struct rb_root rbr;
    struct epitem *ovflist;
    struct wakeup_source *ws;
    struct user_struct *user;
    struct file *file;
    int visited;
    struct list_head visited_list_link;
};

struct epitem {
    union {
        struct rb_node rbn;
        struct rcu_head rcu;
    };
    /* List header used to link this structure to the eventpoll ready list */
    struct list_head rdllink;
    struct epitem *next;
    struct epoll_filefd ffd;
    int nwait;
    struct list_head pwqlist;
    struct eventpoll *ep;
    struct list_head fllink;
    struct wakeup_source __rcu *ws;

    /* The structure that describe the interested events and the source fd */
    struct epoll_event event;
};

struct epoll_filefd {
    struct file *file;
    int fd;
} __packed;

 

 

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    /* Check the EPOLL_* constant for consistency.  */
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    error = ep_alloc(&ep);----------------------------------------------分配一个struct eventpoll结构体,而且初始化。
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));-------------在当前进程中,寻找一个空闲的文件描述符并返回;同时对文件设置O_RDWR等标志。
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));-------------------------建立一个匿名inode文件。
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    ep->file = file;----------------------------------------------------将struct eventpoll和struct file关联起来。
    fd_install(fd, file);-----------------------------------------------将fd和file在current->files->fdt中关联起来。
    return fd;

out_free_fd:
    put_unused_fd(fd);
out_free_ep:
    ep_free(ep);
    return error;
}

SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;

    return sys_epoll_create1(0);
}

static int ep_alloc(struct eventpoll **pep)
{
    int error;
    struct user_struct *user;
    struct eventpoll *ep;

    user = get_current_user();
    error = -ENOMEM;
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);-----------------------分配一个struct eventpoll结构体。
    if (unlikely(!ep))
        goto free_uid;

    spin_lock_init(&ep->lock);-----------------------------------初始化自旋锁、等待队列、红黑树等。
    mutex_init(&ep->mtx);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT;
    ep->ovflist = EP_UNACTIVE_PTR;
    ep->user = user;

    *pep = ep;

    return 0;

free_uid:
    free_uid(user);
    return error;
}

int get_unused_fd_flags(unsigned flags)
{
    return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags);----------current->files表示当前进程打开文件相关信息,经过RLIMIT_NOFILE能够获取系统最大打开文件数目。
}

int __alloc_fd(struct files_struct *files,
           unsigned start, unsigned end, unsigned flags)------------------------__alloc_fd()在start和end之间,寻找一个合适的fd返回。
{
    unsigned int fd;
    int error;
    struct fdtable *fdt;

    spin_lock(&files->file_lock);
repeat:
    fdt = files_fdtable(files);
    fd = start;
    if (fd < files->next_fd)
        fd = files->next_fd;

    if (fd < fdt->max_fds)
        fd = find_next_fd(fdt, fd);

    error = -EMFILE;
    if (fd >= end)
        goto out;

    error = expand_files(files, fd);
    if (error < 0)
        goto out;

    if (error)
        goto repeat;

    if (start <= files->next_fd)
        files->next_fd = fd + 1;

    __set_open_fd(fd, fdt);-----------------------------------------------------将fd设置为打开状态。
    if (flags & O_CLOEXEC)
        __set_close_on_exec(fd, fdt);
    else
        __clear_close_on_exec(fd, fdt);
    error = fd;
...
out:
    spin_unlock(&files->file_lock);
    return error;
}

系统调用epoll_create()就是进程在内核中建立了一个从epoll文件描述符到struct event结构的通道;首先调用ep_alloc()分配一个struct eventpoll数据结构,并初始化;而后寻找一个空闲的文件描述符,并建立一个匿名文件inode;最后将二者在进程task_struct结构体中关联起来。

最终返回打开的文件描述符。

后面的epoll_ctl()和epoll_wait()均可以经过此文件描述符关联到内核中的struct eventpoll结构体。

4.3.2 epoll_ctl()

 

struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};

typedef struct poll_table_struct {
    poll_queue_proc _qproc;
    unsigned long _key;
} poll_table;

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

 

 

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    int error;
    int full_check = 0;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    struct eventpoll *tep = NULL;

    error = -EFAULT;
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))--------------------将用户空间的event拷贝到内核的epds中。
        goto error_return;

    error = -EBADF;
    f = fdget(epfd);-----------------------------------------------------------------根据epfd找到对应的struct file结构体。
    if (!f.file)
        goto error_return;

    tf = fdget(fd);------------------------------------------------------------------获取目标文件的struct file结构体。
    if (!tf.file)
        goto error_fput;

    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tf.file->f_op->poll)
        goto error_tgt_fput;

    /* Check if EPOLLWAKEUP is allowed */
    if (ep_op_has_event(op))
        ep_take_care_of_epollwakeup(&epds);

    error = -EINVAL;
    if (f.file == tf.file || !is_file_epoll(f.file))
        goto error_tgt_fput;

    if (epds.events & EPOLLEXCLUSIVE) {
        if (op == EPOLL_CTL_MOD)
            goto error_tgt_fput;
        if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
                (epds.events & ~EPOLLEXCLUSIVE_OK_BITS)))
            goto error_tgt_fput;
    }

    ep = f.file->private_data;

    mutex_lock_nested(&ep->mtx, 0);
    if (op == EPOLL_CTL_ADD) {
        if (!list_empty(&f.file->f_ep_links) ||
                        is_file_epoll(tf.file)) {
            full_check = 1;
            mutex_unlock(&ep->mtx);
            mutex_lock(&epmutex);
            if (is_file_epoll(tf.file)) {
                error = -ELOOP;
                if (ep_loop_check(ep, tf.file) != 0) {
                    clear_tfile_check_list();
                    goto error_tgt_fput;
                }
            } else
                list_add(&tf.file->f_tfile_llink,
                            &tfile_check_list);
            mutex_lock_nested(&ep->mtx, 0);
            if (is_file_epoll(tf.file)) {
                tep = tf.file->private_data;
                mutex_lock_nested(&tep->mtx, 1);
            }
        }
    }

    epi = ep_find(ep, tf.file, fd);-------------------------------------------------在ep->rbr红黑树中查找file对应的文件,若是没有找到返回NULL。

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {-----------------------------------------------------------------若是以前ep_find()没有找到对应的epi,那么此处插入到ep->rbr红黑树中。
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        if (full_check)
            clear_tfile_check_list();
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);---------------------------------------------在ep_find()找到epi的状况下,将其从ep->rbr中移除。
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                epds.events |= POLLERR | POLLHUP;
                error = ep_modify(ep, epi, &epds);----------------------------------修改epi的事件类型。
            }
        } else
            error = -ENOENT;
        break;
    }
    if (tep != NULL)
        mutex_unlock(&tep->mtx);
    mutex_unlock(&ep->mtx);
...
    return error;
}
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd, int full_check)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    long user_watches;
    struct epitem *epi;
    struct ep_pqueue epq;

    user_watches = atomic_long_read(&ep->user->epoll_watches);
    if (unlikely(user_watches >= max_user_watches))
        return -ENOSPC;
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))-------------------------从slab中分配缓存struct epitem结构体空间。
        return -ENOMEM;

    /* Item initialization follow here ... */
    INIT_LIST_HEAD(&epi->rdllink);------------------------------------------------初始化epi数据结构。
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;
    if (epi->event.events & EPOLLWAKEUP) {
        error = ep_create_wakeup_source(epi);
        if (error)
            goto error_create_wakeup_source;
    } else {
        RCU_INIT_POINTER(epi->ws, NULL);
    }

    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);-----------------------------将epq.pt->_qproc指向ep_ptable_queue_proc()。设置后面poll()等待队列被唤醒的时候,将要调用到的函数。

    revents = ep_item_poll(epi, &epq.pt);-----------------------------------------调用epi对应文件的poll()函数,插入到poll()的等待队列中。;返回的事件类型时用户空间关心事件类型的交集。

    error = -ENOMEM;
    if (epi->nwait < 0)
        goto error_unregister;

    spin_lock(&tfile->f_lock);
    list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
    spin_unlock(&tfile->f_lock);

    ep_rbtree_insert(ep, epi);----------------------------------------------------将新的epi插入到ep->rbr的红黑树中。
...
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 0;
...
    return error;
}

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;---------------------------------------------------------struct epoll_entry主要完成struct epitem和和epitem事件发生时的callback函数之间的关联。

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {---------申请一个struct epoll_entry结构体缓存。
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);----------------------初始化等待队列函数的入口,也就是poll醒来时要调用的回调函数。
        pwq->whead = whead;
        pwq->base = epi;
        if (epi->event.events & EPOLLEXCLUSIVE)
            add_wait_queue_exclusive(whead, &pwq->wait);
        else
            add_wait_queue(whead, &pwq->wait);---------------------------------------将pwq->wait加入到等待队列中。
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)---主要功能试在被监视的文件等待时间就绪时,将文件对应的epitem实例添加到就绪列表中,当用户调用epoll_wait()时,内核会将就绪队列中的时间报告给用户。
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;
    int ewake = 0;

    if ((unsigned long)key & POLLFREE) {
        ep_pwq_from_wait(wait)->whead = NULL;
        list_del_init(&wait->task_list);
    }
    spin_lock_irqsave(&ep->lock, flags);
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;
    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
            if (epi->ws) {
                __pm_stay_awake(ep->ws);
            }

        }
        goto out_unlock;
    }
    if (!ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake_rcu(epi);
    }
    if (waitqueue_active(&ep->wq)) {
        if ((epi->event.events & EPOLLEXCLUSIVE) &&
                    !((unsigned long)key & POLLFREE)) {
            switch ((unsigned long)key & EPOLLINOUT_BITS) {
            case POLLIN:
                if (epi->event.events & POLLIN)
                    ewake = 1;
                break;
            case POLLOUT:
                if (epi->event.events & POLLOUT)
                    ewake = 1;
                break;
            case 0:
                ewake = 1;
                break;
            }
        }
        wake_up_locked(&ep->wq);
    }
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
...
    return 1;
}

static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
{
    int pwake = 0;
    unsigned int revents;
    poll_table pt;

    init_poll_funcptr(&pt, NULL);

    epi->event.events = event->events; /* need barrier below */
    epi->event.data = event->data; /* protected by mtx */
    if (epi->event.events & EPOLLWAKEUP) {
        if (!ep_has_wakeup_source(epi))
            ep_create_wakeup_source(epi);
    } else if (ep_has_wakeup_source(epi)) {
        ep_destroy_wakeup_source(epi);
    }
    smp_mb();

    revents = ep_item_poll(epi, &pt);

    if (revents & event->events) {
        spin_lock_irq(&ep->lock);
        if (!ep_is_linked(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);

            /* Notify waiting tasks that events are available */
            if (waitqueue_active(&ep->wq))
                wake_up_locked(&ep->wq);
            if (waitqueue_active(&ep->poll_wait))
                pwake++;
        }
        spin_unlock_irq(&ep->lock);
    }

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 0;
}

static int ep_remove(struct eventpoll *ep, struct epitem *epi)-------------------------------------将epi从当前ep中移除。
{
    unsigned long flags;
    struct file *file = epi->ffd.file;

    ep_unregister_pollwait(ep, epi);

    spin_lock(&file->f_lock);
    list_del_rcu(&epi->fllink);
    spin_unlock(&file->f_lock);

    rb_erase(&epi->rbn, &ep->rbr);

    spin_lock_irqsave(&ep->lock, flags);
    if (ep_is_linked(&epi->rdllink))
        list_del_init(&epi->rdllink);
    spin_unlock_irqrestore(&ep->lock, flags);

    wakeup_source_unregister(ep_wakeup_source(epi));

    call_rcu(&epi->rcu, epi_rcu_free);

    atomic_long_dec(&ep->user->epoll_watches);

    return 0;
}

epoll_ctl() 函数首先就分配空间, 将结构从用户空间复制到内核空间中, 在进行方法(op)判断以前, 先采用ep_find函数进行查找, 以确保该数据已经设置好回调函数了, 而后使用fget函数获取该epoll的匿名文件的文件描述符, 最后进行方法(op)判断, 肯定是EPOLL_CTL_ADD, EPOLL_CTL_MOD仍是 EPOLL_CTL_DEL。

这里主要讲的是EPOLL_CTL_ADD, 因此当是选择加入时, 就调用ep_insert函数, 将回调函数设置为ep_ptable_queue_proc函数, 也就是将消息到达后, 须要自动启动ep_ptable_proc函数, 进而调用ep_poll_callback函数, 该函数就是把来的消息所对应的结构和文件信息加入到就绪链表中, 以便以后调用 epoll_wait 能够直接从就绪队列链表中夺得就绪的文件. 也正是这样, epoll的回调函数使epoll不用每次都轮询遍历数据, 而是自动唤醒回调, 更加的高效. 而且回调函数也只是在进程加入的时侯才设置, 并且只设置一次.

 

4.3.3 epoll_wait()

 

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;

    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)----------------------------------判断maxevents合法性。
        return -EINVAL;
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
        return -EFAULT;
    f = fdget(epfd);------------------------------------------------------------------获取epoll_create()函数建立的匿名文件描述符。
    if (!f.file)
        return -EBADF;

    error = -EINVAL;
    if (!is_file_epoll(f.file))-------------------------------------------------------经过判断f.file->f_op是否为eventpoll_fops来肯定是否为epoll文件。
        goto error_fput;

    ep = f.file->private_data;--------------------------------------------------------私有数据指向struct eventpoll数据结构。

    error = ep_poll(ep, events, maxevents, timeout);----------------------------------等待消息到来;没有消息到来就阻塞本身。

error_fput:
    fdput(f);
    return error;
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    unsigned long flags;
    u64 slack = 0;
    wait_queue_t wait;
    ktime_t expires, *to = NULL;

    if (timeout > 0) {----------------------------------------------------------------将用户空间传入的timeout事件转换成内核超时时间。
        struct timespec64 end_time = ep_set_mstimeout(timeout);

        slack = select_estimate_accuracy(&end_time);
        to = &expires;
        *to = timespec64_to_ktime(end_time);
    } else if (timeout == 0) {

        timed_out = 1;
        spin_lock_irqsave(&ep->lock, flags);
        goto check_events;
    }

fetch_events:
    spin_lock_irqsave(&ep->lock, flags);

    if (!ep_events_available(ep)) {
        init_waitqueue_entry(&wait, current);-----------------------------------------初始化一个等待队列入口,并将其添加到等待队列上。
        __add_wait_queue_exclusive(&ep->wq, &wait);

        for (;;) {
            set_current_state(TASK_INTERRUPTIBLE);------------------------------------进程设置为可中断的。
            if (ep_events_available(ep) || timed_out)---------------------------------退出的条件是是否有ready事件、是否超时、是否有信号中断,三者任一则退出循环。
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&ep->lock, flags);
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))---------------时间达到以前,让出cpu调用其余进程;超时后,从新经过中断回调该进程。
                timed_out = 1;

            spin_lock_irqsave(&ep->lock, flags);
        }

        __remove_wait_queue(&ep->wq, &wait);
        __set_current_state(TASK_RUNNING);
    }
check_events:
    eavail = ep_events_available(ep);
    spin_unlock_irqrestore(&ep->lock, flags);
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)-----------------若是中间没有被信号中断,而且ep->rdlist不为空,则调用ep_send_events()给用户空间发送消息。
        goto fetch_events;
    return res;
}

static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;

    return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}


static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;
    struct wakeup_source *ws;
    poll_table pt;

    init_poll_funcptr(&pt, NULL);

    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) {---------------------------从就绪队列去除一个个epi,进行处理;并将相关事件返回给用户空间。
        epi = list_first_entry(head, struct epitem, rdllink);--------------------------取出第一个消息数据对应的struct epitem结构,而后从就绪列表上移除。

        ws = ep_wakeup_source(epi);
        if (ws) {
            if (ws->active)
                __pm_stay_awake(ep->ws);
            __pm_relax(ws);
        }

        list_del_init(&epi->rdllink);

        revents = ep_item_poll(epi, &pt);----------------------------------------------调用epi对应文件描述符的poll()函数,返回值是。

        if (revents) {
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {--------------------------将revents和event.data发送到用户空间。
                list_add(&epi->rdllink, head);
                ep_pm_stay_awake(epi);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;----------------------------------------------------------------返回给用户空间的参数eventcnt递增,uevent也递增。
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {

                list_add_tail(&epi->rdllink, &ep->rdllist);
                ep_pm_stay_awake(epi);
            }
        }
    }

    return eventcnt;
}

得到匿名文件的文件指针, 在经过调用ep_poll函数, 进行时间片的设置, 在时间片结束后就绪队列为空, 就退出等待; 若是时间设置的是负数, ep_poll函数会调用schedule_timeout, 执行进程调度, 当设置的时间片结束后又继续回到ep_poll进程 查看就绪队列是否为空, 为空的话就继续cinching调度, 此时wait又变成阻塞态; 当就绪队列准备好后, 就退出进程调度, 执行ep_send_events函数, 主要是为了将就绪队列的链表从内核空间发送给用户空间。

ep_send_events函数, 先将就绪链表rdllist复制到另外一个新的链表, 从新将就绪链表清零, 而后程序调用__put_user将新链表的数据发送给用户空间, 同时, 发送的个数吉拉路下来, 以便函数的返回 , 可是, 若是在发送的时候又有就绪信号到来, 就未来的就绪信号保存在ovflist链表中, 最后又从新数据拷贝给rdllist中, 再重复执行ep_send_events函数。

从对epoll的分析也能够看出,为何性能要优于select()/poll()。

  • epoll使用了三个API来达到select()/poll()一样的功能,epoll将每次查询没必要要重复的部分和须要重复的部分区分开来。下降了没必要要的开销。
  • 每次调用select()/poll()时,内核必须检查全部在调用中指定的文件描述符。相反,经过epoll_ctl()制定了须要监视的文件描述符时,内核会在与打开的文件描述符上下文相关联的列表中记录该描述符。以后每当执行IO操做使得文件描述符成为就绪态时,内核就在epoll描述就绪列表中添加一个元素。以后的epoll_wait()就从就绪列表中简单地去除这些元素。
  • 每次调用select()/poll()时,传递一个标记了全部待监视的文件描述符的数据结构给内核,调用返回时,内核将全部标记为就绪态的文件描述符的数据结构在传回到用户空间。相反,在epoll中使用epoll_ctl()在内核空间中创建一个数据结构,该数据结构会将待监视的文件描述符都记录下来。一旦这个数据机构创建完成,稍后每次调用epoll_wait()时就不须要再传递任何与文件描述符有关的信息给内核了,而调用返回的信息中只包含那些已经处于就绪态的描述符。

 

参考文档:

select(poll)系统调用实现解析(一)》《select(poll)系统调用实现解析(二)》《select(poll)系统调用实现解析(三)》《epoll源码分析(一)》《epoll源码分析(二)》《epoll源码分析(三)》《Linux epoll模型详解及源码分析》《epoll源码实现分析[整理]》《Linux下的I/O复用与epoll详解》《epoll源码分析(全)》《epoll内核源码详解+本身总结的流程