我们这里以单进程启动为例
nginx.c
中的main
函数调用ngx_single_process_cycle
这个函数回循环调用
ngx_process_cycle.c
中的
for ( ;; ) { .... ngx_process_events_and_timers ....}
事件循环的核心函数是 ngx_process_events_and_timers 。这个函数主要干了四件 事情:抢占 accept mutex,等待并分发事件,处理 accept 事件,处理其他io事件
我们这里只介绍等待分发事件
ngx_event.c
中的 (void) ngx_process_events(cycle, timer, flags);
这里开始 wait并分发事件, 我们来可以来看一下这个函数
可以看到在ngx_event.h
中的一个宏 #define ngx_process_events ngx_event_actions.process_events
我们来看一下 ngx_event_actions 这个数据结构
typedef struct { /* 添加事件方法,它将负责把1个感兴趣的事件添加到操作系统提供的事件驱动机制(如epoll,kqueue等)中, 这样,在事件发生之后,将可以在调用下面的process_envets时获取这个事件。 */ ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 删除事件方法,它将一个已经存在于事件驱动机制中的事件一出,这样以后即使这个事件发生,调用process_events方法时也无法再获取这个事件 */ ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 启用一个事件,目前事件框架不会调用这个方法,大部分事件驱动模块对于该方法的实现都是与上面的add方法完全一致的 */ ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 禁用一个事件,目前事件框架不会调用这个方法,大部分事件驱动模块对于该方法的实现都是与上面的del方法一致 */ ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); /* 向事件驱动机制中添加一个新的连接,这意味着连接上的读写事件都添加到事件驱动机制中了 */ ngx_int_t (*add_conn)(ngx_connection_t *c); // 从事件驱动机制中一出一个连续的读写事件 ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); // 仅在多线程环境下会被调用,目前,nginx在产品环境下还不会以多线程方式运行。 ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait); // 在正常的工作循环中,将通过调用process_events方法来处理事件。 // 这个方法仅在ngx_process_events_and_timers方法中调用,它是处理,分发事件的核心 ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); // 初始化事件驱动模块的方法 ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); // 退出事件驱动模块前调用的方法。 void (*done)(ngx_cycle_t *cycle);} ngx_event_actions_t;extern ngx_event_actions_t ngx_event_actions;
这个数据结构中定义了很多函数指针,
这里我们的配置是
events { use epoll; worker_connections 1024; #所以nginx支持的总连接数就等于worker_processes * worker_connections }
使用的是 epoll 事件模块,在epoll 模块初始化的时候调用
ngx_epoll_module.c
中的ngx_epoll_init
的函数
其中给ngx_event_actions
赋值
ngx_event_actions = ngx_epoll_module_ctx.actions
我们来看下 ngx_epoll_module_ctx
typedef struct { // 事件模块的名称 ngx_str_t *name; // 在解析配置项前,这个回调方法用于创建存储配置项参数的结构体 void *(*create_conf)(ngx_cycle_t *cycle); // 在解析配置项完成后,init_conf方法会被调用,用于综合处理当前事件模块感兴趣的全部配置项。 char *(*init_conf)(ngx_cycle_t *cycle, void *conf); // 对于事件驱动机制,每个事件模块需要实现的10个抽象方法 ngx_event_actions_t actions;} ngx_event_module_t;
初始化
//epoll是个event模块ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ { ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ NULL, /* process the changes */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ }};
这些事件处理函数都在 ngx_epoll_module.c
这个文件中,大家可以看一下源码
综上,根据我们的配置, ngx_event.c
中的 ngx_process_events
ngx_epoll_module.c
中的 ngx_epoll_process_events
这个函数有点长,我们找些关键的点看一下,
//一开始就是等待事件,最长等待时间为timer;nginx为事件专门用红黑树维护了一个计时器 events = epoll_wait(ep, event_list, (int) nevents, timer);
所有收集到的事件都放在了event_list 中,我们来看一下这个event_list
static struct epoll_event *event_list;struct epoll_event { uint32_t events; epoll_data_t data;};typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64;} epoll_data_t;
下面就开始对这些事件就行处理, 先加锁,
ngx_mutex_lock(ngx_posted_events_mutex); //循环开始处理收到的所有事件 for (i = 0; i < events; i++) { c = event_list[i].data.ptr; instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); rev = c->read; if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } //取得发生一个事件 revents = event_list[i].events; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr); //记录wait的错误返回状态 if (revents & (EPOLLERR|EPOLLHUP)) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents); }#if 0 if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "strange epoll_wait() events fd:%d ev:%04XD", c->fd, revents); }#endif //该事件是一个读事件,并该连接上注册的读事件是active的 if ((revents & (EPOLLERR|EPOLLHUP)) && (revents & (EPOLLIN|EPOLLOUT)) == 0) { /* * if the error events were returned without EPOLLIN or EPOLLOUT, * then add these flags to handle the events at least in one * active handler */ revents |= EPOLLIN|EPOLLOUT; } if ((revents & EPOLLIN) && rev->active) { if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; } else { rev->ready = 1; } //事件放入到相应的队列中 if (flags & NGX_POST_EVENTS) { queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events); ngx_locked_post_event(rev, queue); } else { rev->handler(rev); } } wev = c->write; if ((revents & EPOLLOUT) && wev->active) { if (c->fd == -1 || wev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c); continue; } if (flags & NGX_POST_THREAD_EVENTS) { wev->posted_ready = 1; } else { wev->ready = 1; } if (flags & NGX_POST_EVENTS) { ngx_locked_post_event(wev, &ngx_posted_events); } else { wev->handler(wev); } } } ngx_mutex_unlock(ngx_posted_events_mutex);
先加锁,对event_list 中的事件循环处理,
在每个循环中, 先获取这个事件所在的连接, 然后判断是读事件还是写事件, 然后调用注册在这个事件上的的回调函数。 读事件的回调函数是ngx_http_init_connection
这样就进入了 HTTP框架处理流程