IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    nginx模块开发(30)—读取HTTP的body部分

    cjhust发表于 2013-05-07 14:29:28
    love 0

    1、知识百科

    http body的长度可大可小,网络环境也比较复杂,nginx是采取非阻塞的方式进行的。

    2、数据结构

    image

    ngx_chain_s

    struct ngx_chain_s {

    ngx_buf_t *buf;

    ngx_chain_t *next;

    };

    ngx_buf_s

    struct ngx_buf_s {

    u_char *pos; //当buf所指向的数据在内存里的时候,pos指向的是这段数据开始的位置

    u_char *last; //当buf所指向的数据在内存里的时候,last指向的是这段数据结束的位置

    off_t file_pos; //当buf所指向的数据是在文件里的时候,file_pos指向的是这段数据的开始位置在文件中的偏移量

    off_t file_last;// 当buf所指向的数据是在文件里的时候,file_last指向的是这段数据的结束位置在文件中的偏移量

    u_char *start; // 如果ngx_buf_t缓冲区用于内存,那么start指向这段内存的起始地址

    u_char *end; //解释参照start

    ngx_buf_tag_t tag; //表示当前缓冲区的类型,例如由哪个模块使用就指向这个模块ngx_module_t变量的地址

    ngx_file_t *file; // 当buf所包含的内容在文件中是,file字段指向对应的文件对象

    ngx_buf_t *shadow;// 当这个buf完整copy了另外一个buf的所有字段的时候,那么这两个buf指向的实际上是同一块内存,或者是同一个文件的同一部分,此时这两个buf的shadow字段都是指向对方的。那么对于这样的两个buf,在释放的时候,就需要使用者特别小心,具体是由哪里释放,要提前考虑好,如果造成资源的多次释放,可能会造成程序崩溃

    unsigned temporary:1; //为1时表示该buf所包含的内容是在一个用户创建的内存块中,并且可以被在filter处理的过程中进行变更,而不会造成问题

    unsigned memory:1; // 为1时表示该buf所包含的内容是在内存中,但是这些内容确不能被进行处理的filter进行变更

    unsigned mmap:1; //为1时表示该buf所包含的内容是在内存中, 是通过mmap使用内存映射从文件中映射到内存中的,这些内容确不能被进行处理的filter进行变更

    unsigned recycled:1;

    unsigned in_file:1; //为1时表示该buf所包含的内容是在文件中

    unsigned flush:1; //遇到有flush字段被设置为1的的buf的chain,则该chain的数据即便不是最后结束的数据(last_buf被设置,标志所有要输出的内容都完了),也会进行输出,不会受postpone_output配置的限制,但是会受到发送速率等其他条件的限制

    unsigned sync:1;

    unsigned last_buf:1; //数据被以多个chain传递给了过滤器,此字段为1表明这是最后一个buf

    unsigned last_in_chain:1; //在当前的chain里面,此buf是最后一个。特别要注意的是last_in_chain的buf不一定是last_buf,但是last_buf的buf一定是last_in_chain的。这是因为数据会被以多个chain传递给某个filter模块

    unsigned last_shadow:1;

    unsigned temp_file:1;

    /* STUB */ int num;

    };

    备注:r->header_in也是ngx_buf_t结构。

    3、操作函数

    ngx_http_read_client_request_body

    /*

    * on completion ngx_http_read_client_request_body() adds to

    * r->request_body->bufs one or two bufs:

    * *) one memory buf that was preread in r->header_in;

    * *) one memory or file buf that contains the rest of the body

    */

    ngx_int_t

    ngx_http_read_client_request_body(ngx_http_request_t *r,

    ngx_http_client_body_handler_pt post_handler)

    {

    size_t preread;

    ssize_t size;

    ngx_buf_t *b;

    ngx_chain_t *cl, **next;

    ngx_temp_file_t *tf;

    ngx_http_request_body_t *rb;

    ngx_http_core_loc_conf_t *clcf;

    r->main->count++; //主请求

    if (r->request_body || r->discard_body) { //还没有分配空间

    post_handler(r);

    return NGX_OK;

    }

    if (ngx_http_test_expect(r) != NGX_OK) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); //分配空间

    if (rb == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    r->request_body = rb; //rb=request_body

    if (r->headers_in.content_length_n < 0) { // body有数据,则不为0

    post_handler(r); //here here ngx_http_upstream_init

    return NGX_OK;

    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    // curl -d "" http://127.0.0.1:8003/rest length_n=0

    // curl -d "hello=world" http://127.0.0.1:8003/rest length_n=11

    if (r->headers_in.content_length_n == 0) { //post请求假如带参数,则不为0

    if (r->request_body_in_file_only) {

    tf = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));

    if (tf == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    tf->file.fd = NGX_INVALID_FILE;

    tf->file.log = r->connection->log;

    tf->path = clcf->client_body_temp_path;

    tf->pool = r->pool;

    tf->warn = "a client request body is buffered to a temporary file";

    tf->log_level = r->request_body_file_log_level;

    tf->persistent = r->request_body_in_persistent_file;

    tf->clean = r->request_body_in_clean_file;

    if (r->request_body_file_group_access) {

    tf->access = 0660;

    }

    rb->temp_file = tf;

    if (ngx_create_temp_file(&tf-;>file, tf->path, tf->pool,

    tf->persistent, tf->clean, tf->access)

    != NGX_OK)

    {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    }

    post_handler(r); //ngx_http_upstream_init(r)

    return NGX_OK;

    }

    rb->post_handler = post_handler; // ngx_http_upstream_init

    /*

    * set by ngx_pcalloc():

    *

    * rb->bufs = NULL;

    * rb->buf = NULL;

    * rb->rest = 0;

    */

    //已经读进来但是还没有处理的数据

    //curl -d "hello=world" http://127.0.0.1:8003/rest pos="hello=world" preread=11

    //因为一直在移位,在这里的时候,就是读完HTTP HEADER后的数据了

    preread = r->header_in->last - r->header_in->pos;

    if (preread) { //length

    /* there is the pre-read part of the request body */

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

    "http client request body preread %uz", preread);

    b = ngx_calloc_buf(r->pool);

    if (b == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    b->temporary = 1;

    b->start = r->header_in->pos;

    b->pos = r->header_in->pos;

    b->last = r->header_in->last;

    b->end = r->header_in->end;

    rb->bufs = ngx_alloc_chain_link(r->pool);

    if (rb->bufs == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    rb->bufs->buf = b; //r->request_body->bufs->buf

    rb->bufs->next = NULL;

    rb->buf = b;

    //所有数据都读进来了么?

    if ((off_t) preread >= r->headers_in.content_length_n) { //带附件的时候时可能是小于

    /* the whole request body was pre-read */

    r->header_in->pos += (size_t) r->headers_in.content_length_n; //start 没有变

    r->request_length += r->headers_in.content_length_n;

    b->last = r->header_in->pos;

    if (r->request_body_in_file_only) {

    if (ngx_http_write_request_body(r, rb->bufs) != NGX_OK) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    }

    post_handler(r); // ngx_http_upstream_init

    return NGX_OK;

    }

    /*

    * to not consider the body as pipelined request in

    * ngx_http_set_keepalive()

    */

    //将pos指针移到目前读到的数据末尾,保证每次buffer掉数据后,pos始终指向数据末尾的位置

    r->header_in->pos = r->header_in->last;

    r->request_length += preread;

    rb->rest = r->headers_in.content_length_n - preread; //还没有读进来的数据大小

    if (rb->rest <= (off_t) (b->end - b->last)) {

    /* the whole request body may be placed in r->header_in */

    rb->to_write = rb->bufs;

    r->read_event_handler = ngx_http_read_client_request_body_handler; //

    return ngx_http_do_read_client_request_body(r);

    }

    next = &rb-;>bufs->next; //另一片新的buf

    } else {

    b = NULL;

    rb->rest = r->headers_in.content_length_n;

    next = &rb-;>bufs;

    }

    size = clcf->client_body_buffer_size; //client_body_buffer_size= 8192

    size += size >> 2; //10240=8192/4+8192

    if (rb->rest < size) { //r->headers_in.content_length_n - preread

    size = (ssize_t) rb->rest; //未读的数据不大

    if (r->request_body_in_single_buf) {

    size += preread;

    }

    } else {

    size = clcf->client_body_buffer_size; //未读的数据过大

    /* disable copying buffer for r->request_body_in_single_buf */

    b = NULL; //前面分配的空间置为NULL

    }

    rb->buf = ngx_create_temp_buf(r->pool, size); //size=8192的空间分配

    if (rb->buf == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    cl = ngx_alloc_chain_link(r->pool); //chain

    if (cl == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    cl->buf = rb->buf;

    cl->next = NULL;

    if (b && r->request_body_in_single_buf) { //clcf->client_body_in_single_buffer默认off

    size = b->last - b->pos;

    ngx_memcpy(rb->buf->pos, b->pos, size);

    rb->buf->last += size;

    next = &rb-;>bufs;

    }

    *next = cl; //next = &rb-;>bufs->next;

    if (r->request_body_in_file_only || r->request_body_in_single_buf){

    rb->to_write = rb->bufs;

    } else {

    rb->to_write = rb->bufs->next ? rb->bufs->next : rb->bufs;

    }

    r->read_event_handler = ngx_http_read_client_request_body_handler; //非阻塞方式的handler

    return ngx_http_do_read_client_request_body(r); //读数据

    }

    ngx_http_do_read_client_request_body

    函数功能:进行实际的数据读取工作。

    static ngx_int_t

    ngx_http_do_read_client_request_body(ngx_http_request_t *r)

    {

    。。。

    c = r->connection;

    rb = r->request_body;

    for ( ;; ) {

    for ( ;; ) {

    if (rb->buf->last == rb->buf->end) { //size大小的数据读完

    //把数据写入临时文件

    if (ngx_http_write_request_body(r, rb->to_write) != NGX_OK) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR; //500

    }

    //循环使用size大小的内存,只有rest是判断是否结束的标准

    rb->to_write = rb->bufs->next ? rb->bufs->next : rb->bufs;

    rb->buf->last = rb->buf->start;

    }

    size = rb->buf->end - rb->buf->last; // size

    if ((off_t) size > rb->rest) {

    size = (size_t) rb->rest; //读最后的一部分

    }

    n = c->recv(c, rb->buf->last, size); //recv接收新的数据,n可能小于size

    if (n == NGX_AGAIN) { //n=size -2

    break;

    }

    //client prematurely closed connection 读数据的时候,关闭链接

    if (n == 0) {

    ngx_log_error(NGX_LOG_INFO, c->log, 0,

    "client prematurely closed connection");

    }

    if (n == 0 || n == NGX_ERROR) {

    c->error = 1;

    return NGX_HTTP_BAD_REQUEST;

    }

    rb->buf->last += n; //last一直移位

    rb->rest -= n; //rest一直减少

    r->request_length += n;

    if (rb->rest == 0) { //数据全部读完

    break;

    }

    //按规定size读取数据,但是缓冲区没有足够的数据

    //这里是关键,非阻塞的方式

    if (rb->buf->last < rb->buf->end) {

    break;

    }

    } //第一层for

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,

    "http client request body rest %O", rb->rest);

    if (rb->rest == 0) { //数据全部读完

    break;

    }

    if (!c->read->ready) { //大小为size的数据,缓冲区未准备好

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    //client_body_timeout 超时的话,返回408 默认是60s

    ngx_add_timer(c->read, clcf->client_body_timeout); //ngx_http_request_handler

    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    return NGX_AGAIN;

    }

    }

    if (c->read->timer_set) { //数据读完,清除定时器

    ngx_del_timer(c->read);

    }

    if (rb->temp_file || r->request_body_in_file_only) {

    /* save the last part */

    if (ngx_http_write_request_body(r, rb->to_write) != NGX_OK) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    b = ngx_calloc_buf(r->pool);

    if (b == NULL) {

    return NGX_HTTP_INTERNAL_SERVER_ERROR;

    }

    b->in_file = 1;

    b->file_pos = 0;

    b->file_last = rb->temp_file->file.offset;

    b->file = &rb-;>temp_file->file;

    if (rb->bufs->next) {

    rb->bufs->next->buf = b;

    } else {

    rb->bufs->buf = b;

    }

    }

    if (rb->bufs->next

    && (r->request_body_in_file_only || r->request_body_in_single_buf))

    {

    rb->bufs = rb->bufs->next;

    }

    r->read_event_handler = ngx_http_block_reading;

    rb->post_handler(r); // ngx_http_upstream_init

    return NGX_OK;

    }

    ngx_http_read_client_request_body_handler

    函数功能:提供非阻塞的读取body方式。

    static void

    ngx_http_read_client_request_body_handler(ngx_http_request_t *r)

    {

    ngx_int_t rc;

    if (r->connection->read->timedout) { //如果超时,client_body_timeout,返回408

    r->connection->timedout = 1;

    ngx_http_finalize_request(r, NGX_HTTP_REQUEST_TIME_OUT); //408

    return;

    }

    rc = ngx_http_do_read_client_request_body(r); //非阻塞方式继续读取数据

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {

    ngx_http_finalize_request(r, rc);

    }

    }

    4、扩展知识

    recv(int s, void *buf, size_t len, int flags)

    函数功能:从一个套接口接收数据。

    表头文件:

    #includetypes.h><font>

    #includesocket.h><font>

    参数说明:

    s:一个标识已连接套接口的描述字。

    buf:用于接收数据的缓冲区。

    len:缓冲区长度。

    flags:指定调用方式。

    工作流程:

    这里只描述阻塞Socket的recv函数的执行流程。当应用程序调用recv函数时:

    (1)recv先等待s的发送缓冲中的数据被协议传送完毕,如果协议在传送s的发送缓冲中的数据时出现网络错误,那么recv函数返回SOCKET_ERROR;

    (2)如果s的发送缓冲中没有数据或者数据被协议成功发送完毕后,recv先检查套接字s的接收缓冲区,如果s接收缓冲区中没有数据或者协议正在接收数据,那么recv就一直等待,直到协议把数据接收完毕。当协议把数据接收完毕,recv函数就把s的接收缓冲中的数据copy到buf中(注意协议接收到的数据可能大于buf的长度,所以在这种情况下要调用几次recv函数才能把s的接收缓冲中的数据copy完。recv函数仅仅是copy数据,真正的接收数据是协议来完成的);

    recv函数返回其实际copy的字节数。如果recv在copy时出错,那么它返回SOCKET_ERROR;如果recv函数在等待协议接收数据时网络中断了,那么它返回0。

    注意:在Unix系统下,如果recv函数在等待协议接收数据时网络断开了,那么调用recv的进程会接收到一个SIGPIPE信号,进程对该信号的默认处理是进程终止。

    阻塞与非阻塞

    在网络编程中对于一个网络句柄socket会遇到阻塞IO 和非阻塞IO 的概念。其中阻塞IO意味着必须要做完IO 操作(包括错误)才会返回。非阻塞IO意味着无论操作是否完成都会立刻返回,需要通过其他方式来判断具体操作是否成功。(对于connect,accpet操作,通过select判断,对于recv,recvfrom,send,sendto通过返回值+错误码来判断)

    可以用fcntl 设置阻塞与非阻塞模式,用F_GETFL获取flags,用F_SETFL设置flags|O_NONBLOCK。recv、send 时使用非阻塞的方式读取和发送消息,即flags设置为MSG_DONTWAIT实现。

    flags = fcntl(sockfd, F_GETFL, 0); //获取文件的flags值

    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); //设置成非阻塞模式

    flags = fcntl(sockfd,F_GETFL,0); //获取文件的flags值

    fcntl(sockfd,F_SETFL,flags&~O_NONBLOCK); //设置成阻塞模式

    recv(sockfd, buff, buff_size,MSG_DONTWAIT); //非阻塞模式的消息发送

    send(scokfd, buff, buff_size, MSG_DONTWAIT); //非阻塞模式的消息接受

    读

    读的阻塞和非阻塞的区别在于没有数据到达的时候是否立刻返回。读(read/recv/msgrcv):

    读的本质来说其实不能是读,在实际中,具体的接收数据不是由这些调用来进行,是由系统底层自动完成的。read也好,recv 也好只负责把数据从底层缓冲copy到我们指定的位置。

    阻塞情况下:

    (1)如果没有发现数据在网络缓冲中会一直等待;

    (2)当发现有数据的时候会把数据读到用户指定的缓冲区,但是如果这个时候读到的数据量比较少,比参数中指定的长度要小,read并不会一直等待下去,而是立刻返回。

    备注:read 的原则是数据在不超过指定的长度的时候有多少读多少,没有数据就会一直等待。所以一般情况下,我们读取数据都需要采用循环读的方式读取数据,因为一次read 完毕不能保证读到我们需要长度的数据,read完一次需要判断读到的数据长度再决定是否还需要再次读取。

    非阻塞情况下:

    (1)如果发现没有数据就直接返回;

    (2)如果发现有数据那么也是采用有多少读多少的进行处理;

    备注:read 完一次需要判断读到的数据长度再决定是否还需要再次读取。

    写

    写(send/write/msgsnd):

    写的本质也不是进行发送操作,而是把用户态的数据copy到系统底层去,然后再由系统进行发送操作,send、write返回成功,只表示数据已经copy到底层缓冲,而不表示数据已经发出,更不能表示对方端口已经接收到数据。

    阻塞情况下:

    阻塞情况下,write会将数据发送完。(不过可能被中断)

    在阻塞的情况下,是会一直等待,直到write 完,全部的数据再返回.这点行为上与读操作有所不同。

    原因:读主要是读数据的时候我们并不知道对端到底有没有数据,数据是在什么时候结束发送的,如果一直等待就可能会造成死循环,所以并没有去进行这方面的处理;写,而对于write,由于需要写的长度是已知的,所以可以一直再写,直到写完.不过问题是write 是可能被打断的,造成write一次只write 一部分数据, 所以write 的过程还是需要考虑循环write,只不过多数情况下一次write 调用就可能成功.

    非阻塞情况下:

    非阻塞写的情况下,是采用可以写多少就写多少的策略.与读不一样的地方在于,有多少读多少是由网络发送的那一端是否有数据传输到为标准,但是对于可以写多少是由本地的网络堵塞情况为标准的,在网络阻塞严重的时候,网络层没有足够的内存来进行写操作,这时候就会出现写不成功的情况,阻塞情况下会尽可能(有可能被中断)等待到数据全部发送完毕,对于非阻塞的情况就是一次写多少算多少,没有中断的情况下也还是会出现write 到一部分的情况。

    5、参考资料

    http://www.cnblogs.com/donj/archive/2012/09/07/2675378.html

    http://blog.csdn.net/russell_tao/article/details/5637545

    http://blog.163.com/xychenbaihu@yeah/blog/static/132229655201121793744671/



沪ICP备19023445号-2号
友情链接