问题重现
在调用 ChatGPT API 并使用流式输出时,我们经常会遇到网络问题导致的超时情况。有趣的是,笔者发现在本地调试遇到的超时,会在 10 分钟后自动恢复(为什么是 10 分钟?我们留到后面解释),但是在服务器上等待一会儿却会失败,报出超时异常(错误代码 502)。
笔者认为,本地能恢复的原因可能是自动重试,只是重试的时间有点久(ChatGPT API 没有重试功能,这是项目加入的)。服务器返回「502」是因为内容从后台返回到前端需要经过网关层,而网关层超时校验的时间比自动重试的时间(10 分钟)更短,所以撑不到重试就会报超时异常。
基于以上场景,本文着手解决 ChatGPT API 调用超时问题。
优化诉求
不向用户展示超时的报错信息。
缩短超时后重试的时间间隔。
解决思路
笔者考虑了两种方案。
一是彻底解决网络问题,但难度有点大。这属于 OpenAI 服务器问题,即使是部署在国外的服务器也会出现超时的情况。
二是利用自动重试解决问题。通过调整超时的时间,提升响应速度,方案可行。
实施解决方案
解决过程中,笔者分两步由浅至深地调整了超时时间;如果想直接了解最终方案,请移步「解决方案二」~
运行环境:
Python: 3.10.7
openai: 0.27.6
调用方法:
openai.api_resources.chat_completion.ChatCompletion.acreate
( 这是异步调用 ChatGPT 的方法。)
方法调用链路:
超时参数 ClientTimeout,一共有 4 个属性 total、connect、sock_read 和 sock_connect。
# 方法 -> 超时相关参数 openai.api_resources.chat_completion.ChatCompletion.acreate -> kwargs openai.api_resources.abstract.engine_api_resource.EngineAPIResource.acreate -> params openai.api_requestor.APIRequestor.arequest -> request_timeout # request_timeout 在这一步变成了 timeout,因此,只需要传参 request_timeout 即可 openai.api_requestor.APIRequestor.arequest_raw -> request_timeout aiohttp.client.ClientSession.request -> kwargs aiohttp.client.ClientSession._request -> timeout tm = TimeoutHandle(self._loop, real_timeout.total) -> ClientTimeout.total async with ceil_timeout(real_timeout.connect): -> ClientTimeout.connect # 子分支1 aiohttp.connector.BaseConnector.connect -> timeout aiohttp.connector.TCPConnector._create_connection -> timeout aiohttp.connector.TCPConnector._create_direct_connection -> timeout aiohttp.connector.TCPConnector._wrap_create_connection -> timeout async with ceil_timeout(timeout.sock_connect): -> ClientTimeout.sock_connect # 子分支2 aiohttp.client_reqrep.ClientRequest.send -> timeout aiohttp.client_proto.ResponseHandler.set_response_params -> read_timeout aiohttp.client_proto.ResponseHandler._reschedule_timeout -> self._read_timeout if timeout: self._read_timeout_handle = self._loop.call_later( timeout, self._on_read_timeout ) -> ClientTimeout.sock_read
解决方案一
openai.api_requestor.APIRequestor.arequest_raw 方法中的 request_timeout 参数可以传递 connect 和 total 参数,因此可以在调用openai.api_resources.chat_completion.ChatCompletion.acreate时,设置 request_time(10, 300)。
# async def arequest_raw( self, method, url, session, *, params=None, supplied_headers: Optional[Dict[str, str]] = None, files=None, request_id: Optional[str] = None, request_timeout: Optional[Union[float, Tuple[float, float]]] = None, ) -> aiohttp.ClientResponse: abs_url, headers, data = self._prepare_request_raw( url, supplied_headers, method, params, files, request_id ) if isinstance(request_timeout, tuple): timeout = aiohttp.ClientTimeout( connect=request_timeout[0], total=request_timeout[1], )else: timeout = aiohttp.ClientTimeout( total=request_timeout if request_timeout else TIMEOUT_SECS ) ...
该方案有效,但没有完全生效:它可以控制连接时间和请求的全部时间,但没有彻底解决超时异常,因为「请求连接时间」和「第一个字符读取时间」是两码事。「请求连接时间」基于 total 时间重试(300s),而网关时间并没有设置这么久。
于是,笔者继续提出「解决方案二」。
解决方案二
使用 monkey_patch 方式重写openai.api_requestor.APIRequestor.arequest_raw 方法,重点在于重写 request_timeout 参数,让其支持原生的 aiohttp.client.ClientTimeout 参数。
1. 新建 api_requestor_mp.py 文件,并写入以下代码。
# 注意 request_timeout 参数已经换了,Optional[Union[float, Tuple[float, float]]] -> Optional[Union[float, tuple]] async def arequest_raw( self, method, url, session, *, params=None, supplied_headers: Optional[Dict[str, str]] = None, files=None, request_id: Optional[str] = None, request_timeout: Optional[Union[float, tuple]] = None, ) -> aiohttp.ClientResponse: abs_url, headers, data = self._prepare_request_raw( url, supplied_headers, method, params, files, request_id ) # 判断 request_timeout 的类型,按需设置 sock_read 和 sock_connect 属性 if isinstance(request_timeout, tuple): timeout = aiohttp.ClientTimeout( connect=request_timeout[0], total=request_timeout[1], sock_read=None if len(request_timeout) < 3 else request_timeout[2], sock_connect=None if len(request_timeout) < 4 else request_timeout[3], ) else: timeout = aiohttp.ClientTimeout( total=request_timeout if request_timeout else TIMEOUT_SECS ) if files: # TODO: Use aiohttp.MultipartWriter to create the multipart form data here. # For now we use the private requests method that is known to have worked so far. data, content_type = requests.models.RequestEncodingMixin._encode_files( # type: ignore files, data ) headers["Content-Type"] = content_type request_kwargs = { "method": method, "url": abs_url, "headers": headers, "data": data, "proxy": _aiohttp_proxies_arg(openai.proxy), "timeout": timeout, } try: result = await session.request(**request_kwargs) util.log_info( "OpenAI API response", path=abs_url, response_code=result.status, processing_ms=result.headers.get("OpenAI-Processing-Ms"), request_id=result.headers.get("X-Request-Id"), ) # Don't read the whole stream for debug logging unless necessary. if openai.log == "debug": util.log_debug( "API response body", body=result.content, headers=result.headers ) return result except (aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e: raise error.Timeout("Request timed out") from e except aiohttp.ClientError as e: raise error.APIConnectionError("Error communicating with OpenAI") from e def monkey_patch(): APIRequestor.arequest_raw = arequest_raw
2. 在初始化 ChatGPT API 的文件头部补充:
from *.*.api_requestor_mp import monkey_patch do_api_requestor = monkey_patch
设置参数 request_timeout=(10, 300, 15, 10) 后,再调试就没什么问题了。
交付测试,通过。
经验总结
直接看代码、看方法调用链路会有点困难,可以通过异常堆栈来找调用链路,这样更方便。
ChatGPT API 暴露的 request_timeout 参数不够用,需要重写;搜索了一下重写方案,了解到 monkey_patch,非常实用。
项目过程中,笔者发现改代码本身不难,难的是知道「改哪里」「怎么改」以及「为什么」。
审核编辑:黄飞
评论
查看更多