Monitoring transaction propagation from node to mempool in EVM networks with Python

Introduction

With the adoption of Ethereum and other blockchains that use the same kind of technology (we call these EVM blockchains), it's important to make sure that the network is stable, safe, and efficient. One big thing developers and the people who manage the network need to keep an eye on is how transactions move around in the mempool.

In this article, we're going to show you how to test how blocks move in the mempool for EVM nodes using Chainstack. We'll talk about why these tests are so important, the tools and tricks you'll need, and some common problems and slowdowns that can happen when blocks are moving around.

Why test transaction propagation?

The mempool is a temporary storage space where unconfirmed transactions wait to be included in a block. In a decentralized system, such as Ethereum, multiple nodes maintain separate mempools, but they constantly communicate with each other to sync up on the latest state of pending transactions.

Propagation in the mempool predominantly affects two major factors: latency and network efficiency. Poor propagation can lead to increased transaction confirmation times and the risk of chain reorganizations or forks. Thus, testing propagation ensures the network performs optimally and maintains consensus.

šŸ“˜

Learn more about the mempool by reading A developerā€™s guide to transactions in Ethereum mempool.

Roll your own propagation test

If you want to see how transactions and blocks are moving in a blockchain network, tools like web3.js or web3.py are your friends. They let you write your own scripts to understand better how things like the network setup, network conditions, and the number of transactions can make a difference.

You can run these tests on your private networks or public ones like Goerli and Sepolia to mimic what happens in the real world. Pair this with network monitoring tools; you'll get a clear image of your blockchain's performance and where things might be slowing down. This can help you adjust your setup and keep things moving smoothly.

šŸ“˜

We'll be using the web3.py library to interact with the Ethereum network.

Prerequisites

  • Python 3.6 or higher

  • A Sepolia node with Chainstack

  • web3.py library. Install it with:

    pip install web3
    

Getting started

Let's start by importing the libraries we need and connecting to an Ethereum node using Web3 HTTPProvider.

from web3 import Web3
import time
from web3.exceptions import TransactionNotFound
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Connect to the Ethereum node
w3 = Web3(Web3.HTTPProvider('YOUR_CHAINSTACK_ENDPOINT'))

# Target Ethereum address
address = 'YOUR_ETHEREUM_ADDRESS'

šŸ“˜

Add your Sepolia node URL instead of YOUR_CHAINSTACK_ENDPOINT.

Making responses look nice

Next, we'll create a function to make the transaction details we get from the Ethereum network look pretty. This function will also wait for the transaction receipt to update the block hash and block number when the transaction gets validated.

def pretty_print_transaction(tx):

  def wait_for_confirmation(tx_hash, block_info):
    while True:
      try:
        receipt = w3.eth.get_transaction_receipt(tx_hash)
        if receipt is not None and receipt['blockHash']:
          block_info['blockHash'] = receipt['blockHash']
          block_info['blockNumber'] = receipt['blockNumber']
          break
      except TransactionNotFound:
        pass
      time.sleep(1)

  block_info = {'blockHash': None, 'blockNumber': None}

  # Start a separate thread to wait for the block
  confirmation_thread = threading.Thread(target=wait_for_confirmation,
                                         args=(tx['hash'], block_info))
  confirmation_thread.start()

  print("Transaction details:")
  print(f"  Block hash: {block_info['blockHash']}")
  print(f"  Block number: {block_info['blockNumber']}")
  print(f"  From: {tx.get('from')}")
  print(f"  Gas: {tx.get('gas')}")
  print(f"  Gas price: {tx.get('gasPrice')}")
  if 'maxFeePerGas' in tx:
    print(f"  Max fee per gas: {tx['maxFeePerGas']}")
  if 'maxPriorityFeePerGas' in tx:
    print(f"  Max priority fee per gas: {tx['maxPriorityFeePerGas']}")
  print(f"  Transaction hash: {tx.get('hash').hex()}")
  print(f"  Input: {tx.get('input')}")
  print(f"  Nonce: {tx.get('nonce')}")
  print(f"  To: {tx.get('to')}")
  print(f"  Transaction index: {tx.get('transactionIndex')}")
  print(f"  Value: {tx.get('value')}")
  print(f"  Type: {tx.get('type')}")
  print(f"  Access list: {tx.get('accessList')}")
  print(f"  Chain ID: {tx.get('chainId')}")
  print(f"  v: {tx.get('v')}")
  print(f"  r: {tx.get('r').hex() if tx.get('r') else None}")
  print(f"  s: {tx.get('s').hex() if tx.get('s') else None}")

  # Wait for the confirmation thread to complete
  confirmation_thread.join()

  print(f"Updated block hash: {block_info['blockHash'].hex()}")
  print(f"Updated block number: {block_info['blockNumber']}")

Monitoring the mempool

Now, let's whip up a two functions to check if there are any transactions from or to the address you're watching in the mempool:

