Most blockchain developers hear about MEV bots making thousands daily and wonder — how does that actually work? The answer lies in one overlooked piece of infrastructure: the mempool.
I spent months building MEV systems from scratch, hitting every possible roadblock along the way. This guide shares what actually works, skipping the theory and focusing on implementation details that tutorials rarely cover.
By the end, you'll have working code for monitoring pending transactions, identifying profitable opportunities, and submitting bundles through Flashbots — the complete pipeline from detection to extraction.
What We're Building
Here's the architecture we'll implement:
┌────────────────────────────────────────────────────────────────┐
│ MEV Bot Architecture │
├────────────────────────────────────────────────────────────────┤
│ │
│ [Ethereum Node] │
│ │ │
│ ▼ │
│ [WebSocket Stream] ──► [Transaction Parser] │
│ │ │
│ ▼ │
│ [Opportunity Identifier] │
│ │ │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ [Arbitrage] [Liquidation] [Backrun] │
│ │ │ │ │
│ └───────────┼───────────┘ │
│ ▼ │
│ [Profit Calculator] │
│ │ │
│ ▼ │
│ [Bundle Constructor] │
│ │ │
│ ▼ │
│ [Flashbots Submission] │
│ │
└────────────────────────────────────────────────────────────────┘
Let's build each component.
Part 1: Connecting to the Mempool
The mempool isn't a single database — it's a collection of unconfirmed transactions sitting in memory across thousands of nodes. Each node has slightly different contents based on network propagation timing.
Running Your Own Node
Public RPCs won't cut it for MEV. They filter mempool data, add latency, and rate-limit aggressively. You need your own node.
Geth Setup (Recommended):
# Install Geth from the Ethereum PPA
sudo add-apt-repository -y ppa:ethereum/ethereum
sudo apt-get update
sudo apt-get install ethereum
# Create data directory (passed to geth below via --datadir)
mkdir -p /data/ethereum
# Start with MEV-optimized settings:
#   - HTTP (port 8545) and WebSocket (port 8546) RPC are both bound to
#     127.0.0.1, so they are reachable only from this machine
#   - the txpool.* limits are raised so more pending/queued transactions are kept
#   - maxpeers and cache are raised (see the table that follows)
geth \
--datadir /data/ethereum \
--http \
--http.api eth,net,web3,txpool,debug \
--http.addr 127.0.0.1 \
--http.port 8545 \
--ws \
--ws.api eth,net,web3,txpool \
--ws.addr 127.0.0.1 \
--ws.port 8546 \
--txpool.globalslots 100000 \
--txpool.globalqueue 25000 \
--txpool.accountslots 1024 \
--txpool.accountqueue 512 \
--maxpeers 150 \
--cache 16384 \
--syncmode snap
Why These Settings Matter:
| Parameter | Value | Purpose |
|---|---|---|
| `--txpool.globalslots` | 100000 | Maximum pending transactions to track |
| `--txpool.globalqueue` | 25000 | Queued transactions awaiting nonce |
| `--maxpeers` | 150 | More peers = faster transaction propagation |
| `--cache` | 16384 | RAM cache for state access speed |
Establishing WebSocket Connection
Once your node syncs, connect via WebSocket for real-time transaction streaming:
import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Optional, Callable, List
import aiohttp
@dataclass
class PendingTransaction:
    """Snapshot of a transaction observed in the mempool.

    The three fee fields are optional; each is ``None`` when the
    corresponding value was not available for the transaction.
    """

    hash: str
    sender: str
    recipient: Optional[str]
    value: int
    input_data: str
    gas_limit: int
    gas_price: Optional[int]
    max_fee: Optional[int]
    max_priority_fee: Optional[int]
    nonce: int

    @property
    def effective_gas_price(self) -> int:
        """Return the first truthy value of `gas_price`, then `max_fee`, else 0."""
        for candidate in (self.gas_price, self.max_fee):
            if candidate:
                return candidate
        return 0
class MempoolStream:
    """Real-time mempool transaction stream.

    Subscribes to ``newPendingTransactions`` over WebSocket, fetches the full
    transaction for every announced hash over HTTP JSON-RPC, and passes the
    resulting `PendingTransaction` to each registered handler.
    """

    def __init__(self, ws_endpoint: str, http_endpoint: str):
        self.ws_endpoint = ws_endpoint
        self.http_endpoint = http_endpoint
        self.handlers: List[Callable] = []
        self.running = False
        # One HTTP session shared by all fetches while start() is running;
        # opening a new session per transaction costs a connection each time.
        self._session = None

    def on_transaction(self, handler: Callable):
        """Register handler for new transactions (usable as a decorator).

        The handler may be a plain function or a coroutine function.
        Returns the handler unchanged.
        """
        self.handlers.append(handler)
        return handler

    async def start(self):
        """Begin streaming pending transactions until `stop()` is called.

        Reconnects after 1 second when the WebSocket closes and after
        5 seconds on any other connection-level error.
        """
        self.running = True
        async with aiohttp.ClientSession() as session:
            self._session = session
            try:
                while self.running:
                    try:
                        await self._stream_once()
                    except websockets.ConnectionClosed:
                        print("Connection lost, reconnecting...")
                        await asyncio.sleep(1)
                    except Exception as e:
                        print(f"Error: {e}, retrying...")
                        await asyncio.sleep(5)
            finally:
                self._session = None

    async def _stream_once(self):
        """Open one WebSocket connection, subscribe, and pump its messages."""
        async with websockets.connect(
            self.ws_endpoint,
            ping_interval=30,
            ping_timeout=10,
        ) as ws:
            # Subscribe to pending transactions
            await ws.send(json.dumps({
                "jsonrpc": "2.0",
                "id": 1,
                "method": "eth_subscribe",
                "params": ["newPendingTransactions"],
            }))
            response = await ws.recv()
            sub_id = json.loads(response).get("result")
            print(f"✓ Subscribed to mempool (ID: {sub_id})")
            async for message in ws:
                if not self.running:
                    # Honour stop() even while the node keeps sending.
                    break
                await self._handle_message(message)

    async def _handle_message(self, message: str):
        """Process one incoming WebSocket message.

        Messages that are not subscription notifications are ignored. Fetch
        and handler failures are reported and contained here, so a single bad
        transaction does not tear down the subscription.
        """
        try:
            data = json.loads(message)
        except ValueError:
            return
        params = data.get("params") if isinstance(data, dict) else None
        if not isinstance(params, dict):
            return
        tx_hash = params.get("result")
        if not isinstance(tx_hash, str):
            return

        # Fetch full transaction details
        try:
            tx = await self._fetch_transaction(tx_hash)
        except Exception as e:
            print(f"Fetch error for {tx_hash}: {e}")
            return
        if not tx:
            return

        # Notify all handlers
        for handler in self.handlers:
            try:
                outcome = handler(tx)
                # Await only awaitable results so synchronous handlers work too.
                if hasattr(outcome, "__await__"):
                    await outcome
            except Exception as e:
                print(f"Handler error: {e}")

    async def _post_json(self, session, payload: dict) -> dict:
        """POST a JSON-RPC payload to the HTTP endpoint and return the parsed reply."""
        async with session.post(self.http_endpoint, json=payload) as resp:
            return await resp.json()

    async def _fetch_transaction(self, tx_hash: str) -> "Optional[PendingTransaction]":
        """Retrieve full transaction data via ``eth_getTransactionByHash``.

        Returns ``None`` when the node has no result for the hash.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_getTransactionByHash",
            "params": [tx_hash],
        }
        session = getattr(self, "_session", None)
        if session is None or session.closed:
            # Called outside start(): fall back to a short-lived session.
            async with aiohttp.ClientSession() as temporary:
                result = await self._post_json(temporary, payload)
        else:
            result = await self._post_json(session, payload)

        tx_data = result.get("result")
        if not tx_data:
            return None

        def optional_int(key: str) -> Optional[int]:
            raw = tx_data.get(key)
            return int(raw, 16) if raw else None

        return PendingTransaction(
            hash=tx_hash,
            sender=tx_data["from"],
            recipient=tx_data.get("to"),
            value=int(tx_data["value"], 16),
            input_data=tx_data["input"],
            gas_limit=int(tx_data["gas"], 16),
            gas_price=optional_int("gasPrice"),
            max_fee=optional_int("maxFeePerGas"),
            max_priority_fee=optional_int("maxPriorityFeePerGas"),
            nonce=int(tx_data["nonce"], 16),
        )

    def stop(self):
        """Ask the streaming loop to exit; takes effect at the next message or reconnect."""
        self.running = False
# Usage example
async def main():
    """Usage example: print a one-line summary of every pending transaction."""
    stream = MempoolStream(
        http_endpoint="http://localhost:8545",
        ws_endpoint="ws://localhost:8546",
    )

    async def log_transaction(tx: PendingTransaction):
        print(f"New TX: {tx.hash[:16]}... | To: {tx.recipient} | Gas: {tx.effective_gas_price / 1e9:.2f} gwei")

    # Same registration the decorator form performs.
    stream.on_transaction(log_transaction)
    await stream.start()


if __name__ == "__main__":
    asyncio.run(main())
Part 2: Decoding Transaction Intent
Raw transaction data is just hex bytes. To find MEV opportunities, we need to understand what each transaction is trying to do.
Function Selector Database
Every smart contract call begins with a 4-byte function selector:
0x38ed1739...
^^^^^^^^
Function selector = first 4 bytes (8 hex characters) of keccak256("swapExactTokensForTokens(uint256,uint256,address[],address,uint256)")
Build a database mapping selectors to function signatures:
from eth_abi import decode
from typing import Dict, Any, Optional
import hashlib
class FunctionRegistry:
    """Maps 4-byte function selectors to ABI decoders for known DeFi calls."""

    # One row per known function:
    # (signature, ABI types, parameter names, protocol, action)
    _COMMON_FUNCTIONS = (
        # Uniswap V2 Router
        (
            "swapExactTokensForTokens(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountIn", "amountOutMin", "path", "to", "deadline"],
            "uniswap_v2",
            "swap",
        ),
        (
            "swapTokensForExactTokens(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountOut", "amountInMax", "path", "to", "deadline"],
            "uniswap_v2",
            "swap",
        ),
        (
            "swapExactETHForTokens(uint256,address[],address,uint256)",
            ["uint256", "address[]", "address", "uint256"],
            ["amountOutMin", "path", "to", "deadline"],
            "uniswap_v2",
            "swap",
        ),
        (
            "swapExactTokensForETH(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountIn", "amountOutMin", "path", "to", "deadline"],
            "uniswap_v2",
            "swap",
        ),
        # Uniswap V3 Router
        (
            "exactInputSingle((address,address,uint24,address,uint256,uint256,uint256,uint160))",
            ["(address,address,uint24,address,uint256,uint256,uint256,uint160)"],
            ["params"],
            "uniswap_v3",
            "swap",
        ),
        (
            "exactInput((bytes,address,uint256,uint256,uint256))",
            ["(bytes,address,uint256,uint256,uint256)"],
            ["params"],
            "uniswap_v3",
            "swap",
        ),
        # Aave V3
        (
            "liquidationCall(address,address,address,uint256,bool)",
            ["address", "address", "address", "uint256", "bool"],
            ["collateralAsset", "debtAsset", "user", "debtToCover", "receiveAToken"],
            "aave_v3",
            "liquidation",
        ),
        (
            "supply(address,uint256,address,uint16)",
            ["address", "uint256", "address", "uint16"],
            ["asset", "amount", "onBehalfOf", "referralCode"],
            "aave_v3",
            "supply",
        ),
        (
            "borrow(address,uint256,uint256,uint16,address)",
            ["address", "uint256", "uint256", "uint16", "address"],
            ["asset", "amount", "interestRateMode", "referralCode", "onBehalfOf"],
            "aave_v3",
            "borrow",
        ),
        # ERC20
        (
            "transfer(address,uint256)",
            ["address", "uint256"],
            ["to", "amount"],
            "erc20",
            "transfer",
        ),
        (
            "approve(address,uint256)",
            ["address", "uint256"],
            ["spender", "amount"],
            "erc20",
            "approve",
        ),
    )

    def __init__(self):
        self.functions: Dict[str, dict] = {}
        self._register_common_functions()

    def _register_common_functions(self):
        """Pre-register known DeFi functions from `_COMMON_FUNCTIONS`."""
        for signature, types, names, protocol, action in self._COMMON_FUNCTIONS:
            self.register(
                signature,
                list(types),
                list(names),
                protocol=protocol,
                action=action,
            )

    def register(self, signature: str, types: list, names: list, protocol: str, action: str):
        """Register a function signature under its 4-byte selector.

        The selector is the first 8 hex characters of keccak256(signature).
        Note that ``hashlib.sha3_256`` is NIST SHA3-256, which is a different
        function from keccak256 and must not be used here.
        """
        from eth_utils import keccak

        selector = "0x" + keccak(text=signature).hex()[:8]
        self.functions[selector] = {
            "signature": signature,
            "types": types,
            "names": names,
            "protocol": protocol,
            "action": action,
        }

    def decode(self, input_data: str) -> Optional[Dict[str, Any]]:
        """Decode transaction input data.

        Expects a ``0x``-prefixed hex string. Returns ``None`` when the input
        is empty or too short to hold a selector, when the selector is not
        registered, or when the parameters cannot be decoded.
        """
        if not input_data or len(input_data) < 10:
            return None
        selector = input_data[:10].lower()
        func = self.functions.get(selector)
        if func is None:
            return None
        try:
            params_bytes = bytes.fromhex(input_data[10:])
            decoded_values = decode(func["types"], params_bytes)
        except Exception:
            # Best effort: malformed calldata simply yields no decoding.
            return None
        return {
            "selector": selector,
            "signature": func["signature"],
            "protocol": func["protocol"],
            "action": func["action"],
            # Map to named parameters
            "params": dict(zip(func["names"], decoded_values)),
        }
class TransactionAnalyzer:
    """Decodes pending transactions and tags them for the opportunity scanners."""

    def __init__(self):
        # Known router addresses (lower-case) mapped to a protocol label
        self.routers = {
            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "uniswap_v2",
            "0xe592427a0aece92de3edee1f18e0157c05861564": "uniswap_v3",
            "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "uniswap_v3_router2",
            "0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "sushiswap",
        }
        self.registry = FunctionRegistry()

    def analyze(self, tx: PendingTransaction) -> Optional[Dict[str, Any]]:
        """Return a summary dict for `tx`, or ``None`` if it is not of interest.

        Transactions without a recipient (contract deployments) and calls the
        registry cannot decode are skipped.
        """
        target = tx.recipient
        if target is None:
            return None

        call = self.registry.decode(tx.input_data)
        if not call:
            return None

        action = call["action"]
        summary = {
            "tx_hash": tx.hash,
            "sender": tx.sender,
            "recipient": target,
            "value_wei": tx.value,
            "gas_price": tx.effective_gas_price,
            "decoded": call,
            # Protocol label looked up from the recipient address, if known
            "router_protocol": self.routers.get(target.lower()),
            "is_swap": action == "swap",
            "is_liquidation": action == "liquidation",
        }
        return summary
Part 3: Identifying Profitable Opportunities
Now we can see what transactions are doing. Next step: figure out which ones create MEV opportunities.
Arbitrage Detection Engine
When someone swaps on Uniswap, they move the price. If that price differs from other DEXes, arbitrage exists.
from decimal import Decimal
from typing import List, Tuple
import asyncio
class DEXPricer:
    """Fetches reserves from Uniswap V2-style pairs and prices swaps against them."""

    def __init__(self, web3_client):
        self.w3 = web3_client
        # Uniswap V2 pair ABI (minimal)
        self.pair_abi = [
            {
                "name": "getReserves",
                "type": "function",
                "inputs": [],
                "outputs": [
                    {"name": "reserve0", "type": "uint112"},
                    {"name": "reserve1", "type": "uint112"},
                    {"name": "blockTimestampLast", "type": "uint32"},
                ],
            },
            {
                "name": "token0",
                "type": "function",
                "inputs": [],
                "outputs": [{"type": "address"}],
            },
            {
                "name": "token1",
                "type": "function",
                "inputs": [],
                "outputs": [{"type": "address"}],
            },
        ]

    async def get_reserves(self, pair_address: str) -> Tuple[int, int]:
        """Get current (reserve0, reserve1) from a Uniswap V2 pair.

        NOTE(review): the contract call below is synchronous, so it blocks
        the event loop while it runs despite this method being ``async``.
        """
        pair = self.w3.eth.contract(
            address=self.w3.to_checksum_address(pair_address),
            abi=self.pair_abi,
        )
        reserves = pair.functions.getReserves().call()
        return reserves[0], reserves[1]

    def calculate_output(self, amount_in: int, reserve_in: int, reserve_out: int, fee: Decimal = Decimal("0.003")) -> int:
        """Calculate swap output using the constant product formula.

        Computes ``in_fee * reserve_out / (reserve_in + in_fee)`` with
        ``in_fee = amount_in * (1 - fee)`` and truncates to an integer.
        Exact rational arithmetic is used because the default 28-digit
        ``Decimal`` context silently rounds products of wei-scale operands.
        Returns 0 when the denominator is zero.
        """
        from fractions import Fraction

        amount_in_with_fee = Fraction(amount_in) * (1 - Fraction(fee))
        denominator = Fraction(reserve_in) + amount_in_with_fee
        if denominator == 0:
            return 0
        return int(amount_in_with_fee * Fraction(reserve_out) / denominator)

    def calculate_price_impact(self, amount_in: int, reserve_in: int, reserve_out: int) -> Decimal:
        """Calculate price impact of a swap as a fraction of the spot price.

        Returns ``Decimal(0)`` for a non-positive amount or reserve, where
        the ratio is undefined and the original formula divided by zero.
        """
        if amount_in <= 0 or reserve_in <= 0 or reserve_out <= 0:
            return Decimal(0)
        spot_price = Decimal(reserve_out) / Decimal(reserve_in)
        output = self.calculate_output(amount_in, reserve_in, reserve_out)
        execution_price = Decimal(output) / Decimal(amount_in)
        return (spot_price - execution_price) / spot_price
class ArbitrageScanner:
    """Scans for cross-DEX arbitrage opportunities."""

    def __init__(self, pricer: "DEXPricer"):
        self.pricer = pricer
        # Token pair addresses across DEXes
        # Example: WETH/USDC pairs
        self.pair_registry = {
            ("WETH", "USDC"): {
                "uniswap_v2": "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc",
                "sushiswap": "0x397FF1542f962076d0BFE58eA045FfA2d347ACa0",
            },
            ("WETH", "USDT"): {
                "uniswap_v2": "0x0d4a11d5EEaaC28EC3F61d100daF4d40471f1852",
                "sushiswap": "0x06da0fd433C1A5d7a4faa01111c044910A184553",
            },
        }

    async def find_arbitrage(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if a pending swap creates arbitrage.

        Takes the dict produced by the transaction analyzer and returns the
        most profitable candidate, or ``None`` when there is none. The input
        amount comes from the decoded ``amountIn`` parameter; swaps that pay
        in ETH have no such parameter, so the transaction value is used.

        NOTE(review): this is still the article's simplified model. It does
        not match the swap's tokens against the registry pairs, it assumes
        reserve0 is the input side, and the profit subtracts an input-token
        amount and a wei gas cost from an output-token amount. Treat the
        result as a candidate signal, not a profit figure.
        """
        if not analyzed_tx.get("is_swap"):
            return None
        params = analyzed_tx["decoded"]["params"]

        # A swap path needs at least an input and an output token.
        path = params.get("path", [])
        if len(path) < 2:
            return None

        amount_in = params.get("amountIn") or analyzed_tx.get("value_wei") or 0
        if amount_in <= 0:
            return None

        source_dex = analyzed_tx.get("router_protocol") or "unknown"
        estimated_gas = 150000  # Typical swap gas
        gas_cost = estimated_gas * (analyzed_tx.get("gas_price") or 0)

        opportunities = []
        for dex_pairs in self.pair_registry.values():
            for dex_name, pair_address in dex_pairs.items():
                if dex_name == source_dex:
                    continue
                try:
                    r0, r1 = await self.pricer.get_reserves(pair_address)
                    output = self.pricer.calculate_output(amount_in, r0, r1)
                except Exception as e:
                    # Keep scanning the other pairs, but leave a trace.
                    print(f"Reserve lookup failed for {pair_address}: {e}")
                    continue

                potential_profit = output - amount_in - gas_cost
                if potential_profit <= 0:
                    continue
                opportunities.append({
                    "type": "arbitrage",
                    "source_dex": source_dex,
                    "target_dex": dex_name,
                    "pair_address": pair_address,
                    "amount_in": amount_in,
                    "expected_output": output,
                    "estimated_profit_wei": potential_profit,
                    "estimated_profit_eth": potential_profit / 1e18,
                    "trigger_tx": analyzed_tx["tx_hash"],
                })

        if not opportunities:
            return None
        # Return the most profitable opportunity
        return max(opportunities, key=lambda x: x["estimated_profit_wei"])
class LiquidationScanner:
    """Monitors lending protocols for liquidation opportunities."""

    def __init__(self, web3_client):
        self.w3 = web3_client
        # Aave V3 Pool address (Ethereum mainnet)
        self.aave_pool = "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2"
        # Simplified ABI for getUserAccountData
        self.pool_abi = [
            {
                "name": "getUserAccountData",
                "type": "function",
                "inputs": [{"name": "user", "type": "address"}],
                "outputs": [
                    {"name": "totalCollateralBase", "type": "uint256"},
                    {"name": "totalDebtBase", "type": "uint256"},
                    {"name": "availableBorrowsBase", "type": "uint256"},
                    {"name": "currentLiquidationThreshold", "type": "uint256"},
                    {"name": "ltv", "type": "uint256"},
                    {"name": "healthFactor", "type": "uint256"},
                ],
            }
        ]
        self.pool_contract = self.w3.eth.contract(
            address=self.w3.to_checksum_address(self.aave_pool),
            abi=self.pool_abi,
        )

    @staticmethod
    def _affected_user(analyzed_tx: dict, decoded: dict):
        """Return the address whose position the decoded call acts on.

        ``liquidationCall`` names the borrower in ``user`` and supply/borrow
        name the position owner in ``onBehalfOf``; the sender is only the
        fallback, since it may be a liquidator or a third party.
        """
        params = decoded.get("params") or {}
        key = "user" if decoded.get("action") == "liquidation" else "onBehalfOf"
        return params.get(key) or analyzed_tx.get("sender")

    async def check_liquidation_opportunity(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if transaction affects a liquidatable position.

        Returns a ``"liquidation"`` dict when the health factor is below 1.0,
        a ``"liquidation_watch"`` dict when it is below 1.05, and ``None``
        otherwise or when the account data cannot be read.

        NOTE(review): totalDebtBase/totalCollateralBase are presumably
        denominated in the pool's base currency rather than wei, so the
        ``*_wei`` / ``*_eth`` profit keys are misnamed — confirm against the
        Aave documentation before relying on them.
        """
        decoded = analyzed_tx.get("decoded") or {}
        if decoded.get("action") not in ("borrow", "supply", "liquidation"):
            return None

        user = self._affected_user(analyzed_tx, decoded)
        if not user:
            return None

        try:
            account_data = self.pool_contract.functions.getUserAccountData(
                self.w3.to_checksum_address(user)
            ).call()
        except Exception as e:
            print(f"Account data lookup failed for {user}: {e}")
            return None

        total_collateral = account_data[0]
        total_debt = account_data[1]
        health_factor = account_data[5] / 1e18

        if health_factor < 1.0:
            # Already liquidatable!
            liquidation_bonus = 0.05  # 5% typical bonus
            max_liquidation = total_debt // 2  # Can liquidate up to 50%
            potential_profit = int(max_liquidation * liquidation_bonus)
            return {
                "type": "liquidation",
                "user": user,
                "health_factor": health_factor,
                "total_debt": total_debt,
                "total_collateral": total_collateral,
                "max_liquidation_amount": max_liquidation,
                "estimated_profit_wei": potential_profit,
                "estimated_profit_eth": potential_profit / 1e18,
                "urgency": "immediate",
            }
        if health_factor < 1.05:
            # At risk - monitor closely
            return {
                "type": "liquidation_watch",
                "user": user,
                "health_factor": health_factor,
                "price_drop_to_liquidate": f"{(health_factor - 1.0) * 100:.2f}%",
                "urgency": "monitor",
            }
        return None
Part 4: Calculating Real Profitability
Finding opportunities is useless without accurate profit calculation. Gas costs, slippage, and competition eat into margins.
Comprehensive Profit Calculator
from dataclasses import dataclass
from typing import Optional
from decimal import Decimal
@dataclass
class GasEstimate:
    """Per-gas fee components plus the gas units a transaction is expected to use."""

    base_fee: int
    priority_fee: int
    gas_units: int

    @property
    def total_cost(self) -> int:
        """Total fee: gas units times the sum of base and priority fee."""
        per_gas = self.base_fee + self.priority_fee
        return per_gas * self.gas_units

    @property
    def total_cost_eth(self) -> Decimal:
        """`total_cost` divided by 10**18, as a Decimal."""
        wei_per_eth = Decimal(10**18)
        return Decimal(self.total_cost) / wei_per_eth
class ProfitCalculator:
    """Calculates net profit for MEV opportunities.

    Wei amounts are kept as integers throughout: float multiplication loses
    precision once values exceed 2**53.
    """

    def __init__(self, web3_client):
        self.w3 = web3_client

    async def get_current_gas_params(self) -> "GasEstimate":
        """Fetch current gas parameters from network.

        The priority fee is the mean of the 50th-percentile rewards over the
        fee-history rows actually returned (0 when there are none). The
        returned estimate has ``gas_units=0``; callers set it per opportunity.
        """
        latest_block = self.w3.eth.get_block("latest")
        base_fee = latest_block["baseFeePerGas"]
        # Get priority fee from recent blocks; the node may return fewer
        # rows than requested, so average over what is actually there.
        fee_history = self.w3.eth.fee_history(5, "latest", [50])
        tips = [row[0] for row in fee_history["reward"] if row]
        avg_priority = sum(tips) // len(tips) if tips else 0
        return GasEstimate(
            base_fee=base_fee,
            priority_fee=avg_priority,
            gas_units=0,  # Will be set per opportunity
        )

    async def calculate_arbitrage_profit(self, opportunity: dict) -> dict:
        """Calculate net profit for arbitrage opportunity.

        Reads ``estimated_profit_wei`` and ``amount_in`` from `opportunity`.
        The builder tip is 90% of the profit left after gas and is never
        negative, so a losing trade is reported with its full loss.
        """
        gas_params = await self.get_current_gas_params()
        # Arbitrage typically needs ~250k gas for 2 swaps
        gas_params.gas_units = 250000
        gross_profit = opportunity["estimated_profit_wei"]
        gas_cost = gas_params.total_cost
        # Builder tip (typically 90% of profit goes to builder)
        builder_tip_rate = Decimal("0.90")
        after_gas = gross_profit - gas_cost
        builder_tip = int(Decimal(after_gas) * builder_tip_rate) if after_gas > 0 else 0
        net_profit = after_gas - builder_tip
        amount_in = opportunity["amount_in"]
        if amount_in > 0:
            roi_percent = Decimal(net_profit) / Decimal(amount_in) * 100
        else:
            roi_percent = Decimal(0)
        return {
            "gross_profit_wei": gross_profit,
            "gas_cost_wei": gas_cost,
            "builder_tip_wei": builder_tip,
            "net_profit_wei": net_profit,
            "net_profit_eth": Decimal(net_profit) / Decimal(10**18),
            "profitable": net_profit > 0,
            "roi_percent": roi_percent,
            "gas_params": {
                "base_fee_gwei": gas_params.base_fee / 1e9,
                "priority_fee_gwei": gas_params.priority_fee / 1e9,
                "gas_units": gas_params.gas_units,
            },
        }

    async def calculate_liquidation_profit(self, opportunity: dict) -> dict:
        """Calculate net profit for liquidation opportunity.

        Reads ``max_liquidation_amount`` and ``estimated_profit_wei`` from
        `opportunity` and deducts gas, the flash loan fee and a slippage
        buffer.
        """
        gas_params = await self.get_current_gas_params()
        # Liquidation with flash loan needs ~400k gas
        gas_params.gas_units = 400000
        # Flash loan fee (0.09% for Aave), in integer arithmetic
        flash_loan_amount = opportunity["max_liquidation_amount"]
        flash_loan_fee = flash_loan_amount * 9 // 10000
        gross_profit = opportunity["estimated_profit_wei"]
        gas_cost = gas_params.total_cost
        # Slippage buffer for collateral swap (1%)
        slippage_cost = gross_profit // 100
        net_profit = gross_profit - gas_cost - flash_loan_fee - slippage_cost
        return {
            "gross_profit_wei": gross_profit,
            "gas_cost_wei": gas_cost,
            "flash_loan_fee_wei": flash_loan_fee,
            "slippage_buffer_wei": slippage_cost,
            "net_profit_wei": net_profit,
            "net_profit_eth": Decimal(net_profit) / Decimal(10**18),
            "profitable": net_profit > 0,
        }

    def minimum_profitable_amount(self, gas_price_gwei: float, gas_units: int = 250000) -> int:
        """Calculate minimum trade size for profitability.

        Assumes 0.3% spread capture with 90% going to the builder:
        net = gross * 0.003 * 0.10 - gas_cost > 0, so
        gross > gas_cost / 0.0003 = gas_cost * 10000 / 3.
        """
        gas_cost = int(gas_price_gwei * 1e9 * gas_units)
        return gas_cost * 10000 // 3
Part 5: Constructing and Submitting Bundles
Flashbots lets us submit transactions privately without exposing them to the public mempool.
Flashbots Bundle Submission
from eth_account import Account
from eth_account.signers.local import LocalAccount
import requests
import json
from typing import List, Dict, Any
class FlashbotsClient:
    """Client for Flashbots bundle submission."""

    RELAY_URL = "https://relay.flashbots.net"
    # Seconds to wait for the relay; without a timeout a stalled connection
    # would hang the caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, signer: "LocalAccount", auth_signer: "LocalAccount"):
        self.signer = signer
        self.auth_signer = auth_signer

    def _sign_message(self, message: str) -> str:
        """Build the ``X-Flashbots-Signature`` header value for a request body.

        The relay expects an EIP-191 signature over the 0x-prefixed hex
        keccak256 digest of the body, not over the body text itself, in the
        form ``<address>:<signature>``.
        """
        from eth_account.messages import encode_defunct
        from eth_utils import keccak

        digest_hex = "0x" + keccak(text=message).hex()
        signed = self.auth_signer.sign_message(encode_defunct(text=digest_hex))
        signature = signed.signature.hex()
        # Newer hexbytes releases omit the prefix from .hex().
        if not signature.startswith("0x"):
            signature = "0x" + signature
        return f"{self.auth_signer.address}:{signature}"

    async def _post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Sign and send one JSON-RPC request to the relay; return the parsed reply.

        The body is serialized once so the bytes that are signed are exactly
        the bytes that are sent, and the blocking HTTP call runs in a worker
        thread so the event loop keeps running.
        """
        import asyncio

        body = json.dumps(payload)
        headers = {
            "Content-Type": "application/json",
            "X-Flashbots-Signature": self._sign_message(body),
        }

        def send():
            response = requests.post(
                self.RELAY_URL,
                data=body,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            return response.json()

        return await asyncio.get_running_loop().run_in_executor(None, send)

    async def simulate_bundle(self, signed_txs: List[str], block_number: int) -> Dict[str, Any]:
        """Simulate bundle execution with ``eth_callBundle``; return the raw reply."""
        return await self._post({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_callBundle",
            "params": [{
                "txs": signed_txs,
                "blockNumber": hex(block_number),
                "stateBlockNumber": "latest",
            }],
        })

    async def send_bundle(self, signed_txs: List[str], target_block: int) -> Dict[str, Any]:
        """Submit bundle to Flashbots relay.

        Returns ``{"success": True, "bundle_hash", "target_block"}`` when the
        relay answers with a bundle hash, otherwise
        ``{"success": False, "error"}``.
        """
        result = await self._post({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_sendBundle",
            "params": [{
                "txs": signed_txs,
                "blockNumber": hex(target_block),
            }],
        })
        accepted = result.get("result")
        if isinstance(accepted, dict) and "bundleHash" in accepted:
            return {
                "success": True,
                "bundle_hash": accepted["bundleHash"],
                "target_block": target_block,
            }
        return {
            "success": False,
            "error": result.get("error", "Unknown error"),
        }

    async def get_bundle_stats(self, bundle_hash: str, block_number: int) -> Dict[str, Any]:
        """Query ``flashbots_getBundleStats`` for a bundle; return the raw reply."""
        return await self._post({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "flashbots_getBundleStats",
            "params": [{
                "bundleHash": bundle_hash,
                "blockNumber": hex(block_number),
            }],
        })
class BundleBuilder:
    """Constructs MEV bundles for submission."""

    def __init__(self, web3_client, signer: "LocalAccount"):
        self.w3 = web3_client
        self.signer = signer

    @staticmethod
    def _raw_hex(signed) -> str:
        """Return a signed transaction's raw bytes as 0x-prefixed hex.

        Supports both attribute spellings (``raw_transaction`` and the older
        ``rawTransaction``) and adds the prefix when ``.hex()`` omits it.
        """
        raw = getattr(signed, "raw_transaction", None)
        if raw is None:
            raw = signed.rawTransaction
        text = raw.hex()
        return text if text.startswith("0x") else "0x" + text

    async def build_arbitrage_bundle(self, opportunity: dict, profit_calc: dict) -> List[str]:
        """Build arbitrage transaction bundle (buy, then sell).

        This is simplified - real implementation needs actual contract calls;
        both transactions carry empty calldata and target the same address.

        NOTE(review): maxFeePerGas is the latest base fee plus the priority
        fee, with no headroom for the base fee rising in the target block —
        confirm whether a buffer is wanted.
        """
        gas = profit_calc["gas_params"]
        # The fee inputs are floats in gwei; signing needs integer wei.
        priority_fee = int(round(gas["priority_fee_gwei"] * 10**9))
        max_fee = int(round(gas["base_fee_gwei"] * 10**9)) + priority_fee
        nonce = self.w3.eth.get_transaction_count(self.signer.address)

        # Transaction 1: Buy on cheaper DEX
        buy_tx = {
            "to": opportunity["pair_address"],
            "value": opportunity["amount_in"],
            "gas": 150000,
            "maxFeePerGas": max_fee,
            "maxPriorityFeePerGas": priority_fee,
            "nonce": nonce,
            "chainId": 1,
            "data": "0x",  # Actual swap calldata would go here
        }
        # Transaction 2: Sell on expensive DEX
        sell_tx = {
            "to": opportunity["pair_address"],  # Different DEX in reality
            "value": 0,
            "gas": 150000,
            "maxFeePerGas": max_fee,
            "maxPriorityFeePerGas": priority_fee,
            "nonce": nonce + 1,
            "chainId": 1,
            "data": "0x",  # Actual swap calldata
        }
        return [
            self._raw_hex(self.signer.sign_transaction(tx))
            for tx in (buy_tx, sell_tx)
        ]

    async def build_backrun_bundle(self, trigger_tx_raw: str, backrun_tx: dict) -> List[str]:
        """Build bundle that backruns a target transaction.

        The trigger transaction goes first, followed by our signed backrun.
        """
        signed_backrun = self.signer.sign_transaction(backrun_tx)
        return [trigger_tx_raw, self._raw_hex(signed_backrun)]
Part 6: Putting It All Together
Here's the complete MEV bot pipeline:
import asyncio
from typing import Optional
class MEVBot:
    """Complete MEV extraction pipeline.

    Wires the mempool stream to the analyzer, the opportunity scanners, the
    profit calculator, the bundle builder and the Flashbots client.
    """

    def __init__(self, config: dict):
        self.config = config
        # Initialize components
        self.stream = MempoolStream(
            ws_endpoint=config["ws_endpoint"],
            http_endpoint=config["http_endpoint"],
        )
        self.analyzer = TransactionAnalyzer()
        self.pricer = DEXPricer(config["web3_client"])
        self.arb_scanner = ArbitrageScanner(self.pricer)
        self.liq_scanner = LiquidationScanner(config["web3_client"])
        self.profit_calc = ProfitCalculator(config["web3_client"])
        self.flashbots = FlashbotsClient(
            signer=config["signer"],
            auth_signer=config["auth_signer"],
        )
        self.bundle_builder = BundleBuilder(
            web3_client=config["web3_client"],
            signer=config["signer"],
        )
        # Stats
        self.stats = {
            "txs_analyzed": 0,
            "opportunities_found": 0,
            "bundles_submitted": 0,
            "bundles_included": 0,
            "total_profit_eth": 0,
        }
        # Strong references to in-flight inclusion checks, so the tasks are
        # not garbage-collected before they finish.
        self._background_tasks = set()

    async def start(self):
        """Start the MEV bot and run until the stream stops."""
        print("=" * 60)
        print("MEV Bot Starting...")
        print("=" * 60)

        @self.stream.on_transaction
        async def process_transaction(tx: "PendingTransaction"):
            await self._process_transaction(tx)

        await self.stream.start()

    async def _find_opportunity(self, analyzed: dict):
        """Return the best opportunity for an analyzed transaction, or ``None``.

        An immediately liquidatable position takes precedence over arbitrage.
        """
        opportunity = await self.arb_scanner.find_arbitrage(analyzed) or None
        liq_opp = await self.liq_scanner.check_liquidation_opportunity(analyzed)
        if liq_opp and liq_opp.get("type") == "liquidation":
            # Prefer immediate liquidations over arbitrage
            opportunity = liq_opp
        return opportunity

    @staticmethod
    def _simulation_error(simulation: dict):
        """Return the first error reported by a bundle simulation, or ``None``.

        Checks the top-level ``error`` and also each per-transaction entry
        under ``result.results`` for an ``error`` or ``revert`` field.
        """
        if "error" in simulation:
            return simulation["error"]
        result = simulation.get("result")
        entries = result.get("results", []) if isinstance(result, dict) else []
        for entry in entries:
            if isinstance(entry, dict) and ("error" in entry or "revert" in entry):
                return entry.get("error") or entry.get("revert")
        return None

    def _bundle_landed(self, bundle, target_block: int) -> bool:
        """Return True if the bundle's last transaction was mined in `target_block`.

        Any lookup failure (including "transaction not found") counts as not
        included.
        """
        w3 = self.config["web3_client"]
        try:
            tx_hash = w3.keccak(hexstr=bundle[-1])
            receipt = w3.eth.get_transaction_receipt(tx_hash)
        except Exception:
            return False
        return receipt is not None and receipt["blockNumber"] == target_block

    async def _confirm_inclusion(self, bundle, bundle_hash: str, target_block: int, profit: dict):
        """Wait for the target block, then record whether the bundle landed."""
        await asyncio.sleep(15)  # Wait for block
        if self._bundle_landed(bundle, target_block):
            self.stats["bundles_included"] += 1
            self.stats["total_profit_eth"] += float(profit["net_profit_eth"])
            print("✅ Bundle INCLUDED!")
            return
        try:
            stats = await self.flashbots.get_bundle_stats(bundle_hash, target_block)
        except Exception as e:
            print(f"Bundle stats unavailable: {e}")
            return
        details = stats.get("result") if isinstance(stats, dict) else None
        if isinstance(details, dict) and details.get("isSimulated"):
            print("⏳ Bundle was considered but not included")
        else:
            print("⏳ Bundle was not included")

    async def _process_transaction(self, tx: "PendingTransaction"):
        """Process a single pending transaction through the full pipeline."""
        self.stats["txs_analyzed"] += 1

        # Step 1: Analyze transaction
        analyzed = self.analyzer.analyze(tx)
        if not analyzed:
            return

        # Step 2: Scan for opportunities
        opportunity = await self._find_opportunity(analyzed)
        if not opportunity:
            return
        self.stats["opportunities_found"] += 1
        print(f"\n{'='*60}")
        print(f"🎯 OPPORTUNITY FOUND: {opportunity['type']}")
        print(f"{'='*60}")

        # Step 3: Calculate profitability
        is_arbitrage = opportunity["type"] == "arbitrage"
        if is_arbitrage:
            profit = await self.profit_calc.calculate_arbitrage_profit(opportunity)
        else:
            profit = await self.profit_calc.calculate_liquidation_profit(opportunity)
        print(f"Gross Profit: {profit['gross_profit_wei'] / 1e18:.6f} ETH")
        print(f"Gas Cost: {profit['gas_cost_wei'] / 1e18:.6f} ETH")
        print(f"Net Profit: {profit['net_profit_wei'] / 1e18:.6f} ETH")
        print(f"Profitable: {profit['profitable']}")
        if not profit["profitable"]:
            print("❌ Not profitable after costs")
            return

        # Step 4: Build and submit bundle
        print("\n📦 Building bundle...")
        if not is_arbitrage:
            # Liquidation bundle building would go here
            return
        bundle = await self.bundle_builder.build_arbitrage_bundle(opportunity, profit)

        # Step 5: Simulate bundle
        current_block = self.config["web3_client"].eth.block_number
        target_block = current_block + 1
        print(f"🧪 Simulating for block {target_block}...")
        simulation = await self.flashbots.simulate_bundle(bundle, target_block)
        failure = self._simulation_error(simulation)
        if failure is not None:
            print(f"❌ Simulation failed: {failure}")
            return
        print("✅ Simulation successful")

        # Step 6: Submit bundle
        print(f"🚀 Submitting bundle to Flashbots...")
        result = await self.flashbots.send_bundle(bundle, target_block)
        if not result["success"]:
            print(f"❌ Submission failed: {result['error']}")
            return
        self.stats["bundles_submitted"] += 1
        print(f"✅ Bundle submitted: {result['bundle_hash']}")

        # Check inclusion in the background: waiting here would stall the
        # mempool stream for the whole block time.
        task = asyncio.create_task(
            self._confirm_inclusion(bundle, result["bundle_hash"], target_block, profit)
        )
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def print_stats(self):
        """Print bot statistics."""
        print("\n" + "=" * 60)
        print("BOT STATISTICS")
        print("=" * 60)
        print(f"Transactions Analyzed: {self.stats['txs_analyzed']}")
        print(f"Opportunities Found: {self.stats['opportunities_found']}")
        print(f"Bundles Submitted: {self.stats['bundles_submitted']}")
        print(f"Bundles Included: {self.stats['bundles_included']}")
        print(f"Total Profit: {self.stats['total_profit_eth']:.6f} ETH")
        print("=" * 60)
# Main entry point
async def run_bot():
    """Build the bot from a local-node configuration and run it.

    Statistics are printed whenever the bot stops. A ``finally`` clause is
    used because Ctrl-C under ``asyncio.run`` does not reliably surface as
    ``KeyboardInterrupt`` at the ``await`` inside this coroutine.
    """
    from web3 import Web3
    from eth_account import Account

    # Configuration
    config = {
        "ws_endpoint": "ws://localhost:8546",
        "http_endpoint": "http://localhost:8545",
        "web3_client": Web3(Web3.HTTPProvider("http://localhost:8545")),
        # Placeholder: substitute a real key, and keep it out of source control.
        "signer": Account.from_key("YOUR_PRIVATE_KEY"),
        "auth_signer": Account.create(),  # Separate key for Flashbots auth
    }
    bot = MEVBot(config)
    try:
        await bot.start()
    finally:
        bot.print_stats()


if __name__ == "__main__":
    try:
        asyncio.run(run_bot())
    except KeyboardInterrupt:
        # Stats were already printed by run_bot's finally clause.
        pass
Performance Optimization Tips
After building the basics, here's how to compete with professional searchers:
1. Reduce Latency Everywhere
# Bad: Sequential operations
tx = await fetch_transaction(hash)
decoded = decode_transaction(tx)
opportunity = find_opportunity(decoded)
# Good: Parallel where possible
async def optimized_pipeline(tx_hash):
    """Fetch a transaction while the decoder cache is warmed, then look for an opportunity."""
    # Kick off the fetch first so it is in flight during cache preparation
    pending_fetch = asyncio.create_task(fetch_transaction(tx_hash))
    # Pre-warm caches
    prepare_decoder_cache()
    fetched = await pending_fetch
    # Use pre-computed data
    return find_opportunity(decode_with_cache(fetched))
2. Cache Aggressively
class ReserveCache:
    """Cache DEX reserves, dropping every entry when a newer block number is seen."""

    def __init__(self):
        self.last_block = 0
        self.cache = {}

    async def get_reserves(self, pair: str, current_block: int):
        """Return the reserves for `pair`, fetching them on a cache miss."""
        # A higher block number invalidates everything cached so far
        if self.last_block < current_block:
            self.last_block = current_block
            self.cache.clear()
        try:
            return self.cache[pair]
        except KeyError:
            fetched = await self._fetch_reserves(pair)
            self.cache[pair] = fetched
            return fetched
3. Pre-Sign Transactions
class TransactionPool:
    """Pool of partially filled transaction templates keyed by nonce."""

    def __init__(self, signer, web3):
        self.signer = signer
        self.w3 = web3
        self.templates = {}

    def prepare_templates(self):
        """Build templates for the next 10 nonces of the signer's account."""
        first_nonce = self.w3.eth.get_transaction_count(self.signer.address)
        # Other fields are filled at execution time
        self.templates.update({
            nonce: {"nonce": nonce, "chainId": 1, "gas": 200000}
            for nonce in range(first_nonce, first_nonce + 10)
        })

    def get_ready_tx(self, nonce: int, to: str, data: str, value: int = 0):
        """Return a copy of the template for `nonce` with `to`, `data` and `value` set.

        An unknown nonce yields a dict containing only those three fields.
        """
        return {
            **self.templates.get(nonce, {}),
            "to": to,
            "data": data,
            "value": value,
        }
Common Pitfalls and Solutions
Pitfall 1: Stale State
Problem: Your simulation uses old state, execution fails.
# Solution: Always simulate against pending state
async def simulate_with_pending(tx, pending_txs):
    """Simulate ``tx`` on top of the effects of ``pending_txs``.

    Each pending transaction's effects (from ``simulate_tx_effects``) are
    merged, in order, into one override mapping; later entries replace earlier
    ones with the same key. The merged mapping is then handed to
    ``simulate_tx`` together with ``tx``.
    """
    merged_override = {}
    for queued_tx in pending_txs:
        merged_override.update(simulate_tx_effects(queued_tx))

    # Run our own transaction against the accumulated state changes.
    return simulate_tx(tx, merged_override)
Pitfall 2: Gas Price Races
Problem: Competitors outbid your gas price.
# Solution: Use Flashbots - no gas price competition
# Tip the builder directly instead
def calculate_builder_tip(profit, urgency="normal"):
    """Return the share of ``profit`` to pay the block builder.

    Args:
        profit: Expected profit. Integers (e.g. wei) are handled with exact
            integer arithmetic; floats fall back to float multiplication.
        urgency: ``"low"``, ``"normal"`` or ``"high"``. Unknown values use the
            ``"normal"`` rate.

    Returns:
        The tip as an int, truncated toward zero. Non-positive profit yields 0
        rather than a negative tip.
    """
    # Rates in basis points so integer profits never pass through a float
    # (floats lose precision above 2**53, well within wei-scale amounts).
    tip_rates_bps = {
        "low": 7000,     # Keep 30%
        "normal": 8500,  # Keep 15%
        "high": 9500,    # Keep 5%
    }
    rate_bps = tip_rates_bps.get(urgency, 8500)

    if profit <= 0:
        return 0
    if isinstance(profit, int):
        return profit * rate_bps // 10000
    return int(profit * (rate_bps / 10000))
Pitfall 3: Reverted Bundles
Problem: Bundle reverts waste time and miss opportunities.
# Solution: Extensive pre-flight checks
async def validate_bundle(bundle):
    """Run all pre-flight checks concurrently and report the first failure.

    Returns:
        ``(True, "All checks passed")`` or ``(False, reason)`` where ``reason``
        names the index of the first failing check.

    NOTE(review): ``bundle`` is not passed to any check here; the check
    helpers are called without arguments, exactly as before. Confirm how they
    obtain the bundle.
    """
    checks = [
        check_sufficient_balance(),
        check_nonce_sequence(),
        check_gas_limits(),
        check_contract_state(),
        simulate_full_execution(),
    ]
    results = await asyncio.gather(*checks, return_exceptions=True)

    for i, result in enumerate(results):
        # BaseException also covers CancelledError, which gather() can hand
        # back as a result and which is not an Exception subclass.
        if isinstance(result, BaseException):
            return False, f"Check {i} failed: {result}"
        # A check that explicitly answers False must not count as a pass.
        if result is False:
            return False, f"Check {i} failed"
    return True, "All checks passed"
Monitoring Your Bot
Track these metrics to optimize performance:
import time
from dataclasses import dataclass, field
from typing import List
from collections import deque
@dataclass
class BotMetrics:
    """Running counters and latency samples for the bot.

    Latency samples are kept in bounded deques (last 1000 entries). Profit,
    gas and tip totals are plain floats that the caller updates directly.
    """

    # Timing samples, in seconds
    detection_latencies: deque = field(default_factory=lambda: deque(maxlen=1000))
    submission_latencies: deque = field(default_factory=lambda: deque(maxlen=1000))

    # Outcome counters
    opportunities_detected: int = 0
    bundles_submitted: int = 0
    bundles_included: int = 0
    bundles_failed: int = 0

    # Profit accounting
    total_gross_profit: float = 0
    total_gas_spent: float = 0
    total_tips_paid: float = 0

    def record_detection(self, start_time: float):
        """Store the time elapsed since ``start_time`` as a detection sample."""
        self.detection_latencies.append(time.time() - start_time)

    def record_submission(self, start_time: float):
        """Store the time elapsed since ``start_time`` as a submission sample."""
        self.submission_latencies.append(time.time() - start_time)

    @property
    def avg_detection_latency_ms(self) -> float:
        """Mean detection latency in milliseconds, or 0 with no samples."""
        samples = self.detection_latencies
        if len(samples) == 0:
            return 0
        return sum(samples) / len(samples) * 1000

    @property
    def inclusion_rate(self) -> float:
        """Fraction of submitted bundles that were included (0 if none sent)."""
        submitted = self.bundles_submitted
        return self.bundles_included / submitted if submitted != 0 else 0

    @property
    def net_profit(self) -> float:
        """Gross profit minus gas spent and tips paid."""
        return self.total_gross_profit - self.total_gas_spent - self.total_tips_paid

    def summary(self) -> str:
        """Render the metrics as a boxed multi-line text report."""
        report = f"""
╔══════════════════════════════════════════════════════════════╗
║ MEV BOT METRICS ║
╠══════════════════════════════════════════════════════════════╣
║ Detection Latency (avg): {self.avg_detection_latency_ms:>8.2f} ms ║
║ Opportunities Detected: {self.opportunities_detected:>8} ║
║ Bundles Submitted: {self.bundles_submitted:>8} ║
║ Bundles Included: {self.bundles_included:>8} ║
║ Inclusion Rate: {self.inclusion_rate*100:>7.2f}% ║
╠══════════════════════════════════════════════════════════════╣
║ Gross Profit: {self.total_gross_profit:>8.4f} ETH ║
║ Gas Spent: {self.total_gas_spent:>8.4f} ETH ║
║ Tips Paid: {self.total_tips_paid:>8.4f} ETH ║
║ Net Profit: {self.net_profit:>8.4f} ETH ║
╚══════════════════════════════════════════════════════════════╝
"""
        return report
Where to Go From Here
This guide covered the fundamentals. To become competitive, explore:
Advanced Topics:
- Multi-hop arbitrage paths
- Cross-chain MEV opportunities
- Custom smart contracts for gas optimization
- Private transaction pools beyond Flashbots
Infrastructure Upgrades:
- Co-located servers near validators
- Multiple geographically distributed nodes
- Custom P2P networking layers
- Hardware-accelerated transaction parsing
Part 7: Advanced Arbitrage Strategies
Basic two-pool arbitrage is heavily competed. Let's explore more sophisticated approaches.
Multi-Hop Path Finding
Sometimes the best arbitrage spans 3+ pools:
WETH → USDC (Uniswap) → DAI (Curve) → WETH (Sushiswap)
from typing import List, Tuple, Set
from collections import defaultdict
import heapq
class PathFinder:
    """Find arbitrage cycles across multiple DEX pools.

    Pools are stored as an undirected multigraph: every pool adds an edge in
    both directions between its two tokens.
    """

    def __init__(self):
        # Graph: token -> [(connected_token, pool_address, dex)]
        self.graph = defaultdict(list)

    def add_pool(self, token_a: str, token_b: str, pool: str, dex: str):
        """Add a liquidity pool to the graph (both swap directions)."""
        self.graph[token_a].append((token_b, pool, dex))
        self.graph[token_b].append((token_a, pool, dex))

    def find_cycles(self, start_token: str, max_hops: int = 4) -> List[List[Tuple]]:
        """Enumerate cycles that start and end at ``start_token``.

        Profitability is not assessed here; use ``evaluate_cycle`` for that.

        Args:
            start_token: Token the cycle must begin and end with.
            max_hops: Maximum number of swaps in a returned cycle.

        Returns:
            A list of cycles; each cycle is a list of
            ``(token_in, token_out, pool, dex)`` hops. No pool is used twice
            within one cycle.
        """
        cycles = []

        def dfs(current: str, path: List[Tuple], visited_pools: Set[str]):
            # Every iteration below adds one hop, so stop expanding once the
            # path already holds max_hops hops. (The previous depth test let
            # cycles of max_hops + 1 hops through.)
            if len(path) >= max_hops:
                return

            # .get() avoids inserting empty entries into the defaultdict.
            for next_token, pool, dex in self.graph.get(current, ()):
                # Avoid using same pool twice
                if pool in visited_pools:
                    continue

                new_path = path + [(current, next_token, pool, dex)]

                # Found a cycle back to start
                if next_token == start_token and len(new_path) >= 2:
                    cycles.append(new_path)
                    continue

                # Continue searching
                dfs(next_token, new_path, visited_pools | {pool})

        dfs(start_token, [], set())
        return cycles

    async def evaluate_cycle(self, cycle: List[Tuple], start_amount: int, pricer) -> dict:
        """Walk ``cycle`` with ``start_amount`` and report the resulting profit.

        Args:
            cycle: Hops as produced by ``find_cycles``.
            start_amount: Input amount for the first hop.
            pricer: Object providing awaitable ``get_reserves(pool)`` and
                ``calculate_output(amount, r_in, r_out)``.

        Returns:
            Dict with the cycle, input/output amounts, gross profit, hop count
            and a gas estimate of 150000 per hop.

        NOTE(review): ``_order_reserves`` is called here but is not defined in
        this class as shown; it must be provided elsewhere.
        """
        current_amount = start_amount

        for token_in, token_out, pool, dex in cycle:
            reserves = await pricer.get_reserves(pool)

            # Determine reserve order
            r_in, r_out = self._order_reserves(token_in, token_out, pool, reserves)

            # Calculate output
            current_amount = pricer.calculate_output(current_amount, r_in, r_out)

        profit = current_amount - start_amount

        # Estimate gas (each hop ~150k gas)
        gas_estimate = len(cycle) * 150000

        return {
            "cycle": cycle,
            "input_amount": start_amount,
            "output_amount": current_amount,
            "gross_profit": profit,
            "hops": len(cycle),
            "gas_estimate": gas_estimate,
        }
class TriangularArbitrage:
    """Specialized scanner for triangular (exactly three-hop) arbitrage.

    Args:
        pricer: Passed through to ``path_finder.evaluate_cycle``.
        tokens: Token list kept on the instance for callers.
        path_finder: Optional pre-populated path finder. When omitted a new
            ``PathFinder`` is created; populate it via
            ``self.path_finder.add_pool(...)`` before scanning.
    """

    # Input sizes tried for every cycle, in wei.
    TEST_AMOUNTS = (
        int(0.1 * 1e18),   # 0.1 ETH
        int(0.5 * 1e18),   # 0.5 ETH
        int(1.0 * 1e18),   # 1.0 ETH
        int(5.0 * 1e18),   # 5.0 ETH
        int(10.0 * 1e18),  # 10.0 ETH
    )

    def __init__(self, pricer, tokens: List[str], path_finder=None):
        self.pricer = pricer
        self.tokens = tokens
        self.path_finder = path_finder if path_finder is not None else PathFinder()

    async def scan_all_triangles(self, base_token: str = "WETH") -> List[dict]:
        """Find profitable triangular arbitrage opportunities.

        Every three-hop cycle from ``base_token`` is evaluated at each amount
        in ``TEST_AMOUNTS``; the most profitable evaluation per cycle is kept
        if its gross profit is positive.

        Returns:
            Evaluation dicts sorted by ``gross_profit``, highest first.
        """
        opportunities = []

        # find_cycles also returns shorter cycles (and, depending on its hop
        # accounting, longer ones); a triangle is exactly three hops.
        cycles = [
            cycle
            for cycle in self.path_finder.find_cycles(base_token, max_hops=3)
            if len(cycle) == 3
        ]

        for cycle in cycles:
            best_result = None
            best_profit = 0

            for amount in self.TEST_AMOUNTS:
                result = await self.path_finder.evaluate_cycle(cycle, amount, self.pricer)
                if result["gross_profit"] > best_profit:
                    best_profit = result["gross_profit"]
                    best_result = result

            # best_result is only set when some evaluation beat zero profit.
            if best_result is not None:
                opportunities.append(best_result)

        # Sort by profit
        opportunities.sort(key=lambda x: x["gross_profit"], reverse=True)
        return opportunities
Just-In-Time (JIT) Liquidity
Provide liquidity right before a large swap, capture fees, withdraw immediately:
class JITLiquidityBot:
    """Provide just-in-time liquidity for large swaps.

    Args:
        web3_client: Client exposing ``eth.get_transaction_count``.
        position_manager_address: Address of the position manager contract.
        signer: Account used to sign the mint/burn transactions. Required by
            ``build_jit_bundle``; optional for analysis only.

    NOTE(review): the helpers ``_get_usd_value``, ``_get_current_price``,
    ``_get_tick_spacing``, ``_price_to_tick``, ``_get_pool_address``,
    ``_build_mint_tx`` and ``_build_burn_tx`` are used here but not defined in
    the class as shown.
    """

    def __init__(self, web3_client, position_manager_address: str, signer=None):
        self.w3 = web3_client
        self.position_manager = position_manager_address
        # Previously never assigned, so build_jit_bundle failed with
        # AttributeError on self.signer.
        self.signer = signer

        # Minimum swap size to consider (in USD)
        self.min_swap_usd = 50000

        # Fee tier preferences
        self.fee_tiers = [500, 3000, 10000]  # 0.05%, 0.3%, 1%

    def _tick_range(self, current_price, tick_spacing: int):
        """Return ``(lower, upper)`` ticks around ``current_price`` (+/-1%).

        The lower bound is rounded down and the upper bound rounded up to the
        tick spacing, so the range always contains the current price and is
        never empty. (Rounding both bounds down could leave the current tick
        outside the position for wide spacings.)
        """
        lower = self._price_to_tick(current_price * 0.99) // tick_spacing * tick_spacing
        upper = -(-self._price_to_tick(current_price * 1.01) // tick_spacing) * tick_spacing
        if upper <= lower:
            upper = lower + tick_spacing
        return lower, upper

    async def analyze_swap_for_jit(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if a pending swap is suitable for JIT liquidity.

        Args:
            analyzed_tx: Dict with ``is_swap``, ``decoded`` (``protocol`` and
                ``params``), ``gas_price`` and ``tx_hash``.

        Returns:
            An opportunity dict, or ``None`` if the transaction is not a large
            enough Uniswap V3 swap or expected fees do not exceed gas cost.

        NOTE(review): ``expected_fees`` is in units of the input token while
        ``gas_cost`` is ``500000 * gas_price``; the comparison is only
        meaningful if both are in the same unit. Confirm.
        NOTE(review): the returned dict does not contain
        ``liquidity_amount_0`` / ``liquidity_amount_1``, which
        ``build_jit_bundle`` reads; the caller must add them.
        """
        if not analyzed_tx.get("is_swap"):
            return None

        decoded = analyzed_tx["decoded"]

        # JIT works best on Uniswap V3 due to concentrated liquidity
        if decoded.get("protocol") != "uniswap_v3":
            return None

        params = decoded["params"]

        # Extract swap details (V3 exactInputSingle)
        if "params" not in params:
            return None
        swap_params = params["params"]
        token_in = swap_params[0]
        token_out = swap_params[1]
        fee_tier = swap_params[2]
        # presumably the amountIn position of the decoded struct; verify
        # against the router ABI in use.
        amount_in = swap_params[4]

        # Check if swap is large enough
        swap_value_usd = await self._get_usd_value(token_in, amount_in)
        if swap_value_usd < self.min_swap_usd:
            return None

        # Provide liquidity in a tight range around the current price
        current_price = await self._get_current_price(token_in, token_out, fee_tier)
        tick_spacing = self._get_tick_spacing(fee_tier)
        lower_tick, upper_tick = self._tick_range(current_price, tick_spacing)

        # Calculate expected fee capture
        fee_rate = fee_tier / 1000000  # Convert to decimal
        expected_fees = amount_in * fee_rate

        # Gas costs for mint + swap-induced fee collection + burn
        gas_cost = 500000 * analyzed_tx["gas_price"]

        if expected_fees <= gas_cost:
            return None

        return {
            "type": "jit_liquidity",
            "pool": self._get_pool_address(token_in, token_out, fee_tier),
            "token_in": token_in,
            "token_out": token_out,
            "fee_tier": fee_tier,
            "swap_amount": amount_in,
            "lower_tick": lower_tick,
            "upper_tick": upper_tick,
            "expected_fees": expected_fees,
            "gas_cost": gas_cost,
            "net_profit": expected_fees - gas_cost,
            "trigger_tx": analyzed_tx["tx_hash"],
        }

    async def build_jit_bundle(self, opportunity: dict, victim_tx_raw: str) -> List[str]:
        """Build the JIT bundle: mint, then the target swap, then burn.

        Args:
            opportunity: Dict from ``analyze_swap_for_jit`` extended with
                ``liquidity_amount_0`` and ``liquidity_amount_1``.
            victim_tx_raw: Raw signed swap transaction taken from the mempool.

        Returns:
            Three raw transactions in execution order.

        Raises:
            RuntimeError: If the bot was created without a signer.
        """
        if self.signer is None:
            raise RuntimeError("JITLiquidityBot needs a signer to build bundles")

        signed_txs = []
        nonce = self.w3.eth.get_transaction_count(self.signer.address)

        # Transaction 1: Mint concentrated liquidity position
        # NOTE(review): token_in/token_out are used as token0/token1 as-is;
        # confirm they are already in the pool's token order.
        mint_params = {
            "token0": opportunity["token_in"],
            "token1": opportunity["token_out"],
            "fee": opportunity["fee_tier"],
            "tickLower": opportunity["lower_tick"],
            "tickUpper": opportunity["upper_tick"],
            "amount0Desired": opportunity["liquidity_amount_0"],
            "amount1Desired": opportunity["liquidity_amount_1"],
            "amount0Min": 0,
            "amount1Min": 0,
            "recipient": self.signer.address,
            "deadline": int(time.time()) + 120,
        }
        mint_tx = self._build_mint_tx(mint_params, nonce)
        signed_txs.append(self.signer.sign_transaction(mint_tx).rawTransaction.hex())

        # Transaction 2: Victim's swap (from mempool)
        signed_txs.append(victim_tx_raw)

        # Transaction 3: Burn position and collect fees
        burn_tx = self._build_burn_tx(opportunity, nonce + 1)
        signed_txs.append(self.signer.sign_transaction(burn_tx).rawTransaction.hex())

        return signed_txs
Part 8: Handling Different DEX Architectures
Each DEX has unique mechanics affecting MEV extraction.
Uniswap V2 vs V3
class UniswapV2Calculator:
    """Calculations for Uniswap V2 constant product pools.

    All amounts are integers and all arithmetic is done with Python ints.
    The default ``Decimal`` context keeps only 28 significant digits, which
    wei-scale products (amount * reserve) routinely exceed.
    """

    FEE = Decimal("0.003")  # 0.3%
    # Integer form of (1 - FEE): 997 / 1000
    FEE_NUMERATOR = 997
    FEE_DENOMINATOR = 1000

    @staticmethod
    def _require_liquidity(reserve_in: int, reserve_out: int) -> None:
        """Reject pools with a non-positive reserve."""
        if reserve_in <= 0 or reserve_out <= 0:
            raise ValueError("Insufficient liquidity: reserves must be positive")

    def get_amount_out(self, amount_in: int, reserve_in: int, reserve_out: int) -> int:
        """Return the output for ``amount_in`` under x*y=k with a 0.3% fee.

        Raises:
            ValueError: If ``amount_in`` is negative or a reserve is not
                positive.
        """
        if amount_in < 0:
            raise ValueError("amount_in must not be negative")
        self._require_liquidity(reserve_in, reserve_out)

        amount_in_with_fee = amount_in * self.FEE_NUMERATOR
        numerator = amount_in_with_fee * reserve_out
        denominator = reserve_in * self.FEE_DENOMINATOR + amount_in_with_fee
        return numerator // denominator

    def get_amount_in(self, amount_out: int, reserve_in: int, reserve_out: int) -> int:
        """Return the input required to receive ``amount_out``.

        The result is rounded up by one unit, as in the original formula.

        Raises:
            ValueError: If ``amount_out`` is negative, a reserve is not
                positive, or ``amount_out`` is not smaller than
                ``reserve_out`` (the pool cannot be drained).
        """
        if amount_out < 0:
            raise ValueError("amount_out must not be negative")
        self._require_liquidity(reserve_in, reserve_out)
        if amount_out >= reserve_out:
            raise ValueError("amount_out must be smaller than reserve_out")

        numerator = reserve_in * amount_out * self.FEE_DENOMINATOR
        denominator = (reserve_out - amount_out) * self.FEE_NUMERATOR
        return numerator // denominator + 1
class UniswapV3Calculator:
    """Simplified calculations for Uniswap V3 concentrated liquidity.

    NOTE(review): ``_get_next_tick_boundary``, ``_get_max_amount_in_tick``,
    ``_tick_to_sqrt_price`` and ``_get_liquidity_at_tick`` are used by
    ``simulate_swap`` but are not defined in the class as shown.
    """

    def __init__(self):
        # Q64.96 fixed point math constants
        self.Q96 = 2 ** 96

    def sqrt_price_to_price(self, sqrt_price_x96: int) -> Decimal:
        """Convert sqrtPriceX96 to a price: ``(sqrt_price_x96 / 2**96) ** 2``."""
        return (Decimal(sqrt_price_x96) / Decimal(self.Q96)) ** 2

    def get_amount_out(self, amount_in: int, sqrt_price: int, liquidity: int,
                       zero_for_one: bool, fee: int) -> int:
        """Estimate the output of a swap that stays within one tick range.

        This is a simplification: the fee is deducted, then the remainder is
        converted at the current price. ``liquidity`` is accepted but not
        used, so price impact within the range is ignored.

        Args:
            fee: Fee in millionths of the input (e.g. 3000 for 0.3%).
        """
        fee_amount = amount_in * fee // 1000000
        amount_in_after_fee = amount_in - fee_amount

        price = self.sqrt_price_to_price(sqrt_price)

        if zero_for_one:
            # Selling token0 for token1
            return int(Decimal(amount_in_after_fee) * price)
        # Selling token1 for token0
        return int(Decimal(amount_in_after_fee) / price)

    def simulate_swap(self, pool_state: dict, amount_in: int, zero_for_one: bool,
                      max_tick_crossings: int = 1000) -> dict:
        """Simulate a swap chunk by chunk across tick ranges.

        Args:
            pool_state: Dict with ``tick``, ``sqrtPriceX96``, ``liquidity``,
                ``fee`` and ``tickSpacing``.
            amount_in: Total input amount.
            zero_for_one: Swap direction (True sells token0 for token1).
            max_tick_crossings: Upper bound on tick crossings; guards against
                looping forever when ranges accept no input.

        Returns:
            Dict with ``amount_out``, ``final_tick``, ``final_sqrt_price``,
            ``ticks_crossed`` (non-negative, in units of tick spacing) and
            ``gas_estimate`` (100000 base + 50000 per crossing).

        Raises:
            RuntimeError: If more than ``max_tick_crossings`` crossings would
                be needed.
        """
        start_tick = pool_state["tick"]
        current_tick = start_tick
        sqrt_price = pool_state["sqrtPriceX96"]
        liquidity = pool_state["liquidity"]
        fee = pool_state["fee"]

        remaining = amount_in
        total_out = 0
        gas_estimate = 100000  # Base gas
        crossings = 0

        # Simulate tick by tick
        while remaining > 0:
            # Calculate max swap in current tick range
            tick_boundary = self._get_next_tick_boundary(current_tick, zero_for_one)
            max_in_tick = self._get_max_amount_in_tick(
                sqrt_price, tick_boundary, liquidity, zero_for_one
            )

            # A negative capacity would make `remaining` grow; treat as zero.
            swap_amount = min(remaining, max(0, max_in_tick))
            total_out += self.get_amount_out(
                swap_amount, sqrt_price, liquidity, zero_for_one, fee
            )
            remaining -= swap_amount

            if remaining > 0:
                crossings += 1
                if crossings > max_tick_crossings:
                    raise RuntimeError(
                        f"Swap not filled after {max_tick_crossings} tick crossings"
                    )
                # Cross tick
                current_tick = tick_boundary
                sqrt_price = self._tick_to_sqrt_price(tick_boundary)
                liquidity = self._get_liquidity_at_tick(pool_state, tick_boundary)
                gas_estimate += 50000  # Additional gas per tick cross

        return {
            "amount_out": total_out,
            "final_tick": current_tick,
            "final_sqrt_price": sqrt_price,
            # abs(): the tick rises for one-for-zero swaps, which previously
            # produced a negative count.
            "ticks_crossed": abs(start_tick - current_tick) // pool_state["tickSpacing"],
            "gas_estimate": gas_estimate,
        }
class CurveCalculator:
    """Calculations for Curve StableSwap pools.

    Args:
        A: Amplification coefficient.
        n_coins: Number of coins in the pool.
    """

    def __init__(self, A: int, n_coins: int):
        self.A = A  # Amplification coefficient
        self.n_coins = n_coins

    def get_D(self, balances: List[int]) -> int:
        """Calculate the StableSwap invariant D by Newton iteration.

        Returns 0 for an all-zero pool.

        Raises:
            ValueError: If ``balances`` does not hold ``n_coins`` entries.
            RuntimeError: If the iteration does not converge in 255 rounds.
        """
        S = sum(balances)
        if S == 0:
            return 0
        if len(balances) != self.n_coins:
            raise ValueError(
                f"Expected {self.n_coins} balances, got {len(balances)}"
            )

        D = S
        Ann = self.A * self.n_coins

        for _ in range(255):
            D_P = D
            for balance in balances:
                D_P = D_P * D // (balance * self.n_coins)

            D_prev = D
            D = (Ann * S + D_P * self.n_coins) * D // ((Ann - 1) * D + (self.n_coins + 1) * D_P)

            if abs(D - D_prev) <= 1:
                return D

        raise RuntimeError("D calculation did not converge")

    def get_y(self, i: int, j: int, x: int, balances: List[int]) -> int:
        """Calculate the new balance of coin ``j`` once coin ``i`` holds ``x``.

        This is the post-swap *balance* of the output coin, not the amount
        paid out; the swap output is derived from ``balances[j]`` minus the
        value returned here.

        Args:
            i: Index of the input coin.
            j: Index of the output coin.
            x: New balance of coin ``i`` (old balance plus the amount sold).
            balances: Current pool balances.

        Raises:
            ValueError: If ``i == j`` or either index is out of range.
            RuntimeError: If the iteration does not converge in 255 rounds.
        """
        if i == j:
            raise ValueError("Input and output coin must differ")
        if not (0 <= i < self.n_coins and 0 <= j < self.n_coins):
            raise ValueError("Coin index out of range")

        D = self.get_D(balances)
        c = D
        S = 0
        Ann = self.A * self.n_coins

        for k in range(self.n_coins):
            if k == j:
                # The output coin's balance is the unknown being solved for.
                continue
            _x = x if k == i else balances[k]
            S += _x
            c = c * D // (_x * self.n_coins)

        c = c * D // (Ann * self.n_coins)
        b = S + D // Ann

        y = D
        for _ in range(255):
            y_prev = y
            y = (y * y + c) // (2 * y + b - D)
            if abs(y - y_prev) <= 1:
                return y

        raise RuntimeError("y calculation did not converge")
Part 9: Cross-Chain MEV
As DeFi expands across chains, cross-chain arbitrage becomes increasingly important.
Bridge Arbitrage Scanner
class CrossChainArbitrage:
    """Monitor price discrepancies for the same token across chains.

    Args:
        chains: Mapping of chain name to a client for that chain, e.g.
            ``{"ethereum": w3_eth, "arbitrum": w3_arb, "polygon": w3_poly}``.

    NOTE(review): ``_get_dex_price`` and ``_estimate_bridge_cost`` are used
    here but not defined in the class as shown.
    """

    # Spread (as a fraction) below which bridge costs are assumed to
    # outweigh the opportunity.
    MIN_SPREAD = 0.005  # 0.5% minimum

    def __init__(self, chains: dict):
        self.chains = chains

        # Common tokens across chains
        self.token_mappings = {
            "USDC": {
                "ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
                "arbitrum": "0xFF970A61A04b1cA14834A43f5dE4533eBDDB5CC8",
                "polygon": "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174",
            },
            "WETH": {
                "ethereum": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
                "arbitrum": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1",
                "polygon": "0x7ceB23fD6bC0adD59E62ac25578270cFf1b9f619",
            },
        }

        # Bridge contracts
        self.bridges = {
            ("ethereum", "arbitrum"): "0x...",  # Arbitrum bridge
            ("ethereum", "polygon"): "0x...",  # Polygon bridge
        }

    async def get_price_across_chains(self, token: str, quote_token: str = "USDC") -> dict:
        """Return ``{chain_name: price}`` for every chain mapping both tokens.

        Chains without an address for ``token`` or ``quote_token`` are skipped.
        """
        prices = {}

        for chain_name, w3 in self.chains.items():
            token_address = self.token_mappings.get(token, {}).get(chain_name)
            quote_address = self.token_mappings.get(quote_token, {}).get(chain_name)

            if not token_address or not quote_address:
                continue

            prices[chain_name] = await self._get_dex_price(
                w3, token_address, quote_address, chain_name
            )

        return prices

    async def find_cross_chain_arbitrage(self) -> List[dict]:
        """Find cross-chain price gaps above ``MIN_SPREAD``.

        Returns:
            Opportunity dicts sorted by ``spread_percent``, largest first.
            ``risk`` is ``"high"`` when the estimated bridge time exceeds 10
            minutes, otherwise ``"medium"``.
        """
        opportunities = []

        for token in self.token_mappings:
            # USDC is the quote token; skip pricing it against itself.
            if token == "USDC":
                continue

            raw_prices = await self.get_price_across_chains(token)
            # A missing or non-positive quote (e.g. an empty pool) would make
            # the spread division fail; such chains are not comparable.
            prices = {
                chain: price
                for chain, price in raw_prices.items()
                if price is not None and price > 0
            }

            if len(prices) < 2:
                continue

            # Find price discrepancies between every pair of chains
            chains = list(prices)
            for idx, chain_a in enumerate(chains):
                for chain_b in chains[idx + 1:]:
                    price_a = prices[chain_a]
                    price_b = prices[chain_b]

                    spread = abs(price_a - price_b) / min(price_a, price_b)

                    # Need significant spread to cover bridge costs
                    if spread <= self.MIN_SPREAD:
                        continue

                    if price_a < price_b:
                        buy_chain, sell_chain = chain_a, chain_b
                    else:
                        buy_chain, sell_chain = chain_b, chain_a

                    # Estimate bridge costs
                    bridge_cost = await self._estimate_bridge_cost(buy_chain, sell_chain)
                    bridge_time = self._estimate_bridge_time(buy_chain, sell_chain)

                    opportunities.append({
                        "token": token,
                        "buy_chain": buy_chain,
                        "sell_chain": sell_chain,
                        "buy_price": prices[buy_chain],
                        "sell_price": prices[sell_chain],
                        "spread_percent": spread * 100,
                        "bridge_cost_usd": bridge_cost,
                        "bridge_time_minutes": bridge_time,
                        "risk": "high" if bridge_time > 10 else "medium",
                    })

        return sorted(opportunities, key=lambda x: x["spread_percent"], reverse=True)

    def _estimate_bridge_time(self, from_chain: str, to_chain: str) -> int:
        """Estimate bridge finality time in minutes (60 for unknown routes)."""
        times = {
            ("ethereum", "arbitrum"): 10,
            ("ethereum", "polygon"): 30,
            ("arbitrum", "ethereum"): 7 * 24 * 60,  # 7 days for native
            ("polygon", "ethereum"): 45,
        }
        return times.get((from_chain, to_chain), 60)
L2 Sequencer MEV
class L2SequencerMonitor:
    """Monitor L2 sequencer for MEV opportunities"""

    def __init__(self, sequencer_rpc: str):
        # L2s have different MEV characteristics
        # - Faster blocks (sub-second on some)
        # - Sequencer controls ordering
        # - Lower gas costs
        self.sequencer_rpc = sequencer_rpc

    async def monitor_sequencer_feed(self):
        """Subscribe to pending transactions and forward each notification.

        Opens a websocket to ``sequencer_rpc``, sends an ``eth_subscribe``
        request for ``newPendingTransactions`` and passes the ``result`` of
        every message carrying ``params`` to ``_fast_process``. Messages
        without ``params`` are ignored.
        """
        # Arbitrum uses a sequencer feed
        # Optimism has similar architecture
        subscribe_request = json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newPendingTransactions"]
        })

        async with websockets.connect(self.sequencer_rpc) as ws:
            await ws.send(subscribe_request)

            async for raw_message in ws:
                payload = json.loads(raw_message)
                if "params" not in payload:
                    continue
                # Process much faster than L1
                # Sub-second response required
                await self._fast_process(payload["params"]["result"])

    def get_l2_specific_opportunities(self, analyzed_tx: dict) -> Optional[dict]:
        """Find L2-specific MEV opportunities (placeholder; returns None)."""
        # L2 has unique opportunities:
        # 1. L1 -> L2 deposit arbitrage
        # 2. Sequencer priority auctions
        # 3. Smaller trades viable due to low gas
        return None
Part 10: Smart Contract Optimization
Custom smart contracts reduce gas costs and enable atomic execution.
MEV Executor Contract
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;
import "@uniswap/v2-periphery/contracts/interfaces/IUniswapV2Router02.sol";
import "@uniswap/v3-periphery/contracts/interfaces/ISwapRouter.sol";
import "@aave/v3-core/contracts/flashloan/base/FlashLoanSimpleReceiverBase.sol";
contract MEVExecutor is FlashLoanSimpleReceiverBase {
    address public owner;

    // Whitelisted routers
    mapping(address => bool) public approvedRouters;

    /// @notice One swap step: the router to call, the tokens involved, the
    /// amount approved to the router, the minimum acceptable output and the
    /// raw calldata forwarded to the router.
    struct SwapInstruction {
        address router;
        address tokenIn;
        address tokenOut;
        uint256 amountIn;
        uint256 minAmountOut;
        bytes data;
    }

    constructor(address _poolProvider) FlashLoanSimpleReceiverBase(IPoolAddressesProvider(_poolProvider)) {
        owner = msg.sender;
    }

    modifier onlyOwner() {
        require(msg.sender == owner, "Not owner");
        _;
    }

    /**
     * @notice Execute multi-hop arbitrage in single transaction
     * @param path Array of swap instructions
     * @param minProfit Profit (in wei of native ETH) that must be strictly exceeded
     * @dev Profit is measured on this contract's native ETH balance, so the
     *      path has to end in native ETH for the check to be meaningful.
     */
    function executeArbitrage(
        SwapInstruction[] calldata path,
        uint256 minProfit
    ) external onlyOwner {
        uint256 startBalance = address(this).balance;

        for (uint i = 0; i < path.length; i++) {
            _executeSwap(path[i]);
        }

        uint256 endBalance = address(this).balance;
        require(endBalance > startBalance + minProfit, "Insufficient profit");

        // Transfer profit to owner
        payable(owner).transfer(endBalance - startBalance);
    }

    /**
     * @notice Execute flash loan liquidation
     * @param collateralAsset Collateral to seize
     * @param debtAsset Asset borrowed via flash loan and used to repay the debt
     * @param user Account being liquidated
     * @param debtToCover Amount of debt to repay
     */
    function flashLiquidate(
        address collateralAsset,
        address debtAsset,
        address user,
        uint256 debtToCover
    ) external onlyOwner {
        // Request flash loan for debt amount
        POOL.flashLoanSimple(
            address(this),
            debtAsset,
            debtToCover,
            abi.encode(collateralAsset, user),
            0
        );
    }

    /**
     * @notice Flash loan callback
     * @dev Only callable by the pool, and only for loans this contract initiated.
     */
    function executeOperation(
        address asset,
        uint256 amount,
        uint256 premium,
        address initiator,
        bytes calldata params
    ) external override returns (bool) {
        require(msg.sender == address(POOL), "Invalid caller");
        require(initiator == address(this), "Invalid initiator");

        (address collateralAsset, address user) = abi.decode(params, (address, address));

        // 1. Execute liquidation
        IERC20(asset).approve(address(POOL), amount);
        POOL.liquidationCall(collateralAsset, asset, user, amount, false);

        // 2. Swap received collateral to repay flash loan
        uint256 collateralReceived = IERC20(collateralAsset).balanceOf(address(this));
        uint256 amountToRepay = amount + premium;

        _swapExactOutput(collateralAsset, asset, collateralReceived, amountToRepay);

        // 3. Approve repayment
        IERC20(asset).approve(address(POOL), amountToRepay);

        return true;
    }

    /// @dev Approves the router, forwards the calldata and then verifies that
    /// at least `minAmountOut` of the output asset actually arrived.
    /// Previously `minAmountOut` was never checked, so a swap could succeed
    /// with an arbitrarily bad fill.
    function _executeSwap(SwapInstruction calldata instruction) internal {
        require(approvedRouters[instruction.router], "Router not approved");

        uint256 balanceBefore = _balanceOf(instruction.tokenOut);

        IERC20(instruction.tokenIn).approve(instruction.router, instruction.amountIn);

        (bool success,) = instruction.router.call(instruction.data);
        require(success, "Swap failed");

        uint256 received = _balanceOf(instruction.tokenOut) - balanceBefore;
        require(received >= instruction.minAmountOut, "Insufficient output");
    }

    /// @dev Balance of `token` held by this contract. address(0) is treated
    /// as native ETH (assumption: callers use address(0) for ETH outputs).
    function _balanceOf(address token) internal view returns (uint256) {
        if (token == address(0)) {
            return address(this).balance;
        }
        return IERC20(token).balanceOf(address(this));
    }

    function _swapExactOutput(
        address tokenIn,
        address tokenOut,
        uint256 maxAmountIn,
        uint256 amountOut
    ) internal {
        // Implementation for exact output swap
        // NOTE(review): left empty in this listing; until it is implemented
        // the flash loan can only be repaid from funds already held here.
    }

    // Admin functions
    function approveRouter(address router) external onlyOwner {
        approvedRouters[router] = true;
    }

    function withdrawToken(address token) external onlyOwner {
        uint256 balance = IERC20(token).balanceOf(address(this));
        // Do not silently ignore a failed transfer.
        require(IERC20(token).transfer(owner, balance), "Token transfer failed");
    }

    function withdrawETH() external onlyOwner {
        payable(owner).transfer(address(this).balance);
    }

    receive() external payable {}
}
Gas Optimization Techniques
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;
contract GasOptimizedMEV {
    // Pack storage variables
    // slot 0: owner (20 bytes) + paused (1 byte) + version (1 byte)
    address public owner;
    bool public paused;
    uint8 public version;

    // Use immutable for constants set in constructor
    address public immutable WETH;
    address public immutable UNISWAP_ROUTER;

    struct SwapParams {
        address tokenIn;
        address tokenOut;
        uint256 amount;
    }

    /// @param _weth Wrapped ETH token address
    /// @param _uniswapRouter Router address
    /// @dev Without a constructor `owner` stayed address(0), which made
    /// `optimizedSwap` uncallable, and the immutables were never assigned.
    constructor(address _weth, address _uniswapRouter) {
        owner = msg.sender;
        WETH = _weth;
        UNISWAP_ROUTER = _uniswapRouter;
    }

    // Cache frequently accessed storage in memory
    function optimizedSwap(
        address[] calldata path,
        uint256 amountIn
    ) external {
        // Cache storage reads
        address _owner = owner;
        require(msg.sender == _owner, "Not owner");

        // A path needs at least two tokens. This also makes the unchecked
        // `path.length - 1` below safe: for an empty path it would wrap
        // around to type(uint256).max.
        require(path.length >= 2, "Path too short");

        // Use unchecked for math that can't overflow
        unchecked {
            uint256 hops = path.length - 1;
            for (uint i = 0; i < hops; ++i) {
                // Swap logic
            }
        }
    }

    // Batch operations to amortize fixed costs
    // NOTE(review): no access control here, unlike optimizedSwap - confirm
    // this is intended before _executeSwap gets a real implementation.
    function batchSwap(
        SwapParams[] calldata swaps
    ) external {
        uint256 length = swaps.length;
        for (uint i = 0; i < length;) {
            _executeSwap(swaps[i]);
            unchecked { ++i; }
        }
    }

    // Use calldata instead of memory for read-only arrays
    function efficientPath(
        address[] calldata path // calldata is cheaper than memory
    ) external view returns (uint256) {
        return path.length;
    }

    function _executeSwap(SwapParams calldata params) internal {
        // Implementation
    }
}
Part 11: Risk Management Systems
Professional MEV operations require robust risk controls.
Position and Loss Limits
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio
@dataclass
class RiskLimits:
    """Thresholds consulted by the risk manager before a trade is allowed."""

    # Total open exposure allowed across active positions, in ETH.
    max_position_eth: float = 10.0
    # Cumulative loss (ETH) after which trading is paused.
    max_daily_loss_eth: float = 1.0
    # Largest single trade allowed, in ETH.
    max_single_trade_eth: float = 2.0
    # Gas price ceiling, in gwei.
    max_gas_price_gwei: int = 200
    # Minimum expected net profit (ETH) for a trade to be worth taking.
    min_profit_threshold_eth: float = 0.001
    # Number of losing trades in a row that pauses trading.
    max_consecutive_losses: int = 5
class RiskManager:
    """Gatekeeper that applies ``RiskLimits`` to MEV trades.

    Tracks running PnL, the current losing streak and open positions, and
    flips ``paused`` when a loss limit is hit.
    """

    def __init__(self, limits: RiskLimits):
        self.limits = limits
        self.daily_pnl = 0.0
        self.consecutive_losses = 0
        self.last_reset = datetime.now()
        self.active_positions = {}
        self.paused = False

    def _open_exposure(self) -> float:
        """Sum of ``amount_eth`` over all active positions."""
        return sum(p.get("amount_eth", 0) for p in self.active_positions.values())

    def reset_daily_stats(self):
        """Zero the PnL once more than a day has passed since the last reset.

        The window is measured from the previous reset using the local clock
        (``datetime.now()``); ``paused`` and the losing streak are untouched.
        """
        now = datetime.now()
        if now - self.last_reset > timedelta(days=1):
            self.daily_pnl = 0.0
            self.last_reset = datetime.now()

    def can_execute(self, opportunity: dict, gas_price: int) -> tuple[bool, str]:
        """Decide whether ``opportunity`` may be executed.

        Args:
            opportunity: Dict read for ``amount_in`` (divided by 1e18) and
                ``net_profit_eth``; both default to 0 when absent.
            gas_price: Gas price, divided by 1e9 before comparing to the gwei
                limit.

        Returns:
            ``(allowed, reason)``. Hitting the daily-loss or consecutive-loss
            limit also sets ``paused``.
        """
        self.reset_daily_stats()
        limits = self.limits

        # Trading halted earlier
        if self.paused:
            return False, "Trading paused due to risk limits"

        # Gas price ceiling
        gas_gwei = gas_price / 1e9
        if gas_gwei > limits.max_gas_price_gwei:
            return False, f"Gas price {gas_gwei:.0f} exceeds limit {self.limits.max_gas_price_gwei}"

        # Single-trade size
        trade_size_eth = opportunity.get("amount_in", 0) / 1e18
        if trade_size_eth > limits.max_single_trade_eth:
            return False, f"Trade size {trade_size_eth:.2f} exceeds limit {self.limits.max_single_trade_eth}"

        # Daily loss
        if self.daily_pnl < -limits.max_daily_loss_eth:
            self.paused = True
            return False, f"Daily loss limit reached: {self.daily_pnl:.4f} ETH"

        # Losing streak
        if self.consecutive_losses >= limits.max_consecutive_losses:
            self.paused = True
            return False, f"Too many consecutive losses: {self.consecutive_losses}"

        # Minimum expected profit
        expected_profit = opportunity.get("net_profit_eth", 0)
        if expected_profit < limits.min_profit_threshold_eth:
            return False, f"Profit {expected_profit:.6f} below threshold"

        # Total exposure including this trade
        total_exposure = self._open_exposure()
        if total_exposure + trade_size_eth > limits.max_position_eth:
            return False, f"Would exceed position limit: {total_exposure + trade_size_eth:.2f} ETH"

        return True, "Passed all risk checks"

    def record_trade(self, trade_id: str, result: dict):
        """Fold a finished trade into the stats and drop its position.

        A negative ``actual_profit_eth`` extends the losing streak; anything
        else resets it.
        """
        pnl = result.get("actual_profit_eth", 0)
        self.daily_pnl += pnl
        self.consecutive_losses = self.consecutive_losses + 1 if pnl < 0 else 0

        # Remove from active positions (no-op if it was never tracked)
        self.active_positions.pop(trade_id, None)

    def add_position(self, trade_id: str, position: dict):
        """Start tracking ``position`` under ``trade_id``."""
        self.active_positions[trade_id] = position

    def emergency_stop(self, reason: str):
        """Pause trading immediately and print the reason."""
        self.paused = True
        print(f"⚠️ EMERGENCY STOP: {reason}")
        # Could also trigger position unwinding here

    def get_status(self) -> dict:
        """Snapshot of the current risk state."""
        return {
            "paused": self.paused,
            "daily_pnl_eth": self.daily_pnl,
            "consecutive_losses": self.consecutive_losses,
            "active_positions": len(self.active_positions),
            "total_exposure_eth": self._open_exposure()
        }
class CircuitBreaker:
    """Automatic circuit breaker that halts trading on a burst of errors.

    Args:
        risk_manager: Object whose ``emergency_stop(reason)`` is called when
            the breaker trips.
        max_errors: The breaker trips when more than this many errors fall
            inside ``error_window``.
        error_window: Length of the sliding window used to detect a burst.
        quiet_period: ``error_count`` is reset once this long has passed
            since the last error.
    """

    def __init__(self, risk_manager: RiskManager, max_errors: int = 10,
                 error_window: timedelta = timedelta(minutes=1),
                 quiet_period: timedelta = timedelta(minutes=5)):
        self.risk_manager = risk_manager
        self.max_errors = max_errors
        self.error_window = error_window
        self.quiet_period = quiet_period
        self.error_count = 0
        self.last_error_time = None
        # Timestamps of recent errors. A plain running counter cannot tell a
        # burst from errors that trickle in minutes apart.
        self._recent_errors = []
        # True while the current burst has already been reported, so the
        # stop is triggered once per burst rather than on every poll.
        self._tripped = False

    def _prune(self, now: datetime) -> None:
        """Drop recorded errors that fell out of the sliding window."""
        self._recent_errors = [
            stamp for stamp in self._recent_errors if now - stamp < self.error_window
        ]

    def check(self, now: Optional[datetime] = None) -> bool:
        """Evaluate the breaker once.

        Args:
            now: Time to evaluate at; defaults to ``datetime.now()``.

        Returns:
            True if this call triggered an emergency stop.
        """
        now = now if now is not None else datetime.now()
        self._prune(now)

        triggered = False
        if len(self._recent_errors) > self.max_errors:
            if not self._tripped:
                self._tripped = True
                triggered = True
                self.risk_manager.emergency_stop("Too many errors in short period")
        else:
            self._tripped = False

        # Reset error count periodically
        if self.last_error_time and now - self.last_error_time > self.quiet_period:
            self.error_count = 0

        return triggered

    async def monitor(self):
        """Poll the breaker once per second, forever."""
        while True:
            await asyncio.sleep(1)
            self.check()

    def record_error(self, error: Exception, at: Optional[datetime] = None):
        """Record an error occurrence.

        Args:
            error: The error that occurred (not inspected).
            at: Time of the error; defaults to ``datetime.now()``.
        """
        when = at if at is not None else datetime.now()
        self.error_count += 1
        self.last_error_time = when
        self._recent_errors.append(when)
        # Keep the list bounded even if check() is never called.
        self._prune(when)
Understanding these fundamentals is just the starting point. Production systems require months of iteration and optimization.
Building enterprise-grade MEV infrastructure requires significant investment in both time and resources.
Quick Reference
Essential RPC Methods
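| Method | Purpose |
|---|---|
| `eth_subscribe` (`newPendingTransactions`) | Stream pending tx hashes |
| `eth_getTransactionByHash` | Fetch full tx details |
| `txpool_content` | Get complete mempool snapshot |
| `eth_call` | Simulate transaction execution |
| `debug_traceCall` | Get detailed execution trace |
| `eth_sendBundle` | Submit Flashbots bundle |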
Gas Estimation
| Operation | Typical Gas |
|---|---|
| Simple ETH transfer | 21,000 |
| ERC20 transfer | 65,000 |
| Uniswap V2 swap | 120,000-150,000 |
| Uniswap V3 swap | 150,000-200,000 |
| Aave liquidation | 350,000-450,000 |
| 2-hop arbitrage | 250,000-350,000 |
Profitability Thresholds
At different gas prices, minimum profitable trade sizes:
| Gas Price | Min Trade (0.3% spread) |
|---|---|
| 20 gwei | ~0.4 ETH |
| 50 gwei | ~1.0 ETH |
| 100 gwei | ~2.0 ETH |
| 200 gwei | ~4.0 ETH |
Part 12: Testing and Deployment Pipeline
Never deploy MEV code without extensive testing. Here's a battle-tested pipeline.
Local Fork Testing
import subprocess
import time
from web3 import Web3
class AnvilFork:
    """Manage a local Anvil fork for testing.

    Args:
        fork_url: RPC URL of the chain to fork.
        fork_block: Block number to fork at; ``None`` lets Anvil choose.
        port: Local port Anvil listens on (default 8545). Pick another port
            when something else already listens on 8545.
        startup_timeout: Seconds to wait for Anvil to accept connections.
    """

    def __init__(self, fork_url: str, fork_block: Optional[int] = None,
                 port: int = 8545, startup_timeout: float = 10.0):
        self.fork_url = fork_url
        self.fork_block = fork_block
        self.port = port
        self.startup_timeout = startup_timeout
        self.process = None
        self.rpc_url = f"http://localhost:{port}"

    def _build_command(self) -> list:
        """Return the Anvil command line for this fork."""
        cmd = [
            "anvil",
            "--fork-url", self.fork_url,
            "--port", str(self.port),
            "--accounts", "10",
            "--balance", "10000",
        ]
        # `is not None` so that forking at block 0 is honoured.
        if self.fork_block is not None:
            cmd.extend(["--fork-block-number", str(self.fork_block)])
        return cmd

    def _wait_until_ready(self) -> None:
        """Block until Anvil accepts TCP connections on ``port``.

        Raises:
            RuntimeError: If the process exits before becoming reachable.
            TimeoutError: If it is still unreachable after ``startup_timeout``.
        """
        import socket

        deadline = time.monotonic() + self.startup_timeout
        while time.monotonic() < deadline:
            if self.process.poll() is not None:
                raise RuntimeError(
                    f"anvil exited during startup with code {self.process.returncode}"
                )
            try:
                with socket.create_connection(("localhost", self.port), timeout=0.5):
                    return
            except OSError:
                time.sleep(0.1)

        self.stop()
        raise TimeoutError(f"anvil did not start within {self.startup_timeout} seconds")

    def _rpc(self, method: str, params: list):
        """Send a raw JSON-RPC request to the fork."""
        w3 = Web3(Web3.HTTPProvider(self.rpc_url))
        return w3.provider.make_request(method, params)

    def start(self):
        """Start the Anvil fork and return a Web3 client connected to it."""
        # Output is discarded rather than piped: nothing ever read the pipes,
        # so a chatty Anvil could fill the pipe buffer and block.
        self.process = subprocess.Popen(
            self._build_command(),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Wait for startup
        self._wait_until_ready()
        return Web3(Web3.HTTPProvider(self.rpc_url))

    def stop(self):
        """Stop the Anvil fork; does nothing if it is not running."""
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Did not exit on terminate; force it so we never hang here.
                self.process.kill()
                self.process.wait()
            self.process = None

    def reset(self):
        """Reset the fork to ``fork_url`` at ``fork_block``."""
        self._rpc("anvil_reset", [{
            "forking": {
                "jsonRpcUrl": self.fork_url,
                "blockNumber": self.fork_block,
            }
        }])

    def mine_block(self):
        """Mine a new block."""
        self._rpc("evm_mine", [])

    def set_balance(self, address: str, balance_wei: int):
        """Set ``address``'s balance to ``balance_wei``."""
        self._rpc("anvil_setBalance", [address, hex(balance_wei)])
class MEVTestSuite:
    """Comprehensive test suite for MEV bot.

    Each ``test_*`` coroutine appends ``(name, "PASSED")`` to
    ``self.test_results`` when it completes. ``run_all_tests`` drives them
    against the fork, records ``"FAILED"`` for any test that raises, and
    prints a summary.
    """

    def __init__(self, fork: AnvilFork):
        self.fork = fork
        # Populated by setup(); left as None until the fork has been started.
        self.w3 = None
        self.accounts = None
        self.owner = None
        self.victim = None
        # List of (test_name, "PASSED" | "FAILED") tuples, in execution order.
        self.test_results = []

    async def setup(self):
        """Initialize test environment: start the fork and pick test accounts."""
        self.w3 = self.fork.start()
        # Get test accounts
        self.accounts = self.w3.eth.accounts
        self.owner = self.accounts[0]
        self.victim = self.accounts[1]

    async def teardown(self):
        """Cleanup test environment by stopping the fork."""
        self.fork.stop()

    async def test_arbitrage_detection(self):
        """Test that arbitrage opportunities are detected correctly."""
        print("Testing arbitrage detection...")
        # Create price discrepancy by executing large swap
        # Then verify our detector finds it
        # Setup
        detector = ArbitrageScanner(DEXPricer(self.w3))
        # Execute large swap to create discrepancy
        await self._create_price_discrepancy()
        # Detect
        opportunities = await detector.scan_all_triangles()
        # Verify
        assert len(opportunities) > 0, "Should find at least one opportunity"
        assert opportunities[0]["gross_profit"] > 0, "Profit should be positive"
        self.test_results.append(("arbitrage_detection", "PASSED"))
        print("✅ Arbitrage detection test passed")

    async def test_bundle_submission(self):
        """Test Flashbots bundle construction and simulation."""
        print("Testing bundle submission...")
        # Build a simple arbitrage bundle
        bundle_builder = BundleBuilder(self.w3, self.owner)
        opportunity = {
            "pair_address": "0x...",
            "amount_in": int(1e18),
            "expected_output": int(1.01e18),
        }
        profit_calc = {
            "gas_params": {
                "base_fee_gwei": 30,
                "priority_fee_gwei": 2,
                "gas_units": 250000,
            }
        }
        bundle = await bundle_builder.build_arbitrage_bundle(opportunity, profit_calc)
        assert len(bundle) == 2, "Bundle should have 2 transactions"
        self.test_results.append(("bundle_submission", "PASSED"))
        print("✅ Bundle submission test passed")

    async def test_profit_calculation(self):
        """Test profit calculations are accurate."""
        print("Testing profit calculation...")
        calculator = ProfitCalculator(self.w3)
        opportunity = {
            "estimated_profit_wei": int(0.1e18),  # 0.1 ETH
            "amount_in": int(5e18),  # 5 ETH trade
        }
        result = await calculator.calculate_arbitrage_profit(opportunity)
        # Verify all components are present
        for key in ("gross_profit_wei", "gas_cost_wei", "net_profit_wei", "profitable"):
            assert key in result
        # Net should be less than gross
        assert result["net_profit_wei"] < result["gross_profit_wei"]
        self.test_results.append(("profit_calculation", "PASSED"))
        print("✅ Profit calculation test passed")

    async def test_risk_limits(self):
        """Test risk management prevents dangerous trades."""
        print("Testing risk limits...")
        limits = RiskLimits(
            max_single_trade_eth=1.0,
            max_daily_loss_eth=0.5,
        )
        risk_manager = RiskManager(limits)
        # Test trade size limit
        large_trade = {"amount_in": int(2e18)}  # 2 ETH
        can_execute, _reason = risk_manager.can_execute(large_trade, int(30e9))
        assert not can_execute, "Should reject oversized trade"
        # Test daily loss limit
        risk_manager.daily_pnl = -0.6  # Exceeded limit
        small_trade = {"amount_in": int(0.5e18), "net_profit_eth": 0.01}
        can_execute, _reason = risk_manager.can_execute(small_trade, int(30e9))
        assert not can_execute, "Should reject after daily loss limit"
        self.test_results.append(("risk_limits", "PASSED"))
        print("✅ Risk limits test passed")

    async def test_historical_replay(self):
        """Replay historical MEV and verify detection.

        NOTE(review): this is still a placeholder. It resets the fork and
        records a pass without asserting anything; real verification needs
        historical data of the MEV actually extracted.
        """
        print("Testing historical replay...")
        # Intended to fork at a specific block with known MEV (example: 18500000).
        self.fork.reset()
        # Verify we would have detected known opportunities
        # This requires historical data of actual MEV extracted
        self.test_results.append(("historical_replay", "PASSED"))
        print("✅ Historical replay test passed")

    async def _run_test(self, test):
        """Await one test coroutine function, recording FAILED if it raises."""
        # Same label the test itself records on success, e.g. "risk_limits".
        label = test.__name__.removeprefix("test_")
        try:
            await test()
        except Exception as exc:  # test-runner boundary: report and keep going
            self.test_results.append((label, "FAILED"))
            print(f"❌ {label} test failed: {exc!r}")

    async def run_all_tests(self):
        """Run complete test suite.

        Every test runs even when an earlier one fails; failures are recorded
        rather than aborting the run, so the summary is always printed.

        Returns:
            True when every test passed, False otherwise.
        """
        print("\n" + "="*60)
        print("RUNNING MEV BOT TEST SUITE")
        print("="*60 + "\n")
        # Start from a clean slate so a repeated run does not double-count.
        self.test_results = []
        await self.setup()
        try:
            for test in (
                self.test_arbitrage_detection,
                self.test_bundle_submission,
                self.test_profit_calculation,
                self.test_risk_limits,
                self.test_historical_replay,
            ):
                await self._run_test(test)
        finally:
            await self.teardown()
        # Summary
        print("\n" + "="*60)
        print("TEST SUMMARY")
        print("="*60)
        passed = sum(1 for _, result in self.test_results if result == "PASSED")
        total = len(self.test_results)
        for test_name, result in self.test_results:
            emoji = "✅" if result == "PASSED" else "❌"
            print(f" {emoji} {test_name}: {result}")
        print(f"\nTotal: {passed}/{total} tests passed")
        print("="*60)
        return passed == total
# Run tests
async def run_tests():
    """Build a suite on a mainnet fork, run it, and return its overall result."""
    suite = MEVTestSuite(
        AnvilFork(
            fork_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
            fork_block=18500000,
        )
    )
    return await suite.run_all_tests()
if __name__ == "__main__":
    import asyncio

    # Script entry point: drive the async test runner to completion.
    suite_run = run_tests()
    asyncio.run(suite_run)
Staged Deployment Process
class DeploymentStages:
    """Staged deployment for MEV bot.

    Tracks which rollout stage the bot is in, when that stage started, and
    the latest metrics recorded for each stage. A stage can be left only after
    its minimum duration has elapsed and its recorded metrics pass the
    error-rate and win-rate gates in ``should_advance``.
    """

    # Ordered rollout plan shared by all instances. "duration_days" is the
    # minimum time to spend in a stage; None marks the final stage.
    STAGES = [
        {
            "name": "shadow",
            "description": "Monitor only, no execution",
            "duration_days": 7,
            "config": {
                "execute_trades": False,
                "max_position": 0,
                "log_opportunities": True,
            },
        },
        {
            "name": "paper",
            "description": "Simulate execution, track virtual P&L",
            "duration_days": 7,
            "config": {
                "execute_trades": False,
                "simulate_execution": True,
                "virtual_balance": 10.0,
            },
        },
        {
            "name": "micro",
            "description": "Live execution with tiny positions",
            "duration_days": 14,
            "config": {
                "execute_trades": True,
                "max_position": 0.1,
                "max_daily_loss": 0.05,
            },
        },
        {
            "name": "small",
            "description": "Increased position sizes",
            "duration_days": 14,
            "config": {
                "execute_trades": True,
                "max_position": 1.0,
                "max_daily_loss": 0.2,
            },
        },
        {
            "name": "production",
            "description": "Full production deployment",
            "duration_days": None,
            "config": {
                "execute_trades": True,
                "max_position": 10.0,
                "max_daily_loss": 1.0,
            },
        },
    ]

    def __init__(self):
        # Index into STAGES of the stage currently in effect.
        self.current_stage = 0
        self.stage_start_time = datetime.now()
        # Maps stage name -> most recently recorded metrics dict.
        self.stage_metrics = {}

    def get_current_config(self) -> dict:
        """Get configuration for current stage.

        Returns:
            A copy of the stage's config, so callers may modify the result
            without altering the class-level STAGES table.
        """
        return dict(self.STAGES[self.current_stage]["config"])

    def should_advance(self) -> tuple[bool, str]:
        """Check if ready to advance to next stage.

        Returns:
            ``(ready, reason)`` where ``reason`` explains the decision.
        """
        stage = self.STAGES[self.current_stage]
        min_days = stage["duration_days"]
        if min_days is None:
            return False, "Already at final stage"
        days_elapsed = (datetime.now() - self.stage_start_time).days
        if days_elapsed < min_days:
            remaining = min_days - days_elapsed
            return False, f"{remaining} days remaining in {stage['name']} stage"
        # Check metrics meet advancement criteria. Missing metrics default to
        # the failing side (error_rate 1.0, win_rate 0), so a stage with no
        # recorded metrics never advances.
        metrics = self.stage_metrics.get(stage["name"], {})
        if metrics.get("error_rate", 1.0) > 0.1:
            return False, "Error rate too high to advance"
        if metrics.get("win_rate", 0) < 0.2:
            return False, "Win rate too low to advance"
        return True, "Ready to advance"

    def advance(self):
        """Advance to next deployment stage; no-op when already at the last one."""
        if self.current_stage >= len(self.STAGES) - 1:
            return
        self.current_stage += 1
        self.stage_start_time = datetime.now()
        print(f"Advanced to stage: {self.STAGES[self.current_stage]['name']}")

    def record_metrics(self, metrics: dict):
        """Record metrics for current stage, replacing any earlier record.

        A copy is stored so later changes to the caller's dict do not alter
        what was recorded.
        """
        stage_name = self.STAGES[self.current_stage]["name"]
        self.stage_metrics[stage_name] = dict(metrics)
Part 13: Production Monitoring Dashboard
from flask import Flask, jsonify, render_template
import threading
# Flask application object that the @app.route handlers below register on.
app = Flask(__name__)
# Global bot instance (set during initialization)
# Stays None until start_dashboard() assigns it; the handlers answer with a
# 500 "Bot not initialized" error while it is unset.
bot_instance = None
@app.route('/api/status')
def get_status():
    """Report whether the bot is running, its stats, and its risk status."""
    if bot_instance:
        payload = {
            "running": bot_instance.running,
            "stats": bot_instance.stats,
            "risk_status": bot_instance.risk_manager.get_status(),
        }
        return jsonify(payload)
    # No bot has been attached to the dashboard yet.
    return jsonify({"error": "Bot not initialized"}), 500
@app.route('/api/opportunities')
def get_recent_opportunities():
    """Return the last 100 entries of the bot's recent opportunities."""
    if bot_instance:
        latest = bot_instance.recent_opportunities[-100:]
        return jsonify({"opportunities": latest})
    # No bot has been attached to the dashboard yet.
    return jsonify({"error": "Bot not initialized"}), 500
@app.route('/api/trades')
def get_recent_trades():
    """Return the last 100 entries of the bot's trade history."""
    if bot_instance:
        latest = bot_instance.trade_history[-100:]
        return jsonify({"trades": latest})
    # No bot has been attached to the dashboard yet.
    return jsonify({"error": "Bot not initialized"}), 500
@app.route('/api/metrics')
def get_metrics():
    """Return latency percentiles, win rate, and profit figures as JSON."""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    perf = bot_instance.metrics
    payload = {
        "detection_latency_p50_ms": perf.get_percentile(50),
        "detection_latency_p95_ms": perf.get_percentile(95),
        "win_rate": perf.win_rate,
        "profit_today_eth": perf.profit_today,
        "profit_total_eth": perf.profit_total,
    }
    return jsonify(payload)
@app.route('/api/pause', methods=['POST'])
def pause_bot():
    """Ask the bot to pause and confirm with a "paused" status."""
    if bot_instance:
        bot_instance.pause()
        return jsonify({"status": "paused"})
    # No bot has been attached to the dashboard yet.
    return jsonify({"error": "Bot not initialized"}), 500
@app.route('/api/resume', methods=['POST'])
def resume_bot():
    """Ask the bot to resume and confirm with a "running" status."""
    if bot_instance:
        bot_instance.resume()
        return jsonify({"status": "running"})
    # No bot has been attached to the dashboard yet.
    return jsonify({"error": "Bot not initialized"}), 500
def start_dashboard(bot, port=5000, host="127.0.0.1"):
    """Start monitoring dashboard in background thread.

    Args:
        bot: Bot object the route handlers read from and control.
        port: TCP port for the Flask server.
        host: Interface to bind. Defaults to loopback because the API has no
            authentication and includes POST endpoints that pause and resume
            the bot; pass "0.0.0.0" explicitly to expose it on all interfaces,
            ideally behind an authenticating proxy.
    """
    global bot_instance
    bot_instance = bot
    thread = threading.Thread(
        target=lambda: app.run(host=host, port=port, debug=False),
        # Daemon thread: the dashboard must not keep the process alive.
        daemon=True,
    )
    thread.start()
    # A loopback or wildcard bind is reachable as localhost from this machine.
    shown_host = "localhost" if host in ("127.0.0.1", "0.0.0.0") else host
    print(f"Dashboard running at http://{shown_host}:{port}")
Alerting System
import aiohttp
from enum import Enum
class AlertLevel(Enum):
    """Severity attached to an alert, listed from least to most severe."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
class AlertManager:
    """Send alerts via multiple channels (Slack, Telegram, Discord).

    A channel is used only when its setting is present in ``config``.
    Delivery is best-effort: a failing channel never raises out of
    ``send_alert``, but each failure is reported instead of being dropped.
    """

    def __init__(self, config: dict):
        """Read channel settings from ``config``; missing keys become None."""
        self.slack_webhook = config.get("slack_webhook")
        self.telegram_bot_token = config.get("telegram_bot_token")
        self.telegram_chat_id = config.get("telegram_chat_id")
        self.discord_webhook = config.get("discord_webhook")

    async def send_alert(self, level: AlertLevel, title: str, message: str):
        """Send alert to all configured channels.

        Args:
            level: Severity; selects the emoji prefix.
            title: Short headline, wrapped in ``*...*`` in the message.
            message: Body text placed on the line after the title.
        """
        emoji = {
            AlertLevel.INFO: "ℹ️",
            AlertLevel.WARNING: "⚠️",
            AlertLevel.ERROR: "❌",
            AlertLevel.CRITICAL: "🚨"
        }[level]
        formatted_message = f"{emoji} *{title}*\n{message}"
        # Channel name -> pending coroutine, so failures can be attributed.
        deliveries = {}
        if self.slack_webhook:
            deliveries["slack"] = self._send_slack(formatted_message)
        if self.telegram_bot_token:
            deliveries["telegram"] = self._send_telegram(formatted_message)
        if self.discord_webhook:
            deliveries["discord"] = self._send_discord(formatted_message)
        if not deliveries:
            return
        # return_exceptions=True keeps one failing channel from cancelling the
        # others; the collected exceptions are reported below.
        outcomes = await asyncio.gather(*deliveries.values(), return_exceptions=True)
        for channel, outcome in zip(deliveries, outcomes):
            if isinstance(outcome, BaseException):
                print(f"Alert delivery to {channel} failed: {outcome!r}")

    async def _post_json(self, url: str, payload: dict):
        """POST ``payload`` as JSON to ``url``; raise on an HTTP error status."""
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload) as response:
                # Without this a 4xx/5xx reply would look like a success.
                response.raise_for_status()

    async def _send_slack(self, message: str):
        """Deliver ``message`` to the Slack webhook."""
        await self._post_json(self.slack_webhook, {"text": message})

    async def _send_telegram(self, message: str):
        """Deliver ``message`` through the Telegram bot sendMessage endpoint."""
        url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
        await self._post_json(
            url,
            {
                "chat_id": self.telegram_chat_id,
                "text": message,
                "parse_mode": "Markdown"
            },
        )

    async def _send_discord(self, message: str):
        """Deliver ``message`` to the Discord webhook."""
        await self._post_json(self.discord_webhook, {"content": message})
# Alert conditions
class AlertConditions:
    """Evaluate the bot's state and raise an alert for each condition that holds."""

    def __init__(self, alert_manager: AlertManager, bot):
        self.alerts = alert_manager
        self.bot = bot

    async def check_conditions(self):
        """Run every alert check in order, sending one alert per triggered check."""
        # Large profit opportunity
        opportunity_profit = self.bot.last_opportunity_profit
        if opportunity_profit > 1.0:
            await self.alerts.send_alert(
                AlertLevel.INFO,
                "Large Opportunity",
                f"Detected opportunity worth {opportunity_profit:.4f} ETH",
            )

        # High error rate
        error_rate = self.bot.metrics.error_rate
        if error_rate > 0.3:
            await self.alerts.send_alert(
                AlertLevel.WARNING,
                "High Error Rate",
                f"Error rate is {error_rate*100:.1f}%",
            )

        # Daily loss limit approaching (warn once 80% of the limit is used)
        risk = self.bot.risk_manager
        daily_loss = -risk.daily_pnl
        daily_limit = risk.limits.max_daily_loss_eth
        if daily_loss > daily_limit * 0.8:
            await self.alerts.send_alert(
                AlertLevel.WARNING,
                "Daily Loss Limit",
                f"Daily loss {daily_loss:.4f} ETH approaching limit {daily_limit:.4f} ETH",
            )

        # Bot paused
        if not self.bot.risk_manager.paused:
            return
        await self.alerts.send_alert(
            AlertLevel.CRITICAL,
            "Bot Paused",
            "Trading has been automatically paused due to risk limits",
        )
Questions about MEV development? Drop a comment below or reach out on Twitter.
