이 기사에서는 두려운 "OpenAI 속도 제한" 예외에 굴복하지 않고 LLM 모델을 사용하여 평가를 실행하는 방법을 설명합니다. 우리는 다음과 같이 시작합니다:
지금까지 Cloudflare에 대한 설명이 제가 본 것 중 최고입니다. 속도 제한은 네트워크 트래픽을 제한하기 위한 전략입니다. 이는 특정 기간 내에 누군가가 작업을 반복할 수 있는 빈도를 제한합니다(예: 계정 로그인 시도).
간단히 말해서 꿀을 사랑하는 네 아이의 엄마가 된다고 상상해 보세요. 지난번에는 예상보다 꿀이 빨리 떨어졌어요. 이제 타이머를 최대 1만까지 세도록 설정하고 각 어린이에게 차례대로 꿀을 먹도록 했습니다. 타이머는 더 많은 꿀을 얻기 전에 특정 대기 시간을 적용하므로 속도 제한을 나타냅니다.
개념을 설명했으니 OpenAI 속도 제한을 이해하고 Python을 사용하여 OpenAI의 R/TPM(분당 요청/토큰)을 관리하기 위해 속도 제한 논리를 구현한 방법에 대해 논의해 보겠습니다.
OpenAI는 1분 내에 AI 모델에 대해 만들 수 있는 요청 수에 대해 특정 제한을 설정했습니다. 이러한 제한 사항은 OpenAI에서 제공하는 AI 모델마다 다릅니다.
무료 버전의 경우:
1등급의 경우:
다른 계층 비율 제한에 대한 자세한 내용은 문서를 참조하세요.
이러한 제한의 이유는 다음과 같습니다.
이러한 제한은 가까운 미래에도 일관되게 유지될 것으로 예상됩니다.
프로세스(아래 이미지 참조)에는 사용자가 UI에서 LLM 평가를 실행하고 논리 자체를 작성할 필요 없이 LLM 앱에 대한 속도 제한 매개 변수를 구성할 수 있도록 하는 작업이 포함됩니다.
이는 배치를 준비하고 호출하는 함수를 통해 달성됩니다. 배치의 각 호출은 run_with_retry
함수를 호출하고, 이는 다시 재시도 메커니즘을 통해 최종 함수( invoke_app
)를 호출합니다.
위의 프로세스를 살펴본 후에는 원하는 언어로 코드 로직을 작성할 수 있다고 확신합니다. 어쨌든 제가 어떻게 했는지 보여드리겠습니다. 더 많은 배경과 맥락을 알고 싶다면 저는 주로 Agenta에서 백엔드 소프트웨어 엔지니어로 일하고 있습니다.
Agenta 는 신속한 엔지니어링 및 관리, ⚖️ 평가, 사람 주석 및 🚀 배포를 위한 도구를 제공하는 오픈 소스 엔드투엔드 LLM 개발자 플랫폼입니다. 프레임워크, 라이브러리 또는 모델 선택에 제한을 두지 않고 모두 가능합니다. Agenta를 사용하면 개발자와 제품 팀이 더 짧은 시간에 프로덕션 수준의 LLM 기반 애플리케이션을 구축하는 데 협업할 수 있습니다.
우리는 사용자가 LLM 공급자 속도 제한 예외를 우회할 수 있도록 UI에서 LLM 평가 속도 제한 구성을 구성할 수 있는 기능을 제공하고 싶었습니다.
프로세스 다이어그램을 보면 가장 먼저 구현해야 할 것은 배치(LLM 호출)를 준비하고 호출하는 논리입니다. 속도 제한 구성을 검증하고 데이터 검증 모델을 사용하여 LLM 실행 속도 제한을 정의하는 것이 중요합니다. 아래 모델은 일괄 호출 기능에 필요한 rate_limit_config
매개변수를 처리합니다.
from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)
batch_invoke
함수는 다음 매개변수를 사용합니다.
async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs
배치를 준비하고 호출한 후 다음 단계에서는 run_with_retry
논리를 실행합니다. 이 사용자 정의 구현에는 속도 제한 기능이 포함되어 있으며 설정된 지연에 도달한 후 다시 시도하는 llm 앱 호출을 관리합니다. 최대 재시도 횟수에 도달할 때까지 대기 시간을 기하급수적으로 늘려 작업을 재시도하는 기술인 지수 백오프(Exponential Backoff)를 사용합니다.
async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))
AppOutput 사용: 최대 재시도 횟수를 모두 사용한 후에도 예외를 처리하는 것이 중요합니다. 이렇게 하면 처리하려는 모든 데이터가 실행되도록 허용한 다음 실패한 데이터와 통과한 데이터를 확인할 수 있습니다.
마지막 단계는 LLM 앱의 openapi_parameters
사용하여 단일 데이터 포인트로 앱을 호출하는 방법을 결정하여 앱을 호출하는 것입니다.
make_payload 함수는 사용자에게 영향을 주지 않습니다. OpenAPI 매개변수를 기반으로 LLM 앱을 호출하기 위한 페이로드를 구성합니다.
async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")
그리고 그것은 프로세스를 마무리합니다.
코드의 백오프 지수 전략은 다음과 같이 작동합니다.
일괄 처리: Batch_invoke 함수는 테스트 세트 데이터를 구성 가능한 크기로 더 작은 배치로 분할합니다. 각 배치는 순차적으로 처리됩니다.
재시도를 통한 개별 호출: 각 배치 내에서 각 데이터 포인트는 run_with_retry
함수에 의해 처리됩니다. 이 함수는 데이터 포인트에 대한 앱 호출을 시도합니다. 특정 네트워크 오류(시간 초과, 연결 문제)로 인해 호출이 실패하면 함수는 지연된 후 다시 시도합니다. 이 지연은 처음에 구성 가능한 값( retry_delay
)으로 설정되며 동일한 배치 내에서 후속 재시도를 시도할 때마다 두 배로 늘어납니다.
이 접근 방식은 실패 후 반복되는 요청으로 인해 앱 서버에 과부하가 걸리는 것을 방지하는 데 도움이 됩니다. 이는 서버가 복구할 수 있는 시간을 제공하고 재시도하기 전에 보류 중인 요청 대기열을 지울 수 있도록 합니다.
이 전략에는 무한 루프를 방지하기 위해 데이터 포인트당 구성 가능한 최대 재시도 횟수도 포함됩니다. 앱 서버에서 설정한 속도 제한을 초과하지 않도록 배치 간 지연( delay_between_batches
)도 포함됩니다.
이것이 오늘 기사에서 배운 모든 내용을 요약하기를 바랍니다. 궁금한 점이 있으면 알려주세요!