Making subprocess async-friendly in Python
It's been a while since I wrote something in English, mostly because there was nothing really interesting to write about, until now.
Occasionally, when facing a long-running task in Python, I would choose either a distributed task queue system or, for convenience, just the subprocess
module. It's built-in and well designed for grabbing the output of a child process that runs for a short period of time.
But what if the child process takes a really, really long time? In my case it's an expensive query, or some CPU/GPU-intensive task, which needs to be launched from a running web framework like FastAPI.
Popen() fire and forget
If the child process's output, end state and return code are irrelevant, a simple Popen() would do:
subprocess.Popen(..., stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(Note that with PIPE, the child can block once a pipe buffer fills up if nothing ever reads it; for a true fire-and-forget, passing subprocess.DEVNULL instead avoids that.)
The problem is, after the child process finishes, it will linger as a zombie, because the parent never collects its exit status. To handle this, first detach the child with an extra parameter in Popen():
subprocess.Popen(..., start_new_session=True)
Then write a simple loop that periodically reaps it with os.WNOHANG:
import os
import time

while True:
    try:
        # non-blocking check: returns (0, 0, rusage) while children are still running
        chpid, retcode, res = os.wait3(os.WNOHANG)
    except ChildProcessError:
        break
    if chpid == p.pid:  # p is the Popen object from above
        do_sth()
        break
    time.sleep(5)
This can be done using BackgroundTasks in FastAPI/Starlette, as sketched below.
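A minimal sketch, assuming the reaping loop above is wrapped in a hypothetical reap_child(p) function:

import subprocess

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

@app.post("/jobs")
def launch_job(background_tasks: BackgroundTasks):
    p = subprocess.Popen(["./blah.sh"], start_new_session=True)
    # reap_child is the wait3 loop above, wrapped in a function
    background_tasks.add_task(reap_child, p)
    return {"pid": p.pid}

The task runs after the response is sent, so the request returns immediately while the loop keeps reaping in the background.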
In a way, the child process behaves like nohup or screen/tmux, running in a detached fashion.
If you hate this many lines of code, just call subprocess.run("blah.sh &", shell=True) and wrap your commands in blah.sh.
Make .communicate() async
Sometimes I need to monitor and handle the stdout/stderr of a child process, for example to forward the output as an EventSource response to the browser.
First I tried .communicate(), but it blocks until the process exits. Then I tried Popen.stdout.read(), which also blocks. Eventually I found a great hack on Stack Overflow like this:
import fcntl
import os
import subprocess

p = subprocess.Popen(
    cmd, bufsize=0, text=True, stdin=subprocess.PIPE,
    stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
# switch the stdout pipe to non-blocking mode
fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
Now p.stdout.read(1024) returns immediately, either with data or with a TypeError, because the internal text wrapper chokes on the None that a non-blocking empty read produces. Wrapped in try...except inside a loop, as sketched below, it worked as expected.
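Something like this, where handle() is a hypothetical callback (e.g. pushing a chunk into the EventSource stream):

import time

while p.poll() is None:  # keep polling until the child exits
    try:
        chunk = p.stdout.read(1024)
        if chunk:
            handle(chunk)  # hypothetical: forward the chunk to the browser
    except TypeError:
        pass  # nothing to read yet in non-blocking mode
    time.sleep(0.1)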
When the parent process crashes unexpectedly, the child process keeps working. To detect this, carefully check for BrokenPipeError in the child and shut down gracefully.
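A minimal sketch of the child side, assuming a hypothetical long_running_work() generator yields the results:

import sys

try:
    for result in long_running_work():  # hypothetical: the actual expensive work
        print(result, flush=True)  # writing fails once the parent's pipe is gone
except BrokenPipeError:
    sys.exit(0)  # the parent disappeared; stop cleanly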
Solved with asyncio
I tried harder and read the official Python docs; it turns out the easiest solution is already there:
import asyncio
import sys

async def run(code):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)
    # Read one line of output.
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()
    # Wait for the subprocess exit.
    await proc.wait()
    return line
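For the EventSource case, the same API makes it easy to stream lines as they arrive; a sketch, assuming one output line per event:

async def stream_output(*cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE)
    while True:
        line = await proc.stdout.readline()
        if not line:  # EOF: the child closed its stdout
            break
        yield line.decode().rstrip()  # e.g. one SSE event per line
    await proc.wait()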
I guess that's another lesson learned today.