Cet article vous apprendra comment exécuter des évaluations à l'aide de n'importe quel modèle LLM sans succomber à la redoutable exception « OpenAI Rate Limit ». Nous commencerions par :
Jusqu'à présent, l'explication de Cloudflare est la meilleure que j'ai vue : la limitation de débit est une stratégie visant à limiter le trafic réseau. Il limite la fréquence à laquelle une personne peut répéter une action dans un certain laps de temps - par exemple, essayer de se connecter à un compte.
Pour faire simple, imaginez être mère de quatre enfants qui aiment tous le miel. La dernière fois, le miel s’est épuisé plus tôt que prévu. Maintenant, vous avez réglé une minuterie pour compter jusqu'à dix mille et donné à chaque enfant un tour pour avoir du miel. La minuterie représente la limite de débit, car elle impose un temps d'attente spécifique avant de pouvoir avoir plus de miel.
Après avoir expliqué le concept, comprenons les limites de débit OpenAI et discutons de la façon dont j'ai implémenté une logique de limite de débit pour gérer le R/TPM (requête/jeton par minute) d'OpenAI à l'aide de Python.
OpenAI a fixé certaines restrictions sur le nombre de requêtes que l'on peut effectuer pour ses modèles d'IA en une minute. Ces limitations sont différentes pour chaque modèle d'IA fourni par OpenAI.
Pour la version gratuite :
Pour le niveau 1 :
Consultez la documentation pour plus d'informations sur les limites de débit des autres niveaux.
La raison de ces restrictions comprend :
Ces limitations devraient rester cohérentes dans un avenir prévisible.
Le processus (voir l'image ci-dessous) consiste à permettre aux utilisateurs d'exécuter des évaluations LLM à partir de l'interface utilisateur et de configurer les paramètres de limite de débit pour leurs applications LLM sans avoir besoin d'écrire eux-mêmes la logique.
Ceci est réalisé grâce à une fonction qui prépare et appelle le lot. Chaque appel du lot invoque la fonction run_with_retry
, qui à son tour appelle la fonction finale ( invoke_app
) avec le mécanisme de nouvelle tentative.
Je suis convaincu que vous pouvez écrire la logique du code dans n'importe quelle langue de votre choix après avoir examiné le processus ci-dessus. Quoi qu'il en soit, je vais vous montrer comment j'ai fait le mien. Pour plus d'informations et de contexte, je travaille principalement en tant qu'ingénieur logiciel back-end chez Agenta.
Agenta est une plate-forme de développement LLM open source de bout en bout qui vous fournit les outils nécessaires à une ingénierie et une gestion rapides, ⚖️ une évaluation, une annotation humaine et un 🚀 déploiement. Le tout sans imposer aucune restriction sur votre choix de framework, de bibliothèque ou de modèle. Agenta permet aux développeurs et aux équipes produit de collaborer à la création d'applications LLM de qualité production en moins de temps.
Nous voulions donner aux utilisateurs la possibilité de configurer la configuration de limitation de débit de leurs évaluations LLM à partir de l'interface utilisateur afin qu'ils puissent contourner l'exception de limitation de débit de leur fournisseur LLM.
En regardant le diagramme de processus, la première chose à implémenter est la logique de préparation et d'appel du lot (d'appels LLM). Il est important de valider la configuration de la limite de débit et d'utiliser un modèle de validation des données pour définir la limite de débit d'exécution LLM. Le modèle ci-dessous gère le paramètre rate_limit_config
requis pour que l'appel par lots fonctionne.
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
La fonction batch_invoke
prend les paramètres suivants :
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
Après avoir préparé et appelé le lot, l'étape suivante consiste à exécuter la logique run_with_retry
. Cette implémentation personnalisée inclut une fonctionnalité de limitation de débit et gère l'invocation de l'application llm, en réessayant une fois le délai défini atteint. L'interruption exponentielle, une technique qui consiste à relancer une opération avec un temps d'attente augmentant de façon exponentielle, est utilisée jusqu'à ce qu'un nombre maximum de tentatives soit atteint.
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
L'utilisation d' AppOutput : il est important de gérer une exception même après qu'elle a épuisé son nombre maximal de tentatives. De cette façon, vous autorisez l’exécution de toutes les données que vous essayez de traiter, puis vous pouvez déterminer ce qui a échoué et ce qui a réussi.
La dernière étape consiste à appeler l'application, en utilisant les openapi_parameters
de l'application LLM pour déterminer comment l'invoquer avec un seul point de données.
La fonction make_payload ne devrait pas vous concerner. Il construit la charge utile pour appeler l'application LLM en fonction de ses paramètres OpenAPI .
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
Et cela complète le processus.
La stratégie exponentielle d'attente dans le code fonctionne comme suit :
Traitement par lots : la fonction batch_invoke divise les données de l'ensemble de tests en lots plus petits avec une taille configurable. Chaque lot est traité séquentiellement.
Appels individuels avec nouvelle tentative : dans chaque lot, chaque point de données est traité par la fonction run_with_retry
. Cette fonction tente d'appeler l'application pour le point de données. Si l'appel échoue en raison d'erreurs réseau spécifiques (délais d'attente, problèmes de connexion), la fonction réessaye avec un délai. Ce délai est initialement défini sur une valeur configurable ( retry_delay
) et est doublé pour chaque nouvelle tentative ultérieure au sein du même lot.
Cette approche permet d'éviter de surcharger le serveur d'applications avec des requêtes répétées après un échec. Cela donne au serveur le temps de récupérer et permet à la file d'attente des demandes en attente de se vider avant de réessayer.
La stratégie comprend également un nombre maximum configurable de tentatives par point de données pour éviter les boucles infinies. Un délai entre les lots ( delay_between_batches
) est également inclus pour éviter de dépasser les limites de débit définies par le serveur d'applications.
J'espère que cela résume tout ce que vous avez appris dans l'article d'aujourd'hui. S'il vous plaît laissez-moi savoir si vous avez des questions!