めいふの備忘ログ

やったことを忘れないように書きこのしておくのです。

補遺:OpenAIのAPIをより高速にたたくためのコツ(非同期通信)

本備忘ログは「続・ChatGPTで行う述語項目構造解析:遥かなるプロンプトエンジニアリング編 - めいふの備忘ログ」の補遺にあたります

枕詞

本備忘ログでは512記事を用いて17のプロンプト精度の比較を行っています。プロンプトによっては複数の会話をChatGPTさんと交わさなければならないわけです。そん時に困ったことがありました。

ChatGPTちゃんとのAPI通信に時間がかかるな

と。早くおわんねーかな。1時間かかって30記事(1記事2分)しか進まないのでは困るよ君ィ、と(512記事が実験処理対象)。

いやしかし、どうすればいいの? 速度を上げるには使ってるパソコンのCPUパワーを上げればいいの? ChatGPTさんとのお喋り実験にはOpenAIのAPIを叩いているのですが、いえいえ、ChatGPTさんの処理自体はOpenAIのサーバ内でやっているのでワタクシが使っているパソコンのCPUパワーを上げても速くはなりません。そこでワタクシが目をつけたのが、非同期通信です。 OpenAIのAPIと通信するときの待ち時間があるので、その待っている間に他の処理を進めてみる進め方です。本備忘ログでは、ワタクシがOpenAIのAPIの非同期通信化について調べたり試みたことを記載しております。

Pythonの非同期通信について調査したときに参考にさせていただいたブログ

ChatGPTさんとのAPI通信の非同期通信による高速化に向けて、関連情報を収集したところ、以下のような情報が取得できました。

Pythonの非同期処理の基礎とOpenAI APIへ並列リクエストする実践例
https://zenn.dev/dev_commune/articles/19296b87231ea8?redirected=1
Pythonにおける非同期通信について解説してくれています。

Python の asyncio は超便利
https://qiita.com/waterada/items/1c03a7c863faf9327595
Pythonで非同期通信をするための仕組みであるasyncioについて解説してくれています。

OpenAI Python API library
https://github.com/openai/openai-python
pip で入れるライブラリのDocumentationがあります。最初に読むべきかも。

OpenAI Python SDK v1.0がめちゃくちゃ使いやすくなってた
https://zenn.dev/yosemat/articles/da8a633d93bbaf

OpenAI Python API Library v1.0 入門
https://note.com/npaka/n/n27b94df96179

Pyhonで非同期処理を考える。
https://note.com/dngri/n/n1a41632d5ebc

OpenAIのAPIに昔あったacreateというメソッドがありましたが、廃止されたようです。非同期通信するにはAsyncOpenAIクラスからcreateメソッドを呼び出すそうです。色々と調べた結果、ワタクシはOpenAIと通信クラスについてはOpenAI純正のAsyncOpenAIクラスを用い、そして、asyncioで並列処理を起動する、というような方針としました。実装から抜粋したコード断片を以下に紹介します。

OpenAIのAPI呼び出しを非同期処理するためのpythonコード

まずはimport のTipsから紹介します。AsyncOpenAIのimport、API通信時のretryを管理してくれるtenacityのimport、Google Colabのjupyter notebook でasyncioを適用するためのnest_asyncio.apply()を行っています。Google colabのuserdataを用いてOpenAIのシークレットキーを管理しております。

from openai import OpenAI
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_random_exponential
import nest_asyncio
nest_asyncio.apply()
import asyncio
from google.colab import userdata

非同期通信したい場合はAsyncOpenAIクラスでclientを作ります。メソッド内で都度clientを作るのがOpenAI的におすすめだったような記憶があります(method内でclientの生成と処理を全て済ませるという意味だったかも。記憶が曖昧)。ChatGPTのclinetからAPI呼び出しを行うメソッド。メソッドにasyncをつけて非同期処理の宣言を行う。@retryはAPIのリトライを処理するtenacityの便利機能。

