Este artigo ensinará como executar avaliações usando qualquer modelo LLM sem sucumbir à temida exceção “OpenAI Rate Limit”. Começaríamos por:
Até agora, a explicação da Cloudflare é a melhor que já vi: a limitação de taxa é uma estratégia para limitar o tráfego de rede. Ele limita a frequência com que alguém pode repetir uma ação dentro de um determinado período de tempo – por exemplo, tentar fazer login em uma conta.
Simplificando, imagine ser mãe de quatro filhos que amam mel. Da última vez, o mel acabou antes do esperado. Agora, você configurou um cronômetro para contar até dez mil e deu a cada criança a oportunidade de comer um pouco de mel. O cronômetro representa o limite de taxa, pois impõe um tempo de espera específico antes que eles possam ter mais mel.
Depois de explicar o conceito, vamos entender os limites de taxa do OpenAI e discutir como implementei uma lógica de limite de taxa para gerenciar o R/TPM (solicitação/token por minuto) do OpenAI usando Python.
A OpenAI estabeleceu certas restrições sobre o número de solicitações que podem ser feitas para seus modelos de IA em um minuto. Estas limitações são diferentes para cada modelo de IA fornecido pela OpenAI.
Para a versão gratuita:
Para o nível 1:
Consulte a documentação para obter mais informações sobre limites de taxa de outros níveis.
A razão para essas restrições inclui:
Espera-se que essas limitações permaneçam consistentes no futuro próximo.
O processo (veja a imagem abaixo) envolve permitir que os usuários executem avaliações LLM a partir da UI e configurem parâmetros de limite de taxa para seus aplicativos LLM sem a necessidade de escrever a lógica por conta própria.
Isto é conseguido através de uma função que prepara e invoca o lote. Cada chamada no lote invoca a função run_with_retry
, que por sua vez invoca a função final ( invoke_app
) com o mecanismo de nova tentativa.
Tenho certeza de que você pode escrever a lógica do código em qualquer linguagem de sua escolha depois de dar uma olhada no processo acima. Independentemente disso, vou mostrar como fiz o meu. Para obter mais informações e contexto, trabalho principalmente como engenheiro de software back-end na Agenta.
Agenta é uma plataforma de desenvolvedor LLM de código aberto ponta a ponta que fornece as ferramentas para engenharia e gerenciamento imediatos, ⚖️ avaliação, anotação humana e 🚀 implantação. Tudo sem impor quaisquer restrições à sua escolha de estrutura, biblioteca ou modelo. Agenta permite que desenvolvedores e equipes de produto colaborem na construção de aplicativos LLM de nível de produção em menos tempo.
Queríamos dar aos usuários a capacidade de definir a configuração de limitação de taxa de suas avaliações LLM na interface do usuário para que possam ignorar a exceção de limitação de taxa do provedor LLM.
Observando o diagrama do processo, a primeira coisa a implementar é a lógica de preparação e invocação do lote (de chamadas LLM). É importante validar a configuração do limite de taxa e usar um modelo de validação de dados para definir o limite de taxa de execução do LLM. O modelo abaixo lida com o parâmetro rate_limit_config
necessário para que a chamada em lote funcione.
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
A função batch_invoke
usa os seguintes parâmetros:
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
Depois de preparar e invocar o lote, a próxima etapa envolve a execução da lógica run_with_retry
. Essa implementação personalizada inclui funcionalidade de limitação de taxa e gerencia a invocação do aplicativo llm, tentando novamente após atingir o atraso definido. A espera exponencial, uma técnica que tenta novamente uma operação com um tempo de espera crescente exponencialmente, é empregada até que uma contagem máxima de novas tentativas seja atingida.
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
O uso de AppOutput : é importante lidar com uma exceção mesmo depois de ela ter esgotado seu máximo de tentativas. Dessa forma, você permite que todos os dados que está tentando processar sejam executados e, então, pode determinar o que falhou e o que foi aprovado.
A etapa final é invocar o aplicativo, usando openapi_parameters
do aplicativo LLM para determinar como invocá-lo com um único ponto de dados.
A função make_payload não deve preocupar você. Ele constrói a carga para invocar o aplicativo LLM com base em seus parâmetros OpenAPI .
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
E isso completa o processo.
A estratégia exponencial de backoff no código funciona da seguinte maneira:
Processamento em lote: a função batch_invoke divide os dados do conjunto de testes em lotes menores com tamanho configurável. Cada lote é processado sequencialmente.
Invocações individuais com nova tentativa: dentro de cada lote, cada ponto de dados é processado pela função run_with_retry
. Esta função tenta invocar o aplicativo para o ponto de dados. Se a invocação falhar devido a erros de rede específicos (tempo limite, problemas de conexão), a função tentará novamente com atraso. Esse atraso é inicialmente definido para um valor configurável ( retry_delay
) e é duplicado para cada nova tentativa subsequente dentro do mesmo lote.
Essa abordagem ajuda a evitar sobrecarregar o servidor de aplicativos com solicitações repetidas após uma falha. Dá ao servidor tempo para se recuperar e permite que a fila de solicitações pendentes seja limpa antes de tentar novamente.
A estratégia também inclui um número máximo configurável de novas tentativas por ponto de dados para evitar loops infinitos. Um atraso entre lotes ( delay_between_batches
) também está incluído para evitar exceder os limites de taxa definidos pelo servidor de aplicativos.
Espero que isso resuma tudo o que você aprendeu no artigo de hoje. Por favor, deixe-me saber se você tem alguma dúvida!