nginx的模块分为3种类型:handler、filter和upstream。其中upstream可以看作一种特殊的handler,主要用来实现nginx与后端服务器(php/jboss等)之间的通信。由于nginx中所有I/O都是非阻塞的,并且采用流式处理,所以upstream的实现相当复杂。
upstream顾名思义,真正产生内容的地方在"上游",而不是nginx。也就是说,nginx是位于client和后端upstream之间的桥梁。在这种情况下,upstream主要需要做两件事:第一,当client发送http请求过来之后,创建一个发往后端upstream的请求;第二,当后端返回数据之后,将这些数据再发送给client。本文以round_robin(轮询负载均衡)模块为例,浅析upstream的实现与机制,主要描述请求从client发送到upstream的过程。
# Virtual server listening on port 9010: every request under "/" is
# proxied to the upstream group "backend-img".
server{
listen 9010;
location /{
# proxy_pass makes ngx_http_proxy_handler the content handler of this location
proxy_pass http://backend-img;
}
}
# Upstream group referenced by proxy_pass. Every resolved address of every
# "server" entry becomes one round-robin peer; a host name such as
# www.alipay.com may resolve to several addresses and thus several peers.
upstream backend-img {
# max_fails / fail_timeout end up in peer->max_fails / peer->fail_timeout.
# NOTE(review): freeze_time is not a parameter of the stock nginx "server"
# directive; presumably it comes from a patched build - confirm.
server www.alipay.com max_fails=5 fail_timeout=10s freeze_time=60s;
server 10.253.70.103:9001 max_fails=5 fail_timeout=10s freeze_time=60s;
}
/*
 * One round-robin peer: a single resolved backend address plus its
 * weighting and failure-tracking state.
 */
typedef struct {
struct sockaddr *sockaddr; /* socket address of this peer */
socklen_t socklen; /* length of *sockaddr */
ngx_str_t name; /* address name, copied from server[i].addrs[j].name */
ngx_int_t current_weight; /* running weight; decremented when chosen, lowered on failure */
ngx_int_t weight; /* configured weight (primary peers marked down get 0) */
ngx_uint_t fails; /* failure count; cleared on recovery or when all peers failed */
time_t accessed; /* time of the last recorded failure */
time_t checked; /* time of the last failure, or of the last retry after fail_timeout */
ngx_uint_t max_fails; /* failures tolerated before the peer is skipped; 0 = never skip */
time_t fail_timeout; /* seconds after which a failed peer may be retried */
ngx_uint_t down; /* nonzero if marked down (unsigned down:1;) */
#if (NGX_HTTP_SSL)
ngx_ssl_session_t *ssl_session; /* local to a process */
#endif
} ngx_http_upstream_rr_peer_t;
/*
 * A list of round-robin peers: either the primary servers of an upstream,
 * or its backup servers (reached through the primary list's "next").
 */
struct ngx_http_upstream_rr_peers_s {
ngx_uint_t single; /* 1 if there is exactly one peer and no backup list, else 0 */
ngx_uint_t number; /* number of entries in peer[] */
ngx_uint_t last_cached; /* not used in the code shown here */
/* ngx_mutex_t *mutex; */
ngx_connection_t **cached; /* not used in the code shown here */
ngx_str_t *name; /* upstream name (&us->host), e.g. "backend-img" */
ngx_http_upstream_rr_peers_t *next; /* backup list; NULL if there is none */
ngx_http_upstream_rr_peer_t peer[1]; /* trailing array, allocated with room for "number" entries */
};
/* Per-request round-robin state, stored in r->upstream->peer.data. */
typedef struct {
ngx_http_upstream_rr_peers_t *peers; /* list in use: primary, or backup after failover */
ngx_uint_t current; /* index of the most recently selected peer */
uintptr_t *tried; /* bitmap, one bit per peer already tried for this request */
uintptr_t data; /* inline storage for "tried" when one word holds all bits */
} ngx_http_upstream_rr_peer_data_t;
备注:从调用流程可以看出,upstream其实是一个处于content phase的handler:下面的ngx_http_proxy_pass正是通过设置clcf->handler,把ngx_http_proxy_handler注册为该location的content handler。
/*
 * Handler of the "proxy_pass" directive (excerpt; elided parts are 。。。).
 * Rejects a second proxy_pass in the same location and installs
 * ngx_http_proxy_handler as the location's content handler.
 * Returns NGX_CONF_OK, or the error string "is duplicate".
 */
