FastAPI/Starlette graceful shutdown server-sent events
缘起
假如你用 FastAPI/Starlette框架,写了一段 Server-sent Events
@app.get('/api/my_stream')
async def api_stream():
async def gen():
while 1:
yield "data: {}\n\n"
await asyncio.sleep(1.0)
return StreamingResponse(gen, media_type="text/event-stream", headers={
'X-Accel-Buffering': 'no',
'Cache-Control': "no-cache",
'Connection': "keep-alive",
})
FastAPI 事件
然后你重启了服务器,比如 gunicorn+uvicorn,你会发现这个连接不会断开,一直输出结果。直到 worker 进程超过配置里的 timeout
参数(默认60秒),被 master 强行杀死然后重启。
一开始尝试了 FastAPI 官方的 shutdown
事件,再在代码的 while 1
加一个 if
判断。发现不管用
@app.on_event('shutdown')
def on_server_shutdown():
app.state.running = False
发现这段代码执行是执行了,但是是在杀死worker的时候生效的。得更加提前。用ASGI推荐的lifespan
和 http.disconnect
太复杂,放弃。
Linux signal
尝试暴力监听 worker signal
import signal
signal.signal(signal.SIGINT, on_server_shutdown) # ctrl+c
signal.signal(signal.SIGUSR2, on_server_shutdown)
signal.signal(signal.SIGTERM, on_server_shutdown)
signal.signal(signal.SIGWINCH, on_server_shutdown)
signal.signal(signal.SIGHUP, on_server_shutdown)
signal.signal(signal.SIGQUIT, on_server_shutdown)
发现也不管用。研究了半天,似乎是 UvicornWorker.init_signals
的时候给signal.SIG_DFL
了。无语ing
uvicorn shutdown 流程
看gunicorn日志:
[2023-11-02 09:50:38 +0800] [25918] [INFO] Shutting down
[2023-11-02 09:50:38 +0800] [25918] [INFO] Waiting for application shutdown.
[2023-11-02 09:50:38 +0800] [25918] [INFO] Waiting for connections to close. (CTRL+C to force quit)
....
[2023-11-02 09:51:18 +0800] [25918] [INFO] Application shutdown complete.
[2023-11-02 09:51:18 +0800] [25918] [INFO] Finished server process [25918]
[2023-11-02 09:51:18 +0800] [25918] [INFO] Worker exiting (pid: 25918)
观察到 Waiting for connections to close 之后卡住。
属于 uvicorn 的 Server.shutdown()
方法
这个方法调用链:
Server.install_signal_handlers
里注册 signal
Server.handle_exit
里设置 Server.should_exit = True
- 每秒一次的
Server.on_tick
就会打断 Server.main_loop()
死循环
- 调用
Server.shutdown
如果能拿到 Server.should_exit
自行判断就好了,但是 ASGI 是容器无感的,只继续研究
断开http连接
上面日志有一个比较关键:
# Request shutdown on all existing connections.
for connection in list(self.server_state.connections):
connection.shutdown()
await asyncio.sleep(0.1)
其http连接关闭方法实现为:
def shutdown(self):
"""
Called by the server to commence a graceful shutdown.
"""
if self.cycle is None or self.cycle.response_complete:
self.transport.close()
else:
self.cycle.keep_alive = False
这里应该走的是 else
分支。所以解决方法就是去代码里拿到 cycle.keep_alive
这个属性。
ASGI 接口
跟了一会儿,发现ASGI 在 starlette 里如下流程:
Route()
初始化 self.app = request_response(endpoint)
Route().handle()
的时候会调用 self.app(scope, receive, send)
unicorn
的 run_asgi
会 result = await app(self.scope, self.receive, self.send)
。其中 app
就是 starlette 的 Route()
实例
所以解决方案逐渐明朗了
黎明的前夜
通过 FastAPI/Starlette 请求的 .receive
属性的 __self__
拿到 unicorn 的 cycle
实例,然后定时判断上面 shutdown
赋值的 self.cycle.keep_alive = False
@app.get('/api/my_stream')
async def api_stream(req: Request):
async def gen():
while 1:
yield "data: {}\n\n"
await asyncio.sleep(1.0)
if getattr(req.receive.__self__, 'keep_alive', None) is False:
break
return StreamingResponse(...
这里用了个 getattr()
是保证这个 hack 在非 unicorn 下代码也能正常跑
git commit . -m "haha"
上机联调,发现坑了。。。我特么。。。。这个 cycle.keep_alive
默认就是 False
扭转 keep_alive
这玩意一直为 False
的原因是,unicorn 的 HttpToolsProtocol.on_headers_complete
赋值过程:
self.cycle = RequestResponseCycle(..., keep_alive=http_version != "1.0")
这里 http_version
可以通过 req.scope['http_version']
得到,打印了一下,你猜怎么着,还真tm是。。。原因就是厂里反代 nginx的proxy_http_version没配置。这里是个常见的坑,非常影响性能,因为每个请求会生成一个新的 http 连接。
但是也等不急SA去改配置了。于是直接在代码 async def gen():
前面写死:
req.receive.__self__.keep_alive = True
。提交,再试。。。咦,怎么 keep_alive
依然为 False
????明明刚刚赋值了?
继续读代码,发现第二个坑,发现 unicorn 的 RequestResponseCycle.send()
方法里,在构造返回的时候,有一句
for name, value in headers:
elif name == b"connection" and value.lower() == b"close":
self.keep_alive = False
好吧。。。那么解决方法就是,把 keep_alive = True
挪到返回内部。先完成返回构造,再强行改值。
最后的代码
@app.get('/api/my_stream')
async def api_stream(req: Request):
# req.receive.__self__.keep_alive = True # doesn't work here
async def gen():
req.receive.__self__.keep_alive = True # works here
while 1:
yield "data: {}\n\n"
await asyncio.sleep(1.0)
if req.receive.__self__.keep_alive is False:
break
return StreamingResponse(gen, media_type="text/event-stream", headers={
'X-Accel-Buffering': 'no',
'Cache-Control': "no-cache",
'Connection': "keep-alive",
})
测试重启 worker,该连接在1秒后断开,worker平稳重启。完美。
苦逼撸业务的一天又开始了。