Solana: Optimize your getBlock performance

With examples in curl and Python

TLDR:

  • getBlock is a core Solana RPC method that can dramatically impact performance if misused; don’t pull more data than you need.
  • Use json, jsonParsed, base58, or base64 encoding strategically; always enable compression (gzip) to reduce huge payloads.
  • Apply concurrency limits, caching, and backoff retry logic to avoid node overload and handle network hiccups gracefully.
  • Use robust error handling and consider block ranges for larger-scale data fetches.

Main article

Solana's getBlock RPC method is a fundamental method that can be tricky and will screw up your application performance in a jiffy if you are not paying attention.

This guide provides a comprehensive overview of how to use getBlock efficiently, with practical examples in Python and curl.

👍

Get you own node endpoint today

Start for free and get your app to production levels immediately. No credit card required.

You can sign up with your GitHub, X, Google, or Microsoft account.

Understanding getBlock

The getBlock method returns detailed information about a confirmed block in Solana's ledger. A block in Solana contains:

  • A set of transactions
  • Metadata like block hash, previous block hash
  • Timing information
  • Reward data

If you think you have a grasp now, check out Understanding the difference between blocks and slots on Solana. Solana is amazing but not for the faint heart.

What data does getBlock return?

A typical response includes:

  • blockhash — the unique hash (ID) of this block (base-58 encoded)
  • previousBlockhash — the hash of the parent block
  • parentSlot — the slot number of the parent block
  • blockHeight — the sequential block height (the number of blocks beneath this block)
  • blockTime — the timestamp when the block was produced, which is yet another trick. See Solana: Understanding block time.
  • transactions — an array of transactions in the block (if requested)
  • signatures — an array of transaction signatures in the block (if requested)
  • rewards — an array of block rewards (if requested)

Parameters

When calling getBlock, you can specify several parameters to control what data you receive:

  • commitmentfinalized (default) or confirmed
  • encodingjson (default), jsonParsed, base64, or base58.
  • transactionDetailsfull (default), accounts, signatures, or none
  • rewards — boolean to include block rewards
  • maxSupportedTransactionVersion — for handling versioned transactions

Basic example with curl

Let's start with the simplest way to fetch a block using curl:

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    329849011,
    {
      "encoding": "jsonParsed",
      "transactionDetails": "full",
      "commitment": "finalized",
      "maxSupportedTransactionVersion": 0
    }
  ]
}'

This request fetches block 329849011 with full transaction details in JSON format.

Note two things here:

  • We are using jsonParsed, which produces the largest output on the node side and gets transported to you. You should never do this in production with a heavy load. This more of a one-off inspection call than anything else.
  • If you are using a full and not an archive Solana node, use a block number within the last 20 hours or so. Otherwise this will be an archive call. See Limits at the bottom for the archive node methods availability.

Getting just the signatures

If you're only interested in transaction signatures (hashes), which is much lighter:

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    329849011,
    {
      "encoding": "json",
      "transactionDetails": "signatures",
      "commitment": "finalized"
    }
  ]
}'

Same note on the block number in params as above. Make sure you stay within the last 20 hours or so, unless you want to use an archive call, which is also fine as Chainstack is extremely affordable & transparent with pricing — a full node request is counted as one request, and an archive node request is counted as 2 requests, and that's it.

Using compression (gzip) for better performance

HTTP compression is critical when working with Solana's getBlock method due to the large size of block data. Here's why compression is essential and how to implement it:

Why use compression

  1. Dramatic size reduction — Solana block data with full transaction details easily take several MBs in JSON format. Gzip compression typically reduces this by 70-90%, bringing it down to a few hundred KB.

  2. Faster response times — less data transmitted means faster responses, especially important when:

    • Working with blocks containing many transactions
    • Operating on networks with limited bandwidth
    • Fetching multiple blocks in sequence
  3. Reduced bandwidth costs — If you're paying for bandwidth (e.g., in cloud environments), compression significantly reduces costs.

  4. Server-friendly — compression reduces load on both the RPC node and your client's network connection.

Compression example with curl

Adding compression is simple with curl—just add the Accept-Encoding: gzip header and the --compressed flag:

curl -X POST CHAINSTACK_SOLANA_URL \
  -H "Content-Type: application/json" \
  -H "Accept-Encoding: gzip" \
  --compressed \
  -d '{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      329849011,
      {
        "encoding": "json",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'

When you run this command:

  1. The Accept-Encoding: gzip header tells the server you can handle compressed responses
  2. The server compresses the JSON data before sending it
  3. The --compressed flag tells curl to automatically decompress the data on receipt
  4. You see the normal JSON output, but the actual network transfer was much smaller

Understanding the compression process

Here's what happens under the hood:

  1. Request — your client sends a request with Accept-Encoding: gzip
  2. Server processing — the server generates the JSON response
  3. Compression — the server compresses this data using gzip
  4. Transfer — the compressed data (much smaller) is sent over the network
  5. Decompression — your client decompresses the data
  6. Processing — you work with the original JSON data

Most HTTP libraries handle steps 1, 4, and 5 automatically when configured correctly.

Example of handling the compressed (gzip) data manually

For illustration purposes and to compare the actual size in compressed & decompressed state:

curl -X POST CHAINSTACK_SOLANA_URL \
  -H "Content-Type: application/json" \
  -H "Accept-Encoding: gzip" \
  -d '{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [329849011, {"encoding": "json", "maxSupportedTransactionVersion": 0}]
  }' \
  --output block_data.gz

