こんにちは、みんな!ここ数年、私はさまざまな「アシスタント」電報ボットを Python で自分用に作成してきました。これは、何かについての通知、サービス稼働時間の確認、電報チャネルやチャットからの興味深いコンテンツの転送など、さまざまな小さな日常タスクを処理します。
一般に、私は HackerNoon 読者と共有したい、さまざまな小さなプロジェクト テンプレートをたくさん蓄積しました。
私は数日前にこの特定のプロジェクトを完了しましたが、すでに多くのメリットをもたらしています。私は Web3 インフラストラクチャ プロバイダーのchainstack.com で働いており、EVM ブロックチェーン上のスマート コントラクトからのデータのインデックス付けサービスを扱っています。
私はインフラストラクチャ部門が使用している Grafana、BetterUptime などの既製のツールを使おうと何時間も費やしましたが、システムの内部にはほとんど興味がなく、私にとって主な焦点は入口と出口のメトリクスでした。最後に、次のことを行う独自のボットを作成することにしました。
cd ~ virtualenv -p python3.8 up_env # crete a virtualenv source ~/up_env/bin/activate # activate the virtualenl
pip install python-telegram-bot pip install "python-telegram-bot[job-queue]" --pre pip install --upgrade python-telegram-bot==13.6.0 # the code was written before version 20, so here the version is explicitly specified pip install numpy # needed for the median value function pip install web3 # needed for requests to nodes (replace with what you need)
関数を含むファイルfunction.py (クラスで実装できますが、サンプルが短いため、モジュールに分割するつもりはありませんでしたが、マルチスレッド ライブラリでは関数を別のファイルに移動する必要があります)。依存関係をインポートします。
import numpy as np import multiprocessing from web3 import Web3 # add those libraries needed for your task
状態を確認するための関数を記述します。私の場合、事前に選択したパブリック ノードをループし、最後のブロックを取得し、中央値を取得して逸脱を除外し、次にこの中央値に対して独自のノードをチェックする必要がありました。
サービス状態チェック関数 (独自のものに置き換えることができます):
# Helper function that checks a single node def get_last_block_once(rpc): try: w3 = Web3(Web3.HTTPProvider(rpc)) block_number = w3.eth.block_number if isinstance(block_number, int): return block_number else: return None except Exception as e: print(f'{rpc} - {repr(e)}') return None # Main function to check the status of the service that will be called def check_service(): # pre-prepared list of reference nodes # for any network, it can be found on the website https://chainlist.org/ list_of_public_nodes = [ 'https://polygon.llamarpc.com', 'https://polygon.rpc.blxrbdn.com', 'https://polygon.blockpi.network/v1/rpc/public', 'https://polygon-mainnet.public.blastapi.io', 'https://rpc-mainnet.matic.quiknode.pro', 'https://polygon-bor.publicnode.com', 'https://poly-rpc.gateway.pokt.network', 'https://rpc.ankr.com/polygon', 'https://polygon-rpc.com' ] # parallel processing of requests to all nodes with multiprocessing.Pool(processes=len(list_of_public_nodes)) as pool: results = pool.map(get_last_block_once, list_of_public_nodes) last_blocks = [b for b in results if b is not None and isinstance(b, int)] # define the maximum and median value of the current block med_val = int(np.median(last_blocks)) max_val = int(np.max(last_blocks)) # determine the number of nodes with the maximum and median value med_support = np.sum([1 for x in last_blocks if x == med_val]) max_support = np.sum([1 for x in last_blocks if x == max_val]) return max_val, max_support, med_val, med_support
import telegram from telegram.ext import Updater, CommandHandler, Filters from functions import get_last_block_once, check_service # Here one can to set a limited circle of bot users, # listing the usernames of the users ALLOWED_USERS = ['your_telegram_account', 'someone_else'] # The address of the node that I am monitoring (also a public node in this case) OBJECT_OF_CHECKING = 'https://polygon-mainnet.chainstacklabs.com' # Threshold for highlighting critical lag THRESHOLD = 5
次に、ボットの UI からコマンドが発行されたときに呼び出される関数を説明します。
def start(update, context): """Send a message when the command /start is issued.""" try: # Get the user user = update.effective_user # Filter out bots if user.is_bot: return # Check if the user is allowed username = str(user.username) if username not in ALLOWED_USERS: return except Exception as e: print(f'{repr(e)}') return # Call the main function to check the network status max_val, max_support, med_val, med_support = check_service() # Call the function to check the status of the specified node last_block = get_last_block_once(OBJECT_OF_CHECKING) # Create the message to send to Telegram message = "" # Information about the state of the nodes in the public network (median, maximum, and number of nodes) message += f"Public median block number {med_val} (on {med_support}) RPCs\n" message += f"Public maximum block number +{max_val - med_val} (on {max_support}) PRCs\n" # Compare with the threshold if last_block is not None: out_text = str(last_block - med_val) if last_block - med_val < 0 else '+' + str(last_block - med_val) if abs(last_block - med_val) > THRESHOLD: message += f"The node block number shift ⚠️<b>{out_text}</b>⚠️" else: message += f"The node block number shift {out_text}" else: # Exception processing if a node has not responded message += f"The node has ⚠️<b>not responded</b>⚠️" # Send the message to the user context.bot.send_message(chat_id=user.id, text=message, parse_mode="HTML")
token = "xxx" # Bot token obtained from BotFather # set up the bot bot = telegram.Bot(token=token) updater = Updater(token=token, use_context=True) dispatcher = updater.dispatcher # bind the handler function dispatcher.add_handler(CommandHandler("start", start, filters=Filters.chat_type.private)) # run the bot updater.start_polling()
最後に、以下を使用して安価な VPS サーバーでコードを実行できます。
source ~/up_env/bin/activate python uptime_bot.py
systemd ユニットファイルを設定した後。
次の記事では、残りの 2 つのタスクを実装する方法について説明します。
リクエストに応じて、過去 X 時間に発生したイベントを示すグラフを取得します。
プロジェクトのソース コードは、GitHubリポジトリで入手できます。このチュートリアルが役立つと思われた場合は、お気軽に GitHub でスターを付けていただければ幸いです 🙂