This article will teach you how to run evaluations using any LLM model without succumbing to the dreaded "OpenAI Rate Limit" exception. We’d start by:
So far, Cloudflare explanation is the best I have seen: Rate-limiting is a strategy for limiting network traffic. It puts a cap on how often someone can repeat an action within a certain time-frame - for instance, trying to log in to an account.
To put it simply, imagine being a mother of four kids who all love honey. Last time, the honey ran out sooner than expected. Now, you've set a timer to count up to ten thousand and given each child a turn to have some honey. The timer represents the rate-limit, as it enforces a specific wait time before they can have more honey.
Having explained the concept, let's understand OpenAI rate-limits and discuss how I implemented a rate-limit logic to manage OpenAI's R/TPM (request/token per minute) using Python.
OpenAI has set certain restrictions on the number of requests one can make for its AI models within a minute. These limitations are different for each AI model provided by OpenAI.
For the free version:
For the tier 1:
See the docs for more information about other tiers rate-limits..
The reason for these restrictions includes:
These limitations are expected to stay consistent for the foreseeable future.
The process (see image below) involves enabling users to run LLM evaluations from the UI and configure rate-limit parameters for their LLM apps without needing to write the logic themselves.
This is achieved through a function that prepares and invokes the batch. Each call in the batch invokes the run_with_retry
function, which in turn invokes the final function (invoke_app
) with the retry mechanism.
I'm confident you can write the code-logic in any language of your choice after having a look at the above process. Regardless, I'll show you how I did mine. For more background and context, I primarily work as a backend software engineer at Agenta.
Agenta is an open-source end-to-end LLM developer platform that provides you with the tools for prompt engineering and management, ⚖️ evaluation, human annotation, and 🚀 deployment. All without imposing any restrictions on your choice of framework, library, or model. Agenta allows developers and product teams to collaborate in building production-grade LLM-powered applications in less time.
We wanted to give users the ability to configure their LLM evaluations rate-limiting configuration from the UI so they can bypass their LLM provider rate-limiting exception.
Looking at the process diagram, the first thing to implement is the logic for preparing and invoking the batch (of LLM calls). Validating the rate limit configuration and using a data validation model to define the LLM run rate limit is important. The model below handles the rate_limit_config
parameter required for the batch invoke to function.
from pydantic import BaseModel, Field
class LLMRunRateLimit(BaseModel):
batch_size: int = Field(default=10)
max_retries: int = Field(default=3)
retry_delay: int = Field(default=3)
delay_between_batches: int = Field(default=5)
The batch_invoke
function takes the following parameters:
async def batch_invoke(
uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict
) -> List[AppOutput]:
"""
Invokes the LLm app in batches, processing the testset data.
Args:
uri (str): The URI of the LLm app.
testset_data (List[Dict]): The testset data to be processed.
parameters (Dict): The parameters for the LLm app.
rate_limit_config (Dict): The rate limit configuration.
Returns:
List[AppOutput]: The list of app outputs after running all batches.
"""
batch_size = rate_limit_config[
"batch_size"
] # Number of testset to make in each batch
max_retries = rate_limit_config[
"max_retries"
] # Maximum number of times to retry the failed llm call
retry_delay = rate_limit_config[
"retry_delay"
] # Delay before retrying the failed llm call (in seconds)
delay_between_batches = rate_limit_config[
"delay_between_batches"
] # Delay between batches (in seconds)
list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches
openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json")
async def run_batch(start_idx: int):
print(f"Preparing {start_idx} batch...")
end_idx = min(start_idx + batch_size, len(testset_data))
for index in range(start_idx, end_idx):
try:
batch_output: AppOutput = await run_with_retry(
uri,
testset_data[index],
parameters,
max_retries,
retry_delay,
openapi_parameters,
)
list_of_app_outputs.append(batch_output)
print(f"Adding outputs to batch {start_idx}")
except Exception as exc:
import traceback
traceback.print_exc()
print(
f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}"
)
# Schedule the next batch with a delay
next_batch_start_idx = end_idx
if next_batch_start_idx < len(testset_data):
await asyncio.sleep(delay_between_batches)
await run_batch(next_batch_start_idx)
# Start the first batch
await run_batch(0)
return list_of_app_outputs
After preparing and invoking the batch, the next step involves executing the run_with_retry
logic. This custom implementation includes rate-limiting functionality and manages the invocation of the llm app, retrying after the set delay is reached. Exponential backoff, a technique that retries an operation with an exponentially increasing wait-time, is employed until a maximum retry count is reached.
async def run_with_retry(
uri: str,
input_data: Any,
parameters: Dict,
max_retry_count: int,
retry_delay: int,
openapi_parameters: List[Dict],
) -> AppOutput:
"""
Runs the specified app with retry mechanism.
Args:
uri (str): The URI of the app.
input_data (Any): The input data for the app.
parameters (Dict): The parameters for the app.
max_retry_count (int): The maximum number of retries.
retry_delay (int): The delay between retries in seconds.
openapi_parameters (List[Dict]): The OpenAPI parameters for the app.
Returns:
AppOutput: The output of the app.
"""
retries = 0
last_exception = None
while retries < max_retry_count:
try:
result = await invoke_app(uri, input_data, parameters, openapi_parameters)
return result
except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e:
last_exception = e
print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e)
await asyncio.sleep(retry_delay)
retries += 1
# If max retries reached, return the last exception
return AppOutput(output=None, status=str(last_exception))
The Use of AppOutput: It’s important to handle an exception even after it has used up its max retries. This way, you allow all the data that you are trying to process to run, and then you can determine what failed and what passed.
The final step is invoking the app, using the openapi_parameters
of the LLM app to determine how to invoke it with a single datapoint.
The make_payload function should not concern you. It constructs the payload for invoking the LLM app based on its OpenAPI parameters.
async def invoke_app(
uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict]
) -> AppOutput:
"""
Invokes an app for one datapoint using the openapi_parameters to determine
how to invoke the app.
Args:
uri (str): The URI of the app to invoke.
datapoint (Any): The data to be sent to the app.
parameters (Dict): The parameters required by the app taken from the db.
openapi_parameters (List[Dict]): The OpenAPI parameters of the app.
Returns:
AppOutput: The output of the app.
Raises:
httpx.HTTPError: If the POST request fails.
"""
url = f"{uri}/generate"
payload = await make_payload(datapoint, parameters, openapi_parameters)
async with httpx.AsyncClient() as client:
try:
logger.debug(f"Invoking app {uri} with payload {payload}")
response = await client.post(
url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5)
)
response.raise_for_status()
llm_app_response = response.json()
app_output = (
llm_app_response["message"]
if isinstance(llm_app_response, dict)
else llm_app_response
)
return AppOutput(output=app_output, status="success")
except:
return AppOutput(output="Error", status="error")
And that rounds up the process.
The backoff exponential strategy in the code works as follows:
Batch Processing: The batch_invoke function splits the testset data into smaller batches with a configurable size. Each batch is processed sequentially.
Individual Invokes with Retry: Within each batch, each data point is processed by the run_with_retry
function. This function attempts to invoke the app for the data point. If the invocation fails due to specific network errors (timeouts, connection issues), the function retries with a delay. This delay is initially set to a configurable value (retry_delay
) and is doubled for each subsequent retry attempt within the same batch.
This approach helps avoid overloading the app server with repeated requests after a failure. It gives the server time to recover and allows the queue of pending requests to clear before retrying.
The strategy also includes a configurable maximum number of retries per data point to prevent infinite loops. A delay between batches (delay_between_batches
) is also included to avoid exceeding rate limits set by the app server.
I hope this summarizes everything that you have learnt in today’s article. Please let me know if you have any questions!