Ethereum: How to set up a redundant event listener with Python

Event listeners play an important role in blockchain technology, enabling applications to respond to specific events emitted by smart contracts. Decentralized applications (DApps) rely on these events for tasks such as tracking token transfers and monitoring contract executions.

In the Ethereum ecosystem, events are logged on the blockchain. They can be listened to by off-chain applications to trigger specific actions or update their state based on the latest blockchain activity.

This tutorial builds a resilient Ethereum event listener using Python. The listener connects to multiple endpoints across different regions, so an event missed or delayed by one endpoint can still be caught by another, and the listener keeps working if an endpoint fails.

This redundancy makes the listener more dependable in real-world scenarios where network issues or endpoint downtime might occur. You will learn how to set up, configure, and run the listener.

JavaScript version

We also have a JavaScript version of this tutorial: Ethereum: BUIDLing a redundant event listener with ethers and web3.js.

Prerequisites

Before diving into the tutorial, make sure you have the following set up:

  • Python: Make sure Python is installed on your machine. You can download it from the official Python website.

  • Virtual environment: Using a virtual environment to manage your project dependencies is a good practice. To set up and activate a virtual environment, run the following commands in the terminal in the directory where you want to create the project:

    $ python -m venv venv
    $ source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    
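  • web3.py library: Install the web3.py library using pip. The code in this tutorial uses geth_poa_middleware and is_connected(), which are both available in web3.py v6 (v7 replaced geth_poa_middleware with ExtraDataToPOAMiddleware), so pin the major version. Run the following command from the terminal in the directory where you want to create the project:

    pip install "web3>=6,<7"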
    
  • Ethereum node endpoints:
    For this project, it is recommended to deploy at least three RPC nodes: one Global Elastic Node and two Regional Elastic Nodes. The number of nodes you can use in the project is not limited; you can also mix different providers.

Note:

You need at least the Chainstack Growth plan to deploy multiple nodes. Check the Chainstack pricing page for a coupon.

The code

Now that your environment is ready, let's go over the code. In your project directory, create a new file named main.py and paste the following code:

import os
import asyncio
import logging
from web3 import Web3
from web3.middleware import geth_poa_middleware

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# List of endpoints
endpoints = [
    "ENDPOINT_1",
    "ENDPOINT_2",
    "ENDPOINT_3"
]

# Filter for WETH transfer events
logs_filter = {
    'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
    'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
}

# Set to track seen event identifiers to prevent duplicates
seen_events = set()

async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")

async def subscribe_to_logs(endpoint):
    while True:
        try:
            web3 = Web3(Web3.HTTPProvider(endpoint))
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)

            if not web3.is_connected():
                logging.warning(f"Failed to connect to endpoint {endpoint}")
                await asyncio.sleep(5)
                continue

            logging.info(f"Connected to endpoint {endpoint}")

            event_filter = web3.eth.filter(logs_filter)
            while True:
                for event in event_filter.get_new_entries():
                    await handle_event(event, endpoint)
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
            await asyncio.sleep(5)  # Wait before retrying

async def main():
    tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
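
One caveat with the listing above: Web3.HTTPProvider is synchronous, so each call to get_new_entries() blocks the event loop until the endpoint responds, and a slow endpoint delays the others. The sketch below shows one possible way around this, handing the blocking call to a worker thread with asyncio.to_thread (Python 3.9+). FakeFilter and poll_once are hypothetical names used only for illustration; FakeFilter stands in for a real web3.py filter so the example runs without a node.

```python
import asyncio
import time


class FakeFilter:
    """Hypothetical stand-in for a web3.py filter; sleeps to imitate a slow HTTP call."""

    def __init__(self, delay, entries):
        self.delay = delay
        self.entries = entries

    def get_new_entries(self):
        time.sleep(self.delay)  # blocking, like a synchronous RPC request
        return self.entries


async def poll_once(event_filter):
    # Run the blocking call in a worker thread so other tasks keep running.
    return await asyncio.to_thread(event_filter.get_new_entries)


async def demo():
    filters = [FakeFilter(0.2, ["a"]), FakeFilter(0.2, ["b"]), FakeFilter(0.2, ["c"])]
    started = time.monotonic()
    # The three polls overlap instead of running one after another.
    results = await asyncio.gather(*(poll_once(f) for f in filters))
    elapsed = time.monotonic() - started
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(demo())
    print(results, f"{elapsed:.2f}s")
```

In subscribe_to_logs, the equivalent change would be to iterate over await asyncio.to_thread(event_filter.get_new_entries) instead of calling get_new_entries() directly. The is_connected() check and the filter creation are blocking calls too and could be wrapped the same way.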

Step-by-step code breakdown

Let's break down the code step-by-step to understand how it works:

  1. Import libraries

    The first step is to import the necessary libraries: asyncio to run one listener per endpoint concurrently, logging for output, and Web3 and geth_poa_middleware from the web3 library to interact with the Ethereum blockchain. The os module is imported as well, although this listing does not use it.

    import os
    import asyncio
    import logging
    from web3 import Web3
    from web3.middleware import geth_poa_middleware
    
  2. Configure logging

    Logging is configured to display informational messages with a specific format, which includes the timestamp, log level, and message. This helps in tracking the application's behavior and debugging issues.

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
  3. Define endpoints

    A list of Ethereum node endpoints is defined. These endpoints should be URLs of your RPC nodes. Replace ENDPOINT_1, ENDPOINT_2, and ENDPOINT_3 with the actual URLs of your Ethereum nodes.

    endpoints = [
        "ENDPOINT_1",
        "ENDPOINT_2",
        "ENDPOINT_3"
    ]
    
  4. Set up event filter

    The event filter is configured to listen for WETH transfer events. The address field specifies the WETH contract address, and the topics field contains the Keccak-256 hash of the Transfer event signature, Transfer(address,address,uint256).

    logs_filter = {
        'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
        'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
    }
    
  5. Track seen events

    A set named seen_events is used to track event identifiers that have already been processed. This helps in preventing duplicate handling of the same event.

    seen_events = set()
    
  6. Handle events

    The handle_event function processes the received events. It constructs a unique event identifier from the transaction hash and log index. If the event has not been seen before, it logs the event and adds the identifier to the seen_events set.

    async def handle_event(event, endpoint):
        event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
        if event_id not in seen_events:
            seen_events.add(event_id)
            logging.info(f"Event received first from {endpoint}: {event_id}")
    
  7. Subscribe to logs

    The subscribe_to_logs function continuously tries to connect to an Ethereum node, sets up a filter for the specified events, and polls the filter for new entries once per second. If the connection fails or any other error is raised, it waits for 5 seconds before retrying.

    async def subscribe_to_logs(endpoint):
        while True:
            try:
                web3 = Web3(Web3.HTTPProvider(endpoint))
                web3.middleware_onion.inject(geth_poa_middleware, layer=0)
    
                if not web3.is_connected():
                    logging.warning(f"Failed to connect to endpoint {endpoint}")
                    await asyncio.sleep(5)
                    continue
    
                logging.info(f"Connected to endpoint {endpoint}")
    
                event_filter = web3.eth.filter(logs_filter)
                while True:
                    for event in event_filter.get_new_entries():
                        await handle_event(event, endpoint)
                    await asyncio.sleep(1)
            except Exception as e:
                logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
                await asyncio.sleep(5)  # Wait before retrying
    
  8. Run the main function

    The main function creates one subscribe_to_logs coroutine per endpoint and runs them concurrently using asyncio.gather. The script starts by calling asyncio.run(main()).

    async def main():
        tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
        await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        asyncio.run(main())
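
The fixed 5-second wait in subscribe_to_logs is easy to reason about, but when an endpoint stays down for a while it retries at the same pace indefinitely. A common alternative is exponential backoff with a cap and a little random jitter. The helper below is a minimal sketch of that idea; retry_delay and its parameters are hypothetical names, not part of web3.py or the original code.

```python
import random


def retry_delay(attempt, base=5.0, cap=60.0, jitter=0.1, rng=random):
    """Return the seconds to wait before retry number `attempt` (0-based).

    The delay starts at `base`, doubles with every attempt, and is capped
    at `cap`. Up to `jitter` (a fraction of the delay) of random extra time
    is added so that several listeners do not retry in lockstep.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay + delay * jitter * rng.random()


if __name__ == "__main__":
    for attempt in range(6):
        print(attempt, round(retry_delay(attempt), 2))
```

To use it, subscribe_to_logs would keep an attempt counter, reset it to 0 after a successful connection, increment it on every failure, and call await asyncio.sleep(retry_delay(attempt)) instead of await asyncio.sleep(5).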
    

Run the code

To run the code, replace the placeholders in the endpoints list with your endpoint URLs and run the following command in the terminal:

python3 main.py
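
Endpoint URLs usually embed access credentials, so you may prefer not to hard-code them in main.py. As a sketch, the endpoints could be read from an environment variable instead. The variable name ETH_ENDPOINTS and the helper load_endpoints are assumptions made for this example.

```python
import os


def load_endpoints(env_var="ETH_ENDPOINTS", environ=os.environ):
    """Read a comma-separated list of RPC URLs from an environment variable."""
    raw = environ.get(env_var, "")
    # Ignore surrounding whitespace and empty items such as a trailing comma.
    endpoints = [url.strip() for url in raw.split(",") if url.strip()]
    if not endpoints:
        raise ValueError(f"Set {env_var} to a comma-separated list of RPC URLs")
    return endpoints
```

With this helper, the list in main.py becomes endpoints = load_endpoints(), and the script is started with the variable set in the shell, for example ETH_ENDPOINTS="ENDPOINT_1,ENDPOINT_2,ENDPOINT_3" python3 main.py.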

It will start listening and logging; here is an example:

2024-05-16 17:07:09,452 - INFO - Connected to endpoint https://nd-974-620-518.p2pify.com/
2024-05-16 17:07:11,345 - INFO - Connected to endpoint https://nd-777-597-727.p2pify.com/
2024-05-16 17:07:12,776 - INFO - Connected to endpoint https://ethereum-mainnet.core.chainstack.com/
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0xcab7994c8ff1495136db4966f4ed4556513540c6cf08dbd22e09fb3496acadef-1
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x649a1c6138ae7f3135d9ec2a24068ced7d1d2f00fd63781fa11153915d3f22b4-4
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x52c3c874bc8a8a7c34cdcfbf7e0adad89164b2069d1b445feb44504d350dee59-7
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8755f753006a4bcf6b436bf1b377ca41d39c33219658426e8ae6a63b914279c3-30
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x4b5d3072d599cb4fc148a09c38b85bfa2729f5ad95e2b676cd15c8ebd4cff76d-43
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x54ebab7ec57c78c8182ab6797bc4a41f31c852e4d206ea13c59166592c44f41a-91
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8f69c9aa219fcc6d29edec88d72de91d7ab4bbb2f3854e7d4c339deb313badf9-128
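
Each event appears only once in this output because of the seen_events set. That set grows for as long as the script runs, which matters for a busy contract like WETH. One possible approach for a long-running listener is to remember only the most recent identifiers. The SeenEvents class below is a hypothetical sketch of that, with an arbitrary default size.

```python
from collections import OrderedDict


class SeenEvents:
    """Remember the most recent `max_size` event IDs, dropping the oldest first."""

    def __init__(self, max_size=100_000):
        self.max_size = max_size
        self._ids = OrderedDict()  # insertion order doubles as age order

    def add_if_new(self, event_id):
        """Return True the first time an ID is seen, False for duplicates."""
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # evict the oldest ID
        return True
```

In handle_event, the membership check and the add call would collapse into a single if seen.add_if_new(event_id): branch. The trade-off is that an identifier evicted from the cache is treated as new if it shows up again, so max_size should comfortably exceed the number of events that can arrive between the fastest and slowest endpoint reporting the same event.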

The full event object is available as the event argument of handle_event, where you can use it for further processing:

async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")
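
As an illustration of such further processing, the sketch below pulls the sender, recipient, and amount out of a Transfer log. It assumes the layout used by the WETH contract, where both addresses are indexed (topics[1] and topics[2]) and the amount is the only value in data, and it assumes topics and data arrive as bytes-like values, as web3.py v6 returns them. The function name decode_transfer is hypothetical.

```python
def decode_transfer(event):
    """Extract sender, recipient, and amount from an ERC-20 style Transfer log.

    Assumes topics[1] and topics[2] hold the indexed addresses and `data`
    holds the uint256 amount, as in the WETH contract.
    """
    topics = [bytes(t) for t in event["topics"]]
    # Indexed addresses are left-padded to 32 bytes; keep the last 20.
    sender = "0x" + topics[1][-20:].hex()
    recipient = "0x" + topics[2][-20:].hex()
    amount_wei = int.from_bytes(bytes(event["data"]), "big")
    return sender, recipient, amount_wei


if __name__ == "__main__":
    # Hand-built sample log, not real chain data.
    sample = {
        "topics": [
            bytes.fromhex("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
            bytes(12) + bytes.fromhex("11" * 20),
            bytes(12) + bytes.fromhex("22" * 20),
        ],
        "data": (10**18).to_bytes(32, "big"),
    }
    print(decode_transfer(sample))
```

The returned addresses are lowercase hex strings rather than checksummed addresses, and the amount is in wei. Inside handle_event, the call would sit in the branch that handles a new event, so each transfer is decoded only once.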

Conclusion

Building a resilient Ethereum event listener is crucial for maintaining reliable and consistent decentralized applications. Using multiple endpoints across different regions can ensure better event capture and redundancy, making your event listener robust against network issues and endpoint downtimes.

This tutorial guided you through setting up, configuring, and running a resilient event listener using Python and web3.py. With this setup, you are well-equipped to handle blockchain events efficiently and reliably, enhancing the effectiveness of your DApps.