def _ret_openai_clinet(self):
    SECRET_KEY_ENV_VARIABLE = 'OPEN_AI_KEY'
    api_secret_key = None

    # 環境変数からAPIのシークレットコードを取得
    # Google colab環境ではuserdata.get 経由でcolabに設定した値を取得する
    if self.process_environment == 'local':
        api_secret_key = os.environ.get(SECRET_KEY_ENV_VARIABLE)
    elif self.process_environment == 'colab':
        api_secret_key = userdata.get("OPENAI_API_KEY")
        os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
   	
    client = None
    if api_secret_key is None:
        print(f"エラー: {SECRET_KEY_ENV_VARIABLE} 環境変数が設定されていません。")
        exit()
    else:
        # APIのシークレットコードを使用して何かしらの処理を行う
        if self.async_flag == True:
            client = AsyncOpenAI(api_key=api_secret_key)
        elif self.async_flag == False:
            client = OpenAI(api_key=api_secret_key)
    return client

生成したclientはclientは別メソッド内で使っていきます(あまりよい作法ではなさそう)。そのメソッドにもasyncマークをつけて非同期宣言、clientを呼び出す関数にもawait宣言します。await宣言は他の非同期の処理を「待つ」箇所につけてあげます。

async def ret_one_response(self, id1, exp_name, prompt_dict1, delay):
 (略)
    client = self._ret_openai_clinet()
    # 待つところにはawait をつける。
    response = await self.ret_response_dict(client, model_name, message)
async def ret_response_dict(self, client, model_name, messages):

    # 推論処理を行う箇所。awaitをつける
    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
    async def ret_chatgpt_response_dict(client, model_name, messages, temperature=0):
        if self.async_flag == True:
            chat_completion = await client.chat.completions.create(
                messages= messages,
                temperature=temperature,
                model=model_name,
            )
        # 同期処理。awaitはつけない
        elif self.async_flag == False:
            chat_completion = client.chat.completions.create(
                messages= messages,
                temperature=temperature,
                model=model_name,
            )
        
        return chat_completion
	
     # APIから返されたオブジェクトを辞書に変換。他にもっとスマートな方法ないのかな?
    def obj2dict(response):
        rdict = vars(response)
        clist = []
        for choice in rdict['choices']:
            c1 = vars(choice)
            c1['message'] = vars(choice.message) #tmp_dict
            clist.append(c1)
        rdict['choices'] = clist
        usage = rdict['usage']
        rdict['usage'] = vars(usage)
        return rdict
    
    response_dict = {}
    # clientを非同期メソッドに食べさせて処理を辞書に格納する。awaitをつける。
    try:
        response = await ret_chatgpt_response_dict(client, model_name, messages, temperature=0)
        response_dict['status'] = 'ok'
        response_dict['response'] = obj2dict(response)
        response_dict['input'] = messages
    except Exception as e:
        print(str(e))
        response_dict['status'] = 'ng'
        response_dict['input'] = messages
        response_dict['error_message'] = str(e)

    return response_dict

OpenAIのAPI呼び出しの非同期化のコツをまとめると以下のようになります。

  • 非同期を処理するメソッド宣言にはasyncをつける
  • その呼び出しで他の非同期処理を待つ箇所にはawaitをつける

他にもっといい書き方があるかなと思いますが、プロンプトの実験を回すことができたので、これで良しでした。

OpenAIのAPIのRate Limits(Quota制限)対策

これでOpenAIのChatGPTとの通信処理の並列化できましたし、ワタクシは皮算用として、OpenAIのAPIを100並列で実行すれば100倍速、実験はあっというまに終わる、という展開をちょっぴり期待しておりました。しかし、そうは問屋がおろさないのです。APIにはQuota制限があるものなのです。ユーザに無限にリソースを消費されないようにAPIやシステム利用に課された制約のことです。OpenAIのChatGPT周りの制約はどうなっているんでしょうか?

Rate limits
https://platform.openai.com/docs/guides/rate-limits/usage-tiers

202401に調査したところ、

  • 1分あたりの tokenの90, 000 TPM(token per miniute)
  • 1分あたりの requets上限は3, 500 RPM(request per miniute)

