Mastering multithreading in Python for Web3 requests: A comprehensive guide
Introduction
This guide is designed for developers using the web3.py library to interact with Ethereum. We aim to unravel the mystique of multithreading and show you how to bring this powerful technique to bear on your Web3 requests.
But first, what is multithreading? Multithreading allows a program to run multiple processes simultaneously, like sending a request while processing another. This is particularly useful when dealing with I/O tasks such as network requests where the program spends a lot of time waiting. Multithreading can significantly speed up your program by allowing it to do other tasks during these waiting times, which also allows you to use system resources efficiently.
By the end of this guide, you'll have a good idea about working with and without multithreading, handling exceptions, and following best practices to ensure your code is efficient and robust.
Understanding multithreading
Multithreading is a technique that allows multiple parts of a single program to run concurrently. But what does that mean, exactly? Let's break it down.
In the simplest terms, a thread is the smallest unit of processing that can be performed in an OS (operating system). In the context of programming, a thread is the flow of execution of a program. A single-threaded program has just one flow and can perform one operation at a time. Conversely, multithreading enables a program to control multiple threads, essentially allowing multiple operations to run in parallel.
Multithreading is a technique to achieve asynchronous behavior in a program.
This is where things get exciting. With multithreading, we can run different parts of our program at the same time, as if they were separate miniature programs. This is particularly handy when performing I/O operations, like network requests or file reads and writes, which can often take significant time to complete. Multithreading allows us to start multiple operations simultaneously instead of waiting for one operation to finish before the next. This means we can fetch data from one part of a blockchain while sending a transaction request to another, all within the same program.
Multithreading is accessible in Python primarily because the language offers built-in support via the threading
and concurrent.futures
modules, among others. These modules provide high-level, easy-to-use APIs for creating and managing threads. They handle a lot of the complex details of thread management behind the scenes, making it easier for developers to leverage multithreading in their applications.
But why does this matter? Well, think about the benefits:
-
Efficiency. By allowing multiple operations to run concurrently, we make better use of our system's resources. We can start processing a new request as soon as we've sent an old one rather than waiting for a response.
-
Performance. Multithreading can significantly speed up our programs. This is particularly noticeable in programs that make a lot of I/O requests, like those interacting with a blockchain.
-
Responsiveness. In user-facing applications, multithreading can make our program feel more responsive. We can continue to interact with the user interface while waiting for I/O operations to complete.
However, it's important to note that multithreading comes with its own set of challenges. Issues such as thread synchronization, deadlocks, and race conditions need to be handled carefully; we'll tackle these in later sections.
In the next sections, we'll dive into how to use multithreading in Python to improve the performance of your Chainstack RPC endpoint. You'll learn how to send multiple requests to your RPC node concurrently, which can significantly speed up tasks like fetching data from the blockchain or waiting for transactions to be mined. Let's get started!
Setting up your environment
Before we dive into the code, let's set up our development environment. This will ensure you have all the necessary tools and libraries installed to follow along with this guide.
Here's what you'll need:
-
Python. We'll be using Python for our examples because of its readability and the robust libraries it offers for working with Ethereum. We recommend using Python 3.8 or newer. You can download Python from the official website if you don't have it installed already.
-
web3.py. This is a Python library for interacting with Ethereum. It's a comprehensive tool that lets us connect to Ethereum nodes, make requests, and work with Ethereum data. You can install web3.py using pip, the Python package installer. Open a terminal and run the following command:
pip install web3
-
An Ethereum archive node. We'll interact with the Ethereum network for this guide. We'll need access to an Ethereum archive node since we’ll query older states. To get an RPC endpoint, follow these steps:
Creating a simple Web3 script without multithreading
Let's start with a simple example of making Web3 requests without multithreading. We'll write a script fetching an Ethereum address's balance at various block numbers. Here's the code:
from web3 import Web3
import time
web3 = Web3(Web3.HTTPProvider("YOUR_CHAINSTACK_ENDPOINT"))
address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326"
start_block = web3.eth.block_number
end_block = start_block - 500
def get_balance_at_block(block_num):
balance = web3.eth.get_balance(address, block_identifier=block_num)
print(f"Balance at block {block_num}: {web3.from_wei(balance, 'ether')} ETH")
start = time.time()
for block_num in range(start_block, end_block, -1):
get_balance_at_block(block_num)
print(f"Time taken: {time.time() - start}")
Now, let's walk through the code, line by line:
- Import necessary modules. We start by importing the
Web3
module from theweb3
package. We also import thetime
module, which we'll use to measure the execution time of our script. - Set up the Web3 provider. We set up a connection to our Ethereum node using
Web3.HTTPProvider
. ReplaceYOUR_CHAINSTACK_ENDPOINT
with the link to your Ethereum node. - Define the Ethereum address and block range. We specify the Ethereum address we're interested in and the range of block numbers we want to check. We use
web3.eth.block_number
to get the latest block number and subtract 500 to get the start of our range. - Define the function for fetching the balance. We define a function
get_balance_at_block
that takes a block number as input, fetches the balance of our specified address at that block number, and prints the balance in ether (converted from wei). - Fetch the balance at each block number. We loop over the range of block numbers from
start_block
toend_block
, callingget_balance_at_block
for each one. Since we're not using multithreading, these requests are made sequentially, meaning the script waits for each request to complete before moving on to the next one. - Measure and print the execution time. We use
time.time()
to get the current time at the start and end of our script, subtract the two to get the total execution time, and print the result.
This script provides a baseline for understanding how long it takes to make these Web3 requests without multithreading. The result is heavily dependent on your location compared to the node’s location. The next section modifies this script to use multithreading and compares the performance.
Creating a simple Web3 script with multithreading
Now let's take our previous example and modify it to use multithreading. This will allow us to make multiple Web3 requests concurrently, potentially speeding up our script. Here's the modified code:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from web3 import Web3
import time
web3 = Web3(Web3.HTTPProvider("YOUR_CHAINSTACK_ENDPOINT"))
address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326"
start_block = web3.eth.block_number
end_block = start_block - 500
max_workers = 100
def get_balance_at_block(block_num):
balance = web3.eth.get_balance(address, block_identifier=block_num)
print(f"Balance at block {block_num}: {web3.from_wei(balance, 'ether')} ETH")
async def main():
with ThreadPoolExecutor(max_workers=max_workers) as executor:
tasks = [
loop.run_in_executor(
executor,
get_balance_at_block,
block_num
) for block_num in range(start_block, end_block, -1)
]
await asyncio.gather(*tasks)
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(main())
print(f"Time taken: {time.time() - start}")
Now let's go through the changes step by step:
- Additional imports. We import
asyncio
andThreadPoolExecutor
fromconcurrent.futures
.asyncio
is a library for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives.ThreadPoolExecutor
is a class that creates a pool of worker threads and provides a simple way to offload tasks to them. - Creating the
ThreadPoolExecutor
. Inside themain
function, we create aThreadPoolExecutor
with a maximum of 100 worker threads. These threads will be used to run ourget_balance_at_block
function concurrently. - Creating tasks. We create a list of tasks, where each task is a call to
get_balance_at_block
for a different block number. Each of these tasks is run in the executor, meaning it's run in a separate thread. This is done using theloop.run_in_executor
method, which schedules the callable to be executed and returns aFuture
object representing the execution of the callable. - Running tasks concurrently. We use
asyncio.gather(*tasks)
to run these tasks concurrently. This function returns a Future aggregating result from the given Future or coroutine objects. This Future completes when all of the given Futures are complete. - Running the event loop. Finally, we use
loop.run_until_complete(main())
to run the event loop until themain()
function has been completed. This starts the execution of the tasks in the executor and waits for them to complete.
Using a ThreadPoolExecutor
and asyncio
, we can make multiple Web3 requests concurrently, potentially speeding up our script significantly compared to the sequential version.
In the next section, we'll compare the performance of this multithreaded version with the sequential version and discuss some of the considerations and best practices when using multithreading in Python.
Explain ThreadPoolExecutor
and workers
ThreadPoolExecutor
and workersNow, let's dive deeper into ThreadPoolExecutor
and the concept of worker threads, as this is the main concept.
In Python's concurrent.futures
module, a ThreadPoolExecutor
is a class that creates a pool of worker threads and provides methods to submit tasks to this pool. Once a task is submitted, a worker thread picks it up and executes it. When a worker thread finishes executing a task, it becomes available to pick up another task.
The parameter max_workers
defines the maximum number of worker threads the executor can use. This doesn't mean it will always use this many threads; it won't use more than this. If you submit more tasks than max_workers
, the executor will queue the extra tasks and execute them as worker threads become available.
Choosing the right value for max_workers
depends on the nature of the tasks and the resources available.
- If the tasks are I/O-bound (for example, making network requests), like in this case, you can benefit from a relatively high number of worker threads, as these tasks spend much of their time waiting for I/O operations to complete. While one thread is waiting for its I/O operation, other threads can be executing.
- However, if the tasks are CPU-bound, having more worker threads than the number of CPUs can lead to context switching overhead and won't usually provide any speedup due to Python's global interpreter lock (GIL).
- You also need to consider the resources available. Each thread consumes some amount of memory, so having many worker threads could consume a lot of memory.
In the example, we set max_workers
to 100, which means the executor will use up to 100 threads to execute the get_balance_at_block
function concurrently. I used this number because although my machine runs a 16-core CPU, the tasks are I/O, so we can leverage the CPU idle time while waiting for the server to respond. If the task was CPU bound, we would want to cap the workers to 16. Also, after running multiple tests, this number of workers gives me the best performance between speed, resource consumption, and server response/stability. This provides a significant speedup compared to the sequential version, as while one thread is waiting for a response from the Ethereum node, other threads can send their requests or process received data.
Be aware that the Ethereum node might also have limits on how many concurrent requests it can handle. If you make too many requests at once, it might slow down and start rejecting requests. However, Chainstack does not throttle the requests, meaning you should not experience issues if you keep the requests under 3,000 requests per second.
Understanding the differences
Now that we've seen both a sequential and a multithreaded approach to making Web3 requests let's compare the two and understand the differences in performance and efficiency.
- Sequential approach. In the sequential version of the script, we made one request at a time. After sending a request, we waited for the response before sending the next request. This is a straightforward approach, and it's easy to understand what's happening at each step. However, this approach doesn't make efficient use of our resources. While waiting for a response from the Ethereum node, our program isn't doing anything else. In my test, this approach took 88 seconds to complete.
- Multithreaded approach. In the multithreaded version of the script, we used a
ThreadPoolExecutor
to create a pool of worker threads. We then used these threads to send multiple requests concurrently. While one thread is waiting for a response, other threads can send their requests or process received data. This approach can be more efficient because it allows us to do more work at the same time. In my test, the multithreaded approach took only 2 seconds, about 97% faster compared to the sequential approach.
These results demonstrate the potential benefits of multithreading for making Web3 requests. By sending multiple requests concurrently, we can significantly speed up our script and make better use of our system's resources.
However, it's important to note that multithreading comes with its own set of challenges. Managing multiple threads can add complexity to our code, and we need to be careful to avoid issues like race conditions, where two threads try to modify a shared resource at the same time. Also, if we try to make too many requests at once, we might overwhelm the Ethereum node or run into rate limits.
Organizing the response
While running the code, you may observe that the results are presented in the order they are received, rather than in a block-wise sequence, which would seem more logical. With a small modification to the code, we can align the output to display results in accordance with their respective block numbers.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from web3 import Web3
import time
web3 = Web3(Web3.HTTPProvider("YOUR_CHAINSTACK_ENDPOINT"))
address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326"
start_block = web3.eth.block_number
end_block = start_block - 500
max_workers = 100
def get_balance_at_block(block_num):
try:
balance = web3.eth.get_balance(address, block_identifier=block_num)
return (block_num, web3.from_wei(balance, 'ether')) # Return a tuple
except Exception as e:
print(f"Error occurred while getting balance at block {block_num}: {e}")
async def main():
with ThreadPoolExecutor(max_workers=max_workers) as executor:
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
get_balance_at_block,
block_num
) for block_num in range(start_block, end_block, -1)
]
results = []
for future in asyncio.as_completed(futures):
try:
# This will raise an exception if the thread raised an exception
result = await future
results.append(result) # Collect the results
except Exception as e:
print(f"Error occurred in thread: {e}")
# Sort the results by block number and print them
results.sort(key=lambda x: x[0])
for block_num, balance in results:
print(f"Balance at block {block_num}: {balance} ETH")
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(main())
print(f"Time taken: {time.time() - start}")
In this version, the get_balance_at_block
function returns a tuple (block_num, balance)
. These tuples are collected in the results
list. After all futures are completed, the results
list is sorted by block number (the first element of each tuple), and the results are printed in order.
Handling errors and exceptions in multithreaded architecture
As with any code, errors and exceptions can occur in a multithreaded architecture. Handling these errors properly is crucial for ensuring the robustness and reliability of your code. In a multithreaded context, error handling can be slightly more complex because errors can occur in multiple threads simultaneously.
Here are some common errors and exceptions that you might encounter and how to handle them:
-
Rate limit errors. Since multithreading allows you to make more requests quickly, you might encounter a rate limit. Chainstack does not pose limits, but you might get errors due to too many requests, so it is recommended to stay below 3,000 RPS.
Rate limit errors can be handled by catching the appropriate exception in a
try
/except
block. You should also include logic to delay or reduce the rate of your requests if you encounter a rate limit error. -
Thread errors. These are errors related to the management of threads, such as creating too many threads or problems with thread synchronization. If too many threads are created, you might run into system limits or performance issues due to the overhead of managing a large number of threads.
-
Synchronization errors. These occur when multiple threads try to modify a shared resource at the same time, leading to race conditions, inconsistent results, or even data corruption. While these errors are less likely to occur in the context of Web3 requests, they're an important consideration when working with multithreaded applications.
-
Unhandled exceptions in threads. If an unhandled exception occurs in a thread, it can cause the thread to terminate unexpectedly. This might lead to resource leaks if the thread doesn't clean up its resources before terminating, and it can also lead to incomplete results if the thread doesn't finish its task.
Handling these errors and exceptions properly is crucial for ensuring the robustness and reliability of your multithreaded application. Further, let us discuss some strategies.
Handling thread errors. You can handle thread errors by limiting the number of threads that your program creates and by catching and handling any exceptions that occur when creating or managing threads. For example:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from web3 import Web3
import time
web3 = Web3(Web3.HTTPProvider("YOUR_CHAINSTACK_ENDPOINT"))
address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326"
start_block = web3.eth.block_number
end_block = start_block - 500
max_workers = 100
def get_balance_at_block(block_num):
try:
balance = web3.eth.get_balance(address, block_identifier=block_num)
print(f"Balance at block {block_num}: {web3.from_wei(balance, 'ether')} ETH")
except Exception as e:
print(f"Error occurred while getting balance at block {block_num}: {e}")
async def main():
with ThreadPoolExecutor(max_workers=max_workers) as executor:
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
get_balance_at_block,
block_num
) for block_num in range(start_block, end_block, -1)
]
for future in asyncio.as_completed(futures):
try:
# This will raise an exception if the thread raised an exception
result = await future
except Exception as e:
print(f"Error occurred in thread: {e}")
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(main())
print(f"Time taken: {time.time() - start}")
Handling synchronization errors. If your threads are sharing resources, you should use synchronization primitives like locks, semaphores, or condition variables to ensure that only one thread modifies the shared resource at a time.
Handling unhandled exceptions in threads. To handle unhandled exceptions in threads, you can catch and handle exceptions within each thread, or you can use the Future.result()
method, which reraises any exception that occurred in the thread. If an exception occurs in a thread, it's stored in the Future
object for that thread, and calling Future.result()
will raise that exception in the main thread. This allows you to handle the exception in the main thread and decide how to proceed.
Best practices for multithreaded Web3 requests
When implementing multithreading, you can follow several best practices to ensure your application is efficient, robust, and easy to maintain.
-
Choose an appropriate number of threads. The optimal number of threads depends on a variety of factors, including the nature of the tasks (I/O-bound vs CPU-bound), the specifications of the system (number of CPUs, memory), and the server's capacity to handle concurrent requests. Too many threads can lead to excessive context switching, memory usage, and potentially hitting rate limits on the Ethereum node. Too few threads might not fully utilize the available resources. Typically, for I/O-bound tasks like network requests, a higher number of threads can be beneficial. Start with a reasonable number and adjust based on performance observations and system characteristics.
-
Handle exceptions properly. Unhandled exceptions in a thread can cause the thread to terminate unexpectedly, which can lead to unpredictable behavior and resource leaks. Use try/except blocks to catch and handle exceptions in each thread. Also, consider using the
Future.result()
method, which reraises any exception that occurred in the thread, allowing you to handle the exception in the main thread. -
Manage thread lifecycles. Be sure to clean up after your threads when they're done, especially for long-running applications. Using a context manager (the
with
keyword) withThreadPoolExecutor
automatically starts and stops the threads. -
Avoid shared state when possible. Shared state can lead to race conditions and make your code harder to reason. Whenever possible, design your threads to be independent of each other.
-
Use appropriate synchronization primitives. If you must use shared state, use locks, semaphores, or other synchronization primitives to ensure that threads don't interfere. However, be aware that improper use of synchronization primitives can lead to deadlocks and other issues.
-
Don't ignore the GIL. Python's global interpreter lock can limit the performance benefits of multithreading for CPU-bound tasks. However, for I/O-bound tasks like making web requests, multithreading can still provide significant performance benefits.
-
Respect the server's limits. Be aware that there may be limits on the number of requests you can make per minute or day to a server. Making too many requests in a short period of time may lead to your requests being throttled or your IP being blocked.
Conclusion
Multithreading is a powerful technique that can help you make the most efficient use of your resources when making blockchain requests. By making multiple requests concurrently, you can significantly speed up your scripts and get more work done in the same amount of time.
In this guide, we've explored how to set up your Python environment for multithreading, create a simple script using both sequential and multithreaded approaches, and handle errors and exceptions in multithreaded code. We've also discussed some best practices for writing multithreaded code, such as choosing an appropriate number of threads, managing thread lifecycles, and handling shared states properly.
However, it's important to remember that multithreading comes with its own set of challenges. Managing multiple threads can add complexity to your code, and you need to be careful to avoid issues like race conditions and resource leaks. Also, you need to be aware of the server's capacity and respect any rate limits that might be in place.
About the author
Updated 8 months ago