nginx的upstream在1.1.4版本之前只支持短连接,即通过HTTP/1.0向后端发起连接,并把请求的"Connection" header设为"close"。nginx与前端(客户端)的连接默认为长连接,一个用户跟nginx建立连接之后,通过这个长连接可以发送多个请求。如果nginx只是作为reverse proxy的话,一个用户连接上的多个请求就需要向后端建立多个短连接。如果后端的服务器(源站或是缓存服务器)处理并发连接的能力不强(比如单进程的squid),就可能导致瓶颈的出现。
长连接的好处显而易见:不用频繁进行TCP三次握手,节省了网络资源;但是长连接同时也带来了很多负面影响,会在后面第5节中介绍。
/*
 * One slot of the keepalive connection cache.
 *
 * Slots are allocated as an array by ngx_http_upstream_init_keepalive() and
 * are linked through `queue` into either the `free` or the `cache` list of
 * the owning configuration.
 */
typedef struct {
    /* configuration that owns this slot */
    ngx_http_upstream_keepalive_srv_conf_t  *conf;

    /* list link: member of conf->free or conf->cache */
    ngx_queue_t                              queue;

    /* idle upstream connection stored in this slot while it is cached */
    ngx_connection_t                        *connection;

    /* address of the peer the cached connection belongs to */
    socklen_t                                socklen;
    u_char                                   sockaddr[NGX_SOCKADDRLEN];
} ngx_http_upstream_keepalive_cache_t;
/*
 * Per-upstream{} state filled in by the "keepalive" directive.
 */
typedef struct {
    /* first directive argument: number of cache slots to preallocate */
    ngx_uint_t                          max_cached;

    /* set to 1 when the optional "single" argument is given */
    ngx_uint_t                          single;

    /* slots (ngx_http_upstream_keepalive_cache_t) holding an idle connection */
    ngx_queue_t                         cache;

    /* slots (ngx_http_upstream_keepalive_cache_t) that are currently unused */
    ngx_queue_t                         free;

    /*
     * init_upstream callback that was configured before this module took
     * over, or ngx_http_upstream_init_round_robin when none was set
     */
    ngx_http_upstream_init_pt           original_init_upstream;

    /*
     * peer.init callback saved by ngx_http_upstream_init_keepalive() before
     * it installs ngx_http_upstream_init_keepalive_peer in its place
     */
    ngx_http_upstream_init_peer_pt      original_init_peer;
} ngx_http_upstream_keepalive_srv_conf_t;
备注:这个是upstream里的keepalive指令真正填充的数据结构。
/*
 * Per-request wrapper around the peer data of the underlying balancer.
 *
 * ngx_http_upstream_init_keepalive_peer() stores the balancer's own
 * peer.data and callbacks here and then points the request's peer.* hooks
 * at the keepalive versions.
 */
typedef struct {
    /* keepalive configuration of the upstream{} block */
    ngx_http_upstream_keepalive_srv_conf_t  *conf;

    /* upstream state of the request (r->upstream) */
    ngx_http_upstream_t                     *upstream;

    /* peer.data of the wrapped balancer */
    void                                    *data;

    /* peer.get / peer.free of the wrapped balancer */
    ngx_event_get_peer_pt                    original_get_peer;
    ngx_event_free_peer_pt                   original_free_peer;

#if (NGX_HTTP_SSL)
    /* peer.set_session / peer.save_session of the wrapped balancer */
    ngx_event_set_peer_session_pt            original_set_session;
    ngx_event_save_peer_session_pt           original_save_session;
#endif

    /* remembers that peer.free() was called with NGX_PEER_FAILED */
    ngx_uint_t                               failed;       /* unsigned:1 */
} ngx_http_upstream_keepalive_peer_data_t;
函数功能:主要用于处理upstream的keepalive指令。
/*
 * Configuration handler for the "keepalive" directive of an upstream{} block.
 *
 * Accepted form: keepalive <connections> [single];
 *
 * Saves the currently configured init_upstream callback (falling back to
 * ngx_http_upstream_init_round_robin), installs
 * ngx_http_upstream_init_keepalive in its place and stores the parsed
 * arguments in the module's upstream server configuration.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR after logging an invalid
 * argument, or an error string for a repeated directive.
 */