static char *
ngx_http_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_proxy_loc_conf_t *plcf = conf;
。。。
/* an upstream (or, presumably, a variable-based target via proxy_lengths) is already set */
if (plcf->upstream.upstream || plcf->proxy_lengths) {
return "is duplicate";
}
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
//set the handler: once proxy_pass is configured, the content phase of this location runs this function
clcf->handler = ngx_http_proxy_handler;
。。。
return NGX_CONF_OK;
}
/*
 * Content handler installed by proxy_pass (excerpt; elided parts are 。。。).
 * Creates r->upstream, then reads the client request body with
 * ngx_http_upstream_init as the completion callback.
 * Returns NGX_HTTP_INTERNAL_SERVER_ERROR if r->upstream cannot be created,
 * rc if reading the body yields a code >= NGX_HTTP_SPECIAL_RESPONSE,
 * and NGX_DONE otherwise.
 */
static ngx_int_t
ngx_http_proxy_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_proxy_ctx_t *ctx;
ngx_http_proxy_loc_conf_t *plcf;
//allocate r->upstream
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
。。。
//read the client request body; ngx_http_upstream_init is called once it is available
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}
/*
 * Reads the client request body and then calls post_handler (excerpt; the
 * path that actually reads a body is elided as 。。。).
 * post_handler runs immediately when the body was already read or
 * discarded, or when content_length_n < 0. In this flow post_handler is
 * ngx_http_upstream_init.
 */
