この記事では、恐ろしい「OpenAI レート制限」例外に屈することなく、LLM モデルを使用して評価を実行する方法を説明します。まず次のことから始めます。
これまでのところ、Cloudflare の説明は私が見た中で最高のものです。レート制限はネットワーク トラフィックを制限するための戦略です。これは、アカウントへのログイン試行など、特定の時間枠内でユーザーがアクションを繰り返すことができる頻度に上限を設けます。
簡単に言うと、はちみつが大好きな 4 人の子供の母親になったと想像してください。前回は思ったより早く蜂蜜がなくなってしまいました。さて、1 万まで数えるタイマーを設定し、各子供にはちみつを飲む順番を与えました。タイマーはレート制限を表し、蜂蜜をさらに摂取できるようになるまでに特定の待機時間を強制します。
概念を説明したところで、OpenAI のレート制限について理解し、Python を使用して OpenAI の R/TPM (1 分あたりのリクエスト/トークン) を管理するレート制限ロジックをどのように実装したかについて説明します。
OpenAI は、1 分間に AI モデルに対して実行できるリクエストの数に一定の制限を設けています。これらの制限は、OpenAI が提供する AI モデルごとに異なります。
無料版の場合:
階層 1 の場合:
他の層のレート制限の詳細については、ドキュメントを参照してください。
これらの制限の理由には次のようなものがあります。
これらの制限は、予見可能な将来にわたって一貫したままであると予想されます。
このプロセス (下の図を参照) には、ユーザーがロジックを自分で記述することなく、UI から LLM 評価を実行し、LLM アプリのレート制限パラメーターを構成できるようにすることが含まれます。
これは、バッチを準備して呼び出す関数によって実現されます。バッチ内の各呼び出しはrun_with_retry
関数を呼び出し、次にこの関数が再試行メカニズムを備えた最終関数 ( invoke_app
) を呼び出します。
上記のプロセスを確認した後は、任意の言語でコード ロジックを作成できると確信しています。とにかく、私がどのようにやったかをお見せします。詳しい背景と背景については、私は主に Agenta でバックエンド ソフトウェア エンジニアとして働いています。
Agenta は、迅速なエンジニアリングと管理、⚖️ 評価、人間による注釈、🚀 導入のためのツールを提供するオープンソースのエンドツーエンド LLM 開発者プラットフォームです。フレームワーク、ライブラリ、モデルの選択に制限を課すことはありません。 Agenta を使用すると、開発者と製品チームが協力して、実稼働グレードの LLM を利用したアプリケーションを短時間で構築できるようになります。
ユーザーが UI から LLM 評価のレート制限構成を構成できるようにして、LLM プロバイダーのレート制限例外を回避できるようにしたいと考えました。
プロセス図を見ると、最初に実装するのは、(LLM 呼び出しの) バッチを準備して呼び出すためのロジックです。レート制限構成を検証し、データ検証モデルを使用して LLM 実行レート制限を定義することが重要です。以下のモデルは、バッチ呼び出しが機能するために必要なrate_limit_config
パラメーターを処理します。
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
batch_invoke
関数は次のパラメーターを受け取ります。
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
バッチを準備して呼び出したら、次のステップではrun_with_retry
ロジックを実行します。このカスタム実装にはレート制限機能が含まれており、llm アプリの呼び出しを管理し、設定された遅延に達した後に再試行します。指数バックオフは、最大再試行回数に達するまで、指数関数的に増加する待機時間を使用して操作を再試行する手法です。
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
AppOutputの使用: 最大再試行回数を使い果たした後でも例外を処理することが重要です。こうすることで、処理しようとしているすべてのデータの実行を許可し、何が失敗し、何が成功したかを判断できます。
最後のステップでは、LLM アプリのopenapi_parameters
使用してアプリを呼び出し、単一のデータポイントでアプリを呼び出す方法を決定します。
make_payload 関数は気にする必要はありません。 OpenAPIパラメータに基づいて、LLM アプリを呼び出すためのペイロードを構築します。
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
これでプロセスは終了です。
コード内のバックオフ指数関数戦略は次のように機能します。
バッチ処理: batch_invoke 関数は、テストセット データを構成可能なサイズの小さなバッチに分割します。各バッチは順番に処理されます。
再試行を伴う個別の呼び出し:各バッチ内で、各データ ポイントがrun_with_retry
関数によって処理されます。この関数は、データ ポイントのアプリの呼び出しを試みます。特定のネットワーク エラー (タイムアウト、接続の問題) が原因で呼び出しが失敗した場合、関数は遅延して再試行します。この遅延は、最初は構成可能な値 ( retry_delay
) に設定され、同じバッチ内での後続の再試行ごとに 2 倍になります。
このアプローチは、失敗後のリクエストの繰り返しによるアプリサーバーの過負荷を回避するのに役立ちます。これにより、サーバーに回復する時間が与えられ、再試行する前に保留中のリクエストのキューをクリアできるようになります。
この戦略には、無限ループを防ぐためのデータ ポイントごとの構成可能な最大再試行数も含まれています。アプリサーバーによって設定されたレート制限を超えないようにするために、バッチ間の遅延 ( delay_between_batches
) も含まれます。
今日の記事で学んだことのすべてがこれに要約されていると幸いです。ご質問がございましたら、お知らせください。