FastAPI/Starlette graceful shutdown server-sent events

缘起

假如你用 FastAPI/Starlette框架,写了一段 Server-sent Events

@app.get('/api/my_stream')
async def api_stream():
  async def gen():
    while 1:
      yield "data: {}\n\n"
      await asyncio.sleep(1.0)

  return StreamingResponse(gen, media_type="text/event-stream", headers={
    'X-Accel-Buffering': 'no',
    'Cache-Control': "no-cache",
    'Connection': "keep-alive",
  })

FastAPI 事件

然后你重启了服务器,比如 gunicorn+uvicorn,你会发现这个连接不会断开,一直输出结果。直到 worker 进程超过配置里的 timeout 参数(默认60秒),被 master 强行杀死然后重启。

一开始尝试了 FastAPI 官方的 shutdown 事件,再在代码的 while 1 加一个 if 判断。发现不管用

@app.on_event('shutdown')
def on_server_shutdown():
    app.state.running = False

发现这段代码执行是执行了,但是是在杀死worker的时候生效的。得更加提前。用ASGI推荐的lifespanhttp.disconnect 太复杂,放弃。

Linux signal

尝试暴力监听 worker signal

import signal
signal.signal(signal.SIGINT, on_server_shutdown)  # ctrl+c
signal.signal(signal.SIGUSR2, on_server_shutdown)
signal.signal(signal.SIGTERM, on_server_shutdown)
signal.signal(signal.SIGWINCH, on_server_shutdown)
signal.signal(signal.SIGHUP, on_server_shutdown)
signal.signal(signal.SIGQUIT, on_server_shutdown)

发现也不管用。研究了半天,似乎是 UvicornWorker.init_signals 的时候signal.SIG_DFL。无语ing

uvicorn shutdown 流程

看gunicorn日志:

[2023-11-02 09:50:38 +0800] [25918] [INFO] Shutting down
[2023-11-02 09:50:38 +0800] [25918] [INFO] Waiting for application shutdown.
[2023-11-02 09:50:38 +0800] [25918] [INFO] Waiting for connections to close. (CTRL+C to force quit)
....
[2023-11-02 09:51:18 +0800] [25918] [INFO] Application shutdown complete.
[2023-11-02 09:51:18 +0800] [25918] [INFO] Finished server process [25918]
[2023-11-02 09:51:18 +0800] [25918] [INFO] Worker exiting (pid: 25918)

观察到 Waiting for connections to close 之后卡住。

属于 uvicorn 的 Server.shutdown() 方法

这个方法调用链:

  1. Server.install_signal_handlers 里注册 signal
  2. Server.handle_exit 里设置 Server.should_exit = True
  3. 每秒一次的 Server.on_tick 就会打断 Server.main_loop() 死循环
  4. 调用 Server.shutdown

如果能拿到 Server.should_exit 自行判断就好了,但是 ASGI 是容器无感的,只继续研究

断开http连接

上面日志有一个比较关键:

# Request shutdown on all existing connections.
for connection in list(self.server_state.connections):
    connection.shutdown()
    await asyncio.sleep(0.1)

其http连接关闭方法实现为:

def shutdown(self):
    """
    Called by the server to commence a graceful shutdown.
    """
    if self.cycle is None or self.cycle.response_complete:
        self.transport.close()
    else:
        self.cycle.keep_alive = False

这里应该走的是 else 分支。所以解决方法就是去代码里拿到 cycle.keep_alive 这个属性。

ASGI 接口

跟了一会儿,发现ASGI 在 starlette 里如下流程:

  1. Route() 初始化 self.app = request_response(endpoint)
  2. Route().handle() 的时候会调用 self.app(scope, receive, send)
  3. unicornrun_asgiresult = await app(self.scope, self.receive, self.send) 。其中 app 就是 starlette 的 Route() 实例

所以解决方案逐渐明朗了

黎明的前夜

通过 FastAPI/Starlette 请求的 .receive 属性的 __self__ 拿到 unicorn 的 cycle 实例,然后定时判断上面 shutdown 赋值的 self.cycle.keep_alive = False

@app.get('/api/my_stream')
async def api_stream(req: Request):
  async def gen():
    while 1:
      yield "data: {}\n\n"
      await asyncio.sleep(1.0)
      if getattr(req.receive.__self__, 'keep_alive', None) is False:
        break

  return StreamingResponse(...

这里用了个 getattr() 是保证这个 hack 在非 unicorn 下代码也能正常跑

git commit . -m "haha" 上机联调,发现坑了。。。我特么。。。。这个 cycle.keep_alive 默认就是 False

扭转 keep_alive

这玩意一直为 False 的原因是,unicorn 的 HttpToolsProtocol.on_headers_complete 赋值过程:

self.cycle = RequestResponseCycle(..., keep_alive=http_version != "1.0")

这里 http_version 可以通过 req.scope['http_version'] 得到,打印了一下,你猜怎么着,还真tm是。。。原因就是厂里反代 nginx的proxy_http_version没配置。这里是个常见的坑,非常影响性能,因为每个请求会生成一个新的 http 连接。

但是也等不急SA去改配置了。于是直接在代码 async def gen(): 前面写死:

req.receive.__self__.keep_alive = True 。提交,再试。。。咦,怎么 keep_alive 依然为 False????明明刚刚赋值了?

继续读代码,发现第二个坑,发现 unicorn 的 RequestResponseCycle.send() 方法里,在构造返回的时候,有一句

for name, value in headers:
    elif name == b"connection" and value.lower() == b"close":
        self.keep_alive = False

好吧。。。那么解决方法就是,把 keep_alive = True 挪到返回内部。先完成返回构造,再强行改值。

最后的代码

@app.get('/api/my_stream')
async def api_stream(req: Request):
  # req.receive.__self__.keep_alive = True  # doesn't work here
  async def gen():
    req.receive.__self__.keep_alive = True  # works here
    while 1:
      yield "data: {}\n\n"
      await asyncio.sleep(1.0)
      if req.receive.__self__.keep_alive is False:
        break

  return StreamingResponse(gen, media_type="text/event-stream", headers={
    'X-Accel-Buffering': 'no',
    'Cache-Control': "no-cache",
    'Connection': "keep-alive",
  })

测试重启 worker,该连接在1秒后断开,worker平稳重启。完美。

苦逼撸业务的一天又开始了。

Comments