static char *
ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_upstream_srv_conf_t            *uscf;
    ngx_http_upstream_keepalive_srv_conf_t  *kcf;

    ngx_int_t    n;
    ngx_str_t   *value;
    ngx_uint_t   i;

    uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

    kcf = ngx_http_conf_upstream_srv_conf(uscf,
                                          ngx_http_upstream_keepalive_module);

    /*
     * A second "keepalive" directive would save our own init function as
     * original_init_upstream, and ngx_http_upstream_init_keepalive() would
     * then call itself without end.  Reject the duplicate instead.
     */
    if (uscf->peer.init_upstream == ngx_http_upstream_init_keepalive) {
        return "is duplicate";
    }

    kcf->original_init_upstream = uscf->peer.init_upstream
                                  ? uscf->peer.init_upstream
                                  : ngx_http_upstream_init_round_robin;

    uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;

    /* read options */

    value = cf->args->elts;

    /* first argument: number of connections to cache, must be positive */
    n = ngx_atoi(value[1].data, value[1].len);

    if (n == NGX_ERROR || n == 0) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "invalid value \"%V\" in \"%V\" directive",
                           &value[1], &cmd->name);
        return NGX_CONF_ERROR;
    }

    kcf->max_cached = n;

    /* remaining arguments: only "single" is recognised */
    for (i = 2; i < cf->args->nelts; i++) {

        if (ngx_strcmp(value[i].data, "single") == 0) {
            kcf->single = 1;
            continue;
        }

        goto invalid;
    }

    return NGX_CONF_OK;

invalid:

    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                       "invalid parameter \"%V\"", &value[i]);

    return NGX_CONF_ERROR;
}
函数功能:在init_main_conf()中被调用,先调用原有的init_upstream,再预分配max_cached个缓存节点并全部挂入free队列。
/*
 * init_upstream callback installed by the "keepalive" directive handler.
 *
 * Runs the saved original init_upstream callback first, then replaces the
 * upstream's peer.init with ngx_http_upstream_init_keepalive_peer (keeping
 * the previous one in original_init_peer) and preallocates max_cached cache
 * slots, all of which start out on the free list.
 *
 * Returns NGX_OK on success, NGX_ERROR if the wrapped initialiser or the
 * allocation fails.
 */
static ngx_int_t
ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_uint_t                               i;
    ngx_http_upstream_keepalive_srv_conf_t  *kcf;
    ngx_http_upstream_keepalive_cache_t     *cached;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0,
                   "init keepalive");

    kcf = ngx_http_conf_upstream_srv_conf(us,
                                          ngx_http_upstream_keepalive_module);

    if (kcf->original_init_upstream(cf, us) != NGX_OK) {
        return NGX_ERROR;
    }

    /* the wrapped initialiser has set us->peer.init; wrap it as well */
    kcf->original_init_peer = us->peer.init;

    us->peer.init = ngx_http_upstream_init_keepalive_peer;

    /* allocate cache items and add to free queue */

    cached = ngx_pcalloc(cf->pool,
                sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
    if (cached == NULL) {
        return NGX_ERROR;
    }

    ngx_queue_init(&kcf->cache);
    ngx_queue_init(&kcf->free);

    /* every slot starts unused, so the free list holds max_cached entries */
    for (i = 0; i < kcf->max_cached; i++) {
        ngx_queue_insert_head(&kcf->free, &cached[i].queue);
        cached[i].conf = kcf;
    }

    return NGX_OK;
}
/*
 * Per-request peer.init callback.
 *
 * Lets the wrapped balancer initialise r->upstream->peer, then remembers the
 * balancer's data and callbacks in a request-pool wrapper and redirects the
 * peer hooks to the keepalive implementations.
 *
 * Returns NGX_OK, or NGX_ERROR if the allocation or the wrapped peer.init
 * fails.
 */