def check_pending_transaction(tx_hash, target_address_lower, w3):
  try:
    tx = w3.eth.get_transaction(tx_hash)
    if tx['from'].lower() == target_address_lower or (
        tx['to'] and tx['to'].lower() == target_address_lower):
      return tx
  except TransactionNotFound:
    pass
  return None

# Function to check if any transactions from/to the target address are in the mempool
def find_mempool_transactions(target_address):
  local_w3 = w3
  transaction_list = []
  target_address_lower = target_address.lower()
  current_block = local_w3.eth.block_number
  pending_transactions = local_w3.eth.get_block('pending')['transactions']

  with ThreadPoolExecutor() as executor:
    futures = [
      executor.submit(check_pending_transaction, tx_hash, target_address_lower,
                      local_w3) for tx_hash in pending_transactions
    ]
    for future in as_completed(futures):
      result = future.result()
      if result is not None:
        transaction_list.append(result)

  return transaction_list

šŸ“˜

You'll notice we're using ThreadPoolExecutor to check pending transactions simultaneously, making everything quicker.

Keeping an eye on transactions

With those helper functions ready, we can put together the main function to keep an eye on the mempool for new transactions involving the address you're interested in:

# Main function to monitor the mempool
def monitor_mempool(address):
  seen_transactions = set()

  # Add the print statement here
  print("Mempool monitoring starting...")

  while True:
    current_block = w3.eth.block_number
    pending_block = w3.eth.get_block('pending')
    pending_transactions = pending_block['transactions']
    print(
      f"Current block: {current_block}. Pending transactions: {len(pending_transactions)}"
    )

    # Record the start time
    start_time = time.time()

    transactions = find_mempool_transactions(address)
    if transactions:
      new_transactions = [
        tx for tx in transactions if tx.get('hash') not in seen_transactions
      ]

      if new_transactions:
        # Calculate the time taken
        time_taken = time.time() - start_time
        print(f"\nTime taken since last check: {time_taken:.2f} seconds\n")

        print(
          f"Found {len(new_transactions)} new transaction in the mempool involving {address}:"
        )
        for tx in new_transactions:
          pretty_print_transaction(tx)
          seen_transactions.add(tx.get('hash'))
        break

monitor_mempool(address)

When a new transaction involving the address you're watching pops up, we'll make it look pretty with pretty_print_transaction(). After that, the main function will stop, but you can tweak the loop if you want it to keep watching for new transactions.

The full code

Here you can find the entire code for the monitoring tool:

from web3 import Web3
import time
from web3.exceptions import TransactionNotFound
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Connect to the Ethereum node
w3 = Web3(
  Web3.HTTPProvider(
    'YOUR_CHAINSTACK_ENDPOINT'))

# Ethereum address to monitor
address = 'YOUR_ETHEREUM_ADDRESS'

def pretty_print_transaction(tx):

  def wait_for_confirmation(tx_hash, block_info):
    while True:
      try:
        receipt = w3.eth.get_transaction_receipt(tx_hash)
        if receipt is not None and receipt['blockHash']:
          block_info['blockHash'] = receipt['blockHash']
          block_info['blockNumber'] = receipt['blockNumber']
          break
      except TransactionNotFound:
        pass
      time.sleep(1)

  block_info = {'blockHash': None, 'blockNumber': None}

  # Start a separate thread to wait for the block
  confirmation_thread = threading.Thread(target=wait_for_confirmation,
                                         args=(tx['hash'], block_info))
  confirmation_thread.start()

  print("Transaction details:")
  print(f"  Block hash: {block_info['blockHash']}")
  print(f"  Block number: {block_info['blockNumber']}")
  print(f"  From: {tx.get('from')}")
  print(f"  Gas: {tx.get('gas')}")
  print(f"  Gas price: {tx.get('gasPrice')}")
  if 'maxFeePerGas' in tx:
    print(f"  Max fee per gas: {tx['maxFeePerGas']}")
  if 'maxPriorityFeePerGas' in tx:
    print(f"  Max priority fee per gas: {tx['maxPriorityFeePerGas']}")
  print(f"  Transaction hash: {tx.get('hash').hex()}")
  print(f"  Input: {tx.get('input')}")
  print(f"  Nonce: {tx.get('nonce')}")
  print(f"  To: {tx.get('to')}")
  print(f"  Transaction index: {tx.get('transactionIndex')}")
  print(f"  Value: {tx.get('value')}")
  print(f"  Type: {tx.get('type')}")
  print(f"  Access list: {tx.get('accessList')}")
  print(f"  Chain ID: {tx.get('chainId')}")
  print(f"  v: {tx.get('v')}")
  print(f"  r: {tx.get('r').hex() if tx.get('r') else None}")
  print(f"  s: {tx.get('s').hex() if tx.get('s') else None}")

  # Wait for the confirmation thread to complete
  confirmation_thread.join()

  print(f"Updated block hash: {block_info['blockHash'].hex()}")
  print(f"Updated block number: {block_info['blockNumber']}")