Decompress the block_data.gz file:

gunzip -c block_data.gz > block_data.json

See the ~80% reduction in size when you compare block_data.gz to block_data.json.

Compression in HTTP libraries

Most modern HTTP libraries support compression automatically:

  • Python requests — add headers={"Accept-Encoding": "gzip"} or set requests.get(..., stream=True)
  • Node.js — most HTTP clients like Axios support this out of the box
  • Rust — libraries like reqwest have compression features

Using compression is one of the simplest and most effective optimizations when working with Solana's getBlock RPC method, especially for blocks with many transactions. So use compression.

json, jsonParsed, base58, base64

When using Solana's getBlock RPC method, you can request data in different encoding formats based on your specific needs.

Note that when you are doing a getBlock call with "encoding": "base58" or "encoding": "base64", you are getting the respective encoding on the transaction level, not the entire block. In other words, you will still get back a JSON response, it's only the transaction data that will be encoded in base58 or base64.

Let's explore each option:

json (default)

The json encoding provides transaction data in a standard JSON format with binary data encoded as base58 strings.

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    149546741,
    {
      "encoding": "json",
      "transactionDetails": "full",
      "commitment": "finalized",
      "maxSupportedTransactionVersion": 0
    }
  ]
}'

Best for: General use cases where you need a balance of readability and performance. Binary data remains encoded but the overall structure is easily parseable.

jsonParsed

The jsonParsed encoding goes beyond standard JSON by attempting to decode instruction data into human-readable format:

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    149546741,
    {
      "encoding": "jsonParsed",
      "transactionDetails": "full",
      "commitment": "finalized",
      "maxSupportedTransactionVersion": 0
    }
  ]
}'

Best for:

  • Debugging and analysis where you need to understand transaction contents
  • Decoding program instructions without additional parsing work
  • Applications that display transaction details to users

Limitations — not all programs can be parsed as you need an IDL (similar to EVM's ABIs) or source code, and response size is larger than other encodings.

base58

The base58 encoding returns binary data for transactions as base58-encoded strings:

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    149546741,
    {
      "encoding": "base58",
      "transactionDetails": "full",
      "commitment": "finalized",
      "maxSupportedTransactionVersion": 0
    }
  ]
}'

Best for: Compatibility with tools that expect base58 encoding, which is common in Solana's ecosystem.

base64

The base64 encoding returns binary data for transactions as base64-encoded strings:

curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    149546741,
    {
      "encoding": "base64",
      "transactionDetails": "full",
      "commitment": "finalized",
      "maxSupportedTransactionVersion": 0
    }
  ]
}'

Best for:

  • Performance-critical applications (base64 is more compact than base58)
  • Storage efficiency when saving transaction data
  • High-throughput systems processing many blocks

Encoding comparison

EncodingSizeHuman ReadabilityParsing ComplexityUse Case
jsonMediumGoodLowGeneral purpose
jsonParsedLargestBestLowestAnalysis & debugging
base58Medium-LargePoorMediumEcosystem compatibility
base64SmallestPoorMediumPerformance & storage

Python example with json, jsonParsed, base58, base64

First, install the package:pip install solana.

import requests
import time
import json
import os
import base64
import binascii

# Add base58 decoding support
try:
    import base58
except ImportError:
    print("Installing base58 module...")
    import subprocess
    subprocess.check_call(["pip", "install", "base58"])
    import base58

# Initialize the Solana RPC endpoint URL
rpc_url = "CHAINSTACK_SOLANA_URL"

def save_raw_response(response_data, slot, encoding):
    """Save raw RPC response data to a file"""
    # Create output directory if it doesn't exist
    os.makedirs('block_data', exist_ok=True)
    
    filename = f"block_data/block_{slot}_{encoding}.json"
    
    with open(filename, 'w') as f:
        json.dump(response_data, f, indent=2)
    
    print(f"Saved raw {encoding} response to {filename}")
    file_size = os.path.getsize(filename) / (1024 * 1024)  # Size in MB
    print(f"File size: {file_size:.2f} MB")

def make_rpc_request(method, params):
    """Make a JSON-RPC request to the Solana node"""
    headers = {"Content-Type": "application/json"}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }
    
    response = requests.post(rpc_url, headers=headers, json=payload)
    return response.json()