static ngx_int_t
ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_http_upstream_keepalive_srv_conf_t   *ka_conf;
    ngx_http_upstream_keepalive_peer_data_t  *wrap;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "init keepalive peer");

    ka_conf = ngx_http_conf_upstream_srv_conf(us,
                                          ngx_http_upstream_keepalive_module);

    wrap = ngx_palloc(r->pool,
                      sizeof(ngx_http_upstream_keepalive_peer_data_t));
    if (wrap == NULL) {
        return NGX_ERROR;
    }

    /* the wrapped balancer fills in peer.data, peer.get, peer.free, ... */
    if (ka_conf->original_init_peer(r, us) != NGX_OK) {
        return NGX_ERROR;
    }

    wrap->conf = ka_conf;
    wrap->upstream = r->upstream;

    /* save each balancer hook, then put the keepalive hook in its place */

    wrap->data = r->upstream->peer.data;
    r->upstream->peer.data = wrap;

    wrap->original_get_peer = r->upstream->peer.get;
    r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;

    wrap->original_free_peer = r->upstream->peer.free;
    r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;

#if (NGX_HTTP_SSL)
    wrap->original_set_session = r->upstream->peer.set_session;
    r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;

    wrap->original_save_session = r->upstream->peer.save_session;
    r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
#endif

    return NGX_OK;
}
/*
 * peer.get callback: try to reuse a cached connection before connecting.
 *
 * In "single" mode the head of the cache is reused regardless of address and
 * the wrapped balancer is consulted only when the cache is empty.  Otherwise
 * the wrapped balancer picks the peer first and the cache is searched for a
 * connection with the same socket address.
 *
 * Returns NGX_DONE with pc->connection set and pc->cached = 1 when a cached
 * connection is reused; otherwise returns the result of the wrapped
 * peer.get (NGX_OK when no cached connection matches).
 */
static ngx_int_t
ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
{
    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
    ngx_http_upstream_keepalive_cache_t      *item;

    ngx_int_t          rc;
    ngx_queue_t       *q, *cache;
    ngx_connection_t  *c;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "get keepalive peer");

    kp->failed = 0;

    /* single pool of cached connections */

    if (kp->conf->single && !ngx_queue_empty(&kp->conf->cache)) {

        q = ngx_queue_head(&kp->conf->cache);

        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
        c = item->connection;

        /* the slot is unused again: move it from cache to free */
        ngx_queue_remove(q);
        ngx_queue_insert_head(&kp->conf->free, q);

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "get keepalive peer: using connection %p", c);

        /* hand the connection to the request: switch to the request's log */
        c->idle = 0;
        c->log = pc->log;
        c->read->log = pc->log;
        c->write->log = pc->log;
        c->pool->log = pc->log;

        pc->connection = c;
        pc->cached = 1;

        return NGX_DONE;
    }

    /* let the wrapped balancer choose the peer (fills pc->sockaddr) */
    rc = kp->original_get_peer(pc, kp->data);

    if (kp->conf->single || rc != NGX_OK) {
        return rc;
    }

    /* search cache for suitable connection */

    cache = &kp->conf->cache;

    /* the cache is empty until some request has released a connection */
    for (q = ngx_queue_head(cache);
         q != ngx_queue_sentinel(cache);
         q = ngx_queue_next(q))
    {
        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
        c = item->connection;

        /* reuse only a connection to the exact address that was selected */
        if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
                         item->socklen, pc->socklen)
            == 0)
        {
            /* the slot is unused again: move it from cache to free */
            ngx_queue_remove(q);
            ngx_queue_insert_head(&kp->conf->free, q);

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                           "get keepalive peer: using connection %p", c);

            c->idle = 0;
            c->log = pc->log;
            c->read->log = pc->log;
            c->write->log = pc->log;
            c->pool->log = pc->log;

            pc->connection = c;
            pc->cached = 1;

            return NGX_DONE;
        }
    }

    return NGX_OK;
}
函数功能:跟upstream的server无论通过哪种方式建立socket连接后,根据HTTP协议,需要将请求发送给后端了,这块主要是根据回调来发送请求的。
/*
 * Write-event handler for the upstream connection while the request is being
 * sent.
 *
 * A write timeout moves the request to the next upstream.  With SSL enabled,
 * an upstream that needs SSL but has no SSL state yet gets its SSL
 * connection initialised first.  Once the response header has been sent to
 * the client the handler is replaced by the dummy handler; otherwise the
 * request is (further) sent to the upstream.
 */
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *peer_conn;

    peer_conn = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request handler");

    /* sending timed out: give up on this peer and try the next one */
    if (peer_conn->write->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && peer_conn->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, peer_conn);
        return;
    }

