Building Your First MEV Bot: A Developer's Deep Dive into Mempool Analysis

Written by amanvaths | Published 2026/01/22


Most blockchain developers hear about MEV bots making thousands daily and wonder — how does that actually work? The answer lies in one overlooked piece of infrastructure: the mempool.

I spent months building MEV systems from scratch, hitting every possible roadblock along the way. This guide shares what actually works, skipping the theory and focusing on implementation details that tutorials rarely cover.

By the end, you'll have working code for monitoring pending transactions, identifying profitable opportunities, and submitting bundles through Flashbots — the complete pipeline from detection to extraction.


What We're Building

Here's the architecture we'll implement:

┌────────────────────────────────────────────────────────────────┐
│                     MEV Bot Architecture                        │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│   [Ethereum Node]                                               │
│        │                                                        │
│        ▼                                                        │
│   [WebSocket Stream] ──► [Transaction Parser]                   │
│                                │                                │
│                                ▼                                │
│                    [Opportunity Identifier]                     │
│                                │                                │
│                    ┌───────────┼───────────┐                    │
│                    ▼           ▼           ▼                    │
│              [Arbitrage]  [Liquidation]  [Backrun]              │
│                    │           │           │                    │
│                    └───────────┼───────────┘                    │
│                                ▼                                │
│                    [Profit Calculator]                          │
│                                │                                │
│                                ▼                                │
│                    [Bundle Constructor]                         │
│                                │                                │
│                                ▼                                │
│                    [Flashbots Submission]                       │
│                                                                 │
└────────────────────────────────────────────────────────────────┘

Let's build each component.


Part 1: Connecting to the Mempool

The mempool isn't a single database — it's a collection of unconfirmed transactions sitting in memory across thousands of nodes. Each node has slightly different contents based on network propagation timing.

Running Your Own Node

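Public RPCs won't cut it for MEV. They filter mempool data, add latency, and rate-limit aggressively. You need your own node. Since the Merge, Geth only syncs when paired with a consensus-layer client (such as Lighthouse or Prysm), so run one alongside the setup below.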

Geth Setup (Recommended):

# Install Geth
sudo add-apt-repository -y ppa:ethereum/ethereum
sudo apt-get update
sudo apt-get install ethereum

# Create data directory
mkdir -p /data/ethereum

# Start with MEV-optimized settings
geth \
  --datadir /data/ethereum \
  --http \
  --http.api eth,net,web3,txpool,debug \
  --http.addr 127.0.0.1 \
  --http.port 8545 \
  --ws \
  --ws.api eth,net,web3,txpool \
  --ws.addr 127.0.0.1 \
  --ws.port 8546 \
  --txpool.globalslots 100000 \
  --txpool.globalqueue 25000 \
  --txpool.accountslots 1024 \
  --txpool.accountqueue 512 \
  --maxpeers 150 \
  --cache 16384 \
  --syncmode snap

Why These Settings Matter:

Parameter            Value    Purpose
------------------   ------   -------------------------------------------
txpool.globalslots   100000   Maximum pending transactions to track
txpool.globalqueue   25000    Queued transactions awaiting nonce
maxpeers             150      More peers = faster transaction propagation
cache                16384    RAM cache (in MB) for state access speed
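
One way to check whether the larger pool limits are actually being used is to poll Geth's `txpool_status` RPC method, which reports pending and queued counts as hex strings. The sketch below only parses such a response and does not talk to a node; the helper name `pool_utilisation` and the sample values are illustrative assumptions, and the limits simply mirror the flags above.

```python
import json

# Limits mirroring the geth flags above (--txpool.globalslots / --txpool.globalqueue).
GLOBAL_SLOTS = 100_000
GLOBAL_QUEUE = 25_000


def pool_utilisation(raw_response: str) -> dict:
    """Parse a txpool_status JSON-RPC response into counts and fill ratios.

    Hypothetical helper for illustration only.
    """
    result = json.loads(raw_response)["result"]
    pending = int(result["pending"], 16)  # counts arrive as hex strings
    queued = int(result["queued"], 16)
    return {
        "pending": pending,
        "queued": queued,
        "pending_fill": pending / GLOBAL_SLOTS,
        "queued_fill": queued / GLOBAL_QUEUE,
    }


# Sample response shaped like a txpool_status reply (values are made up).
sample = '{"jsonrpc": "2.0", "id": 1, "result": {"pending": "0x2710", "queued": "0x1f4"}}'
stats = pool_utilisation(sample)
print(stats)
```

If the pending fill ratio stays near zero, the node is probably still syncing or has too few peers to see much of the mempool.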

Establishing WebSocket Connection

Once your node syncs, connect via WebSocket for real-time transaction streaming:

import asyncio
import json
import websockets
from dataclasses import dataclass
from typing import Optional, Callable, List
import aiohttp

@dataclass
class PendingTransaction:
    hash: str
    sender: str
    recipient: Optional[str]
    value: int
    input_data: str
    gas_limit: int
    gas_price: Optional[int]
    max_fee: Optional[int]
    max_priority_fee: Optional[int]
    nonce: int
    
    @property
    def effective_gas_price(self) -> int:
        return self.gas_price or self.max_fee or 0


class MempoolStream:
    """Real-time mempool transaction stream"""
    
    def __init__(self, ws_endpoint: str, http_endpoint: str):
        self.ws_endpoint = ws_endpoint
        self.http_endpoint = http_endpoint
        self.handlers: List[Callable] = []
        self.running = False
        
    def on_transaction(self, handler: Callable):
        """Register handler for new transactions"""
        self.handlers.append(handler)
        return handler
    
    async def start(self):
        """Begin streaming pending transactions"""
        self.running = True
        
        while self.running:
            try:
                async with websockets.connect(
                    self.ws_endpoint,
                    ping_interval=30,
                    ping_timeout=10
                ) as ws:
                    # Subscribe to pending transactions
                    await ws.send(json.dumps({
                        "jsonrpc": "2.0",
                        "id": 1,
                        "method": "eth_subscribe",
                        "params": ["newPendingTransactions"]
                    }))
                    
                    response = await ws.recv()
                    sub_id = json.loads(response).get("result")
                    print(f"✓ Subscribed to mempool (ID: {sub_id})")
                    
                    async for message in ws:
                        await self._handle_message(message)
                        
            except websockets.ConnectionClosed:
                print("Connection lost, reconnecting...")
                await asyncio.sleep(1)
            except Exception as e:
                print(f"Error: {e}, retrying...")
                await asyncio.sleep(5)
    
    async def _handle_message(self, message: str):
        """Process incoming WebSocket message"""
        data = json.loads(message)
        
        if "params" not in data:
            return
            
        tx_hash = data["params"]["result"]
        
        # Fetch full transaction details
        tx = await self._fetch_transaction(tx_hash)
        
        if tx:
            # Notify all handlers
            for handler in self.handlers:
                try:
                    await handler(tx)
                except Exception as e:
                    print(f"Handler error: {e}")
    
    async def _fetch_transaction(self, tx_hash: str) -> Optional[PendingTransaction]:
        """Retrieve full transaction data"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.http_endpoint,
                json={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "eth_getTransactionByHash",
                    "params": [tx_hash]
                }
            ) as resp:
                result = await resp.json()
                tx_data = result.get("result")
                
                if not tx_data:
                    return None
                
                return PendingTransaction(
                    hash=tx_hash,
                    sender=tx_data["from"],
                    recipient=tx_data.get("to"),
                    value=int(tx_data["value"], 16),
                    input_data=tx_data["input"],
                    gas_limit=int(tx_data["gas"], 16),
                    gas_price=int(tx_data["gasPrice"], 16) if tx_data.get("gasPrice") else None,
                    max_fee=int(tx_data["maxFeePerGas"], 16) if tx_data.get("maxFeePerGas") else None,
                    max_priority_fee=int(tx_data["maxPriorityFeePerGas"], 16) if tx_data.get("maxPriorityFeePerGas") else None,
                    nonce=int(tx_data["nonce"], 16)
                )
    
    def stop(self):
        self.running = False


# Usage example
async def main():
    stream = MempoolStream(
        ws_endpoint="ws://localhost:8546",
        http_endpoint="http://localhost:8545"
    )
    
    @stream.on_transaction
    async def log_transaction(tx: PendingTransaction):
        print(f"New TX: {tx.hash[:16]}... | To: {tx.recipient} | Gas: {tx.effective_gas_price / 1e9:.2f} gwei")
    
    await stream.start()

if __name__ == "__main__":
    asyncio.run(main())
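
The `effective_gas_price` property above falls back to `max_fee`, which is only an upper bound for EIP-1559 transactions. What such a transaction actually pays per gas is the smaller of its fee cap and the base fee plus its priority fee. The following is a minimal sketch of that rule, assuming the current base fee is already known (for example from the latest block); the standalone function and its parameter names are illustrative, not part of the class above.

```python
from typing import Optional

GWEI = 10**9


def effective_gas_price(
    base_fee: int,
    gas_price: Optional[int] = None,
    max_fee: Optional[int] = None,
    max_priority_fee: Optional[int] = None,
) -> int:
    """Price per gas (in wei) a transaction would pay at the given base fee."""
    if max_fee is not None:
        # EIP-1559: pay base fee plus tip, but never more than the fee cap.
        # If max_fee is below base_fee the transaction cannot be included;
        # this sketch still returns the cap in that case.
        tip = max_priority_fee or 0
        return min(max_fee, base_fee + tip)
    # Legacy transaction: the stated gas price is what gets paid.
    return gas_price or 0


legacy = effective_gas_price(base_fee=20 * GWEI, gas_price=30 * GWEI)
under_cap = effective_gas_price(20 * GWEI, max_fee=50 * GWEI, max_priority_fee=2 * GWEI)
at_cap = effective_gas_price(60 * GWEI, max_fee=50 * GWEI, max_priority_fee=2 * GWEI)
print(legacy // GWEI, under_cap // GWEI, at_cap // GWEI)
```

Ranking pending transactions by this value rather than by `max_fee` gives a closer picture of how they compete for block space.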

Part 2: Decoding Transaction Intent

Raw transaction data is just hex bytes. To find MEV opportunities, we need to understand what each transaction is trying to do.

Function Selector Database

Every smart contract call begins with a 4-byte function selector:

0x38ed1739...
  ^^^^^^^^
  Function selector = first 4 bytes (8 hex characters) of keccak256("swapExactTokensForTokens(uint256,uint256,address[],address,uint256)")
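
To make the calldata layout concrete, here is a small standard-library sketch that splits input data into the selector and its 32-byte argument words. It uses `transfer(address,uint256)`, whose selector is `0xa9059cbb`, with a made-up recipient and amount. The helper `split_calldata` is hypothetical, and manual slicing like this only works for static types; dynamic types such as `address[]` are encoded with offsets and need a proper ABI decoder.

```python
from typing import List, Tuple


def split_calldata(input_data: str) -> Tuple[str, List[str]]:
    """Split 0x-prefixed calldata into (selector, 32-byte words as hex strings)."""
    body = input_data[2:] if input_data.startswith("0x") else input_data
    selector = "0x" + body[:8].lower()          # first 4 bytes
    args = body[8:]
    words = [args[i:i + 64] for i in range(0, len(args), 64)]
    return selector, words


# Example calldata for transfer(address,uint256); recipient and amount are made up.
calldata = (
    "0xa9059cbb"
    + "0" * 24 + "11" * 20                      # address, left-padded to 32 bytes
    + hex(10**18)[2:].rjust(64, "0")            # uint256 amount, big-endian
)

selector, words = split_calldata(calldata)
recipient = "0x" + words[0][-40:]               # address sits in the low 20 bytes
amount = int(words[1], 16)
print(selector, recipient, amount)
```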

Build a database mapping selectors to function signatures:

from eth_abi import decode
from typing import Dict, Any, Optional
import hashlib

class FunctionRegistry:
    """Maps function selectors to decoders"""
    
    def __init__(self):
        self.functions: Dict[str, dict] = {}
        self._register_common_functions()
    
    def _register_common_functions(self):
        """Pre-register known DeFi functions"""
        
        # Uniswap V2 Router
        self.register(
            "swapExactTokensForTokens(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountIn", "amountOutMin", "path", "to", "deadline"],
            protocol="uniswap_v2",
            action="swap"
        )
        
        self.register(
            "swapTokensForExactTokens(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountOut", "amountInMax", "path", "to", "deadline"],
            protocol="uniswap_v2",
            action="swap"
        )
        
        self.register(
            "swapExactETHForTokens(uint256,address[],address,uint256)",
            ["uint256", "address[]", "address", "uint256"],
            ["amountOutMin", "path", "to", "deadline"],
            protocol="uniswap_v2",
            action="swap"
        )
        
        self.register(
            "swapExactTokensForETH(uint256,uint256,address[],address,uint256)",
            ["uint256", "uint256", "address[]", "address", "uint256"],
            ["amountIn", "amountOutMin", "path", "to", "deadline"],
            protocol="uniswap_v2",
            action="swap"
        )
        
        # Uniswap V3 Router
        self.register(
            "exactInputSingle((address,address,uint24,address,uint256,uint256,uint256,uint160))",
            ["(address,address,uint24,address,uint256,uint256,uint256,uint160)"],
            ["params"],
            protocol="uniswap_v3",
            action="swap"
        )
        
        self.register(
            "exactInput((bytes,address,uint256,uint256,uint256))",
            ["(bytes,address,uint256,uint256,uint256)"],
            ["params"],
            protocol="uniswap_v3",
            action="swap"
        )
        
        # Aave V3
        self.register(
            "liquidationCall(address,address,address,uint256,bool)",
            ["address", "address", "address", "uint256", "bool"],
            ["collateralAsset", "debtAsset", "user", "debtToCover", "receiveAToken"],
            protocol="aave_v3",
            action="liquidation"
        )
        
        self.register(
            "supply(address,uint256,address,uint16)",
            ["address", "uint256", "address", "uint16"],
            ["asset", "amount", "onBehalfOf", "referralCode"],
            protocol="aave_v3",
            action="supply"
        )
        
        self.register(
            "borrow(address,uint256,uint256,uint16,address)",
            ["address", "uint256", "uint256", "uint16", "address"],
            ["asset", "amount", "interestRateMode", "referralCode", "onBehalfOf"],
            protocol="aave_v3",
            action="borrow"
        )
        
        # ERC20
        self.register(
            "transfer(address,uint256)",
            ["address", "uint256"],
            ["to", "amount"],
            protocol="erc20",
            action="transfer"
        )
        
        self.register(
            "approve(address,uint256)",
            ["address", "uint256"],
            ["spender", "amount"],
            protocol="erc20",
            action="approve"
        )
    
    def register(self, signature: str, types: list, names: list, protocol: str, action: str):
        """Register a function signature"""
        # Selectors use Keccak-256, which is not the same as hashlib's SHA3-256
        from eth_utils import keccak
        selector = "0x" + keccak(text=signature).hex()[:8]
        
        self.functions[selector] = {
            "signature": signature,
            "types": types,
            "names": names,
            "protocol": protocol,
            "action": action
        }
    
    def decode(self, input_data: str) -> Optional[Dict[str, Any]]:
        """Decode transaction input data"""
        if len(input_data) < 10:
            return None
        
        selector = input_data[:10].lower()
        
        if selector not in self.functions:
            return None
        
        func = self.functions[selector]
        
        try:
            # Decode parameters
            params_hex = input_data[10:]
            params_bytes = bytes.fromhex(params_hex)
            
            decoded_values = decode(func["types"], params_bytes)
            
            # Map to named parameters
            params = {}
            for name, value in zip(func["names"], decoded_values):
                params[name] = value
            
            return {
                "selector": selector,
                "signature": func["signature"],
                "protocol": func["protocol"],
                "action": func["action"],
                "params": params
            }
            
        except Exception:
            # Malformed or unexpected calldata: treat as undecodable
            return None


class TransactionAnalyzer:
    """Analyzes decoded transactions for MEV potential"""
    
    def __init__(self):
        self.registry = FunctionRegistry()
        
        # Known router addresses
        self.routers = {
            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "uniswap_v2",
            "0xe592427a0aece92de3edee1f18e0157c05861564": "uniswap_v3",
            "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "uniswap_v3_router2",
            "0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "sushiswap",
        }
    
    def analyze(self, tx: PendingTransaction) -> Optional[Dict[str, Any]]:
        """Analyze transaction for MEV opportunities"""
        
        # Skip contract deployments
        if tx.recipient is None:
            return None
        
        # Decode the function call
        decoded = self.registry.decode(tx.input_data)
        
        if not decoded:
            return None
        
        # Identify the protocol from recipient address
        recipient_lower = tx.recipient.lower()
        router_protocol = self.routers.get(recipient_lower)
        
        return {
            "tx_hash": tx.hash,
            "sender": tx.sender,
            "recipient": tx.recipient,
            "value_wei": tx.value,
            "gas_price": tx.effective_gas_price,
            "decoded": decoded,
            "router_protocol": router_protocol,
            "is_swap": decoded["action"] == "swap",
            "is_liquidation": decoded["action"] == "liquidation"
        }

Part 3: Identifying Profitable Opportunities

Now we can see what transactions are doing. Next step: figure out which ones create MEV opportunities.

Arbitrage Detection Engine

When someone swaps on Uniswap, they move the price. If that price differs from other DEXes, arbitrage exists.
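
A worked example may help here. The sketch below uses the Uniswap V2 integer formula (0.3% fee expressed as 997/1000) on two hypothetical pools with made-up reserves: a pending swap pushes the price on one pool, and a backrun buys the now-cheap token there and sells it on the other pool. Gas and builder payments are ignored, and the pool names and amounts are assumptions chosen only to show the mechanics.

```python
def get_amount_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """Uniswap V2 style output amount with the 0.3% fee, in integer math."""
    amount_in_with_fee = amount_in * 997
    return (amount_in_with_fee * reserve_out) // (reserve_in * 1000 + amount_in_with_fee)


# Two hypothetical pools holding tokens A and B, initially at the same price.
pool_x = {"a": 1_000_000, "b": 1_000_000}
pool_y = {"a": 1_000_000, "b": 1_000_000}

# A pending swap sells 100,000 A into pool X, receiving B.
victim_in = 100_000
victim_out = get_amount_out(victim_in, pool_x["a"], pool_x["b"])
pool_x["a"] += victim_in
pool_x["b"] -= victim_out

# After it lands, A is cheaper on X than on Y.
# Backrun: spend B to buy A on pool X, then sell that A for B on pool Y.
arb_in_b = 20_000
a_bought = get_amount_out(arb_in_b, pool_x["b"], pool_x["a"])
b_back = get_amount_out(a_bought, pool_y["a"], pool_y["b"])
profit_b = b_back - arb_in_b

print(f"victim received {victim_out} B; backrun profit {profit_b} B before gas")
```

Note that the profit is measured in token B on both sides of the round trip, which is what makes the subtraction meaningful.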

from decimal import Decimal
from typing import List, Optional, Tuple
import asyncio

class DEXPricer:
    """Fetches prices from various DEXes"""
    
    def __init__(self, web3_client):
        self.w3 = web3_client
        
        # Uniswap V2 pair ABI (minimal)
        self.pair_abi = [
            {
                "name": "getReserves",
                "type": "function",
                "inputs": [],
                "outputs": [
                    {"name": "reserve0", "type": "uint112"},
                    {"name": "reserve1", "type": "uint112"},
                    {"name": "blockTimestampLast", "type": "uint32"}
                ]
            },
            {
                "name": "token0",
                "type": "function",
                "inputs": [],
                "outputs": [{"type": "address"}]
            },
            {
                "name": "token1",
                "type": "function",
                "inputs": [],
                "outputs": [{"type": "address"}]
            }
        ]
    
    async def get_reserves(self, pair_address: str) -> Tuple[int, int]:
        """Get current reserves from Uniswap V2 pair"""
        pair = self.w3.eth.contract(
            address=self.w3.to_checksum_address(pair_address),
            abi=self.pair_abi
        )
        
        reserves = pair.functions.getReserves().call()
        return reserves[0], reserves[1]
    
    def calculate_output(self, amount_in: int, reserve_in: int, reserve_out: int, fee: Decimal = Decimal("0.003")) -> int:
        """Calculate swap output using constant product formula"""
        amount_in_with_fee = Decimal(amount_in) * (1 - fee)
        
        numerator = amount_in_with_fee * Decimal(reserve_out)
        denominator = Decimal(reserve_in) + amount_in_with_fee
        
        return int(numerator / denominator)
    
    def calculate_price_impact(self, amount_in: int, reserve_in: int, reserve_out: int) -> Decimal:
        """Calculate price impact of a swap"""
        spot_price = Decimal(reserve_out) / Decimal(reserve_in)
        
        output = self.calculate_output(amount_in, reserve_in, reserve_out)
        execution_price = Decimal(output) / Decimal(amount_in)
        
        impact = (spot_price - execution_price) / spot_price
        return impact


class ArbitrageScanner:
    """Scans for cross-DEX arbitrage opportunities"""
    
    def __init__(self, pricer: DEXPricer):
        self.pricer = pricer
        
        # Token pair addresses across DEXes
        # Example: WETH/USDC pairs
        self.pair_registry = {
            ("WETH", "USDC"): {
                "uniswap_v2": "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc",
                "sushiswap": "0x397FF1542f962076d0BFE58eA045FfA2d347ACa0",
            },
            ("WETH", "USDT"): {
                "uniswap_v2": "0x0d4a11d5EEaaC28EC3F61d100daF4d40471f1852",
                "sushiswap": "0x06da0fd433C1A5d7a4faa01111c044910A184553",
            }
        }
    
    async def find_arbitrage(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if a pending swap creates arbitrage"""
        
        if not analyzed_tx.get("is_swap"):
            return None
        
        decoded = analyzed_tx["decoded"]
        params = decoded["params"]
        
        # Extract swap path
        path = params.get("path", [])
        if len(path) < 2:
            return None
        
        token_in = path[0]
        token_out = path[-1]
        amount_in = params.get("amountIn", 0)
        
        if amount_in == 0:
            return None
        
        # Find this pair on other DEXes
        opportunities = []
        
        source_dex = analyzed_tx.get("router_protocol", "unknown")
        
        # Simulate the pending swap's price impact
        # Then check if other DEXes have better prices
        
        for pair_key, dex_pairs in self.pair_registry.items():
            # Check if this trade involves this pair
            # Simplified check - production would need token address matching
            
            for dex_name, pair_address in dex_pairs.items():
                if dex_name == source_dex:
                    continue
                
                try:
                    r0, r1 = await self.pricer.get_reserves(pair_address)
                    
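                    # Calculate potential arbitrage profit
                    # Buy on DEX with lower price, sell on DEX with higher price
                    # NOTE: r0/r1 follow the pair's token0/token1 order, which is
                    # not necessarily the order of token_in/token_out.
                    
                    output = self.pricer.calculate_output(amount_in, r0, r1)
                    
                    # Estimate gas cost
                    estimated_gas = 150000  # Typical swap gas
                    gas_cost = estimated_gas * analyzed_tx["gas_price"]
                    
                    # Placeholder heuristic: output is in output-token units,
                    # amount_in in input-token units, and gas_cost in wei, so
                    # treat this as a rough filter rather than a profit figure.
                    potential_profit = output - amount_in - gas_cost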
                    
                    if potential_profit > 0:
                        opportunities.append({
                            "type": "arbitrage",
                            "source_dex": source_dex,
                            "target_dex": dex_name,
                            "pair_address": pair_address,
                            "amount_in": amount_in,
                            "expected_output": output,
                            "estimated_profit_wei": potential_profit,
                            "estimated_profit_eth": potential_profit / 1e18,
                            "trigger_tx": analyzed_tx["tx_hash"]
                        })
                        
                except Exception as e:
                    continue
        
        if opportunities:
            # Return the most profitable opportunity
            return max(opportunities, key=lambda x: x["estimated_profit_wei"])
        
        return None


class LiquidationScanner:
    """Monitors lending protocols for liquidation opportunities"""
    
    def __init__(self, web3_client):
        self.w3 = web3_client
        
        # Aave V3 Pool address (Ethereum mainnet)
        self.aave_pool = "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2"
        
        # Simplified ABI for getUserAccountData
        self.pool_abi = [
            {
                "name": "getUserAccountData",
                "type": "function",
                "inputs": [{"name": "user", "type": "address"}],
                "outputs": [
                    {"name": "totalCollateralBase", "type": "uint256"},
                    {"name": "totalDebtBase", "type": "uint256"},
                    {"name": "availableBorrowsBase", "type": "uint256"},
                    {"name": "currentLiquidationThreshold", "type": "uint256"},
                    {"name": "ltv", "type": "uint256"},
                    {"name": "healthFactor", "type": "uint256"}
                ]
            }
        ]
        
        self.pool_contract = self.w3.eth.contract(
            address=self.w3.to_checksum_address(self.aave_pool),
            abi=self.pool_abi
        )
    
    async def check_liquidation_opportunity(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if transaction affects a liquidatable position"""
        
        decoded = analyzed_tx.get("decoded", {})
        
        # Check if this is a borrow, supply, or liquidation call that might affect health factor
        action = decoded.get("action")
        
        if action not in ["borrow", "supply", "liquidation"]:
            return None
        
        # Get the user's current position
        user = analyzed_tx.get("sender")
        
        try:
            account_data = self.pool_contract.functions.getUserAccountData(
                self.w3.to_checksum_address(user)
            ).call()
            
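            health_factor = account_data[5] / 1e18
            # Aave V3 reports both totals in the pool's base currency
            # (typically USD with 8 decimals on Ethereum mainnet), not in wei.
            total_debt = account_data[1]
            total_collateral = account_data[0]
            
            # Check if position is liquidatable or close to it
            if health_factor < 1.0:
                # Already liquidatable!
                liquidation_bonus = 0.05  # 5% typical bonus
                max_liquidation = total_debt // 2  # Can liquidate up to 50%
                
                # Despite the "_wei" key names below, this figure is in base
                # currency units and must be converted before comparing to gas.
                potential_profit = int(max_liquidation * liquidation_bonus)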
                
                return {
                    "type": "liquidation",
                    "user": user,
                    "health_factor": health_factor,
                    "total_debt": total_debt,
                    "total_collateral": total_collateral,
                    "max_liquidation_amount": max_liquidation,
                    "estimated_profit_wei": potential_profit,
                    "estimated_profit_eth": potential_profit / 1e18,
                    "urgency": "immediate"
                }
            
            elif health_factor < 1.05:
                # At risk - monitor closely
                return {
                    "type": "liquidation_watch",
                    "user": user,
                    "health_factor": health_factor,
                    "price_drop_to_liquidate": f"{(1 - 1 / health_factor) * 100:.2f}%",
                    "urgency": "monitor"
                }
                
        except Exception as e:
            pass
        
        return None
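
The health factor arithmetic behind the scanner can be sketched on its own. Assuming the Aave-style definition (collateral value times liquidation threshold, divided by debt, with the threshold in basis points), the fall in collateral value that brings a position to a health factor of 1 is `1 - 1/HF` when debt stays constant. The function names and the example position are illustrative assumptions.

```python
from decimal import Decimal


def health_factor(total_collateral_base: int, total_debt_base: int,
                  liq_threshold_bps: int) -> Decimal:
    """Health factor from base-currency totals and a threshold in basis points."""
    if total_debt_base == 0:
        return Decimal("Infinity")
    weighted = Decimal(total_collateral_base) * Decimal(liq_threshold_bps) / Decimal(10_000)
    return weighted / Decimal(total_debt_base)


def collateral_drop_to_liquidation(hf: Decimal) -> Decimal:
    """Fraction the collateral value must fall (debt unchanged) for HF to reach 1."""
    if hf <= 1:
        return Decimal(0)  # already liquidatable
    return 1 - 1 / hf


# Example position: 10,000 collateral and 8,000 debt in base units with
# 8 decimals (an assumption), and an 82.5% liquidation threshold.
collateral = 10_000 * 10**8
debt = 8_000 * 10**8
hf = health_factor(collateral, debt, 8_250)
drop = collateral_drop_to_liquidation(hf)
print(f"HF = {hf}, liquidatable after a {drop * 100:.2f}% collateral drop")
```

This treats all collateral as moving together, so it is only a first approximation for positions backed by several assets.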

Part 4: Calculating Real Profitability

Finding opportunities is useless without accurate profit calculation. Gas costs, slippage, and competition eat into margins.

Comprehensive Profit Calculator

from dataclasses import dataclass
from typing import Optional
from decimal import Decimal

@dataclass
class GasEstimate:
    base_fee: int
    priority_fee: int
    gas_units: int
    
    @property
    def total_cost(self) -> int:
        return (self.base_fee + self.priority_fee) * self.gas_units
    
    @property
    def total_cost_eth(self) -> Decimal:
        return Decimal(self.total_cost) / Decimal(10**18)


class ProfitCalculator:
    """Calculates net profit for MEV opportunities"""
    
    def __init__(self, web3_client):
        self.w3 = web3_client
    
    async def get_current_gas_params(self) -> GasEstimate:
        """Fetch current gas parameters from network"""
        latest_block = self.w3.eth.get_block("latest")
        base_fee = latest_block["baseFeePerGas"]
        
        # Get priority fee from recent blocks
        fee_history = self.w3.eth.fee_history(5, "latest", [50])
        avg_priority = sum(fee_history["reward"][i][0] for i in range(5)) // 5
        
        return GasEstimate(
            base_fee=base_fee,
            priority_fee=avg_priority,
            gas_units=0  # Will be set per opportunity
        )
    
    async def calculate_arbitrage_profit(self, opportunity: dict) -> dict:
        """Calculate net profit for arbitrage opportunity"""
        
        gas_params = await self.get_current_gas_params()
        
        # Arbitrage typically needs ~250k gas for 2 swaps
        gas_params.gas_units = 250000
        
        gross_profit = opportunity["estimated_profit_wei"]
        gas_cost = gas_params.total_cost
        
        # Builder tip (typically 90% of profit goes to builder)
        # Only tip out of positive profit; a losing trade pays no tip
        builder_tip_rate = Decimal("0.90")
        builder_tip = int(Decimal(max(gross_profit - gas_cost, 0)) * builder_tip_rate)
        
        net_profit = gross_profit - gas_cost - builder_tip
        
        return {
            "gross_profit_wei": gross_profit,
            "gas_cost_wei": gas_cost,
            "builder_tip_wei": builder_tip,
            "net_profit_wei": net_profit,
            "net_profit_eth": Decimal(net_profit) / Decimal(10**18),
            "profitable": net_profit > 0,
            "roi_percent": (Decimal(net_profit) / Decimal(opportunity["amount_in"]) * 100) if opportunity["amount_in"] > 0 else Decimal(0),
            "gas_params": {
                "base_fee_gwei": gas_params.base_fee / 1e9,
                "priority_fee_gwei": gas_params.priority_fee / 1e9,
                "gas_units": gas_params.gas_units
            }
        }
    
    async def calculate_liquidation_profit(self, opportunity: dict) -> dict:
        """Calculate net profit for liquidation opportunity"""
        
        gas_params = await self.get_current_gas_params()
        
        # Liquidation with flash loan needs ~400k gas
        gas_params.gas_units = 400000
        
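        # Flash loan fee (0.09% assumed here; the live premium is set by
        # governance and can be read from the pool's FLASHLOAN_PREMIUM_TOTAL)
        flash_loan_amount = opportunity["max_liquidation_amount"]
        flash_loan_fee = flash_loan_amount * 9 // 10_000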
        
        gross_profit = opportunity["estimated_profit_wei"]
        gas_cost = gas_params.total_cost
        
        # Slippage buffer for collateral swap (1%)
        slippage_cost = int(gross_profit * 0.01)
        
        net_profit = gross_profit - gas_cost - flash_loan_fee - slippage_cost
        
        return {
            "gross_profit_wei": gross_profit,
            "gas_cost_wei": gas_cost,
            "flash_loan_fee_wei": flash_loan_fee,
            "slippage_buffer_wei": slippage_cost,
            "net_profit_wei": net_profit,
            "net_profit_eth": Decimal(net_profit) / Decimal(10**18),
            "profitable": net_profit > 0
        }
    
    def minimum_profitable_amount(self, gas_price_gwei: float, gas_units: int = 250000) -> int:
        """Calculate minimum trade size for profitability"""
        
        gas_cost = int(gas_price_gwei * 1e9 * gas_units)
        
        # Assume 0.3% spread capture, with 90% of that going to the builder:
        # net = amount * 0.003 * 0.10 - gas_cost > 0
        # so amount > gas_cost / 0.0003
        
        min_amount = gas_cost / 0.0003
        
        return int(min_amount)
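
To see how quickly gas and the builder payment eat into a trade, here is a worked example in plain integer arithmetic. It follows the same order as the calculator above (gas first, then the builder's share of what remains), but as a standalone sketch: the function name, the basis-point parameter, and the sample figures are assumptions, and it pays no tip on a losing trade.

```python
GWEI = 10**9
ETH = 10**18


def net_profit_wei(gross_profit_wei: int, base_fee_wei: int, priority_fee_wei: int,
                   gas_units: int, builder_tip_bps: int = 9_000) -> int:
    """Net profit after gas and builder tip, all in wei (integer math only)."""
    gas_cost = (base_fee_wei + priority_fee_wei) * gas_units
    after_gas = gross_profit_wei - gas_cost
    if after_gas <= 0:
        return after_gas  # losing trade: nothing left to tip
    builder_tip = after_gas * builder_tip_bps // 10_000
    return after_gas - builder_tip


# 0.05 ETH gross, 20 gwei base fee, 2 gwei priority fee, 250k gas, 90% to builder.
win = net_profit_wei(5 * ETH // 100, 20 * GWEI, 2 * GWEI, 250_000)

# 0.004 ETH gross under the same gas conditions does not cover gas.
loss = net_profit_wei(4 * ETH // 1000, 20 * GWEI, 2 * GWEI, 250_000)

print(f"win: {win / ETH:.5f} ETH, loss: {loss / ETH:.5f} ETH")
```

In the first case gas costs 0.0055 ETH and the builder takes 90% of the remaining 0.0445 ETH, leaving 0.00445 ETH from a 0.05 ETH gross opportunity.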

Part 5: Constructing and Submitting Bundles

Flashbots lets us submit transactions privately without exposing them to the public mempool.
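
Each bundle names a single target block, so one common pattern (an assumption here, not something this guide prescribes) is to prepare the same bundle for a few consecutive blocks. The sketch below only builds the `eth_sendBundle` JSON-RPC bodies and sends nothing; `build_bundle_payloads` is a hypothetical helper, and the signed transaction is a placeholder string.

```python
import json
from typing import List


def build_bundle_payloads(signed_txs: List[str], next_block: int,
                          blocks_ahead: int = 3) -> List[str]:
    """Build one eth_sendBundle JSON-RPC body per target block."""
    bodies = []
    for offset in range(blocks_ahead):
        payload = {
            "jsonrpc": "2.0",
            "id": offset + 1,
            "method": "eth_sendBundle",
            "params": [{
                "txs": signed_txs,
                "blockNumber": hex(next_block + offset),  # block numbers are hex strings
            }],
        }
        bodies.append(json.dumps(payload))
    return bodies


# "0xdeadbeef" stands in for a real signed raw transaction.
bodies = build_bundle_payloads(["0xdeadbeef"], next_block=19_000_000)
for body in bodies:
    print(json.loads(body)["params"][0]["blockNumber"])
```

Each serialized body would then be signed and posted separately, since the authentication signature covers the exact request body.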

Flashbots Bundle Submission

from eth_account import Account
from eth_account.signers.local import LocalAccount
import requests
import json
from typing import List, Dict, Any

class FlashbotsClient:
    """Client for Flashbots bundle submission"""
    
    RELAY_URL = "https://relay.flashbots.net"
    
    def __init__(self, signer: LocalAccount, auth_signer: LocalAccount):
        self.signer = signer
        self.auth_signer = auth_signer
    
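    def _sign_message(self, message: str) -> str:
        """Sign message with auth key for Flashbots authentication"""
        from eth_account.messages import encode_defunct
        from eth_utils import keccak
        
        # Flashbots expects an EIP-191 signature over the 0x-prefixed hex
        # keccak256 hash of the request body, not over the body itself.
        body_hash = "0x" + keccak(text=message).hex()
        signed = self.auth_signer.sign_message(encode_defunct(text=body_hash))
        
        # bytes(...).hex() never carries a 0x prefix, so add it explicitly.
        return f"{self.auth_signer.address}:0x{bytes(signed.signature).hex()}"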
    
    async def simulate_bundle(self, signed_txs: List[str], block_number: int) -> Dict[str, Any]:
        """Simulate bundle execution"""
        
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_callBundle",
            "params": [{
                "txs": signed_txs,
                "blockNumber": hex(block_number),
                "stateBlockNumber": "latest"
            }]
        }
        
        body = json.dumps(payload)
        signature = self._sign_message(body)
        
        response = requests.post(
            self.RELAY_URL,
            data=body,  # send the exact bytes that were signed
            headers={
                "Content-Type": "application/json",
                "X-Flashbots-Signature": signature
            }
        )
        
        return response.json()
    
    async def send_bundle(self, signed_txs: List[str], target_block: int) -> Dict[str, Any]:
        """Submit bundle to Flashbots relay"""
        
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_sendBundle",
            "params": [{
                "txs": signed_txs,
                "blockNumber": hex(target_block),
            }]
        }
        
        body = json.dumps(payload)
        signature = self._sign_message(body)
        
        response = requests.post(
            self.RELAY_URL,
            data=body,  # send the exact bytes that were signed
            headers={
                "Content-Type": "application/json",
                "X-Flashbots-Signature": signature
            }
        )
        
        result = response.json()
        
        if "result" in result:
            return {
                "success": True,
                "bundle_hash": result["result"]["bundleHash"],
                "target_block": target_block
            }
        else:
            return {
                "success": False,
                "error": result.get("error", "Unknown error")
            }
    
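    async def get_bundle_stats(self, bundle_hash: str, block_number: int) -> Dict[str, Any]:
        """Fetch relay-side bundle status; this alone does not confirm inclusion"""
        
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            # Newer Flashbots docs describe a flashbots_getBundleStatsV2 variant
            "method": "flashbots_getBundleStats",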
            "params": [{
                "bundleHash": bundle_hash,
                "blockNumber": hex(block_number)
            }]
        }
        
        body = json.dumps(payload)
        signature = self._sign_message(body)
        
        response = requests.post(
            self.RELAY_URL,
            data=body,  # send the exact bytes that were signed
            headers={
                "Content-Type": "application/json",
                "X-Flashbots-Signature": signature
            }
        )
        
        return response.json()


class BundleBuilder:
    """Constructs MEV bundles for submission"""
    
    def __init__(self, web3_client, signer: LocalAccount):
        self.w3 = web3_client
        self.signer = signer
    
    async def build_arbitrage_bundle(self, opportunity: dict, profit_calc: dict) -> List[str]:
        """Build arbitrage transaction bundle"""
        
        signed_txs = []
        
        # Build swap transaction on target DEX
        # This is simplified - real implementation needs actual contract calls
        
        nonce = self.w3.eth.get_transaction_count(self.signer.address)
        
        # Transaction 1: Buy on cheaper DEX
        buy_tx = {
            "to": opportunity["pair_address"],
            "value": opportunity["amount_in"],
            "gas": 150000,
            # Fee fields must be integers in wei; the gwei inputs may be floats
            "maxFeePerGas": int((profit_calc["gas_params"]["base_fee_gwei"] + profit_calc["gas_params"]["priority_fee_gwei"]) * 10**9),
            "maxPriorityFeePerGas": int(profit_calc["gas_params"]["priority_fee_gwei"] * 10**9),
            "nonce": nonce,
            "chainId": 1,
            "data": "0x"  # Actual swap calldata would go here
        }
        
        signed_buy = self.signer.sign_transaction(buy_tx)
        # eth-account >= 0.13 renames this attribute to raw_transaction
        signed_txs.append(signed_buy.rawTransaction.hex())
        
        # Transaction 2: Sell on expensive DEX
        sell_tx = {
            "to": opportunity["pair_address"],  # Different DEX in reality
            "value": 0,
            "gas": 150000,
            "maxFeePerGas": buy_tx["maxFeePerGas"],
            "maxPriorityFeePerGas": buy_tx["maxPriorityFeePerGas"],
            "nonce": nonce + 1,
            "chainId": 1,
            "data": "0x"  # Actual swap calldata
        }
        
        signed_sell = self.signer.sign_transaction(sell_tx)
        signed_txs.append(signed_sell.rawTransaction.hex())
        
        return signed_txs
    
    async def build_backrun_bundle(self, trigger_tx_raw: str, backrun_tx: dict) -> List[str]:
        """Build bundle that backruns a target transaction"""
        
        signed_txs = []
        
        # Include the trigger transaction first
        signed_txs.append(trigger_tx_raw)
        
        # Then our backrun transaction
        signed_backrun = self.signer.sign_transaction(backrun_tx)
        signed_txs.append(signed_backrun.rawTransaction.hex())
        
        return signed_txs
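
Because the relay verifies the signature against the request body it receives, it helps to serialize the JSON-RPC payload exactly once and send those same bytes. The sketch below uses only the standard library to show why. `build_bundle_request` is a hypothetical helper, and the transaction strings are placeholders rather than real signed transactions.

```python
import json
from typing import List


def build_bundle_request(signed_txs: List[str], target_block: int, request_id: int = 1) -> str:
    """Serialize an eth_sendBundle request exactly once.

    The returned string is what should be both signed and sent, so the
    signature and the bytes on the wire cannot drift apart.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_sendBundle",
        "params": [{"txs": signed_txs, "blockNumber": hex(target_block)}],
    }
    return json.dumps(payload)


body = build_bundle_request(["0xaa", "0xbb"], target_block=19_000_000)

# Re-serializing the same payload with different separators yields different
# bytes, and therefore a different hash to sign.
compact = json.dumps(json.loads(body), separators=(",", ":"))
print(body == compact)                          # same bytes?
print(json.loads(body) == json.loads(compact))  # same meaning?
```

The two strings decode to the same payload but differ byte for byte, which is enough for a signature check to fail.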

Part 6: Putting It All Together

Here's the complete MEV bot pipeline:

import asyncio
from typing import Optional

class MEVBot:
    """Complete MEV extraction pipeline"""
    
    def __init__(self, config: dict):
        self.config = config
        
        # Initialize components
        self.stream = MempoolStream(
            ws_endpoint=config["ws_endpoint"],
            http_endpoint=config["http_endpoint"]
        )
        
        self.analyzer = TransactionAnalyzer()
        self.pricer = DEXPricer(config["web3_client"])
        self.arb_scanner = ArbitrageScanner(self.pricer)
        self.liq_scanner = LiquidationScanner(config["web3_client"])
        self.profit_calc = ProfitCalculator(config["web3_client"])
        
        self.flashbots = FlashbotsClient(
            signer=config["signer"],
            auth_signer=config["auth_signer"]
        )
        
        self.bundle_builder = BundleBuilder(
            web3_client=config["web3_client"],
            signer=config["signer"]
        )
        
        # Stats
        self.stats = {
            "txs_analyzed": 0,
            "opportunities_found": 0,
            "bundles_submitted": 0,
            "bundles_included": 0,
            "total_profit_eth": 0
        }
    
    async def start(self):
        """Start the MEV bot"""
        print("=" * 60)
        print("MEV Bot Starting...")
        print("=" * 60)
        
        @self.stream.on_transaction
        async def process_transaction(tx: PendingTransaction):
            await self._process_transaction(tx)
        
        await self.stream.start()
    
    async def _process_transaction(self, tx: PendingTransaction):
        """Process a single pending transaction"""
        self.stats["txs_analyzed"] += 1
        
        # Step 1: Analyze transaction
        analyzed = self.analyzer.analyze(tx)
        
        if not analyzed:
            return
        
        # Step 2: Scan for opportunities
        opportunity = None
        
        # Check arbitrage
        arb_opp = await self.arb_scanner.find_arbitrage(analyzed)
        if arb_opp:
            opportunity = arb_opp
        
        # Check liquidations
        liq_opp = await self.liq_scanner.check_liquidation_opportunity(analyzed)
        if liq_opp and liq_opp.get("type") == "liquidation":
            # Prefer immediate liquidations over arbitrage
            opportunity = liq_opp
        
        if not opportunity:
            return
        
        self.stats["opportunities_found"] += 1
        print(f"\n{'='*60}")
        print(f"🎯 OPPORTUNITY FOUND: {opportunity['type']}")
        print(f"{'='*60}")
        
        # Step 3: Calculate profitability
        if opportunity["type"] == "arbitrage":
            profit = await self.profit_calc.calculate_arbitrage_profit(opportunity)
        else:
            profit = await self.profit_calc.calculate_liquidation_profit(opportunity)
        
        print(f"Gross Profit: {profit['gross_profit_wei'] / 1e18:.6f} ETH")
        print(f"Gas Cost: {profit['gas_cost_wei'] / 1e18:.6f} ETH")
        print(f"Net Profit: {profit['net_profit_wei'] / 1e18:.6f} ETH")
        print(f"Profitable: {profit['profitable']}")
        
        if not profit["profitable"]:
            print("❌ Not profitable after costs")
            return
        
        # Step 4: Build and submit bundle
        print("\n📦 Building bundle...")
        
        if opportunity["type"] == "arbitrage":
            bundle = await self.bundle_builder.build_arbitrage_bundle(opportunity, profit)
        else:
            # Liquidation bundle building would go here
            return
        
        # Step 5: Simulate bundle
        current_block = self.config["web3_client"].eth.block_number
        target_block = current_block + 1
        
        print(f"🧪 Simulating for block {target_block}...")
        
        simulation = await self.flashbots.simulate_bundle(bundle, target_block)
        
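        if "error" in simulation:
            print(f"❌ Simulation failed: {simulation['error']}")
            return
        
        # A response can lack a top-level error while an individual
        # transaction reverted, so check the per-transaction results too
        tx_results = simulation.get("result", {}).get("results", [])
        if any("error" in r or "revert" in r for r in tx_results):
            print("❌ Simulation failed: a transaction in the bundle reverted")
            return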
        
        print("✅ Simulation successful")
        
        # Step 6: Submit bundle
        print(f"🚀 Submitting bundle to Flashbots...")
        
        result = await self.flashbots.send_bundle(bundle, target_block)
        
        if result["success"]:
            self.stats["bundles_submitted"] += 1
            print(f"✅ Bundle submitted: {result['bundle_hash']}")
            
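            # Wait for the target block, then confirm inclusion on-chain.
            # getBundleStats reports relay-side status (e.g. isSimulated),
            # which does not tell us whether the bundle actually landed.
            await asyncio.sleep(15)
            
            from web3 import Web3
            from web3.exceptions import TransactionNotFound
            
            try:
                receipt = self.config["web3_client"].eth.get_transaction_receipt(
                    Web3.keccak(hexstr=bundle[-1])
                )
                included = receipt["blockNumber"] == target_block
            except TransactionNotFound:
                included = False
            
            if included:
                self.stats["bundles_included"] += 1
                self.stats["total_profit_eth"] += profit["net_profit_wei"] / 1e18
                print("✅ Bundle INCLUDED!")
            else:
                print("⏳ Bundle was not included in the target block")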
        else:
            print(f"❌ Submission failed: {result['error']}")
    
    def print_stats(self):
        """Print bot statistics"""
        print("\n" + "=" * 60)
        print("BOT STATISTICS")
        print("=" * 60)
        print(f"Transactions Analyzed: {self.stats['txs_analyzed']}")
        print(f"Opportunities Found: {self.stats['opportunities_found']}")
        print(f"Bundles Submitted: {self.stats['bundles_submitted']}")
        print(f"Bundles Included: {self.stats['bundles_included']}")
        print(f"Total Profit: {self.stats['total_profit_eth']:.6f} ETH")
        print("=" * 60)


# Main entry point
async def run_bot():
    from web3 import Web3
    from eth_account import Account
    
    # Configuration
    config = {
        "ws_endpoint": "ws://localhost:8546",
        "http_endpoint": "http://localhost:8545",
        "web3_client": Web3(Web3.HTTPProvider("http://localhost:8545")),
        "signer": Account.from_key("YOUR_PRIVATE_KEY"),
        "auth_signer": Account.create(),  # Separate key for Flashbots auth
    }
    
    bot = MEVBot(config)
    
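    try:
        await bot.start()
    finally:
        # Also runs on Ctrl+C: asyncio.run cancels the running task, so an
        # `except KeyboardInterrupt` inside the coroutine would not fire
        bot.print_stats()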


if __name__ == "__main__":
    asyncio.run(run_bot())
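
The pipeline above targets only `current_block + 1`. A bundle is valid only for the block number it names, so one possible refinement is to submit the same signed transactions for a few consecutive blocks while the opportunity still holds. The sketch below builds the request parameters for such a window; `bundle_requests_for_window` and `lookahead` are hypothetical names, and the transaction strings are placeholders.

```python
from typing import Any, Dict, List


def bundle_requests_for_window(signed_txs: List[str], current_block: int,
                               lookahead: int = 3) -> List[Dict[str, Any]]:
    """Build one eth_sendBundle params object per upcoming block."""
    if lookahead < 1:
        raise ValueError("lookahead must be at least 1")
    return [
        {"txs": signed_txs, "blockNumber": hex(current_block + offset)}
        for offset in range(1, lookahead + 1)
    ]


requests_to_send = bundle_requests_for_window(["0xaa", "0xbb"], current_block=100)
print([r["blockNumber"] for r in requests_to_send])  # ['0x65', '0x66', '0x67']
```

Each entry would go out as its own `eth_sendBundle` call. Once one of them lands, the nonces are consumed and the remaining submissions simply become invalid.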

Performance Optimization Tips

After building the basics, here's how to compete with professional searchers:

1. Reduce Latency Everywhere

# Bad: Sequential operations
tx = await fetch_transaction(hash)
decoded = decode_transaction(tx)
opportunity = find_opportunity(decoded)

# Good: Parallel where possible
async def optimized_pipeline(tx_hash):
    # Run the fetch and the cache warm-up concurrently. The warm-up is
    # synchronous, so it goes to a worker thread; called inline, it would
    # run before the fetch task even started.
    tx, _ = await asyncio.gather(
        fetch_transaction(tx_hash),
        asyncio.to_thread(prepare_decoder_cache),
    )
    
    # Use pre-computed data
    decoded = decode_with_cache(tx)
    
    return find_opportunity(decoded)
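
Here is a runnable toy version of that idea, with `asyncio.sleep` standing in for the network fetch and the cache warm-up. The function names and delays are made up for the demonstration; the point is only that `asyncio.gather` starts both coroutines before waiting on either.

```python
import asyncio
from typing import List

events: List[str] = []


async def fetch_transaction(tx_hash: str) -> dict:
    """Stand-in for a network fetch."""
    events.append("fetch:start")
    await asyncio.sleep(0.02)
    events.append("fetch:done")
    return {"hash": tx_hash}


async def warm_decoder_cache() -> None:
    """Stand-in for loading ABIs or selectors."""
    events.append("warm:start")
    await asyncio.sleep(0.01)
    events.append("warm:done")


async def pipeline(tx_hash: str) -> dict:
    # gather schedules both coroutines, then waits for both results
    tx, _ = await asyncio.gather(fetch_transaction(tx_hash), warm_decoder_cache())
    return tx


result = asyncio.run(pipeline("0xabc"))
print(events)
```

Both "start" events are logged before either "done" event, which is the overlap the sequential version lacks.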

2. Cache Aggressively

class ReserveCache:
    """Cache DEX reserves with block-level invalidation"""
    
    def __init__(self):
        self.cache = {}
        self.last_block = 0
    
    async def get_reserves(self, pair: str, current_block: int):
        # Invalidate on new block
        if current_block > self.last_block:
            self.cache.clear()
            self.last_block = current_block
        
        if pair not in self.cache:
            self.cache[pair] = await self._fetch_reserves(pair)
        
        return self.cache[pair]
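
To see the invalidation behavior in isolation, here is a self-contained variant that takes the fetcher as a constructor argument. `BlockScopedCache` and `fake_fetch` are hypothetical; a real fetcher would call `getReserves()` on the pair contract.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Reserves = Tuple[int, int]


class BlockScopedCache:
    """Cache reserves per pair and drop everything when a new block arrives."""

    def __init__(self, fetch: Callable[[str], Awaitable[Reserves]]):
        self._fetch = fetch
        self._cache: Dict[str, Reserves] = {}
        self._last_block = 0

    async def get_reserves(self, pair: str, current_block: int) -> Reserves:
        if current_block > self._last_block:
            self._cache.clear()
            self._last_block = current_block
        if pair not in self._cache:
            self._cache[pair] = await self._fetch(pair)
        return self._cache[pair]


fetch_calls: List[str] = []


async def fake_fetch(pair: str) -> Reserves:
    """Record the call and return made-up reserves."""
    fetch_calls.append(pair)
    return (1_000, 2_000_000)


async def demo() -> None:
    cache = BlockScopedCache(fake_fetch)
    await cache.get_reserves("WETH/USDC", current_block=100)
    await cache.get_reserves("WETH/USDC", current_block=100)  # served from cache
    await cache.get_reserves("WETH/USDC", current_block=101)  # new block, refetch


asyncio.run(demo())
print(len(fetch_calls))  # 2
```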

3. Pre-Sign Transactions

class TransactionPool:
    """Pool of pre-built transaction templates; signing happens once calldata and fees are known"""
    
    def __init__(self, signer, web3):
        self.signer = signer
        self.w3 = web3
        self.templates = {}
    
    def prepare_templates(self):
        """Pre-build transaction templates"""
        base_nonce = self.w3.eth.get_transaction_count(self.signer.address)
        
        # Prepare 10 nonces worth of templates
        for i in range(10):
            self.templates[base_nonce + i] = {
                "nonce": base_nonce + i,
                "chainId": 1,
                "gas": 200000,
                # Other fields filled at execution time
            }
    
    def get_ready_tx(self, nonce: int, to: str, data: str, value: int = 0):
        """Get a nearly-ready transaction"""
        template = self.templates.get(nonce, {}).copy()
        template.update({
            "to": to,
            "data": data,
            "value": value
        })
        return template

Common Pitfalls and Solutions

Pitfall 1: Stale State

Problem: Your simulation uses old state, execution fails.

# Solution: Always simulate against pending state
async def simulate_with_pending(tx, pending_txs):
    # Apply pending transactions first
    state_override = {}
    
    for pending in pending_txs:
        effects = simulate_tx_effects(pending)
        state_override.update(effects)
    
    # Now simulate our tx with modified state
    return simulate_tx(tx, state_override)

Pitfall 2: Gas Price Races

Problem: Competitors outbid your gas price.

# Solution: Use Flashbots. Bundles never enter the public mempool, so there
# is no public gas auction; you compete on the builder payment instead

def calculate_builder_tip(profit, urgency="normal"):
    tip_rates = {
        "low": 0.70,      # Keep 30%
        "normal": 0.85,   # Keep 15%
        "high": 0.95,     # Keep 5%
    }
    
    rate = tip_rates.get(urgency, 0.85)
    return int(profit * rate)
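
Since profits are tracked in wei, multiplying by a float rate can lose a few wei to rounding. One way around that, sketched here under the assumption that rates are expressed in basis points, is to do the split in integer math. `split_profit` and `tip_bps` are hypothetical names.

```python
from typing import Tuple


def split_profit(profit_wei: int, tip_bps: int = 8_500) -> Tuple[int, int]:
    """Split profit into (builder_tip, kept) using integer basis points.

    tip_bps=8_500 corresponds to the "normal" 85% rate above.
    """
    if not 0 <= tip_bps <= 10_000:
        raise ValueError("tip_bps must be between 0 and 10000")
    tip = profit_wei * tip_bps // 10_000
    return tip, profit_wei - tip


tip, kept = split_profit(10**17)  # 0.1 ETH of profit
print(tip / 10**18, kept / 10**18)
```

The tip and the kept amount always sum to the original profit, so no wei go missing in the accounting.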

Pitfall 3: Reverted Bundles

Problem: Bundle reverts waste time and miss opportunities.

# Solution: Extensive pre-flight checks
async def validate_bundle(bundle):
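    # Each check is a coroutine that inspects the bundle and raises on failure
    checks = [
        check_sufficient_balance(bundle),
        check_nonce_sequence(bundle),
        check_gas_limits(bundle),
        check_contract_state(bundle),
        simulate_full_execution(bundle),
    ]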
    
    results = await asyncio.gather(*checks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            return False, f"Check {i} failed: {result}"
    
    return True, "All checks passed"
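
As a concrete illustration of the pattern, here are two toy checks wired through `asyncio.gather(..., return_exceptions=True)`. The check bodies and the 30M block gas limit are assumptions for the demo, not the checks a production bot would stop at.

```python
import asyncio
from typing import List, Tuple


async def check_nonce_sequence(nonces: List[int]) -> None:
    """Raise if nonces are not consecutive."""
    for previous, current in zip(nonces, nonces[1:]):
        if current != previous + 1:
            raise ValueError(f"nonce gap between {previous} and {current}")


async def check_gas_limits(gas_limits: List[int], block_gas_limit: int = 30_000_000) -> None:
    """Raise if the bundle cannot fit in one block (limit is an assumption)."""
    if sum(gas_limits) > block_gas_limit:
        raise ValueError("bundle exceeds block gas limit")


async def validate(nonces: List[int], gas_limits: List[int]) -> Tuple[bool, str]:
    results = await asyncio.gather(
        check_nonce_sequence(nonces),
        check_gas_limits(gas_limits),
        return_exceptions=True,  # collect failures instead of raising the first
    )
    failures = [str(r) for r in results if isinstance(r, Exception)]
    if failures:
        return False, "; ".join(failures)
    return True, "All checks passed"


print(asyncio.run(validate([7, 8], [150_000, 150_000])))
print(asyncio.run(validate([7, 9], [150_000, 150_000])))
```

Collecting every failure, rather than stopping at the first, makes it easier to see when one root cause trips several checks at once.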

Monitoring Your Bot

Track these metrics to optimize performance:

import time
from dataclasses import dataclass, field
from typing import List
from collections import deque

@dataclass
class BotMetrics:
    # Timing
    detection_latencies: deque = field(default_factory=lambda: deque(maxlen=1000))
    submission_latencies: deque = field(default_factory=lambda: deque(maxlen=1000))
    
    # Success rates
    opportunities_detected: int = 0
    bundles_submitted: int = 0
    bundles_included: int = 0
    bundles_failed: int = 0
    
    # Profit tracking
    total_gross_profit: float = 0
    total_gas_spent: float = 0
    total_tips_paid: float = 0
    
    def record_detection(self, start_time: float):
        latency = time.time() - start_time
        self.detection_latencies.append(latency)
    
    def record_submission(self, start_time: float):
        latency = time.time() - start_time
        self.submission_latencies.append(latency)
    
    @property
    def avg_detection_latency_ms(self) -> float:
        if not self.detection_latencies:
            return 0
        return sum(self.detection_latencies) / len(self.detection_latencies) * 1000
    
    @property
    def inclusion_rate(self) -> float:
        if self.bundles_submitted == 0:
            return 0
        return self.bundles_included / self.bundles_submitted
    
    @property
    def net_profit(self) -> float:
        return self.total_gross_profit - self.total_gas_spent - self.total_tips_paid
    
    def summary(self) -> str:
        return f"""
╔══════════════════════════════════════════════════════════════╗
║                     MEV BOT METRICS                          ║
╠══════════════════════════════════════════════════════════════╣
║  Detection Latency (avg):  {self.avg_detection_latency_ms:>8.2f} ms                   ║
║  Opportunities Detected:   {self.opportunities_detected:>8}                        ║
║  Bundles Submitted:        {self.bundles_submitted:>8}                        ║
║  Bundles Included:         {self.bundles_included:>8}                        ║
║  Inclusion Rate:           {self.inclusion_rate*100:>7.2f}%                        ║
╠══════════════════════════════════════════════════════════════╣
║  Gross Profit:             {self.total_gross_profit:>8.4f} ETH                    ║
║  Gas Spent:                {self.total_gas_spent:>8.4f} ETH                    ║
║  Tips Paid:                {self.total_tips_paid:>8.4f} ETH                    ║
║  Net Profit:               {self.net_profit:>8.4f} ETH                    ║
╚══════════════════════════════════════════════════════════════╝
        """

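Averages hide tail latency, and in this game the slow outliers are often the opportunities you lost. A small nearest-rank percentile helper, sketched below, can sit alongside the average. The helper name and the sample latencies are illustrative only.

```python
import math
from collections import deque
from typing import Iterable


def percentile(samples: Iterable[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of data at or below it."""
    ordered = sorted(samples)
    if not ordered:
        return 0.0
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Made-up latencies in milliseconds: mostly fast, with a slow tail
latencies = deque([5.0] * 95 + [250.0] * 5, maxlen=1000)

print(sum(latencies) / len(latencies))  # mean
print(percentile(latencies, 50))        # median
print(percentile(latencies, 99))        # tail
```

Here the mean lands between the two clusters and describes neither, while the median and the 99th percentile show the fast path and the slow tail separately.
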
Where to Go From Here

This guide covered the fundamentals. To become competitive, explore:

Advanced Topics:

  • Multi-hop arbitrage paths
  • Cross-chain MEV opportunities
  • Custom smart contracts for gas optimization
  • Private transaction pools beyond Flashbots

Infrastructure Upgrades:

  • Co-located servers near validators
  • Multiple geographically distributed nodes
  • Custom P2P networking layers
  • Hardware-accelerated transaction parsing

Part 7: Advanced Arbitrage Strategies

Basic two-pool arbitrage is heavily competed. Let's explore more sophisticated approaches.

Multi-Hop Path Finding

Sometimes the best arbitrage spans 3+ pools:

WETH → USDC (Uniswap) → DAI (Curve) → WETH (Sushiswap)

from typing import List, Tuple, Set
from collections import defaultdict
import heapq

class PathFinder:
    """Find optimal arbitrage paths across multiple DEXes"""
    
    def __init__(self):
        # Graph: token -> [(connected_token, pool_address, dex)]
        self.graph = defaultdict(list)
        
    def add_pool(self, token_a: str, token_b: str, pool: str, dex: str):
        """Add a liquidity pool to the graph"""
        self.graph[token_a].append((token_b, pool, dex))
        self.graph[token_b].append((token_a, pool, dex))
    
    def find_cycles(self, start_token: str, max_hops: int = 4) -> List[List[Tuple]]:
        """Find all cycles of at most max_hops swaps that start and end at start_token"""
        cycles = []
        
        def dfs(current: str, path: List[Tuple], visited_pools: Set[str], depth: int):
            # depth equals len(path); stop before adding a hop beyond max_hops
            if depth >= max_hops:
                return
            
            for next_token, pool, dex in self.graph[current]:
                # Avoid using same pool twice
                if pool in visited_pools:
                    continue
                
                new_path = path + [(current, next_token, pool, dex)]
                
                # Found a cycle back to start
                if next_token == start_token and len(new_path) >= 2:
                    cycles.append(new_path)
                    continue
                
                # Continue searching
                dfs(
                    next_token, 
                    new_path, 
                    visited_pools | {pool}, 
                    depth + 1
                )
        
        dfs(start_token, [], set(), 0)
        return cycles
    
    async def evaluate_cycle(self, cycle: List[Tuple], start_amount: int, pricer) -> dict:
        """Calculate profit for a specific cycle"""
        current_amount = start_amount
        
        for token_in, token_out, pool, dex in cycle:
            reserves = await pricer.get_reserves(pool)
            
            # Determine reserve order
            r_in, r_out = self._order_reserves(token_in, token_out, pool, reserves)
            
            # Calculate output
            current_amount = pricer.calculate_output(current_amount, r_in, r_out)
        
        profit = current_amount - start_amount
        
        # Estimate gas (each hop ~150k gas)
        gas_estimate = len(cycle) * 150000
        
        return {
            "cycle": cycle,
            "input_amount": start_amount,
            "output_amount": current_amount,
            "gross_profit": profit,
            "hops": len(cycle),
            "gas_estimate": gas_estimate
        }


class TriangularArbitrage:
    """Specialized scanner for triangular arbitrage"""
    
    def __init__(self, pricer, tokens: List[str]):
        self.pricer = pricer
        self.tokens = tokens
        # Pools must be registered via self.path_finder.add_pool(...) before scanning
        self.path_finder = PathFinder()
    
    async def scan_all_triangles(self, base_token: str = "WETH") -> List[dict]:
        """Find all profitable triangular arbitrage opportunities"""
        
        opportunities = []
        
        # Get all 3-hop cycles starting from base token
        cycles = self.path_finder.find_cycles(base_token, max_hops=3)
        
        # Evaluate each cycle with different input amounts
        test_amounts = [
            int(0.1 * 1e18),   # 0.1 ETH
            int(0.5 * 1e18),   # 0.5 ETH
            int(1.0 * 1e18),   # 1.0 ETH
            int(5.0 * 1e18),   # 5.0 ETH
            int(10.0 * 1e18),  # 10.0 ETH
        ]
        
        for cycle in cycles:
            best_result = None
            best_profit = 0
            
            for amount in test_amounts:
                result = await self.path_finder.evaluate_cycle(cycle, amount, self.pricer)
                
                if result["gross_profit"] > best_profit:
                    best_profit = result["gross_profit"]
                    best_result = result
            
            if best_result and best_profit > 0:
                opportunities.append(best_result)
        
        # Sort by profit
        opportunities.sort(key=lambda x: x["gross_profit"], reverse=True)
        
        return opportunities
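
To show why the scanner tries several input sizes, here is a self-contained evaluation of one three-hop cycle. It assumes every hop is a Uniswap V2 style constant-product pool with a 0.3% fee (a Curve hop would need its own formula), and the reserves are made up so that the last pool is mispriced.

```python
from typing import List, Tuple


def get_amount_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """Constant-product output with a 0.3% fee, in integer math."""
    amount_in_with_fee = amount_in * 997
    return amount_in_with_fee * reserve_out // (reserve_in * 1000 + amount_in_with_fee)


def evaluate_cycle(hops: List[Tuple[int, int]], start_amount: int) -> int:
    """Push start_amount through each (reserve_in, reserve_out) hop in order."""
    amount = start_amount
    for reserve_in, reserve_out in hops:
        amount = get_amount_out(amount, reserve_in, reserve_out)
    return amount


E18 = 10**18
# Made-up reserves, all tokens scaled to 18 decimals for simplicity
cycle = [
    (1_000 * E18, 2_000_000 * E18),      # WETH -> USDC at ~2000
    (5_000_000 * E18, 5_000_000 * E18),  # USDC -> DAI at ~1
    (1_800_000 * E18, 1_000 * E18),      # DAI -> WETH at ~1800 (mispriced)
]

for size in (E18, 10 * E18, 100 * E18):
    out = evaluate_cycle(cycle, size)
    print(size // E18, "WETH in, profit", (out - size) / E18)
```

With these numbers a 1 WETH trade comes back with a profit, while a 100 WETH trade moves the pools so far that it returns less than it put in. The best size sits somewhere in between, which is why a fixed grid of test amounts is only a rough search.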

Just-In-Time (JIT) Liquidity

Provide liquidity right before a large swap, capture fees, withdraw immediately:

import time
from typing import List, Optional

class JITLiquidityBot:
    """Provide just-in-time liquidity for large swaps"""
    
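    def __init__(self, web3_client, position_manager_address: str, signer=None):
        self.w3 = web3_client
        self.position_manager = position_manager_address
        # Account used to sign the mint and burn transactions in build_jit_bundle
        self.signer = signer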
        
        # Minimum swap size to consider (in USD)
        self.min_swap_usd = 50000
        
        # Fee tier preferences
        self.fee_tiers = [500, 3000, 10000]  # 0.05%, 0.3%, 1%
    
    async def analyze_swap_for_jit(self, analyzed_tx: dict) -> Optional[dict]:
        """Check if swap is suitable for JIT liquidity"""
        
        if not analyzed_tx.get("is_swap"):
            return None
        
        decoded = analyzed_tx["decoded"]
        protocol = decoded.get("protocol")
        
        # JIT works best on Uniswap V3 due to concentrated liquidity
        if protocol != "uniswap_v3":
            return None
        
        params = decoded["params"]
        
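        # Extract swap details (V3 exactInputSingle).
        # Index 4 is amountIn in the SwapRouter02 struct; the original
        # SwapRouter struct has deadline at index 4 and amountIn at index 5.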
        if "params" in params:
            swap_params = params["params"]
            token_in = swap_params[0]
            token_out = swap_params[1]
            fee_tier = swap_params[2]
            amount_in = swap_params[4]
        else:
            return None
        
        # Check if swap is large enough
        swap_value_usd = await self._get_usd_value(token_in, amount_in)
        
        if swap_value_usd < self.min_swap_usd:
            return None
        
        # Calculate optimal liquidity position
        current_price = await self._get_current_price(token_in, token_out, fee_tier)
        
        # Provide liquidity in tight range around current price
        # Capture maximum fees from the swap
        tick_spacing = self._get_tick_spacing(fee_tier)
        
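        # Round the lower tick down and the upper tick up to the spacing
        # grid so the current price always stays inside the range
        lower_tick = self._price_to_tick(current_price * 0.99) // tick_spacing * tick_spacing
        upper_tick = -(-self._price_to_tick(current_price * 1.01) // tick_spacing) * tick_spacing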
        
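        # Calculate expected fee capture. This is an upper bound: fees are
        # shared pro rata with other in-range liquidity, and the amount is in
        # token_in units, so it is only comparable to gas_cost (wei) when
        # token_in is WETH.
        fee_rate = fee_tier / 1000000  # Convert to decimal
        expected_fees = amount_in * fee_rate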
        
        # Gas costs for mint + swap-induced fee collection + burn
        gas_cost = 500000 * analyzed_tx["gas_price"]
        
        if expected_fees > gas_cost:
            return {
                "type": "jit_liquidity",
                "pool": self._get_pool_address(token_in, token_out, fee_tier),
                "token_in": token_in,
                "token_out": token_out,
                "fee_tier": fee_tier,
                "swap_amount": amount_in,
                "lower_tick": lower_tick,
                "upper_tick": upper_tick,
                "expected_fees": expected_fees,
                "gas_cost": gas_cost,
                "net_profit": expected_fees - gas_cost,
                "trigger_tx": analyzed_tx["tx_hash"]
            }
        
        return None
    
    async def build_jit_bundle(self, opportunity: dict, victim_tx_raw: str) -> List[str]:
        """Build JIT liquidity bundle: mint → victim swap → burn"""
        
        signed_txs = []
        nonce = self.w3.eth.get_transaction_count(self.signer.address)
        
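        # Transaction 1: Mint concentrated liquidity position.
        # The position manager requires token0 < token1 by address, and
        # liquidity_amount_0/1 (in that sorted order) must be added to the
        # opportunity by a position-sizing step not shown here.
        token0, token1 = sorted(
            [opportunity["token_in"], opportunity["token_out"]],
            key=lambda addr: int(addr, 16),
        )
        mint_params = {
            "token0": token0,
            "token1": token1,
            "fee": opportunity["fee_tier"],
            "tickLower": opportunity["lower_tick"],
            "tickUpper": opportunity["upper_tick"],
            "amount0Desired": opportunity["liquidity_amount_0"],
            "amount1Desired": opportunity["liquidity_amount_1"],
            "amount0Min": 0,
            "amount1Min": 0,
            "recipient": self.signer.address,
            "deadline": int(time.time()) + 120
        }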
        
        mint_tx = self._build_mint_tx(mint_params, nonce)
        signed_txs.append(self.signer.sign_transaction(mint_tx).rawTransaction.hex())
        
        # Transaction 2: Victim's swap (from mempool)
        signed_txs.append(victim_tx_raw)
        
        # Transaction 3: Burn position and collect fees
        burn_tx = self._build_burn_tx(opportunity, nonce + 1)
        signed_txs.append(self.signer.sign_transaction(burn_tx).rawTransaction.hex())
        
        return signed_txs
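
The JIT code above leans on helpers like `_price_to_tick` and `_get_tick_spacing` that are not shown. As a rough sketch of what they might look like, the snippet below converts a price to a tick with floating-point logs and aligns a range to the pool's tick spacing. It assumes `price` is the raw token1/token0 ratio; a production bot would more likely work from `sqrtPriceX96` with integer math. `jit_range` and `width` are hypothetical names.

```python
import math
from typing import Tuple

# Uniswap V3 tick spacing per fee tier (fee in hundredths of a basis point)
TICK_SPACING = {500: 10, 3000: 60, 10000: 200}


def price_to_tick(price: float) -> int:
    """Largest tick whose price 1.0001**tick does not exceed `price`."""
    return math.floor(math.log(price) / math.log(1.0001))


def jit_range(price: float, fee_tier: int, width: float = 0.01) -> Tuple[int, int]:
    """Return (lower_tick, upper_tick) aligned to the pool's tick spacing.

    The lower bound rounds down and the upper bound rounds up, so the
    current price stays inside the range even for wide spacings.
    """
    spacing = TICK_SPACING[fee_tier]
    lower = price_to_tick(price * (1 - width)) // spacing * spacing
    upper = -(-price_to_tick(price * (1 + width)) // spacing) * spacing
    return lower, upper


print(jit_range(1.0, 3000))
print(jit_range(1.0, 10000))
```

Rounding the upper bound up matters most on the 1% tier: with a spacing of 200, flooring both bounds of a ±1% band around tick 0 would put the upper bound at 0 and leave the current price outside the position.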

Part 8: Handling Different DEX Architectures

Each DEX has unique mechanics affecting MEV extraction.

Uniswap V2 vs V3

from decimal import Decimal
from typing import List

class UniswapV2Calculator:
    """Calculations for Uniswap V2 constant product pools"""
    
    # 0.3% fee expressed as integers, matching the on-chain library
    FEE_NUMERATOR = 997
    FEE_DENOMINATOR = 1000
    
    def get_amount_out(self, amount_in: int, reserve_in: int, reserve_out: int) -> int:
        """Standard x*y=k formula, in integer math so results match the pool exactly"""
        # Decimal's default 28-digit precision is too small for wei-scale
        # products, so stay in Python ints throughout
        amount_in_with_fee = amount_in * self.FEE_NUMERATOR
        numerator = amount_in_with_fee * reserve_out
        denominator = reserve_in * self.FEE_DENOMINATOR + amount_in_with_fee
        return numerator // denominator
    
    def get_amount_in(self, amount_out: int, reserve_in: int, reserve_out: int) -> int:
        """Reverse calculation: how much input for desired output"""
        numerator = reserve_in * amount_out * self.FEE_DENOMINATOR
        denominator = (reserve_out - amount_out) * self.FEE_NUMERATOR
        return numerator // denominator + 1


class UniswapV3Calculator:
    """Calculations for Uniswap V3 concentrated liquidity"""
    
    def __init__(self):
        # Q64.96 fixed point math constants
        self.Q96 = 2 ** 96
    
    def sqrt_price_to_price(self, sqrt_price_x96: int) -> Decimal:
        """Convert sqrtPriceX96 to actual price"""
        return (Decimal(sqrt_price_x96) / Decimal(self.Q96)) ** 2
    
    def get_amount_out(self, amount_in: int, sqrt_price: int, liquidity: int, 
                       zero_for_one: bool, fee: int) -> int:
        """Calculate output for V3 swap within single tick range"""
        
        # This is simplified - real V3 math involves tick crossing
        fee_amount = amount_in * fee // 1000000
        amount_in_after_fee = amount_in - fee_amount
        
        price = self.sqrt_price_to_price(sqrt_price)
        
        if zero_for_one:
            # Selling token0 for token1
            return int(Decimal(amount_in_after_fee) * price)
        else:
            # Selling token1 for token0
            return int(Decimal(amount_in_after_fee) / price)
    
    def simulate_swap(self, pool_state: dict, amount_in: int, zero_for_one: bool) -> dict:
        """Simulate V3 swap accounting for tick transitions"""
        
        current_tick = pool_state["tick"]
        sqrt_price = pool_state["sqrtPriceX96"]
        liquidity = pool_state["liquidity"]
        fee = pool_state["fee"]
        
        remaining = amount_in
        total_out = 0
        gas_estimate = 100000  # Base gas
        
        # Simulate tick by tick
        while remaining > 0:
            # Calculate max swap in current tick range
            tick_boundary = self._get_next_tick_boundary(current_tick, zero_for_one)
            
            max_in_tick = self._get_max_amount_in_tick(
                sqrt_price, tick_boundary, liquidity, zero_for_one
            )
            
            swap_amount = min(remaining, max_in_tick)
            
            out = self.get_amount_out(swap_amount, sqrt_price, liquidity, zero_for_one, fee)
            total_out += out
            remaining -= swap_amount
            
            if remaining > 0:
                # Cross tick
                current_tick = tick_boundary
                sqrt_price = self._tick_to_sqrt_price(tick_boundary)
                liquidity = self._get_liquidity_at_tick(pool_state, tick_boundary)
                gas_estimate += 50000  # Additional gas per tick cross
        
        return {
            "amount_out": total_out,
            "final_tick": current_tick,
            "final_sqrt_price": sqrt_price,
            # abs() keeps the count non-negative when the price moves up
            "ticks_crossed": abs(pool_state["tick"] - current_tick) // pool_state["tickSpacing"],
            "gas_estimate": gas_estimate
        }


class CurveCalculator:
    """Calculations for Curve StableSwap pools"""
    
    def __init__(self, A: int, n_coins: int):
        self.A = A  # Amplification coefficient
        self.n_coins = n_coins
    
    def get_D(self, balances: List[int]) -> int:
        """Calculate StableSwap invariant D"""
        S = sum(balances)
        
        if S == 0:
            return 0
        
        D = S
        Ann = self.A * self.n_coins
        
        for _ in range(255):
            D_P = D
            for balance in balances:
                D_P = D_P * D // (balance * self.n_coins)
            
            D_prev = D
            D = (Ann * S + D_P * self.n_coins) * D // ((Ann - 1) * D + (self.n_coins + 1) * D_P)
            
            if abs(D - D_prev) <= 1:
                return D
        
        raise Exception("D calculation did not converge")
    
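    def get_y(self, i: int, j: int, x: int, balances: List[int]) -> int:
        """Return the new balance of coin j once coin i's balance becomes x.

        x is the post-swap balance (balances[i] + dx), not the amount in.
        The pre-fee output is balances[j] - y - 1, as in Curve's get_dy.
        """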
        D = self.get_D(balances)
        
        c = D
        S = 0
        Ann = self.A * self.n_coins
        
        for k in range(self.n_coins):
            if k == i:
                _x = x
            elif k != j:
                _x = balances[k]
            else:
                continue
            
            S += _x
            c = c * D // (_x * self.n_coins)
        
        c = c * D // (Ann * self.n_coins)
        b = S + D // Ann
        
        y = D
        for _ in range(255):
            y_prev = y
            y = (y * y + c) // (2 * y + b - D)
            
            if abs(y - y_prev) <= 1:
                return y
        
        raise Exception("y calculation did not converge")
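
To connect `get_y` to an actual swap quote, here is a standalone sketch that wraps the same iteration in a `get_dy` function. It assumes all balances share one precision and ignores the pool fee; the amplification value and balances are made up. `get_dy` mirrors the name Curve uses, but this is a simplified illustration rather than the contract's exact code.

```python
from typing import List


def get_D(balances: List[int], A: int) -> int:
    """StableSwap invariant D via Newton iteration."""
    n = len(balances)
    S = sum(balances)
    if S == 0:
        return 0
    D, Ann = S, A * n
    for _ in range(255):
        D_P = D
        for balance in balances:
            D_P = D_P * D // (balance * n)
        D_prev = D
        D = (Ann * S + D_P * n) * D // ((Ann - 1) * D + (n + 1) * D_P)
        if abs(D - D_prev) <= 1:
            return D
    raise RuntimeError("D did not converge")


def get_dy(i: int, j: int, dx: int, balances: List[int], A: int) -> int:
    """Pre-fee output of coin j for an input of dx of coin i."""
    n = len(balances)
    D = get_D(balances, A)
    x = balances[i] + dx  # new balance of coin i after the deposit
    Ann = A * n
    c, S = D, 0
    for k in range(n):
        if k == j:
            continue
        xk = x if k == i else balances[k]
        S += xk
        c = c * D // (xk * n)
    c = c * D // (Ann * n)
    b = S + D // Ann
    y = D
    for _ in range(255):
        y_prev = y
        y = (y * y + c) // (2 * y + b - D)
        if abs(y - y_prev) <= 1:
            # y is the new balance of coin j; the difference is the output
            return balances[j] - y - 1
    raise RuntimeError("y did not converge")


E18 = 10**18
pool = [1_000_000 * E18, 1_000_000 * E18]  # balanced two-coin pool
dy = get_dy(0, 1, 1_000 * E18, pool, A=100)
print(dy / E18)  # slightly under 1000
```

For a balanced pool, a swap of 0.1% of the reserves comes back just under one-for-one, which is the flat region of the StableSwap curve that makes Curve pools behave so differently from constant-product ones.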

Part 9: Cross-Chain MEV

As DeFi expands across chains, cross-chain arbitrage becomes increasingly important.

Bridge Arbitrage Scanner

class CrossChainArbitrage:
    """Monitor price discrepancies across chains"""
    
    def __init__(self, chains: dict):
        # chains = {"ethereum": w3_eth, "arbitrum": w3_arb, "polygon": w3_poly}
        self.chains = chains
        
        # Common tokens across chains
        self.token_mappings = {
            "USDC": {
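                # Note: the Arbitrum and Polygon entries are the bridged USDC.e
                # tokens; native USDC on those chains lives at different addresses
                "ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
                "arbitrum": "0xFF970A61A04b1cA14834A43f5dE4533eBDDB5CC8",
                "polygon": "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174"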
            },
            "WETH": {
                "ethereum": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
                "arbitrum": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1",
                "polygon": "0x7ceB23fD6bC0adD59E62ac25578270cFf1b9f619"
            }
        }
        
        # Bridge contracts
        self.bridges = {
            ("ethereum", "arbitrum"): "0x...",  # Arbitrum bridge
            ("ethereum", "polygon"): "0x...",    # Polygon bridge
        }
    
    async def get_price_across_chains(self, token: str, quote_token: str = "USDC") -> dict:
        """Get token price on all supported chains"""
        prices = {}
        
        for chain_name, w3 in self.chains.items():
            token_address = self.token_mappings.get(token, {}).get(chain_name)
            quote_address = self.token_mappings.get(quote_token, {}).get(chain_name)
            
            if not token_address or not quote_address:
                continue
            
            price = await self._get_dex_price(w3, token_address, quote_address, chain_name)
            prices[chain_name] = price
        
        return prices
    
    async def find_cross_chain_arbitrage(self) -> List[dict]:
        """Find arbitrage opportunities across chains"""
        
        opportunities = []
        
        for token in self.token_mappings.keys():
            if token == "USDC":
                continue
            
            prices = await self.get_price_across_chains(token)
            
            if len(prices) < 2:
                continue
            
            # Find price discrepancies
            chains = list(prices.keys())
            for i, chain_a in enumerate(chains):
                for chain_b in chains[i+1:]:
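                    price_a = prices[chain_a]
                    price_b = prices[chain_b]
                    
                    # Skip pairs with a missing or zero quote to avoid dividing by zero
                    if not price_a or not price_b:
                        continue
                    
                    spread = abs(price_a - price_b) / min(price_a, price_b)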
                    
                    # Need significant spread to cover bridge costs
                    if spread > 0.005:  # 0.5% minimum
                        buy_chain = chain_a if price_a < price_b else chain_b
                        sell_chain = chain_b if price_a < price_b else chain_a
                        
                        # Estimate bridge costs
                        bridge_cost = await self._estimate_bridge_cost(buy_chain, sell_chain)
                        bridge_time = self._estimate_bridge_time(buy_chain, sell_chain)
                        
                        opportunities.append({
                            "token": token,
                            "buy_chain": buy_chain,
                            "sell_chain": sell_chain,
                            "buy_price": prices[buy_chain],
                            "sell_price": prices[sell_chain],
                            "spread_percent": spread * 100,
                            "bridge_cost_usd": bridge_cost,
                            "bridge_time_minutes": bridge_time,
                            "risk": "high" if bridge_time > 10 else "medium"
                        })
        
        return sorted(opportunities, key=lambda x: x["spread_percent"], reverse=True)
    
    def _estimate_bridge_time(self, from_chain: str, to_chain: str) -> int:
        """Estimate bridge finality time in minutes"""
        times = {
            ("ethereum", "arbitrum"): 10,
            ("ethereum", "polygon"): 30,
            ("arbitrum", "ethereum"): 7 * 24 * 60,  # 7 days for native
            ("polygon", "ethereum"): 45,
        }
        return times.get((from_chain, to_chain), 60)
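
The 0.5% threshold above is only a first filter, because the spread still has to pay for swap fees on both legs, the bridge, and gas. As a rough sketch of that arithmetic (the function name, the 0.3% swap_fee_rate default, and the flat USD cost parameters are all assumptions made for illustration, not part of the scanner above):

```python
def net_cross_chain_profit(
    buy_price: float,
    sell_price: float,
    trade_size_usd: float,
    bridge_cost_usd: float,
    gas_cost_usd: float = 0.0,
    swap_fee_rate: float = 0.003,  # assumed DEX fee charged on each leg
) -> float:
    """Estimate USD profit for buying on one chain and selling on another."""
    if buy_price <= 0 or sell_price <= 0:
        raise ValueError("prices must be positive")

    # Leg 1: spend trade_size_usd on the cheaper chain, paying the swap fee
    tokens_bought = trade_size_usd * (1 - swap_fee_rate) / buy_price

    # Leg 2: sell the bridged tokens on the expensive chain, paying the fee again
    proceeds_usd = tokens_bought * sell_price * (1 - swap_fee_rate)

    return proceeds_usd - trade_size_usd - bridge_cost_usd - gas_cost_usd


if __name__ == "__main__":
    # A 0.5% spread on a 10,000 USD trade with a 15 USD bridge cost
    print(net_cross_chain_profit(2000.0, 2010.0, 10_000.0, 15.0))
    # A 2.5% spread with the same costs
    print(net_cross_chain_profit(2000.0, 2050.0, 10_000.0, 15.0))
```

With two 0.3% swap fees, a 0.5% spread is already underwater before the bridge is paid, so it is worth ranking opportunities by an estimate like this instead of by raw spread. It also ignores price movement during the bridge delay, which is usually the dominant risk.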

L2 Sequencer MEV

class L2SequencerMonitor:
    """Monitor L2 sequencer for MEV opportunities"""
    
    def __init__(self, sequencer_rpc: str):
        self.sequencer_rpc = sequencer_rpc
        
        # L2s have different MEV characteristics
        # - Faster blocks (sub-second on some)
        # - Sequencer controls ordering
        # - Lower gas costs
    
    async def monitor_sequencer_feed(self):
        """Monitor sequencer's pending transaction feed"""
        
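        # This assumes the endpoint supports eth_subscribe("newPendingTransactions").
        # Arbitrum's sequencer feed is a separate WebSocket stream with its own
        # message format, and it carries transactions the sequencer has already
        # ordered, so it needs a dedicated client. OP Stack sequencers likewise
        # keep their mempool private by default.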
        
        async with websockets.connect(self.sequencer_rpc) as ws:
            await ws.send(json.dumps({
                "jsonrpc": "2.0",
                "id": 1,
                "method": "eth_subscribe",
                "params": ["newPendingTransactions"]
            }))
            
            async for message in ws:
                data = json.loads(message)
                if "params" in data:
                    # Process much faster than L1
                    # Sub-second response required
                    await self._fast_process(data["params"]["result"])
    
    def get_l2_specific_opportunities(self, analyzed_tx: dict) -> Optional[dict]:
        """Find L2-specific MEV opportunities"""
        
        # L2 has unique opportunities:
        # 1. L1 -> L2 deposit arbitrage
        # 2. Sequencer priority auctions
        # 3. Smaller trades viable due to low gas
        
        pass

Part 10: Smart Contract Optimization

Custom smart contracts reduce gas costs and enable atomic execution.

MEV Executor Contract

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

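import "@uniswap/v2-periphery/contracts/interfaces/IUniswapV2Router02.sol";
import "@uniswap/v3-periphery/contracts/interfaces/ISwapRouter.sol";
// The Aave V3 npm package is published as @aave/core-v3
import "@aave/core-v3/contracts/flashloan/base/FlashLoanSimpleReceiverBase.sol";
// IERC20 is used throughout the contract and must be imported explicitly
import "@openzeppelin/contracts/token/ERC20/IERC20.sol";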

contract MEVExecutor is FlashLoanSimpleReceiverBase {
    address public owner;
    
    // Whitelisted routers
    mapping(address => bool) public approvedRouters;
    
    constructor(address _poolProvider) FlashLoanSimpleReceiverBase(IPoolAddressesProvider(_poolProvider)) {
        owner = msg.sender;
    }
    
    modifier onlyOwner() {
        require(msg.sender == owner, "Not owner");
        _;
    }
    
    /**
     * @notice Execute multi-hop arbitrage in single transaction
     * @param path Array of swap instructions
     */
    function executeArbitrage(
        SwapInstruction[] calldata path,
        uint256 minProfit
    ) external onlyOwner {
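        // Profit is measured in native ETH, so the last swap in the path must
        // deliver ETH (for example by unwrapping WETH) to this contract
        uint256 startBalance = address(this).balance;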
        
        for (uint i = 0; i < path.length; i++) {
            _executeSwap(path[i]);
        }
        
        uint256 endBalance = address(this).balance;
        require(endBalance > startBalance + minProfit, "Insufficient profit");
        
        // Transfer profit to owner
        payable(owner).transfer(endBalance - startBalance);
    }
    
    /**
     * @notice Execute flash loan liquidation
     */
    function flashLiquidate(
        address collateralAsset,
        address debtAsset,
        address user,
        uint256 debtToCover
    ) external onlyOwner {
        // Request flash loan for debt amount
        POOL.flashLoanSimple(
            address(this),
            debtAsset,
            debtToCover,
            abi.encode(collateralAsset, user),
            0
        );
    }
    
    /**
     * @notice Flash loan callback
     */
    function executeOperation(
        address asset,
        uint256 amount,
        uint256 premium,
        address initiator,
        bytes calldata params
    ) external override returns (bool) {
        require(msg.sender == address(POOL), "Invalid caller");
        require(initiator == address(this), "Invalid initiator");
        
        (address collateralAsset, address user) = abi.decode(params, (address, address));
        
        // 1. Execute liquidation
        IERC20(asset).approve(address(POOL), amount);
        POOL.liquidationCall(collateralAsset, asset, user, amount, false);
        
        // 2. Swap received collateral to repay flash loan
        uint256 collateralReceived = IERC20(collateralAsset).balanceOf(address(this));
        uint256 amountToRepay = amount + premium;
        
        _swapExactOutput(collateralAsset, asset, collateralReceived, amountToRepay);
        
        // 3. Approve repayment
        IERC20(asset).approve(address(POOL), amountToRepay);
        
        return true;
    }
    
    struct SwapInstruction {
        address router;
        address tokenIn;
        address tokenOut;
        uint256 amountIn;
        uint256 minAmountOut;
        bytes data;
    }
    
    function _executeSwap(SwapInstruction calldata instruction) internal {
        require(approvedRouters[instruction.router], "Router not approved");
        
        IERC20(instruction.tokenIn).approve(instruction.router, instruction.amountIn);
        
        (bool success,) = instruction.router.call(instruction.data);
        require(success, "Swap failed");
    }
    
    function _swapExactOutput(
        address tokenIn,
        address tokenOut,
        uint256 maxAmountIn,
        uint256 amountOut
    ) internal {
        // Implementation for exact output swap
    }
    
    // Admin functions
    function approveRouter(address router) external onlyOwner {
        approvedRouters[router] = true;
    }
    
    function withdrawToken(address token) external onlyOwner {
        uint256 balance = IERC20(token).balanceOf(address(this));
        IERC20(token).transfer(owner, balance);
    }
    
    function withdrawETH() external onlyOwner {
        payable(owner).transfer(address(this).balance);
    }
    
    receive() external payable {}
}
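
Before calling flashLiquidate, the bot should confirm off-chain that the seized collateral will cover the loan plus premium, since executeOperation reverts otherwise and the gas is wasted. The sketch below shows that arithmetic under stated assumptions: premium_bps, liquidation_bonus_bps, and swap_cost_bps are hypothetical inputs that have to be read from the protocol and the DEX for the specific assets, and every amount is expressed in units of the debt asset.

```python
def flash_liquidation_profit(
    debt_to_cover: int,
    premium_bps: int,            # flash loan fee, in basis points (assumed input)
    liquidation_bonus_bps: int,  # bonus paid to the liquidator (assumed input)
    swap_cost_bps: int,          # DEX fee plus slippage on the collateral swap
) -> int:
    """Return expected profit in debt-asset units (negative means a loss)."""
    # Amount the pool pulls back at the end of executeOperation
    amount_to_repay = debt_to_cover + debt_to_cover * premium_bps // 10_000

    # Collateral seized, valued in the debt asset, including the bonus
    collateral_value = debt_to_cover * (10_000 + liquidation_bonus_bps) // 10_000

    # What is left after swapping the collateral back into the debt asset
    after_swap = collateral_value * (10_000 - swap_cost_bps) // 10_000

    return after_swap - amount_to_repay


if __name__ == "__main__":
    profit = flash_liquidation_profit(
        debt_to_cover=1_000_000,
        premium_bps=5,
        liquidation_bonus_bps=500,
        swap_cost_bps=30,
    )
    print(profit)
```

Gas is deliberately left out so the result can be compared directly against the gas cost computed elsewhere in the pipeline.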

Gas Optimization Techniques

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.19;

contract GasOptimizedMEV {
    // Pack storage variables
    // slot 0: owner (20 bytes) + paused (1 byte) + version (1 byte)
    address public owner;
    bool public paused;
    uint8 public version;
    
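    // Use immutable for constants set in constructor
    address public immutable WETH;
    address public immutable UNISWAP_ROUTER;
    
    // Immutables and the owner must be assigned at construction time
    constructor(address _weth, address _router) {
        owner = msg.sender;
        WETH = _weth;
        UNISWAP_ROUTER = _router;
    }
    
    // Cache frequently accessed storage in memory
    function optimizedSwap(
        address[] calldata path,
        uint256 amountIn
    ) external {
        // Cache storage reads
        address _owner = owner;
        require(msg.sender == _owner, "Not owner");
        
        // Cache the length and reject short paths: inside `unchecked`,
        // `path.length - 1` would wrap around for an empty array
        uint256 length = path.length;
        require(length >= 2, "Path too short");
        
        // Use unchecked only for the counter, which cannot overflow here
        for (uint i = 0; i < length - 1;) {
            // Swap logic
            
            unchecked { ++i; }
        }
    }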
    
    // Batch operations to amortize fixed costs
    function batchSwap(
        SwapParams[] calldata swaps
    ) external {
        // Restrict to the owner, matching optimizedSwap
        require(msg.sender == owner, "Not owner");
        uint256 length = swaps.length;
        
        for (uint i = 0; i < length;) {
            _executeSwap(swaps[i]);
            
            unchecked { ++i; }
        }
    }
    
    // Use calldata instead of memory for read-only arrays
    function efficientPath(
        address[] calldata path  // calldata is cheaper than memory
    ) external pure returns (uint256) {  // reads only calldata, so pure suffices
        return path.length;
    }
    
    struct SwapParams {
        address tokenIn;
        address tokenOut;
        uint256 amount;
    }
    
    function _executeSwap(SwapParams calldata params) internal {
        // Implementation
    }
}
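
The packing comment in the contract relies on Solidity's storage layout rule: value-type state variables are placed in declaration order, and a variable starts a new 32-byte slot only when it does not fit in the space left in the current one. A small sketch of that rule follows; the assign_slots helper is hypothetical and only handles fixed-size value types, not mappings, dynamic arrays, or structs.

```python
from typing import List, Tuple

SLOT_BYTES = 32


def assign_slots(variables: List[Tuple[str, int]]) -> List[Tuple[str, int, int]]:
    """Map (name, size_in_bytes) pairs to (name, slot, byte_offset).

    Variables are packed in declaration order; a variable that does not fit
    in the remaining bytes of the current slot starts the next slot.
    """
    layout = []
    slot, used = 0, 0
    for name, size in variables:
        if not 0 < size <= SLOT_BYTES:
            raise ValueError(f"{name}: size must be between 1 and {SLOT_BYTES} bytes")
        if used + size > SLOT_BYTES:
            slot += 1
            used = 0
        layout.append((name, slot, used))
        used += size
    return layout


if __name__ == "__main__":
    # address (20 bytes) + bool (1 byte) + uint8 (1 byte) share one slot
    print(assign_slots([("owner", 20), ("paused", 1), ("version", 1)]))
    # A uint256 between them forces three separate slots
    print(assign_slots([("owner", 20), ("nonce", 32), ("paused", 1)]))
```

Declaration order therefore matters: placing a uint256 between the address and the bool costs two extra slots. Immutable variables such as WETH are embedded in the deployed bytecode and take no storage slot at all.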

Part 11: Risk Management Systems

Professional MEV operations require robust risk controls.

Position and Loss Limits

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import asyncio

@dataclass
class RiskLimits:
    max_position_eth: float = 10.0
    max_daily_loss_eth: float = 1.0
    max_single_trade_eth: float = 2.0
    max_gas_price_gwei: int = 200
    min_profit_threshold_eth: float = 0.001
    max_consecutive_losses: int = 5


class RiskManager:
    """Manages risk for MEV operations"""
    
    def __init__(self, limits: RiskLimits):
        self.limits = limits
        self.daily_pnl = 0.0
        self.consecutive_losses = 0
        self.last_reset = datetime.now()
        self.active_positions = {}
        self.paused = False
    
    def reset_daily_stats(self):
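        """Reset daily tracking once 24 hours have passed since the last reset.
        
        This is a rolling window based on local time, not a midnight UTC cutoff.
        It does not clear the paused flag or the consecutive-loss counter.
        """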
        if datetime.now() - self.last_reset > timedelta(days=1):
            self.daily_pnl = 0.0
            self.last_reset = datetime.now()
    
    def can_execute(self, opportunity: dict, gas_price: int) -> tuple[bool, str]:
        """Check if opportunity passes risk checks"""
        
        self.reset_daily_stats()
        
        # Check if trading is paused
        if self.paused:
            return False, "Trading paused due to risk limits"
        
        # Check gas price
        gas_gwei = gas_price / 1e9
        if gas_gwei > self.limits.max_gas_price_gwei:
            return False, f"Gas price {gas_gwei:.0f} exceeds limit {self.limits.max_gas_price_gwei}"
        
        # Check trade size
        trade_size_eth = opportunity.get("amount_in", 0) / 1e18
        if trade_size_eth > self.limits.max_single_trade_eth:
            return False, f"Trade size {trade_size_eth:.2f} exceeds limit {self.limits.max_single_trade_eth}"
        
        # Check daily loss limit
        if self.daily_pnl <= -self.limits.max_daily_loss_eth:
            self.paused = True
            return False, f"Daily loss limit reached: {self.daily_pnl:.4f} ETH"
        
        # Check consecutive losses
        if self.consecutive_losses >= self.limits.max_consecutive_losses:
            self.paused = True
            return False, f"Too many consecutive losses: {self.consecutive_losses}"
        
        # Check minimum profit threshold
        expected_profit = opportunity.get("net_profit_eth", 0)
        if expected_profit < self.limits.min_profit_threshold_eth:
            return False, f"Profit {expected_profit:.6f} below threshold"
        
        # Check total position exposure
        total_exposure = sum(p.get("amount_eth", 0) for p in self.active_positions.values())
        if total_exposure + trade_size_eth > self.limits.max_position_eth:
            return False, f"Would exceed position limit: {total_exposure + trade_size_eth:.2f} ETH"
        
        return True, "Passed all risk checks"
    
    def record_trade(self, trade_id: str, result: dict):
        """Record trade result and update stats"""
        
        pnl = result.get("actual_profit_eth", 0)
        self.daily_pnl += pnl
        
        if pnl < 0:
            self.consecutive_losses += 1
        else:
            self.consecutive_losses = 0
        
        # Remove from active positions
        if trade_id in self.active_positions:
            del self.active_positions[trade_id]
    
    def add_position(self, trade_id: str, position: dict):
        """Track active position"""
        self.active_positions[trade_id] = position
    
    def emergency_stop(self, reason: str):
        """Immediately halt all trading"""
        self.paused = True
        print(f"⚠️ EMERGENCY STOP: {reason}")
        # Could also trigger position unwinding here
    
    def get_status(self) -> dict:
        """Get current risk status"""
        return {
            "paused": self.paused,
            "daily_pnl_eth": self.daily_pnl,
            "consecutive_losses": self.consecutive_losses,
            "active_positions": len(self.active_positions),
            "total_exposure_eth": sum(p.get("amount_eth", 0) for p in self.active_positions.values())
        }


class CircuitBreaker:
    """Automatic circuit breaker for extreme conditions"""
    
    def __init__(self, risk_manager: RiskManager):
        self.risk_manager = risk_manager
        self.error_count = 0
        self.last_error_time = None
    
    async def monitor(self):
        """Continuous monitoring for circuit breaker conditions"""
        while True:
            await asyncio.sleep(1)
            
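            # Check for rapid errors. error_count accumulates until five quiet
            # minutes pass, so this fires once more than 10 errors have piled up
            # and the most recent one is under a minute old. Skip the check when
            # trading is already paused to avoid re-triggering every second.
            if self.error_count > 10 and self.last_error_time and not self.risk_manager.paused:
                if datetime.now() - self.last_error_time < timedelta(minutes=1):
                    self.risk_manager.emergency_stop("Too many errors in short period")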
            
            # Reset error count periodically
            if self.last_error_time and datetime.now() - self.last_error_time > timedelta(minutes=5):
                self.error_count = 0
    
    def record_error(self, error: Exception):
        """Record an error occurrence"""
        self.error_count += 1
        self.last_error_time = datetime.now()
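
Because the counter above only resets after five quiet minutes, slow but steady errors can also trip the breaker. If the intent is strictly "more than N errors within the last minute", one alternative is to keep the error timestamps and count only the recent ones. The class below is an illustrative sketch of that sliding-window variant, not part of the bot above; its name and the injectable `now` argument (there to make it testable) are assumptions.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Deque, Optional


class SlidingWindowErrorCounter:
    """Counts errors that occurred within a trailing time window."""

    def __init__(self, max_errors: int = 10,
                 window: timedelta = timedelta(minutes=1)):
        self.max_errors = max_errors
        self.window = window
        self._times: Deque[datetime] = deque()

    def record_error(self, now: Optional[datetime] = None) -> None:
        """Store the timestamp of an error."""
        self._times.append(now or datetime.now())

    def tripped(self, now: Optional[datetime] = None) -> bool:
        """Return True when more than max_errors fall inside the window."""
        now = now or datetime.now()
        cutoff = now - self.window
        # Timestamps are appended in order, so expired ones sit on the left
        while self._times and self._times[0] <= cutoff:
            self._times.popleft()
        return len(self._times) > self.max_errors


if __name__ == "__main__":
    counter = SlidingWindowErrorCounter(max_errors=3)
    start = datetime(2026, 1, 1)
    for i in range(4):
        counter.record_error(start + timedelta(seconds=i))
    print(counter.tripped(start + timedelta(seconds=4)))
```

A monitor loop could call tripped() once per second and invoke emergency_stop when it returns True.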

Understanding these fundamentals is just the starting point. Production systems take months of iteration and optimization, and enterprise-grade MEV infrastructure demands a significant investment of both time and resources.


Quick Reference

Essential RPC Methods

| Method | Purpose |
|---|---|
| eth_subscribe("newPendingTransactions") | Stream pending tx hashes |
| eth_getTransactionByHash | Fetch full tx details |
| txpool_content | Get complete mempool snapshot |
| eth_call | Simulate transaction execution |
| debug_traceCall | Get detailed execution trace |
| eth_sendBundle | Submit Flashbots bundle |

Gas Estimation

| Operation | Typical Gas |
|---|---|
| Simple ETH transfer | 21,000 |
| ERC20 transfer | 65,000 |
| Uniswap V2 swap | 120,000-150,000 |
| Uniswap V3 swap | 150,000-200,000 |
| Aave liquidation | 350,000-450,000 |
| 2-hop arbitrage | 250,000-350,000 |

Profitability Thresholds

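Approximate minimum profitable trade sizes at different gas prices, assuming a 0.3% spread. These figures correspond to a gas cost of about 60,000 units per trade; heavier transactions, such as a 2-hop arbitrage at 250,000-350,000 gas, need proportionally larger trades: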

| Gas Price | Min Trade (0.3% spread) |
|---|---|
| 20 gwei | ~0.4 ETH |
| 50 gwei | ~1.0 ETH |
| 100 gwei | ~2.0 ETH |
| 200 gwei | ~4.0 ETH |
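
The break-even size behind a table like this is simply gas cost divided by spread. The helper below is a small sketch of that formula; the function name is hypothetical, and it ignores swap fees, slippage, and builder tips, all of which raise the real threshold.

```python
def min_profitable_trade_eth(gas_units: int, gas_price_gwei: float,
                             spread: float) -> float:
    """Trade size (in ETH) at which spread * size equals the gas cost."""
    if spread <= 0:
        raise ValueError("spread must be positive")
    gas_cost_eth = gas_units * gas_price_gwei * 1e-9  # 1 gwei = 1e-9 ETH
    return gas_cost_eth / spread


if __name__ == "__main__":
    for gwei in (20, 50, 100, 200):
        light = min_profitable_trade_eth(60_000, gwei, 0.003)
        heavy = min_profitable_trade_eth(300_000, gwei, 0.003)
        print(f"{gwei:>3} gwei: {light:.1f} ETH at 60k gas, {heavy:.1f} ETH at 300k gas")
```

Plugging in the gas figure from a simulated transaction rather than a table average gives a threshold that tracks what the bot will actually pay.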


Part 12: Testing and Deployment Pipeline

Never deploy MEV code without extensive testing. Here's a battle-tested pipeline.

Local Fork Testing

import subprocess
import time
from datetime import datetime
from typing import Optional

from web3 import Web3

class AnvilFork:
    """Manage local Anvil fork for testing"""
    
    def __init__(self, fork_url: str, fork_block: Optional[int] = None):
        self.fork_url = fork_url
        self.fork_block = fork_block
        self.process = None
        self.rpc_url = "http://localhost:8545"
    
    def start(self):
        """Start Anvil fork"""
        cmd = [
            "anvil",
            "--fork-url", self.fork_url,
            "--port", "8545",
            "--accounts", "10",
            "--balance", "10000"
        ]
        
        if self.fork_block:
            cmd.extend(["--fork-block-number", str(self.fork_block)])
        
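        # Discard Anvil's output: an unread PIPE can fill up and block the process
        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        
        # Wait for startup by polling the RPC endpoint instead of a fixed sleep
        # (is_connected() is the web3.py v6+ name; v5 uses isConnected())
        w3 = Web3(Web3.HTTPProvider(self.rpc_url))
        for _ in range(30):
            if w3.is_connected():
                return w3
            time.sleep(0.5)
        
        self.stop()
        raise RuntimeError("Anvil did not start within 15 seconds")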
    
    def stop(self):
        """Stop Anvil fork"""
        if self.process:
            self.process.terminate()
            self.process.wait()
    
    def reset(self):
        """Reset fork to original state"""
        w3 = Web3(Web3.HTTPProvider(self.rpc_url))
        
        # Only pin a block number when one was configured
        forking = {"jsonRpcUrl": self.fork_url}
        if self.fork_block is not None:
            forking["blockNumber"] = self.fork_block
        
        w3.provider.make_request("anvil_reset", [{"forking": forking}])
    
    def mine_block(self):
        """Mine a new block"""
        w3 = Web3(Web3.HTTPProvider(self.rpc_url))
        w3.provider.make_request("evm_mine", [])
    
    def set_balance(self, address: str, balance_wei: int):
        """Set account balance"""
        w3 = Web3(Web3.HTTPProvider(self.rpc_url))
        w3.provider.make_request("anvil_setBalance", [address, hex(balance_wei)])


class MEVTestSuite:
    """Comprehensive test suite for MEV bot"""
    
    def __init__(self, fork: AnvilFork):
        self.fork = fork
        self.w3 = None
        self.test_results = []
    
    async def setup(self):
        """Initialize test environment"""
        self.w3 = self.fork.start()
        
        # Get test accounts
        self.accounts = self.w3.eth.accounts
        self.owner = self.accounts[0]
        self.victim = self.accounts[1]
    
    async def teardown(self):
        """Cleanup test environment"""
        self.fork.stop()
    
    async def test_arbitrage_detection(self):
        """Test that arbitrage opportunities are detected correctly"""
        print("Testing arbitrage detection...")
        
        # Create price discrepancy by executing large swap
        # Then verify our detector finds it
        
        # Setup
        detector = ArbitrageScanner(DEXPricer(self.w3))
        
        # Execute large swap to create discrepancy
        await self._create_price_discrepancy()
        
        # Detect
        opportunities = await detector.scan_all_triangles()
        
        # Verify
        assert len(opportunities) > 0, "Should find at least one opportunity"
        assert opportunities[0]["gross_profit"] > 0, "Profit should be positive"
        
        self.test_results.append(("arbitrage_detection", "PASSED"))
        print("✅ Arbitrage detection test passed")
    
    async def test_bundle_submission(self):
        """Test Flashbots bundle construction and simulation"""
        print("Testing bundle submission...")
        
        # Build a simple arbitrage bundle
        bundle_builder = BundleBuilder(self.w3, self.owner)
        
        opportunity = {
            "pair_address": "0x...",
            "amount_in": int(1e18),
            "expected_output": int(1.01e18)
        }
        
        profit_calc = {
            "gas_params": {
                "base_fee_gwei": 30,
                "priority_fee_gwei": 2,
                "gas_units": 250000
            }
        }
        
        bundle = await bundle_builder.build_arbitrage_bundle(opportunity, profit_calc)
        
        assert len(bundle) == 2, "Bundle should have 2 transactions"
        
        self.test_results.append(("bundle_submission", "PASSED"))
        print("✅ Bundle submission test passed")
    
    async def test_profit_calculation(self):
        """Test profit calculations are accurate"""
        print("Testing profit calculation...")
        
        calculator = ProfitCalculator(self.w3)
        
        opportunity = {
            "estimated_profit_wei": int(0.1e18),  # 0.1 ETH
            "amount_in": int(5e18)  # 5 ETH trade
        }
        
        result = await calculator.calculate_arbitrage_profit(opportunity)
        
        # Verify all components are present
        assert "gross_profit_wei" in result
        assert "gas_cost_wei" in result
        assert "net_profit_wei" in result
        assert "profitable" in result
        
        # Net should be less than gross
        assert result["net_profit_wei"] < result["gross_profit_wei"]
        
        self.test_results.append(("profit_calculation", "PASSED"))
        print("✅ Profit calculation test passed")
    
    async def test_risk_limits(self):
        """Test risk management prevents dangerous trades"""
        print("Testing risk limits...")
        
        limits = RiskLimits(
            max_single_trade_eth=1.0,
            max_daily_loss_eth=0.5
        )
        risk_manager = RiskManager(limits)
        
        # Test trade size limit
        large_trade = {"amount_in": int(2e18)}  # 2 ETH
        can_execute, reason = risk_manager.can_execute(large_trade, int(30e9))
        assert not can_execute, "Should reject oversized trade"
        
        # Test daily loss limit
        risk_manager.daily_pnl = -0.6  # Exceeded limit
        small_trade = {"amount_in": int(0.5e18), "net_profit_eth": 0.01}
        can_execute, reason = risk_manager.can_execute(small_trade, int(30e9))
        assert not can_execute, "Should reject after daily loss limit"
        
        self.test_results.append(("risk_limits", "PASSED"))
        print("✅ Risk limits test passed")
    
    async def test_historical_replay(self):
        """Replay historical MEV and verify detection"""
        print("Testing historical replay...")
        
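        # Fork at specific block with known MEV.
        # reset() returns to the block the fork was started at, so
        # known_mev_block is illustrative and should match fork_block.
        known_mev_block = 18500000  # Example block
        
        self.fork.reset()
        
        # Verify we would have detected known opportunities
        # This requires historical data of actual MEV extracted
        # Placeholder: nothing is asserted yet, so "PASSED" here only means
        # the fork reset did not raise
        
        self.test_results.append(("historical_replay", "PASSED"))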
        print("✅ Historical replay test passed")
    
    async def run_all_tests(self):
        """Run complete test suite"""
        print("\n" + "="*60)
        print("RUNNING MEV BOT TEST SUITE")
        print("="*60 + "\n")
        
        await self.setup()
        
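        tests = [
            self.test_arbitrage_detection,
            self.test_bundle_submission,
            self.test_profit_calculation,
            self.test_risk_limits,
            self.test_historical_replay,
        ]
        
        try:
            for test in tests:
                try:
                    await test()
                except Exception as exc:
                    # Record the failure so it appears in the summary
                    name = test.__name__.removeprefix("test_")
                    self.test_results.append((name, f"FAILED: {exc}"))
        finally:
            await self.teardown()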
        
        # Summary
        print("\n" + "="*60)
        print("TEST SUMMARY")
        print("="*60)
        
        passed = sum(1 for _, result in self.test_results if result == "PASSED")
        total = len(self.test_results)
        
        for test_name, result in self.test_results:
            emoji = "✅" if result == "PASSED" else "❌"
            print(f"  {emoji} {test_name}: {result}")
        
        print(f"\nTotal: {passed}/{total} tests passed")
        print("="*60)
        
        return passed == total


# Run tests
async def run_tests():
    fork = AnvilFork(
        fork_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
        fork_block=18500000
    )
    
    suite = MEVTestSuite(fork)
    success = await suite.run_all_tests()
    
    return success

if __name__ == "__main__":
    import asyncio
    asyncio.run(run_tests())

Staged Deployment Process

class DeploymentStages:
    """Staged deployment for MEV bot"""
    
    STAGES = [
        {
            "name": "shadow",
            "description": "Monitor only, no execution",
            "duration_days": 7,
            "config": {
                "execute_trades": False,
                "max_position": 0,
                "log_opportunities": True
            }
        },
        {
            "name": "paper",
            "description": "Simulate execution, track virtual P&L",
            "duration_days": 7,
            "config": {
                "execute_trades": False,
                "simulate_execution": True,
                "virtual_balance": 10.0
            }
        },
        {
            "name": "micro",
            "description": "Live execution with tiny positions",
            "duration_days": 14,
            "config": {
                "execute_trades": True,
                "max_position": 0.1,
                "max_daily_loss": 0.05
            }
        },
        {
            "name": "small",
            "description": "Increased position sizes",
            "duration_days": 14,
            "config": {
                "execute_trades": True,
                "max_position": 1.0,
                "max_daily_loss": 0.2
            }
        },
        {
            "name": "production",
            "description": "Full production deployment",
            "duration_days": None,
            "config": {
                "execute_trades": True,
                "max_position": 10.0,
                "max_daily_loss": 1.0
            }
        }
    ]
    
    def __init__(self):
        self.current_stage = 0
        self.stage_start_time = datetime.now()
        self.stage_metrics = {}
    
    def get_current_config(self) -> dict:
        """Get configuration for current stage"""
        return self.STAGES[self.current_stage]["config"]
    
    def should_advance(self) -> tuple[bool, str]:
        """Check if ready to advance to next stage"""
        stage = self.STAGES[self.current_stage]
        
        if stage["duration_days"] is None:
            return False, "Already at final stage"
        
        days_elapsed = (datetime.now() - self.stage_start_time).days
        
        if days_elapsed < stage["duration_days"]:
            remaining = stage["duration_days"] - days_elapsed
            return False, f"{remaining} days remaining in {stage['name']} stage"
        
        # Check metrics meet advancement criteria
        metrics = self.stage_metrics.get(stage["name"], {})
        
        if metrics.get("error_rate", 1.0) > 0.1:
            return False, "Error rate too high to advance"
        
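        # Win rate only exists for stages that execute or simulate trades;
        # the shadow stage would otherwise never be able to advance
        config = stage["config"]
        has_trades = config.get("execute_trades") or config.get("simulate_execution")
        if has_trades and metrics.get("win_rate", 0) < 0.2:
            return False, "Win rate too low to advance"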
        
        return True, "Ready to advance"
    
    def advance(self):
        """Advance to next deployment stage"""
        if self.current_stage < len(self.STAGES) - 1:
            self.current_stage += 1
            self.stage_start_time = datetime.now()
            print(f"Advanced to stage: {self.STAGES[self.current_stage]['name']}")
    
    def record_metrics(self, metrics: dict):
        """Record metrics for current stage"""
        stage_name = self.STAGES[self.current_stage]["name"]
        self.stage_metrics[stage_name] = metrics

Part 13: Production Monitoring Dashboard

from flask import Flask, jsonify
import threading

app = Flask(__name__)

# Global bot instance (set during initialization)
bot_instance = None

@app.route('/api/status')
def get_status():
    """Get current bot status"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    return jsonify({
        "running": bot_instance.running,
        "stats": bot_instance.stats,
        "risk_status": bot_instance.risk_manager.get_status()
    })

@app.route('/api/opportunities')
def get_recent_opportunities():
    """Get recently detected opportunities"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    return jsonify({
        "opportunities": bot_instance.recent_opportunities[-100:]
    })

@app.route('/api/trades')
def get_recent_trades():
    """Get recent trade history"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    return jsonify({
        "trades": bot_instance.trade_history[-100:]
    })

@app.route('/api/metrics')
def get_metrics():
    """Get performance metrics"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    metrics = bot_instance.metrics
    
    return jsonify({
        "detection_latency_p50_ms": metrics.get_percentile(50),
        "detection_latency_p95_ms": metrics.get_percentile(95),
        "win_rate": metrics.win_rate,
        "profit_today_eth": metrics.profit_today,
        "profit_total_eth": metrics.profit_total
    })

@app.route('/api/pause', methods=['POST'])
def pause_bot():
    """Pause bot execution"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    bot_instance.pause()
    return jsonify({"status": "paused"})

@app.route('/api/resume', methods=['POST'])
def resume_bot():
    """Resume bot execution"""
    if not bot_instance:
        return jsonify({"error": "Bot not initialized"}), 500
    
    bot_instance.resume()
    return jsonify({"status": "running"})

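def start_dashboard(bot, port=5000, host='127.0.0.1'):
    """Start monitoring dashboard in background thread"""
    global bot_instance
    bot_instance = bot
    
    # Bind to localhost by default: the pause/resume endpoints have no
    # authentication, so avoid exposing them on 0.0.0.0 without adding some
    thread = threading.Thread(
        target=lambda: app.run(host=host, port=port, debug=False),
        daemon=True
    )
    thread.start()
    print(f"Dashboard running at http://localhost:{port}")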
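
The /api/metrics endpoint calls metrics.get_percentile(), which is not defined in the code shown here. One plausible shape for such a metrics object is sketched below using the nearest-rank method over a bounded sample buffer. The LatencyTracker name, the max_samples cap, and the choice of nearest-rank are all assumptions made for illustration.

```python
import math
from collections import deque
from typing import Deque


class LatencyTracker:
    """Keeps recent detection latencies and reports nearest-rank percentiles."""

    def __init__(self, max_samples: int = 10_000):
        # A bounded deque drops the oldest sample once the cap is reached
        self.samples: Deque[float] = deque(maxlen=max_samples)

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def get_percentile(self, p: float) -> float:
        """Return the p-th percentile (0 < p <= 100), or 0.0 with no samples."""
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        # Nearest-rank: the smallest value with at least p% of samples at or below it
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[min(rank, len(ordered)) - 1]


if __name__ == "__main__":
    tracker = LatencyTracker()
    for ms in range(1, 101):
        tracker.record(float(ms))
    print(tracker.get_percentile(50), tracker.get_percentile(95))
```

Sorting on every call is fine for a dashboard polled every few seconds; a hot path would want a streaming estimator instead.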

Alerting System

import asyncio
import aiohttp
from enum import Enum

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertManager:
    """Send alerts via multiple channels"""
    
    def __init__(self, config: dict):
        self.slack_webhook = config.get("slack_webhook")
        self.telegram_bot_token = config.get("telegram_bot_token")
        self.telegram_chat_id = config.get("telegram_chat_id")
        self.discord_webhook = config.get("discord_webhook")
    
    async def send_alert(self, level: AlertLevel, title: str, message: str):
        """Send alert to all configured channels"""
        
        emoji = {
            AlertLevel.INFO: "ℹ️",
            AlertLevel.WARNING: "⚠️",
            AlertLevel.ERROR: "❌",
            AlertLevel.CRITICAL: "🚨"
        }[level]
        
        formatted_message = f"{emoji} *{title}*\n{message}"
        
        tasks = []
        
        if self.slack_webhook:
            tasks.append(self._send_slack(formatted_message))
        
        if self.telegram_bot_token and self.telegram_chat_id:
            tasks.append(self._send_telegram(formatted_message))
        
        if self.discord_webhook:
            tasks.append(self._send_discord(formatted_message))
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _send_slack(self, message: str):
        async with aiohttp.ClientSession() as session:
            await session.post(
                self.slack_webhook,
                json={"text": message}
            )
    
    async def _send_telegram(self, message: str):
        url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
        async with aiohttp.ClientSession() as session:
            await session.post(
                url,
                json={
                    "chat_id": self.telegram_chat_id,
                    "text": message,
                    "parse_mode": "Markdown"
                }
            )
    
    async def _send_discord(self, message: str):
        async with aiohttp.ClientSession() as session:
            await session.post(
                self.discord_webhook,
                json={"content": message}
            )


# Alert conditions
class AlertConditions:
    """Define conditions that trigger alerts"""
    
    def __init__(self, alert_manager: AlertManager, bot):
        self.alerts = alert_manager
        self.bot = bot
    
    async def check_conditions(self):
        """Check all alert conditions"""
        
        # Large profit opportunity
        if self.bot.last_opportunity_profit > 1.0:
            await self.alerts.send_alert(
                AlertLevel.INFO,
                "Large Opportunity",
                f"Detected opportunity worth {self.bot.last_opportunity_profit:.4f} ETH"
            )
        
        # High error rate
        if self.bot.metrics.error_rate > 0.3:
            await self.alerts.send_alert(
                AlertLevel.WARNING,
                "High Error Rate",
                f"Error rate is {self.bot.metrics.error_rate*100:.1f}%"
            )
        
        # Daily loss limit approaching
        daily_loss = -self.bot.risk_manager.daily_pnl
        daily_limit = self.bot.risk_manager.limits.max_daily_loss_eth
        
        if daily_loss > daily_limit * 0.8:
            await self.alerts.send_alert(
                AlertLevel.WARNING,
                "Daily Loss Limit",
                f"Daily loss {daily_loss:.4f} ETH approaching limit {daily_limit:.4f} ETH"
            )
        
        # Bot paused
        if self.bot.risk_manager.paused:
            await self.alerts.send_alert(
                AlertLevel.CRITICAL,
                "Bot Paused",
                "Trading has been automatically paused due to risk limits"
            )
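
As written, check_conditions sends an alert every time a condition holds, so a paused bot or a sustained error rate would re-send the same message on every call. One possible way to throttle this is a per-key cooldown. The sketch below is a minimal illustration: AlertThrottle, its should_send method, and the 300-second default are hypothetical and would need tuning for a real deployment.

```python
import time
from typing import Callable, Dict


class AlertThrottle:
    """Allow each alert key through at most once per cooldown window."""

    def __init__(self, cooldown_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock  # injectable so the behaviour can be tested
        self._last_sent: Dict[str, float] = {}

    def should_send(self, key: str) -> bool:
        """Return True and record the send time if `key` is outside its cooldown."""
        now = self._clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_sent[key] = now
        return True
```

In check_conditions, each send could then be guarded with a stable key, for example: if throttle.should_send("bot_paused"): await self.alerts.send_alert(...).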

Questions about MEV development? Drop a comment below or reach out on Twitter.


Written by amanvaths | Aman Vaths, Founder & CTO of Nadcab Labs, builds enterprise AI, Web3, blockchain, cloud, and cybersecurity solutions.
Published by HackerNoon on 2026/01/22