def get_block_with_encoding(slot, encoding="json"):
    """Fetch a block with specified encoding and return the raw response"""
    try:
        start_time = time.time()
        
        # Make direct RPC request
        params = [
            slot,
            {
                "encoding": encoding,
                "maxSupportedTransactionVersion": 0,
                "transactionDetails": "full",
                "rewards": False
            }
        ]
        
        raw_response = make_rpc_request("getBlock", params)
        elapsed = time.time() - start_time
        
        if not raw_response.get("result"):
            print(f"Block {slot} not found with {encoding} encoding.")
            return None
        
        print(f"Fetched block {slot} with {encoding} encoding in {elapsed:.2f} seconds")
        
        # Save the raw response
        save_raw_response(raw_response, slot, encoding)
        
        return raw_response
    except Exception as e:
        print(f"Error fetching block with {encoding} encoding: {e}")
        return None

def get_latest_slot():
    """Get the latest finalized slot"""
    try:
        # Correct parameter format for getSlot - commitment should be in an object
        params = [{"commitment": "finalized"}]
        slot_resp = make_rpc_request("getSlot", params)
        
        if "result" not in slot_resp:
            print("Failed to get current slot")
            print(f"Error response: {slot_resp}")
            return None
            
        current_slot = slot_resp["result"]
        print(f"Current slot: {current_slot}")
        return current_slot
    except Exception as e:
        print(f"Error fetching latest slot: {e}")
        return None

def inspect_transaction_format(response, encoding):
    """Inspect and print transaction format details for the first transaction"""
    if not response or "result" not in response:
        print(f"No result to inspect for {encoding}")
        return
    
    result = response["result"]
    if "transactions" not in result or not result["transactions"]:
        print(f"No transactions found in {encoding} response")
        return
    
    # Get the first transaction
    first_tx = result["transactions"][0]
    
    # Print transaction format details
    print(f"\n{encoding} Transaction Format Analysis:")
    print("-" * 40)
    
    # For base64 and base58 encodings, we need to handle the response differently
    if encoding in ["base64", "base58"]:
        print(f"Transaction structure type: {type(first_tx)}")
        print(f"Available fields: {list(first_tx.keys())}")
        
        # Check for specific encoding fields
        if "transaction" in first_tx:
            tx_data = first_tx["transaction"]
            print(f"Transaction data type: {type(tx_data)}")
            
            if isinstance(tx_data, list) and len(tx_data) >= 2:
                # The format is [encoded_data, encoding_type]
                encoded_str = tx_data[0]
                format_type = tx_data[1]
                print(f"Transaction is a {format_type} encoded array")
                print(f"First 50 chars of encoded data: {encoded_str[:50]}...")
                
                # Try to extract a small sample of the decoded data
                try:
                    if format_type == "base64":
                        sample = base64.b64decode(encoded_str[:100] + "=" * (4 - len(encoded_str[:100]) % 4))
                    elif format_type == "base58":
                        sample = base58.b58decode(encoded_str[:100])
                    
                    print(f"Sample of decoded bytes (hex): {sample.hex()[:60]}...")
                except Exception as e:
                    print(f"Error decoding sample: {e}")
            elif isinstance(tx_data, str):
                print(f"Transaction is a string of length: {len(tx_data)}")
                print(f"First 50 chars: {tx_data[:50]}...")
            else:
                print(f"Transaction is not a string or list but: {type(tx_data)}")
                if isinstance(tx_data, dict):
                    print(f"Keys: {list(tx_data.keys())}")
    else:
        # Original handling for JSON and jsonParsed
        if "transaction" in first_tx:
            tx = first_tx["transaction"]
            
            # Check if it's a string (likely base64) or an object
            if isinstance(tx, str):
                print(f"Transaction is a string (likely base64)")
                print(f"Length: {len(tx)} characters")
                print(f"First 100 chars: {tx[:100]}...")
            else:
                print(f"Transaction is an object/dict")
                print(f"Keys: {list(tx.keys()) if isinstance(tx, dict) else 'N/A'}")
                
                # If there's a message field, check its format
                if isinstance(tx, dict) and "message" in tx:
                    msg = tx["message"]
                    if isinstance(msg, str):
                        print(f"Message is a string, length: {len(msg)}")
                        print(f"First 100 chars: {msg[:100]}...")
                    else:
                        print(f"Message is an object with keys: {list(msg.keys()) if isinstance(msg, dict) else 'N/A'}")
        else:
            print(f"Transaction doesn't have expected 'transaction' field")
            print(f"Available fields: {list(first_tx.keys())}")
    
    print("-" * 40)

# Fetch the latest slot
slot_number = get_latest_slot()