#endif

    if (!u->header_sent) {
        ngx_http_upstream_send_request(r, u);
        return;
    }

    /* header already sent: further write events are routed to the dummy handler */
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    (void) ngx_handle_write_event(peer_conn->write, 0);
}
函数功能:真正发送请求的核心。
//函数功能:将客户端请求发送给upstream
/*
 * Send the client request to the upstream connection.
 *
 * On the first call the connection is tested first; a failed test or a
 * failed write moves the request to the next upstream.  If the output chain
 * reports NGX_AGAIN the send timer and write event are armed and the
 * function returns to wait.  Once everything is written the read timer is
 * started and, if data is already readable, the response header is processed
 * immediately.
 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");

    /* nothing sent yet and the connection test fails: try the next upstream */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    /* first call passes the request buffers, later calls flush what is left */
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

    u->request_sent = 1;

    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /* not everything was written: arm the send timeout and wait for a write event */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);

        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    /* the request is fully queued: clear TCP_NOPUSH/TCP_CORK if it was set */
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    /* start waiting for the response */
    ngx_add_timer(c->read, u->conf->read_timeout);

#if 1
    /* the response is already readable: start parsing the header right away */
    if (c->read->ready) {

        /* post aio operation */

        /*
         * TODO comment
         * although we can post aio operation just in the end
         * of ngx_http_upstream_connect() CHECK IT !!!
         * it's better to do here because we postpone header buffer allocation
         */

        ngx_http_upstream_process_header(r, u);
        return;
    }
#endif

    /* nothing more to write: further write events go to the dummy handler */
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}
函数功能:请求信息发送给upstream了,现在需要去读upstream的结果了。
/*
 * Read and parse the response header coming from the upstream connection.
 *
 * Repeatedly receives into u->buffer and calls u->process_header() until the
 * header is complete.  Depending on the result the request is passed to the
 * next upstream, finalized with an error, handed to error interception,
 * forwarded to the client, or (for in-memory subrequests) switched to
 * in-memory body processing.
 *
 * NOTE(review): the "。。。" line below is an elision marker left by the
 * article author; part of the original function is omitted at that point.
 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process header");
c->log->action = "reading response header from upstream";
。。。
for ( ;; ) {
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last); // read into the free tail of u->buffer via the connection's recv callback
// no data available yet: re-arm the read event and wait
if (n == NGX_AGAIN) {
#if 0
ngx_add_timer(rev, u->read_timeout); //proxy_read_timeout
#endif
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
// n == 0: logged as the upstream having closed the connection prematurely
if (n == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream prematurely closed connection");
}
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); // read failed or connection closed: try the next upstream
return;
}
u->buffer.last += n; // account for the bytes just received
#if 0
u->valid_header_in = 0;
u->peer.cached = 0;
#endif
rc = u->process_header(r); // protocol-specific header parser, e.g. ngx_http_proxy_process_status_line
if (rc == NGX_AGAIN) { // header is not complete yet
// buffer is full but the header is still incomplete: treat as an invalid header
if (u->buffer.last == u->buffer.end) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream sent too big header");
ngx_http_upstream_next(r, u,
NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
continue;
}
break;
}
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* rc == NGX_OK */
if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) { // status above the special-response threshold, e.g. 302
if (r->subrequest_in_memory) {
u->buffer.last = u->buffer.pos;
}
// decide whether this status (e.g. 500) should be retried on the next upstream
if (ngx_http_upstream_test_next(r, u) == NGX_OK) { // any other result (e.g. NGX_DECLINED) falls through
return;
}
// error_page handling
if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
return;
}
}
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
// ordinary request: start sending the response to the client
if (!r->subrequest_in_memory) {
ngx_http_upstream_send_response(r, u);
return;
}
/* subrequest content in memory */
// no input filter configured: fall back to the non-buffered filter
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// body bytes that arrived in the same buffer as the header
n = u->buffer.last - u->buffer.pos;
if (n) {
u->buffer.last -= n;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// u->length == 0: finalize with status 0 (presumably the whole body has been received)
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
}
// continue reading the rest of the body into memory
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
ngx_http_upstream_process_body_in_memory(r, u);
}
函数功能:keepalive的核心,处理free和cache队列。
/*
 * peer.free callback: park a still-usable connection in the cache instead of
 * letting it be closed.
 *
 * The connection is cached only if the peer was never reported as failed,
 * the connection exists, neither of its events has eof/error/timedout set,
 * u->keepalive is set and the read event can be re-armed.  When a connection
 * is cached, pc->connection is cleared so the caller no longer owns it.  In
 * every case the wrapped balancer's peer.free is called at the end.
 */