def check_pending_transaction(tx_hash, target_address_lower, w3):
  try:
    tx = w3.eth.get_transaction(tx_hash)
    if tx['from'].lower() == target_address_lower or (
        tx['to'] and tx['to'].lower() == target_address_lower):
      return tx
  except TransactionNotFound:
    pass
  return None

# Function to check if any transactions from/to the target address are in the mempool
def find_mempool_transactions(target_address):
  local_w3 = w3
  transaction_list = []
  target_address_lower = target_address.lower()
  current_block = local_w3.eth.block_number
  pending_transactions = local_w3.eth.get_block('pending')['transactions']

  with ThreadPoolExecutor() as executor:
    futures = [
      executor.submit(check_pending_transaction, tx_hash, target_address_lower,
                      local_w3) for tx_hash in pending_transactions
    ]
    for future in as_completed(futures):
      result = future.result()
      if result is not None:
        transaction_list.append(result)

  return transaction_list

# Main function to monitor the mempool
def monitor_mempool(address):
  seen_transactions = set()

  # Add the print statement here
  print("Mempool monitoring starting...")

  while True:
    current_block = w3.eth.block_number
    pending_block = w3.eth.get_block('pending')
    pending_transactions = pending_block['transactions']
    print(
      f"Current block: {current_block}. Pending transactions: {len(pending_transactions)}"
    )

    # Record the start time
    start_time = time.time()

    transactions = find_mempool_transactions(address)
    if transactions:
      new_transactions = [
        tx for tx in transactions if tx.get('hash') not in seen_transactions
      ]

      if new_transactions:
        # Calculate the time taken
        time_taken = time.time() - start_time
        print(f"\nTime taken since last check: {time_taken:.2f} seconds\n")

        print(
          f"Found {len(new_transactions)} new transaction in the mempool involving {address}:"
        )
        for tx in new_transactions:
          pretty_print_transaction(tx)
          seen_transactions.add(tx.get('hash'))
        break

monitor_mempool(address)

Running the test

To conduct this test, you'll first need to execute the Python script. Then, initiate a transaction. For simplicity, we'll use MetaMask in this scenario.

Your objective determines whether you use the same or a different endpoint in comparison to your script. If you wish to measure the speed at which a transaction reaches the mempool of your own node, stick with the same endpoint as used in the script. However, if your goal is to determine the time taken for the transaction to propagate across other nodes in the Ethereum network, opt for a different endpoint. This will provide a more accurate depiction of transaction propagation times across the network.

šŸ“˜

Learn how to add your Chainstack endpoint to MetaMask by reading Fault-tolerant transactions with MetaMask and Chainstack nodes.

In the Python script, input your endpoint and the Ethereum address that you wish to monitor. This could be either the sending or receiving address for the transaction, as the script is designed to track the transaction in both cases.

Once the script detects a new transaction in the mempool involving the target address, it will display the transaction's details in the console. Additionally, it will provide an estimated duration that the script took to locate the transaction. While this figure might not be entirely accurate, it serves as a useful approximation of the transaction's propagation speed across the Ethereum network.

  1. Start the script.
  2. Send a transaction using MetaMask.

You will receive a similar log in the console:

Mempool monitoring starting...
Current block: 3772987. Pending transactions: 84
Current block: 3772987. Pending transactions: 102

Time taken since last check: 2.22 seconds

Found 1 new transaction in the mempool involving 0x8f8e7012F8F974707A8F11C7cfFC5d45EfF5c2Ae:
Transaction details:
  Block hash: None
  Block number: None
  From: 0x8f8e7012F8F974707A8F11C7cfFC5d45EfF5c2Ae
  Gas: 21000
  Gas price: 2181505086
  Max fee per gas: 2181505086
  Max priority fee per gas: 1500000000
  Transaction hash: 0xebbeaa0ee6e787fa3486db9e1b8ad9ccb1e3ab982462c51fca8fa41143be053d
  Input: 0x
  Nonce: 59
  To: 0x7ea178aE883bC78Fa540b15F36b1e2a8Ea90F7F7
  Transaction index: None
  Value: 1000000000000000000
  Type: 2
  Access list: []
  Chain ID: 11155111
  v: 0
  r: 0x1483859043ee02820eead543ce58bf9f5a6ec3cd3b339dc709e1860781aa1e57
  s: 0x045fb5f1bb7caf42cbeb2d480fbb1a3ed1a85408154bcb052fbb17417eab5e84
Updated block hash: 0x2b120a75e3a97ba9b77d3764945c4c3c2a328699c13327538fb6dacc4642ff57
Updated block number: 3772988

Conclusion

Testing transaction propagation in mempool for EVM nodes is crucial to maintaining a reliable and efficient blockchain network. By simulating realistic scenarios and using a combination of network monitoring and custom tests, developers and infrastructure operators can ensure that nodes process transactions in a timely and secure manner. Regularly conducting propagation tests will help identify potential issues, optimize resources, and contribute to the overall health of your blockchain ecosystem.