Solana’s getBlock is a fundamental RPC method, but it can be tricky and will quickly degrade your application’s performance if you are not paying attention.
This guide provides a comprehensive overview of how to use getBlock efficiently, with practical examples in Python and curl.
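For reference, a getBlock call of this shape can be sent with curl; CHAINSTACK_SOLANA_URL is a placeholder for your own node endpoint, and the slot number is the example used throughout this guide:

curl CHAINSTACK_SOLANA_URL -X POST -H "Content-Type: application/json" -d '
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "getBlock",
  "params": [
    329849011,
    {
      "encoding": "jsonParsed",
      "transactionDetails": "full",
      "maxSupportedTransactionVersion": 0,
      "rewards": false
    }
  ]
}'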
This request fetches block 329849011 with full transaction details in JSON format.
Note two things here:
We are using jsonParsed, which produces the largest output on the node side, all of which then gets transported to you. You should never do this in production under heavy load; this is more of a one-off inspection call than anything else.
If you are using a full node and not an archive Solana node, use a block number within the last 20 hours or so; otherwise this becomes an archive call. See Limits at the bottom for archive node method availability.
Same note on the block number in params as above. Make sure you stay within the last 20 hours or so, unless you want to use an archive call, which is also fine as Chainstack is extremely affordable & transparent with pricing — a full node request is counted as one request, and an archive node request is counted as 2 requests, and that’s it.
HTTP compression is critical when working with Solana’s getBlock method due to the large size of block data. Here’s why compression is essential and how to implement it:
Dramatic size reduction — Solana block data with full transaction details easily takes several MB in JSON format. Gzip compression typically reduces this by 70-90%, bringing it down to a few hundred KB.
Faster response times — less data transmitted means faster responses, especially important when:
Working with blocks containing many transactions
Operating on networks with limited bandwidth
Fetching multiple blocks in sequence
Reduced bandwidth costs — If you’re paying for bandwidth (e.g., in cloud environments), compression significantly reduces costs.
Server-friendly — compression reduces load on both the RPC node and your client’s network connection.
Most modern HTTP libraries support compression automatically:
Python requests — gzip is handled automatically: the library sends Accept-Encoding: gzip, deflate by default and transparently decompresses the response; you can also set the header explicitly with headers={"Accept-Encoding": "gzip"}
Node.js — most HTTP clients like Axios support this out of the box
Rust — libraries like reqwest have compression features
Using compression is one of the simplest and most effective optimizations when working with Solana’s getBlock RPC method, especially for blocks with many transactions. So use compression.
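As a minimal sketch in Python, assuming the same CHAINSTACK_SOLANA_URL placeholder used in the rest of this guide and the example slot number:

import requests

rpc_url = "CHAINSTACK_SOLANA_URL"  # placeholder endpoint, substitute your own

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
        329849011,
        {
            "encoding": "json",
            "transactionDetails": "full",
            "maxSupportedTransactionVersion": 0,
            "rewards": False,
        },
    ],
}

# requests already sends Accept-Encoding: gzip, deflate by default;
# setting the header explicitly documents the intent.
headers = {"Content-Type": "application/json", "Accept-Encoding": "gzip"}

response = requests.post(rpc_url, json=payload, headers=headers)

# requests decompresses transparently; the header below shows what the server actually sent.
print("Content-Encoding:", response.headers.get("Content-Encoding"))
print("Decompressed size (bytes):", len(response.content))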
When using Solana’s getBlock RPC method, you can request data in different encoding formats based on your specific needs.
Note that when you are doing a getBlock call with "encoding": "base58" or "encoding": "base64", you are getting the respective encoding on the transaction level, not the entire block. In other words, you will still get back a JSON response, it’s only the transaction data that will be encoded in base58 or base64.
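To see this in practice, here is a short sketch (using the requests and base64 modules and the same placeholder endpoint) that fetches a block with base64 encoding and decodes the first transaction; pick a slot within your node's retention, as noted above:

import base64
import requests

rpc_url = "CHAINSTACK_SOLANA_URL"  # placeholder endpoint

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
        329849011,
        {
            "encoding": "base64",
            "transactionDetails": "full",
            "maxSupportedTransactionVersion": 0,
            "rewards": False,
        },
    ],
}
resp = requests.post(rpc_url, json=payload).json()

# With base58/base64 encodings only the "transaction" field changes shape:
# it becomes a two-element array of [encoded_transaction_data, encoding_name],
# while meta and the rest of the block remain regular JSON.
first_tx = resp["result"]["transactions"][0]
encoded_data, encoding_name = first_tx["transaction"]
raw_bytes = base64.b64decode(encoded_data)
print(f"{encoding_name}-encoded transaction, {len(raw_bytes)} raw bytes; the rest of the response is still JSON")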
json: best for general use cases where you need a balance of readability and performance. Binary data remains encoded, but the overall structure is easily parseable.
jsonParsed: best for
Debugging and analysis where you need to understand transaction contents
Decoding program instructions without additional parsing work
Applications that display transaction details to users
Limitations — not all programs can be parsed, since parsing requires an IDL (similar to an ABI in the EVM world) or the program’s source code, and the response size is larger than with other encodings.
Python example with json, jsonParsed, base58, base64
First, install the package: pip install solana.
import requests
import time
import json
import os
import base64
import binascii

# Add base58 decoding support
try:
    import base58
except ImportError:
    print("Installing base58 module...")
    import subprocess
    subprocess.check_call(["pip", "install", "base58"])
    import base58

# Initialize the Solana RPC endpoint URL
rpc_url = "CHAINSTACK_SOLANA_URL"


def save_raw_response(response_data, slot, encoding):
    """Save raw RPC response data to a file"""
    # Create output directory if it doesn't exist
    os.makedirs('block_data', exist_ok=True)

    filename = f"block_data/block_{slot}_{encoding}.json"
    with open(filename, 'w') as f:
        json.dump(response_data, f, indent=2)

    print(f"Saved raw {encoding} response to {filename}")
    file_size = os.path.getsize(filename) / (1024 * 1024)  # Size in MB
    print(f"File size: {file_size:.2f} MB")


def make_rpc_request(method, params):
    """Make a JSON-RPC request to the Solana node"""
    headers = {"Content-Type": "application/json"}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }
    response = requests.post(rpc_url, headers=headers, json=payload)
    return response.json()


def get_block_with_encoding(slot, encoding="json"):
    """Fetch a block with specified encoding and return the raw response"""
    try:
        start_time = time.time()

        # Make direct RPC request
        params = [
            slot,
            {
                "encoding": encoding,
                "maxSupportedTransactionVersion": 0,
                "transactionDetails": "full",
                "rewards": False
            }
        ]
        raw_response = make_rpc_request("getBlock", params)
        elapsed = time.time() - start_time

        if not raw_response.get("result"):
            print(f"Block {slot} not found with {encoding} encoding.")
            return None

        print(f"Fetched block {slot} with {encoding} encoding in {elapsed:.2f} seconds")

        # Save the raw response
        save_raw_response(raw_response, slot, encoding)

        return raw_response
    except Exception as e:
        print(f"Error fetching block with {encoding} encoding: {e}")
        return None


def get_latest_slot():
    """Get the latest finalized slot"""
    try:
        # Correct parameter format for getSlot - commitment should be in an object
        params = [{"commitment": "finalized"}]
        slot_resp = make_rpc_request("getSlot", params)

        if "result" not in slot_resp:
            print("Failed to get current slot")
            print(f"Error response: {slot_resp}")
            return None

        current_slot = slot_resp["result"]
        print(f"Current slot: {current_slot}")
        return current_slot
    except Exception as e:
        print(f"Error fetching latest slot: {e}")
        return None


def inspect_transaction_format(response, encoding):
    """Inspect and print transaction format details for the first transaction"""
    if not response or "result" not in response:
        print(f"No result to inspect for {encoding}")
        return

    result = response["result"]
    if "transactions" not in result or not result["transactions"]:
        print(f"No transactions found in {encoding} response")
        return

    # Get the first transaction
    first_tx = result["transactions"][0]

    # Print transaction format details
    print(f"\n{encoding} Transaction Format Analysis:")
    print("-" * 40)

    # For base64 and base58 encodings, we need to handle the response differently
    if encoding in ["base64", "base58"]:
        print(f"Transaction structure type: {type(first_tx)}")
        print(f"Available fields: {list(first_tx.keys())}")

        # Check for specific encoding fields
        if "transaction" in first_tx:
            tx_data = first_tx["transaction"]
            print(f"Transaction data type: {type(tx_data)}")

            if isinstance(tx_data, list) and len(tx_data) >= 2:
                # The format is [encoded_data, encoding_type]
                encoded_str = tx_data[0]
                format_type = tx_data[1]
                print(f"Transaction is a {format_type} encoded array")
                print(f"First 50 chars of encoded data: {encoded_str[:50]}...")

                # Try to extract a small sample of the decoded data
                try:
                    if format_type == "base64":
                        sample = base64.b64decode(encoded_str[:100] + "=" * (4 - len(encoded_str[:100]) % 4))
                    elif format_type == "base58":
                        sample = base58.b58decode(encoded_str[:100])
                    print(f"Sample of decoded bytes (hex): {sample.hex()[:60]}...")
                except Exception as e:
                    print(f"Error decoding sample: {e}")
            elif isinstance(tx_data, str):
                print(f"Transaction is a string of length: {len(tx_data)}")
                print(f"First 50 chars: {tx_data[:50]}...")
            else:
                print(f"Transaction is not a string or list but: {type(tx_data)}")
                if isinstance(tx_data, dict):
                    print(f"Keys: {list(tx_data.keys())}")
    else:
        # Original handling for JSON and jsonParsed
        if "transaction" in first_tx:
            tx = first_tx["transaction"]

            # Check if it's a string (likely base64) or an object
            if isinstance(tx, str):
                print(f"Transaction is a string (likely base64)")
                print(f"Length: {len(tx)} characters")
                print(f"First 100 chars: {tx[:100]}...")
            else:
                print(f"Transaction is an object/dict")
                print(f"Keys: {list(tx.keys()) if isinstance(tx, dict) else 'N/A'}")

                # If there's a message field, check its format
                if isinstance(tx, dict) and "message" in tx:
                    msg = tx["message"]
                    if isinstance(msg, str):
                        print(f"Message is a string, length: {len(msg)}")
                        print(f"First 100 chars: {msg[:100]}...")
                    else:
                        print(f"Message is an object with keys: {list(msg.keys()) if isinstance(msg, dict) else 'N/A'}")
        else:
            print(f"Transaction doesn't have expected 'transaction' field")
            print(f"Available fields: {list(first_tx.keys())}")

    print("-" * 40)


# Fetch the latest slot
slot_number = get_latest_slot()

if slot_number:
    # Fetch the block with each encoding type
    print("\nFetching block with JSON encoding:")
    json_response = get_block_with_encoding(slot_number, encoding="json")

    print("\nFetching block with jsonParsed encoding:")
    jsonParsed_response = get_block_with_encoding(slot_number, encoding="jsonParsed")

    print("\nFetching block with base64 encoding:")
    base64_response = get_block_with_encoding(slot_number, encoding="base64")

    print("\nFetching block with base58 encoding:")
    base58_response = get_block_with_encoding(slot_number, encoding="base58")

    # Inspect the transaction formats to verify encoding differences
    if json_response:
        inspect_transaction_format(json_response, "json")
    if jsonParsed_response:
        inspect_transaction_format(jsonParsed_response, "jsonParsed")
    if base64_response:
        inspect_transaction_format(base64_response, "base64")
    if base58_response:
        inspect_transaction_format(base58_response, "base58")

    print("\nComparison complete. Check the block_data directory for the raw responses.")
When fetching multiple blocks, avoid sending too many requests simultaneously:
import asyncio
import concurrent.futures
from solana.rpc.api import Client
import json
import os
import pathlib


async def fetch_block_with_solana(executor, client, slot, semaphore):
    # Use a semaphore to limit concurrency
    async with semaphore:
        try:
            # Run the synchronous client.get_block in a thread pool
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                executor,
                lambda: client.get_block(
                    slot,
                    encoding="json",
                    max_supported_transaction_version=0
                )
            )

            # Solana-py returns a solders.rpc.responses.GetBlockResp object
            # Check if the value field exists (indicates success)
            if hasattr(response, 'value') and response.value is not None:
                return slot, response.value
            else:
                print(f"Error in response for slot {slot}")
                return slot, None
        except Exception as e:
            print(f"Exception for slot {slot}: {e}")
            return slot, None


def save_block_to_file(slot, block, output_dir):
    """Save block data to a JSON file"""
    os.makedirs(output_dir, exist_ok=True)

    try:
        if hasattr(block, '__dict__'):
            block_dict = block.__dict__
        else:
            block_dict = {
                'blockhash': str(block.blockhash) if hasattr(block, 'blockhash') else None,
                'blockHeight': block.block_height if hasattr(block, 'block_height') else None,
                'blockTime': block.block_time if hasattr(block, 'block_time') else None,
                'parentSlot': block.parent_slot if hasattr(block, 'parent_slot') else None,
                'previousBlockhash': str(block.previous_blockhash) if hasattr(block, 'previous_blockhash') else None,
                'transactions': [
                    {
                        'meta': tx.meta.__dict__ if hasattr(tx, 'meta') and hasattr(tx.meta, '__dict__') else None,
                        'transaction': str(tx.transaction) if hasattr(tx, 'transaction') else None,
                    }
                    for tx in block.transactions if hasattr(block, 'transactions')
                ] if hasattr(block, 'transactions') else []
            }

        # Save to file
        file_path = pathlib.Path(output_dir) / f"block_{slot}.json"
        with open(file_path, 'w') as f:
            json.dump(block_dict, f, indent=2, default=str)
        print(f"Saved block {slot} to {file_path}")
    except Exception as e:
        print(f"Error saving block {slot}: {e}")


async def get_multiple_blocks(rpc_url, slots, max_concurrency=5, output_dir="block_data"):
    # Create a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(max_concurrency)

    # Create the Solana client
    client = Client(rpc_url)

    # Create a thread pool executor for running synchronous code
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [fetch_block_with_solana(executor, client, slot, semaphore) for slot in slots]
        results = await asyncio.gather(*tasks)

    # Process and save each block
    blocks = {}
    for slot, block in results:
        if block is not None:
            blocks[slot] = block
            # Save to file
            save_block_to_file(slot, block, output_dir)

    return blocks


# Example usage
async def main():
    rpc_url = "CHAINSTACK_SOLANA_URL"
    slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]

    blocks = await get_multiple_blocks(rpc_url, slots_to_fetch, max_concurrency=3)

    print(f"\nSuccessfully fetched {len(blocks)} blocks")
    for slot, block in blocks.items():
        if block:
            # Access the transactions field from the block object
            tx_count = len(block.transactions if hasattr(block, 'transactions') else [])
            print(f"Slot {slot}: {tx_count} transactions")


# Run the async function
if __name__ == "__main__":
    asyncio.run(main())
As always, make sure you substitute your own range of blocks in slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015].
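If you would rather derive a recent range than hard-code it, one possible approach with the same solana-py client (endpoint placeholder assumed) is:

from solana.rpc.api import Client

client = Client("CHAINSTACK_SOLANA_URL")  # placeholder endpoint

# Build a 5-slot window ending at the current slot so the requests stay
# within full-node retention; some slots may be skipped and return no block.
current_slot = client.get_slot().value
slots_to_fetch = list(range(current_slot - 4, current_slot + 1))
print(slots_to_fetch)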
Recommended encodings:
For most use cases: json (good balance of size and parsing speed)
For human-readable data: jsonParsed (larger but provides decoded instruction data)
For binary efficiency: base64 (efficient for storage and transmission)
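Encoding is only half of the payload story; the detail-level parameters matter just as much. Here is a sketch of a trimmed request that returns only transaction signatures and skips rewards, using the same placeholder endpoint and example slot:

import requests

rpc_url = "CHAINSTACK_SOLANA_URL"  # placeholder endpoint

# Requesting only signatures and skipping rewards returns a much smaller payload
# than "transactionDetails": "full"; useful when you only need to enumerate
# the transactions in a block.
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
        329849011,
        {
            "encoding": "json",
            "transactionDetails": "signatures",
            "maxSupportedTransactionVersion": 0,
            "rewards": False,
        },
    ],
}
block = requests.post(rpc_url, json=payload).json()["result"]
print(f"{len(block['signatures'])} signatures in block, blockhash {block['blockhash']}")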
Let’s start with a working script and explain it below:
"""Solana getBlock with retries implementationThis script demonstrates how to fetch a Solana block with propererror handling and exponential backoff retries."""import timefrom solana.rpc.api import Clientdef get_block_with_retries(client, slot, max_retries=3, backoff_factor=2): """Fetch a block with exponential backoff retries Args: client: Solana RPC client slot: The slot number to fetch max_retries: Maximum number of retry attempts backoff_factor: Multiplier for exponential backoff Returns: Block data or None if failed after retries """ retry_count = 0 base_wait_time = 1 # Start with 1 second wait while retry_count < max_retries: try: # Get the block response block_resp = client.get_block( slot, encoding="json", max_supported_transaction_version=0 ) # The response is a solders.rpc.responses.GetBlockResp object # We need to check if it has a value (success) or error if hasattr(block_resp, 'value') and block_resp.value is not None: return block_resp.value elif hasattr(block_resp, 'error'): error = block_resp.error # Handle specific error codes if possible if hasattr(error, 'code'): code = error.code message = getattr(error, 'message', 'Unknown error') if code == -32007: # Block not available (purged from ledger) print(f"Block {slot} not available in ledger storage") return None elif code == -32004: # Slot skipped print(f"Slot {slot} was skipped (no block produced)") return None elif code == -32603: # Internal error # This might be temporary, retry print(f"Internal error, will retry: {message}") elif code == 429: # Rate limit print("Rate limited, backing off significantly") retry_count += 1 time.sleep(base_wait_time * backoff_factor ** retry_count * 2) # Extra backoff for rate limits continue else: print(f"RPC error code {code}: {message}") else: # If we can't get a specific error code print(f"Error in response: {error}") else: # Null result print(f"Block {slot} returned null (not found or not confirmed)") return None except Exception as e: retry_count += 1 wait_time = base_wait_time * backoff_factor ** retry_count print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}") if retry_count < max_retries: time.sleep(wait_time) else: print(f"Max retries reached for block {slot}") return None # If we get here, there was an error but not one we want to immediately retry on retry_count += 1 wait_time = base_wait_time * backoff_factor ** retry_count if retry_count < max_retries: print(f"Retrying after {wait_time}s...") time.sleep(wait_time) else: print(f"Max retries reached for block {slot}") return None return None # Shouldn't reach here, but just in casedef main(): # Example usage rpc_url = "CHAINSTACK_SOLANA_URL" print(f"Connecting to Solana RPC: {rpc_url}") client = Client(rpc_url) slot_to_fetch = 329849011 # Example slot print(f"Fetching block at slot {slot_to_fetch}...") block = get_block_with_retries(client, slot_to_fetch) if block: # Print summary of the block transactions = getattr(block, 'transactions', []) # Safely extract signatures signatures = [] for tx in transactions: if hasattr(tx, 'transaction') and hasattr(tx.transaction, 'signatures'): if tx.transaction.signatures: signatures.append(tx.transaction.signatures[0]) print(f"\nBlock {slot_to_fetch} summary:") print(f" Blockhash: {getattr(block, 'blockhash', 'unknown')}") print(f" Parent Slot: {getattr(block, 'parent_slot', 'unknown')}") print(f" Block Time: {getattr(block, 'block_time', 'unknown')}") print(f" Block Height: {getattr(block, 'block_height', 'unknown')}") print(f" Transactions: 
{len(transactions)}") if signatures: print(f"\nFirst few transaction signatures:") for sig in signatures[:3]: print(f" {sig}") if len(signatures) > 3: print(f" ... and {len(signatures)-3} more") else: print("Failed to fetch block")if __name__ == "__main__": main()
if hasattr(block_resp, 'value') and block_resp.value is not None:
    return block_resp.value
elif hasattr(block_resp, 'error'):
    error = block_resp.error
    # Error handling logic
else:
    # Null result
    print(f"Block {slot} returned null (not found or not confirmed)")
    return None
The code thoroughly validates responses before processing, as the snippet above shows.
The retry parameters (max_retries and backoff_factor) are configurable, which allows tuning based on network conditions or application requirements.
These techniques together create a resilient implementation that gracefully handles various network issues, temporary failures, and rate limiting while providing clear feedback about what’s happening during the process.
Implement flexible block range handling through a range-first processing pattern — first identify all available blocks in the target range, then systematically process them in batches.
#!/usr/bin/env python3"""Solana Block Range ProcessingThis script demonstrates how to process ranges of Solana blocksefficiently with batching and proper error handling."""import timefrom solana.rpc.api import Clientdef get_block_with_retries(client, slot, max_retries=3, backoff_factor=2): """Fetch a block with exponential backoff retries Args: client: Solana RPC client slot: The slot number to fetch max_retries: Maximum number of retry attempts backoff_factor: Multiplier for exponential backoff Returns: Block data or None if failed after retries """ retry_count = 0 base_wait_time = 1 # Start with 1 second wait while retry_count < max_retries: try: # Get the block response block_resp = client.get_block( slot, encoding="json", max_supported_transaction_version=0 ) # The response is a solders.rpc.responses.GetBlockResp object # We need to check if it has a value (success) or error if hasattr(block_resp, 'value') and block_resp.value is not None: return block_resp.value elif hasattr(block_resp, 'error'): error = block_resp.error # Handle specific error codes if possible if hasattr(error, 'code'): code = error.code message = getattr(error, 'message', 'Unknown error') if code == -32007: # Block not available (purged from ledger) print(f"Block {slot} not available in ledger storage") return None elif code == -32004: # Slot skipped print(f"Slot {slot} was skipped (no block produced)") return None elif code == -32603: # Internal error # This might be temporary, retry print(f"Internal error, will retry: {message}") elif code == 429: # Rate limit print("Rate limited, backing off significantly") retry_count += 1 time.sleep(base_wait_time * backoff_factor ** retry_count * 2) # Extra backoff for rate limits continue else: print(f"RPC error code {code}: {message}") else: # If we can't get a specific error code print(f"Error in response: {error}") else: # Null result print(f"Block {slot} returned null (not found or not confirmed)") return None except Exception as e: retry_count += 1 wait_time = base_wait_time * backoff_factor ** retry_count print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}") if retry_count < max_retries: time.sleep(wait_time) else: print(f"Max retries reached for block {slot}") return None # If we get here, there was an error but not one we want to immediately retry on retry_count += 1 wait_time = base_wait_time * backoff_factor ** retry_count if retry_count < max_retries: print(f"Retrying after {wait_time}s...") time.sleep(wait_time) else: print(f"Max retries reached for block {slot}") return None return None # Shouldn't reach here, but just in casedef get_block_range(client, start_slot, end_slot=None, limit=500): """ Get a list of available block slots in a range. If end_slot is None, will use getBlocksWithLimit instead of getBlocks. 
Args: client: Solana RPC client start_slot: The starting slot number end_slot: The ending slot number (optional) limit: Maximum number of blocks to return when end_slot is None Returns: List of available block slots in the range """ try: if end_slot is None: # Use getBlocksWithLimit blocks_resp = client.get_blocks_with_limit(start_slot, limit) else: # Use getBlocks blocks_resp = client.get_blocks(start_slot, end_slot) # Handle the response which could be an object or a dict if hasattr(blocks_resp, 'value'): # Object response return blocks_resp.value or [] else: # Dict response return blocks_resp.get('result', []) except Exception as e: print(f"Error getting block range: {e}") return []def process_block_range(client, start_slot, end_slot, batch_size=10): """Process blocks in a range in batches Args: client: Solana RPC client start_slot: The starting slot number end_slot: The ending slot number batch_size: Number of blocks to process in each batch Returns: Total number of transactions processed """ # First get all available block slots in the range print(f"Fetching block slots from {start_slot} to {end_slot}...") block_slots = get_block_range(client, start_slot, end_slot) print(f"Found {len(block_slots)} blocks to process") # Process in batches to avoid overwhelming the RPC node total_blocks = len(block_slots) total_transactions = 0 for i in range(0, total_blocks, batch_size): batch = block_slots[i:i+batch_size] batch_num = i//batch_size + 1 total_batches = (total_blocks + batch_size - 1)//batch_size print(f"Processing batch {batch_num}/{total_batches}...") # Process each block in the batch for slot in batch: block = get_block_with_retries(client, slot) if block: # Get transaction count safely transactions = getattr(block, 'transactions', []) tx_count = len(transactions) total_transactions += tx_count print(f" Slot {slot}: {tx_count} transactions") # Add a small delay between blocks to be nice to the RPC node time.sleep(0.1) # Add a delay between batches if i + batch_size < total_blocks: print(f"Waiting before next batch...") time.sleep(1) print(f"Processed {total_blocks} blocks with {total_transactions} total transactions") return total_transactionsdef process_recent_blocks(client, count=100): """Process the most recent blocks Args: client: Solana RPC client count: Number of recent blocks to process Returns: Total number of transactions processed """ # Get the current slot current_slot_resp = client.get_slot() # Handle either object or dict response if hasattr(current_slot_resp, 'value'): # Object response current_slot = current_slot_resp.value else: # Dict response current_slot = current_slot_resp.get('result') if current_slot: # Calculate the starting slot (count blocks back) start_slot = max(0, current_slot - count) print(f"Processing {count} recent blocks from {start_slot} to {current_slot}...") return process_block_range(client, start_slot, current_slot) else: print("Failed to get current slot") return 0def main(): # Example usage rpc_url = "CHAINSTACK_SOLANA_URL" print(f"Connecting to Solana RPC: {rpc_url}") client = Client(rpc_url) # Process the 20 most recent blocks count = 20 print(f"Processing {count} recent blocks...") tx_count = process_recent_blocks(client, count=count) print(f"Total transactions in last {count} blocks: {tx_count}")if __name__ == "__main__": main()
Working with Solana’s getBlock RPC method efficiently requires understanding both what data you need and how to optimize your requests. By following the best practices outlined in this guide—using compression, limiting concurrency, using block ranges, requesting only what you need, and implementing proper error handling—you can build robust applications that interact with Solana blocks effectively.
Remember these key takeaways:
Use the appropriate encoding and detail level for your use case — json, jsonParsed, base58, base64.
Always enable HTTP compression.
Implement client-side caching for frequently accessed blocks (see the sketch after this list).
Use controlled concurrency and throttling for bulk operations.
Handle errors gracefully with retries and backoff.
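A minimal client-side caching sketch, using functools.lru_cache on top of the solana-py client from the earlier examples; the endpoint placeholder and cache size are assumptions, and you could swap in Redis or any other store:

from functools import lru_cache

from solana.rpc.api import Client

client = Client("CHAINSTACK_SOLANA_URL")  # placeholder endpoint


@lru_cache(maxsize=512)
def get_block_cached(slot):
    # Finalized blocks are immutable, so it is safe to cache them indefinitely.
    resp = client.get_block(slot, encoding="json", max_supported_transaction_version=0)
    value = getattr(resp, "value", None)
    if value is None:
        # Raise instead of caching a miss, so the slot can be retried later;
        # lru_cache does not cache results of calls that raise.
        raise ValueError(f"Block {slot} not available")
    return value


# First call hits the RPC node; subsequent calls for the same slot are served from memory.
block = get_block_cached(329849011)
print(len(block.transactions), "transactions")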
Ake
Director of Developer Experiences @ Chainstack
Talk to me all things Web3
20 years in technology | 8+ years full-time in Web3
Trusted advisor helping developers navigate the complexities of blockchain infrastructure