    Making subprocess async friendly in Python

    Posted by est on 2024-06-22 12:26:00

    It's been a while since I wrote something in English, mostly because there was nothing really interesting to write about, until now.

    Occasionally, when facing a long-running task in Python, I choose either a distributed task queue or, for convenience, just the subprocess module. It's built in and well designed for grabbing the output of a child process that runs for a short period of time.

    But what if the child process takes a really, really long time? In my case it's an expensive query or some CPU/GPU-intensive task that needs to be launched from a running web framework, like FastAPI.

    Popen() fire and forget

    If the child process's output, end state and return code are irrelevant, a simple Popen will do:

    subprocess.Popen(..., stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    The problem is that after the child process finishes, it lingers as a zombie, because the parent never collects its exit status. Part of the fix is an extra Popen() parameter that puts the child in its own session, like subprocess.Popen(..., start_new_session=True)

    And write a simple loop that periodically polls with WNOHANG to collect the exit status:

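    while True:
        try:
            # Non-blocking: returns (0, 0, rusage) while no child has exited yet.
            chpid, retcode, res = os.wait3(os.WNOHANG)
        except ChildProcessError:
            break  # no child processes left to wait for
        if chpid == p.pid:  # p is the object returned by subprocess.Popen(...)
            do_sth()
            break
        sleep(5)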

    This can be done using BackgroundTasks in FastAPI/Starlette.

    In a way, the child process behaves like one started under nohup or screen/tmux, running in a detached fashion.

    If you hate this many lines of code, just call subprocess.run("blah.sh &", shell=True) and wrap your commands in blah.sh
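The Popen call and the reaping loop from this section can be combined into one small helper pair. This is only a sketch under a few assumptions: the names spawn_detached and reap are made up for illustration, Popen.poll() stands in for the os.wait3() loop, and DEVNULL replaces PIPE because nobody reads the pipes in a fire-and-forget setup and an unread pipe can fill up and stall the child.

```python
import subprocess
import sys
import time


def spawn_detached(cmd):
    """Start cmd in its own session without keeping any pipes to it."""
    return subprocess.Popen(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,  # the child calls setsid() (POSIX only)
    )


def reap(proc, interval=0.1, timeout=10.0):
    """Poll until the child exits so that its exit status gets collected.

    Returns the return code, or None if the timeout expires first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        retcode = proc.poll()  # non-blocking check, never waits for the child
        if retcode is not None:
            return retcode
        time.sleep(interval)
    return None


if __name__ == "__main__":
    child = spawn_detached([sys.executable, "-c", "import time; time.sleep(0.3)"])
    print(reap(child))
```

In a web app, reap() is the part that would run in a background task, so the request handler can return right after spawn_detached().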

    Make .communicate() async

    Sometimes I need to monitor and handle the stdout/stderr of a child process, for example to forward its output to the browser as an EventSource response.

    First I tried .communicate(), but it blocks until the process quits.

    Then I tried Popen.stdout.read(), which also blocks. Eventually I found a great hack on Stack Overflow, like this:

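    import fcntl, os, subprocess

    p = subprocess.Popen(
        cmd, bufsize=0, text=True, stdin=subprocess.PIPE,
        stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
    # Switch the read end of the child's stdout pipe to non-blocking mode.
    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)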
    

    Now p.stdout.read(1024) returns immediately, either with data or with a TypeError, because the internals get confused by the None that a non-blocking read returns when nothing is available.

    Wrap it in try...except inside a loop, and it works as expected.

    If the parent process crashes unexpectedly, the child process keeps running. To detect this, check carefully for BrokenPipeError in the child process and shut down gracefully.
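A loop like that might look as follows. This is a sketch of a variant, not the exact hack above: it assumes binary mode instead of text=True, so the "nothing available yet" case shows up as a plain None rather than a TypeError, and it uses os.set_blocking() in place of the fcntl call. The function name read_available is made up for illustration.

```python
import os
import subprocess
import sys
import time


def read_available(cmd, chunk_size=1024, idle_sleep=0.05):
    """Run cmd and collect its stdout using non-blocking reads."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=0)
    os.set_blocking(proc.stdout.fileno(), False)  # put the pipe in non-blocking mode

    chunks = []
    while True:
        data = proc.stdout.read(chunk_size)
        if data is None:
            # No data available right now; the child still holds the pipe open.
            time.sleep(idle_sleep)
        elif data == b"":
            # End of file: the child closed its stdout.
            break
        else:
            chunks.append(data)  # forward or process the chunk here

    proc.wait()
    return b"".join(chunks)


if __name__ == "__main__":
    print(read_available([sys.executable, "-c", "print('hello')"]))
```

The sleep keeps the loop from spinning at full speed while the child is quiet. Inside a web framework, that is where control would go back to other work.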
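On the child side, that check could be sketched like this. The names emit and run_child are hypothetical, and the sketch assumes the child reports progress on its stdout, which is a pipe to the parent. Note that the child only notices the missing parent the next time it writes.

```python
import os
import sys


def emit(stream, payload):
    """Write payload to stream; return False if the reader has gone away."""
    try:
        stream.write(payload)
        stream.flush()
        return True
    except BrokenPipeError:
        return False


def run_child(steps=1000):
    """Hypothetical child main loop: stop once the parent stops reading."""
    for i in range(steps):
        if not emit(sys.stdout, f"progress {i}\n"):
            # The parent is gone. Point stdout at devnull so the interpreter's
            # exit-time flush does not raise again, then stop working.
            os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
            return False
        # ... do one unit of real work here ...
    return True
```

Any cleanup (temp files, GPU memory, partial results) would go right before the early return.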

    Solved with asyncio

    I read the official Python docs more carefully, and it turns out the easiest solution is already there:

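    import asyncio
    import sys

    async def get_date():
        code = 'import datetime; print(datetime.datetime.now())'

        # Create the subprocess and redirect its standard output into a pipe.
        proc = await asyncio.create_subprocess_exec(
            sys.executable, '-c', code,
            stdout=asyncio.subprocess.PIPE)

        # Read one line of output.
        data = await proc.stdout.readline()
        line = data.decode('ascii').rstrip()

        # Wait for the subprocess exit.
        await proc.wait()
        return line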
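For the earlier use case of forwarding output as it arrives, the same API can be wrapped in an async generator. This is a minimal sketch with made-up names (stream_lines, main). It assumes the consumer reads until the child closes its stdout, and it does not handle a consumer that stops early.

```python
import asyncio
import sys


async def stream_lines(*cmd):
    """Yield the child's stdout line by line without blocking the event loop."""
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE)
    async for raw in proc.stdout:  # StreamReader yields one line per iteration
        yield raw.decode(errors="replace").rstrip("\r\n")
    # stdout reached EOF; collect the exit status so no zombie is left behind.
    await proc.wait()


async def main():
    code = "for i in range(3): print(i, flush=True)"
    return [line async for line in stream_lines(sys.executable, "-c", code)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each yielded line could be turned into one event of a streaming response, while other requests keep being served between lines.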
    

    I guess that's another lesson learned today.