でした。

5記事に対する16プロンプト(当時は16プロンプトで実験していました)135 リクエスト、token 合計は、114,967となりました。1queryあたりのtokenが多い。1分で5サンプルではRate Limitが溢れてしまいます。120000/5=24000。1サンプル投げると24000トークンを消費します。ですが、3サンプルごとに、全てのプロンプトを同時に投げて結果を格納すれば、Rate Limitsに引っ掛からない。そして、制約は1分あたりのリソースの処理量にかかります。というわけで、ChatGPTへのプロンプト実験では、以下のようにトークン量を監視し、実験中にRate Limitに引っかかりそうなら、今は待つ!(処理量あたりの時間を増やす)することでRate Limits(Quota制限)対策としました(メソッド名がテキトーで「gogo_hogehoge」とかなのは気にしないでくさい)。

def gogo_prompt(self, target_id_list, prompt_files):
	""" 並列処理の時間内の処理量の制御を行う
		target_id_list : 処理対象の記事IDのリスト 
		prompt_files: プロンプトが格納された配列
	"""
	
    # 1回の並列処理トライアルで処理するプロンプトを格納する
    # asyncio.create タスクに処理するメソッド(この場合はself.ret_one_response )を格納する
    async def main1(gogo_prompt_list):
        print(["batch size 1: ", len(gogo_prompt_list)])
        gogo_list = []
        # prompt_listにはプロンプトが格納されている。これを処理するタスクをgogo_listに詰めている
        for pidx, prompt_list in enumerate(gogo_prompt_list):
            id1 = prompt_list[0]
            exp_name = prompt_list[1]
            prompt_dict1 = prompt_list[2]
            gogo_list.append(asyncio.create_task(self.ret_one_response(id1, exp_name, prompt_dict1, 1)) )

        print(["batch size 2:", len(gogo_list)])
        
        # asyncio.gatherで並列処理の結果の集計を行います。gogo_listに並列処理の単位が格納されている
        results = await asyncio.gather(
            *gogo_list,
            return_exceptions=False,
            )
        
	# resultsにはOpenAIからのAPIの返り値の辞書が格納されています。処理時間もわかります。
        total_tokens, total_seconds, query_counter = experimental_util._ret_elapsed_time(results)
        # 秒数を分に変換
        minutes = total_seconds / 60.0
        # tpmとrpmの計算
        tpm = 0.0
        rpm = 0.0
        if minutes > 0.0:
            tpm = total_tokens /  minutes
            rpm = query_counter / minutes
        print(f"tpm {tpm}, rpm {rpm}")
        
        # Rate Limitを超えそうなら、待つ!
        if tpm > ( 90000 - 5000 )  or ( rpm > 3500 - 100 ):
            print("Waiting")
            time.sleep(100)
        self.save_jsons(results)
    
    # self._ret_batch_dict で 1回の並列処理の処理対象のセットを返す。
    # id1 と exp_name に フラットな list を返す。パラメータは サンプル数と 実験数
    gogo_prompt_dict = self._ret_batch_dict(target_id_list, prompt_files)
    print(f"target_id_list {len(target_id_list)}, prompt_files: {len(prompt_files)}")
    
    #  並列処理の1回のバッチ。asyncio.runに上記のmain1を与えている
    for key, gogo_prompt_list in gogo_prompt_dict.items():
        print(f"gogo number {key}...")
        if self.skip_process_flag == True:
            print([f"skipping {len(gogo_prompt_list)} promps of {key}"])
            pass
        else:
            asyncio.run(main1(gogo_prompt_list))

非同期通信にした処理時間改善効果

なお、なぜ非同期処理用のAsyncOpenAIと同期処理用のOpenAIを使い分けるようになっているかと言いますと、非同期処理と同期処理の速度差を計測したかったのでフラグで呼び分けています。同期処理で1記事16プロンプトで処理するのに1分程度かかっていたものが、20秒と短縮されまして、処理時間が1/3になりました。非同期処理による処理速度向上効果ありでした。