Olá pessoal! Por vários anos, tenho escrito vários bots de telegrama "assistentes" para mim em Python que lidam com várias pequenas tarefas de rotina para mim - notificando-me sobre algo, verificando o tempo de atividade do serviço, encaminhando conteúdo interessante de canais de telegrama e bate-papos e assim por diante.
Isso é conveniente porque o telefone está sempre à mão e poder consertar algo no servidor sem nem mesmo abrir meu laptop me dá um prazer especial.
Em geral, acumulei muitos modelos de pequenos projetos diferentes que desejo compartilhar com os leitores do HackerNoon.
Direi desde já que os exemplos podem ser de nicho em termos de aplicação "como estão", mas marcarei aqueles lugares onde, alterando algumas linhas de código para as suas, você poderá reutilizar a maior parte do desenvolvimentos para seus projetos.
Concluí este projeto específico há alguns dias e já me trouxe muitos benefícios. Eu trabalho em um provedor de infraestrutura Web3 chainstack.com, lidando com um serviço para indexar dados de contratos inteligentes em blockchains EVM.
E a qualidade do serviço que está sendo desenvolvido depende criticamente de quão "bem" estão funcionando os nós dos quais o serviço recupera dados online.
Passei muitas horas tentando usar ferramentas prontas que nossa divisão de infraestrutura usa, como Grafana, BetterUptime e outras, mas como tenho pouco interesse nas partes internas do sistema, sendo o foco principal para mim as métricas na entrada e a saída, decidi escrever meu próprio bot, que faria o seguinte:
Neste artigo, vou focar na primeira parte, ou seja, receber métricas sob demanda.
Vamos precisar de um novo ambiente virtual para trabalhar.
cd ~ virtualenv -p python3.8 up_env # crete a virtualenv source ~/up_env/bin/activate # activate the virtualenl
Instalar dependências:
pip install python-telegram-bot pip install "python-telegram-bot[job-queue]" --pre pip install --upgrade python-telegram-bot==13.6.0 # the code was written before version 20, so here the version is explicitly specified pip install numpy # needed for the median value function pip install web3 # needed for requests to nodes (replace with what you need)
Arquivo com funções functions.py (você pode implementá-lo com classes, mas como o exemplo é curto, não planejei dividi-lo em módulos, mas uma biblioteca multithreading requer que as funções sejam movidas para um arquivo separado). Dependências de importação:
import numpy as np import multiprocessing from web3 import Web3 # add those libraries needed for your task
Descrever uma função para verificar o estado. No meu caso, envolvia um loop através de nós públicos pré-selecionados, recuperando seu último bloco, pegando o valor mediano para filtrar quaisquer desvios e, em seguida, verificando nosso próprio nó contra essa mediana.
Função de verificação do estado do serviço (você pode substituí-la por sua própria):
# Helper function that checks a single node def get_last_block_once(rpc): try: w3 = Web3(Web3.HTTPProvider(rpc)) block_number = w3.eth.block_number if isinstance(block_number, int): return block_number else: return None except Exception as e: print(f'{rpc} - {repr(e)}') return None # Main function to check the status of the service that will be called def check_service(): # pre-prepared list of reference nodes # for any network, it can be found on the website https://chainlist.org/ list_of_public_nodes = [ 'https://polygon.llamarpc.com', 'https://polygon.rpc.blxrbdn.com', 'https://polygon.blockpi.network/v1/rpc/public', 'https://polygon-mainnet.public.blastapi.io', 'https://rpc-mainnet.matic.quiknode.pro', 'https://polygon-bor.publicnode.com', 'https://poly-rpc.gateway.pokt.network', 'https://rpc.ankr.com/polygon', 'https://polygon-rpc.com' ] # parallel processing of requests to all nodes with multiprocessing.Pool(processes=len(list_of_public_nodes)) as pool: results = pool.map(get_last_block_once, list_of_public_nodes) last_blocks = [b for b in results if b is not None and isinstance(b, int)] # define the maximum and median value of the current block med_val = int(np.median(last_blocks)) max_val = int(np.max(last_blocks)) # determine the number of nodes with the maximum and median value med_support = np.sum([1 for x in last_blocks if x == med_val]) max_support = np.sum([1 for x in last_blocks if x == max_val]) return max_val, max_support, med_val, med_support
O próximo arquivo importante do bot é uptime_bot.py . Importamos bibliotecas e funções do arquivo acima e definimos as constantes necessárias:
import telegram from telegram.ext import Updater, CommandHandler, Filters from functions import get_last_block_once, check_service # Here one can to set a limited circle of bot users, # listing the usernames of the users ALLOWED_USERS = ['your_telegram_account', 'someone_else'] # The address of the node that I am monitoring (also a public node in this case) OBJECT_OF_CHECKING = 'https://polygon-mainnet.chainstacklabs.com' # Threshold for highlighting critical lag THRESHOLD = 5
Em seguida, vamos descrever uma função que será chamada quando o comando for emitido a partir da IU do bot.
def start(update, context): """Send a message when the command /start is issued.""" try: # Get the user user = update.effective_user # Filter out bots if user.is_bot: return # Check if the user is allowed username = str(user.username) if username not in ALLOWED_USERS: return except Exception as e: print(f'{repr(e)}') return # Call the main function to check the network status max_val, max_support, med_val, med_support = check_service() # Call the function to check the status of the specified node last_block = get_last_block_once(OBJECT_OF_CHECKING) # Create the message to send to Telegram message = "" # Information about the state of the nodes in the public network (median, maximum, and number of nodes) message += f"Public median block number {med_val} (on {med_support}) RPCs\n" message += f"Public maximum block number +{max_val - med_val} (on {max_support}) PRCs\n" # Compare with the threshold if last_block is not None: out_text = str(last_block - med_val) if last_block - med_val < 0 else '+' + str(last_block - med_val) if abs(last_block - med_val) > THRESHOLD: message += f"The node block number shift ⚠️<b>{out_text}</b>⚠️" else: message += f"The node block number shift {out_text}" else: # Exception processing if a node has not responded message += f"The node has ⚠️<b>not responded</b>⚠️" # Send the message to the user context.bot.send_message(chat_id=user.id, text=message, parse_mode="HTML")
Agora, só falta adicionar a parte onde o bot é inicializado, e a função do handler é conectada:
token = "xxx" # Bot token obtained from BotFather # set up the bot bot = telegram.Bot(token=token) updater = Updater(token=token, use_context=True) dispatcher = updater.dispatcher # bind the handler function dispatcher.add_handler(CommandHandler("start", start, filters=Filters.chat_type.private)) # run the bot updater.start_polling()
Por fim, você pode executar o código em um servidor VPS barato usando:
source ~/up_env/bin/activate python uptime_bot.py
Depois de configurar o arquivo systemd unit.
Como resultado, o trabalho do bot ficará assim.
Se estiver tudo bem:
E se o atraso se tornar muito grande, faça o seguinte:
Nos artigos a seguir, descreverei como implementar as duas tarefas restantes:
Recupere gráficos a pedido mostrando os eventos ocorridos nas últimas X horas.
Receba um alerta indicando que algo está acontecendo no momento e requer ação.
O código-fonte do projeto está disponível no repositório GitHub. Se você achou este tutorial útil, sinta-se à vontade para dar uma estrela no GitHub, eu agradeceria🙂