ngx_int_t
ngx_http_read_client_request_body(ngx_http_request_t *r,
ngx_http_client_body_handler_pt post_handler)
{
r->main->count++; //taken on the main request; presumably a reference count held while the body is processed - confirm
/* body already available (or being discarded): nothing to read */
if (r->request_body || r->discard_body) {
post_handler(r);
return NGX_OK;
}
if (ngx_http_test_expect(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
if (rb == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
r->request_body = rb;
/* no known body length: continue without reading */
if (r->headers_in.content_length_n < 0) {
post_handler(r); //ngx_http_upstream_init(r)
return NGX_OK;
}
。。。
}
/*
 * Body-read callback given to ngx_http_read_client_request_body (excerpt).
 * With event models flagged NGX_USE_CLEAR_EVENT, makes sure the write event
 * of connection c is registered, then starts the upstream request.
 * NOTE(review): c's declaration is elided; presumably c = r->connection.
 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
。。。
//register the write event (clear mode) unless it is already active
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
if (!c->write->active) {
if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
== NGX_ERROR) // ngx_epoll_add_event
{
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); //500
return;
}
}
}
ngx_http_upstream_init_request(r);
}
/*
 * Builds the upstream request and starts connecting (excerpt).
 * 1. u->create_request builds the request (for proxy_pass:
 *    ngx_http_proxy_create_request); failure finalizes with 500.
 * 2. Chooses the upstream configuration: u->conf->upstream when
 *    u->resolved == NULL (the runtime-resolution path is elided).
 * 3. uscf->peer.init sets up per-request balancer state in
 *    r->upstream->peer.data (round robin:
 *    ngx_http_upstream_init_round_robin_peer).
 * 4. ngx_http_upstream_connect connects to a peer.
 */
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
。。。
//call create_request to build the request: ngx_http_proxy_create_request
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
。。。
if (u->resolved == NULL) { //no runtime address resolution needed
uscf = u->conf->upstream;
} else { //address must be resolved at runtime
。。。
}
found:
//fill r->upstream->peer.data
if (uscf->peer.init(r, uscf) != NGX_OK) { // ngx_http_upstream_init_round_robin_peer
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
}
/*
 * Connects to an upstream peer chosen by the balancer and, unless that
 * fails, sends the request (excerpt; elided parts are marked 。。。).
 *
 * ngx_event_connect_peer() picks a peer (peer.get), creates the socket,
 * binds it and starts a non-blocking connect:
 *   NGX_BUSY         - no live peer: log "no live upstreams" and fail over
 *                      with NGX_HTTP_UPSTREAM_FT_NOLIVE;
 *   NGX_DECLINED     - this peer failed: move on with FT_ERROR;
 *   NGX_OK/NGX_AGAIN - connected or connecting: send the request.
 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    r->connection->log->action = "connecting to upstream";

    。。。

    /* record when this upstream attempt started */
    tp = ngx_timeofday();
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

    /* pick a peer, create the socket, bind, start connecting */
    rc = ngx_event_connect_peer(&u->peer);

    if (rc == NGX_ERROR) {
        。。。
    }

    u->state->peer = u->peer.name;

    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN */

    。。。

    ngx_http_upstream_send_request(r, u);
}
函数功能:将客户端请求发送给upstream。
/*
 * Sends the client request to the upstream peer (excerpt; elided parts are
 * marked 。。。).
 *
 * Before anything has been sent, checks that the non-blocking connect
 * succeeded and otherwise fails over to the next peer (FT_ERROR).
 * ngx_output_chain() is given u->request_bufs on the first call and NULL
 * afterwards, so later calls only flush what is still pending.
 *   NGX_ERROR - fail over to the next peer;
 *   NGX_AGAIN - arm send_timeout on the write event and wait;
 *   NGX_OK    - push out TCP_NOPUSH/TCP_CORK data if set, arm read_timeout,
 *               then parse the response header at once if the read event
 *               is already ready, otherwise install
 *               ngx_http_upstream_dummy_handler as the write handler.
 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    。。。

    c = u->peer.connection;

    /*
     * The connect was non-blocking: if the connection test fails, the
     * connect failed, so move on to the next upstream and return.
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    /* send the data; u->output.output_filter has already been overridden */
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

    u->request_sent = 1;

    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /*
     * As with client requests: NGX_AGAIN means not everything was sent,
     * so arm the write timer and wait for the next write event.
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);

        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    /*
     * TCP_NOPUSH/TCP_CORK was set while sending: push the pending data out
     * now (same principle as the keepalive handling described earlier).
     */
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    ngx_add_timer(c->read, u->conf->read_timeout);

#if 1
    /* the response is already readable: start parsing its header now */
    if (c->read->ready) {

        /* post aio operation */

        /*
         * TODO comment
         * although we can post aio operation just in the end
         * of ngx_http_upstream_connect() CHECK IT !!!
         * it's better to do here because we postpone header buffer allocation
         */

        ngx_http_upstream_process_header(r, u);
        return;
    }
#endif

    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}
函数功能:被ngx_http_upstream_init_main_conf调用,在配置阶段根据upstream块中的server配置构造round robin使用的peers列表(以及backup列表)。
/*
 * Builds the round-robin peer lists of an upstream at configuration time
 * (called from ngx_http_upstream_init_main_conf) and installs
 * ngx_http_upstream_init_round_robin_peer as us->peer.init.
 *
 * Explicit upstream{} block (us->servers != NULL):
 *   every resolved address of every non-backup server becomes one peer of
 *   the list stored in us->peer.data; backup servers form a second list
 *   linked through peers->next. Both lists are sorted with
 *   ngx_http_upstream_cmp_servers (presumably by weight).
 * Upstream implicitly defined by proxy_pass etc.:
 *   the host is resolved and each address becomes a peer with weight 1,
 *   max_fails 1 and fail_timeout 10; there is no backup list.
 *
 * Returns NGX_OK, or NGX_ERROR when there are no non-backup servers, no
 * port, resolution fails or allocation fails (errors are logged as EMERG).
 */
ngx_int_t
ngx_http_upstream_init_round_robin(ngx_conf_t *cf,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_url_t                      u;
    ngx_uint_t                     i, j, n;
    ngx_http_upstream_server_t    *server;
    ngx_http_upstream_rr_peers_t  *peers, *backup;

    us->peer.init = ngx_http_upstream_init_round_robin_peer;

    if (us->servers) {
        server = us->servers->elts;

        /* count the addresses of all non-backup servers */
        n = 0;

        for (i = 0; i < us->servers->nelts; i++) {
            if (server[i].backup) {
                continue;
            }

            /* an IP gives naddrs == 1; a host name may give several */
            n += server[i].naddrs;
        }

        if (n == 0) {   /* only backup servers, or no servers at all */
            ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                          "no servers in upstream \"%V\" in %s:%ui",
                          &us->host, us->file_name, us->line);
            return NGX_ERROR;
        }

        /* peer[] already holds one element, so only n - 1 more are needed */
        peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)
                              + sizeof(ngx_http_upstream_rr_peer_t) * (n - 1));
        if (peers == NULL) {
            return NGX_ERROR;
        }

        peers->single = (n == 1);
        peers->number = n;
        peers->name = &us->host;    /* e.g. "backend-img" */

        n = 0;

        /* one hostname can have multiple IP addresses in DNS */
        for (i = 0; i < us->servers->nelts; i++) {
            if (server[i].backup) {
                continue;
            }

            for (j = 0; j < server[i].naddrs; j++) {
                peers->peer[n].sockaddr = server[i].addrs[j].sockaddr;
                peers->peer[n].socklen = server[i].addrs[j].socklen;
                peers->peer[n].name = server[i].addrs[j].name;
                peers->peer[n].max_fails = server[i].max_fails;
                peers->peer[n].fail_timeout = server[i].fail_timeout;
                peers->peer[n].down = server[i].down;
                peers->peer[n].weight = server[i].down ? 0 : server[i].weight;
                peers->peer[n].current_weight = peers->peer[n].weight;
                n++;
            }
        }

        us->peer.data = peers;

        ngx_sort(&peers->peer[0], (size_t) n,
                 sizeof(ngx_http_upstream_rr_peer_t),
                 ngx_http_upstream_cmp_servers);

        /* backup servers */

        n = 0;

        for (i = 0; i < us->servers->nelts; i++) {
            if (!server[i].backup) {
                continue;
            }

            n += server[i].naddrs;
        }

        if (n == 0) {
            return NGX_OK;
        }

        backup = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)
                              + sizeof(ngx_http_upstream_rr_peer_t) * (n - 1));
        if (backup == NULL) {
            return NGX_ERROR;
        }

        /* with a backup list present the primary list is never "single" */
        peers->single = 0;
        backup->single = 0;
        backup->number = n;
        backup->name = &us->host;

        n = 0;

        /* one hostname can have multiple IP addresses in DNS */
        for (i = 0; i < us->servers->nelts; i++) {
            if (!server[i].backup) {
                continue;
            }

            for (j = 0; j < server[i].naddrs; j++) {
                backup->peer[n].sockaddr = server[i].addrs[j].sockaddr;
                backup->peer[n].socklen = server[i].addrs[j].socklen;
                backup->peer[n].name = server[i].addrs[j].name;
                backup->peer[n].weight = server[i].weight;
                backup->peer[n].current_weight = server[i].weight;
                backup->peer[n].max_fails = server[i].max_fails;
                backup->peer[n].fail_timeout = server[i].fail_timeout;
                backup->peer[n].down = server[i].down;
                n++;
            }
        }

        peers->next = backup;

        ngx_sort(&backup->peer[0], (size_t) n,
                 sizeof(ngx_http_upstream_rr_peer_t),
                 ngx_http_upstream_cmp_servers);

        return NGX_OK;
    }

    /* an upstream implicitly defined by proxy_pass, etc. */

    if (us->port == 0 && us->default_port == 0) {
        ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                      "no port in upstream \"%V\" in %s:%ui",
                      &us->host, us->file_name, us->line);
        return NGX_ERROR;
    }

    ngx_memzero(&u, sizeof(ngx_url_t));

    u.host = us->host;
    u.port = (in_port_t) (us->port ? us->port : us->default_port);

    if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) {
        if (u.err) {
            ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                          "%s in upstream \"%V\" in %s:%ui",
                          u.err, &us->host, us->file_name, us->line);
        }

        return NGX_ERROR;
    }

    n = u.naddrs;

    peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)
                          + sizeof(ngx_http_upstream_rr_peer_t) * (n - 1));
    if (peers == NULL) {
        return NGX_ERROR;
    }

    peers->single = (n == 1);
    peers->number = n;
    peers->name = &us->host;

    for (i = 0; i < u.naddrs; i++) {
        peers->peer[i].sockaddr = u.addrs[i].sockaddr;
        peers->peer[i].socklen = u.addrs[i].socklen;
        peers->peer[i].name = u.addrs[i].name;
        peers->peer[i].weight = 1;
        peers->peer[i].current_weight = 1;
        peers->peer[i].max_fails = 1;
        peers->peer[i].fail_timeout = 10;
    }

    us->peer.data = peers;

    /* implicitly defined upstream has no backup servers */

    return NGX_OK;
}
函数功能:向r->upstream->peer.data里填充数据。设置peer.get和peer.free函数。
/*
 * Per-request initialisation of the round-robin balancer (us->peer.init).
 *
 * Stores an ngx_http_upstream_rr_peer_data_t in r->upstream->peer.data,
 * allocating it if this request has none yet, and points it at the primary
 * peer list built by ngx_http_upstream_init_round_robin (us->peer.data).
 * The "tried" bitmap gets one bit per peer of the larger of the primary
 * and backup lists: the inline word rrp->data when that fits, otherwise a
 * zeroed pool array. Installs peer.get / peer.free (and the SSL session
 * hooks under NGX_HTTP_SSL); peer.tries starts at the primary peer count.
 *
 * Returns NGX_OK, or NGX_ERROR on allocation failure.
 */