if slot_number:
    # Fetch the block with each encoding type
    print("\nFetching block with JSON encoding:")
    json_response = get_block_with_encoding(slot_number, encoding="json")
    
    print("\nFetching block with jsonParsed encoding:")
    jsonParsed_response = get_block_with_encoding(slot_number, encoding="jsonParsed")
    
    print("\nFetching block with base64 encoding:")
    base64_response = get_block_with_encoding(slot_number, encoding="base64")
    
    print("\nFetching block with base58 encoding:")
    base58_response = get_block_with_encoding(slot_number, encoding="base58")
    
    # Inspect the transaction formats to verify encoding differences
    if json_response:
        inspect_transaction_format(json_response, "json")
    
    if jsonParsed_response:
        inspect_transaction_format(jsonParsed_response, "jsonParsed")
    
    if base64_response:
        inspect_transaction_format(base64_response, "base64")
    
    if base58_response:
        inspect_transaction_format(base58_response, "base58")
    
    print("\nComparison complete. Check the block_data directory for the raw responses.")

Here's an example output:

├── [6.3M]  block_329897953_base58.json
├── [6.3M]  block_329897953_base64.json
├── [8.0M]  block_329897953_json.json
└── [ 16M]  block_329897953_jsonParsed.json

Performance optimization

Examples in Python.

Limit concurrency and throttle requests

When fetching multiple blocks, avoid sending too many requests simultaneously:

import asyncio
import concurrent.futures
from solana.rpc.api import Client
import json
import os
import pathlib

async def fetch_block_with_solana(executor, client, slot, semaphore):
    # Use a semaphore to limit concurrency
    async with semaphore:
        try:
            # Run the synchronous client.get_block in a thread pool
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                executor,
                lambda: client.get_block(
                    slot,
                    encoding="json",
                    max_supported_transaction_version=0
                )
            )
            
            # Solana-py returns a solders.rpc.responses.GetBlockResp object
            # Check if the value field exists (indicates success)
            if hasattr(response, 'value') and response.value is not None:
                return slot, response.value
            else:
                print(f"Error in response for slot {slot}")
                return slot, None
        except Exception as e:
            print(f"Exception for slot {slot}: {e}")
            return slot, None

def save_block_to_file(slot, block, output_dir):
    """Save block data to a JSON file"""
    os.makedirs(output_dir, exist_ok=True)
    
    try:
        if hasattr(block, '__dict__'):
            block_dict = block.__dict__
        else:
            block_dict = {
                'blockhash': str(block.blockhash) if hasattr(block, 'blockhash') else None,
                'blockHeight': block.block_height if hasattr(block, 'block_height') else None,
                'blockTime': block.block_time if hasattr(block, 'block_time') else None,
                'parentSlot': block.parent_slot if hasattr(block, 'parent_slot') else None,
                'previousBlockhash': str(block.previous_blockhash) if hasattr(block, 'previous_blockhash') else None,
                'transactions': [
                    {
                        'meta': tx.meta.__dict__ if hasattr(tx, 'meta') and hasattr(tx.meta, '__dict__') else None,
                        'transaction': str(tx.transaction) if hasattr(tx, 'transaction') else None,
                    } for tx in block.transactions if hasattr(block, 'transactions')
                ] if hasattr(block, 'transactions') else []
            }
        
        # Save to file
        file_path = pathlib.Path(output_dir) / f"block_{slot}.json"
        with open(file_path, 'w') as f:
            json.dump(block_dict, f, indent=2, default=str)
        
        print(f"Saved block {slot} to {file_path}")
    except Exception as e:
        print(f"Error saving block {slot}: {e}")

async def get_multiple_blocks(rpc_url, slots, max_concurrency=5, output_dir="block_data"):
    # Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrency)
    
    # Create the Solana client
    client = Client(rpc_url)
    
    # Create a thread pool executor for running synchronous code
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [fetch_block_with_solana(executor, client, slot, semaphore) for slot in slots]
        results = await asyncio.gather(*tasks)
        
        # Process and save each block
        blocks = {}
        for slot, block in results:
            if block is not None:
                blocks[slot] = block
                # Save to file
                save_block_to_file(slot, block, output_dir)
                
        return blocks

# Example usage
async def main():
    rpc_url = "CHAINSTACK_SOLANA_URL"
    slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]
    
    blocks = await get_multiple_blocks(rpc_url, slots_to_fetch, max_concurrency=3)
    print(f"\nSuccessfully fetched {len(blocks)} blocks")
    
    for slot, block in blocks.items():
        if block:
            # Access the transactions field from the block object
            tx_count = len(block.transactions if hasattr(block, 'transactions') else [])
            print(f"Slot {slot}: {tx_count} transactions")

# Run the async function
if __name__ == "__main__":
    asyncio.run(main())

As always, make sure you get your own range of blocks in slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015].

Recommended encodings:

  • For most use cases: json (good balance of size and parsing speed)
  • For human-readable data: jsonParsed (larger but provides decoded instruction data)
  • For binary efficiency: base64 (efficient for storage and transmission)

Use binary encoding for bulk requests

Same script as above, but in line 18 instead ofencoding="json" use encoding="base64".

Error handling

Let's start with a working script and explain it below:

"""
Solana getBlock with retries implementation

This script demonstrates how to fetch a Solana block with proper
error handling and exponential backoff retries.
"""

import time
from solana.rpc.api import Client

def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
    """Fetch a block with exponential backoff retries
    
    Args:
        client: Solana RPC client
        slot: The slot number to fetch
        max_retries: Maximum number of retry attempts
        backoff_factor: Multiplier for exponential backoff
    
    Returns:
        Block data or None if failed after retries
    """
    retry_count = 0
    base_wait_time = 1  # Start with 1 second wait
    
    while retry_count < max_retries:
        try:
            # Get the block response
            block_resp = client.get_block(
                slot,
                encoding="json",
                max_supported_transaction_version=0
            )
            
            # The response is a solders.rpc.responses.GetBlockResp object
            # We need to check if it has a value (success) or error
            if hasattr(block_resp, 'value') and block_resp.value is not None:
                return block_resp.value
            elif hasattr(block_resp, 'error'):
                error = block_resp.error
                
                # Handle specific error codes if possible
                if hasattr(error, 'code'):
                    code = error.code
                    message = getattr(error, 'message', 'Unknown error')
                    
                    if code == -32007:  # Block not available (purged from ledger)
                        print(f"Block {slot} not available in ledger storage")
                        return None
                    elif code == -32004:  # Slot skipped
                        print(f"Slot {slot} was skipped (no block produced)")
                        return None
                    elif code == -32603:  # Internal error
                        # This might be temporary, retry
                        print(f"Internal error, will retry: {message}")
                    elif code == 429:  # Rate limit
                        print("Rate limited, backing off significantly")
                        retry_count += 1
                        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
                        continue
                    else:
                        print(f"RPC error code {code}: {message}")
                else:
                    # If we can't get a specific error code
                    print(f"Error in response: {error}")
            else:
                # Null result
                print(f"Block {slot} returned null (not found or not confirmed)")
                return None
            
        except Exception as e:
            retry_count += 1
            wait_time = base_wait_time * backoff_factor ** retry_count
            
            print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
            
            if retry_count < max_retries:
                time.sleep(wait_time)
            else:
                print(f"Max retries reached for block {slot}")
                return None
        
        # If we get here, there was an error but not one we want to immediately retry on
        retry_count += 1
        wait_time = base_wait_time * backoff_factor ** retry_count
        
        if retry_count < max_retries:
            print(f"Retrying after {wait_time}s...")
            time.sleep(wait_time)
        else:
            print(f"Max retries reached for block {slot}")
            return None
    
    return None  # Shouldn't reach here, but just in case


def main():
    # Example usage
    rpc_url = "CHAINSTACK_SOLANA_URL"
    print(f"Connecting to Solana RPC: {rpc_url}")
    
    client = Client(rpc_url)
    slot_to_fetch = 329849011  # Example slot
    
    print(f"Fetching block at slot {slot_to_fetch}...")
    block = get_block_with_retries(client, slot_to_fetch)
    
    if block:
        # Print summary of the block
        transactions = getattr(block, 'transactions', [])
        
        # Safely extract signatures
        signatures = []
        for tx in transactions:
            if hasattr(tx, 'transaction') and hasattr(tx.transaction, 'signatures'):
                if tx.transaction.signatures:
                    signatures.append(tx.transaction.signatures[0])
        
        print(f"\nBlock {slot_to_fetch} summary:")
        print(f"  Blockhash: {getattr(block, 'blockhash', 'unknown')}")
        print(f"  Parent Slot: {getattr(block, 'parent_slot', 'unknown')}")
        print(f"  Block Time: {getattr(block, 'block_time', 'unknown')}")
        print(f"  Block Height: {getattr(block, 'block_height', 'unknown')}")
        print(f"  Transactions: {len(transactions)}")
        
        if signatures:
            print(f"\nFirst few transaction signatures:")
            for sig in signatures[:3]:
                print(f"  {sig}")
            if len(signatures) > 3:
                print(f"  ... and {len(signatures)-3} more")
    else:
        print("Failed to fetch block")


if __name__ == "__main__":
    main() 

Exponential backoff retry mechanism

def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
    retry_count = 0
    base_wait_time = 1  # Start with 1 second wait
    
    while retry_count < max_retries:
        # ... [request code]
        
        # When an error occurs:
        wait_time = base_wait_time * backoff_factor ** retry_count
        time.sleep(wait_time)

The code implements classic exponential backoff where:

  • Wait time grows exponentially with each retry attempt
  • Initial wait is 1 second
  • Each subsequent wait is multiplied by backoff factor with retry count
  • This prevents overwhelming the server with rapid reconnection attempts

Differential error handling

