深入剖析 Netty 源码设计(一)——深入理解 select poll epoll 机制



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

前言

打算输出一系列Netty源码分析与实践的文章,也作为后端开发学习过程中的沉淀,此文章为第一篇,从操作系统底层的 IO 讲起,为Netty 的出场做下知识准备。

一些概念

文件描述符

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。读写文件也需要使用文件描述符来指定待读写的文件。

I/O 模型分类

根据 UNIX 网络编程对 I/O 模型的分类,UNIX 提供了 5 种 IO 模型:

阻塞 I/O:

3e7cbb471b074d6eab593677bd02125f.png

在进程空间中调用 recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间会一直被阻塞

非阻塞 I/O

b7999f8b9a294138857459ddb773ef91.png

recvfrom 从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个 EWOULDBLOCK 错误, 一般都对非阻塞 I/O 模型进行轮询检查这个状态,看内核是不是有数据到来。

I/O 复用模型

bed5b684bd4243518bccdfe705640696.png
将一个或者多个 fd 通过 select/poll 调用, 然后阻塞在 select 上,一个进程用来监控多个 fd,即所谓的 I/O 复用,细节后面会描述。

信号驱动 I/O 模型

f38f312103934e9da15c0ffbf53d6a34.png
首先开启套接字信号驱动 I/O 功能,并通过系统调用 sigaction 执行一个信号处理函数并立即返回(非阻塞)。当数据就绪时,就为该进程生成一个 SIGIO 信号,通过信号回调通知应用程序调用 recvfrom 来读取数据。

异步 I/O

797e656e11ed464c8dfdda6c4fb20ef5.png
告知内核启动某个操作, 内核会将数据从内核态自己 copy 到用户态缓冲区,然后通知我们。其和信号驱动模型的主要区别是: 信号驱动 I/O 由内核通知我们何时可以开始一个 I/O 操作;异步 I/O 模型由内核通知我们 I/O 操作何时已经完成。

Linux 2.6+ 内核的 wakeup callback 机制

Linux 通过socket的睡眠队列(sleep_list)来管理所有等待socket的某个事件的进程(task), selectpollepoll_wait 函数操作会陷入内核,判断监控的 socket 是否有关心的事件发生了,如果没,则为当前task构建一个wait_entry节点,然后插入到每个 socket 的sleep_list里,直到超时或事件发生,同时通过wakeup机制来异步唤醒整个睡眠队列上等待事件的task,通知task相关事件发生, 每一个sleep_list上的wait_entry都拥有一个callbackwakeup逻辑在唤醒睡眠队列时,会遍历该队列链表上的每一个wait_entry,直到完成队列的遍历或遇到某个wait_entry节点是排他的才停止,调用每一个wait_entrycallback,并将当前taskwait_entry节点从 socket 的sleep_list中删除。

select

select 是一种同步 IO,函数签名如下:

int  select(int  nfds,  fd_set  *readfds,  fd_set  *writefds,  fd_set  *exceptfds,  struct  timeval  *timeout);
  • nfds 为最大的文件描述符值 +1
  • readfds 某些文件描述符所指向的 socket 已经有数据可读或者数据 EOF
  • writefds 某些文件描述符所指向的 socket 是否可写数据了
  • exceptfds 某些文件描述符所指向的 socket 出现异常

使用示例:

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <wait.h>
#include <signal.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define MAXBUF 256

void child_process(void)
{
  sleep(2);
  char msg[MAXBUF];
  struct sockaddr_in addr = {0};
  int n, sockfd,num=1;
  srandom(getpid());
  /* Create socket and connect to server */
  sockfd = socket(AF_INET, SOCK_STREAM, 0);
  addr.sin_family = AF_INET;
  addr.sin_port = htons(2000);
  addr.sin_addr.s_addr = inet_addr("127.0.0.1");

  connect(sockfd, (struct sockaddr*)&addr, sizeof(addr));

  printf("child {%d} connected \n", getpid());
  while(1){
        int sl = (random() % 10 ) +  1;
        num++;
     	sleep(sl);
  	sprintf (msg, "Test message %d from client %d", num, getpid());
  	n = write(sockfd, msg, strlen(msg));	/* Send message */
  }

}

