吐槽pynsq,另外给subprocess糊了一个异步
Posted | stdout
pynsq太烂了
起初是因为需要在 Web API 里消费一个消息队列(nsq),给浏览器返回 EventSource 做实时输出。但是没想到官方库pynsq居然写得这么渣:
def _handle_term_signal(sig_num, frame):
logging.getLogger(__name__).info(
'TERM Signal handler called with signal %r', sig_num)
tornado.ioloop.IOLoop.current().stop()
def run():
"""
Starts any instantiated :class:`nsq.Reader` or :class:`nsq.Writer`
"""
signal.signal(signal.SIGTERM, _handle_term_signal)
signal.signal(signal.SIGINT, _handle_term_signal)
tornado.ioloop.IOLoop.current().start()
这都什么玩意儿?一个库何德何能也敢去乱劫持 signal?最他妈逗的是这个库的推荐写法:
import nsq
r = nsq.Reader(message_handler=handler,
lookupd_http_addresses=['http://127.0.0.1:4161'],
topic='nsq_reader', channel='asdf', lookupd_poll_interval=15)
nsq.run()
这个 r
莫名其妙的就能跑起来了。真是魔法。anyway,在Web API兼容这一坨是失败了,爆了一堆错,对付魔法的办法就是隔离,所以打算糊个脚本,通过多进程+PIPE来得到数据。
给 subprocess 加一个异步补丁
糊的时候发现,subprocess
模块的 .communicate()
方法是 blocking的, 居然不支持 async/await,于是只能继续糊:
import subprocess, fcntl, asyncio
p = subprocess.Popen(
cmd, bufsize=0, text=True, stdin=subprocess.PIPE,
stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
buf = ''
while p.poll() is None:
try:
# read() with O_NONBLOCK might gets None thus TypeError
buf += (p.stdout.read(1024) or '')
contents = buf.split('\n')
if len(contents) < 2:
raise TypeError('Not enough')
except TypeError:
await asyncio.sleep(1)
yield None
continue
buf = contents.pop(-1)
for x in contents:
yield x
p.terminate()
这里用 buf
、contents
是因为直接调用消息队列,每个消息都是分割好的;但是通过管道来读。得自己去处理字节流如何分割的问题,也就是传说简中特供的「粘包」问题。
另外子进程可能因为Web框架的种种原因,不自觉退出或者变僵尸进程,需要向 stdout/stderr 做输出的时候检查一下 BrokenPipeError 处理后事主动退出即可。
还有子进程如果输出完毕,需要代码里调用一下 p.terminate()
,才能回收,避免变zombie
试了一下,也不是不能用,成功!正觉得自己又行了的时候,发现官方:
https://docs.python.org/3.8/library/asyncio-subprocess.html
啊这。白忙活。
超长待机 subprocess
想起来之前还有个问题,一个API调用一个subprocess,然后等它执行,不想管。这个时候可以加一个参数 detach 模式:
subprocess.Popen(..., start_new_session=True)
然后在同进程用里跑一个后台检查循环去检查子进程的 WNOHANG
:
while 1:
try:
chpid, retcode, res = os.wait3(os.WNOHANG)
except ChildProcessError:
break
sleep(5)
if chpid == os.getpid():
do_sth()
break
想了下好像也搞复杂了。要不直接跑一个 sh -c "blah.sh &"
算了。
Comments