本文将教您如何使用任何 LLM 模型运行评估,而不会屈服于可怕的“OpenAI 速率限制”异常。我们从以下开始:
到目前为止,Cloudflare 的解释是我见过的最好的:速率限制是限制网络流量的策略。它限制了某人在特定时间范围内重复操作的频率 - 例如,尝试登录帐户。
简而言之,想象一下你是四个孩子的母亲,他们都喜欢蜂蜜。上次蜂蜜用完的时间比预期的要早。现在,您已经设置了一个计时器,数到一万,并让每个孩子轮流吃一些蜂蜜。计时器代表速率限制,因为它会强制执行特定的等待时间,然后才能获得更多蜂蜜。
解释了这个概念后,让我们了解 OpenAI 速率限制并讨论如何使用 Python 实现速率限制逻辑来管理 OpenAI 的 R/TPM(每分钟请求/令牌)。
OpenAI 对一分钟内对其 AI 模型发出的请求数量设置了一定的限制。对于 OpenAI 提供的每个 AI 模型,这些限制都不同。
对于免费版本:
对于 1 级:
有关其他层速率限制的更多信息,请参阅文档。
这些限制的原因包括:
预计这些限制在可预见的未来将保持一致。
该过程(见下图)涉及使用户能够从 UI 运行 LLM 评估并为其 LLM 应用程序配置速率限制参数,而无需自己编写逻辑。
这是通过准备和调用批处理的函数来实现的。批处理中的每个调用都会调用run_with_retry
函数,该函数又会使用重试机制调用最终函数 ( invoke_app
)。
我相信在看完上述过程后,您可以用您选择的任何语言编写代码逻辑。不管怎样,我会向你展示我是如何做到的。有关更多背景和背景,我主要在 Agenta 担任后端软件工程师。
Agenta是一个开源端到端 LLM 开发者平台,为您提供用于快速工程和管理、⚖️ 评估、人工注释和 🚀 部署的工具。所有这些都不会对您选择的框架、库或模型施加任何限制。 Agenta允许开发人员和产品团队在更短的时间内协作构建由 LLM 驱动的生产级应用程序。
我们希望让用户能够从 UI 配置其 LLM 评估速率限制配置,以便他们可以绕过其 LLM 提供商速率限制例外。
查看流程图,首先要实现的是准备和调用批处理(LLM 调用)的逻辑。验证速率限制配置并使用数据验证模型来定义 LLM 运行速率限制非常重要。下面的模型处理批量调用运行所需的rate_limit_config
参数。
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
batch_invoke
函数采用以下参数:
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
准备并调用批处理后,下一步涉及执行run_with_retry
逻辑。此自定义实现包括速率限制功能并管理 llm 应用程序的调用,在达到设置的延迟后重试。指数退避是一种以指数增加的等待时间重试操作的技术,直到达到最大重试计数为止。
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
AppOutput的使用:即使在已用完最大重试次数后处理异常也很重要。这样,您可以允许您尝试处理的所有数据运行,然后您可以确定哪些数据失败了,哪些数据通过了。
最后一步是调用应用程序,使用 LLM 应用程序的openapi_parameters
来确定如何使用单个数据点调用它。
make_payload 函数不应该让您担心。它根据其OpenAPI参数构造用于调用 LLM 应用程序的有效负载。
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
整个过程就这样结束了。
代码中的退避指数策略的工作原理如下:
批处理: batch_invoke 函数将测试集数据拆分为具有可配置大小的较小批次。每个批次均按顺序处理。
单独调用重试:在每个批次中,每个数据点都由run_with_retry
函数处理。此函数尝试调用数据点的应用程序。如果由于特定网络错误(超时、连接问题)导致调用失败,该函数会延迟重试。此延迟最初设置为可配置值 ( retry_delay
),并针对同一批次中的每次后续重试尝试加倍。
此方法有助于避免失败后重复请求导致应用程序服务器过载。它为服务器提供了恢复时间,并允许在重试之前清除待处理请求的队列。
该策略还包括每个数据点的可配置最大重试次数,以防止无限循环。还包括批次之间的延迟 ( delay_between_batches
),以避免超出应用程序服务器设置的速率限制。
我希望这总结了您在今天的文章中学到的所有内容。请让我知道,如果你有任何问题!