Ethereum: How to set up a redundant event listener with Python
Event listeners play an important role in blockchain technology, enabling applications to respond to specific events emitted by smart contracts. These events are vital for decentralized applications (DApps), such as tracking token transfers and monitoring contract executions.
In the Ethereum ecosystem, events are logged on the blockchain. They can be listened to by off-chain applications to trigger specific actions or update their state based on the latest blockchain activity.
This tutorial builds a resilient Ethereum event listener in Python. By listening on multiple endpoints across different regions, the listener catches events more consistently and keeps working when an individual endpoint fails.
This approach makes the event listener more reliable in real-world scenarios where network issues or endpoint downtime occur. You will learn how to set up, configure, and run a resilient event listener that handles events efficiently and reliably.
JavaScript version
We also have a JavaScript version of this tutorial: Ethereum: BUIDLing a redundant event listener with ethers and web3.js.
Prerequisites
Make sure you have the following in place before starting the tutorial:
- Python: Make sure Python is installed on your machine. You can download it from the official Python website.
- Virtual environment: Using a virtual environment to manage your project dependencies is a good practice. To set up and activate a virtual environment, run the following commands in the terminal in the directory where you want to create the project:
$ python -m venv venv
$ source venv/bin/activate # On Windows, use `venv\Scripts\activate`
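- web3.py library: Install the web3.py library using pip. The code in this tutorial imports geth_poa_middleware, which is available in web3.py v6 but was removed in v7, so pin the version. Run the following command from the terminal in the directory where you want to create the project:
pip install "web3>=6,<7"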
- Ethereum node endpoints: For this project, it is recommended to deploy at least three RPC nodes: one Global Elastic Node and two Regional Elastic Nodes. The number of nodes you can use in the project is not limited; you can also mix different providers.
Note:
You need at least the Chainstack Growth plan to deploy multiple nodes. Check the Chainstack pricing page for a coupon.
The code
Now that your environment is ready, let's go over the code. In your project directory, create a new file named main.py and paste the following code:
import os
import asyncio
import logging
from web3 import Web3
from web3.middleware import geth_poa_middleware

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# List of endpoints
endpoints = [
    "ENDPOINT_1",
    "ENDPOINT_2",
    "ENDPOINT_3"
]

# Filter for WETH transfer events
logs_filter = {
    'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
    'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
}

# Set to track seen event identifiers to prevent duplicates
seen_events = set()

async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")

async def subscribe_to_logs(endpoint):
    while True:
        try:
            web3 = Web3(Web3.HTTPProvider(endpoint))
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)
            if not web3.is_connected():
                logging.warning(f"Failed to connect to endpoint {endpoint}")
                await asyncio.sleep(5)
                continue
            logging.info(f"Connected to endpoint {endpoint}")
            event_filter = web3.eth.filter(logs_filter)
            while True:
                for event in event_filter.get_new_entries():
                    await handle_event(event, endpoint)
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
            await asyncio.sleep(5)  # Wait before retrying

async def main():
    tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
Step-by-step code breakdown
Let's break down the code step-by-step to understand how it works:
- Import libraries
The first step is to import the necessary libraries. We import os, asyncio, and logging for general-purpose use, and Web3 and geth_poa_middleware from the web3 library to interact with the Ethereum blockchain.
import os
import asyncio
import logging
from web3 import Web3
from web3.middleware import geth_poa_middleware
- Configure logging
Logging is configured to display informational messages with a specific format, which includes the timestamp, log level, and message. This helps in tracking the application's behavior and debugging issues.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- Define endpoints
A list of Ethereum node endpoints is defined. These endpoints should be URLs of your RPC nodes. Replace ENDPOINT_1, ENDPOINT_2, and ENDPOINT_3 with the actual URLs of your Ethereum nodes.
endpoints = [
    "ENDPOINT_1",
    "ENDPOINT_2",
    "ENDPOINT_3"
]
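Endpoint URLs usually embed an access key, so hard-coding them makes it easy to leak them into version control. As a minimal sketch, the list could instead be read from an environment variable. The variable name ETH_ENDPOINTS and the helper load_endpoints are hypothetical names chosen for this example and are not part of the original script.

```python
import os

def load_endpoints(env_var="ETH_ENDPOINTS"):
    """Return endpoint URLs from a comma-separated environment variable.

    ETH_ENDPOINTS is an assumed variable name, used only for illustration.
    """
    raw = os.environ.get(env_var, "")
    # Strip whitespace and skip empty entries (for example, a trailing comma)
    return [url.strip() for url in raw.split(",") if url.strip()]

# Example shell setup (placeholder URLs):
#   export ETH_ENDPOINTS="https://node-1.example,https://node-2.example"
endpoints = load_endpoints()
```

If the variable is unset, the list is empty, so a real script would probably want to exit with a clear message in that case.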
- Set up event filter
The event filter is configured to listen for WETH transfer events. The address field specifies the WETH contract address and the topics field contains the signature for the Transfer event.
logs_filter = {
    'address': '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',  # WETH contract address
    'topics': ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],  # Transfer event signature
}
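The topics list is positional: the first entry is the event signature, and later entries match the event's indexed parameters, with a null entry acting as a wildcard. As a rough sketch of how the filter above could be narrowed to transfers sent to one recipient, the helpers below build such a filter. The names address_to_topic and transfers_to, and the RECIPIENT address, are hypothetical and exist only for this illustration.

```python
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

def address_to_topic(address):
    """Left-pad a 20-byte hex address to the 32-byte form used in log topics."""
    hex_part = address[2:] if address.startswith(("0x", "0X")) else address
    if len(hex_part) != 40:
        raise ValueError(f"expected a 20-byte address, got {address!r}")
    return "0x" + hex_part.lower().rjust(64, "0")

def transfers_to(recipient):
    """Build a filter for WETH Transfer events sent to one recipient."""
    return {
        "address": WETH_ADDRESS,
        # Position 0: event signature, 1: sender (None = any), 2: recipient
        "topics": [TRANSFER_TOPIC, None, address_to_topic(recipient)],
    }

# RECIPIENT is a placeholder address, used only for illustration
RECIPIENT = "0x000000000000000000000000000000000000dEaD"
logs_filter = transfers_to(RECIPIENT)
```

Narrowing the filter on the node side reduces the number of logs each endpoint returns, which matters for a busy contract such as WETH.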
- Track seen events
A set named seen_events is used to track event identifiers that have already been processed. This helps in preventing duplicate handling of the same event.
seen_events = set()
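A plain set grows for as long as the listener runs, which can become a memory concern on a busy contract. One possible alternative, sketched here under the assumption that only recent events need deduplicating, is a bounded cache that evicts the oldest identifiers. The SeenEvents class and its max_size default are hypothetical and not part of the original script.

```python
from collections import OrderedDict

class SeenEvents:
    """Remember at most max_size event IDs, evicting the oldest first."""

    def __init__(self, max_size=10_000):
        self.max_size = max_size
        self._ids = OrderedDict()

    def add_if_new(self, event_id):
        """Record event_id and return True if it was not already remembered."""
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # drop the oldest entry
        return True

seen = SeenEvents(max_size=2)
print(seen.add_if_new("0xaa-1"))  # True: first sighting
print(seen.add_if_new("0xaa-1"))  # False: duplicate from another endpoint
```

The size limit should comfortably exceed the number of events that can arrive while the slowest endpoint catches up; otherwise a late duplicate would be treated as new.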
- Handle events
The handle_event function processes the received events. It constructs a unique event identifier from the transaction hash and log index. If the event has not been seen before, it logs the event and adds the identifier to the seen_events set.
async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")
- Subscribe to logs
The subscribe_to_logs function continuously tries to connect to an Ethereum node, sets up a filter for the specified events, and processes new entries as they arrive. If the connection fails, it waits for 5 seconds before retrying.
async def subscribe_to_logs(endpoint):
    while True:
        try:
            web3 = Web3(Web3.HTTPProvider(endpoint))
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)
            if not web3.is_connected():
                logging.warning(f"Failed to connect to endpoint {endpoint}")
                await asyncio.sleep(5)
                continue
            logging.info(f"Connected to endpoint {endpoint}")
            event_filter = web3.eth.filter(logs_filter)
            while True:
                for event in event_filter.get_new_entries():
                    await handle_event(event, endpoint)
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Error when subscribing to logs from {endpoint}: {e}")
            await asyncio.sleep(5)  # Wait before retrying
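The fixed 5-second wait is simple, but an endpoint that stays down gets retried at the same rate indefinitely. A common alternative, which the original script does not use, is capped exponential backoff with a little random jitter. The sketch below shows only the delay calculation; backoff_delay and its default values are assumptions made for this example.

```python
import random

def backoff_delay(attempt, base=5.0, cap=60.0, jitter=0.1):
    """Seconds to wait before retry number `attempt` (0-based)."""
    # Double the delay on every attempt, but never exceed the cap
    delay = min(cap, base * (2 ** attempt))
    # Spread retries by up to +/- jitter so endpoints do not retry in lockstep
    return delay * (1 + random.uniform(-jitter, jitter))

for attempt in range(5):
    print(f"attempt {attempt}: wait about {backoff_delay(attempt, jitter=0):.0f}s")
```

In subscribe_to_logs, this would mean keeping a per-endpoint attempt counter, passing it to the delay function in place of the constant 5, and resetting it after a successful connection.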
- Run the main function
The main function creates tasks for each endpoint and runs them concurrently using asyncio.gather. The script starts by calling asyncio.run(main()).
async def main():
    tasks = [subscribe_to_logs(endpoint) for endpoint in endpoints]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
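One caveat about this concurrency: Web3.HTTPProvider is synchronous, so a call such as get_new_entries holds the event loop until the HTTP response arrives, and the other endpoint tasks wait in the meantime. A possible refinement, sketched here with a stand-in function instead of a real node call, is to run the blocking call in a worker thread with asyncio.to_thread (Python 3.9 or later). The names blocking_poll and poll_endpoint are hypothetical.

```python
import asyncio
import time

def blocking_poll(name, delay):
    """Stand-in for a synchronous call such as event_filter.get_new_entries()."""
    time.sleep(delay)  # simulates waiting on the HTTP response
    return f"{name}: no new entries"

async def poll_endpoint(name, delay):
    # Run the blocking call in a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_poll, name, delay)

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        poll_endpoint("endpoint-1", 0.2),
        poll_endpoint("endpoint-2", 0.2),
        poll_endpoint("endpoint-3", 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

The three simulated polls overlap instead of running back to back, so the total time stays close to a single poll. In the listener, the same pattern would wrap event_filter.get_new_entries.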
Run the code
To run the code, add your endpoints to the endpoints list and run the following command in the terminal:
python3 main.py
It will start listening and logging; here is an example:
2024-05-16 17:07:09,452 - INFO - Connected to endpoint https://nd-974-620-518.p2pify.com/
2024-05-16 17:07:11,345 - INFO - Connected to endpoint https://nd-777-597-727.p2pify.com/
2024-05-16 17:07:12,776 - INFO - Connected to endpoint https://ethereum-mainnet.core.chainstack.com/
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0xcab7994c8ff1495136db4966f4ed4556513540c6cf08dbd22e09fb3496acadef-1
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x649a1c6138ae7f3135d9ec2a24068ced7d1d2f00fd63781fa11153915d3f22b4-4
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x52c3c874bc8a8a7c34cdcfbf7e0adad89164b2069d1b445feb44504d350dee59-7
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8755f753006a4bcf6b436bf1b377ca41d39c33219658426e8ae6a63b914279c3-30
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x4b5d3072d599cb4fc148a09c38b85bfa2729f5ad95e2b676cd15c8ebd4cff76d-43
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x54ebab7ec57c78c8182ab6797bc4a41f31c852e4d206ea13c59166592c44f41a-91
2024-05-16 17:07:13,805 - INFO - Event received first from https://ethereum-mainnet.core.chainstack.com/: 0x8f69c9aa219fcc6d29edec88d72de91d7ab4bbb2f3854e7d4c339deb313badf9-128
The full event is available as the event argument of handle_event, where you can use it for further processing:
async def handle_event(event, endpoint):
    event_id = f"{event['transactionHash'].hex()}-{event['logIndex']}"
    if event_id not in seen_events:
        seen_events.add(event_id)
        logging.info(f"Event received first from {endpoint}: {event_id}")
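As an illustration of such further processing, a Transfer log can be decoded by hand: the sender and recipient are indexed and sit in topics 1 and 2, and the amount is a 32-byte big-endian integer in the data field. The sketch below assumes that layout and uses a hand-built sample log with made-up addresses; decode_transfer and to_bytes are hypothetical helpers, not web3.py functions.

```python
def to_bytes(value):
    """Accept bytes (including HexBytes) or a 0x-prefixed hex string."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    return bytes.fromhex(value[2:] if value.startswith("0x") else value)

def decode_transfer(event):
    """Extract sender, recipient, and amount from an ERC-20 Transfer log."""
    topics = [to_bytes(t) for t in event["topics"]]
    return {
        # Indexed addresses occupy the last 20 bytes of a 32-byte topic
        "from": "0x" + topics[1][-20:].hex(),
        "to": "0x" + topics[2][-20:].hex(),
        # The non-indexed amount is a big-endian uint256 in the data field
        "value": int.from_bytes(to_bytes(event["data"]), "big"),
    }

# Hand-built sample log with made-up addresses, for illustration only
sample_event = {
    "topics": [
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
        "0x" + "00" * 12 + "11" * 20,
        "0x" + "00" * 12 + "22" * 20,
    ],
    "data": "0x" + (10**18).to_bytes(32, "big").hex(),  # 1 WETH in wei
}
print(decode_transfer(sample_event))
```

The decoded addresses are lowercase rather than checksummed, and the value is in wei. Inside handle_event, decode_transfer(event) could be called right after the duplicate check.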
Conclusion
Building a resilient Ethereum event listener is crucial for maintaining reliable and consistent decentralized applications. Using multiple endpoints across different regions can ensure better event capture and redundancy, making your event listener robust against network issues and endpoint downtimes.
This tutorial guided you through setting up, configuring, and running a resilient event listener using Python and web3.py. With this setup, you are well-equipped to handle blockchain events efficiently and reliably, enhancing the effectiveness of your DApps.