IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    SCGI与线程

    Xupeng发表于 2011-12-08 00:00:00
    love 0

    最近在写一个配置推送客户端,结构如下图:

    cfgreceiver architecture

    每一个应用服务进程会起一个额外的线程,与 ZooKeeper 保持连接,需要变更配置时,将新配置更新到 ZooKeeper,ZooKeeper 将配置推送到所有的客户端,客户端收到配置之后,即时更新进程内的配置信息,并将更新配置成功与否、延时、错误等信息反馈到 redis,以这样的方式做到不重启服务更新配置。

    同时也会有一个独立的客户端与 ZooKeeper 保持连接,收到 ZooKeeper 推送的配置之后,将配置写回并提交到 Puppet 配置仓库中,这样仍然保持只在一个地方修改配置的习惯。

    这个推送客户端的第一个应用场景是更新线上的 MySQL 配置,线上服务是 Quixote + SCGI,由于在 SCGI server fork 子进程之前就已经 import 了部分自有库,其中包括这个配置推送客户端的使用者,因此,这个客户端必须是 lazy 的,只能在 fork 发生之后才能启动线程、建立与 ZooKeeper 和 redis 的连接。

    将这个客户端与 Quixote + SCGI 服务进行联调时,奇怪的事情发生了,配置发生变更时,客户端并没有立刻收到 ZooKeeper 的推送,而是等到下一次用户请求到达时才会收到,起初以为是线程根本就没有在工作,后来发现,线程也并不是完全不工作,而是阻塞在了某个地方,当有用户请求到达时,线程才会接着执行。这就意味着,如果一个进程闲置了比较长的时间,ZooKeeper 会认为客户端已失去响应,从而断开连接,而客户端重连 ZooKeeper 也只会发生在下一次请求到来之后,这是个很别扭很诡异的问题。

    去掉目前使用的 Quixote 包装,写了一个最简单的 Quixote app,发现同样的问题依然存在,这排除了自有 Quixote 包装的问题。接下来又写了一个裸的 SCGI app:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    #!/usr/bin/env python
    # t.py
    import sys
    import time
    import threading
    
    class T(threading.Thread):
        daemon = True
        def run(self):
            while True:
                print >> sys.stderr, 'sleeping'
                time.sleep(1)
                print >> sys.stderr, 'id:%s time:%s\n' % (id(self), time.time())
    
    t = T()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
    #!/usr/bin/env python
    # s.py
    import sys
    from scgi import scgi_server
    from t import t
    
    class MyHandler(scgi_server.SCGIHandler):
    def produce_cgilike(self, env, bodysize):
        if not t.is_alive():
            print >> sys.stderr, 'start thread: %s' % id(t)
            t.start()
        else:
            sys.stdout.write('Content-Type: text/plain\r\n\r\n')
            print id(t)
    
    scgi_server.SCGIServer(MyHandler, host='0.0.0.0',port=9002).serve()
    

    这样的一个服务仍然有上面提到的问题,每有一次用户访问,就会输出一行 “id:xxx time:xxx”,然后输出一行 “sleeping”,之后就阻塞在了 time.sleep(1) 处(起码看起来是阻塞在了这里)。至此,也可以排除 Quixote 的问题,问题应该出在 SCGI 这里,向 hongqn 请教之后,最终定位到了问题所在。

    在 SCGIHandler.serve 方法中:

    1
    2
    
    os.write(self.parent_fd, "1") # indicates that child is ready
    fd = passfd.recvfd(self.parent_fd)pre>
    

    passfd.recvfd 是一个阻塞读,它的代码是这样的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    static PyObject *
    passfd_recvfd(PyObject *self, PyObject *args)
    {
        int sockfd, fd;
    
        if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
            return NULL;
    
        if ((fd = recv_fd(sockfd)) < 0) {
            PyErr_SetFromErrno(PyExc_IOError);
            return NULL;
        }
    
        return PyInt_FromLong((long) fd);
    }
    

    fd = recv_fd(sockfd) 是一个阻塞读,在开始阻塞读之前没有释放 GIL ,于是就导致了整个解释器阻塞,这也与之前问题的症状吻合。将代码作如下修改,在开始阻塞读之前释放 GIL,可以解决这个问题:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    static PyObject *
    passfd_recvfd(PyObject *self, PyObject *args)
    {
        int sockfd, fd;
    
        if (!PyArg_ParseTuple(args, "i:revcfd", &sockfd))
            return NULL;
    
        Py_BEGIN_ALLOW_THREADS
        fd = recv_fd(sockfd);
        Py_END_ALLOW_THREADS
        if (fd < 0) {
            PyErr_SetFromErrno(PyExc_IOError);
            return NULL;
        }
    
        return PyInt_FromLong((long) fd);
    }
    

    当然了,还需要做进一步的测试和观察,看一下打这样的 patch 之后会不会有其他的副作用。

    参考:http://docs.python.org/release/2.6.7/c-api/init.html#thread-state-and-the-global-interpreter-lock



沪ICP备19023445号-2号
友情链接