#### ngx_http_write_filter_module分析[](http://tengine.taobao.org/book/chapter_12.html#ngx-http-write-filter-module "永久链接至标题")
ngx_http_write_filter_module是最后一个body filter,可以看到它的注册函数的特殊性:
[](http:// "点击提交Issue,反馈你的意见...")
static ngx_int_t
ngx_http_write_filter_init(ngx_conf_t *cf)
{
ngx_http_top_body_filter = ngx_http_write_filter;
return NGX_OK;
}
ngx_http_write_filter_module是第一个注册body filter的模块,于是它也是最后一个执行的body filter模块。
直接来看ngx_http_write_filter,下面的代码中去掉了一些调试代码:
[](http:// "点击提交Issue,反馈你的意见...")
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
off_t size, sent, nsent, limit;
ngx_uint_t last, flush;
ngx_msec_t delay;
ngx_chain_t *cl, *ln, **ll, *chain;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
c = r->connection;
if (c->error) {
return NGX_ERROR;
}
size = 0;
flush = 0;
last = 0;
ll = &r->out;
/* find the size, the flush point and the last link of the saved chain */
for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
return NGX_ERROR;
}
#endif
size += ngx_buf_size(cl->buf);
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
/* add the new chain to the existent one */
for (ln = in; ln; ln = ln->next) {
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ln->buf;
*ll = cl;
ll = &cl->next;
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
return NGX_ERROR;
}
#endif
size += ngx_buf_size(cl->buf);
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
if (cl->buf->last_buf) {
last = 1;
}
}
*ll = NULL;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/*
* avoid the output if there are no last buf, no flush point,
* there are the incoming bufs and the size of all bufs
* is smaller than "postpone_output" directive
*/
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
return NGX_OK;
}
/* 如果请求由于被限速而必须延迟发送时,设置一个标识后退出 */
if (c->write->delayed) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
/* 如果buffer总大小为0,而且当前连接之前没有由于底层发送接口的原因延迟,
则检查是否有特殊标记 */
if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
/* last_buf标记,表示请求体已经发送结束 */
if (last) {
r->out = NULL;
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
/* flush生效,而且又没有实际数据,则清空当前的未发送队列 */
if (flush) {
do {
r->out = r->out->next;
} while (r->out);
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
return NGX_ERROR;
}
/* 请求有速率限制,则计算当前可以发送的大小 */
if (r->limit_rate) {
limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- (c->sent - clcf->limit_rate_after);
if (limit <= 0) {
c->write->delayed = 1;
ngx_add_timer(c->write,
(ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
if (clcf->sendfile_max_chunk
&& (off_t) clcf->sendfile_max_chunk < limit)
{
limit = clcf->sendfile_max_chunk;
}
} else {
limit = clcf->sendfile_max_chunk;
}
sent = c->sent;
/* 发送数据 */
chain = c->send_chain(c, r->out, limit);
if (chain == NGX_CHAIN_ERROR) {
c->error = 1;
return NGX_ERROR;
}
/* 更新限速相关的信息 */
if (r->limit_rate) {
nsent = c->sent;
if (clcf->limit_rate_after) {
sent -= clcf->limit_rate_after;
if (sent < 0) {
sent = 0;
}
nsent -= clcf->limit_rate_after;
if (nsent < 0) {
nsent = 0;
}
}
delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate);
if (delay > 0) {
limit = 0;
c->write->delayed = 1;
ngx_add_timer(c->write, delay);
}
}
if (limit
&& c->write->ready
&& c->sent - sent >= limit - (off_t) (2 * ngx_pagesize))
{
c->write->delayed = 1;
ngx_add_timer(c->write, 1);
}
/* 更新输出链,释放已经发送的节点 */
for (cl = r->out; cl && cl != chain; /* void */) {
ln = cl;
cl = cl->next;
ngx_free_chain(r->pool, ln);
}
r->out = chain;
/* 如果数据未发送完毕,则设置一个标记 */
if (chain) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
/* 如果由于底层发送接口导致数据未发送完全,且当前请求没有其他数据需要发送,
此时要返回NGX_AGAIN,表示还有数据未发送 */
if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
return NGX_AGAIN;
}
return NGX_OK;
}
Nginx将待发送的chain链表保存在r->out,上面的函数先检查之前未发送完的链表中是否有flush,recycled以及last_buf标识,并计算所有buffer的大小,接着对新输入的chain链表做同样的事情,并将新链表加到r->out的队尾。
如果没有输出链表中没有被标识为最后一块buffer的节点,而且没有需要flush或者急着回收的buffer,并且当前队列中buffer总大小不够postpone_output指令设置的大小(默认为1460字节)时,函数会直接返回。
ngx_http_write_filter会调用c->send_chain往客户端发送数据,c->send_chain的取值在不同操作系统,编译选项以及协议下(https下用的是ngx_ssl_send_chain)会取不同的函数,典型的linux操作系统下,它的取值为ngx_linux_sendfile_chain,也就是最终会调用这个函数来发送数据。它的函数原型为:
[](http:// "点击提交Issue,反馈你的意见...")
ngx_chain_t *
ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
第一个参数是当前的连接,第二个参数是所需要发送的chain,第三个参数是所能发送的最大值。
首先看一下这个函数定义的一些重要局部变量:
send表示将要发送的buf已经已经发送的大小;
sent表示已经发送的buf的大小;
prev_send表示上一次发送的大小,也就是已经发送的buf的大小;
fprev 和prev-send类似,只不过是file类型的;
complete表示是否buf被完全发送了,也就是sent是否等于send - prev_send;
header表示需要是用writev来发送的buf,也就是only in memory的buf;
struct iovec [*](http://tengine.taobao.org/book/chapter_12.html#id16)iov, headers[NGX_HEADERS] 这个主要是用于sendfile和writev的参数,这里注意上面header数组保存的就是iovec。
下面看函数开头的一些初始化代码:
[](http:// "点击提交Issue,反馈你的意见...")
wev = c->write;
if (!wev->ready) {
return in;
}
/* the maximum limit size is 2G-1 - the page size */
if (limit == 0 || limit > (off_t) (NGX_SENDFILE_LIMIT - ngx_pagesize)) {
limit = NGX_SENDFILE_LIMIT - ngx_pagesize;
}
send = 0;
/* 设置header,也就是in memory的数组 */
header.elts = headers;
header.size = sizeof(struct iovec);
header.nalloc = NGX_HEADERS;
header.pool = c->pool;
下面这段代码就是处理in memory的部分,然后将buf放入对应的iovec数组,处理核心思想就是合并内存连续并相邻的buf(不管是in memory还是in file):
[](http:// "点击提交Issue,反馈你的意见...")
for (cl = in; cl && send < limit; cl = cl->next) {
if (ngx_buf_special(cl->buf)) {
continue;
}
/* 如果既不在内存中,又不在文件中,则返回错误 */
if (!ngx_buf_in_memory(cl->buf) && !cl->buf->in_file) {
return NGX_CHAIN_ERROR;
}
/* 如果不只是在buf中,这是因为有时in file的buf可能需要内存中也有拷贝,
如果一个buf同时in memoey和in file的话,Nginx会把它当做in file来处理 */
if (!ngx_buf_in_memory_only(cl->buf)) {
break;
}
/* 得到buf的大小 */
size = cl->buf->last - cl->buf->pos;
/* 大于limit的话修改为size */
if (send + size > limit) {
size = limit - send;
}
/* 如果prev等于pos,则说明当前的buf的数据和前一个buf的数据是连续的 */
if (prev == cl->buf->pos) {
iov->iov_len += (size_t) size;
} else {
if (header.nelts >= IOV_MAX) {
break;
}
/* 否则说明是不同的buf,因此增加一个iovc */
iov = ngx_array_push(&header);
if (iov == NULL) {
return NGX_CHAIN_ERROR;
}
iov->iov_base = (void *) cl->buf->pos;
iov->iov_len = (size_t) size;
}
/* 这里可以看到prev保存了当前buf的结尾 */
prev = cl->buf->pos + (size_t) size;
/* 更新发送的大小 */
send += size;
}
然后是in file的处理,这里比较核心的一个判断就是fprev == cl->buf->file_pos,和上面的in memory类似,fprev保存的就是上一次处理的buf的尾部。这里如果这两个相等,那就说明当前的两个buf是连续的(文件连续):
[](http:// "点击提交Issue,反馈你的意见...")
/* 如果header的大小不为0则说明前面有需要发送的buf,
并且数据大小已经超过限制则跳过in file处理 */
if (header.nelts == 0 && cl && cl->buf->in_file && send < limit) {
/* 得到file
file = cl->buf;
/* 开始合并 */
do {
/* 得到大小 */
size = cl->buf->file_last - cl->buf->file_pos;
/* 如果太大则进行对齐处理 */
if (send + size > limit) {
size = limit - send;
aligned = (cl->buf->file_pos + size + ngx_pagesize - 1)
& ~((off_t) ngx_pagesize - 1);
if (aligned <= cl->buf->file_last) {
size = aligned - cl->buf->file_pos;
}
}
/* 设置file_size */
file_size += (size_t) size;
/* 设置需要发送的大小 */
send += size;
/* 和上面的in memory处理一样就是保存这次的last */
fprev = cl->buf->file_pos + size;
cl = cl->next;
} while (cl
&& cl->buf->in_file
&& send < limit
&& file->file->fd == cl->buf->file->fd
&& fprev == cl->buf->file_pos);
}
然后就是发送部分,这里in file使用sendfile,in memory使用writev。处理逻辑比较简单,就是发送后判断发送成功的大小
[](http:// "点击提交Issue,反馈你的意见...")
if (file) {
#if 1
if (file_size == 0) {
ngx_debug_point();
return NGX_CHAIN_ERROR;
}
#endif
#if (NGX_HAVE_SENDFILE64)
offset = file->file_pos;
#else
offset = (int32_t) file->file_pos;
#endif
/* 数据在文件中则调用sendfile发送数据 */
rc = sendfile(c->fd, file->file->fd, &offset, file_size);
...
/* 得到发送成功的字节数 */
sent = rc > 0 ? rc : 0;
} else {
/* 数据在内存中则调用writev发送数据 */
rc = writev(c->fd, header.elts, header.nelts);
...
/* 得到发送成功的字节数 */
sent = rc > 0 ? rc : 0;
}
接下来就是需要根据发送成功的字节数来更新chain:
[](http:// "点击提交Issue,反馈你的意见...")
/* 如果send - prev_send == sent则说明该发送的都发完了 */
if (send - prev_send == sent) {
complete = 1;
}
/* 更新congnect的sent域 */
c->sent += sent;
/* 开始重新遍历chain,这里是为了防止没有发送完全的情况,
此时我们就需要切割buf了 */
for (cl = in; cl; cl = cl->next) {
if (ngx_buf_special(cl->buf)) {
continue;
}
if (sent == 0) {
break;
}
/* 得到buf size */
size = ngx_buf_size(cl->buf);
/* 如果大于当前的size,则说明这个buf的数据已经被完全发送完毕了,
因此更新它的域 */
if (sent >= size){
/* 更新sent域 */
sent -= size;
/* 如果在内存则更新pos */
if (ngx_buf_in_memory(cl->buf)) {
cl->buf->pos = cl->buf->last;
}
/* 如果在file中则更显file_pos */
if (cl->buf->in_file) {
cl->buf->file_pos = cl->buf->file_last;
}
continue;
}
/* 到这里说明当前的buf只有一部分被发送出去了,因此只需要修改指针。
以便于下次发送 */
if (ngx_buf_in_memory(cl->buf)) {
cl->buf->pos += (size_t) sent;
}
/* 同上 */
if (cl->buf->in_file) {
cl->buf->file_pos += sent;
}
break;
}
最后一部分是一些是否退出循环的判断。这里要注意,Nginx中如果发送未完全的话,将会直接返回,返回的就是没有发送完毕的chain,它的buf也已经被更新。然后Nginx返回去处理其他的事情,等待可写之后再次发送未发送完的数据:
[](http:// "点击提交Issue,反馈你的意见...")
if (eintr) {
continue;
}
/* 如果未完成,则设置wev->ready为0后返回 */
if (!complete) {
wev->ready = 0;
return cl;
}
/* 发送数据超过限制,或没有数据了 */
if (send >= limit || cl == NULL) {
return cl;
}
/* 更新in,也就是开始处理下一个chain */
in = cl;
- 上篇:nginx模块开发篇
- nginx平台初探
- 初探nginx架构
- nginx基础概念
- connection
- request
- keepalive
- pipe
- lingering_close
- 基本数据结构
- ngx_str_t
- ngx_pool_t
- ngx_array_t
- ngx_hash_t
- ngx_hash_wildcard_t
- ngx_hash_combined_t
- ngx_hash_keys_arrays_t
- ngx_chain_t
- ngx_buf_t
- ngx_list_t
- ngx_queue_t
- nginx的配置系统
- 指令参数
- 指令上下文
- nginx的模块化体系结构
- 模块的分类
- nginx的请求处理
- handler模块
- handler模块简介
- 模块的基本结构
- 模块配置结构
- 模块配置指令
- 模块上下文结构
- 模块的定义
- handler模块的基本结构
- handler模块的挂载
- handler的编写步骤
- 示例: hello handler 模块
- handler模块的编译和使用
- 更多handler模块示例分析
- http access module
- http static module
- http log module
- 过滤模块
- 过滤模块简介
- 过滤模块的分析
- upstream模块
- upstream模块
- upstream模块接口
- memcached模块分析
- 本节回顾
- 负载均衡模块
- 配置
- 指令
- 钩子
- 初始化配置
- 初始化请求
- peer.get和peer.free回调函数
- 本节回顾
- 其他模块
- core模块
- event模块
- 模块开发高级篇
- 变量
- 下篇:nginx原理解析篇
- nginx架构详解
- nginx的源码目录结构
- nginx的configure原理
- 模块编译顺序
- nginx基础设施
- 内存池
- nginx的启动阶段
- 概述
- 共有流程
- 配置解析
- nginx的请求处理阶段
- 接收请求流程
- http请求格式简介
- 请求头读取
- 解析请求行
- 解析请求头
- 请求体读取
- 读取请求体
- 丢弃请求体
- 多阶段处理请求
- 多阶段执行链
- POST_READ阶段
- SERVER_REWRITE阶段
- FIND_CONFIG阶段
- REWRITE阶段
- POST_REWRITE阶段
- PREACCESS阶段
- ACCESS阶段
- POST_ACCESS阶段
- TRY_FILES阶段
- CONTENT阶段
- LOG阶段
- Nginx filter
- header filter分析
- body filter分析
- ngx_http_copy_filter_module分析
- ngx_http_write_filter_module分析
- subrequest原理解析
- https请求处理解析
- 附录A 编码风格
- 附录B 常用API
- 附录C 模块编译,调试与测试