int main()
{
  char buffer[MAXBUF];
  int fds[5];
  struct sockaddr_in addr;
  struct sockaddr_in client;
  int addrlen, n,i,max=0;;
  int sockfd, commfd;
  fd_set rset;
  
  //创建了5个子进程, 每个进程都向server发送了数据

  for(i=0;i<5;i++)
  {
  	if(fork() == 0)
  	{
  		child_process();
  		exit(0);
  	}
  }

  sockfd = socket(AF_INET, SOCK_STREAM, 0);
  memset(&addr, 0, sizeof (addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(2000);
  addr.sin_addr.s_addr = INADDR_ANY;
  bind(sockfd,(struct sockaddr*)&addr ,sizeof(addr));
  listen (sockfd, 5); ///告诉内核服务端的一些信息 连接队列个数为5,大于5个socket连接,会出现延时
  
  for (i=0;i<5;i++) 
  {
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    fds[i] = accept(sockfd,(struct sockaddr*)&client, &addrlen);
	
	//保留最大的 文件描述符值
    if(fds[i] > max)
    	max = fds[i];
  }
  
  while(1){ 
  //将文件描述符数组每一位全都置为0
	FD_ZERO(&rset);
	//每次while循环都要重新设置要监控的socket
  	for (i = 0; i< 5; i++ ) {
  		FD_SET(fds[i],&rset);
  	}

   	puts("round again");
	//一直阻塞直到有读事件已ready
	select(max+1, &rset, NULL, NULL, NULL);

	for(i=0;i<5;i++) {
	//循环判断是哪个socket可读
		if (FD_ISSET(fds[i], &rset)){
			memset(buffer,0,MAXBUF);
			read(fds[i], buffer, MAXBUF);
			puts(buffer);
		}
	}	
  }
  return 0;
}

为了要高效的处理网络 IO 数据,不可能为每个socket 创建一个进程task,进程创建是一种高昂的性能损耗,所以采用一个task来监控多个socket,但这一个task不可能去阻塞式的监控某一个 socket 的事件发生,我们应该 block 在关心的 N 个 socket 中一个或多个 socket 有数据可读的事件,意味着当 block 解除的时候,我们一定可以找到一个或多个 socket 上有可读的数据(至少一个可读),select将这个task放到每个 socket 的sleep_list,等待任意一个 socket 可读事件发生而被唤醒,当 task 被唤醒的时候,其callback里面应该有个逻辑去检查具体哪些 socket 可读了。然后把这些事件反馈给用户程序,select为每个 socket 引入一个poll逻辑,该逻辑用于收集 socket 发生的事件。对于可读事件来说,简单伪码如下:

private int sk_event;

void poll() {
    //其他逻辑...
    when (receive queue is not empty) {
        sk_event |= POLL_IN;
    }
   //其他逻辑...
}

receive queue不为空的时候,我们就给这个 socket 的sk_event添加一个POLL_IN事件,用来表示当前这个 socket 可读。将来 task 遍历到这个 socket,发现其sk_event包含POLL_IN的时候,就说明这个 socket 已是可读的。当用户 task 调用 select 的时候,select 会将需要监控的 readfds 集合拷贝到内核空间,然后遍历自己监控的 socket,挨个调用 socket 的 poll 逻辑以便检查该 socket 是否有可读事件。

遍历完所有的 socket 后,如果没有任何一个 sk 可读,那么 select 会调用 schedule,使得 task 进入睡眠。如果在 timeout 时间内某个 socket 上有数据可读了,或者等待 timeout 了,则调用 select 的 task 会被唤醒。唤醒后 select 就是遍历监控的 socket 集合,挨个收集可读事件并返回给用户了,相应的伪码如下:

for (socket in readfds) {
    sk_event.evt = socket.poll();
    sk_event.sk = socket;
    return_event_for_process;
}

就像示例代码一样 while 循环内的 for 循环,在 select 返回后,task 需要遍历已 ready 的描述符集合,循环的次数就是之前记录的 fd 值。

select 的问题:

  • 每次 select 都需要将需要监控的文件描述符集合从用户态 copy 到内核态,内核并将 ready 的描述符集合再从内核态 copy 到用户态,如果 socket 很大,会有很大的上下文切换的损耗。
  • 由于 readfds 是长度为 32 的整型数组,32*32=1024,bitmap 机制来表示的 fd 最多可表示 1024 个,socket 连接有上限
  • 每次都是 O(n) 复杂度遍历所有 socket 收集有事件的 socket。
  • 每次都是 O(n) 复杂度(n 是最大的 fd 值)遍历从内核态返回来的 ready 的 fdset

poll

poll 实际上在 Unix 系统是不支持的,不像 select 使用 bitmap 集合来存储 fd 值,它通过一个大小为 nfds 的 pollfd 结构来表示需要监控的 fd set,函数签名如下:

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

pollfd 的结构如下, 每个 fd 都有对应的监听事件 events,和就绪返回的事件 revents,现在 fd 的大小是 int 最大值了。

struct  pollfd  {

      int  fd;

      short  events;

      short  revents;

};

代码示例:

  for (i=0;i<5;i++) 
  {
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    pollfds[i].fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
    pollfds[i].events = POLLIN;
  }
  sleep(1);
  while(1){
  	puts("round again");
	poll(pollfds, 5, 50000);

	for(i=0;i<5;i++) {
		if (pollfds[i].revents & POLLIN){
			pollfds[i].revents = 0;
			memset(buffer,0,MAXBUF);
			read(pollfds[i].fd, buffer, MAXBUF);
			puts(buffer);
		}
	}
  }

select VS poll:

  • poll 不需要每次都重新构建需要监控的 fd set,但还是会有引起上下文切换的内存 copy
  • poll 不需要像 select 那样需要用户计算 fd 的最大值 +1,作为 select 函数的第一个参数
  • poll 减少了 fd 的遍历,在 select 中监控的某 socket 所对应的 fd 值为 1000,那么需要做 1000 次循环
  • poll 解除了 select 对于 fd 数量 1024 的限制
  • poll 在 unix 下不支持

epoll

细看selectpoll的函数原型,我们会发现,每次调用selectpoll都在重复地准备整个需要监控的 fds 集合。我们需要监控三个 socket,就要准备一个readfds,然后新增监控一个 socket,就要再准备一个readfds(包含旧的和新的 socket 的readfds)。然而对于频繁调用的selectpoll而言,fds 集合的变化频率要低得多,我们没必要每次都重新准备整个 fds 集合。

于是,epoll引入了epoll_ctl系统调用,将高频调用的epoll_wait和低频的epoll_ctl隔离开。epoll_ctlepoll的事件注册函数,它不同与select()是在监听事件时,告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。到了有变化才变更,将selectpoll高频、大块内存拷贝变成epoll_ctl的低频、小块内存的拷贝,避免了大量的内存拷贝。

同时,对于高频epoll_wait的可读就绪的 fd 集合返回的拷贝问题,epoll通过内核与用户空间mmap同一块内存来解决。mmap将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址(不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理地址),使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换。

另外,epoll通过epoll_ctl来对监控的 fds 集合来进行增、删、改,那么必须涉及到 fd 的快速查找问题。于是在 linux 2.6.8 以后的内核中采用了红黑树的结构来组织 fds。
示例代码:

 struct epoll_event events[5];
  int epfd = epoll_create(10);
  ...
  ...
  for (i=0;i<5;i++) 
  {
    static struct epoll_event ev;
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    ev.data.fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); 
  }
  
  while(1){
  	puts("round again");
  	nfds = epoll_wait(epfd, events, 5, 10000);
	
	for(i=0;i<nfds;i++) {
			memset(buffer,0,MAXBUF);
			read(events[i].data.fd, buffer, MAXBUF);
			puts(buffer);
	}
  }
 

遍历就绪的 fds 集合
通过上面的 socket 的睡眠队列唤醒逻辑我们知道,socket 唤醒睡眠在其睡眠队列的wait_entry的时候会调用wait_entry的回调函数callback,并且,我们可以在callback中做任何事情。为了做到只遍历就绪的 fd,我们需要有个地方来组织那些已经就绪的 fd。

为此,epoll引入了一个中间层,一个双向链表ready_list,一个单独的睡眠队列single_epoll_wait_list,并且,与selectpoll不同的是,epoll的 task 不需要同时插入到多路复用的 socket 集合的所有睡眠队列中,相反 task 只是插入到中间层的epoll的单独睡眠队列中(即single_epoll_wait_list),task 睡眠在epoll的单独队列上,等待事件的发生。同时,引入一个中间的wait_entry_sk,它与某个 socket 密切相关,wait_entry_sk睡眠在 socket 的睡眠队列上,其callback函数逻辑是将当前 socket 排入到epollready_list中,并唤醒epollsingle_epoll_wait_list。而single_epoll_wait_list上睡眠的 task 的回调函数就明朗了:遍历ready_list上的所有 socket,挨个调用 socket 的poll函数收集事件,然后唤醒 task 从epoll_wait返回。

select VS poll VS epoll:

  • epoll 减少了用户态和内核态间的内存 copy
  • epoll 有着高效的 fd 操作的红黑树结构
  • epoll 基本没有 fd 数量限制
  • epoll 每次只需遍历 ready_list 中就绪的 socket 即可

额,epoll 模型太常用了,碉碉的。。。。
上一张大佬画的图:
8288a9f3fac043669648a809c5f0bd4d.png

参考文档

[1] https://blog.csdn.net/dog250/article/details/50528373

[2] https://stackoverflow.com/questions/4093185/whats-the-difference-between-epoll-poll-threadpool/5449827#5449827

[3] https://blog.csdn.net/tennysonsky/article/details/45621341/

[4]https://wyj.shiwuliang.com/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3SELECT%E3%80%81POLL%E5%92%8CEPOLL+.html

[5] https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/

[6] https://github.com/angrave/SystemProgramming/wiki/Networking,-Part-7:-Nonblocking-I-O,-select(),-and-epoll

[7] https://blog.csdn.net/pugu12/article/details/46863715

[8]http://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll/#.XEWEj1N95E5


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com