大家好!几年来,我一直在用 Python 为自己编写各种“助理”电报机器人,它们为我处理各种小型日常任务 - 通知我某件事、检查服务正常运行时间、从电报频道和聊天转发有趣的内容等等。
这很方便,因为手机总是在手边,而且无需打开笔记本电脑就能修复服务器上的某些内容,这给我带来了特别的乐趣。
总的来说,我积累了很多不同的小项目模板,想与 HackerNoon 读者分享。
我会立即说,这些示例可能就其应用程序“按原样”而言是利基市场,但我将标记那些地方,通过将几行代码更改为您自己的代码,您将能够重用大部分代码您的项目的发展。
几天前我完成了这个具体项目,它已经给我带来了很多好处。我在 Web3 基础设施提供商 chainstack.com 工作,负责处理 EVM 区块链上智能合约数据索引服务。
正在开发的服务的质量关键取决于服务在线检索数据的节点的运行情况。
我花了很多时间尝试使用我们基础设施部门使用的现成工具,例如 Grafana、BetterUptime 等,但由于我对系统内部结构没什么兴趣,所以我主要关注的是入口处的指标和退出后,我决定编写自己的机器人,它将执行以下操作:
在本文中,我将重点关注第一部分,即根据请求接收指标。
我们需要一个新的虚拟工作环境。
cd ~ virtualenv -p python3.8 up_env # crete a virtualenv source ~/up_env/bin/activate # activate the virtualenl
安装依赖项:
pip install python-telegram-bot pip install "python-telegram-bot[job-queue]" --pre pip install --upgrade python-telegram-bot==13.6.0 # the code was written before version 20, so here the version is explicitly specified pip install numpy # needed for the median value function pip install web3 # needed for requests to nodes (replace with what you need)
包含函数functions.py的文件(可以用类来实现,但由于示例很短,我不打算将其分成模块,但是多线程库需要将函数移动到单独的文件中)。导入依赖项:
import numpy as np import multiprocessing from web3 import Web3 # add those libraries needed for your task
描述检查状态的函数。就我而言,它涉及循环预先选择的公共节点,检索它们的最后一个块,采用中值来过滤掉任何偏差,然后根据该中值检查我们自己的节点。
服务状态检查功能(可以替换为自己的):
# Helper function that checks a single node def get_last_block_once(rpc): try: w3 = Web3(Web3.HTTPProvider(rpc)) block_number = w3.eth.block_number if isinstance(block_number, int): return block_number else: return None except Exception as e: print(f'{rpc} - {repr(e)}') return None # Main function to check the status of the service that will be called def check_service(): # pre-prepared list of reference nodes # for any network, it can be found on the website https://chainlist.org/ list_of_public_nodes = [ 'https://polygon.llamarpc.com', 'https://polygon.rpc.blxrbdn.com', 'https://polygon.blockpi.network/v1/rpc/public', 'https://polygon-mainnet.public.blastapi.io', 'https://rpc-mainnet.matic.quiknode.pro', 'https://polygon-bor.publicnode.com', 'https://poly-rpc.gateway.pokt.network', 'https://rpc.ankr.com/polygon', 'https://polygon-rpc.com' ] # parallel processing of requests to all nodes with multiprocessing.Pool(processes=len(list_of_public_nodes)) as pool: results = pool.map(get_last_block_once, list_of_public_nodes) last_blocks = [b for b in results if b is not None and isinstance(b, int)] # define the maximum and median value of the current block med_val = int(np.median(last_blocks)) max_val = int(np.max(last_blocks)) # determine the number of nodes with the maximum and median value med_support = np.sum([1 for x in last_blocks if x == med_val]) max_support = np.sum([1 for x in last_blocks if x == max_val]) return max_val, max_support, med_val, med_support
机器人的下一个重要文件是uptime_bot.py 。我们从上面的文件导入库和函数并设置必要的常量:
import telegram from telegram.ext import Updater, CommandHandler, Filters from functions import get_last_block_once, check_service # Here one can to set a limited circle of bot users, # listing the usernames of the users ALLOWED_USERS = ['your_telegram_account', 'someone_else'] # The address of the node that I am monitoring (also a public node in this case) OBJECT_OF_CHECKING = 'https://polygon-mainnet.chainstacklabs.com' # Threshold for highlighting critical lag THRESHOLD = 5
接下来,我们来描述从机器人 UI 发出命令时将调用的函数。
def start(update, context): """Send a message when the command /start is issued.""" try: # Get the user user = update.effective_user # Filter out bots if user.is_bot: return # Check if the user is allowed username = str(user.username) if username not in ALLOWED_USERS: return except Exception as e: print(f'{repr(e)}') return # Call the main function to check the network status max_val, max_support, med_val, med_support = check_service() # Call the function to check the status of the specified node last_block = get_last_block_once(OBJECT_OF_CHECKING) # Create the message to send to Telegram message = "" # Information about the state of the nodes in the public network (median, maximum, and number of nodes) message += f"Public median block number {med_val} (on {med_support}) RPCs\n" message += f"Public maximum block number +{max_val - med_val} (on {max_support}) PRCs\n" # Compare with the threshold if last_block is not None: out_text = str(last_block - med_val) if last_block - med_val < 0 else '+' + str(last_block - med_val) if abs(last_block - med_val) > THRESHOLD: message += f"The node block number shift ⚠️<b>{out_text}</b>⚠️" else: message += f"The node block number shift {out_text}" else: # Exception processing if a node has not responded message += f"The node has ⚠️<b>not responded</b>⚠️" # Send the message to the user context.bot.send_message(chat_id=user.id, text=message, parse_mode="HTML")
现在,剩下的就是添加初始化机器人的部分,并连接处理程序函数:
token = "xxx" # Bot token obtained from BotFather # set up the bot bot = telegram.Bot(token=token) updater = Updater(token=token, use_context=True) dispatcher = updater.dispatcher # bind the handler function dispatcher.add_handler(CommandHandler("start", start, filters=Filters.chat_type.private)) # run the bot updater.start_polling()
最后,您可以使用以下命令在廉价的 VPS 服务器上运行代码:
source ~/up_env/bin/activate python uptime_bot.py
配置systemd单元文件后。
结果,机器人的工作将如下所示。
如果一切顺利:
如果滞后变得太大,则如下:
在接下来的文章中,我将描述如何实现剩下的两个任务:
根据请求检索显示过去 X 小时内发生的事件的图表。
接收指示当前正在发生某事并需要采取措施的警报。
该项目的源代码可在 GitHub存储库中获取。如果您发现本教程有帮助,请随时在 GitHub 上给它一颗星,我将不胜感激🙂