ngx_int_t
ngx_http_upstream_init_round_robin_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_uint_t                         n;
    ngx_http_upstream_rr_peer_data_t  *rrp;

    rrp = r->upstream->peer.data;

    if (rrp == NULL) {
        rrp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_rr_peer_data_t));
        if (rrp == NULL) {
            return NGX_ERROR;
        }

        r->upstream->peer.data = rrp;
    }

    /* the primary peer list, filled at configuration time */
    rrp->peers = us->peer.data;
    rrp->current = 0;

    /* size the bitmap for whichever list (primary or backup) is larger */
    n = rrp->peers->number;

    if (rrp->peers->next && rrp->peers->next->number > n) {
        n = rrp->peers->next->number;
    }

    /* one bit per peer: a single word covers up to 32 or 64 peers */
    if (n <= 8 * sizeof(uintptr_t)) {
        rrp->tried = &rrp->data;
        rrp->data = 0;

    } else {
        n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t));

        rrp->tried = ngx_pcalloc(r->pool, n * sizeof(uintptr_t));
        if (rrp->tried == NULL) {
            return NGX_ERROR;
        }
    }

    r->upstream->peer.get = ngx_http_upstream_get_round_robin_peer;
    r->upstream->peer.free = ngx_http_upstream_free_round_robin_peer;
    r->upstream->peer.tries = rrp->peers->number;
