Making subprocess async friendly in Python

It's been a while since i wrote something in English, mostly because there's nothing really interesting, until now.

Occasionally, when facing a long running task in Python, I would choose either a distrubuted tasks queue system, or for the convenience, just the subprocess module. It's built-in and well designed for grabbing outputs of a child process running for a short period of time.

But what if the child-process takes a really, really long time? In my case it's an expensive query, or some CPU/GPU intensive task, which needs to be launched from a running Web framework, like FastAPI.

Popen() fire and forget

if child-process's output, end state and the return code are irrelevant, a simple Popen would do

subprocess.Popen(..., stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

The problem is, after the child-process finishes, it will hang as a zombie, because the parents refused to claim its exit status. To fix this, add an extra parameter in Popen() like subprocess.Popen(..., start_new_session=True)

And write a simple loop periodically check for WNOHANG

while 1: try: chpid, retcode, res = os.wait3(os.WNOHANG) except ChildProcessError: break sleep(5) if chpid == os.getpid(): do_sth() break

This can be done using BackgroundTasks in FastAPI/Starlette.

In a way, the child-process hebaves like nohup or screen/tmux, running in a detatched fashion.

If you hate this many lines of code, just subprocess.run("blah.sh &", shell=True) and wrap your commands in blah.sh

Make .communicate() async

Sometimes I need to monitor and handle the stdout/stderr of a child-process, like forward the outputs as an EventSource response to the browser.

First I tried .communicate() it will block until the process quites.

Then I tried Popen.stdout.read(), it will also block. Eventually I found a great hack from Stackoverlow like this:

p = subprocess.Popen(
    cmd, bufsize=0, text=True, stdin=subprocess.PIPE,
    stderr=subprocess.PIPE,  stdout=subprocess.PIPE, close_fds=True)
fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

Now p.stdout.read(1024) would return immediately, or with a TypeError bnecause internal messed up with None as non-blocking empty return.

Wrap it with try...except inside a loop, it worked fine as expected.

When the parent process crashes unexpectedly, the child-process is still working, to detect this, just check for BrokenPipeError carefully in child-process and gracefully shutdown.

Solved with asyncio

I tried harder reading the official Python docs, turns out the most easy solution is already there:

proc = await asyncio.create_subprocess_exec(
    sys.executable, '-c', code,
    stdout=asyncio.subprocess.PIPE)

# Read one line of output.
data = await proc.stdout.readline()
line = data.decode('ascii').rstrip()

# Wait for the subprocess exit.
await proc.wait()
return line

I guess another lesson learned today.

Comments