static void
ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
    ngx_uint_t state)
{
    ngx_http_upstream_keepalive_peer_data_t  *kp = data;
    ngx_http_upstream_keepalive_cache_t      *item;

    ngx_queue_t          *q;
    ngx_connection_t     *c;
    ngx_http_upstream_t  *u;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "free keepalive peer");

    /* remember failed state - peer.free() may be called more than once */

    if (state & NGX_PEER_FAILED) {
        kp->failed = 1;
    }

    /* cache valid connections */

    u = kp->upstream;
    c = pc->connection;

    if (kp->failed
        || c == NULL
        || c->read->eof
        || c->read->error
        || c->read->timedout
        || c->write->error
        || c->write->timedout)
    {
        goto invalid;
    }

    if (!u->keepalive) {
        goto invalid;
    }

    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
        goto invalid;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "free keepalive peer: saving connection %p", c);

    if (ngx_queue_empty(&kp->conf->free)) {

        /*
         * No unused slot left: evict the entry at the tail of the cache
         * (new entries are inserted at the head) and close its connection.
         */
        q = ngx_queue_last(&kp->conf->cache);
        ngx_queue_remove(q);

        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);

        ngx_http_upstream_keepalive_close(item->connection);

    } else {
        /* take an unused slot from the free list */
        q = ngx_queue_head(&kp->conf->free);
        ngx_queue_remove(q);

        item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
    }

    /* store the connection in the slot and put the slot at the cache head */
    item->connection = c;
    ngx_queue_insert_head(&kp->conf->cache, q);

    /* the connection now belongs to the cache, not to the request */
    pc->connection = NULL;

    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /* events on the idle connection are handled by the keepalive handlers */
    c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
    c->read->handler = ngx_http_upstream_keepalive_close_handler;

    /* detach from the request: use the cycle log while idle */
    c->data = item;
    c->idle = 1;
    c->log = ngx_cycle->log;
    c->read->log = ngx_cycle->log;
    c->write->log = ngx_cycle->log;
    c->pool->log = ngx_cycle->log;

    /* remember the peer address so get_keepalive_peer can match on it */
    item->socklen = pc->socklen;
    ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);

    /* a read event is already pending: run the close handler right away */
    if (c->read->ready) {
        ngx_http_upstream_keepalive_close_handler(c->read);
    }

invalid:

    kp->original_free_peer(pc, kp->data, state);
}
http://wiki.nginx.org/HttpUpstreamKeepaliveModule