Solana: Optimize your getBlock performance
With examples in curl and Python
TLDR:
- getBlock is a core Solana RPC method that can dramatically impact performance if misused; don't pull more data than you need.
- Use json, jsonParsed, base58, or base64 encoding strategically; always enable compression (gzip) to reduce huge payloads.
- Apply concurrency limits, caching, and backoff retry logic to avoid node overload and handle network hiccups gracefully.
- Use robust error handling and consider block ranges for larger-scale data fetches.
Main article
Solana's getBlock RPC method is fundamental, but it can be tricky and will tank your application's performance in a jiffy if you are not paying attention.
This guide provides a comprehensive overview of how to use getBlock
efficiently, with practical examples in Python and curl.
Get your own node endpoint today
Start for free and get your app to production levels immediately. No credit card required.
You can sign up with your GitHub, X, Google, or Microsoft account.
Understanding getBlock
The getBlock method returns detailed information about a confirmed block in Solana's ledger. A block in Solana contains:
- A set of transactions
- Metadata like block hash, previous block hash
- Timing information
- Reward data
If you think you have a grasp now, check out Understanding the difference between blocks and slots on Solana. Solana is amazing but not for the faint of heart.
What data does getBlock return?
A typical response includes:
- blockhash — the unique hash (ID) of this block (base-58 encoded)
- previousBlockhash — the hash of the parent block
- parentSlot — the slot number of the parent block
- blockHeight — the sequential block height (the number of blocks beneath this block)
- blockTime — the timestamp when the block was produced, which is yet another trick. See Solana: Understanding block time.
- transactions — an array of transactions in the block (if requested)
- signatures — an array of transaction signatures in the block (if requested)
- rewards — an array of block rewards (if requested)
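As a quick sketch, these top-level fields can be pulled out of a getBlock result dict; the helper name and shape here are illustrative, not part of any SDK:

```python
def summarize_block(block: dict) -> dict:
    """Extract the top-level fields of a getBlock result for a quick overview."""
    return {
        "blockhash": block.get("blockhash"),
        "previousBlockhash": block.get("previousBlockhash"),
        "parentSlot": block.get("parentSlot"),
        "blockHeight": block.get("blockHeight"),
        "blockTime": block.get("blockTime"),
        # transactions is only present when requested; default to an empty list
        "transactionCount": len(block.get("transactions", [])),
    }
```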
Parameters
When calling getBlock, you can specify several parameters to control what data you receive:
- commitment — finalized (default) or confirmed
- encoding — json (default), jsonParsed, base64, or base58
- transactionDetails — full (default), accounts, signatures, or none
- rewards — boolean to include block rewards
- maxSupportedTransactionVersion — for handling versioned transactions
Basic example with curl
Let's start with the simplest way to fetch a block using curl:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
329849011,
{
"encoding": "jsonParsed",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
This request fetches block 329849011 with full transaction details in JSON format.
Note two things here:
- We are using jsonParsed, which produces the largest output on the node side and gets transported to you. You should never do this in production under heavy load; it is more of a one-off inspection call than anything else.
- If you are using a full and not an archive Solana node, use a block number within the last 20 hours or so. Otherwise this will be an archive call. See Limits at the bottom for archive node method availability.
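If you want to estimate programmatically whether a slot is likely still served by a full node, you can work back from the current slot. Both the ~20-hour retention window and the ~0.4 s average slot time below are rough assumptions, not guarantees; tune them for your provider and current network conditions:

```python
def oldest_full_node_slot(current_slot: int,
                          retention_hours: float = 20,
                          avg_slot_time_s: float = 0.4) -> int:
    """Estimate the oldest slot still available on a full node.

    Slots older than the returned value likely require an archive node.
    The retention window and average slot time are assumptions.
    """
    return current_slot - int(retention_hours * 3600 / avg_slot_time_s)
```

For example, with a current slot of 330,000,000 this estimates a cutoff 180,000 slots back, at slot 329,820,000.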
Getting just the signatures
If you're only interested in transaction signatures (hashes), which is much lighter:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
329849011,
{
"encoding": "json",
"transactionDetails": "signatures",
"commitment": "finalized"
}
]
}'
Same note on the block number in params as above: stay within the last 20 hours or so, unless you want to make an archive call, which is also fine, as Chainstack is extremely affordable and transparent with pricing: a full node request counts as one request, an archive node request counts as two, and that's it.
Using compression (gzip) for better performance
HTTP compression is critical when working with Solana's getBlock method due to the large size of block data. Here's why compression is essential and how to implement it:
Why use compression
- Dramatic size reduction — Solana block data with full transaction details easily takes several MB in JSON format. Gzip compression typically reduces this by 70-90%, bringing it down to a few hundred KB.
- Faster response times — less data transmitted means faster responses, especially important when:
  - Working with blocks containing many transactions
  - Operating on networks with limited bandwidth
  - Fetching multiple blocks in sequence
- Reduced bandwidth costs — if you're paying for bandwidth (e.g., in cloud environments), compression significantly reduces costs.
- Server-friendly — compression reduces load on both the RPC node and your client's network connection.
Compression example with curl
Adding compression is simple with curl: add the Accept-Encoding: gzip header and the --compressed flag:
curl -X POST CHAINSTACK_SOLANA_URL \
-H "Content-Type: application/json" \
-H "Accept-Encoding: gzip" \
--compressed \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
329849011,
{
"encoding": "json",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
When you run this command:
- The Accept-Encoding: gzip header tells the server you can handle compressed responses
- The server compresses the JSON data before sending it
- The --compressed flag tells curl to automatically decompress the data on receipt
- You see the normal JSON output, but the actual network transfer was much smaller
Understanding the compression process
Here's what happens under the hood:
1. Request — your client sends a request with Accept-Encoding: gzip
2. Server processing — the server generates the JSON response
3. Compression — the server compresses this data using gzip
4. Transfer — the compressed data (much smaller) is sent over the network
5. Decompression — your client decompresses the data
6. Processing — you work with the original JSON data
Most HTTP libraries handle steps 1, 4, and 5 automatically when configured correctly.
Example of handling the compressed (gzip) data manually
For illustration, and to compare the actual sizes in the compressed and decompressed states:
curl -X POST CHAINSTACK_SOLANA_URL \
-H "Content-Type: application/json" \
-H "Accept-Encoding: gzip" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [329849011, {"encoding": "json", "maxSupportedTransactionVersion": 0}]
}' \
--output block_data.gz
Decompress the block_data.gz file:
gunzip -c block_data.gz > block_data.json
Compare block_data.gz to block_data.json and you'll see the ~80% reduction in size.
Compression in HTTP libraries
Most modern HTTP libraries support compression automatically:
- Python requests — decompresses gzip responses automatically; you can also send the header explicitly with headers={"Accept-Encoding": "gzip"}
- Node.js — most HTTP clients like Axios support this out of the box
- Rust — libraries like reqwest have compression features
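A minimal Python requests sketch, mirroring the curl examples above (CHAINSTACK_SOLANA_URL is a placeholder for your endpoint; requests inflates gzip responses transparently):

```python
import requests

def build_getblock_payload(slot: int, encoding: str = "json") -> dict:
    """Assemble the JSON-RPC payload for getBlock."""
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getBlock",
        "params": [slot, {
            "encoding": encoding,
            "transactionDetails": "full",
            "commitment": "finalized",
            "maxSupportedTransactionVersion": 0,
        }],
    }

def fetch_block(rpc_url: str, slot: int) -> dict:
    """POST the request with gzip enabled; requests decompresses the body for you."""
    headers = {"Content-Type": "application/json", "Accept-Encoding": "gzip"}
    resp = requests.post(rpc_url, headers=headers,
                         json=build_getblock_payload(slot), timeout=30)
    # resp.headers.get("Content-Encoding") reports whether gzip was applied;
    # resp.json() already operates on the decompressed body.
    return resp.json()
```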
Using compression is one of the simplest and most effective optimizations when working with Solana's getBlock RPC method, especially for blocks with many transactions. So use compression.
json, jsonParsed, base58, base64
When using Solana's getBlock RPC method, you can request data in different encoding formats based on your specific needs.
Note that when you make a getBlock call with "encoding": "base58" or "encoding": "base64", you get the respective encoding at the transaction level, not for the entire block. In other words, you still get back a JSON response; only the transaction data is encoded in base58 or base64.
Let's explore each option:
json (default)
The json encoding provides transaction data in a standard JSON format with binary data encoded as base58 strings.
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
149546741,
{
"encoding": "json",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
Best for: General use cases where you need a balance of readability and performance. Binary data remains encoded but the overall structure is easily parseable.
jsonParsed
The jsonParsed encoding goes beyond standard JSON by attempting to decode instruction data into a human-readable format:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
149546741,
{
"encoding": "jsonParsed",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
Best for:
- Debugging and analysis where you need to understand transaction contents
- Decoding program instructions without additional parsing work
- Applications that display transaction details to users
Limitations — not all programs can be parsed, as you need an IDL (similar to EVM ABIs) or source code, and the response size is larger than with other encodings.
base58
The base58 encoding returns binary transaction data as base58-encoded strings:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
149546741,
{
"encoding": "base58",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
Best for: Compatibility with tools that expect base58 encoding, which is common in Solana's ecosystem.
base64
The base64 encoding returns binary transaction data as base64-encoded strings:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id": 1,
"method": "getBlock",
"params": [
149546741,
{
"encoding": "base64",
"transactionDetails": "full",
"commitment": "finalized",
"maxSupportedTransactionVersion": 0
}
]
}'
Best for:
- Performance-critical applications (base64 is more compact than base58)
- Storage efficiency when saving transaction data
- High-throughput systems processing many blocks
Encoding comparison
| Encoding | Size | Human readability | Parsing complexity | Use case |
|---|---|---|---|---|
| json | Medium | Good | Low | General purpose |
| jsonParsed | Largest | Best | Lowest | Analysis & debugging |
| base58 | Medium-large | Poor | Medium | Ecosystem compatibility |
| base64 | Smallest | Poor | Medium | Performance & storage |
Python example with json, jsonParsed, base58, base64
First, install the packages the script below uses: pip install requests base58 (it talks raw JSON-RPC via requests rather than the solana SDK).
import requests
import time
import json
import os
import base64

# Add base58 decoding support
try:
    import base58
except ImportError:
    print("Installing base58 module...")
    import subprocess
    subprocess.check_call(["pip", "install", "base58"])
    import base58

# Initialize the Solana RPC endpoint URL
rpc_url = "CHAINSTACK_SOLANA_URL"

def save_raw_response(response_data, slot, encoding):
    """Save raw RPC response data to a file"""
    # Create output directory if it doesn't exist
    os.makedirs('block_data', exist_ok=True)
    filename = f"block_data/block_{slot}_{encoding}.json"
    with open(filename, 'w') as f:
        json.dump(response_data, f, indent=2)
    print(f"Saved raw {encoding} response to {filename}")
    file_size = os.path.getsize(filename) / (1024 * 1024)  # Size in MB
    print(f"File size: {file_size:.2f} MB")

def make_rpc_request(method, params):
    """Make a JSON-RPC request to the Solana node"""
    headers = {"Content-Type": "application/json"}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }
    response = requests.post(rpc_url, headers=headers, json=payload)
    return response.json()

def get_block_with_encoding(slot, encoding="json"):
    """Fetch a block with specified encoding and return the raw response"""
    try:
        start_time = time.time()
        # Make direct RPC request
        params = [
            slot,
            {
                "encoding": encoding,
                "maxSupportedTransactionVersion": 0,
                "transactionDetails": "full",
                "rewards": False
            }
        ]
        raw_response = make_rpc_request("getBlock", params)
        elapsed = time.time() - start_time
        if not raw_response.get("result"):
            print(f"Block {slot} not found with {encoding} encoding.")
            return None
        print(f"Fetched block {slot} with {encoding} encoding in {elapsed:.2f} seconds")
        # Save the raw response
        save_raw_response(raw_response, slot, encoding)
        return raw_response
    except Exception as e:
        print(f"Error fetching block with {encoding} encoding: {e}")
        return None

def get_latest_slot():
    """Get the latest finalized slot"""
    try:
        # Correct parameter format for getSlot - commitment should be in an object
        params = [{"commitment": "finalized"}]
        slot_resp = make_rpc_request("getSlot", params)
        if "result" not in slot_resp:
            print("Failed to get current slot")
            print(f"Error response: {slot_resp}")
            return None
        current_slot = slot_resp["result"]
        print(f"Current slot: {current_slot}")
        return current_slot
    except Exception as e:
        print(f"Error fetching latest slot: {e}")
        return None

def inspect_transaction_format(response, encoding):
    """Inspect and print transaction format details for the first transaction"""
    if not response or "result" not in response:
        print(f"No result to inspect for {encoding}")
        return
    result = response["result"]
    if "transactions" not in result or not result["transactions"]:
        print(f"No transactions found in {encoding} response")
        return
    # Get the first transaction
    first_tx = result["transactions"][0]
    # Print transaction format details
    print(f"\n{encoding} Transaction Format Analysis:")
    print("-" * 40)
    # For base64 and base58 encodings, we need to handle the response differently
    if encoding in ["base64", "base58"]:
        print(f"Transaction structure type: {type(first_tx)}")
        print(f"Available fields: {list(first_tx.keys())}")
        # Check for specific encoding fields
        if "transaction" in first_tx:
            tx_data = first_tx["transaction"]
            print(f"Transaction data type: {type(tx_data)}")
            if isinstance(tx_data, list) and len(tx_data) >= 2:
                # The format is [encoded_data, encoding_type]
                encoded_str = tx_data[0]
                format_type = tx_data[1]
                print(f"Transaction is a {format_type} encoded array")
                print(f"First 50 chars of encoded data: {encoded_str[:50]}...")
                # Try to extract a small sample of the decoded data
                try:
                    if format_type == "base64":
                        sample = base64.b64decode(encoded_str[:100] + "=" * (4 - len(encoded_str[:100]) % 4))
                    elif format_type == "base58":
                        sample = base58.b58decode(encoded_str[:100])
                    print(f"Sample of decoded bytes (hex): {sample.hex()[:60]}...")
                except Exception as e:
                    print(f"Error decoding sample: {e}")
            elif isinstance(tx_data, str):
                print(f"Transaction is a string of length: {len(tx_data)}")
                print(f"First 50 chars: {tx_data[:50]}...")
            else:
                print(f"Transaction is not a string or list but: {type(tx_data)}")
                if isinstance(tx_data, dict):
                    print(f"Keys: {list(tx_data.keys())}")
    else:
        # Original handling for JSON and jsonParsed
        if "transaction" in first_tx:
            tx = first_tx["transaction"]
            # Check if it's a string (likely base64) or an object
            if isinstance(tx, str):
                print(f"Transaction is a string (likely base64)")
                print(f"Length: {len(tx)} characters")
                print(f"First 100 chars: {tx[:100]}...")
            else:
                print(f"Transaction is an object/dict")
                print(f"Keys: {list(tx.keys()) if isinstance(tx, dict) else 'N/A'}")
                # If there's a message field, check its format
                if isinstance(tx, dict) and "message" in tx:
                    msg = tx["message"]
                    if isinstance(msg, str):
                        print(f"Message is a string, length: {len(msg)}")
                        print(f"First 100 chars: {msg[:100]}...")
                    else:
                        print(f"Message is an object with keys: {list(msg.keys()) if isinstance(msg, dict) else 'N/A'}")
        else:
            print(f"Transaction doesn't have expected 'transaction' field")
            print(f"Available fields: {list(first_tx.keys())}")
    print("-" * 40)

# Fetch the latest slot
slot_number = get_latest_slot()
if slot_number:
    # Fetch the block with each encoding type
    print("\nFetching block with JSON encoding:")
    json_response = get_block_with_encoding(slot_number, encoding="json")
    print("\nFetching block with jsonParsed encoding:")
    jsonParsed_response = get_block_with_encoding(slot_number, encoding="jsonParsed")
    print("\nFetching block with base64 encoding:")
    base64_response = get_block_with_encoding(slot_number, encoding="base64")
    print("\nFetching block with base58 encoding:")
    base58_response = get_block_with_encoding(slot_number, encoding="base58")
    # Inspect the transaction formats to verify encoding differences
    if json_response:
        inspect_transaction_format(json_response, "json")
    if jsonParsed_response:
        inspect_transaction_format(jsonParsed_response, "jsonParsed")
    if base64_response:
        inspect_transaction_format(base64_response, "base64")
    if base58_response:
        inspect_transaction_format(base58_response, "base58")
    print("\nComparison complete. Check the block_data directory for the raw responses.")
Here's an example output:
├── [6.3M] block_329897953_base58.json
├── [6.3M] block_329897953_base64.json
├── [8.0M] block_329897953_json.json
└── [ 16M] block_329897953_jsonParsed.json
Performance optimization
Examples in Python.
Limit concurrency and throttle requests
When fetching multiple blocks, avoid sending too many requests simultaneously:
import asyncio
import concurrent.futures
from solana.rpc.api import Client
import json
import os
import pathlib

async def fetch_block_with_solana(executor, client, slot, semaphore):
    # Use a semaphore to limit concurrency
    async with semaphore:
        try:
            # Run the synchronous client.get_block in a thread pool
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                executor,
                lambda: client.get_block(
                    slot,
                    encoding="json",
                    max_supported_transaction_version=0
                )
            )
            # Solana-py returns a solders.rpc.responses.GetBlockResp object
            # Check if the value field exists (indicates success)
            if hasattr(response, 'value') and response.value is not None:
                return slot, response.value
            else:
                print(f"Error in response for slot {slot}")
                return slot, None
        except Exception as e:
            print(f"Exception for slot {slot}: {e}")
            return slot, None

def save_block_to_file(slot, block, output_dir):
    """Save block data to a JSON file"""
    os.makedirs(output_dir, exist_ok=True)
    try:
        if hasattr(block, '__dict__'):
            block_dict = block.__dict__
        else:
            block_dict = {
                'blockhash': str(block.blockhash) if hasattr(block, 'blockhash') else None,
                'blockHeight': block.block_height if hasattr(block, 'block_height') else None,
                'blockTime': block.block_time if hasattr(block, 'block_time') else None,
                'parentSlot': block.parent_slot if hasattr(block, 'parent_slot') else None,
                'previousBlockhash': str(block.previous_blockhash) if hasattr(block, 'previous_blockhash') else None,
                'transactions': [
                    {
                        'meta': tx.meta.__dict__ if hasattr(tx, 'meta') and hasattr(tx.meta, '__dict__') else None,
                        'transaction': str(tx.transaction) if hasattr(tx, 'transaction') else None,
                    } for tx in block.transactions
                ] if hasattr(block, 'transactions') else []
            }
        # Save to file
        file_path = pathlib.Path(output_dir) / f"block_{slot}.json"
        with open(file_path, 'w') as f:
            json.dump(block_dict, f, indent=2, default=str)
        print(f"Saved block {slot} to {file_path}")
    except Exception as e:
        print(f"Error saving block {slot}: {e}")

async def get_multiple_blocks(rpc_url, slots, max_concurrency=5, output_dir="block_data"):
    # Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrency)
    # Create the Solana client
    client = Client(rpc_url)
    # Create a thread pool executor for running synchronous code
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [fetch_block_with_solana(executor, client, slot, semaphore) for slot in slots]
        results = await asyncio.gather(*tasks)
    # Process and save each block
    blocks = {}
    for slot, block in results:
        if block is not None:
            blocks[slot] = block
            # Save to file
            save_block_to_file(slot, block, output_dir)
    return blocks

# Example usage
async def main():
    rpc_url = "CHAINSTACK_SOLANA_URL"
    slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]
    blocks = await get_multiple_blocks(rpc_url, slots_to_fetch, max_concurrency=3)
    print(f"\nSuccessfully fetched {len(blocks)} blocks")
    for slot, block in blocks.items():
        if block:
            # Access the transactions field from the block object
            tx_count = len(block.transactions if hasattr(block, 'transactions') else [])
            print(f"Slot {slot}: {tx_count} transactions")

# Run the async function
if __name__ == "__main__":
    asyncio.run(main())
As always, make sure you use your own range of blocks in slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015].
Recommended encodings:
- For most use cases: json (good balance of size and parsing speed)
- For human-readable data: jsonParsed (larger but provides decoded instruction data)
- For binary efficiency: base64 (efficient for storage and transmission)
Use binary encoding for bulk requests
Same script as above, but in the client.get_block call use encoding="base64" instead of encoding="json".
Error handling
Let's start with a working script and explain it below:
"""
Solana getBlock with retries implementation
This script demonstrates how to fetch a Solana block with proper
error handling and exponential backoff retries.
"""
import time
from solana.rpc.api import Client
def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
"""Fetch a block with exponential backoff retries
Args:
client: Solana RPC client
slot: The slot number to fetch
max_retries: Maximum number of retry attempts
backoff_factor: Multiplier for exponential backoff
Returns:
Block data or None if failed after retries
"""
retry_count = 0
base_wait_time = 1 # Start with 1 second wait
while retry_count < max_retries:
try:
# Get the block response
block_resp = client.get_block(
slot,
encoding="json",
max_supported_transaction_version=0
)
# The response is a solders.rpc.responses.GetBlockResp object
# We need to check if it has a value (success) or error
if hasattr(block_resp, 'value') and block_resp.value is not None:
return block_resp.value
elif hasattr(block_resp, 'error'):
error = block_resp.error
# Handle specific error codes if possible
if hasattr(error, 'code'):
code = error.code
message = getattr(error, 'message', 'Unknown error')
if code == -32007: # Block not available (purged from ledger)
print(f"Block {slot} not available in ledger storage")
return None
elif code == -32004: # Slot skipped
print(f"Slot {slot} was skipped (no block produced)")
return None
elif code == -32603: # Internal error
# This might be temporary, retry
print(f"Internal error, will retry: {message}")
elif code == 429: # Rate limit
print("Rate limited, backing off significantly")
retry_count += 1
time.sleep(base_wait_time * backoff_factor ** retry_count * 2) # Extra backoff for rate limits
continue
else:
print(f"RPC error code {code}: {message}")
else:
# If we can't get a specific error code
print(f"Error in response: {error}")
else:
# Null result
print(f"Block {slot} returned null (not found or not confirmed)")
return None
except Exception as e:
retry_count += 1
wait_time = base_wait_time * backoff_factor ** retry_count
print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
if retry_count < max_retries:
time.sleep(wait_time)
else:
print(f"Max retries reached for block {slot}")
return None
# If we get here, there was an error but not one we want to immediately retry on
retry_count += 1
wait_time = base_wait_time * backoff_factor ** retry_count
if retry_count < max_retries:
print(f"Retrying after {wait_time}s...")
time.sleep(wait_time)
else:
print(f"Max retries reached for block {slot}")
return None
return None # Shouldn't reach here, but just in case
def main():
# Example usage
rpc_url = "CHAINSTACK_SOLANA_URL"
print(f"Connecting to Solana RPC: {rpc_url}")
client = Client(rpc_url)
slot_to_fetch = 329849011 # Example slot
print(f"Fetching block at slot {slot_to_fetch}...")
block = get_block_with_retries(client, slot_to_fetch)
if block:
# Print summary of the block
transactions = getattr(block, 'transactions', [])
# Safely extract signatures
signatures = []
for tx in transactions:
if hasattr(tx, 'transaction') and hasattr(tx.transaction, 'signatures'):
if tx.transaction.signatures:
signatures.append(tx.transaction.signatures[0])
print(f"\nBlock {slot_to_fetch} summary:")
print(f" Blockhash: {getattr(block, 'blockhash', 'unknown')}")
print(f" Parent Slot: {getattr(block, 'parent_slot', 'unknown')}")
print(f" Block Time: {getattr(block, 'block_time', 'unknown')}")
print(f" Block Height: {getattr(block, 'block_height', 'unknown')}")
print(f" Transactions: {len(transactions)}")
if signatures:
print(f"\nFirst few transaction signatures:")
for sig in signatures[:3]:
print(f" {sig}")
if len(signatures) > 3:
print(f" ... and {len(signatures)-3} more")
else:
print("Failed to fetch block")
if __name__ == "__main__":
main()
Exponential backoff retry mechanism
def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
    retry_count = 0
    base_wait_time = 1  # Start with 1 second wait
    while retry_count < max_retries:
        # ... [request code]
        # When an error occurs:
        wait_time = base_wait_time * backoff_factor ** retry_count
        time.sleep(wait_time)
The code implements classic exponential backoff where:
- Wait time grows exponentially with each retry attempt
- The initial wait is 1 second
- Each subsequent wait multiplies the base by the backoff factor raised to the retry count
- This prevents overwhelming the server with rapid reconnection attempts
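The resulting wait times can be computed directly; this mirrors the wait_time = base_wait_time * backoff_factor ** retry_count line in the script above, with retry_count running from 1 to max_retries:

```python
def backoff_schedule(max_retries: int = 3,
                     backoff_factor: int = 2,
                     base_wait_time: int = 1) -> list:
    """Wait time before each retry attempt: base * factor ** retry_count."""
    return [base_wait_time * backoff_factor ** n for n in range(1, max_retries + 1)]
```

With the defaults this yields waits of 2, 4, and 8 seconds; rate-limit errors in the script double these.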
Differential error handling
# Handle specific error codes if possible
if hasattr(error, 'code'):
    code = error.code
    message = getattr(error, 'message', 'Unknown error')
    if code == -32007:  # Block not available (purged from ledger)
        print(f"Block {slot} not available in ledger storage")
        return None
    elif code == -32004:  # Slot skipped
        print(f"Slot {slot} was skipped (no block produced)")
        return None
    elif code == -32603:  # Internal error
        # This might be temporary, retry
        print(f"Internal error, will retry: {message}")
    elif code == 429:  # Rate limit
        print("Rate limited, backing off significantly")
        retry_count += 1
        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
        continue
The script intelligently handles different error types:
- Non-retryable errors (like purged blocks) fail fast without wasting retries
- Temporary errors proceed with standard backoff
- Rate limit errors get special treatment with doubled backoff
Enhanced rate limit handling
elif code == 429:  # Rate limit
    print("Rate limited, backing off significantly")
    retry_count += 1
    time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
    continue
Rate limits receive special handling:
- Doubled backoff time compared to other errors
- This helps prevent repeatedly hitting rate limits
- The multiplier (2x) helps ensure the client stays under rate limits
Comprehensive exception handling
try:
    # Get the block response
    block_resp = client.get_block(...)
    # Various response validation checks
except Exception as e:
    retry_count += 1
    wait_time = base_wait_time * backoff_factor ** retry_count
    print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
    if retry_count < max_retries:
        time.sleep(wait_time)
    else:
        print(f"Max retries reached for block {slot}")
        return None
The script catches all exceptions including:
- Network errors
- Timeout errors
- Malformed response errors
- Client library errors
Response validation
if hasattr(block_resp, 'value') and block_resp.value is not None:
    return block_resp.value
elif hasattr(block_resp, 'error'):
    error = block_resp.error
    # Error handling logic
else:
    # Null result
    print(f"Block {slot} returned null (not found or not confirmed)")
    return None
The code thoroughly validates responses before processing:
- Checks for valid response structure
- Handles null responses appropriately
- Verifies response has expected attributes
Detailed logging
print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
# ...
print(f"Max retries reached for block {slot}")
The script provides detailed logging:
- Error messages with specific error codes and descriptions
- Retry counts and wait times
- Final outcomes (success or failure)
- This aids in debugging and monitoring
Parameterized retry configuration
def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
The retry mechanism is fully customizable:
- Configurable maximum retries
- Adjustable backoff factor
- This allows tuning based on network conditions or application requirements
These techniques together create a resilient implementation that gracefully handles various network issues, temporary failures, and rate limiting while providing clear feedback about what's happening during the process.
Client-side caching
Implement a client-side cache that stores previously fetched blocks in memory, allowing applications to:
- Retrieve frequently accessed blocks without making additional RPC calls
- Track cache performance with hit/miss statistics
- Maintain a configurable maximum cache size
This should reduce unnecessary calls.
Example:
#!/usr/bin/env python3
"""
Solana Block Cache
A utility for efficiently fetching and caching Solana blocks to reduce RPC calls.
"""
import time
import json
import requests
from typing import Dict, Any, Optional, Union
from solana.rpc.api import Client
class BlockCache:
"""A simple cache for Solana blocks to reduce redundant RPC calls."""
def __init__(self, client: Client, max_size: int = 100):
"""
Initialize the block cache.
Args:
client: A Solana RPC client instance or URL string
max_size: Maximum number of blocks to keep in cache
"""
if isinstance(client, str):
self.rpc_url = client
self.client = None
else:
self.rpc_url = client._provider.endpoint_uri
self.client = client
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"Accept-Encoding": "gzip"
})
self.cache: Dict[str, Any] = {}
self.max_size = max_size
        self.hits = 0
        self.misses = 0
        self.request_id = 0

    def get_block(self, slot: int, encoding: str = "json", tx_details: str = "full",
                  max_supported_transaction_version: Optional[int] = 0) -> Optional[Dict[str, Any]]:
        """
        Fetch a block, using cache if available.

        Args:
            slot: The slot number of the block to fetch
            encoding: Block encoding format ("json", "jsonParsed", "base64", etc.)
            tx_details: Transaction details level ("full", "signatures", "accounts", "none")
            max_supported_transaction_version: Maximum supported transaction version

        Returns:
            The block data or None if not found/error
        """
        cache_key = f"{slot}:{encoding}:{tx_details}"

        # Return from cache if available
        if cache_key in self.cache:
            self.hits += 1
            print(f"Cache hit for block {slot}")
            return self.cache[cache_key]

        # Fetch from RPC
        self.misses += 1
        print(f"Cache miss for block {slot}, fetching from RPC...")
        try:
            self.request_id += 1

            # Prepare parameters for the getBlock method
            params = {
                "encoding": encoding,
                "transactionDetails": tx_details,
                "maxSupportedTransactionVersion": max_supported_transaction_version
            }

            # Create the JSON-RPC payload
            payload = {
                "jsonrpc": "2.0",
                "id": self.request_id,
                "method": "getBlock",
                "params": [slot, params]
            }

            # Make the HTTP request
            response = self.session.post(self.rpc_url, json=payload, timeout=30)
            block_resp = response.json()

            # Get the result
            result = block_resp.get('result')

            # Store in cache if we got a result
            if result:
                # If cache is full, remove oldest entry
                if len(self.cache) >= self.max_size:
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                self.cache[cache_key] = result

            return result
        except Exception as e:
            print(f"Error fetching block {slot}: {e}")
            return None

    def clear_cache(self) -> None:
        """Clear the entire cache."""
        self.cache = {}
        print("Cache cleared")

    def remove_from_cache(self, slot: int, encoding: str = "json", tx_details: str = "full") -> bool:
        """
        Remove a specific block from the cache.

        Returns:
            True if the block was in the cache and removed, False otherwise
        """
        cache_key = f"{slot}:{encoding}:{tx_details}"
        if cache_key in self.cache:
            del self.cache[cache_key]
            print(f"Removed block {slot} from cache")
            return True
        return False

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """Get cache performance statistics."""
        total_requests = self.hits + self.misses
        hit_rate = self.hits / total_requests if total_requests > 0 else 0
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
        }

def main():
    """Example usage of BlockCache."""
    rpc_url = "CHAINSTACK_SOLANA_URL"
    print("Initializing Solana Block Cache")
    block_cache = BlockCache(rpc_url)

    # Example: Fetch a block (cache miss)
    print("\nFetching block first time (should be cache miss):")
    block1 = block_cache.get_block(329849011)
    if block1:
        print(f"Block hash: {block1.get('blockhash')}")
        print(f"Block time: {block1.get('blockTime')}")
        print(f"Transaction count: {len(block1.get('transactions', []))}")

    # Example: Fetch the same block again (cache hit)
    print("\nFetching same block again (should be cache hit):")
    block2 = block_cache.get_block(329849011)

    # Fetch with different parameters (should be a cache miss)
    print("\nFetching same block with different parameters (should be cache miss):")
    block3 = block_cache.get_block(329849011, tx_details="signatures")

    # Print cache statistics
    print("\nCache statistics:")
    stats = block_cache.get_stats()
    for key, value in stats.items():
        if key == "hit_rate":
            print(f"{key}: {value:.2%}")
        else:
            print(f"{key}: {value}")

if __name__ == "__main__":
    main()
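Note that the eviction in `get_block` above is FIFO: `next(iter(self.cache))` removes the key that was inserted first, regardless of how recently it was read. If you want true least-recently-used eviction, `collections.OrderedDict` makes it a small change. A minimal sketch (the `LRUBlockCache` class and its method names are illustrative, not part of the code above):

```python
from collections import OrderedDict
from typing import Any, Optional


class LRUBlockCache:
    """Minimal LRU cache; keys are the same slot/encoding strings as above."""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self.cache: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        # Mark as most recently used
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key: str, value: Any) -> None:
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.max_size:
            # Evict the least recently used entry
            self.cache.popitem(last=False)
```

`get_block` would then call `get`/`put` with the same `f"{slot}:{encoding}:{tx_details}"` key it builds today; blocks you keep re-reading stay resident while one-off fetches age out first.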
Block ranges
Implement flexible block range handling through a range-first processing pattern — first identify all available blocks in the target range, then systematically process them in batches.
#!/usr/bin/env python3
"""
Solana Block Range Processing

This script demonstrates how to process ranges of Solana blocks
efficiently with batching and proper error handling.
"""
import time

from solana.rpc.api import Client


def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
    """Fetch a block with exponential backoff retries

    Args:
        client: Solana RPC client
        slot: The slot number to fetch
        max_retries: Maximum number of retry attempts
        backoff_factor: Multiplier for exponential backoff

    Returns:
        Block data or None if failed after retries
    """
    retry_count = 0
    base_wait_time = 1  # Start with 1 second wait

    while retry_count < max_retries:
        try:
            # Get the block response
            block_resp = client.get_block(
                slot,
                encoding="json",
                max_supported_transaction_version=0
            )

            # The response is a solders.rpc.responses.GetBlockResp object
            # We need to check if it has a value (success) or error
            if hasattr(block_resp, 'value') and block_resp.value is not None:
                return block_resp.value
            elif hasattr(block_resp, 'error'):
                error = block_resp.error

                # Handle specific error codes if possible
                if hasattr(error, 'code'):
                    code = error.code
                    message = getattr(error, 'message', 'Unknown error')
                    if code == -32007:  # Block not available (purged from ledger)
                        print(f"Block {slot} not available in ledger storage")
                        return None
                    elif code == -32004:  # Slot skipped
                        print(f"Slot {slot} was skipped (no block produced)")
                        return None
                    elif code == -32603:  # Internal error
                        # This might be temporary, retry
                        print(f"Internal error, will retry: {message}")
                    elif code == 429:  # Rate limit
                        print("Rate limited, backing off significantly")
                        retry_count += 1
                        # Extra backoff for rate limits
                        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)
                        continue
                    else:
                        print(f"RPC error code {code}: {message}")
                else:
                    # If we can't get a specific error code
                    print(f"Error in response: {error}")
            else:
                # Null result
                print(f"Block {slot} returned null (not found or not confirmed)")
                return None
        except Exception as e:
            retry_count += 1
            wait_time = base_wait_time * backoff_factor ** retry_count
            print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
            if retry_count < max_retries:
                time.sleep(wait_time)
                continue  # Skip the shared retry block below to avoid double-counting
            print(f"Max retries reached for block {slot}")
            return None

        # If we get here, there was an error but not one we want to immediately retry on
        retry_count += 1
        wait_time = base_wait_time * backoff_factor ** retry_count
        if retry_count < max_retries:
            print(f"Retrying after {wait_time}s...")
            time.sleep(wait_time)
        else:
            print(f"Max retries reached for block {slot}")
            return None

    return None  # Shouldn't reach here, but just in case


def get_block_range(client, start_slot, end_slot=None, limit=500):
    """
    Get a list of available block slots in a range.

    If end_slot is None, will use getBlocksWithLimit instead of getBlocks.

    Args:
        client: Solana RPC client
        start_slot: The starting slot number
        end_slot: The ending slot number (optional)
        limit: Maximum number of blocks to return when end_slot is None

    Returns:
        List of available block slots in the range
    """
    try:
        if end_slot is None:
            # Use getBlocksWithLimit
            blocks_resp = client.get_blocks_with_limit(start_slot, limit)
        else:
            # Use getBlocks
            blocks_resp = client.get_blocks(start_slot, end_slot)

        # Handle the response which could be an object or a dict
        if hasattr(blocks_resp, 'value'):
            # Object response
            return blocks_resp.value or []
        else:
            # Dict response
            return blocks_resp.get('result', [])
    except Exception as e:
        print(f"Error getting block range: {e}")
        return []


def process_block_range(client, start_slot, end_slot, batch_size=10):
    """Process blocks in a range in batches

    Args:
        client: Solana RPC client
        start_slot: The starting slot number
        end_slot: The ending slot number
        batch_size: Number of blocks to process in each batch

    Returns:
        Total number of transactions processed
    """
    # First get all available block slots in the range
    print(f"Fetching block slots from {start_slot} to {end_slot}...")
    block_slots = get_block_range(client, start_slot, end_slot)
    print(f"Found {len(block_slots)} blocks to process")

    # Process in batches to avoid overwhelming the RPC node
    total_blocks = len(block_slots)
    total_transactions = 0

    for i in range(0, total_blocks, batch_size):
        batch = block_slots[i:i + batch_size]
        batch_num = i // batch_size + 1
        total_batches = (total_blocks + batch_size - 1) // batch_size
        print(f"Processing batch {batch_num}/{total_batches}...")

        # Process each block in the batch
        for slot in batch:
            block = get_block_with_retries(client, slot)
            if block:
                # Get transaction count safely; transactions can be None
                # when the block carries no transaction details
                transactions = getattr(block, 'transactions', None) or []
                tx_count = len(transactions)
                total_transactions += tx_count
                print(f"  Slot {slot}: {tx_count} transactions")

            # Add a small delay between blocks to be nice to the RPC node
            time.sleep(0.1)

        # Add a delay between batches
        if i + batch_size < total_blocks:
            print("Waiting before next batch...")
            time.sleep(1)

    print(f"Processed {total_blocks} blocks with {total_transactions} total transactions")
    return total_transactions


def process_recent_blocks(client, count=100):
    """Process the most recent blocks

    Args:
        client: Solana RPC client
        count: Number of recent blocks to process

    Returns:
        Total number of transactions processed
    """
    # Get the current slot
    current_slot_resp = client.get_slot()

    # Handle either object or dict response
    if hasattr(current_slot_resp, 'value'):
        # Object response
        current_slot = current_slot_resp.value
    else:
        # Dict response
        current_slot = current_slot_resp.get('result')

    if current_slot:
        # Calculate the starting slot (count blocks back)
        start_slot = max(0, current_slot - count)
        print(f"Processing {count} recent blocks from {start_slot} to {current_slot}...")
        return process_block_range(client, start_slot, current_slot)
    else:
        print("Failed to get current slot")
        return 0


def main():
    # Example usage
    rpc_url = "CHAINSTACK_SOLANA_URL"
    print(f"Connecting to Solana RPC: {rpc_url}")
    client = Client(rpc_url)

    # Process the 20 most recent blocks
    count = 20
    print(f"Processing {count} recent blocks...")
    tx_count = process_recent_blocks(client, count=count)
    print(f"Total transactions in last {count} blocks: {tx_count}")


if __name__ == "__main__":
    main()
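The range processor above fetches blocks one at a time. If your endpoint's rate limits allow it, a batch can be fetched concurrently with a bounded thread pool so that at most a fixed number of requests are in flight. A minimal sketch, not part of the script above; `fetch_fn` is any single-slot fetcher (for example, a lambda wrapping `get_block_with_retries(client, slot)`), and `max_workers` should be tuned to your provider's limits:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional


def fetch_blocks_concurrently(
    fetch_fn: Callable[[int], Optional[Any]],
    slots: List[int],
    max_workers: int = 5,
) -> Dict[int, Optional[Any]]:
    """Fetch a list of slots with at most max_workers requests in flight."""
    results: Dict[int, Optional[Any]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit one task per slot; the pool throttles to max_workers
        future_to_slot = {pool.submit(fetch_fn, slot): slot for slot in slots}
        for future in as_completed(future_to_slot):
            slot = future_to_slot[future]
            try:
                results[slot] = future.result()
            except Exception as exc:
                # A failed slot should not abort the whole batch
                print(f"Slot {slot} failed: {exc}")
                results[slot] = None
    return results
```

Keeping `max_workers` small (single digits) usually gets most of the throughput benefit without tripping 429 responses; combine it with the per-batch delays already in `process_block_range` for bulk backfills.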
Conclusion
Working with Solana's `getBlock` RPC method efficiently requires understanding both what data you need and how to optimize your requests. By following the best practices outlined in this guide (compression, controlled concurrency, block ranges, requesting only the data you need, and proper error handling), you can build robust applications that interact with Solana blocks effectively.
Remember these key takeaways:
- Use the appropriate encoding and detail level for your use case: `json`, `jsonParsed`, `base58`, or `base64`.
- Always enable HTTP compression.
- Implement client-side caching for frequently accessed blocks.
- Use controlled concurrency and throttling for bulk operations.
- Handle errors gracefully with retries and backoff.
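On the compression takeaway: the `requests` library sends `Accept-Encoding: gzip, deflate` by default and decompresses responses transparently, but it costs nothing to be explicit on a shared session, and the same session then also reuses TCP connections across calls. A sketch (the endpoint URL is a placeholder; the final request is commented out):

```python
import requests

# A shared session reuses connections and applies these headers to every
# request; requests decompresses gzip responses transparently.
session = requests.Session()
session.headers.update({
    "Content-Type": "application/json",
    "Accept-Encoding": "gzip",  # explicit, though requests sets this by default
})

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    # "none" transaction details keeps the response small when you only
    # need block metadata
    "params": [329849011, {"transactionDetails": "none",
                           "maxSupportedTransactionVersion": 0}],
}

# resp = session.post("CHAINSTACK_SOLANA_URL", json=payload, timeout=30)
# block = resp.json().get("result")
```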