Эта статья научит вас, как проводить оценки с использованием любой модели LLM, не поддаваясь ужасному исключению «Ограничение скорости OpenAI». Мы бы начали с:
На данный момент объяснение Cloudflare — лучшее, что я видел: ограничение скорости — это стратегия ограничения сетевого трафика. Он накладывает ограничение на то, как часто кто-то может повторять действие в течение определенного периода времени, например, пытаясь войти в учетную запись.
Проще говоря, представьте, что вы мать четверых детей, которые все любят мед. В прошлый раз мед закончился раньше, чем ожидалось. Теперь вы установили таймер на счет до десяти тысяч и дали каждому ребенку по очереди выпить меда. Таймер представляет собой ограничение скорости, поскольку он устанавливает определенное время ожидания, прежде чем они смогут получить больше меда.
Объяснив концепцию, давайте разберемся с ограничениями скорости OpenAI и обсудим, как я реализовал логику ограничения скорости для управления R/TPM OpenAI (запрос/токен в минуту) с помощью Python.
OpenAI установила определенные ограничения на количество запросов к своим моделям ИИ, которые можно сделать в течение минуты. Эти ограничения различны для каждой модели ИИ, предоставляемой OpenAI.
Для бесплатной версии:
Для уровня 1:
Дополнительную информацию об ограничениях скорости других уровней см. в документации .
Причинами этих ограничений являются:
Ожидается, что эти ограничения останутся неизменными в обозримом будущем.
Этот процесс (см. изображение ниже) включает в себя предоставление пользователям возможности запускать оценки LLM из пользовательского интерфейса и настраивать параметры ограничения скорости для своих приложений LLM без необходимости писать логику самостоятельно.
Это достигается с помощью функции, которая подготавливает и вызывает пакет. Каждый вызов в пакете вызывает функцию run_with_retry
, которая, в свою очередь, вызывает конечную функцию ( invoke_app
) с механизмом повтора.
Я уверен, что после ознакомления с описанным выше процессом вы сможете написать логику кода на любом языке по вашему выбору. В любом случае, я покажу вам, как я это сделал. Чтобы получить больше информации и контекста, я в основном работаю инженером-программистом в Agenta.
Agenta — это комплексная платформа LLM-разработчиков с открытым исходным кодом, которая предоставляет вам инструменты для быстрого проектирования и управления, ⚖️ оценки, ручного аннотирования и 🚀 развертывания. И все это без каких-либо ограничений на выбор платформы, библиотеки или модели. Agenta позволяет разработчикам и командам разработчиков совместно работать над созданием приложений на базе LLM промышленного уровня за меньшее время.
Мы хотели дать пользователям возможность настраивать конфигурацию ограничения скорости своих оценок LLM из пользовательского интерфейса, чтобы они могли обойти исключение ограничения скорости своего поставщика LLM.
Глядя на диаграмму процесса, первое, что нужно реализовать, — это логику подготовки и вызова пакета (вызовов LLM). Важно проверить конфигурацию ограничения скорости и использовать модель проверки данных для определения ограничения скорости выполнения LLM. Модель ниже обрабатывает rate_limit_config
, необходимый для работы пакетного вызова.
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
Функция batch_invoke
принимает следующие параметры:
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
После подготовки и вызова пакета следующим шагом будет выполнение логики run_with_retry
. Эта пользовательская реализация включает в себя функцию ограничения скорости и управляет вызовом приложения llm, повторяя попытку после достижения установленной задержки. Экспоненциальная отсрочка, метод, который повторяет операцию с экспоненциально увеличивающимся временем ожидания, используется до тех пор, пока не будет достигнуто максимальное количество повторов.
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
Использование AppOutput : важно обрабатывать исключение даже после того, как максимальное количество попыток исчерпано. Таким образом, вы разрешаете выполнение всех данных, которые вы пытаетесь обработать, и затем можете определить, что не удалось, а что прошло.
Последний шаг — вызов приложения с использованием openapi_parameters
приложения LLM, чтобы определить, как его вызвать с помощью одной точки данных.
Функция make_payload вас не должна беспокоить. Он создает полезную нагрузку для вызова приложения LLM на основе его параметров OpenAPI .
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
И это завершает процесс.
Экспоненциальная стратегия отсрочки в коде работает следующим образом:
Пакетная обработка: функцияatch_invoke разбивает данные набора тестов на более мелкие пакеты с настраиваемым размером. Каждая партия обрабатывается последовательно.
Отдельные вызовы с повтором: в каждом пакете каждая точка данных обрабатывается функцией run_with_retry
. Эта функция пытается вызвать приложение для точки данных. Если вызов завершается неудачей из-за определенных сетевых ошибок (тайм-ауты, проблемы с подключением), функция повторяет попытку с задержкой. Для этой задержки изначально установлено настраиваемое значение ( retry_delay
) и удваивается для каждой последующей повторной попытки в том же пакете.
Такой подход помогает избежать перегрузки сервера приложений повторными запросами после сбоя. Это дает серверу время на восстановление и позволяет очистить очередь ожидающих запросов перед повторной попыткой.
Стратегия также включает настраиваемое максимальное количество повторов на точку данных, чтобы предотвратить бесконечные циклы. Задержка между пакетами ( delay_between_batches
) также включена, чтобы избежать превышения ограничений скорости, установленных сервером приложений.
Я надеюсь, что это суммирует все, что вы узнали в сегодняшней статье. Пожалуйста, дай мне знать, если возникнут какие-либо вопросы!