Na semana passada, escrevi uma análise da especificação IETF Idempotency-Key . A especificação visa evitar solicitações duplicadas. Resumindo, a ideia é que o cliente envie uma chave única junto com a solicitação:
Este post mostra como implementá-lo com Apache APISIX .
Antes de começar a codificar, precisamos definir algumas coisas. Apache APISIX oferece uma arquitetura baseada em plugins. Portanto, codificaremos a lógica acima em um plugin.
Apache APISIX baseia-se no OpenResty, que se baseia no nginx. Cada componente define fases, que mapeiam mais ou menos os componentes. Para mais informações sobre as fases, consulte este post anterior .
Finalmente, decidiremos sobre uma prioridade. Prioridade define a ordem em que APISIX executa plugins dentro de uma fase . Decidi por 1500
, pois todos os plug-ins de autenticação têm prioridade na faixa 2000
e mais, mas quero retornar a resposta em cache o mais rápido possível.
A especificação exige que armazenemos dados. APISIX oferece muitas abstrações, mas armazenamento não é uma delas. Precisamos de acesso por meio da chave de idempotência para que pareça um armazenamento de valores-chave.
Escolhi arbitrariamente o Redis, pois é bastante difundido e o cliente já faz parte da distribuição APISIX. Observe que o Redis simples não oferece armazenamento JSON; portanto, eu uso a imagem Docker redis-stack
.
A infraestrutura local é a seguinte:
services: apisix: image: apache/apisix:3.9.0-debian volumes: - ./apisix/config.yml:/usr/local/apisix/conf/config.yaml:ro - ./apisix/apisix.yml:/usr/local/apisix/conf/apisix.yaml:ro #1 - ./plugin/src:/opt/apisix/plugins:ro #2 ports: - "9080:9080" redis: image: redis/redis-stack:7.2.0-v9 ports: - "8001:8001" #3
A configuração APISIX é a seguinte:
deployment: role: data_plane role_data_plane: config_provider: yaml #1 apisix: extra_lua_path: /opt/?.lua #2 plugins: - idempotency # priority: 1500 #3 plugin_attr: #4 idempotency: host: redis #5
Finalmente, declaramos nossa rota única:
routes: - uri: /* plugins: idempotency: ~ #1 upstream: nodes: "httpbin.org:80": 1 #2 #END #3
Com esta infraestrutura instalada, podemos iniciar a implementação.
Os fundamentos de um plugin Apache APISIX são bastante básicos:
local plugin_name = "idempotency" local _M = { version = 1.0, priority = 1500, schema = {}, name = plugin_name, } return _M
A próxima etapa é a configuração, por exemplo, host e porta Redis. Para começar, ofereceremos uma configuração única do Redis em todas as rotas. Essa é a ideia por trás da seção plugin_attr
no arquivo config.yaml
: configuração comum. Vamos detalhar nosso plugin:
local core = require("apisix.core") local plugin = require("apisix.plugin") local attr_schema = { --1 type = "object", properties = { host = { type = "string", description = "Redis host", default = "localhost", }, port = { type = "integer", description = "Redis port", default = 6379, }, }, } function _M.init() local attr = plugin.plugin_attr(plugin_name) or {} local ok, err = core.schema.check(attr_schema, attr) --2 if not ok then core.log.error("Failed to check the plugin_attr[", plugin_name, "]", ": ", err) return false, err end end
Defina a forma da configuração
Verifique se a configuração é válida
Como defini valores padrão no plug-in, posso substituir apenas o host
para redis
para executar dentro de minha infraestrutura Docker Compose e usar a porta padrão.
Em seguida, preciso criar o cliente Redis. Observe que a plataforma me impede de conectar em qualquer fase após a seção de reescrita/acesso. Portanto, vou criá-lo no método init()
e mantê-lo até o final.
local redis_new = require("resty.redis").new --1 function _M.init() -- ... redis = redis_new() --2 redis:set_timeout(1000) local ok, err = redis:connect(attr.host, attr.port) if not ok then core.log.error("Failed to connect to Redis: ", err) return false, err end end
Consulte a new
função do módulo OpenResty Redis.
Chame-o para obter uma instância.
O cliente Redis agora está disponível na variável redis
durante todo o restante do ciclo de execução do plugin.
Na minha vida anterior de engenheiro de software, geralmente implementava primeiro o caminho nominal. Depois, tornei o código mais robusto gerenciando os casos de erro individualmente. Dessa forma, se eu tivesse que divulgar a qualquer momento, ainda entregaria os valores do negócio – com avisos. Abordarei este miniprojeto da mesma maneira.
O pseudoalgoritmo no caminho nominal se parece com o seguinte:
DO extract idempotency key from request DO look up value from Redis IF value doesn't exist DO set key in Redis with empty value ELSE RETURN cached response DO forward to upstream DO store response in Redis RETURN response
Precisamos mapear a lógica para a fase que mencionei acima. Duas fases estão disponíveis antes do upstream, reescrita e acesso ; três depois, header_filter , body_filter e log . A fase de acesso parecia óbvia para o trabalho antes, mas eu precisava descobrir entre as outras três. Escolhi aleatoriamente o body_filter , mas estou mais do que disposto a ouvir argumentos sensatos para outras fases.
Observe que removi os logs para tornar o código mais legível. Logs de erros e informativos são necessários para facilitar a depuração de problemas de produção.
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key --2 local resp, err = redis:hgetall(redis_key) --3 if not resp then return end if next(resp) == nil then --4 local resp, err = redis:hset(redis_key, "request", true ) --4 if not resp then return end else local data = normalize_hgetall_result(resp) --5 local response = core.json.decode(data["response"]) --6 local body = response["body"] --7 local status_code = response["status"] --7 local headers = response["headers"] for k, v in pairs(headers) do --7 core.response.set_header(k, v) end return core.response.exit(status_code, body) --8 end end
return
: APISIX ignora as fases posteriores do ciclo de vida.
function _M.body_filter(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") --1 local redis_key = "idempotency#" .. idempotency_key if core.response then local response = { --2 status = ngx.status, body = core.response.hold_body_chunk(ctx, true), headers = ngx.resp.get_headers() } local redis_key = "idempotency#" .. redis_key local resp, err = red:set(redis_key, "response", core.json.encode(response)) --3 if not resp then return end end end
Extraia a chave de idempotência da solicitação.
Organize os diferentes elementos de uma resposta em uma tabela Lua.
Armazene a resposta codificada em JSON em um conjunto Redis
Os testes revelam que funciona conforme o esperado.
Tentar:
curl -i -X POST -H 'Idempotency-Key: A' localhost:9080/response-headers\?freeform=hello curl -i -H 'Idempotency-Key: B' localhost:9080/status/250 curl -i -H 'Idempotency-Key: C' -H 'foo: bar' localhost:9080/status/250
Além disso, tente reutilizar uma chave de idempotência incompatível, por exemplo , A
para a terceira solicitação. Como ainda não implementamos nenhum gerenciamento de erros, você receberá a resposta em cache para outra solicitação. É hora de melhorar nosso jogo.
A especificação define vários caminhos de erro:
Vamos implementá-los um por um. Primeiro, vamos verificar se a solicitação possui uma chave de idempotência. Observe que podemos configurar o plugin por rota, portanto, se a rota incluir o plugin, podemos concluir que ele é obrigatório.
function _M.access(conf, ctx) local idempotency_key = core.request.header(ctx, "Idempotency-Key") if not idempotency_key then return core.response.exit(400, "This operation is idempotent and it requires correct usage of Idempotency Key") end -- ...
Basta retornar o 400 apropriado se a chave estiver faltando. Esse foi fácil.
Verificar a reutilização de uma chave existente para uma solicitação diferente é um pouco mais complicado. Primeiro precisamos armazenar a solicitação, ou mais precisamente, a impressão digital do que constitui uma solicitação. Duas solicitações são iguais se tiverem: o mesmo método, o mesmo caminho, o mesmo corpo e os mesmos cabeçalhos. Dependendo da sua situação, o domínio (e a porta) podem ou não fazer parte deles. Para minha implementação simples, vou deixar isso de fora.
Existem vários problemas para resolver. Primeiro, não encontrei uma API existente para fazer hash do objeto core.request
como existe em outras linguagens com as quais estou mais familiarizado, por exemplo , Object.hash()
do Java. Decidi codificar o objeto em JSON e fazer o hash da string. No entanto, o core.request
existente possui subelementos que não podem ser convertidos em JSON. Tive que extrair as partes citadas acima e converter a tabela.
local function hash_request(request, ctx) local request = { --1 method = core.request.get_method(), uri = ctx.var.request_uri, headers = core.request.headers(), body = core.request.get_body() } local json = core.json.stably_encode(request) --2 return ngx.encode_base64(json) --3 end
Crie uma tabela apenas com as partes relevantes.
A biblioteca cjson
produz JSON cujos membros podem ser classificados de forma diferente em diversas chamadas. Conseqüentemente, isso resulta em hashes diferentes. O core.json.stably_encode
corrige esse problema.
Faça isso.
Então, em vez de armazenar um booleano ao receber a solicitação, armazenamos o hash resultante.
local hash = hash_request(core.request, ctx) if next(resp) == nil then core.log.warn("No key found in Redis for Idempotency-Key, set it: ", redis_key) local resp, err = redis:hset(redis_key, "request", hash) if not resp then core.log.error("Failed to set data in Redis: ", err) return end then -- ...
Lemos o hash armazenado na chave de idempotência na outra ramificação. Se não corresponderem, saímos com o código de erro relevante:
local data = normalize_hgetall_result(resp) local stored_hash = data["request"] if hash ~= stored_hash then return core.response.exit(422, "This operation is idempotent and it requires correct usage of Idempotency Key. Idempotency Key MUST not be reused across different payloads of this operation.") end
O gerenciamento final de erros acontece logo depois. Imagine o seguinte cenário:
Uma solicitação vem com chave de idempotência X.
O plugin identifica e armazena o hash no Redis.
APISIX encaminha a solicitação para o upstream.
Uma solicitação duplicada vem com a mesma chave de idempotência, X.
O plugin lê os dados do Redis e não encontra resposta em cache.
O upstream não concluiu o processamento da solicitação; portanto, a primeira solicitação ainda não atingiu a fase body_filter
.
Anexamos o seguinte código ao trecho acima:
if not data["response"] then return core.response.exit(409, " request with the same Idempotency-Key for the same operation is being processed or is outstanding.") end
É isso.
Neste post, mostrei uma implementação simples da especificação do cabeçalho Idempotency-Key
no Apache APISIX por meio de um plugin. Nesta fase, há espaço para melhorias: testes automatizados, capacidade de configurar Redis por rota, configurar o domínio/caminho para fazer parte da solicitação, configurar um cluster Redis em vez de uma única instância, usar outro K/ Loja V, etc.
No entanto, ele implementa a especificação e tem potencial para evoluir para uma implementação de nível de produção.
O código-fonte completo desta postagem pode ser encontrado no GitHub .
Ir adiante:
Publicado originalmente em A Java Geek em 7 de abril de 2024