#if (NGX_HTTP_SSL)
    r->upstream->peer.set_session =
                               ngx_http_upstream_set_round_robin_peer_session;
    r->upstream->peer.save_session =
                               ngx_http_upstream_save_round_robin_peer_session;
#endif

    return NGX_OK;
}
函数功能:根据data(在ngx_http_upstream_init_round_robin_peer已填充)里的内容填充数据结构pc。
/*
 * peer.get callback of the round-robin balancer: chooses a peer and fills
 * pc->sockaddr, pc->socklen and pc->name from it (excerpt; declarations
 * and some code before the final return are elided as 。。。).
 *
 * - single-peer list: always peer[0];
 * - first attempt (pc->tries == peers->number): a weighted choice via
 *   ngx_http_upstream_get_peer();
 * - later attempts: walk the list sequentially from rrp->current.
 * A candidate not yet tried for this request is accepted if it is not
 * down and either has fewer than max_fails failures (or max_fails == 0)
 * or more than fail_timeout has passed since peer->checked. Down peers are
 * marked as tried. Every rejected, untried candidate decrements pc->tries.
 *
 * When no peer qualifies, switches to the backup list (if any) and tries
 * again. If that also yields NGX_BUSY, clears the fail counters of the
 * exhausted list and returns NGX_BUSY with pc->name set to the upstream
 * name.
 */
