Solana’s getBlock is a fundamental RPC method that can be tricky and will screw up your application’s performance in a jiffy if you are not paying attention.
This guide provides a comprehensive overview of how to use getBlock efficiently, with practical examples in Python and curl.
Let’s start with the simplest way to fetch a block using curl:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1,"method":"getBlock","params":[329849011,{"encoding":"jsonParsed","transactionDetails":"full","commitment":"finalized","maxSupportedTransactionVersion":0}]}'
This request fetches block 329849011 with full transaction details in JSON format.
Note two things here:
We are using jsonParsed, which produces the largest output on the node side and gets transported to you in full. You should never do this in production under heavy load. This is more of a one-off inspection call than anything else.
If you are using a full and not an archive Solana node, use a block number within the last 20 hours or so. Otherwise this becomes an archive call. See Limits at the bottom for archive node method availability.
If you’re only interested in transaction signatures (hashes), request just those, which is a much lighter call:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1,"method":"getBlock","params":[329849011,{"encoding":"json","transactionDetails":"signatures","commitment":"finalized"}]}'
Same note on the block number in params as above. Make sure you stay within the last 20 hours or so, unless you want to use an archive call, which is also fine as Chainstack is extremely affordable & transparent with pricing — a full node request is counted as one request, and an archive node request is counted as 2 requests, and that’s it.
HTTP compression is critical when working with Solana’s getBlock method due to the large size of block data. Here’s why compression is essential and how to implement it:
Dramatic size reduction — Solana block data with full transaction details easily takes several MB in JSON format. Gzip compression typically reduces this by 70-90%, bringing it down to a few hundred KB.
Faster response times — less data transmitted means faster responses, especially important when:
Working with blocks containing many transactions
Operating on networks with limited bandwidth
Fetching multiple blocks in sequence
Reduced bandwidth costs — If you’re paying for bandwidth (e.g., in cloud environments), compression significantly reduces costs.
Server-friendly — compression reduces load on both the RPC node and your client’s network connection.
Most modern HTTP libraries support compression automatically:
Python requests — compression is negotiated automatically (the library sends Accept-Encoding: gzip, deflate by default and transparently decompresses the response); you can also set headers={"Accept-Encoding": "gzip"} explicitly
Node.js — most HTTP clients like Axios support this out of the box
Rust — libraries like reqwest have compression features
Using compression is one of the simplest and most effective optimizations when working with Solana’s getBlock RPC method, especially for blocks with many transactions. So use compression.
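Here is a minimal sketch of the Python requests approach, using the same CHAINSTACK_SOLANA_URL placeholder and example block number as the curl calls above. Whether the reply actually arrives gzipped depends on the serving node’s configuration, so treat the Content-Encoding check as informational:

import requests

rpc_url = "CHAINSTACK_SOLANA_URL"  # replace with your endpoint

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
        329849011,
        {
            "encoding": "json",
            "transactionDetails": "full",
            "commitment": "finalized",
            "maxSupportedTransactionVersion": 0,
        },
    ],
}

# requests already advertises gzip/deflate by default; setting the header
# explicitly just makes the intent visible in your code.
headers = {
    "Content-Type": "application/json",
    "Accept-Encoding": "gzip",
}

response = requests.post(rpc_url, headers=headers, json=payload)

# If the node compressed the response, requests decompresses it transparently,
# so response.json() behaves the same either way.
print("Content-Encoding:", response.headers.get("Content-Encoding"))
print("Decompressed payload size:", len(response.content), "bytes")

block = response.json().get("result")
if block:
    print("Transactions in block:", len(block.get("transactions", [])))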
When using Solana’s getBlock RPC method, you can request data in different encoding formats based on your specific needs.
Note that when you are doing a getBlock call with "encoding": "base58" or "encoding": "base64", you are getting the respective encoding on the transaction level, not the entire block. In other words, you will still get back a JSON response, it’s only the transaction data that will be encoded in base58 or base64.
The json encoding provides transaction data in a standard JSON format with binary data encoded as base58 strings.
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1,"method":"getBlock","params":[149546741,{"encoding":"json","transactionDetails":"full","commitment":"finalized","maxSupportedTransactionVersion":0}]}'
Best for: General use cases where you need a balance of readability and performance. Binary data remains encoded but the overall structure is easily parseable.
The jsonParsed encoding goes beyond standard JSON by attempting to decode instruction data into human-readable format:
curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1,"method":"getBlock","params":[149546741,{"encoding":"jsonParsed","transactionDetails":"full","commitment":"finalized","maxSupportedTransactionVersion":0}]}'
Best for:
Debugging and analysis where you need to understand transaction contents
Decoding program instructions without additional parsing work
Applications that display transaction details to users
Limitations — not all programs can be parsed, as you need an IDL (similar to EVM’s ABIs) or the program source code, and the response size is larger than with other encodings.
Python example with json, jsonParsed, base58, base64
First, install the package: pip install solana.
import requests
import time
import json
import os
import base64
import binascii

# Add base58 decoding support
try:
    import base58
except ImportError:
    print("Installing base58 module...")
    import subprocess
    subprocess.check_call(["pip", "install", "base58"])
    import base58

# Initialize the Solana RPC endpoint URL
rpc_url = "CHAINSTACK_SOLANA_URL"

def save_raw_response(response_data, slot, encoding):
    """Save raw RPC response data to a file"""
    # Create output directory if it doesn't exist
    os.makedirs('block_data', exist_ok=True)

    filename = f"block_data/block_{slot}_{encoding}.json"
    with open(filename, 'w') as f:
        json.dump(response_data, f, indent=2)

    print(f"Saved raw {encoding} response to {filename}")
    file_size = os.path.getsize(filename) / (1024 * 1024)  # Size in MB
    print(f"File size: {file_size:.2f} MB")

def make_rpc_request(method, params):
    """Make a JSON-RPC request to the Solana node"""
    headers = {"Content-Type": "application/json"}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }
    response = requests.post(rpc_url, headers=headers, json=payload)
    return response.json()

def get_block_with_encoding(slot, encoding="json"):
    """Fetch a block with specified encoding and return the raw response"""
    try:
        start_time = time.time()

        # Make direct RPC request
        params = [
            slot,
            {
                "encoding": encoding,
                "maxSupportedTransactionVersion": 0,
                "transactionDetails": "full",
                "rewards": False
            }
        ]
        raw_response = make_rpc_request("getBlock", params)
        elapsed = time.time() - start_time

        if not raw_response.get("result"):
            print(f"Block {slot} not found with {encoding} encoding.")
            return None

        print(f"Fetched block {slot} with {encoding} encoding in {elapsed:.2f} seconds")

        # Save the raw response
        save_raw_response(raw_response, slot, encoding)

        return raw_response
    except Exception as e:
        print(f"Error fetching block with {encoding} encoding: {e}")
        return None

def get_latest_slot():
    """Get the latest finalized slot"""
    try:
        # Correct parameter format for getSlot - commitment should be in an object
        params = [{"commitment": "finalized"}]
        slot_resp = make_rpc_request("getSlot", params)

        if "result" not in slot_resp:
            print("Failed to get current slot")
            print(f"Error response: {slot_resp}")
            return None

        current_slot = slot_resp["result"]
        print(f"Current slot: {current_slot}")
        return current_slot
    except Exception as e:
        print(f"Error fetching latest slot: {e}")
        return None

def inspect_transaction_format(response, encoding):
    """Inspect and print transaction format details for the first transaction"""
    if not response or "result" not in response:
        print(f"No result to inspect for {encoding}")
        return

    result = response["result"]
    if "transactions" not in result or not result["transactions"]:
        print(f"No transactions found in {encoding} response")
        return

    # Get the first transaction
    first_tx = result["transactions"][0]

    # Print transaction format details
    print(f"\n{encoding} Transaction Format Analysis:")
    print("-" * 40)

    # For base64 and base58 encodings, we need to handle the response differently
    if encoding in ["base64", "base58"]:
        print(f"Transaction structure type: {type(first_tx)}")
        print(f"Available fields: {list(first_tx.keys())}")

        # Check for specific encoding fields
        if "transaction" in first_tx:
            tx_data = first_tx["transaction"]
            print(f"Transaction data type: {type(tx_data)}")

            if isinstance(tx_data, list) and len(tx_data) >= 2:
                # The format is [encoded_data, encoding_type]
                encoded_str = tx_data[0]
                format_type = tx_data[1]
                print(f"Transaction is a {format_type} encoded array")
                print(f"First 50 chars of encoded data: {encoded_str[:50]}...")

                # Try to extract a small sample of the decoded data
                try:
                    if format_type == "base64":
                        # Pad only when the sample length is not a multiple of 4
                        sample = base64.b64decode(encoded_str[:100] + "=" * (-len(encoded_str[:100]) % 4))
                    elif format_type == "base58":
                        sample = base58.b58decode(encoded_str[:100])
                    print(f"Sample of decoded bytes (hex): {sample.hex()[:60]}...")
                except Exception as e:
                    print(f"Error decoding sample: {e}")
            elif isinstance(tx_data, str):
                print(f"Transaction is a string of length: {len(tx_data)}")
                print(f"First 50 chars: {tx_data[:50]}...")
            else:
                print(f"Transaction is not a string or list but: {type(tx_data)}")
                if isinstance(tx_data, dict):
                    print(f"Keys: {list(tx_data.keys())}")
    else:
        # Original handling for JSON and jsonParsed
        if "transaction" in first_tx:
            tx = first_tx["transaction"]

            # Check if it's a string (likely base64) or an object
            if isinstance(tx, str):
                print("Transaction is a string (likely base64)")
                print(f"Length: {len(tx)} characters")
                print(f"First 100 chars: {tx[:100]}...")
            else:
                print("Transaction is an object/dict")
                print(f"Keys: {list(tx.keys()) if isinstance(tx, dict) else 'N/A'}")

                # If there's a message field, check its format
                if isinstance(tx, dict) and "message" in tx:
                    msg = tx["message"]
                    if isinstance(msg, str):
                        print(f"Message is a string, length: {len(msg)}")
                        print(f"First 100 chars: {msg[:100]}...")
                    else:
                        print(f"Message is an object with keys: {list(msg.keys()) if isinstance(msg, dict) else 'N/A'}")
        else:
            print("Transaction doesn't have expected 'transaction' field")
            print(f"Available fields: {list(first_tx.keys())}")

    print("-" * 40)

# Fetch the latest slot
slot_number = get_latest_slot()

if slot_number:
    # Fetch the block with each encoding type
    print("\nFetching block with JSON encoding:")
    json_response = get_block_with_encoding(slot_number, encoding="json")

    print("\nFetching block with jsonParsed encoding:")
    jsonParsed_response = get_block_with_encoding(slot_number, encoding="jsonParsed")

    print("\nFetching block with base64 encoding:")
    base64_response = get_block_with_encoding(slot_number, encoding="base64")

    print("\nFetching block with base58 encoding:")
    base58_response = get_block_with_encoding(slot_number, encoding="base58")

    # Inspect the transaction formats to verify encoding differences
    if json_response:
        inspect_transaction_format(json_response, "json")
    if jsonParsed_response:
        inspect_transaction_format(jsonParsed_response, "jsonParsed")
    if base64_response:
        inspect_transaction_format(base64_response, "base64")
    if base58_response:
        inspect_transaction_format(base58_response, "base58")

    print("\nComparison complete. Check the block_data directory for the raw responses.")
When fetching multiple blocks, avoid sending too many requests simultaneously:
import asyncio
import concurrent.futures
from solana.rpc.api import Client
import json
import os
import pathlib

async def fetch_block_with_solana(executor, client, slot, semaphore):
    # Use a semaphore to limit concurrency
    async with semaphore:
        try:
            # Run the synchronous client.get_block in a thread pool
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                executor,
                lambda: client.get_block(
                    slot,
                    encoding="json",
                    max_supported_transaction_version=0
                )
            )

            # Solana-py returns a solders.rpc.responses.GetBlockResp object
            # Check if the value field exists (indicates success)
            if hasattr(response, 'value') and response.value is not None:
                return slot, response.value
            else:
                print(f"Error in response for slot {slot}")
                return slot, None
        except Exception as e:
            print(f"Exception for slot {slot}: {e}")
            return slot, None

def save_block_to_file(slot, block, output_dir):
    """Save block data to a JSON file"""
    os.makedirs(output_dir, exist_ok=True)

    try:
        if hasattr(block, '__dict__'):
            block_dict = block.__dict__
        else:
            block_dict = {
                'blockhash': str(block.blockhash) if hasattr(block, 'blockhash') else None,
                'blockHeight': block.block_height if hasattr(block, 'block_height') else None,
                'blockTime': block.block_time if hasattr(block, 'block_time') else None,
                'parentSlot': block.parent_slot if hasattr(block, 'parent_slot') else None,
                'previousBlockhash': str(block.previous_blockhash) if hasattr(block, 'previous_blockhash') else None,
                'transactions': [
                    {
                        'meta': tx.meta.__dict__ if hasattr(tx, 'meta') and hasattr(tx.meta, '__dict__') else None,
                        'transaction': str(tx.transaction) if hasattr(tx, 'transaction') else None,
                    }
                    for tx in block.transactions
                ] if hasattr(block, 'transactions') else []
            }

        # Save to file
        file_path = pathlib.Path(output_dir) / f"block_{slot}.json"
        with open(file_path, 'w') as f:
            json.dump(block_dict, f, indent=2, default=str)
        print(f"Saved block {slot} to {file_path}")
    except Exception as e:
        print(f"Error saving block {slot}: {e}")

async def get_multiple_blocks(rpc_url, slots, max_concurrency=5, output_dir="block_data"):
    # Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrency)

    # Create the Solana client
    client = Client(rpc_url)

    # Create a thread pool executor for running synchronous code
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [fetch_block_with_solana(executor, client, slot, semaphore) for slot in slots]
        results = await asyncio.gather(*tasks)

    # Process and save each block
    blocks = {}
    for slot, block in results:
        if block is not None:
            blocks[slot] = block
            # Save to file
            save_block_to_file(slot, block, output_dir)

    return blocks

# Example usage
async def main():
    rpc_url = "CHAINSTACK_SOLANA_URL"
    slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]
    blocks = await get_multiple_blocks(rpc_url, slots_to_fetch, max_concurrency=3)

    print(f"\nSuccessfully fetched {len(blocks)} blocks")
    for slot, block in blocks.items():
        if block:
            # Access the transactions field from the block object
            tx_count = len(block.transactions if hasattr(block, 'transactions') else [])
            print(f"Slot {slot}: {tx_count} transactions")

# Run the async function
if __name__ == "__main__":
    asyncio.run(main())
As always, make sure you get your own range of blocks in slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015].
Recommended encodings:
For most use cases: json (good balance of size and parsing speed)
For human-readable data: jsonParsed (larger but provides decoded instruction data)
For binary efficiency: base64 (efficient for storage and transmission)
Let’s start with a working script and explain it below:
"""Solana getBlock with retries implementationThis script demonstrates how to fetch a Solana block with propererror handling and exponential backoff retries."""import timefrom solana.rpc.api import Clientdefget_block_with_retries(client, slot, max_retries=3, backoff_factor=2):"""Fetch a block with exponential backoff retries Args: client: Solana RPC client slot: The slot number to fetch max_retries: Maximum number of retry attempts backoff_factor: Multiplier for exponential backoff Returns: Block data orNoneif failed after retries""" retry_count =0 base_wait_time =1# Start with 1 second waitwhile retry_count < max_retries:try:# Get the block response block_resp = client.get_block( slot, encoding="json", max_supported_transaction_version=0)# The response is a solders.rpc.responses.GetBlockResp object# We need to check if it has a value (success) or errorifhasattr(block_resp,'value')and block_resp.value isnotNone:return block_resp.valueelifhasattr(block_resp,'error'): error = block_resp.error# Handle specific error codes if possibleifhasattr(error,'code'): code = error.code message =getattr(error,'message','Unknown error')if code ==-32007:# Block not available (purged from ledger)print(f"Block {slot} not available in ledger storage")returnNoneelif code ==-32004:# Slot skippedprint(f"Slot {slot} was skipped (no block produced)")returnNoneelif code ==-32603:# Internal error# This might be temporary, retryprint(f"Internal error, will retry: {message}")elif code ==429:# Rate limitprint("Rate limited, backing off significantly") retry_count +=1 time.sleep(base_wait_time * backoff_factor ** retry_count *2)# Extra backoff for rate limitscontinueelse:print(f"RPC error code {code}: {message}")else:# If we can't get a specific error codeprint(f"Error in response: {error}")else:# Null resultprint(f"Block {slot} returned null (not found or not confirmed)")returnNoneexcept Exception as e: retry_count +=1 wait_time = base_wait_time * backoff_factor ** retry_countprint(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")if retry_count < max_retries: time.sleep(wait_time)else:print(f"Max retries reached for block {slot}")returnNone# If we get here, there was an error but not one we want to immediately retry on retry_count +=1 wait_time = base_wait_time * backoff_factor ** retry_countif retry_count < max_retries:print(f"Retrying after {wait_time}s...") time.sleep(wait_time)else:print(f"Max retries reached for block {slot}")returnNonereturnNone# Shouldn't reach here, but just in casedefmain():# Example usage rpc_url ="CHAINSTACK_SOLANA_URL"print(f"Connecting to Solana RPC: {rpc_url}") client = Client(rpc_url) slot_to_fetch =329849011# Example slotprint(f"Fetching block at slot {slot_to_fetch}...") block = get_block_with_retries(client, slot_to_fetch)if block:# Print summary of the block transactions =getattr(block,'transactions',[])# Safely extract signatures signatures =[]for tx in transactions:ifhasattr(tx,'transaction')andhasattr(tx.transaction,'signatures'):if tx.transaction.signatures: signatures.append(tx.transaction.signatures[0])print(f"\nBlock {slot_to_fetch} summary:")print(f" Blockhash: {getattr(block,'blockhash','unknown')}")print(f" Parent Slot: {getattr(block,'parent_slot','unknown')}")print(f" Block Time: {getattr(block,'block_time','unknown')}")print(f" Block Height: {getattr(block,'block_height','unknown')}")print(f" Transactions: {len(transactions)}")if signatures:print(f"\nFirst few transaction signatures:")for sig in signatures[:3]:print(f" 
{sig}")iflen(signatures)>3:print(f" ... and {len(signatures)-3} more")else:print("Failed to fetch block")if __name__ =="__main__": main()
# Handle specific error codes if possible
if hasattr(error, 'code'):
    code = error.code
    message = getattr(error, 'message', 'Unknown error')

    if code == -32007:
        # Block not available (purged from ledger)
        print(f"Block {slot} not available in ledger storage")
        return None
    elif code == -32004:
        # Slot skipped (no block produced)
        print(f"Slot {slot} was skipped (no block produced)")
        return None
    elif code == -32603:
        # Internal error - might be temporary, retry
        print(f"Internal error, will retry: {message}")
    elif code == 429:
        # Rate limit
        print("Rate limited, backing off significantly")
        retry_count += 1
        time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
        continue
The script intelligently handles different error types:
Non-retryable errors (like purged blocks) fail fast without wasting retries
Temporary errors proceed with standard backoff
Rate limit errors get special treatment with doubled backoff
if hasattr(block_resp, 'value') and block_resp.value is not None:
    return block_resp.value
elif hasattr(block_resp, 'error'):
    error = block_resp.error
    # Error handling logic
else:
    # Null result
    print(f"Block {slot} returned null (not found or not confirmed)")
    return None
The code thoroughly validates responses before processing them, and the retry behavior is configurable through the max_retries and backoff_factor parameters. This allows tuning based on network conditions or application requirements.
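For example, here is a quick sketch of passing more patient settings to the get_block_with_retries function defined above; the specific values are illustrative, not a recommendation:

from solana.rpc.api import Client

client = Client("CHAINSTACK_SOLANA_URL")  # replace with your endpoint

# Allow more attempts and grow the wait between retries faster --
# useful on congested networks or heavily rate-limited endpoints.
block = get_block_with_retries(client, 329849011, max_retries=5, backoff_factor=3)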
These techniques together create a resilient implementation that gracefully handles various network issues, temporary failures, and rate limiting while providing clear feedback about what’s happening during the process.
Implement a client-side cache that stores previously fetched blocks in memory, allowing applications to:
Retrieve frequently accessed blocks without making additional RPC calls
Track cache performance with hit/miss statistics
Maintain a configurable maximum cache size
This should reduce unnecessary calls.
Example:
#!/usr/bin/env python3"""Solana Block CacheA utility for efficiently fetching and caching Solana blocks to reduce RPC calls."""import timeimport jsonimport requestsfrom typing import Dict, Any, Optional, Unionfrom solana.rpc.api import ClientclassBlockCache:"""A simple cache for Solana blocks to reduce redundant RPC calls."""def__init__(self, client: Client, max_size:int=100):""" Initialize the block cache. Args: client: A Solana RPC client instance or URL string max_size: Maximum number of blocks to keep in cache"""ifisinstance(client,str): self.rpc_url = client self.client =Noneelse: self.rpc_url = client._provider.endpoint_uri self.client = client self.session = requests.Session() self.session.headers.update({"Content-Type":"application/json","Accept-Encoding":"gzip"}) self.cache: Dict[str, Any]={} self.max_size = max_size self.hits =0 self.misses =0 self.request_id =0defget_block(self, slot:int, encoding:str="json", tx_details:str="full", max_supported_transaction_version: Optional[int]=0)-> Optional[Dict[str, Any]]:""" Fetch a block, using cache if available. Args: slot: The slot number of the block to fetch encoding: Block encoding format("json","jsonParsed","base64", etc.) tx_details: Transaction details level ("full","signatures","accounts","none") max_supported_transaction_version: Maximum supported transaction version Returns: The block data orNoneifnot found/error""" cache_key =f"{slot}:{encoding}:{tx_details}"# Return from cache if availableif cache_key in self.cache: self.hits +=1print(f"Cache hit for block {slot}")return self.cache[cache_key]# Fetch from RPC self.misses +=1print(f"Cache miss for block {slot}, fetching from RPC...")try: self.request_id +=1# Prepare parameters for the getBlock method params ={"encoding": encoding,"transactionDetails": tx_details,"maxSupportedTransactionVersion": max_supported_transaction_version}# Create the JSON-RPC payload payload ={"jsonrpc":"2.0","id": self.request_id,"method":"getBlock","params":[slot, params]}# Make the HTTP request response = self.session.post(self.rpc_url, json=payload, timeout=30) block_resp = response.json()# Get the result result = block_resp.get('result')# Store in cache if we got a resultif result:# If cache is full, remove oldest entryiflen(self.cache)>= self.max_size: oldest_key =next(iter(self.cache))del self.cache[oldest_key] self.cache[cache_key]= resultreturn resultexcept Exception as e:print(f"Error fetching block {slot}: {e}")returnNonedefclear_cache(self)->None:"""Clear the entire cache.""" self.cache ={}print("Cache cleared")defremove_from_cache(self, slot:int, encoding:str="json", tx_details:str="full")->bool:""" Remove a specific block from the cache. 
Returns:Trueif the block was in the cache and removed,False otherwise""" cache_key =f"{slot}:{encoding}:{tx_details}"if cache_key in self.cache:del self.cache[cache_key]print(f"Removed block {slot} from cache")returnTruereturnFalsedefget_stats(self)-> Dict[str, Union[int,float]]:"""Get cache performance statistics.""" total_requests = self.hits + self.misses hit_rate = self.hits / total_requests if total_requests >0else0return{"size":len(self.cache),"max_size": self.max_size,"hits": self.hits,"misses": self.misses,"hit_rate": hit_rate,}defmain():"""Example usage of BlockCache.""" rpc_url ="CHAINSTACK_SOLANA_URL"print("Initializing Solana Block Cache") block_cache = BlockCache(rpc_url)# Example: Fetch a block (cache miss)print("\nFetching block first time (should be cache miss):") block1 = block_cache.get_block(329849011)if block1:print(f"Block hash: {block1.get('blockhash')}")print(f"Block time: {block1.get('blockTime')}")print(f"Transaction count: {len(block1.get('transactions',[]))}")# Example: Fetch the same block again (cache hit)print("\nFetching same block again (should be cache hit):") block2 = block_cache.get_block(329849011)# Fetch with different parameters (should be a cache miss)print("\nFetching same block with different parameters (should be cache miss):") block3 = block_cache.get_block(329849011, tx_details="signatures")# Print cache statisticsprint("\nCache statistics:") stats = block_cache.get_stats()for key, value in stats.items():if key =="hit_rate":print(f"{key}: {value:.2%}")else:print(f"{key}: {value}")if __name__ =="__main__": main()
Implement flexible block range handling through a range-first processing pattern — first identify all available blocks in the target range, then systematically process them in batches.
#!/usr/bin/env python3"""Solana Block Range ProcessingThis script demonstrates how to process ranges of Solana blocksefficiently with batching and proper error handling."""import timefrom solana.rpc.api import Clientdefget_block_with_retries(client, slot, max_retries=3, backoff_factor=2):"""Fetch a block with exponential backoff retries Args: client: Solana RPC client slot: The slot number to fetch max_retries: Maximum number of retry attempts backoff_factor: Multiplier for exponential backoff Returns: Block data orNoneif failed after retries""" retry_count =0 base_wait_time =1# Start with 1 second waitwhile retry_count < max_retries:try:# Get the block response block_resp = client.get_block( slot, encoding="json", max_supported_transaction_version=0)# The response is a solders.rpc.responses.GetBlockResp object# We need to check if it has a value (success) or errorifhasattr(block_resp,'value')and block_resp.value isnotNone:return block_resp.valueelifhasattr(block_resp,'error'): error = block_resp.error# Handle specific error codes if possibleifhasattr(error,'code'): code = error.code message =getattr(error,'message','Unknown error')if code ==-32007:# Block not available (purged from ledger)print(f"Block {slot} not available in ledger storage")returnNoneelif code ==-32004:# Slot skippedprint(f"Slot {slot} was skipped (no block produced)")returnNoneelif code ==-32603:# Internal error# This might be temporary, retryprint(f"Internal error, will retry: {message}")elif code ==429:# Rate limitprint("Rate limited, backing off significantly") retry_count +=1 time.sleep(base_wait_time * backoff_factor ** retry_count *2)# Extra backoff for rate limitscontinueelse:print(f"RPC error code {code}: {message}")else:# If we can't get a specific error codeprint(f"Error in response: {error}")else:# Null resultprint(f"Block {slot} returned null (not found or not confirmed)")returnNoneexcept Exception as e: retry_count +=1 wait_time = base_wait_time * backoff_factor ** retry_countprint(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")if retry_count < max_retries: time.sleep(wait_time)else:print(f"Max retries reached for block {slot}")returnNone# If we get here, there was an error but not one we want to immediately retry on retry_count +=1 wait_time = base_wait_time * backoff_factor ** retry_countif retry_count < max_retries:print(f"Retrying after {wait_time}s...") time.sleep(wait_time)else:print(f"Max retries reached for block {slot}")returnNonereturnNone# Shouldn't reach here, but just in casedefget_block_range(client, start_slot, end_slot=None, limit=500):""" Get a list of available block slots in a range. If end_slot isNone, will use getBlocksWithLimit instead of getBlocks. 
Args: client: Solana RPC client start_slot: The starting slot number end_slot: The ending slot number (optional) limit: Maximum number of blocks to return when end_slot isNone Returns: List of available block slots in the range"""try:if end_slot isNone:# Use getBlocksWithLimit blocks_resp = client.get_blocks_with_limit(start_slot, limit)else:# Use getBlocks blocks_resp = client.get_blocks(start_slot, end_slot)# Handle the response which could be an object or a dictifhasattr(blocks_resp,'value'):# Object responsereturn blocks_resp.value or[]else:# Dict responsereturn blocks_resp.get('result',[])except Exception as e:print(f"Error getting block range: {e}")return[]defprocess_block_range(client, start_slot, end_slot, batch_size=10):"""Process blocks in a rangein batches Args: client: Solana RPC client start_slot: The starting slot number end_slot: The ending slot number batch_size: Number of blocks to process in each batch Returns: Total number of transactions processed"""# First get all available block slots in the rangeprint(f"Fetching block slots from {start_slot} to {end_slot}...") block_slots = get_block_range(client, start_slot, end_slot)print(f"Found {len(block_slots)} blocks to process")# Process in batches to avoid overwhelming the RPC node total_blocks =len(block_slots) total_transactions =0for i inrange(0, total_blocks, batch_size): batch = block_slots[i:i+batch_size] batch_num = i//batch_size +1 total_batches =(total_blocks + batch_size -1)//batch_sizeprint(f"Processing batch {batch_num}/{total_batches}...")# Process each block in the batchfor slot in batch: block = get_block_with_retries(client, slot)if block:# Get transaction count safely transactions =getattr(block,'transactions',[]) tx_count =len(transactions) total_transactions += tx_countprint(f" Slot {slot}: {tx_count} transactions")# Add a small delay between blocks to be nice to the RPC node time.sleep(0.1)# Add a delay between batchesif i + batch_size < total_blocks:print(f"Waiting before next batch...") time.sleep(1)print(f"Processed {total_blocks} blocks with {total_transactions} total transactions")return total_transactionsdefprocess_recent_blocks(client, count=100):"""Process the most recent blocks Args: client: Solana RPC client count: Number of recent blocks to process Returns: Total number of transactions processed"""# Get the current slot current_slot_resp = client.get_slot()# Handle either object or dict responseifhasattr(current_slot_resp,'value'):# Object response current_slot = current_slot_resp.valueelse:# Dict response current_slot = current_slot_resp.get('result')if current_slot:# Calculate the starting slot (count blocks back) start_slot =max(0, current_slot - count)print(f"Processing {count} recent blocks from {start_slot} to {current_slot}...")return process_block_range(client, start_slot, current_slot)else:print("Failed to get current slot")return0defmain():# Example usage rpc_url ="CHAINSTACK_SOLANA_URL"print(f"Connecting to Solana RPC: {rpc_url}") client = Client(rpc_url)# Process the 20 most recent blocks count =20print(f"Processing {count} recent blocks...") tx_count = process_recent_blocks(client, count=count)print(f"Total transactions in last {count} blocks: {tx_count}")if __name__ =="__main__": main()
Working with Solana’s getBlock RPC method efficiently requires understanding both what data you need and how to optimize your requests. By following the best practices outlined in this guide—using compression, limiting concurrency, using block ranges, requesting only what you need, and implementing proper error handling—you can build robust applications that interact with Solana blocks effectively.
Remember these key takeaways:
Use the appropriate encoding and detail level for your use case — json, jsonParsed, base58, base64.
Always enable HTTP compression.
Implement client-side caching for frequently accessed blocks.
Use controlled concurrency and throttling for bulk operations.
Handle errors gracefully with retries and backoff.
Ake
Director of Developer Experiences @ Chainstack
Talk to me about all things Web3
20 years in technology | 8+ years in Web3 full time
Trusted advisor helping developers navigate the complexities of blockchain infrastructure