上周,我写了一篇关于IETF 幂等密钥规范的分析。该规范旨在避免重复请求。简而言之,其理念是让客户端随请求一起发送一个唯一的密钥:
这篇文章展示了如何使用Apache APISIX实现它。
在开始编码之前,我们需要定义一些事情。Apache APISIX 提供基于插件的架构。因此,我们将在插件中编写上述逻辑。
Apache APISIX 建立在 OpenResty 之上,而 OpenResty 又建立在 nginx 之上。每个组件都定义了阶段,这些阶段或多或少地映射到各个组件上。有关阶段的更多信息,请参阅此上一篇文章。
最后,我们要确定优先级。优先级定义了 APISIX在阶段内运行插件的顺序。我决定使用1500
,因为所有身份验证插件的优先级都在2000
及以上范围内,但我想尽快返回缓存的响应。
规范要求我们存储数据。APISIX 提供了许多抽象,但存储不是其中之一。我们需要通过幂等键进行访问,因此它看起来像一个键值存储。
我随意选择了 Redis,因为它非常普及,而且客户端已经是 APISIX 发行版的一部分。请注意,简单的 Redis 不提供 JSON 存储;因此,我使用redis-stack
Docker 映像。
本地基础设施如下:
services: apisix: image: apache/apisix:3.9.0-debian volumes: - ./apisix/config.yml:/usr/local/apisix/conf/config.yaml:ro - ./apisix/apisix.yml:/usr/local/apisix/conf/apisix.yaml:ro #1 - ./plugin/src:/opt/apisix/plugins:ro #2 ports: - "9080:9080" redis: image: redis/redis-stack:7.2.0-v9 ports: - "8001:8001" #3
APISIX 配置如下:
deployment: role: data_plane role_data_plane: config_provider: yaml #1 apisix: extra_lua_path: /opt/?.lua #2 plugins: - idempotency # priority: 1500 #3 plugin_attr: #4 idempotency: host: redis #5
最后,我们声明我们的单一路线:
routes: - uri: /* plugins: idempotency: ~ #1 upstream: nodes: "httpbin.org:80": 1 #2 #END #3
有了这个基础设施,我们就可以开始实施了。
Apache APISIX 插件的基础非常简单:
local plugin_name = "idempotency" local _M = { version = 1.0, priority = 1500, schema = {}, name = plugin_name, } return _M
下一步是配置,例如Redis 主机和端口。首先,我们将为所有路由提供单一的 Redis 配置。这就是config.yaml
文件中plugin_attr
部分背后的想法:通用配置。让我们充实我们的插件:
local core = require("apisix.core") local plugin = require("apisix.plugin") local attr_schema = { --1 type = "object", properties = { host = { type = "string", description = "Redis host", default = "localhost", }, port = { type = "integer", description = "Redis port", default = 6379, }, }, } function _M.init() local attr = plugin.plugin_attr(plugin_name) or {} local ok, err = core.schema.check(attr_schema, attr) --2 if not ok then core.log.error("Failed to check the plugin_attr[", plugin_name, "]", ": ", err) return false, err end end
定义配置的形状
检查配置是否有效
因为我在插件中定义了默认值,所以我只能将host
覆盖为redis
,以在我的 Docker Compose 基础架构内运行并使用默认端口。
接下来,我需要创建 Redis 客户端。请注意,平台禁止我在重写/访问部分之后的任何阶段进行连接。因此,我将在init()
方法中创建它并将其保留到最后。
local redis_new = require("resty.redis").new --1 function _M.init() -- ... redis = redis_new() --2 redis:set_timeout(1000) local ok, err = redis:connect(attr.host, attr.port) if not ok then core.log.error("Failed to connect to Redis: ", err) return false, err end end
引用OpenResty Redis模块的new
功能。
调用它来获取一个实例。
Redis 客户端现在可在插件执行周期的其余时间内在redis
变量中使用。
在我以前的软件工程师生涯中,我通常首先实施名义路径。然后,我通过单独管理错误情况使代码更加健壮。这样,如果我必须在任何时候发布,我仍然会提供业务价值 - 带有警告。我将以同样的方式处理这个小项目。
标称路径上的伪算法如下所示:
DO extract idempotency key from request DO look up value from Redis IF value doesn't exist DO set key in Redis with empty value ELSE RETURN cached response DO forward to upstream DO store response in Redis RETURN response
我们需要将逻辑映射到我上面提到的阶段。上游之前有两个阶段可用,即重写和访问;之后有三个阶段,即header_filter 、 body_filter和log 。访问阶段之前的工作似乎很明显,但我需要在其他三个阶段之间做出选择。我随机选择了body_filter ,但我非常愿意听取其他阶段的合理论点。
请注意,我删除了日志以使代码更具可读性。错误和信息日志对于简化生产问题的调试必不可少。
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key --2 local resp, err = redis:hgetall(redis_key) --3 if not resp then return end if next(resp) == nil then --4 local resp, err = redis:hset(redis_key, "request", true ) --4 if not resp then return end else local data = normalize_hgetall_result(resp) --5 local response = core.json.decode(data["response"]) --6 local body = response["body"] --7 local status_code = response["status"] --7 local headers = response["headers"] for k, v in pairs(headers) do --7 core.response.set_header(k, v) end return core.response.exit(status_code, body) --8 end end
return
语句:APISIX 跳过了后续的生命周期阶段。
function _M.body_filter(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key if core.response then local response = { --2 status = ngx.status, body = core.response.hold_body_chunk(ctx, true), headers = ngx.resp.get_headers() } local redis_key = "idempotency#" .. redis_key local resp, err = red:set(redis_key, "response", core.json.encode(response)) --3 if not resp then return end end end
从请求中提取幂等性密钥。
在 Lua 表中排列响应的不同元素。
将 JSON 编码的响应存储在 Redis 集中
测试表明它按预期运行。
尝试:
curl -i -X POST -H 'Idempotency-Key: A' localhost:9080/response-headers\?freeform=hello curl -i -H 'Idempotency-Key: B' localhost:9080/status/250 curl -i -H 'Idempotency-Key: C' -H 'foo: bar' localhost:9080/status/250
另外,尝试重复使用不匹配的幂等性密钥,例如,第三个请求的A
由于我们尚未实施任何错误管理,您将获得另一个请求的缓存响应。是时候提高我们的水平了。
规范定义了几种错误路径:
让我们逐一实现它们。首先,让我们检查请求是否具有幂等性键。请注意,我们可以在每个路由上配置插件,因此如果路由包含插件,我们可以断定它是必需的。
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") if not idempotency_key then return core.response.exit(400, "This operation is idempotent and it requires correct usage of Idempotency Key") end -- ...
如果钥匙丢失,只需返回相应的 400。这个很简单。
检查现有密钥是否被其他请求重用稍微复杂一些。我们首先需要存储请求,或者更准确地说,存储构成请求的指纹。如果两个请求具有相同的方法、相同的路径、相同的主体和相同的标头,则这两个请求是相同的。根据您的情况,域(和端口)可能是也可能不是它们的一部分。对于我的简单实现,我将省略它。
有几个问题需要解决。首先,我没有找到现有的 API 来对core.request
对象进行哈希处理,就像我更熟悉的其他语言中那样,例如Java 的Object.hash()
。我决定用 JSON 编码对象并哈希字符串。但是,现有的core.request
有无法转换为 JSON 的子元素。我不得不提取上面提到的部分并转换表格。
local function hash_request(request, ctx) local request = { --1 method = core.request.get_method(), uri = ctx.var.request_uri, headers = core.request.headers(), body = core.request.get_body() } local json = core.json.stably_encode(request) --2 return ngx.encode_base64(json) --3 end
创建一个仅包含相关部分的表格。
cjson
库生成的 JSON 的成员在多次调用中可能排序不同。因此,它会产生不同的哈希值。core.json.stably_encode 修复core.json.stably_encode
这个问题。
将其哈希化。
然后,在接收请求时,我们不再存储布尔值,而是存储生成的哈希值。
local hash = hash_request(core.request, ctx) if next(resp) == nil then core.log.warn("No key found in Redis for Idempotency-Key, set it: ", redis_key) local resp, err = redis:hset(redis_key, "request", hash) if not resp then core.log.error("Failed to set data in Redis: ", err) return end then -- ...
我们读取另一个分支上幂等性键下存储的哈希值。如果它们不匹配,我们将退出并显示相关错误代码:
local data = normalize_hgetall_result(resp) local stored_hash = data["request"] if hash ~= stored_hash then return core.response.exit(422, "This operation is idempotent and it requires correct usage of Idempotency Key. Idempotency Key MUST not be reused across different payloads of this operation.") end
最后的错误管理随后发生。想象一下以下场景:
请求附带幂等密钥X。
插件对指纹进行识别并将哈希值存储在 Redis 中。
APISIX 将请求转发给上游。
重复请求具有相同的幂等键 X。
插件从 Redis 读取数据,发现没有缓存的响应。
上游尚未完成处理请求;因此,第一个请求尚未到达body_filter
阶段。
我们将以下代码附加到上面的代码片段:
if not data["response"] then return core.response.exit(409, " request with the same Idempotency-Key for the same operation is being processed or is outstanding.") end
就是这样。
在这篇文章中,我通过插件展示了 Apache APISIX 上Idempotency-Key
标头规范的简单实现。在这个阶段,它还有改进的空间:自动化测试、基于每个路由配置 Redis 的能力、将域/路径配置为请求的一部分、配置 Redis 集群而不是单个实例、使用另一个 K/V 存储等。
然而,它确实实现了规范,并有可能发展成为更适合生产级的实现。
本文的完整源代码可以在GitHub上找到。
进一步来说:
最初于 2024 年 4 月 7 日在A Java Geek上发布