ngx_int_t
ngx_http_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data)
{
    。。。

    pc->cached = 0;
    pc->connection = NULL;

    if (rrp->peers->single) {
        peer = &rrp->peers->peer[0];

    } else {

        /* there are several peers */

        if (pc->tries == rrp->peers->number) {

            /* first attempt for this request: weighted choice */

            i = pc->tries;

            for ( ;; ) {
                rrp->current = ngx_http_upstream_get_peer(rrp->peers);

                /* word index and bit mask of this peer in "tried" */
                n = rrp->current / (8 * sizeof(uintptr_t));
                m = (uintptr_t) 1 << rrp->current % (8 * sizeof(uintptr_t));

                if (!(rrp->tried[n] & m)) {
                    peer = &rrp->peers->peer[rrp->current];

                    if (!peer->down) {

                        if (peer->max_fails == 0
                            || peer->fails < peer->max_fails)
                        {
                            break;
                        }

                        /* failed too often: allow a retry after fail_timeout */
                        if (now - peer->checked > peer->fail_timeout) {
                            peer->checked = now;
                            break;
                        }

                        peer->current_weight = 0;

                    } else {
                        rrp->tried[n] |= m;
                    }

                    pc->tries--;
                }

                if (pc->tries == 0) {
                    goto failed;
                }

                if (--i == 0) {
                    ngx_log_error(NGX_LOG_ALERT, pc->log, 0,
                                  "round robin upstream stuck on %ui tries",
                                  pc->tries);
                    goto failed;
                }
            }

            peer->current_weight--;

        } else {

            /* a retry: walk the list sequentially from rrp->current */

            i = pc->tries;

            for ( ;; ) {
                n = rrp->current / (8 * sizeof(uintptr_t));
                m = (uintptr_t) 1 << rrp->current % (8 * sizeof(uintptr_t));

                if (!(rrp->tried[n] & m)) {
                    peer = &rrp->peers->peer[rrp->current];

                    if (!peer->down) {

                        if (peer->max_fails == 0
                            || peer->fails < peer->max_fails)
                        {
                            break;
                        }

                        /* failed too often: allow a retry after fail_timeout */
                        if (now - peer->checked > peer->fail_timeout) {
                            peer->checked = now;
                            break;
                        }

                        peer->current_weight = 0;

                    } else {
                        rrp->tried[n] |= m;
                    }

                    pc->tries--;
                }

                rrp->current++;

                if (rrp->current >= rrp->peers->number) {
                    rrp->current = 0;
                }

                if (pc->tries == 0) {
                    goto failed;
                }

                if (--i == 0) {
                    ngx_log_error(NGX_LOG_ALERT, pc->log, 0,
                                  "round robin upstream stuck on %ui tries",
                                  pc->tries);
                    goto failed;
                }
            }

            peer->current_weight--;
        }

        rrp->tried[n] |= m;
    }

    pc->sockaddr = peer->sockaddr;
    pc->socklen = peer->socklen;
    pc->name = &peer->name;

    。。。

    return NGX_OK;

failed:

    peers = rrp->peers;

    if (peers->next) {

        /* ngx_unlock_mutex(peers->mutex); */

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "backup servers");

        rrp->peers = peers->next;
        pc->tries = rrp->peers->number;

        /*
         * Clear the "tried" bitmap for the backup list. Round the word count
         * up, exactly as ngx_http_upstream_init_round_robin_peer() does when
         * sizing the bitmap: "number / bits + 1" would write one word past
         * the bitmap whenever number is a multiple of the word size.
         */
        n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
            / (8 * sizeof(uintptr_t));

        for (i = 0; i < n; i++) {
            rrp->tried[i] = 0;
        }

        rc = ngx_http_upstream_get_round_robin_peer(pc, rrp);

        if (rc != NGX_BUSY) {
            return rc;
        }

        /* ngx_lock_mutex(peers->mutex); */
    }

    /* all peers failed, mark them as live for quick recovery */

    for (i = 0; i < peers->number; i++) {
        peers->peer[i].fails = 0;
    }

    /* ngx_unlock_mutex(peers->mutex); */

    pc->name = peers->name;

    return NGX_BUSY;
}
/*
 * peer.free callback of the round-robin balancer: records the outcome of
 * the attempt on the peer at rrp->current.
 *
 * state is a bit mask; NGX_PEER_FAILED marks a failed attempt.
 * - state == 0 with no tries left: nothing to do.
 * - single-peer list: set pc->tries = 0 (there is nobody else to retry).
 * - failure: increment fails, stamp accessed and checked, and lower
 *   current_weight by weight / max_fails (clamped at 0).
 * - otherwise: clear fails if the peer was checked after its last
 *   failure (accessed < checked), i.e. a retry after fail_timeout passed.
 * Finally advances rrp->current (wrapping to 0) and consumes one
 * pc->tries if any remain.
 */