# Handle specific error codes if possible
if hasattr(error, 'code'):
    code = error.code
    message = getattr(error, 'message', 'Unknown error')
    
    if code == -32007:  # Block not available (purged from ledger)
        print(f"Block {slot} not available in ledger storage")
        return None
    elif code == -32004:  # Slot skipped
        print(f"Slot {slot} was skipped (no block produced)")
        return None
    elif code == -32603:  # Internal error
        # This might be temporary, retry
        print(f"Internal error, will retry: {message}")
    elif code == 429:  # Rate limit
        print("Rate limited, backing off significantly")
        retry_count += 1
        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
        continue

The script intelligently handles different error types:

  • Non-retryable errors (like purged blocks) fail fast without wasting retries
  • Temporary errors proceed with standard backoff
  • Rate limit errors get special treatment with doubled backoff

Enhanced rate limit handling

elif code == 429:  # Rate limit
    print("Rate limited, backing off significantly")
    retry_count += 1
    time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
    continue

Rate limits receive special handling:

  • Doubled backoff time compared to other errors
  • This helps prevent repeatedly hitting rate limits
  • The multiplier (2x) helps ensure the client stays under rate limits

Comprehensive exception handling

try:
    # Get the block response
    block_resp = client.get_block(...)
    
    # Various response validation checks
    
except Exception as e:
    retry_count += 1
    wait_time = base_wait_time * backoff_factor ** retry_count
    
    print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
    
    if retry_count < max_retries:
        time.sleep(wait_time)
    else:
        print(f"Max retries reached for block {slot}")
        return None

The script catches all exceptions including:

  • Network errors
  • Timeout errors
  • Malformed response errors
  • Client library errors

Response validation

if hasattr(block_resp, 'value') and block_resp.value is not None:
    return block_resp.value
elif hasattr(block_resp, 'error'):
    error = block_resp.error
    # Error handling logic
else:
    # Null result
    print(f"Block {slot} returned null (not found or not confirmed)")
    return None

The code thoroughly validates responses before processing:

  • Checks for valid response structure
  • Handles null responses appropriately
  • Verifies response has expected attributes

Detailed logging

print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
# ...
print(f"Max retries reached for block {slot}")

The script provides detailed logging:

  • Error messages with specific error codes and descriptions
  • Retry counts and wait times
  • Final outcomes (success or failure)
  • This aids in debugging and monitoring

Parameterized retry configuration

def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):

The retry mechanism is fully customizable:

  • Configurable maximum retries
  • Adjustable backoff factor
  • This allows tuning based on network conditions or application requirements

These techniques together create a resilient implementation that gracefully handles various network issues, temporary failures, and rate limiting while providing clear feedback about what's happening during the process.

Client-side caching

Implement a client-side cache that stores previously fetched blocks in memory, allowing applications to:

  • Retrieve frequently accessed blocks without making additional RPC calls
  • Track cache performance with hit/miss statistics
  • Maintain a configurable maximum cache size

This should reduce unnecessary calls.

Example:

#!/usr/bin/env python3
"""
Solana Block Cache

A utility for efficiently fetching and caching Solana blocks to reduce RPC calls.
"""

import time
import json
import requests
from typing import Dict, Any, Optional, Union
from solana.rpc.api import Client


class BlockCache:
    """A simple cache for Solana blocks to reduce redundant RPC calls."""
    
    def __init__(self, client: Client, max_size: int = 100):
        """
        Initialize the block cache.
        
        Args:
            client: A Solana RPC client instance or URL string
            max_size: Maximum number of blocks to keep in cache
        """
        if isinstance(client, str):
            self.rpc_url = client
            self.client = None
        else:
            self.rpc_url = client._provider.endpoint_uri
            self.client = client
            
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "Accept-Encoding": "gzip"
        })
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size
        self.hits = 0
        self.misses = 0
        self.request_id = 0
    
    def get_block(self, slot: int, encoding: str = "json", tx_details: str = "full", 
                  max_supported_transaction_version: Optional[int] = 0) -> Optional[Dict[str, Any]]:
        """
        Fetch a block, using cache if available.
        
        Args:
            slot: The slot number of the block to fetch
            encoding: Block encoding format ("json", "jsonParsed", "base64", etc.)
            tx_details: Transaction details level ("full", "signatures", "accounts", "none")
            max_supported_transaction_version: Maximum supported transaction version
            
        Returns:
            The block data or None if not found/error
        """
        cache_key = f"{slot}:{encoding}:{tx_details}"
        
        # Return from cache if available
        if cache_key in self.cache:
            self.hits += 1
            print(f"Cache hit for block {slot}")
            return self.cache[cache_key]
        
        # Fetch from RPC
        self.misses += 1
        print(f"Cache miss for block {slot}, fetching from RPC...")
        
        try:
            
            self.request_id += 1
            
            # Prepare parameters for the getBlock method
            params = {
                "encoding": encoding,
                "transactionDetails": tx_details,
                "maxSupportedTransactionVersion": max_supported_transaction_version
            }
            
            # Create the JSON-RPC payload
            payload = {
                "jsonrpc": "2.0",
                "id": self.request_id,
                "method": "getBlock",
                "params": [slot, params]
            }
            
            # Make the HTTP request
            response = self.session.post(self.rpc_url, json=payload, timeout=30)
            block_resp = response.json()
            
            # Get the result
            result = block_resp.get('result')
            
            # Store in cache if we got a result
            if result:
                # If cache is full, remove oldest entry
                if len(self.cache) >= self.max_size:
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                
                self.cache[cache_key] = result
            
            return result
        except Exception as e:
            print(f"Error fetching block {slot}: {e}")
            return None
    
    def clear_cache(self) -> None:
        """Clear the entire cache."""
        self.cache = {}
        print("Cache cleared")
    
    def remove_from_cache(self, slot: int, encoding: str = "json", tx_details: str = "full") -> bool:
        """
        Remove a specific block from the cache.
        
        Returns:
            True if the block was in the cache and removed, False otherwise
        """
        cache_key = f"{slot}:{encoding}:{tx_details}"
        if cache_key in self.cache:
            del self.cache[cache_key]
            print(f"Removed block {slot} from cache")
            return True
        return False
    
    def get_stats(self) -> Dict[str, Union[int, float]]:
        """Get cache performance statistics."""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0
        
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
        }


