先週、私はIETF Idempotency-Key 仕様の分析を書きました。この仕様は、リクエストの重複を避けることを目的としています。簡単に言うと、クライアントがリクエストと一緒に一意のキーを送信するという考え方です。
この投稿では、Apache APISIXを使用してこれを実装する方法を示します。
コーディングを始める前に、いくつか定義する必要があります。Apache APISIX はプラグインベースのアーキテクチャを提供します。したがって、上記のロジックをプラグインでコーディングします。
Apache APISIX は OpenResty を基盤としており、OpenResty は nginx を基盤としています。各コンポーネントはフェーズを定義し、それが多かれ少なかれコンポーネント全体にマッピングされます。フェーズの詳細については、この以前の投稿を参照してください。
最後に、優先度を決定します。優先度は、フェーズ内でAPISIX がプラグインを実行する順序を定義します。すべての認証プラグインの優先度は2000
以上であるため、私は1500
に決定しましたが、キャッシュされた応答をできるだけ早く返したいのです。
仕様では、データを保存することが求められています。APISIX は多くの抽象化を提供していますが、ストレージはその 1 つではありません。キー値ストアのように見えるように、冪等性キーを介してアクセスする必要があります。
私は Redis を任意に選択しました。これは、Redis がかなり普及しており、クライアントがすでに APISIX ディストリビューションの一部になっているためです。単純な Redis では JSON ストレージが提供されないことに注意してください。そのため、 redis-stack
Docker イメージを使用します。
ローカル インフラストラクチャは次のとおりです。
services: apisix: image: apache/apisix:3.9.0-debian volumes: - ./apisix/config.yml:/usr/local/apisix/conf/config.yaml:ro - ./apisix/apisix.yml:/usr/local/apisix/conf/apisix.yaml:ro #1 - ./plugin/src:/opt/apisix/plugins:ro #2 ports: - "9080:9080" redis: image: redis/redis-stack:7.2.0-v9 ports: - "8001:8001" #3
APISIX 構成は次のとおりです。
deployment: role: data_plane role_data_plane: config_provider: yaml #1 apisix: extra_lua_path: /opt/?.lua #2 plugins: - idempotency # priority: 1500 #3 plugin_attr: #4 idempotency: host: redis #5
最後に、単一のルートを宣言します。
routes: - uri: /* plugins: idempotency: ~ #1 upstream: nodes: "httpbin.org:80": 1 #2 #END #3
このインフラストラクチャが整えば、実装を開始できます。
Apache APISIX プラグインの基礎は非常に基本的なものです。
local plugin_name = "idempotency" local _M = { version = 1.0, priority = 1500, schema = {}, name = plugin_name, } return _M
次のステップは、Redis のホストとポートなどの設定です。まず、すべてのルートで単一の Redis 設定を提供します。これが、 config.yaml
ファイルのplugin_attr
セクションの背後にある考え方です。共通設定です。プラグインを具体化しましょう。
local core = require("apisix.core") local plugin = require("apisix.plugin") local attr_schema = { --1 type = "object", properties = { host = { type = "string", description = "Redis host", default = "localhost", }, port = { type = "integer", description = "Redis port", default = 6379, }, }, } function _M.init() local attr = plugin.plugin_attr(plugin_name) or {} local ok, err = core.schema.check(attr_schema, attr) --2 if not ok then core.log.error("Failed to check the plugin_attr[", plugin_name, "]", ": ", err) return false, err end end
構成の形状を定義する
設定が有効であることを確認する
プラグインでデフォルト値を定義したので、Docker Compose インフラストラクチャ内でredis
を実行するhost
のみをオーバーライドし、デフォルトのポートを使用できます。
次に、Redis クライアントを作成する必要があります。プラットフォームでは、書き換え/アクセス セクションの後のどのフェーズでも接続できないことに注意してください。そのため、 init()
メソッドで作成し、最後まで保持します。
local redis_new = require("resty.redis").new --1 function _M.init() -- ... redis = redis_new() --2 redis:set_timeout(1000) local ok, err = redis:connect(attr.host, attr.port) if not ok then core.log.error("Failed to connect to Redis: ", err) return false, err end end
OpenResty Redis モジュールのnew
機能を参照します。
インスタンスを取得するにはこれを呼び出します。
Redis クライアントは、プラグイン実行サイクルの残りの間、 redis
変数で使用できるようになりました。
以前のソフトウェア エンジニア時代、私は通常、まず名目上のパスを実装していました。その後、エラー ケースを個別に管理することで、コードをより堅牢にしました。この方法では、いつでもリリースする必要が生じた場合、警告はあってもビジネス価値を提供できます。このミニ プロジェクトにも同じように取り組みます。
公称パス上の疑似アルゴリズムは次のようになります。
DO extract idempotency key from request DO look up value from Redis IF value doesn't exist DO set key in Redis with empty value ELSE RETURN cached response DO forward to upstream DO store response in Redis RETURN response
ロジックを、上で述べたフェーズにマッピングする必要があります。アップストリームの前にはrewriteとaccess の2 つのフェーズがあり、アップストリームの後にはheader_filter 、 body_filter 、 log の3 つのフェーズがあります。 accessフェーズは以前の作業では明らかでしたが、他の 3 つのフェーズを理解する必要がありました。 body_filter をランダムに選択しましたが、他のフェーズについても理にかなった議論を聞くつもりです。
コードを読みやすくするためにログを削除したことに注意してください。エラー ログと情報ログは、運用上の問題のデバッグを容易にするために必要です。
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key --2 local resp, err = redis:hgetall(redis_key) --3 if not resp then return end if next(resp) == nil then --4 local resp, err = redis:hset(redis_key, "request", true ) --4 if not resp then return end else local data = normalize_hgetall_result(resp) --5 local response = core.json.decode(data["response"]) --6 local body = response["body"] --7 local status_code = response["status"] --7 local headers = response["headers"] for k, v in pairs(headers) do --7 core.response.set_header(k, v) end return core.response.exit(status_code, body) --8 end end
return
クライアントに返します。return ステートメントに注意してください。APISIX は、後のライフサイクル フェーズをスキップします。
function _M.body_filter(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key if core.response then local response = { --2 status = ngx.status, body = core.response.hold_body_chunk(ctx, true), headers = ngx.resp.get_headers() } local redis_key = "idempotency#" .. redis_key local resp, err = red:set(redis_key, "response", core.json.encode(response)) --3 if not resp then return end end end
リクエストから冪等性キーを抽出します。
応答のさまざまな要素を Lua テーブルに配置します。
JSONエンコードされたレスポンスをRedisセットに保存する
テストの結果、期待通りに動作することがわかりました。
試す:
curl -i -X POST -H 'Idempotency-Key: A' localhost:9080/response-headers\?freeform=hello curl -i -H 'Idempotency-Key: B' localhost:9080/status/250 curl -i -H 'Idempotency-Key: C' -H 'foo: bar' localhost:9080/status/250
また、一致しない冪等性キー (例: A
を 3 番目のリクエストで再利用してみてください。まだエラー管理を実装していないため、別のリクエストに対してキャッシュされた応答が返されます。ゲームを盛り上げるときが来ました。
仕様ではいくつかのエラー パスが定義されています。
1 つずつ実装してみましょう。まず、リクエストに冪等性キーがあるかどうかを確認します。ルートごとにプラグインを設定できるため、ルートにプラグインが含まれている場合は必須であると判断できます。
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") if not idempotency_key then return core.response.exit(400, "This operation is idempotent and it requires correct usage of Idempotency Key") end -- ...
キーが見つからない場合は、適切な 400 を返すだけです。これは簡単でした。
既存のキーが別のリクエストで再利用されているかどうかを確認するのは、少し複雑です。まず、リクエスト、より正確には、リクエストを構成するもののフィンガープリントを保存する必要があります。2 つのリクエストが同じメソッド、同じパス、同じ本文、同じヘッダーを持っている場合、そのリクエストは同じです。状況に応じて、ドメイン (およびポート) は、リクエストの一部である場合とそうでない場合があります。私のシンプルな実装では、それを省略します。
解決すべき問題がいくつかあります。まず、私がよく知っている他の言語 (Java のObject.hash()
など) にあるようなcore.request
オブジェクトをハッシュする既存の API が見つかりませんでした。オブジェクトを JSON でエンコードし、文字列をハッシュすることにしました。ただし、既存のcore.request
には JSON に変換できないサブ要素があります。上記の部分を抽出してテーブルを変換する必要がありました。
local function hash_request(request, ctx) local request = { --1 method = core.request.get_method(), uri = ctx.var.request_uri, headers = core.request.headers(), body = core.request.get_body() } local json = core.json.stably_encode(request) --2 return ngx.encode_base64(json) --3 end
関連する部分のみを含む表を作成します。
cjson
ライブラリは、複数の呼び出し間でメンバーが異なってソートされる可能性がある JSON を生成します。そのため、異なるハッシュが生成されます。core.json.stably_encode core.json.stably_encode
、この問題を修正します。
ハッシュします。
次に、リクエストを受信したときにブール値を保存する代わりに、結果のハッシュを保存します。
local hash = hash_request(core.request, ctx) if next(resp) == nil then core.log.warn("No key found in Redis for Idempotency-Key, set it: ", redis_key) local resp, err = redis:hset(redis_key, "request", hash) if not resp then core.log.error("Failed to set data in Redis: ", err) return end then -- ...
他のブランチのべき等キーの下に格納されているハッシュを読み取ります。一致しない場合は、関連するエラー コードで終了します。
local data = normalize_hgetall_result(resp) local stored_hash = data["request"] if hash ~= stored_hash then return core.response.exit(422, "This operation is idempotent and it requires correct usage of Idempotency Key. Idempotency Key MUST not be reused across different payloads of this operation.") end
最終的なエラー管理はその後すぐに行われます。次のシナリオを想像してください。
リクエストには冪等性キー X が付属しています。
プラグインはハッシュのフィンガープリントを作成し、Redis に保存します。
APISIX はリクエストをアップストリームに転送します。
重複したリクエストには、同じ冪等性キー X が付属します。
プラグインは Redis からデータを読み取りますが、キャッシュされた応答が見つかりません。
アップストリームはリクエストの処理を完了していないため、最初のリクエストはまだbody_filter
フェーズに到達していません。
上記のスニペットに次のコードを追加します。
if not data["response"] then return core.response.exit(409, " request with the same Idempotency-Key for the same operation is being processed or is outstanding.") end
それでおしまい。
この投稿では、プラグインを介して Apache APISIX にIdempotency-Key
ヘッダー仕様を簡単に実装する方法を示しました。この段階では、自動化されたテスト、ルートごとに Redis を構成する機能、ドメイン/パスをリクエストの一部に構成する機能、単一インスタンスではなく Redis クラスターを構成する機能、別の K/V ストアを使用する機能など、改善の余地があります。
しかし、仕様は実装されており、より実用レベルの実装に進化する可能性があります。
この投稿の完全なソースコードはGitHubにあります。
さらに進むには:
2024年4月7日にA Java Geekで最初に公開されました