void
ngx_http_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, void *data,
    ngx_uint_t state)
{
    ngx_http_upstream_rr_peer_data_t  *rrp = data;

    time_t                       now;
    ngx_http_upstream_rr_peer_t  *peer;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "free rr peer %ui %ui", pc->tries, state);

    if (state == 0 && pc->tries == 0) {
        return;
    }

    /* TODO: NGX_PEER_KEEPALIVE */

    if (rrp->peers->single) {
        pc->tries = 0;
        return;
    }

    peer = &rrp->peers->peer[rrp->current];

    if (state & NGX_PEER_FAILED) {
        now = ngx_time();

        /* ngx_lock_mutex(rrp->peers->mutex); */

        peer->fails++;
        peer->accessed = now;
        peer->checked = now;

        if (peer->max_fails) {
            /*
             * Divide in signed arithmetic: max_fails is unsigned, and mixing
             * it in would turn the subtraction unsigned, so a negative result
             * would rely on an implementation-defined conversion back to
             * ngx_int_t before the clamp below.
             */
            peer->current_weight -= peer->weight / (ngx_int_t) peer->max_fails;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "free rr peer failed: %ui %i",
                       rrp->current, peer->current_weight);

        if (peer->current_weight < 0) {
            peer->current_weight = 0;
        }

        /* ngx_unlock_mutex(rrp->peers->mutex); */

    } else {

        /* mark peer live if check passed */

        if (peer->accessed < peer->checked) {
            peer->fails = 0;
        }
    }

    rrp->current++;

    if (rrp->current >= rrp->peers->number) {
        rrp->current = 0;
    }

    if (pc->tries) {
        pc->tries--;
    }

    /* ngx_unlock_mutex(rrp->peers->mutex); */
}
http://www.pagefault.info/?p=251
http://www.pagefault.info/?p=259
http://www.pagefault.info/?p=273
http://www.pagefault.info/?p=324
http://www.pagefault.info/?p=339