def main():
    """Example usage of BlockCache."""

    rpc_url = "CHAINSTACK_SOLANA_URL" 
    
    print("Initializing Solana Block Cache")
    block_cache = BlockCache(rpc_url)
    
    # Example: Fetch a block (cache miss)
    print("\nFetching block first time (should be cache miss):")
    block1 = block_cache.get_block(329849011)
    if block1:
        print(f"Block hash: {block1.get('blockhash')}")
        print(f"Block time: {block1.get('blockTime')}")
        print(f"Transaction count: {len(block1.get('transactions', []))}")
    
    # Example: Fetch the same block again (cache hit)
    print("\nFetching same block again (should be cache hit):")
    block2 = block_cache.get_block(329849011)
    
    # Fetch with different parameters (should be a cache miss)
    print("\nFetching same block with different parameters (should be cache miss):")
    block3 = block_cache.get_block(329849011, tx_details="signatures")
    
    # Print cache statistics
    print("\nCache statistics:")
    stats = block_cache.get_stats()
    for key, value in stats.items():
        if key == "hit_rate":
            print(f"{key}: {value:.2%}")
        else:
            print(f"{key}: {value}")


if __name__ == "__main__":
    main() 

Block ranges

Implement flexible block range handling through a range-first processing pattern — first identify all available blocks in the target range, then systematically process them in batches.

#!/usr/bin/env python3
"""
Solana Block Range Processing

This script demonstrates how to process ranges of Solana blocks
efficiently with batching and proper error handling.
"""

import time
from solana.rpc.api import Client

def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
    """Fetch a block with exponential backoff retries
    
    Args:
        client: Solana RPC client
        slot: The slot number to fetch
        max_retries: Maximum number of retry attempts
        backoff_factor: Multiplier for exponential backoff
    
    Returns:
        Block data or None if failed after retries
    """
    retry_count = 0
    base_wait_time = 1  # Start with 1 second wait
    
    while retry_count < max_retries:
        try:
            # Get the block response
            block_resp = client.get_block(
                slot,
                encoding="json",
                max_supported_transaction_version=0
            )
            
            # The response is a solders.rpc.responses.GetBlockResp object
            # We need to check if it has a value (success) or error
            if hasattr(block_resp, 'value') and block_resp.value is not None:
                return block_resp.value
            elif hasattr(block_resp, 'error'):
                error = block_resp.error
                
                # Handle specific error codes if possible
                if hasattr(error, 'code'):
                    code = error.code
                    message = getattr(error, 'message', 'Unknown error')
                    
                    if code == -32007:  # Block not available (purged from ledger)
                        print(f"Block {slot} not available in ledger storage")
                        return None
                    elif code == -32004:  # Slot skipped
                        print(f"Slot {slot} was skipped (no block produced)")
                        return None
                    elif code == -32603:  # Internal error
                        # This might be temporary, retry
                        print(f"Internal error, will retry: {message}")
                    elif code == 429:  # Rate limit
                        print("Rate limited, backing off significantly")
                        retry_count += 1
                        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
                        continue
                    else:
                        print(f"RPC error code {code}: {message}")
                else:
                    # If we can't get a specific error code
                    print(f"Error in response: {error}")
            else:
                # Null result
                print(f"Block {slot} returned null (not found or not confirmed)")
                return None
            
        except Exception as e:
            retry_count += 1
            wait_time = base_wait_time * backoff_factor ** retry_count
            
            print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
            
            if retry_count < max_retries:
                time.sleep(wait_time)
            else:
                print(f"Max retries reached for block {slot}")
                return None
        
        # If we get here, there was an error but not one we want to immediately retry on
        retry_count += 1
        wait_time = base_wait_time * backoff_factor ** retry_count
        
        if retry_count < max_retries:
            print(f"Retrying after {wait_time}s...")
            time.sleep(wait_time)
        else:
            print(f"Max retries reached for block {slot}")
            return None
    
    return None  # Shouldn't reach here, but just in case


