ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
#### ngx_http_write_filter_module分析[](http://tengine.taobao.org/book/chapter_12.html#ngx-http-write-filter-module "永久链接至标题") ngx_http_write_filter_module是最后一个body filter,可以看到它的注册函数的特殊性: [](http:// "点击提交Issue,反馈你的意见...") static ngx_int_t ngx_http_write_filter_init(ngx_conf_t *cf) { ngx_http_top_body_filter = ngx_http_write_filter; return NGX_OK; } ngx_http_write_filter_module是第一个注册body filter的模块,于是它也是最后一个执行的body filter模块。 直接来看ngx_http_write_filter,下面的代码中去掉了一些调试代码: [](http:// "点击提交Issue,反馈你的意见...") ngx_int_t ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in) { off_t size, sent, nsent, limit; ngx_uint_t last, flush; ngx_msec_t delay; ngx_chain_t *cl, *ln, **ll, *chain; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; c = r->connection; if (c->error) { return NGX_ERROR; } size = 0; flush = 0; last = 0; ll = &r->out; /* find the size, the flush point and the last link of the saved chain */ for (cl = r->out; cl; cl = cl->next) { ll = &cl->next; #if 1 if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { return NGX_ERROR; } #endif size += ngx_buf_size(cl->buf); if (cl->buf->flush || cl->buf->recycled) { flush = 1; } if (cl->buf->last_buf) { last = 1; } } /* add the new chain to the existent one */ for (ln = in; ln; ln = ln->next) { cl = ngx_alloc_chain_link(r->pool); if (cl == NULL) { return NGX_ERROR; } cl->buf = ln->buf; *ll = cl; ll = &cl->next; #if 1 if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { return NGX_ERROR; } #endif size += ngx_buf_size(cl->buf); if (cl->buf->flush || cl->buf->recycled) { flush = 1; } if (cl->buf->last_buf) { last = 1; } } *ll = NULL; clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); /* * avoid the output if there are no last buf, no flush point, * there are the incoming bufs and the size of all bufs * is smaller than "postpone_output" directive */ if (!last && !flush && in && size < (off_t) clcf->postpone_output) { return NGX_OK; } /* 如果请求由于被限速而必须延迟发送时,设置一个标识后退出 */ if (c->write->delayed) { c->buffered |= NGX_HTTP_WRITE_BUFFERED; return NGX_AGAIN; } /* 如果buffer总大小为0,而且当前连接之前没有由于底层发送接口的原因延迟, 则检查是否有特殊标记 */ if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) { /* last_buf标记,表示请求体已经发送结束 */ if (last) { r->out = NULL; c->buffered &= ~NGX_HTTP_WRITE_BUFFERED; return NGX_OK; } /* flush生效,而且又没有实际数据,则清空当前的未发送队列 */ if (flush) { do { r->out = r->out->next; } while (r->out); c->buffered &= ~NGX_HTTP_WRITE_BUFFERED; return NGX_OK; } return NGX_ERROR; } /* 请求有速率限制,则计算当前可以发送的大小 */ if (r->limit_rate) { limit = r->limit_rate * (ngx_time() - r->start_sec + 1) - (c->sent - clcf->limit_rate_after); if (limit <= 0) { c->write->delayed = 1; ngx_add_timer(c->write, (ngx_msec_t) (- limit * 1000 / r->limit_rate + 1)); c->buffered |= NGX_HTTP_WRITE_BUFFERED; return NGX_AGAIN; } if (clcf->sendfile_max_chunk && (off_t) clcf->sendfile_max_chunk < limit) { limit = clcf->sendfile_max_chunk; } } else { limit = clcf->sendfile_max_chunk; } sent = c->sent; /* 发送数据 */ chain = c->send_chain(c, r->out, limit); if (chain == NGX_CHAIN_ERROR) { c->error = 1; return NGX_ERROR; } /* 更新限速相关的信息 */ if (r->limit_rate) { nsent = c->sent; if (clcf->limit_rate_after) { sent -= clcf->limit_rate_after; if (sent < 0) { sent = 0; } nsent -= clcf->limit_rate_after; if (nsent < 0) { nsent = 0; } } delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate); if (delay > 0) { limit = 0; c->write->delayed = 1; ngx_add_timer(c->write, delay); } } if (limit && c->write->ready && c->sent - sent >= limit - (off_t) (2 * ngx_pagesize)) { c->write->delayed = 1; ngx_add_timer(c->write, 1); } /* 更新输出链,释放已经发送的节点 */ for (cl = r->out; cl && cl != chain; /* void */) { ln = cl; cl = cl->next; ngx_free_chain(r->pool, ln); } r->out = chain; /* 如果数据未发送完毕,则设置一个标记 */ if (chain) { c->buffered |= NGX_HTTP_WRITE_BUFFERED; return NGX_AGAIN; } c->buffered &= ~NGX_HTTP_WRITE_BUFFERED; /* 如果由于底层发送接口导致数据未发送完全,且当前请求没有其他数据需要发送, 此时要返回NGX_AGAIN,表示还有数据未发送 */ if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) { return NGX_AGAIN; } return NGX_OK; } Nginx将待发送的chain链表保存在r->out,上面的函数先检查之前未发送完的链表中是否有flush,recycled以及last_buf标识,并计算所有buffer的大小,接着对新输入的chain链表做同样的事情,并将新链表加到r->out的队尾。 如果没有输出链表中没有被标识为最后一块buffer的节点,而且没有需要flush或者急着回收的buffer,并且当前队列中buffer总大小不够postpone_output指令设置的大小(默认为1460字节)时,函数会直接返回。 ngx_http_write_filter会调用c->send_chain往客户端发送数据,c->send_chain的取值在不同操作系统,编译选项以及协议下(https下用的是ngx_ssl_send_chain)会取不同的函数,典型的linux操作系统下,它的取值为ngx_linux_sendfile_chain,也就是最终会调用这个函数来发送数据。它的函数原型为: [](http:// "点击提交Issue,反馈你的意见...") ngx_chain_t * ngx_linux_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) 第一个参数是当前的连接,第二个参数是所需要发送的chain,第三个参数是所能发送的最大值。 首先看一下这个函数定义的一些重要局部变量: send表示将要发送的buf已经已经发送的大小; sent表示已经发送的buf的大小; prev_send表示上一次发送的大小,也就是已经发送的buf的大小; fprev 和prev-send类似,只不过是file类型的; complete表示是否buf被完全发送了,也就是sent是否等于send - prev_send; header表示需要是用writev来发送的buf,也就是only in memory的buf; struct iovec [*](http://tengine.taobao.org/book/chapter_12.html#id16)iov, headers[NGX_HEADERS] 这个主要是用于sendfile和writev的参数,这里注意上面header数组保存的就是iovec。 下面看函数开头的一些初始化代码: [](http:// "点击提交Issue,反馈你的意见...") wev = c->write; if (!wev->ready) { return in; } /* the maximum limit size is 2G-1 - the page size */ if (limit == 0 || limit > (off_t) (NGX_SENDFILE_LIMIT - ngx_pagesize)) { limit = NGX_SENDFILE_LIMIT - ngx_pagesize; } send = 0; /* 设置header,也就是in memory的数组 */ header.elts = headers; header.size = sizeof(struct iovec); header.nalloc = NGX_HEADERS; header.pool = c->pool; 下面这段代码就是处理in memory的部分,然后将buf放入对应的iovec数组,处理核心思想就是合并内存连续并相邻的buf(不管是in memory还是in file): [](http:// "点击提交Issue,反馈你的意见...") for (cl = in; cl && send < limit; cl = cl->next) { if (ngx_buf_special(cl->buf)) { continue; } /* 如果既不在内存中,又不在文件中,则返回错误 */ if (!ngx_buf_in_memory(cl->buf) && !cl->buf->in_file) { return NGX_CHAIN_ERROR; } /* 如果不只是在buf中,这是因为有时in file的buf可能需要内存中也有拷贝, 如果一个buf同时in memoey和in file的话,Nginx会把它当做in file来处理 */ if (!ngx_buf_in_memory_only(cl->buf)) { break; } /* 得到buf的大小 */ size = cl->buf->last - cl->buf->pos; /* 大于limit的话修改为size */ if (send + size > limit) { size = limit - send; } /* 如果prev等于pos,则说明当前的buf的数据和前一个buf的数据是连续的 */ if (prev == cl->buf->pos) { iov->iov_len += (size_t) size; } else { if (header.nelts >= IOV_MAX) { break; } /* 否则说明是不同的buf,因此增加一个iovc */ iov = ngx_array_push(&header); if (iov == NULL) { return NGX_CHAIN_ERROR; } iov->iov_base = (void *) cl->buf->pos; iov->iov_len = (size_t) size; } /* 这里可以看到prev保存了当前buf的结尾 */ prev = cl->buf->pos + (size_t) size; /* 更新发送的大小 */ send += size; } 然后是in file的处理,这里比较核心的一个判断就是fprev == cl->buf->file_pos,和上面的in memory类似,fprev保存的就是上一次处理的buf的尾部。这里如果这两个相等,那就说明当前的两个buf是连续的(文件连续): [](http:// "点击提交Issue,反馈你的意见...") /* 如果header的大小不为0则说明前面有需要发送的buf, 并且数据大小已经超过限制则跳过in file处理 */ if (header.nelts == 0 && cl && cl->buf->in_file && send < limit) { /* 得到file file = cl->buf; /* 开始合并 */ do { /* 得到大小 */ size = cl->buf->file_last - cl->buf->file_pos; /* 如果太大则进行对齐处理 */ if (send + size > limit) { size = limit - send; aligned = (cl->buf->file_pos + size + ngx_pagesize - 1) & ~((off_t) ngx_pagesize - 1); if (aligned <= cl->buf->file_last) { size = aligned - cl->buf->file_pos; } } /* 设置file_size */ file_size += (size_t) size; /* 设置需要发送的大小 */ send += size; /* 和上面的in memory处理一样就是保存这次的last */ fprev = cl->buf->file_pos + size; cl = cl->next; } while (cl && cl->buf->in_file && send < limit && file->file->fd == cl->buf->file->fd && fprev == cl->buf->file_pos); } 然后就是发送部分,这里in file使用sendfile,in memory使用writev。处理逻辑比较简单,就是发送后判断发送成功的大小 [](http:// "点击提交Issue,反馈你的意见...") if (file) { #if 1 if (file_size == 0) { ngx_debug_point(); return NGX_CHAIN_ERROR; } #endif #if (NGX_HAVE_SENDFILE64) offset = file->file_pos; #else offset = (int32_t) file->file_pos; #endif /* 数据在文件中则调用sendfile发送数据 */ rc = sendfile(c->fd, file->file->fd, &offset, file_size); ... /* 得到发送成功的字节数 */ sent = rc > 0 ? rc : 0; } else { /* 数据在内存中则调用writev发送数据 */ rc = writev(c->fd, header.elts, header.nelts); ... /* 得到发送成功的字节数 */ sent = rc > 0 ? rc : 0; } 接下来就是需要根据发送成功的字节数来更新chain: [](http:// "点击提交Issue,反馈你的意见...") /* 如果send - prev_send == sent则说明该发送的都发完了 */ if (send - prev_send == sent) { complete = 1; } /* 更新congnect的sent域 */ c->sent += sent; /* 开始重新遍历chain,这里是为了防止没有发送完全的情况, 此时我们就需要切割buf了 */ for (cl = in; cl; cl = cl->next) { if (ngx_buf_special(cl->buf)) { continue; } if (sent == 0) { break; } /* 得到buf size */ size = ngx_buf_size(cl->buf); /* 如果大于当前的size,则说明这个buf的数据已经被完全发送完毕了, 因此更新它的域 */ if (sent >= size){ /* 更新sent域 */ sent -= size; /* 如果在内存则更新pos */ if (ngx_buf_in_memory(cl->buf)) { cl->buf->pos = cl->buf->last; } /* 如果在file中则更显file_pos */ if (cl->buf->in_file) { cl->buf->file_pos = cl->buf->file_last; } continue; } /* 到这里说明当前的buf只有一部分被发送出去了,因此只需要修改指针。 以便于下次发送 */ if (ngx_buf_in_memory(cl->buf)) { cl->buf->pos += (size_t) sent; } /* 同上 */ if (cl->buf->in_file) { cl->buf->file_pos += sent; } break; } 最后一部分是一些是否退出循环的判断。这里要注意,Nginx中如果发送未完全的话,将会直接返回,返回的就是没有发送完毕的chain,它的buf也已经被更新。然后Nginx返回去处理其他的事情,等待可写之后再次发送未发送完的数据: [](http:// "点击提交Issue,反馈你的意见...") if (eintr) { continue; } /* 如果未完成,则设置wev->ready为0后返回 */ if (!complete) { wev->ready = 0; return cl; } /* 发送数据超过限制,或没有数据了 */ if (send >= limit || cl == NULL) { return cl; } /* 更新in,也就是开始处理下一个chain */ in = cl;