TLDR

  • Demonstrates how to collect Uniswap v4 ETH-USDC trading data on Unichain using a multi-threaded Python approach with Chainstack nodes.
  • Monitors the Uniswap v4 Pool Manager contract for swap events and filters trades originating from the Universal Router.
  • Includes production-ready features like persistence with resume capability, error handling with capped retry backoff, and parallel block processing.
  • Shows how to parse Uniswap v4 swap events from the singleton architecture and save comprehensive trading data to CSV files for analytics or LLM training.

Overview

There are different reasons why you may want to collect Uniswap v4 ETH-USDC trades on the Unichain mainnet; data analytics and synthetic data generation for LLM fine-tuning are among them.

Uniswap v4, however, can be tricky to work with: despite the version name, it is not a set of per-pool contracts. All pools live inside a single PoolManager contract (the singleton architecture), so there is no separate pool contract address to watch.

This tutorial guides you through setting up a multi-threaded, persistent Python script that collects the trading data from Uniswap v4 quickly and reliably.

We are going to do this in Python.

Prerequisites

  • A Chainstack account with a deployed Unichain mainnet node endpoint.
  • Python 3.8 or later with the web3 and pandas libraries installed (for example, pip install web3 pandas).

Implementation

We will be collecting Uniswap v4 ETH-USDC swap events by monitoring the Uniswap v4 Pool Manager contract on Unichain. The script uses a multi-threaded approach to efficiently collect trading data while handling network issues gracefully.

Here’s how the implementation works:

  • Event monitoring — We monitor the Swap event emitted by the Uniswap v4 Pool Manager contract and filter the logs by the ETH-USDC pool ID (see the sketch after this list).
  • Router filtering — We only collect trades that originate from the Universal Router.
  • Multi-threading — The script uses parallel processing to handle multiple block ranges simultaneously.
  • Persistence — Data is saved to CSV files with resume capability if the script is interrupted.
  • Error handling — Retry mechanisms with capped, incremental delays handle network issues and connection failures.

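For reference, here is a minimal, self-contained sketch of the log filter the script builds: topic0 is the keccak hash of the Swap event signature and topic1 is the indexed pool ID, so a single eth_getLogs call returns only ETH-USDC swaps; the Universal Router check is then applied in Python when each log is decoded. The endpoint string and block numbers below are placeholders.

from web3 import Web3

w3 = Web3(Web3.HTTPProvider("CHAINSTACK_NODE_ENDPOINT"))  # placeholder endpoint

POOL_MANAGER = Web3.to_checksum_address("0x1f98400000000000000000000000000000000004")
ETH_USDC_POOL_ID = "0x3258f413c7a88cda2fa8709a589d221a80f6574f63df5a5b6774485d8acc39d9"

# topic0: keccak hash of the Swap event signature
swap_topic = Web3.to_hex(Web3.keccak(
    text="Swap(bytes32,address,int128,int128,uint160,uint128,int24,uint24)"
))

# topic1: the indexed PoolId, so the node only returns swaps for our pool
logs = w3.eth.get_logs({
    "address": POOL_MANAGER,
    "fromBlock": 17910700,   # placeholder range
    "toBlock": 17910800,
    "topics": [swap_topic, ETH_USDC_POOL_ID],
})
print(f"Found {len(logs)} ETH-USDC Swap logs in the range")
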
Key parameters in our example:

  • UNISWAP_V4_POOL_MANAGER — the Uniswap v4 PoolManager singleton contract on Unichain, which emits the Swap events.
  • UNISWAP_V4_UNIVERSAL_ROUTER — the Universal Router address; only swaps routed through it are kept.
  • UNISWAP_V4_ETH_USDC_POOL — the 32-byte pool ID of the ETH-USDC pool, used as an indexed topic filter.
  • BATCH_SIZE — the number of blocks per eth_getLogs request.
  • START_BLOCK — the block to start collecting from.
  • UNICHAIN_BLOCK_TIME — Unichain's approximate block time in seconds, used to size each worker's block chunk.

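Note that the pool ID is not a contract address. Under the standard Uniswap v4 derivation, it is keccak256 of the ABI-encoded PoolKey (currency0, currency1, fee, tickSpacing, hooks). The following is a minimal sketch of that derivation in case you want to target a different pool; the example values are illustrative placeholders, not necessarily the parameters of the ETH-USDC pool above.

from eth_abi import encode
from web3 import Web3

def compute_v4_pool_id(currency0: str, currency1: str, fee: int, tick_spacing: int, hooks: str) -> str:
    """Derive a Uniswap v4 PoolId as keccak256(abi.encode(PoolKey))."""
    encoded_key = encode(
        ["address", "address", "uint24", "int24", "address"],
        [currency0, currency1, fee, tick_spacing, hooks],
    )
    return Web3.to_hex(Web3.keccak(encoded_key))

# Illustrative placeholder values only -- substitute your pool's actual parameters
print(compute_v4_pool_id(
    "0x0000000000000000000000000000000000000000",  # native ETH as currency0
    "0x078D782b760474a361dDA0AF3839290b0EF57AD6",  # example ERC-20 (assumed USDC on Unichain)
    500,   # fee in hundredths of a basis point (500 = 0.05%)
    10,    # tick spacing
    "0x0000000000000000000000000000000000000000",  # no hooks
))
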
The script:

import os
from datetime import datetime
import time
import pandas as pd
from web3 import Web3
from web3.exceptions import TransactionNotFound, BlockNotFound
from requests.exceptions import ConnectionError
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration constants
UNICHAIN_RPC_URL = "CHAINSTACK_NODE_ENDPOINT"  # Replace with your Chainstack Unichain node endpoint
UNISWAP_V4_POOL_MANAGER = "0x1f98400000000000000000000000000000000004"
UNISWAP_V4_UNIVERSAL_ROUTER = "0xef740bf23acae26f6492b10de645d6b98dc8eaf3"
UNISWAP_V4_ETH_USDC_POOL = "0x3258f413c7a88cda2fa8709a589d221a80f6574f63df5a5b6774485d8acc39d9"
BATCH_SIZE = 100
START_BLOCK = 17910700
UNICHAIN_BLOCK_TIME = 1  # seconds per block on Unichain

# Retry configuration
MAX_RETRIES = 5
BASE_RETRY_DELAY = 5  # Base delay between retries (will be multiplied by retry attempt number)
MAX_RETRY_DELAY = 60  # Maximum delay between retries

class UniswapV4DataCollector:
    def __init__(self):
        self.setup_web3_connection()
        # Convert address to checksum format
        self.pool_manager_address = Web3.to_checksum_address(UNISWAP_V4_POOL_MANAGER)
        self.universal_router_address = Web3.to_checksum_address(UNISWAP_V4_UNIVERSAL_ROUTER)
        self.target_pool_id = UNISWAP_V4_ETH_USDC_POOL.replace('0x', '')
        
        # Uniswap v4 PoolManager ABI for Swap event
        self.pool_manager_abi = [
            {
                "anonymous": False,
                "inputs": [
                    {"indexed": True, "internalType": "PoolId", "name": "id", "type": "bytes32"},
                    {"indexed": True, "internalType": "address", "name": "sender", "type": "address"},
                    {"indexed": False, "internalType": "int128", "name": "amount0", "type": "int128"},
                    {"indexed": False, "internalType": "int128", "name": "amount1", "type": "int128"},
                    {"indexed": False, "internalType": "uint160", "name": "sqrtPriceX96", "type": "uint160"},
                    {"indexed": False, "internalType": "uint128", "name": "liquidity", "type": "uint128"},
                    {"indexed": False, "internalType": "int24", "name": "tick", "type": "int24"},
                    {"indexed": False, "internalType": "uint24", "name": "fee", "type": "uint24"}
                ],
                "name": "Swap",
                "type": "event"
            }
        ]
        
        self.setup_contract()

    def setup_web3_connection(self):
        """Initialize Web3 connection with retry mechanism."""
        retry_count = 0
        
        while True:
            try:
                self.w3 = Web3(Web3.HTTPProvider(UNICHAIN_RPC_URL))
                
                # Test connection
                self.w3.eth.block_number
                print(f"Successfully connected to Unichain network using {UNICHAIN_RPC_URL}")
                break
            except Exception as e:
                retry_count += 1
                
                if retry_count > MAX_RETRIES:
                    raise Exception(f"Failed to connect to Unichain network RPC after {MAX_RETRIES} attempts")
                
                delay = min(BASE_RETRY_DELAY * retry_count, MAX_RETRY_DELAY)
                print(f"Connection to {UNICHAIN_RPC_URL} failed: {str(e)}")
                print(f"Retrying in {delay} seconds... (Attempt {retry_count}/{MAX_RETRIES})")
                time.sleep(delay)

    def setup_contract(self):
        """Initialize contract with retry mechanism."""
        retry_count = 0
        while True:
            try:
                self.contract = self.w3.eth.contract(
                    address=self.pool_manager_address,
                    abi=self.pool_manager_abi
                )
                break
            except Exception as e:
                retry_count += 1
                if retry_count > MAX_RETRIES:
                    raise Exception(f"Failed to setup contract after {MAX_RETRIES} attempts")
                delay = min(BASE_RETRY_DELAY * retry_count, MAX_RETRY_DELAY)
                print(f"Contract setup failed. Retrying in {delay} seconds... (Attempt {retry_count}/{MAX_RETRIES})")
                time.sleep(delay)

    def get_last_processed_block(self, output_file: str) -> int:
        """Get the last successfully processed block from the CSV file."""
        try:
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                df = pd.read_csv(output_file)
                if not df.empty:
                    return df['block_number'].max()
        except Exception as e:
            print(f"Error reading last processed block: {e}")
        return None

    def collect_swap_events(
        self, 
        start_block: int, 
        end_block: int, 
        batch_size: int = None
    ) -> pd.DataFrame:
        """Collect swap events with retry mechanism."""
        all_events = []
        current_block = start_block
        
        if batch_size is None:
            batch_size = BATCH_SIZE

        # Build topic0 from the Swap event signature; Web3.to_hex keeps it 0x-prefixed
        # regardless of the installed hexbytes version
        event_signature = Web3.to_hex(Web3.keccak(
            text="Swap(bytes32,address,int128,int128,uint160,uint128,int24,uint24)"
        ))
        pool_id_bytes = bytes.fromhex(self.target_pool_id)
        pool_id_topic = '0x' + pool_id_bytes.hex().zfill(64)

        while current_block <= end_block:  # end_block is inclusive
            retry_count = 0
            success = False
            
            while not success and retry_count < MAX_RETRIES:
                try:
                    batch_end = min(current_block + batch_size, end_block)
                    
                    event_filter = {
                        'address': self.pool_manager_address,
                        'fromBlock': current_block,
                        'toBlock': batch_end,
                        'topics': [event_signature, pool_id_topic]
                    }
                    
                    events = self.w3.eth.get_logs(event_filter)
                    event_count = 0
                    
                    for event in events:
                        try:
                            processed_event = self.contract.events.Swap().process_log(event)
                            
                            # Skip if not from the Universal Router
                            if processed_event.args.sender.lower() != self.universal_router_address.lower():
                                continue
                            
                            # The event's `sender` is the router contract that called PoolManager;
                            # fetch the transaction to recover the actual trader (tx['from'])
                            tx = self.w3.eth.get_transaction(processed_event.transactionHash)
                            
                            event_data = {
                                'block_number': processed_event.blockNumber,
                                'transaction_hash': processed_event.transactionHash.hex(),
                                'pool_id': processed_event.args.id.hex(),
                                'router_address': processed_event.args.sender,
                                'original_sender': tx['from'],
                                'amount0': str(processed_event.args.amount0),
                                'amount1': str(processed_event.args.amount1),
                                'sqrt_price_x96': str(processed_event.args.sqrtPriceX96),
                                'liquidity': str(processed_event.args.liquidity),
                                'tick': processed_event.args.tick,
                                'fee': processed_event.args.fee,
                                'timestamp': self.w3.eth.get_block(processed_event.blockNumber)['timestamp']
                            }
                            
                            all_events.append(event_data)
                            event_count += 1
                        except Exception as e:
                            print(f"Error processing event: {e}")
                            continue
                    
                    print(f"Processed blocks {current_block} to {batch_end}, found {event_count} events")
                    success = True
                    current_block = batch_end + 1
                    
                except (ConnectionError, TransactionNotFound, BlockNotFound) as e:
                    retry_count += 1
                    if retry_count >= MAX_RETRIES:
                        print(f"Failed to process blocks {current_block} to {batch_end} after {MAX_RETRIES} attempts")
                        # Save the current progress
                        if all_events:
                            return pd.DataFrame(all_events)
                        return pd.DataFrame()
                    
                    delay = min(BASE_RETRY_DELAY * retry_count, MAX_RETRY_DELAY)
                    print(f"Error: {e}. Retrying in {delay} seconds... (Attempt {retry_count}/{MAX_RETRIES})")
                    time.sleep(delay)
                    
                    # Reconnect to the network
                    self.setup_web3_connection()
                    self.setup_contract()

        return pd.DataFrame(all_events) if all_events else pd.DataFrame()

    def collect_uniswap_v4_data(
        self,
        start_block: int = None,
    ) -> pd.DataFrame:
        """Main data collection pipeline with resume capability."""
        if start_block is None:
            start_block = START_BLOCK
        
        batch_size = BATCH_SIZE

        # Reuse the most recent output file if one exists so collection can resume after a restart;
        # otherwise create a new timestamped file
        os.makedirs('data/raw', exist_ok=True)
        existing_files = sorted(
            f for f in os.listdir('data/raw')
            if f.startswith('swap_events_') and f.endswith('.csv')
        )
        if existing_files:
            output_file = os.path.join('data/raw', existing_files[-1])
        else:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = f'data/raw/swap_events_{timestamp}.csv'

        print(f"\n🔧 Configuration:")
        print(f"   Start block: {start_block}")
        print(f"   Batch size: {batch_size}")
        print(f"   Output file: {output_file}")
        print(f"   Pool ID: {UNISWAP_V4_ETH_USDC_POOL}")

        # Check for the last processed block to resume from
        last_block = self.get_last_processed_block(output_file)
        if last_block is not None:
            print(f"Resuming collection from block {last_block + 1}")
            start_block = last_block + 1

        return self._collect_and_save(start_block, batch_size, output_file)

    def _collect_and_save(self, current_block: int, batch_size: int, output_file: str) -> pd.DataFrame:
        """Continuously collect data with error handling and resume capability."""
        blocks_per_chunk = int(600 / UNICHAIN_BLOCK_TIME)  # 600 seconds = 10 minutes
        max_workers = 8  # Number of parallel threads
        max_workers_cap = 16  # Maximum allowed workers to prevent system overload
        
        try:
            while True:
                try:
                    latest_block = self.w3.eth.block_number
                    
                    if current_block >= latest_block:
                        print("Waiting for new blocks...")
                        time.sleep(30)
                        continue

                    # Calculate how many chunks to process in parallel
                    total_blocks_to_process = min(blocks_per_chunk * max_workers, latest_block - current_block)
                    blocks_per_parallel_chunk = total_blocks_to_process // max_workers
                    
                    if blocks_per_parallel_chunk < batch_size:
                        blocks_per_parallel_chunk = min(batch_size, latest_block - current_block)
                        max_workers = min(max_workers_cap, max(1, (latest_block - current_block) // blocks_per_parallel_chunk))
                    
                    # Ensure max_workers doesn't exceed the cap
                    max_workers = min(max_workers, max_workers_cap)
                    
                    print(f"\nProcessing {total_blocks_to_process} blocks using {max_workers} workers")
                    print(f"Each worker will process chunks of {blocks_per_parallel_chunk} blocks in batches of {batch_size}")
                    
                    # Create block ranges for parallel processing
                    block_ranges = []
                    for i in range(max_workers):
                        start = current_block + (i * blocks_per_parallel_chunk)
                        # Make ranges non-overlapping: each worker gets an inclusive [start, end] slice
                        end = min(start + blocks_per_parallel_chunk - 1, latest_block)
                        if start <= end:
                            block_ranges.append((start, end))
                    
                    if not block_ranges:
                        print("No valid block ranges to process")
                        time.sleep(30)
                        continue
                    
                    # Process block ranges in parallel
                    all_swap_dfs = []
                    total_events_found = 0
                    with ThreadPoolExecutor(max_workers=max_workers) as executor:
                        future_to_range = {
                            executor.submit(self.collect_swap_events, start, end, batch_size): (start, end) 
                            for start, end in block_ranges
                        }
                        
                        for future in as_completed(future_to_range):
                            block_range = future_to_range[future]
                            try:
                                swap_df = future.result()
                                if len(swap_df) > 0:
                                    all_swap_dfs.append(swap_df)
                                    total_events_found += len(swap_df)
                                    print(f"Completed blocks {block_range[0]} to {block_range[1]}, found {len(swap_df)} events")
                                else:
                                    print(f"Completed blocks {block_range[0]} to {block_range[1]}, found 0 events")
                            except Exception as e:
                                print(f"Error processing blocks {block_range[0]} to {block_range[1]}: {e}")
                    
                    print(f"\n📊 Summary: {len(all_swap_dfs)} DataFrames collected with {total_events_found} total events")
                    
                    # Combine results and save
                    if all_swap_dfs:
                        combined_df = pd.concat(all_swap_dfs, ignore_index=True)
                        file_exists = os.path.isfile(output_file)
                        combined_df.to_csv(output_file, mode='a', header=not file_exists, index=False, float_format='%.0f')
                        
                        print(f"\n✅ SAVED TO FILE: {output_file}")
                        print(f"Events collected in this round: {len(combined_df)}")
                        
                        # Count total events if needed (optional, can be removed for performance)
                        # if file_exists:
                        #     try:
                        #         total_events = len(pd.read_csv(output_file))
                        #         print(f"Total events collected so far: {total_events}")
                        #     except Exception as e:
                        #         print(f"Could not count total events: {e}")
                    else:
                        print(f"\n⚠️  No events found in this chunk, nothing to save")
                    
                    # Update current block for next iteration
                    current_block = max(end for _, end in block_ranges) + 1
                    
                except (ConnectionError, TransactionNotFound, BlockNotFound) as e:
                    print(f"Network error: {e}")
                    print("Attempting to reconnect...")
                    time.sleep(BASE_RETRY_DELAY)
                    self.setup_web3_connection()
                    self.setup_contract()
                    continue
                except Exception as e:
                    print(f"Unexpected error: {e}")
                    print("Continuing with next chunk...")
                    # Move forward by one chunk to avoid getting stuck
                    current_block = current_block + blocks_per_chunk
                    continue
                
        except KeyboardInterrupt:
            print("\nData collection stopped by user.")
            if os.path.isfile(output_file):
                final_df = pd.read_csv(output_file)
                print(f"\nFinal Collection Summary:")
                print(f"Total events collected: {len(final_df)}")
                return final_df
            return pd.DataFrame()

if __name__ == '__main__':
    while True:
        try:
            collector = UniswapV4DataCollector()
            collector.collect_uniswap_v4_data(
                start_block=START_BLOCK
            )
        except Exception as e:
            print(f"Fatal error: {e}")
            print("Restarting data collection in 60 seconds...")
            time.sleep(60)
            continue

Conclusion

This tutorial demonstrated how to build a robust Uniswap v4 data collection system for ETH-USDC trades on Unichain using Python and Chainstack’s Unichain nodes. This event-based monitoring approach provides reliable and fast trade data collection from the Uniswap v4 singleton architecture.

This pattern can be easily adapted for other Uniswap v4 pools by simply changing the pool ID parameter, or extended to monitor multiple pools simultaneously.

The collected data is perfect for analytics, research, or synthetic data generation for LLM fine-tuning purposes.
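
As a starting point for such analysis, here is a minimal sketch that loads a collected CSV and converts the raw sqrtPriceX96 value into a human-readable price. It assumes that currency0 is native ETH (18 decimals) and currency1 is USDC (6 decimals) in this pool, and that the file path (a placeholder below) points to one of the CSVs produced by the script above.

import pandas as pd

CSV_PATH = "data/raw/swap_events_example.csv"  # placeholder; point this at your generated file

df = pd.read_csv(CSV_PATH)

# sqrtPriceX96 encodes sqrt(price of currency1 in terms of currency0) scaled by 2**96,
# so raw price = (sqrtPriceX96 / 2**96) ** 2; adjust for the 18 vs 6 decimal difference
sqrt_price = df["sqrt_price_x96"].astype(float)
df["usdc_per_eth"] = (sqrt_price / 2**96) ** 2 * 10 ** (18 - 6)

# amount0 and amount1 are signed deltas; their signs indicate the direction of each swap
df["amount0"] = df["amount0"].astype(float)
df["amount1"] = df["amount1"].astype(float)

print(df[["block_number", "usdc_per_eth", "amount0", "amount1"]].head())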

About the author

Ake

Director of Developer Experience @ Chainstack

Talk to me all things Web3

20 years in technology | 8+ years in Web3 full time

Trusted advisor helping developers navigate the complexities of blockchain infrastructure