def get_block_range(client, start_slot, end_slot=None, limit=500):
    """
    Get a list of available block slots in a range.
    If end_slot is None, will use getBlocksWithLimit instead of getBlocks.
    
    Args:
        client: Solana RPC client
        start_slot: The starting slot number
        end_slot: The ending slot number (optional)
        limit: Maximum number of blocks to return when end_slot is None
        
    Returns:
        List of available block slots in the range
    """
    try:
        if end_slot is None:
            # Use getBlocksWithLimit
            blocks_resp = client.get_blocks_with_limit(start_slot, limit)
        else:
            # Use getBlocks
            blocks_resp = client.get_blocks(start_slot, end_slot)
        
        # Handle the response which could be an object or a dict
        if hasattr(blocks_resp, 'value'):
            # Object response
            return blocks_resp.value or []
        else:
            # Dict response
            return blocks_resp.get('result', [])
            
    except Exception as e:
        print(f"Error getting block range: {e}")
        return []


def process_block_range(client, start_slot, end_slot, batch_size=10):
    """Process blocks in a range in batches
    
    Args:
        client: Solana RPC client
        start_slot: The starting slot number
        end_slot: The ending slot number
        batch_size: Number of blocks to process in each batch
        
    Returns:
        Total number of transactions processed
    """
    # First get all available block slots in the range
    print(f"Fetching block slots from {start_slot} to {end_slot}...")
    block_slots = get_block_range(client, start_slot, end_slot)
    print(f"Found {len(block_slots)} blocks to process")
    
    # Process in batches to avoid overwhelming the RPC node
    total_blocks = len(block_slots)
    total_transactions = 0
    
    for i in range(0, total_blocks, batch_size):
        batch = block_slots[i:i+batch_size]
        batch_num = i//batch_size + 1
        total_batches = (total_blocks + batch_size - 1)//batch_size
        print(f"Processing batch {batch_num}/{total_batches}...")
        
        # Process each block in the batch
        for slot in batch:
            block = get_block_with_retries(client, slot)
            if block:
                # Get transaction count safely
                transactions = getattr(block, 'transactions', [])
                tx_count = len(transactions)
                total_transactions += tx_count
                print(f"  Slot {slot}: {tx_count} transactions")
            
            # Add a small delay between blocks to be nice to the RPC node
            time.sleep(0.1)
        
        # Add a delay between batches
        if i + batch_size < total_blocks:
            print(f"Waiting before next batch...")
            time.sleep(1)
    
    print(f"Processed {total_blocks} blocks with {total_transactions} total transactions")
    return total_transactions


def process_recent_blocks(client, count=100):
    """Process the most recent blocks
    
    Args:
        client: Solana RPC client
        count: Number of recent blocks to process
        
    Returns:
        Total number of transactions processed
    """
    # Get the current slot
    current_slot_resp = client.get_slot()
    
    # Handle either object or dict response
    if hasattr(current_slot_resp, 'value'):
        # Object response
        current_slot = current_slot_resp.value
    else:
        # Dict response
        current_slot = current_slot_resp.get('result')
    
    if current_slot:
        # Calculate the starting slot (count blocks back)
        start_slot = max(0, current_slot - count)
        print(f"Processing {count} recent blocks from {start_slot} to {current_slot}...")
        return process_block_range(client, start_slot, current_slot)
    else:
        print("Failed to get current slot")
        return 0


def main():
    # Example usage
    rpc_url = "CHAINSTACK_SOLANA_URL"
    print(f"Connecting to Solana RPC: {rpc_url}")
    
    client = Client(rpc_url)
    
    # Process the 20 most recent blocks
    count = 20
    print(f"Processing {count} recent blocks...")
    
    tx_count = process_recent_blocks(client, count=count)
    print(f"Total transactions in last {count} blocks: {tx_count}")


if __name__ == "__main__":
    main() 

Conclusion

Working with Solana's getBlock RPC method efficiently requires understanding both what data you need and how to optimize your requests. By following the best practices outlined in this guide—using compression, limiting concurrency, using block ranges, requesting only what you need, and implementing proper error handling—you can build robust applications that interact with Solana blocks effectively.

Remember these key takeaways:

  1. Use the appropriate encoding and detail level for your use case — json, jsonParsed, base58, base64.
  2. Always enable HTTP compression.
  3. Implement client-side caching for frequently accessed blocks.
  4. Use controlled concurrency and throttling for bulk operations.
  5. Handle errors gracefully with retries and backoff.

Ake

🛠️ Developer Experience Director @ Chainstack
💸 Talk to me all things Web3

20 years in technology | 8+ years in Web3 full time years experience

Trusted advisor helping developers navigate the complexities of blockchain infrastructure