吐槽pynsq,另外给subprocess糊了一个异步

pynsq太烂了

起初是因为需要在 Web API 里消费一个消息队列(nsq),给浏览器返回 EventSource 做实时输出。但是没想到官方库pynsq居然写得这么渣:

def _handle_term_signal(sig_num, frame):
    logging.getLogger(__name__).info(
        'TERM Signal handler called with signal %r', sig_num)
    tornado.ioloop.IOLoop.current().stop()

def run():
    """
    Starts any instantiated :class:`nsq.Reader` or :class:`nsq.Writer`
    """
    signal.signal(signal.SIGTERM, _handle_term_signal)
    signal.signal(signal.SIGINT, _handle_term_signal)
    tornado.ioloop.IOLoop.current().start()

这都什么玩意儿?一个库何德何能也敢去乱劫持 signal?最他妈逗的是这个库的推荐写法:

import nsq
r = nsq.Reader(message_handler=handler,
        lookupd_http_addresses=['http://127.0.0.1:4161'],
        topic='nsq_reader', channel='asdf', lookupd_poll_interval=15)
nsq.run()

这个 r 莫名其妙的就能跑起来了。真是魔法。anyway,在Web API兼容这一坨是失败了,爆了一堆错,对付魔法的办法就是隔离,所以打算糊个脚本,通过多进程+PIPE来得到数据。

给 subprocess 加一个异步补丁

糊的时候发现,subprocess 模块的 .communicate() 方法是 blocking的, 居然不支持 async/await,于是只能继续糊:

import subprocess, fcntl, asyncio
p = subprocess.Popen(
    cmd, bufsize=0, text=True, stdin=subprocess.PIPE,
    stderr=subprocess.PIPE,  stdout=subprocess.PIPE, close_fds=True)
fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
buf = ''
while p.poll() is None:
    try:
        # read() with O_NONBLOCK might gets None thus TypeError
        buf += (p.stdout.read(1024) or '')
        contents = buf.split('\n')
        if len(contents) < 2:
            raise TypeError('Not enough')
    except TypeError:
        await asyncio.sleep(1)
        yield None
        continue
    buf = contents.pop(-1)
    for x in contents:
        yield x

p.terminate()

这里用 bufcontents 是因为直接调用消息队列,每个消息都是分割好的;但是通过管道来读。得自己去处理字节流如何分割的问题,也就是传说简中特供的「粘包」问题。

另外子进程可能因为Web框架的种种原因,不自觉退出或者变僵尸进程,需要向 stdout/stderr 做输出的时候检查一下 BrokenPipeError 处理后事主动退出即可。

还有子进程如果输出完毕,需要代码里调用一下 p.terminate(),才能回收,避免变zombie

试了一下,也不是不能用,成功!正觉得自己又行了的时候,发现官方:

https://docs.python.org/3.8/library/asyncio-subprocess.html

啊这。白忙活。

超长待机 subprocess

想起来之前还有个问题,一个API调用一个subprocess,然后等它执行,不想管。这个时候可以加一个参数 detach 模式:

subprocess.Popen(..., start_new_session=True)

然后在同进程用里跑一个后台检查循环去检查子进程的 WNOHANG

while 1:
    try:
        chpid, retcode, res = os.wait3(os.WNOHANG)
    except ChildProcessError:
        break
    sleep(5)
    if chpid == os.getpid():
        do_sth()
        break

想了下好像也搞复杂了。要不直接跑一个 sh -c "blah.sh &" 算了。

Comments