IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    nginx模块开发(33)—upstream的keepalive模型

    cjhust发表于 2013-06-24 14:36:11
    love 0

    1、百科知识

    nginx upstream在1.1.4版本之前只支持短连接,即通过HTTP/1.0向后端发起连接,并把请求的"Connection" header设为"close"。Nginx与前端的连接默认为长连接,一个用户跟nginx建立连接之后,通过这个长连接可以发送多个请求。如果Nginx只是作为reverse proxy的话,可能一个用户连接就需要多个向后端的短连接。如果后端的服务器(源站或是缓存服务器)处理并发连接能力不强的话(比如单进程的squid),就可能导致瓶颈的出现。

    长连接的好处显而易见,不用频繁进行3次握手,节省了网络资源;但是长连接同时也带来了一些负面影响,本文不展开讨论,可参考第5节所列的参考资料。

    2、数据结构

    ngx_http_upstream_keepalive_cache_t

    /*
     * One slot of the keepalive connection cache.  Slots are preallocated
     * (max_cached of them) and each is linked, via `queue`, into either the
     * `free` queue (unused) or the `cache` queue (holding an idle upstream
     * connection) of ngx_http_upstream_keepalive_srv_conf_t.
     */
    typedef struct {
        ngx_http_upstream_keepalive_srv_conf_t  *conf;        /* owning upstream config */

        ngx_queue_t                              queue;       /* link into conf->free or conf->cache */

        ngx_connection_t                        *connection;  /* the cached idle upstream connection */

        /* address of the peer the cached connection talks to; compared with
         * pc->sockaddr when looking for a reusable connection */
        socklen_t                                socklen;

        u_char                                   sockaddr[NGX_SOCKADDRLEN];

    } ngx_http_upstream_keepalive_cache_t;

    ngx_http_upstream_keepalive_srv_conf_t

    /*
     * Per-upstream{} configuration of the keepalive module.  This is the
     * structure the "keepalive" directive inside an upstream block fills in.
     */
    typedef struct {
        ngx_uint_t                      max_cached;  /* first argument of "keepalive": number of cache slots */

        ngx_uint_t                      single;      /* set by the optional "single" parameter */

        ngx_queue_t                     cache;       /* ngx_http_upstream_keepalive_cache_t slots holding idle
                                                        connections, most recently cached at the head */

        ngx_queue_t                     free;        /* unused ngx_http_upstream_keepalive_cache_t slots */

        /* init_upstream handler that was installed before "keepalive"
         * wrapped it: ngx_http_upstream_init_round_robin by default, or a
         * custom balancer's handler */
        ngx_http_upstream_init_pt       original_init_upstream;

        /* us->peer.init as it was before ngx_http_upstream_init_keepalive
         * replaced it with ngx_http_upstream_init_keepalive_peer, i.e. the
         * wrapped balancer's per-request init (for example
         * ngx_http_upstream_init_round_robin_peer) */
        ngx_http_upstream_init_peer_pt  original_init_peer;

    } ngx_http_upstream_keepalive_srv_conf_t;

    备注:这个是upstream里的keepalive指令真正填充的数据结构。

    ngx_http_upstream_keepalive_peer_data_t

    /*
     * Per-request state of the keepalive peer wrapper.  Allocated from the
     * request pool by ngx_http_upstream_init_keepalive_peer and installed as
     * r->upstream->peer.data; it keeps the wrapped balancer's data and
     * callbacks so the keepalive hooks can delegate to them.
     */
    typedef struct {
        ngx_http_upstream_keepalive_srv_conf_t  *conf;                /* module config of this upstream */

        ngx_http_upstream_t                     *upstream;            /* the request's r->upstream */

        void                                    *data;                /* wrapped balancer's peer.data */

        ngx_event_get_peer_pt                    original_get_peer;   /* wrapped balancer's peer.get */

        ngx_event_free_peer_pt                   original_free_peer;  /* wrapped balancer's peer.free */

    #if (NGX_HTTP_SSL)

        ngx_event_set_peer_session_pt            original_set_session;   /* wrapped peer.set_session */

        ngx_event_save_peer_session_pt           original_save_session;  /* wrapped peer.save_session */

    #endif

        /* cleared on every get; set once free() sees NGX_PEER_FAILED, so a
         * later free() call for the same attempt still discards the connection */
        ngx_uint_t                               failed; /* unsigned:1 */

    } ngx_http_upstream_keepalive_peer_data_t;

    3、操作函数

    clip_image001

    ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)

    函数功能:主要用于处理upstream的keepalive指令。

    /*
     * Handler for the "keepalive" directive inside an upstream{} block:
     *
     *     keepalive <max_cached> [single];
     *
     * Stores the arguments in the module's srv conf and hooks
     * ngx_http_upstream_init_keepalive in front of the upstream's current
     * init_upstream handler (round robin when none is set yet).
     *
     * NOTE(review): the current uscf->peer.init_upstream is captured at parse
     * time, so a balancing directive that installs its own init_upstream
     * later in the same block would replace this hook -- verify ordering.
     *
     * Returns NGX_CONF_OK, an error string, or NGX_CONF_ERROR.
     */
    static char *
    ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    {
        ngx_http_upstream_srv_conf_t            *uscf;
        ngx_http_upstream_keepalive_srv_conf_t  *kcf;

        ngx_int_t    n;
        ngx_str_t   *value;
        ngx_uint_t   i;

        uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

        kcf = ngx_http_conf_upstream_srv_conf(uscf,
                                              ngx_http_upstream_keepalive_module);

        /*
         * A second "keepalive" in the same upstream would save
         * ngx_http_upstream_init_keepalive itself as original_init_upstream,
         * which then recurses forever at init time.  max_cached is only ever
         * set to a non-zero value below, so it doubles as a "seen" flag
         * (assumes the srv conf is zero-initialized by its create handler,
         * which is not shown here -- confirm).
         */
        if (kcf->max_cached) {
            return "is duplicate";
        }

        kcf->original_init_upstream = uscf->peer.init_upstream
                                      ? uscf->peer.init_upstream
                                      : ngx_http_upstream_init_round_robin;

        uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;

        /* read options */

        value = cf->args->elts;

        n = ngx_atoi(value[1].data, value[1].len);

        if (n == NGX_ERROR || n == 0) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "invalid value \"%V\" in \"%V\" directive",
                               &value[1], &cmd->name);
            return NGX_CONF_ERROR;
        }

        kcf->max_cached = n;

        for (i = 2; i < cf->args->nelts; i++) {

            if (ngx_strcmp(value[i].data, "single") == 0) {
                kcf->single = 1;
                continue;
            }

            goto invalid;
        }

        return NGX_CONF_OK;

    invalid:

        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "invalid parameter \"%V\"", &value[i]);

        return NGX_CONF_ERROR;
    }

    ngx_http_upstream_init_keepalive(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)

    函数功能:init_main_conf()中调用。

    clip_image002

    clip_image002[4]

    /*
     * init_upstream hook installed by the "keepalive" directive (called from
     * the upstream module's init_main_conf()).
     *
     * Runs the wrapped original_init_upstream first.  It then interposes
     * ngx_http_upstream_init_keepalive_peer as the per-request peer init,
     * saving the previous one in kcf->original_init_peer.  Finally it
     * preallocates max_cached cache slots and puts all of them on the free
     * queue.
     *
     * Returns NGX_OK, or NGX_ERROR if the wrapped init or the allocation fails.
     */
    static ngx_int_t
    ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
        ngx_http_upstream_srv_conf_t *us)
    {
        ngx_uint_t                               i;
        ngx_http_upstream_keepalive_srv_conf_t  *kcf;
        ngx_http_upstream_keepalive_cache_t     *cached;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0,
                       "init keepalive");

        kcf = ngx_http_conf_upstream_srv_conf(us,
                                              ngx_http_upstream_keepalive_module);

        if (kcf->original_init_upstream(cf, us) != NGX_OK) {
            return NGX_ERROR;
        }

        kcf->original_init_peer = us->peer.init;

        us->peer.init = ngx_http_upstream_init_keepalive_peer;

        /*
         * Allocate the cache slots and add them to the free queue.  This is
         * the core of the module: the free queue starts with max_cached
         * slots and the cache queue starts empty.
         */

        cached = ngx_pcalloc(cf->pool,
                    sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
        if (cached == NULL) {
            return NGX_ERROR;
        }

        ngx_queue_init(&kcf->cache);
        ngx_queue_init(&kcf->free);

        for (i = 0; i < kcf->max_cached; i++) {
            ngx_queue_insert_head(&kcf->free, &cached[i].queue);
            cached[i].conf = kcf;
        }

        return NGX_OK;
    }

    ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,ngx_http_upstream_srv_conf_t *us)

    clip_image001[4]

    clip_image002[6]

    /*
     * Per-request peer initialization for upstreams using "keepalive".
     *
     * First lets the wrapped balancer's init (kcf->original_init_peer, e.g.
     * ngx_http_upstream_init_round_robin_peer) set up r->upstream->peer.
     * Then it records that balancer's data and get/free callbacks (plus the
     * SSL session callbacks when built with SSL) in a request-pool-allocated
     * ngx_http_upstream_keepalive_peer_data_t, and substitutes the keepalive
     * hooks for them.
     *
     * Returns NGX_OK, or NGX_ERROR on allocation or wrapped-init failure.
     */
    static ngx_int_t
    ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
        ngx_http_upstream_srv_conf_t *us)
    {
        ngx_http_upstream_t                      *u;
        ngx_http_upstream_keepalive_peer_data_t  *kp;
        ngx_http_upstream_keepalive_srv_conf_t   *kcf;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "init keepalive peer");

        kcf = ngx_http_conf_upstream_srv_conf(us,
                                              ngx_http_upstream_keepalive_module);

        kp = ngx_palloc(r->pool, sizeof(*kp));
        if (kp == NULL) {
            return NGX_ERROR;
        }

        /* the wrapped balancer fills in peer.data / peer.get / peer.free */
        if (kcf->original_init_peer(r, us) != NGX_OK) {
            return NGX_ERROR;
        }

        u = r->upstream;

        /* remember what the wrapped balancer installed ... */
        kp->conf = kcf;
        kp->upstream = u;
        kp->data = u->peer.data;
        kp->original_get_peer = u->peer.get;
        kp->original_free_peer = u->peer.free;

    #if (NGX_HTTP_SSL)
        kp->original_set_session = u->peer.set_session;
        kp->original_save_session = u->peer.save_session;
    #endif

        /* ... and route every call through the keepalive layer instead */
        u->peer.data = kp;
        u->peer.get = ngx_http_upstream_get_keepalive_peer;
        u->peer.free = ngx_http_upstream_free_keepalive_peer;

    #if (NGX_HTTP_SSL)
        u->peer.set_session = ngx_http_upstream_keepalive_set_session;
        u->peer.save_session = ngx_http_upstream_keepalive_save_session;
    #endif

        return NGX_OK;
    }

    ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)

    clip_image002[8]

    /*
     * peer.get hook for keepalive upstreams: tries to hand out a cached idle
     * connection instead of opening a new one.
     *
     *  - "single" mode: the most recently cached connection is reused,
     *    whichever server it points to.  The wrapped balancer is consulted
     *    only when the cache is empty.
     *  - otherwise: the wrapped balancer (e.g. round robin) picks a server
     *    first, and the cache is searched for a connection whose sockaddr
     *    matches pc->sockaddr.
     *
     * A reused connection's slot moves from conf->cache back to conf->free.
     *
     * Returns NGX_DONE with pc->connection set and pc->cached = 1 when a
     * cached connection is reused.  Otherwise it returns the wrapped get_peer
     * result, or NGX_OK if nothing matched.
     */
    static ngx_int_t
    ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
    {
        ngx_http_upstream_keepalive_peer_data_t  *kp = data;
        ngx_http_upstream_keepalive_cache_t      *item;

        ngx_int_t          rc;
        ngx_queue_t       *q, *cache;
        ngx_connection_t  *c;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "get keepalive peer");

        /* failure state is tracked per attempt; free() sets it again if needed */
        kp->failed = 0;

        /* single pool of cached connections (kp->conf->single is 0 by default) */

        if (kp->conf->single && !ngx_queue_empty(&kp->conf->cache)) {

            q = ngx_queue_head(&kp->conf->cache);

            item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
            c = item->connection;

            ngx_queue_remove(q);
            ngx_queue_insert_head(&kp->conf->free, q);

            goto found;
        }

        /* let the wrapped balancer (e.g. ngx_http_upstream_get_round_robin_peer)
         * choose the server */
        rc = kp->original_get_peer(pc, kp->data);

        if (kp->conf->single || rc != NGX_OK) {
            return rc;
        }

        /*
         * Search the cache for a connection to the chosen server.  On the
         * first request the cache queue is still empty.  A hit moves the slot
         * from the cache queue to the free queue.
         */

        cache = &kp->conf->cache;

        for (q = ngx_queue_head(cache);
             q != ngx_queue_sentinel(cache);
             q = ngx_queue_next(q))
        {
            item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
            c = item->connection;

            if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
                             item->socklen, pc->socklen)
                == 0)
            {
                ngx_queue_remove(q);
                ngx_queue_insert_head(&kp->conf->free, q);

                goto found;
            }
        }

        return NGX_OK;

    found:

        /* shared tail for both reuse paths: the connection belongs to this
         * request again */

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "get keepalive peer: using connection %p", c);

        c->idle = 0;
        c->log = pc->log;
        c->read->log = pc->log;
        c->write->log = pc->log;
        c->pool->log = pc->log;

        pc->connection = c;
        pc->cached = 1;

        return NGX_DONE;
    }

    ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)

    函数功能:跟upstream的server无论通过哪种方式建立socket连接后,根据HTTP协议,需要将请求发送给后端了,这块主要是根据回调来发送请求的。

    clip_image002[10]

    /*
     * Write-event handler for the upstream connection.
     *
     * Once a socket to the upstream server exists, however it was obtained,
     * the request has to be sent to it.  This handler does that via
     * callbacks:
     *  - a write timeout is reported to ngx_http_upstream_next() with
     *    NGX_HTTP_UPSTREAM_FT_TIMEOUT;
     *  - with SSL, an un-initialized SSL connection starts the handshake first;
     *  - while u->header_sent is unset, the request is (re)sent by
     *    ngx_http_upstream_send_request();
     *  - afterwards further write events are ignored via the dummy handler.
     */
    static void
    ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
        ngx_http_upstream_t *u)
    {
        ngx_connection_t  *c = u->peer.connection;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream send request handler");

        if (c->write->timedout) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
            return;
        }

    #if (NGX_HTTP_SSL)

        if (u->ssl && c->ssl == NULL) {
            ngx_http_upstream_ssl_init_connection(r, u, c);
            return;
        }

    #endif

        if (!u->header_sent) {
            ngx_http_upstream_send_request(r, u);
            return;
        }

        u->write_event_handler = ngx_http_upstream_dummy_handler;

        (void) ngx_handle_write_event(c->write, 0);
    }

    ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)

    函数功能:真正发送请求的核心。

    /*
     * Sends (or continues sending) the client request to the upstream.
     *
     * The first call passes u->request_bufs to ngx_output_chain(); later
     * calls pass NULL so only what is still buffered gets flushed.
     *  - On NGX_AGAIN, a write timer (send_timeout) is armed and the write
     *    event re-registered.
     *  - Once everything is written, a read timer (read_timeout) is armed.
     *    If the connection is already readable, the response header is
     *    processed at once.
     * Send/connect errors go to ngx_http_upstream_next() with
     * NGX_HTTP_UPSTREAM_FT_ERROR.
     */
    static void
    ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ngx_int_t          rc;
        ngx_connection_t  *c;

        c = u->peer.connection;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream send request");

        /* if the connect test fails the connection failed: move on to the
         * next upstream and return */
        if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        c->log->action = "sending request to upstream";

        /* NOTE(review): u->output.output_filter is set up elsewhere (not shown)
         * and is not the default filter -- confirm against the caller */
        rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

        u->request_sent = 1;

        if (rc == NGX_ERROR) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }

        /* as for client requests: NGX_AGAIN means not everything was sent,
         * so wait for the next write event */
        if (rc == NGX_AGAIN) {
            ngx_add_timer(c->write, u->conf->send_timeout);  /* proxy_send_timeout */

            if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        /* rc == NGX_OK */

        /* the request is complete: push out anything held back by
         * tcp_nopush (TCP_CORK); same principle as the client-side
         * keepalive handling */
        if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
            if (ngx_tcp_push(c->fd) == NGX_ERROR) {
                ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                              ngx_tcp_push_n " failed");
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
        }

        ngx_add_timer(c->read, u->conf->read_timeout);

    #if 1
        /* already readable: start parsing the response header right away */
        if (c->read->ready) {

            /* post aio operation */

            /*
             * TODO comment
             * although we can post aio operation just in the end
             * of ngx_http_upstream_connect() CHECK IT !!!
             * it's better to do here because we postpone header buffer allocation
             */

            ngx_http_upstream_process_header(r, u);
            return;
        }
    #endif

        u->write_event_handler = ngx_http_upstream_dummy_handler;

        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)

    函数功能:请求信息发送给upstream了,现在需要去读upstream的结果了。

    /*
     * Reads and parses the upstream response header once the request has
     * been sent.
     *
     * 1. Reads from the upstream connection into u->buffer and feeds it to
     *    u->process_header (for the proxy module,
     *    ngx_http_proxy_process_status_line) until the header is complete.
     * 2. Fails over to the next upstream, or lets error interception handle
     *    the response.
     * 3. Otherwise either sends the response downstream or, for
     *    subrequest_in_memory, reads the body via
     *    ngx_http_upstream_process_body_in_memory.
     */
    static void
    ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
    {
        ssize_t            n;
        ngx_int_t          rc;
        ngx_connection_t  *c;

        c = u->peer.connection;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream process header");

        c->log->action = "reading response header from upstream";

        /*
         * ... part of the function is omitted from this listing.
         * NOTE(review): presumably the timeout/connect checks and the lazy
         * allocation of u->buffer -- confirm against the nginx source.
         */

        for ( ;; ) {

            /* c->recv wraps the system-level recv() */
            n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

            /* no data available yet: wait for the next read event */
            if (n == NGX_AGAIN) {
    #if 0
                ngx_add_timer(rev, u->read_timeout);  /* proxy_read_timeout */
    #endif

                if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                    ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return;
                }

                return;
            }

            /* n == 0: the connection was closed by the upstream */
            if (n == 0) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream prematurely closed connection");
            }

            if (n == NGX_ERROR || n == 0) {
                ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
                return;
            }

            u->buffer.last += n;

    #if 0
            u->valid_header_in = 0;

            u->peer.cached = 0;
    #endif

            rc = u->process_header(r);

            /* header not complete yet: keep reading unless the buffer is full */
            if (rc == NGX_AGAIN) {

                if (u->buffer.last == u->buffer.end) {
                    ngx_log_error(NGX_LOG_ERR, c->log, 0,
                                  "upstream sent too big header");

                    ngx_http_upstream_next(r, u,
                                           NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                    return;
                }

                continue;
            }

            break;
        }

        if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
            return;
        }

        if (rc == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        /* rc == NGX_OK */

        /* status codes above NGX_HTTP_SPECIAL_RESPONSE (e.g. 302) */
        if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) {

            if (r->subrequest_in_memory) {
                u->buffer.last = u->buffer.pos;
            }

            /* should this status move on to the next upstream (e.g. a 500)?
             * NGX_OK means it was handled; presumably NGX_DECLINED otherwise */
            if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
                return;
            }

            /* error_page interception */
            if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
                return;
            }
        }

        if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
            return;
        }

        if (!r->subrequest_in_memory) {
            ngx_http_upstream_send_response(r, u);
            return;
        }

        /* subrequest content in memory */

        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }

        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        /* body bytes that arrived together with the header */
        n = u->buffer.last - u->buffer.pos;

        if (n) {
            /* the input filter is handed these n bytes again, so rewind last */
            u->buffer.last -= n;

            u->state->response_length += n;

            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            if (u->length == 0) {
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
        }

        u->read_event_handler = ngx_http_upstream_process_body_in_memory;

        ngx_http_upstream_process_body_in_memory(r, u);
    }

    ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,ngx_uint_t state)

    函数功能:keepalive的核心,处理free和cache队列。

    clip_image002[12]

    /*
     * peer.free hook for keepalive upstreams: the core of the module, it
     * manages the free and cache queues.
     *
     * If the connection is still usable, it is parked in a cache slot at the
     * head of conf->cache.  The slot comes from conf->free; when none is
     * free, the least recently cached connection (tail of conf->cache) is
     * closed and its slot reused.  Since only max_cached slots exist, the
     * cache never holds more than max_cached connections.
     *
     * The wrapped balancer's free_peer is always called at the end.  When
     * the connection was cached, pc->connection has been cleared first.
     */
    static void
    ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
        ngx_uint_t state)
    {
        ngx_http_upstream_keepalive_peer_data_t  *kp = data;
        ngx_http_upstream_keepalive_cache_t      *item;

        ngx_queue_t          *q;
        ngx_connection_t     *c;
        ngx_http_upstream_t  *u;

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "free keepalive peer");

        /* remember failed state - peer.free() may be called more than once */

        if (state & NGX_PEER_FAILED) {
            kp->failed = 1;
        }

        /* cache valid connections */

        u = kp->upstream;
        c = pc->connection;

        if (kp->failed
            || c == NULL
            || c->read->eof       /* set when the upstream closed the connection */
            || c->read->error
            || c->read->timedout
            || c->write->error
            || c->write->timedout)
        {
            goto invalid;
        }

        /* NOTE(review): u->keepalive is set elsewhere (not shown), presumably
         * when the response allows the connection to be reused */
        if (!u->keepalive) {
            goto invalid;
        }

        if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
            goto invalid;
        }

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "free keepalive peer: saving connection %p", c);

        /*
         * Pick a slot.  Example: with 3 upstream servers and keepalive=2
         * there are only 2 slots.  Once both hold cached connections the
         * free queue is empty.  The next saved connection then evicts the
         * least recently cached one (tail of the cache queue), and its slot
         * is moved to the head.
         */
        if (ngx_queue_empty(&kp->conf->free)) {

            q = ngx_queue_last(&kp->conf->cache);
            ngx_queue_remove(q);

            item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);

            ngx_http_upstream_keepalive_close(item->connection);

        } else {
            q = ngx_queue_head(&kp->conf->free);
            ngx_queue_remove(q);

            item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
        }

        item->connection = c;
        ngx_queue_insert_head(&kp->conf->cache, q);

        /* the connection is now owned by the cache slot, not by pc */
        pc->connection = NULL;

        if (c->read->timer_set) {
            ngx_del_timer(c->read);
        }
        if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }

        c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
        c->read->handler = ngx_http_upstream_keepalive_close_handler;

        c->data = item;
        c->idle = 1;

        /* presumably because pc->log belongs to the finishing request */
        c->log = ngx_cycle->log;
        c->read->log = ngx_cycle->log;
        c->write->log = ngx_cycle->log;
        c->pool->log = ngx_cycle->log;

        /* remember the peer address so get_keepalive_peer can match it */
        item->socklen = pc->socklen;
        ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);

        /* already readable while idle: hand it to the close handler now
         * (handler not shown here) */
        if (c->read->ready) {
            ngx_http_upstream_keepalive_close_handler(c->read);
        }

    invalid:

        kp->original_free_peer(pc, kp->data, state);
    }

    4、工作流程

    函数流

    clip_image002

    数据流

    clip_image002[4]

    5、参考资料

    http://wiki.nginx.org/HttpUpstreamKeepaliveModule(该wiki已停止维护,官方文档见 http://nginx.org/en/docs/http/ngx_http_upstream_module.html#keepalive)

    http://blog.csdn.net/wangbin579/article/details/6327655

    http://blog.sina.com.cn/s/blog_70898f3f0100s6uh.html



沪ICP备19023445号-2号
友情链接