# Ethereum: Redundant event listener with Python

> Build a redundant Ethereum event listener in Python that polls multiple Chainstack node endpoints concurrently and reconnects automatically when an endpoint fails.

**TLDR**

* This tutorial shows how to create a redundant event listener in Python to reliably capture Ethereum events by connecting to multiple endpoints.
* Using web3.py, it continuously checks for WETH “Transfer” events, mitigating the risk of missing events due to node downtime or latency issues.
* Each event is identified by its transaction hash and log index to avoid duplicates, ensuring only the first instance gets processed.
* The result is a more robust, fault-tolerant DApp setup that remains accurate and responsive even during endpoint failures.

## Main article

Event listeners play an important role in blockchain technology, enabling applications to respond to specific events emitted by smart contracts. Decentralized applications (DApps) rely on these events for tasks such as tracking token transfers and monitoring contract executions.

In the Ethereum ecosystem, events are logged on the blockchain. They can be listened to by off-chain applications to trigger specific actions or update their state based on the latest blockchain activity.

In this tutorial, we build a resilient Ethereum event listener using Python. By listening on multiple endpoints across different regions, we catch events more consistently and keep receiving them when an individual endpoint fails.

This approach enhances the reliability and robustness of the event listener, making it more effective in real-world scenarios where network issues or endpoint downtimes might occur. This tutorial teaches you how to set up, configure, and run a resilient event listener that can handle events efficiently and reliably.

<Info>
  ### JavaScript version

  We also have a JavaScript version of this tutorial. Find it here: [Ethereum: BUIDLing a redundant event listener with ethers and web3.js](/docs/ethereum-redundant-event-llstener-ethers-web3js).
</Info>

## Prerequisites

Make sure you have the following in place before starting the tutorial:

* **Python:** Make sure Python is installed on your machine. You can download it from the [official Python website](https://www.python.org/).

* **Virtual environment:** Using a virtual environment to manage your project dependencies is a good practice. To set up and activate a virtual environment, run the following commands in the terminal in the directory where you want to create the project:

  <CodeGroup>
    ```bash Bash theme={"system"}
    $ python -m venv venv
    $ source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```
  </CodeGroup>

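* **web3.py library:** Install the web3.py library using pip. The script imports `geth_poa_middleware`, which web3.py v7 no longer provides under that name, so pin a v6 release. Run the following command from the terminal in the directory where you want to create the project:

  <CodeGroup>
    ```bash Bash theme={"system"}
    pip install "web3>=6,<7"
    ```
  </CodeGroup>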

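* **Ethereum node endpoints:** You need the URLs of several Ethereum RPC node endpoints, ideally in different regions. The script in this tutorial uses three.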

<Check>
  ### Get your own node endpoint today

  [Start for free](https://console.chainstack.com/) and get your app to production levels immediately. No credit card required.

  You can sign up with your GitHub, X, Google, or Microsoft account.
</Check>

<Info>
  You need at least a Chainstack paid plan to deploy multiple nodes. Check the [Chainstack pricing](https://chainstack.com/pricing/) page for a coupon.
</Info>
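
If you prefer not to hardcode endpoint URLs in the script, one option is to read them from an environment variable. The following is a minimal sketch, not part of the original tutorial: the variable name `ETH_ENDPOINTS`, the helper `load_endpoints`, and the two-endpoint minimum are all assumptions made for illustration.

```python
import os


def load_endpoints(var_name="ETH_ENDPOINTS"):
    """Return endpoint URLs from a comma-separated environment variable.

    `ETH_ENDPOINTS` is a hypothetical variable name chosen for this sketch.
    """
    raw = os.environ.get(var_name, "")
    # Split on commas, trim whitespace, and drop empty entries
    urls = [part.strip() for part in raw.split(",") if part.strip()]
    # Redundancy needs more than one endpoint (an assumption of this sketch)
    if len(urls) < 2:
        raise ValueError(
            f"Set {var_name} to at least two comma-separated endpoint URLs"
        )
    return urls
```

With this helper, the `endpoints` list in the script could be assigned as `endpoints = load_endpoints()`, which keeps URLs that embed access keys out of version control.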

## The code

Now that your environment is ready, let's go over the code. In your project directory, create a new file named `main.py` and paste the following code:

<CodeGroup>
  ```python main.py theme={"system"}
  import os
  import asyncio
  import logging
  from web3 import Web3
  from web3.middleware import geth_poa_middleware

  # Configure logging
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

  # List of endpoints
  endpoints = [
      "ENDPOINT_1",
      "ENDPOINT_2",
      "ENDPOINT_3"
  ]

  # Filter for WETH transfer events
  logs_filter = {
      'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
      'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
  }

  # Set to track seen event identifiers to prevent duplicates
  seen_events = set()

  async def handle_event(event, endpoint):
      event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
      if event_id not in seen_events:
          seen_events.add(event_id)
          logging.info(f"Event received first from {endpoint}: {event_id}")

  async def subscribe_to_logs(endpoint):
      while True:
          try:
              web3 = Web3(Web3.HTTPProvider(endpoint))
              web3.middleware_onion.inject(geth_poa_middleware, layer=0)

              if not web3.is_connected():
                  logging.warning(f"Failed to connect to endpoint {endpoint}")
                  await asyncio.sleep(5)
                  continue

              logging.info(f"Connected to endpoint {endpoint}")

              event_filter = web3.eth.filter(logs_filter)
              while True:
                  for event in event_filter.get_new_entries():
                      await handle_event(event, endpoint)
                  await asyncio.sleep(1)
          except Exception as e:
              logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
              await asyncio.sleep(5)  # Wait before retrying

  async def main():
      tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
      await asyncio.gather(*tasks)

  if __name__ == "__main__":
      asyncio.run(main())
  ```
</CodeGroup>
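
Note that `event_filter.get_new_entries()` is a synchronous call, so while one task waits for an HTTP response, the other tasks cannot run. One possible refinement is to hand the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9 or later). The sketch below illustrates the idea only: `SlowFilter` is a hypothetical stand-in that simulates a slow poll, and `poll_once` and `poll_all` are names invented for this example.

```python
import asyncio
import time


class SlowFilter:
    """Hypothetical stand-in for a web3.py filter whose poll blocks."""

    def __init__(self, name, delay):
        self.name = name
        self.delay = delay

    def get_new_entries(self):
        time.sleep(self.delay)  # simulates a blocking HTTP round trip
        return [f"{self.name}-event"]


async def poll_once(event_filter):
    # Run the blocking call in a worker thread so the event loop stays free
    return await asyncio.to_thread(event_filter.get_new_entries)


async def poll_all(filters):
    # All polls run concurrently; results keep the order of `filters`
    return await asyncio.gather(*(poll_once(f) for f in filters))


if __name__ == "__main__":
    filters = [SlowFilter(name, 0.2) for name in ("a", "b", "c")]
    started = time.perf_counter()
    results = asyncio.run(poll_all(filters))
    print(results, f"{time.perf_counter() - started:.2f}s")
```

In `subscribe_to_logs`, the equivalent change would be to iterate over `await asyncio.to_thread(event_filter.get_new_entries)`. Because `handle_event` would still run on the event loop, access to `seen_events` stays single-threaded.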

### Step-by-step code breakdown

Let's break down the code step-by-step to understand how it works:

1. **Import libraries**

   The first step is to import the necessary libraries. We import `os`, `asyncio`, and `logging` for general purpose use, and `Web3` and `geth_poa_middleware` from the `web3` library to interact with the Ethereum blockchain.

   <CodeGroup>
     ```python Python theme={"system"}
     import os
     import asyncio
     import logging
     from web3 import Web3
     from web3.middleware import geth_poa_middleware
     ```
   </CodeGroup>

2. **Configure logging**

   Logging is configured to display informational messages with a specific format, which includes the timestamp, log level, and message. This helps in tracking the application's behavior and debugging issues.

   <CodeGroup>
     ```python Python theme={"system"}
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
     ```
   </CodeGroup>

3. **Define endpoints**

   A list of Ethereum node endpoints is defined. These endpoints should be URLs of your RPC nodes. Replace `ENDPOINT_1`, `ENDPOINT_2`, and `ENDPOINT_3` with the actual URLs of your Ethereum nodes.

   <CodeGroup>
     ```python Python theme={"system"}
     endpoints = [
         "ENDPOINT_1",
         "ENDPOINT_2",
         "ENDPOINT_3"
     ]
     ```
   </CodeGroup>

4. **Set up event filter**

   The event filter is configured to listen for WETH transfer events. The `address` field specifies the WETH contract address, and the `topics` field contains the Transfer event signature, which is the Keccak-256 hash of `Transfer(address,address,uint256)`.

   <CodeGroup>
     ```python Python theme={"system"}
     logs_filter = {
         'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
         'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
     }
     ```
   </CodeGroup>

5. **Track seen events**

   A set named `seen_events` is used to track event identifiers that have already been processed. This helps in preventing duplicate handling of the same event.

   <CodeGroup>
     ```python Python theme={"system"}
     seen_events = set()
     ```
   </CodeGroup>

6. **Handle events**

   The `handle_event` function processes the received events. It constructs a unique event identifier from the transaction hash and log index. If the event has not been seen before, it logs the event and adds the identifier to the `seen_events` set.

   <CodeGroup>
     ```python Python theme={"system"}
     async def handle_event(event, endpoint):
         event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
         if event_id not in seen_events:
             seen_events.add(event_id)
             logging.info(f"Event received first from {endpoint}: {event_id}")
     ```
   </CodeGroup>

7. **Subscribe to logs**

   The `subscribe_to_logs` function runs in a loop: it connects to an Ethereum node, creates a filter for the specified events, and polls the filter once per second for new entries. If the connection check fails or any call raises an exception, it waits 5 seconds, then reconnects and creates a new filter.

   <CodeGroup>
     ```python Python theme={"system"}
     async def subscribe_to_logs(endpoint):
         while True:
             try:
                 web3 = Web3(Web3.HTTPProvider(endpoint))
                 web3.middleware_onion.inject(geth_poa_middleware, layer=0)

                 if not web3.is_connected():
                     logging.warning(f"Failed to connect to endpoint {endpoint}")
                     await asyncio.sleep(5)
                     continue

                 logging.info(f"Connected to endpoint {endpoint}")

                 event_filter = web3.eth.filter(logs_filter)
                 while True:
                     for event in event_filter.get_new_entries():
                         await handle_event(event, endpoint)
                     await asyncio.sleep(1)
             except Exception as e:
                 logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
                 await asyncio.sleep(5)  # Wait before retrying
     ```
   </CodeGroup>

8. **Run the main function**

   The `main` function creates tasks for each endpoint and runs them concurrently using `asyncio.gather`. The script starts by calling `asyncio.run(main())`.

   <CodeGroup>
     ```python Python theme={"system"}
     async def main():
         tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
         await asyncio.gather(*tasks)

     if __name__ == "__main__":
         asyncio.run(main())
     ```
   </CodeGroup>
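
One thing to keep in mind for long-running listeners: the `seen_events` set from step 5 never shrinks, so memory use grows with every event. A possible mitigation is to remember only the most recent identifiers. The following is a minimal sketch under that assumption; the class name `BoundedSeenSet`, the method `add_if_new`, and the default size are hypothetical and not part of the original script.

```python
from collections import OrderedDict


class BoundedSeenSet:
    """Remembers the most recent `max_size` event IDs and forgets older ones."""

    def __init__(self, max_size=100_000):
        self.max_size = max_size
        self._items = OrderedDict()

    def add_if_new(self, event_id):
        """Record event_id and return True if it was not seen before."""
        if event_id in self._items:
            return False
        self._items[event_id] = None
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict the oldest ID
        return True

    def __len__(self):
        return len(self._items)
```

In `handle_event`, the membership check and the `add` call would collapse into a single `if seen.add_if_new(event_id):`. The trade-off is that an event older than the retained window would be treated as new if an endpoint delivered it again, so the window should comfortably exceed the delivery lag you expect between endpoints.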

### Run the code

To run the code, simply add your endpoints in the `endpoints` list and run the command in the terminal:

```
python3 main.py
```

It will start listening and logging; here is an example:

```
2024-05-16 17:07:09,452 - INFO - Connected to endpoint https://nd-974-620-518.p2pify.com/
2024-05-16 17:07:11,345 - INFO - Connected to endpoint https://nd-777-597-727.p2pify.com/
2024-05-16 17:07:12,776 - INFO - Connected to endpoint https://ethereum-mainnet.core.chainstack.com/
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0xcab7994c8ff1495136db4966f4ed4556513540c6cf08dbd22e09fb3496acadef-1
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x649a1c6138ae7f3135d9ec2a24068ced7d1d2f00fd63781fa11153915d3f22b4-4
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x52c3c874bc8a8a7c34cdcfbf7e0adad89164b2069d1b445feb44504d350dee59-7
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8755f753006a4bcf6b436bf1b377ca41d39c33219658426e8ae6a63b914279c3-30
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x4b5d3072d599cb4fc148a09c38b85bfa2729f5ad95e2b676cd15c8ebd4cff76d-43
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x54ebab7ec57c78c8182ab6797bc4a41f31c852e4d206ea13c59166592c44f41a-91
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8f69c9aa219fcc6d29edec88d72de91d7ab4bbb2f3854e7d4c339deb313badf9-128
```
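
Log output like the above can also be used to compare endpoints. As a rough sketch, the snippet below counts how often each endpoint delivered an event first by parsing lines in the format this script logs. The function name `count_first_deliveries` is hypothetical, and the regular expression assumes the log message text is unchanged.

```python
import re
from collections import Counter

# Matches the message logged by handle_event and captures the endpoint URL
FIRST_SEEN = re.compile(r"Event received first from (\S+): ")


def count_first_deliveries(log_lines):
    """Return a Counter mapping endpoint URL to first-delivery count."""
    counts = Counter()
    for line in log_lines:
        match = FIRST_SEEN.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts
```

Passing the lines of a saved log file to `count_first_deliveries` gives a quick view of which endpoint tends to be fastest from where the listener runs.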

The complete log entry is available as the `event` argument of `handle_event`, so you can extend that function with any further processing you need:

```python
async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")
```
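
As an example of such processing, the sender, recipient, and amount of a Transfer can be read directly from the raw log. The sketch below assumes the standard ERC-20 layout that the WETH Transfer event follows: the two addresses are indexed and appear in `topics[1]` and `topics[2]`, and the amount is a 32-byte big-endian integer in `data`. The helper name `decode_transfer` is hypothetical.

```python
def decode_transfer(event):
    """Decode sender, recipient, and amount (in wei) from a raw Transfer log."""
    topics = event["topics"]
    # Indexed addresses are left-padded to 32 bytes; keep the last 20 bytes
    sender = "0x" + bytes(topics[1])[-20:].hex()
    recipient = "0x" + bytes(topics[2])[-20:].hex()
    # The non-indexed amount is a big-endian uint256 in the data field
    amount_wei = int.from_bytes(bytes(event["data"]), "big")
    return sender, recipient, amount_wei


if __name__ == "__main__":
    # Hand-built sample log for illustration; not real chain data
    sample = {
        "topics": [
            bytes(32),  # placeholder for the event signature topic
            bytes(12) + bytes.fromhex("11" * 20),
            bytes(12) + bytes.fromhex("22" * 20),
        ],
        "data": (10**18).to_bytes(32, "big"),
    }
    print(decode_transfer(sample))
```

The returned addresses are lowercase hex strings. If you need checksummed addresses, web3.py provides `Web3.to_checksum_address` for that conversion.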

## Conclusion

Building a resilient Ethereum event listener is crucial for maintaining reliable and consistent decentralized applications. Listening on multiple endpoints across different regions improves event capture and adds redundancy, making your event listener robust against network issues and endpoint downtime.

This tutorial guided you through setting up, configuring, and running a resilient event listener using Python and web3.py. With this setup, you are well-equipped to handle blockchain events efficiently and reliably, enhancing the effectiveness of your DApps.
