Este artículo le enseñará cómo ejecutar evaluaciones utilizando cualquier modelo LLM sin sucumbir a la temida excepción del "límite de tasa de OpenAI". Empezaríamos por:
Hasta ahora, la explicación de Cloudflare es la mejor que he visto: la limitación de velocidad es una estrategia para limitar el tráfico de la red. Pone un límite a la frecuencia con la que alguien puede repetir una acción dentro de un período de tiempo determinado, por ejemplo, intentar iniciar sesión en una cuenta.
En pocas palabras, imagina ser madre de cuatro hijos a quienes les encanta la miel. La última vez, la miel se acabó antes de lo esperado. Ahora, ha configurado un cronómetro para contar hasta diez mil y le ha dado a cada niño un turno para tomar un poco de miel. El temporizador representa el límite de tasa, ya que impone un tiempo de espera específico antes de que puedan tener más miel.
Habiendo explicado el concepto, comprendamos los límites de velocidad de OpenAI y analicemos cómo implementé una lógica de límite de velocidad para administrar el R/TPM (solicitud/token por minuto) de OpenAI usando Python.
OpenAI ha establecido ciertas restricciones en la cantidad de solicitudes que uno puede realizar para sus modelos de IA en un minuto. Estas limitaciones son diferentes para cada modelo de IA proporcionado por OpenAI.
Para la versión gratuita:
Para el nivel 1:
Consulte los documentos para obtener más información sobre los límites de tarifas de otros niveles.
El motivo de estas restricciones incluye:
Se espera que estas limitaciones se mantengan constantes en el futuro previsible.
El proceso (ver imagen a continuación) implica permitir a los usuarios ejecutar evaluaciones de LLM desde la interfaz de usuario y configurar parámetros de límite de velocidad para sus aplicaciones de LLM sin necesidad de escribir la lógica ellos mismos.
Esto se logra mediante una función que prepara e invoca el lote. Cada llamada en el lote invoca la función run_with_retry
, que a su vez invoca la función final ( invoke_app
) con el mecanismo de reintento.
Estoy seguro de que puedes escribir el código lógico en cualquier idioma que elijas después de echar un vistazo al proceso anterior. De todos modos, te mostraré cómo hice el mío. Para obtener más información y contexto, trabajo principalmente como ingeniero de software backend en Agenta.
Agenta es una plataforma integral para desarrolladores de LLM de código abierto que le brinda las herramientas para una ingeniería y administración rápidas, ⚖️ evaluación, anotación humana e 🚀 implementación. Todo sin imponer restricciones a la elección de marco, biblioteca o modelo. Agenta permite a los desarrolladores y equipos de productos colaborar en la creación de aplicaciones LLM de nivel de producción en menos tiempo.
Queríamos brindar a los usuarios la posibilidad de configurar la configuración de limitación de tasa de sus evaluaciones de LLM desde la interfaz de usuario para que puedan omitir la excepción de limitación de tasa de su proveedor de LLM.
Mirando el diagrama del proceso, lo primero que hay que implementar es la lógica para preparar e invocar el lote (de llamadas LLM). Es importante validar la configuración del límite de tasa y utilizar un modelo de validación de datos para definir el límite de tasa de ejecución de LLM. El siguiente modelo maneja el parámetro rate_limit_config
requerido para que funcione la invocación por lotes.
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
La función batch_invoke
toma los siguientes parámetros:
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
Después de preparar e invocar el lote, el siguiente paso implica ejecutar la lógica run_with_retry
. Esta implementación personalizada incluye una funcionalidad de limitación de velocidad y administra la invocación de la aplicación llm, reintentando después de alcanzar el retraso establecido. El retroceso exponencial, una técnica que reintenta una operación con un tiempo de espera que aumenta exponencialmente, se emplea hasta que se alcanza un recuento máximo de reintentos.
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
El uso de AppOutput : es importante manejar una excepción incluso después de que haya agotado su máximo de reintentos. De esta manera, permite que se ejecuten todos los datos que está intentando procesar y luego puede determinar qué falló y qué pasó.
El último paso es invocar la aplicación, utilizando openapi_parameters
de la aplicación LLM para determinar cómo invocarla con un único punto de datos.
La función make_payload no debería preocuparte. Construye la carga útil para invocar la aplicación LLM en función de sus parámetros OpenAPI .
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
Y eso completa el proceso.
La estrategia exponencial de retroceso en el código funciona de la siguiente manera:
Procesamiento por lotes: la función batch_invoke divide los datos del conjunto de pruebas en lotes más pequeños con un tamaño configurable. Cada lote se procesa secuencialmente.
Invocaciones individuales con reintento: dentro de cada lote, cada punto de datos es procesado por la función run_with_retry
. Esta función intenta invocar la aplicación para el punto de datos. Si la invocación falla debido a errores de red específicos (tiempos de espera, problemas de conexión), la función lo vuelve a intentar con un retraso. Este retraso se establece inicialmente en un valor configurable ( retry_delay
) y se duplica para cada reintento posterior dentro del mismo lote.
Este enfoque ayuda a evitar sobrecargar el servidor de aplicaciones con solicitudes repetidas después de una falla. Le da tiempo al servidor para recuperarse y permite que la cola de solicitudes pendientes se borre antes de volver a intentarlo.
La estrategia también incluye un número máximo configurable de reintentos por punto de datos para evitar bucles infinitos. También se incluye un retraso entre lotes ( delay_between_batches
) para evitar exceder los límites de velocidad establecidos por el servidor de aplicaciones.
Espero que esto resuma todo lo que has aprendido en el artículo de hoy. ¡Por favor hazme saber si tienes preguntas!