# Solana: Optimize your getBlock performance

> Optimize getBlock call performance on Solana by tuning maxSupportedTransactionVersion, encoding, and reward parameters on your Chainstack RPC node.

**TLDR:**

* `getBlock` is a core Solana RPC method that can dramatically impact performance if misused; don’t pull more data than you need.
* Use `json`, `jsonParsed`, `base58`, or `base64` encoding strategically; always enable compression (`gzip`) to reduce huge payloads.
* Apply concurrency limits, caching, and backoff retry logic to avoid node overload and handle network hiccups gracefully.
* Use robust error handling and consider block ranges for larger-scale data fetches.

## Main article

Solana's [getBlock](/reference/solana-getblock) RPC method is a fundamental method that can be tricky and will screw up your application performance in a jiffy if you are not paying attention.

This guide provides a comprehensive overview of how to use `getBlock` efficiently, with practical examples in Python and curl.

<Check>
  ### Get your own node endpoint today

  [Start for free](https://console.chainstack.com/) and get your app to production levels immediately. No credit card required.

  You can sign up with your GitHub, X, Google, or Microsoft account.
</Check>

## Understanding getBlock

The `getBlock` method returns detailed information about a confirmed block in Solana's ledger. A block in Solana contains:

* A set of transactions
* Metadata like block hash, previous block hash
* Timing information
* Reward data

If you think you have a grasp now, check out [Understanding the difference between blocks and slots on Solana](/docs/understanding-the-difference-between-blocks-and-slots-on-solana). Solana is amazing but not for the faint of heart.

### What data does getBlock return?

A typical response includes:

* `blockhash` — the unique hash (ID) of this block (base-58 encoded)
* `previousBlockhash` — the hash of the parent block
* `parentSlot` — the slot number of the parent block
* `blockHeight` — the sequential block height (the number of blocks beneath this block)
* `blockTime` — the timestamp when the block was produced, which is yet another trick. See [Solana: Understanding block time](/docs/solana-understanding-block-time).
* `transactions` — an array of transactions in the block (if requested)
* `signatures` — an array of transaction signatures in the block (if requested)
* `rewards` — an array of block rewards (if requested)

### Parameters

When calling `getBlock`, you can specify several parameters to control what data you receive:

* `commitment` — `finalized` (default) or `confirmed`
* `encoding` — `json` (default), `jsonParsed`, `base64`, or `base58`.
* `transactionDetails` — `full` (default), `accounts`, `signatures`, or `none`
* `rewards` — boolean to include block rewards
* `maxSupportedTransactionVersion` — for handling versioned transactions
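
As a rough sketch of how these parameters fit together, the helper below builds the JSON-RPC request body for `getBlock`. The function name `build_get_block_payload` and its defaults are illustrative assumptions, not part of any library. Note that it defaults `rewards` to `False` to keep the response lighter.

```python
import json


def build_get_block_payload(
    slot,
    encoding="json",
    transaction_details="full",
    rewards=False,
    commitment="finalized",
    max_supported_transaction_version=0,
    request_id=1,
):
    """Build a getBlock JSON-RPC payload (hypothetical helper for illustration)."""
    config = {
        "encoding": encoding,
        "transactionDetails": transaction_details,
        "rewards": rewards,
        "commitment": commitment,
        "maxSupportedTransactionVersion": max_supported_transaction_version,
    }
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "getBlock",
        "params": [slot, config],
    }


# Signatures only, no rewards: a much lighter request than the full default.
payload = build_get_block_payload(329849011, transaction_details="signatures")
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be sent as the body of a POST request to your node endpoint.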

## Basic example with curl

Let's start with the simplest way to fetch a block using curl:

<CodeGroup>
  ```shell Shell theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      329849011,
      {
        "encoding": "jsonParsed",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

This request fetches block [329849011](https://solscan.io/block/329849011) with full transaction details, using the `jsonParsed` encoding.

Note two things here:

* We are using `jsonParsed`, which produces the largest response: the node has to build it and then send all of it to you. You should never do this in production under heavy load. This is more of a one-off inspection call than anything else.
* If you are using a full and not an archive Solana node, use a block number within the last 20 hours or so. Otherwise this will be an archive call. See [Limits](/docs/limits) at the bottom for the archive node methods availability.
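
To pick a block number that stays inside that window, you can estimate the cutoff from the latest slot. This is a rough sketch that assumes Solana's 400 ms target slot time. Real slot times drift and some slots are skipped, so treat the result as an approximation. The function name is hypothetical.

```python
def oldest_slot_within(latest_slot, hours, slot_ms=400):
    """Estimate the oldest slot produced within the last `hours` hours.

    Assumes a fixed slot duration (400 ms target by default), which only
    approximates real network behavior.
    """
    slots_in_window = (hours * 3_600_000) // slot_ms
    return max(latest_slot - slots_in_window, 0)


# 20 hours at 400 ms per slot is 180,000 slots.
print(oldest_slot_within(329849011, hours=20))  # 329669011
```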

### Getting just the signatures

If you're only interested in transaction signatures (hashes), request just those. The response is much lighter:

<CodeGroup>
  ```shell Shell theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      329849011,
      {
        "encoding": "json",
        "transactionDetails": "signatures",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

Same note on the block number in `params` as above: stay within the last 20 hours or so unless you want to make an archive call. An archive call is also fine, as Chainstack is affordable and transparent with pricing: a full node request counts as one request, an archive node request counts as two, and that's it.

### Using compression (gzip) for better performance

HTTP compression is critical when working with Solana's `getBlock` method due to the large size of block data. Here's why compression is essential and how to implement it:

#### Why use compression

1. **Dramatic size reduction** — Solana block data with full transaction details easily takes several MB in JSON format. Gzip compression typically reduces this by 70-90%, bringing it down to a few hundred KB.

2. **Faster response times** — less data transmitted means faster responses, especially important when:

   * Working with blocks containing many transactions
   * Operating on networks with limited bandwidth
   * Fetching multiple blocks in sequence

3. **Reduced bandwidth costs** — If you're paying for bandwidth (e.g., in cloud environments), compression significantly reduces costs.

4. **Server-friendly** — compression reduces load on both the RPC node and your client's network connection.

#### Compression example with curl

Adding compression is simple with curl—just add the `Accept-Encoding: gzip` header and the `--compressed` flag:

<CodeGroup>
  ```shell Shell theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL \
    -H "Content-Type: application/json" \
    -H "Accept-Encoding: gzip" \
    --compressed \
    -d '{
      "jsonrpc": "2.0",
      "id": 1,
      "method": "getBlock",
      "params": [
        329849011,
        {
          "encoding": "json",
          "transactionDetails": "full",
          "commitment": "finalized",
          "maxSupportedTransactionVersion": 0
        }
      ]
    }'
  ```
</CodeGroup>

When you run this command:

1. The `Accept-Encoding: gzip` header tells the server you can handle compressed responses
2. The server compresses the JSON data before sending it
3. The `--compressed` flag tells curl to automatically decompress the data on receipt
4. You see the normal JSON output, but the actual network transfer was much smaller

#### Understanding the compression process

Here's what happens under the hood:

1. **Request** — your client sends a request with `Accept-Encoding: gzip`
2. **Server processing** — the server generates the JSON response
3. **Compression** — the server compresses this data using gzip
4. **Transfer** — the compressed data (much smaller) is sent over the network
5. **Decompression** — your client decompresses the data
6. **Processing** — you work with the original JSON data

Most HTTP libraries handle the client-side steps (1 and 5) automatically when configured correctly.
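
The round trip described above can be reproduced locally with Python's standard-library `gzip` module. This is only a sketch on synthetic data: the `fake_block` structure is made up to mimic the repetitive shape of a block response, so the ratio it prints is not a measurement of a real block.

```python
import gzip
import json

# Synthetic stand-in for a block response (not real chain data).
fake_block = {
    "blockhash": "EXAMPLE_BLOCKHASH",
    "transactions": [
        {
            "meta": {"err": None, "fee": 5000, "logMessages": ["Program log: ok"]},
            "transaction": {"signatures": [f"EXAMPLE_SIGNATURE_{i}"]},
        }
        for i in range(2000)
    ],
}

# Steps 2-3: the server serializes the response and compresses it.
raw = json.dumps(fake_block).encode("utf-8")
compressed = gzip.compress(raw)

# Step 5: the client restores the original bytes.
restored = gzip.decompress(compressed)

assert restored == raw
saved = 1 - len(compressed) / len(raw)
print(f"raw: {len(raw)} bytes, gzip: {len(compressed)} bytes, saved: {saved:.0%}")
```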

#### Example of handling the compressed (gzip) data manually

For illustration purposes and to compare the actual size in compressed & decompressed state:

<CodeGroup>
  ```shell Shell theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL \
    -H "Content-Type: application/json" \
    -H "Accept-Encoding: gzip" \
    -d '{
      "jsonrpc": "2.0",
      "id": 1,
      "method": "getBlock",
      "params": [329849011, {"encoding": "json", "maxSupportedTransactionVersion": 0}]
    }' \
    --output block_data.gz
  ```
</CodeGroup>

Decompress the `block_data.gz` file:

<CodeGroup>
  ```shell Shell theme={"system"}
  gunzip -c block_data.gz > block_data.json
  ```
</CodeGroup>

See the \~80% reduction in size when you compare `block_data.gz` to `block_data.json`.

#### Compression in HTTP libraries

Most modern HTTP libraries support compression automatically:

* **Python requests** — sends an `Accept-Encoding` header that includes `gzip` by default and decompresses the response transparently, so no extra configuration is needed
* **Node.js** — most HTTP clients like Axios support this out of the box
* **Rust** — libraries like reqwest have compression features

Using compression is one of the simplest and most effective optimizations when working with Solana's `getBlock` RPC method, especially for blocks with many transactions. So use compression.
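
Not every client decompresses for you. Python's standard-library `urllib`, for instance, returns the body as received, so the gzip step has to be done by hand. A minimal sketch, where `fetch_block` and `decode_body` are hypothetical helper names and the 30-second timeout is an arbitrary choice:

```python
import gzip
import json
import urllib.request


def decode_body(body, content_encoding):
    """Return parsed JSON, gunzipping first if the server compressed the body."""
    if (content_encoding or "").lower() == "gzip":
        body = gzip.decompress(body)
    return json.loads(body)


def fetch_block(rpc_url, slot):
    """POST a getBlock request and decode the (possibly gzipped) response."""
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getBlock",
        "params": [slot, {"encoding": "json", "maxSupportedTransactionVersion": 0}],
    }
    request = urllib.request.Request(
        rpc_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept-Encoding": "gzip"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return decode_body(response.read(), response.headers.get("Content-Encoding"))
```

Call it as `fetch_block("CHAINSTACK_SOLANA_URL", 329849011)` with your own endpoint and a recent block number.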

## json, jsonParsed, base58, base64

When using Solana's `getBlock` RPC method, you can request data in different encoding formats based on your specific needs.

Note that when you are doing a `getBlock` call with `"encoding": "base58"` or `"encoding": "base64"`, you are getting the respective encoding *on the transaction level*, not the entire block. In other words, you will still get back a JSON response, it's only the transaction data that will be encoded in `base58` or `base64`.
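
To make the transaction-level encoding concrete, here is a small sketch that pulls the raw bytes out of one base64-encoded entry. The `sample_entry` below is hand-made with arbitrary bytes, not a real Solana transaction, and `decode_transaction` is a hypothetical helper. It covers only base64, since base58 decoding needs a third-party package.

```python
import base64

# Hand-made stand-in for one entry of result["transactions"].
sample_bytes = b"\x01" + bytes(64) + b"message"
sample_entry = {
    "transaction": [base64.b64encode(sample_bytes).decode("ascii"), "base64"],
    "meta": {"err": None, "fee": 5000},
}


def decode_transaction(entry):
    """Return the raw transaction bytes from a base64-encoded block entry."""
    data, encoding = entry["transaction"]
    if encoding != "base64":
        raise ValueError(f"unsupported encoding: {encoding}")
    return base64.b64decode(data)


raw_tx = decode_transaction(sample_entry)
print(f"{len(raw_tx)} raw bytes; meta is still plain JSON: {sample_entry['meta']}")
```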

Let's explore each option:

### json (default)

The `json` encoding provides transaction data in a standard JSON format with binary data encoded as base58 strings.

<CodeGroup>
  ```bash Bash theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      149546741,
      {
        "encoding": "json",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

**Best for**: General use cases where you need a balance of readability and performance. Binary data remains encoded but the overall structure is easily parseable.

### jsonParsed

The `jsonParsed` encoding goes beyond standard JSON by attempting to decode instruction data into human-readable format:

<CodeGroup>
  ```bash Bash theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      149546741,
      {
        "encoding": "jsonParsed",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

**Best for**:

* Debugging and analysis where you need to understand transaction contents
* Decoding program instructions without additional parsing work
* Applications that display transaction details to users

**Limitations** — the node only parses instructions for programs it has a built-in parser for (for example, the System and SPL Token programs). Instructions for other programs come back unparsed, and the response is larger than with the other encodings.

### base58

The `base58` encoding returns binary data for transactions as base58-encoded strings:

<CodeGroup>
  ```bash Bash theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      149546741,
      {
        "encoding": "base58",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

**Best for**: Compatibility with tools that expect base58 encoding, which is common in Solana's ecosystem.

### base64

The `base64` encoding returns binary data for transactions as base64-encoded strings:

<CodeGroup>
  ```bash Bash theme={"system"}
  curl -X POST CHAINSTACK_SOLANA_URL -H "Content-Type: application/json" -d '
  {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getBlock",
    "params": [
      149546741,
      {
        "encoding": "base64",
        "transactionDetails": "full",
        "commitment": "finalized",
        "maxSupportedTransactionVersion": 0
      }
    ]
  }'
  ```
</CodeGroup>

**Best for**:

* Performance-critical applications (base64 is more compact than base58)
* Storage efficiency when saving transaction data
* High-throughput systems processing many blocks
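
The size difference between the two binary encodings is easy to check on arbitrary bytes. The sketch below uses a small hand-written base58 encoder and compares output lengths for a 1,024-byte buffer. The encoder is an illustrative, unoptimized implementation using the Bitcoin alphabet that Solana also uses. It is not the code a node runs.

```python
import base64

ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58encode(data):
    """Encode bytes as base58 (illustrative, unoptimized)."""
    number = int.from_bytes(data, "big")
    digits = []
    while number:
        number, remainder = divmod(number, 58)
        digits.append(ALPHABET[remainder])
    # Each leading zero byte is represented by a leading '1'.
    leading_zeros = len(data) - len(data.lstrip(b"\x00"))
    return "1" * leading_zeros + "".join(reversed(digits))


payload = bytes(range(256)) * 4  # 1,024 deterministic bytes
b58_len = len(b58encode(payload))
b64_len = len(base64.b64encode(payload))
print(f"base58: {b58_len} chars, base64: {b64_len} chars")
```

Base58 also works on the whole input as one big number, while base64 converts fixed 3-byte groups, which is part of why base64 tends to be the cheaper choice for bulk data.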

### Encoding comparison

| Encoding   | Size         | Human Readability | Parsing Complexity | Use Case                |
| ---------- | ------------ | ----------------- | ------------------ | ----------------------- |
| json       | Medium       | Good              | Low                | General purpose         |
| jsonParsed | Largest      | Best              | Lowest             | Analysis & debugging    |
| base58     | Medium-Large | Poor              | Medium             | Ecosystem compatibility |
| base64     | Smallest     | Poor              | Medium             | Performance & storage   |

## Python example with json, jsonParsed, base58, base64

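First, install the packages the script uses: `pip install requests base58`. The scripts in the performance section further below also need `pip install solana`.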

```python
import requests
import time
import json
import os
import base64
import binascii

# Add base58 decoding support
try:
    import base58
except ImportError:
    print("Installing base58 module...")
    import subprocess
    subprocess.check_call(["pip", "install", "base58"])
    import base58

# Initialize the Solana RPC endpoint URL
rpc_url = "CHAINSTACK_SOLANA_URL"

def save_raw_response(response_data, slot, encoding):
    """Save raw RPC response data to a file"""
    # Create output directory if it doesn't exist
    os.makedirs('block_data', exist_ok=True)

    filename = f"block_data/block_{slot}_{encoding}.json"

    with open(filename, 'w') as f:
        json.dump(response_data, f, indent=2)

    print(f"Saved raw {encoding} response to {filename}")
    file_size = os.path.getsize(filename) / (1024 * 1024)  # Size in MB
    print(f"File size: {file_size:.2f} MB")

def make_rpc_request(method, params):
    """Make a JSON-RPC request to the Solana node"""
    headers = {"Content-Type": "application/json"}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }

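    # A timeout keeps a stalled connection from hanging the script;
    # raise_for_status surfaces HTTP errors (for example, 429) to the caller
    response = requests.post(rpc_url, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    return response.json()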

def get_block_with_encoding(slot, encoding="json"):
    """Fetch a block with specified encoding and return the raw response"""
    try:
        start_time = time.time()

        # Make direct RPC request
        params = [
            slot,
            {
                "encoding": encoding,
                "maxSupportedTransactionVersion": 0,
                "transactionDetails": "full",
                "rewards": False
            }
        ]

        raw_response = make_rpc_request("getBlock", params)
        elapsed = time.time() - start_time

        if not raw_response.get("result"):
            print(f"Block {slot} not found with {encoding} encoding.")
            return None

        print(f"Fetched block {slot} with {encoding} encoding in {elapsed:.2f} seconds")

        # Save the raw response
        save_raw_response(raw_response, slot, encoding)

        return raw_response
    except Exception as e:
        print(f"Error fetching block with {encoding} encoding: {e}")
        return None

def get_latest_slot():
    """Get the latest finalized slot"""
    try:
        # Correct parameter format for getSlot - commitment should be in an object
        params = [{"commitment": "finalized"}]
        slot_resp = make_rpc_request("getSlot", params)

        if "result" not in slot_resp:
            print("Failed to get current slot")
            print(f"Error response: {slot_resp}")
            return None

        current_slot = slot_resp["result"]
        print(f"Current slot: {current_slot}")
        return current_slot
    except Exception as e:
        print(f"Error fetching latest slot: {e}")
        return None

def inspect_transaction_format(response, encoding):
    """Inspect and print transaction format details for the first transaction"""
    if not response or "result" not in response:
        print(f"No result to inspect for {encoding}")
        return

    result = response["result"]
    if "transactions" not in result or not result["transactions"]:
        print(f"No transactions found in {encoding} response")
        return

    # Get the first transaction
    first_tx = result["transactions"][0]

    # Print transaction format details
    print(f"\n{encoding} Transaction Format Analysis:")
    print("-" * 40)

    # For base64 and base58 encodings, we need to handle the response differently
    if encoding in ["base64", "base58"]:
        print(f"Transaction structure type: {type(first_tx)}")
        print(f"Available fields: {list(first_tx.keys())}")

        # Check for specific encoding fields
        if "transaction" in first_tx:
            tx_data = first_tx["transaction"]
            print(f"Transaction data type: {type(tx_data)}")

            if isinstance(tx_data, list) and len(tx_data) >= 2:
                # The format is [encoded_data, encoding_type]
                encoded_str = tx_data[0]
                format_type = tx_data[1]
                print(f"Transaction is a {format_type} encoded array")
                print(f"First 50 chars of encoded data: {encoded_str[:50]}...")

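                # Try to decode a small sample of the data
                try:
                    if format_type == "base64":
                        # 100 chars is a multiple of 4, so the slice decodes cleanly;
                        # pad only if the string is shorter and unpadded
                        chunk = encoded_str[:100]
                        sample = base64.b64decode(chunk + "=" * (-len(chunk) % 4))
                    elif format_type == "base58":
                        # base58 has no fixed-size groups, so decode the full string
                        sample = base58.b58decode(encoded_str)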

                    print(f"Sample of decoded bytes (hex): {sample.hex()[:60]}...")
                except Exception as e:
                    print(f"Error decoding sample: {e}")
            elif isinstance(tx_data, str):
                print(f"Transaction is a string of length: {len(tx_data)}")
                print(f"First 50 chars: {tx_data[:50]}...")
            else:
                print(f"Transaction is not a string or list but: {type(tx_data)}")
                if isinstance(tx_data, dict):
                    print(f"Keys: {list(tx_data.keys())}")
    else:
        # Original handling for JSON and jsonParsed
        if "transaction" in first_tx:
            tx = first_tx["transaction"]

            # Check if it's a string (likely base64) or an object
            if isinstance(tx, str):
                print(f"Transaction is a string (likely base64)")
                print(f"Length: {len(tx)} characters")
                print(f"First 100 chars: {tx[:100]}...")
            else:
                print(f"Transaction is an object/dict")
                print(f"Keys: {list(tx.keys()) if isinstance(tx, dict) else 'N/A'}")

                # If there's a message field, check its format
                if isinstance(tx, dict) and "message" in tx:
                    msg = tx["message"]
                    if isinstance(msg, str):
                        print(f"Message is a string, length: {len(msg)}")
                        print(f"First 100 chars: {msg[:100]}...")
                    else:
                        print(f"Message is an object with keys: {list(msg.keys()) if isinstance(msg, dict) else 'N/A'}")
        else:
            print(f"Transaction doesn't have expected 'transaction' field")
            print(f"Available fields: {list(first_tx.keys())}")

    print("-" * 40)

# Fetch the latest slot
slot_number = get_latest_slot()

if slot_number:
    # Fetch the block with each encoding type
    print("\nFetching block with JSON encoding:")
    json_response = get_block_with_encoding(slot_number, encoding="json")

    print("\nFetching block with jsonParsed encoding:")
    jsonParsed_response = get_block_with_encoding(slot_number, encoding="jsonParsed")

    print("\nFetching block with base64 encoding:")
    base64_response = get_block_with_encoding(slot_number, encoding="base64")

    print("\nFetching block with base58 encoding:")
    base58_response = get_block_with_encoding(slot_number, encoding="base58")

    # Inspect the transaction formats to verify encoding differences
    if json_response:
        inspect_transaction_format(json_response, "json")

    if jsonParsed_response:
        inspect_transaction_format(jsonParsed_response, "jsonParsed")

    if base64_response:
        inspect_transaction_format(base64_response, "base64")

    if base58_response:
        inspect_transaction_format(base58_response, "base58")

    print("\nComparison complete. Check the block_data directory for the raw responses.")
```

Here's an example output:

<CodeGroup>
  ```shell Shell theme={"system"}
  ├── [6.3M]  block_329897953_base58.json
  ├── [6.3M]  block_329897953_base64.json
  ├── [8.0M]  block_329897953_json.json
  └── [ 16M]  block_329897953_jsonParsed.json
  ```
</CodeGroup>

## Performance optimization

Examples in Python.

### Limit concurrency and throttle requests

When fetching multiple blocks, avoid sending too many requests simultaneously:

<CodeGroup>
  ```python Python theme={"system"}
  import asyncio
  import concurrent.futures
  from solana.rpc.api import Client
  import json
  import os
  import pathlib

  async def fetch_block_with_solana(executor, client, slot, semaphore):
      # Use a semaphore to limit concurrency
      async with semaphore:
          try:
              # Run the synchronous client.get_block in a thread pool
              loop = asyncio.get_running_loop()
              response = await loop.run_in_executor(
                  executor,
                  lambda: client.get_block(
                      slot,
                      encoding="json",
                      max_supported_transaction_version=0
                  )
              )

              # Solana-py returns a solders.rpc.responses.GetBlockResp object
              # Check if the value field exists (indicates success)
              if hasattr(response, 'value') and response.value is not None:
                  return slot, response.value
              else:
                  print(f"Error in response for slot {slot}")
                  return slot, None
          except Exception as e:
              print(f"Exception for slot {slot}: {e}")
              return slot, None

  def save_block_to_file(slot, block, output_dir):
      """Save block data to a JSON file"""
      os.makedirs(output_dir, exist_ok=True)

      try:
          if hasattr(block, '__dict__'):
              block_dict = block.__dict__
          else:
              block_dict = {
                  'blockhash': str(block.blockhash) if hasattr(block, 'blockhash') else None,
                  'blockHeight': block.block_height if hasattr(block, 'block_height') else None,
                  'blockTime': block.block_time if hasattr(block, 'block_time') else None,
                  'parentSlot': block.parent_slot if hasattr(block, 'parent_slot') else None,
                  'previousBlockhash': str(block.previous_blockhash) if hasattr(block, 'previous_blockhash') else None,
                  'transactions': [
                      {
                          'meta': tx.meta.__dict__ if hasattr(tx, 'meta') and hasattr(tx.meta, '__dict__') else None,
                          'transaction': str(tx.transaction) if hasattr(tx, 'transaction') else None,
                      } for tx in block.transactions if hasattr(block, 'transactions')
                  ] if hasattr(block, 'transactions') else []
              }

          # Save to file
          file_path = pathlib.Path(output_dir) / f"block_{slot}.json"
          with open(file_path, 'w') as f:
              json.dump(block_dict, f, indent=2, default=str)

          print(f"Saved block {slot} to {file_path}")
      except Exception as e:
          print(f"Error saving block {slot}: {e}")

  async def get_multiple_blocks(rpc_url, slots, max_concurrency=5, output_dir="block_data"):
      # Create a semaphore to limit concurrent requests
      semaphore = asyncio.Semaphore(max_concurrency)

      # Create the Solana client
      client = Client(rpc_url)

      # Create a thread pool executor for running synchronous code
      with concurrent.futures.ThreadPoolExecutor() as executor:
          tasks = [fetch_block_with_solana(executor, client, slot, semaphore) for slot in slots]
          results = await asyncio.gather(*tasks)

          # Process and save each block
          blocks = {}
          for slot, block in results:
              if block is not None:
                  blocks[slot] = block
                  # Save to file
                  save_block_to_file(slot, block, output_dir)

          return blocks

  # Example usage
  async def main():
      rpc_url = "CHAINSTACK_SOLANA_URL"
      slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]

      blocks = await get_multiple_blocks(rpc_url, slots_to_fetch, max_concurrency=3)
      print(f"\nSuccessfully fetched {len(blocks)} blocks")

      for slot, block in blocks.items():
          if block:
              # Access the transactions field from the block object
              tx_count = len(block.transactions if hasattr(block, 'transactions') else [])
              print(f"Slot {slot}: {tx_count} transactions")

  # Run the async function
  if __name__ == "__main__":
      asyncio.run(main())
  ```
</CodeGroup>

As always, make sure you get your own range of blocks in `slots_to_fetch = [329849011, 329849012, 329849013, 329849014, 329849015]`.
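
The script above caps how many requests are in flight, but it does not space them out. If you also want to throttle the request rate, one option is a small async gate that enforces a minimum interval between request starts. This is a sketch, not part of solana-py. The `Throttle` class name and the interval value are assumptions.

```python
import asyncio
import time


class Throttle:
    """Let one caller through per `min_interval` seconds."""

    def __init__(self, min_interval):
        self.min_interval = min_interval
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0

    async def wait(self):
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
            # Schedule the next release relative to this one.
            self._next_allowed = max(now, self._next_allowed) + self.min_interval


async def demo():
    throttle = Throttle(min_interval=0.05)  # roughly 20 requests per second
    started = time.monotonic()
    for slot in (329849011, 329849012, 329849013):
        await throttle.wait()
        # A real script would call the block fetcher here.
        print(f"slot {slot} released at +{time.monotonic() - started:.2f}s")
    return time.monotonic() - started


elapsed = asyncio.run(demo())
```

In `fetch_block_with_solana`, a shared `throttle` could be awaited just inside the `async with semaphore:` block, so that both the concurrency cap and the rate cap apply.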

Recommended encodings:

* For most use cases: `json` (good balance of size and parsing speed)
* For human-readable data: `jsonParsed` (larger but provides decoded instruction data)
* For binary efficiency: `base64` (efficient for storage and transmission)

### Use binary encoding for bulk requests

Use the same script as above, but on line 18 replace `encoding="json"` with `encoding="base64"`.

### Error handling

Let's start with a working script and explain it below:

<CodeGroup>
  ```python Python theme={"system"}
  """
  Solana getBlock with retries implementation

  This script demonstrates how to fetch a Solana block with proper
  error handling and exponential backoff retries.
  """

  import time
  from solana.rpc.api import Client

  def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
      """Fetch a block with exponential backoff retries

      Args:
          client: Solana RPC client
          slot: The slot number to fetch
          max_retries: Maximum number of retry attempts
          backoff_factor: Multiplier for exponential backoff

      Returns:
          Block data or None if failed after retries
      """
      retry_count = 0
      base_wait_time = 1  # Start with 1 second wait

      while retry_count < max_retries:
          try:
              # Get the block response
              block_resp = client.get_block(
                  slot,
                  encoding="json",
                  max_supported_transaction_version=0
              )

              # The response is a solders.rpc.responses.GetBlockResp object
              # We need to check if it has a value (success) or error
              if hasattr(block_resp, 'value') and block_resp.value is not None:
                  return block_resp.value
              elif hasattr(block_resp, 'error'):
                  error = block_resp.error

                  # Handle specific error codes if possible
                  if hasattr(error, 'code'):
                      code = error.code
                      message = getattr(error, 'message', 'Unknown error')

                      if code == -32004:  # Block not available for this slot
                          print(f"Block {slot} not available in ledger storage")
                          return None
                      elif code == -32007:  # Slot skipped
                          print(f"Slot {slot} was skipped (no block produced)")
                          return None
                      elif code == -32603:  # Internal error
                          # This might be temporary, retry
                          print(f"Internal error, will retry: {message}")
                      elif code == 429:  # Rate limit
                          print("Rate limited, backing off significantly")
                          retry_count += 1
                          time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
                          continue
                      else:
                          print(f"RPC error code {code}: {message}")
                  else:
                      # If we can't get a specific error code
                      print(f"Error in response: {error}")
              else:
                  # Null result
                  print(f"Block {slot} returned null (not found or not confirmed)")
                  return None

          except Exception as e:
              retry_count += 1
              wait_time = base_wait_time * backoff_factor ** retry_count

              print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")

              if retry_count < max_retries:
                  time.sleep(wait_time)
                  continue  # Retry now; skip the generic backoff below
              else:
                  print(f"Max retries reached for block {slot}")
                  return None

          # If we get here, there was an error but not one we want to immediately retry on
          retry_count += 1
          wait_time = base_wait_time * backoff_factor ** retry_count

          if retry_count < max_retries:
              print(f"Retrying after {wait_time}s...")
              time.sleep(wait_time)
          else:
              print(f"Max retries reached for block {slot}")
              return None

      return None  # Shouldn't reach here, but just in case

  def main():
      # Example usage
      rpc_url = "CHAINSTACK_SOLANA_URL"
      print(f"Connecting to Solana RPC: {rpc_url}")

      client = Client(rpc_url)
      slot_to_fetch = 329849011  # Example slot

      print(f"Fetching block at slot {slot_to_fetch}...")
      block = get_block_with_retries(client, slot_to_fetch)

      if block:
          # Print summary of the block
          transactions = getattr(block, 'transactions', [])

          # Safely extract signatures
          signatures = []
          for tx in transactions:
              if hasattr(tx, 'transaction') and hasattr(tx.transaction, 'signatures'):
                  if tx.transaction.signatures:
                      signatures.append(tx.transaction.signatures[0])

          print(f"\nBlock {slot_to_fetch} summary:")
          print(f"  Blockhash: {getattr(block, 'blockhash', 'unknown')}")
          print(f"  Parent Slot: {getattr(block, 'parent_slot', 'unknown')}")
          print(f"  Block Time: {getattr(block, 'block_time', 'unknown')}")
          print(f"  Block Height: {getattr(block, 'block_height', 'unknown')}")
          print(f"  Transactions: {len(transactions)}")

          if signatures:
              print(f"\nFirst few transaction signatures:")
              for sig in signatures[:3]:
                  print(f"  {sig}")
              if len(signatures) > 3:
                  print(f"  ... and {len(signatures)-3} more")
      else:
          print("Failed to fetch block")

  if __name__ == "__main__":
      main()
  ```
</CodeGroup>

#### Exponential backoff retry mechanism

<CodeGroup>
  ```python Python theme={"system"}
  def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
      retry_count = 0
      base_wait_time = 1  # Start with 1 second wait

      while retry_count < max_retries:
          # ... [request code]

          # When an error occurs:
          retry_count += 1  # Count the failed attempt so the loop can terminate
          wait_time = base_wait_time * backoff_factor ** retry_count
          time.sleep(wait_time)
  ```
</CodeGroup>

The code implements classic exponential backoff where:

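* Wait time grows exponentially with each retry attempt
* The base wait (`base_wait_time`) is 1 second
* Each wait is `base_wait_time * backoff_factor ** retry_count`; the full script increments `retry_count` before computing the wait, so with the defaults the first wait is 2 seconds and each later wait doubles
* This prevents overwhelming the server with rapid reconnection attempts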
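
To make the schedule concrete, the following minimal sketch lists the waits that formula produces. `backoff_schedule` is a hypothetical helper that is not part of the script; it assumes the retry counter is incremented before each wait is computed, as in the full example.

```python
def backoff_schedule(max_retries=3, backoff_factor=2, base_wait_time=1):
    """Return the wait in seconds computed for each retry attempt."""
    waits = []
    for retry_count in range(1, max_retries + 1):
        # Same formula as the script: base * factor ** retry_count
        waits.append(base_wait_time * backoff_factor ** retry_count)
    return waits


print(backoff_schedule())  # [2, 4, 8]
```

Printing the schedule before a bulk run is a quick way to check that the total wait time fits your latency budget.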

#### Differential error handling

<CodeGroup>
  ```python Python theme={"system"}
  # Handle specific error codes if possible
  if hasattr(error, 'code'):
      code = error.code
      message = getattr(error, 'message', 'Unknown error')

      if code == -32007:  # Slot skipped (or missing after a ledger jump to a snapshot)
          print(f"Slot {slot} was skipped (no block produced)")
          return None
      elif code == -32004:  # Block not available for this slot
          print(f"Block {slot} not available in ledger storage")
          return None
      elif code == -32603:  # Internal error
          # This might be temporary, retry
          print(f"Internal error, will retry: {message}")
      elif code == 429:  # Rate limit
          print("Rate limited, backing off significantly")
          retry_count += 1
          time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
          continue
  ```
</CodeGroup>

The script handles each error type differently:

* Non-retryable errors (skipped slots, unavailable blocks) fail fast without spending retries
* Temporary errors such as internal errors go through the standard backoff
* Rate limit errors wait twice as long as the standard backoff
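
One way to keep this policy readable as it grows is to separate the decision from the retry loop. The sketch below is an illustration only: `Action` and `classify_error` are hypothetical names, and the code sets simply mirror the codes checked in the snippet above.

```python
from enum import Enum


class Action(Enum):
    FAIL_FAST = "fail_fast"    # give up immediately, retrying cannot help
    RETRY = "retry"            # standard exponential backoff
    RETRY_SLOW = "retry_slow"  # doubled backoff, used for rate limits


# Codes taken from the snippet above; extend the sets for your own policy.
FAIL_FAST_CODES = {-32007, -32004}
SLOW_RETRY_CODES = {429}


def classify_error(code):
    """Map an error code to the action the retry loop should take."""
    if code in FAIL_FAST_CODES:
        return Action.FAIL_FAST
    if code in SLOW_RETRY_CODES:
        return Action.RETRY_SLOW
    # Anything else, including -32603 and unknown codes, gets a normal retry.
    return Action.RETRY


print(classify_error(-32603))
```

With the decision isolated in one function, changing how a code is treated does not require touching the loop itself.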

#### Enhanced rate limit handling

<CodeGroup>
  ```python Python theme={"system"}
  elif code == 429:  # Rate limit
      print("Rate limited, backing off significantly")
      retry_count += 1
      time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
      continue
  ```
</CodeGroup>

Rate limit responses are handled separately from other errors:

* The wait is doubled compared to the standard backoff for the same retry count
* The longer pause gives the rate limit window time to reset before the next attempt

#### Comprehensive exception handling

<CodeGroup>
  ```python Python theme={"system"}
  try:
      # Get the block response
      block_resp = client.get_block(...)

      # Various response validation checks

  except Exception as e:
      retry_count += 1
      wait_time = base_wait_time * backoff_factor ** retry_count

      print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")

      if retry_count < max_retries:
          time.sleep(wait_time)
      else:
          print(f"Max retries reached for block {slot}")
          return None
  ```
</CodeGroup>

The script catches all exceptions including:

* Network errors
* Timeout errors
* Malformed response errors
* Client library errors

#### Response validation

<CodeGroup>
  ```python Python theme={"system"}
  if hasattr(block_resp, 'value') and block_resp.value is not None:
      return block_resp.value
  elif hasattr(block_resp, 'error'):
      error = block_resp.error
      # Error handling logic
  else:
      # Null result
      print(f"Block {slot} returned null (not found or not confirmed)")
      return None
  ```
</CodeGroup>

The code thoroughly validates responses before processing:

* Checks for valid response structure
* Handles null responses appropriately
* Verifies response has expected attributes
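
The same checks can be applied when you call the endpoint with plain HTTP and receive a decoded JSON-RPC dict instead of a client object. This is a minimal sketch under that assumption; `unpack_rpc_response` is a hypothetical helper, and it relies only on the standard JSON-RPC 2.0 `result` and `error` fields.

```python
def unpack_rpc_response(response):
    """Split a decoded JSON-RPC 2.0 response into (result, error)."""
    if not isinstance(response, dict):
        # Not a JSON object at all: treat it as a malformed response.
        return None, {"code": None, "message": "Malformed response"}
    if response.get("error") is not None:
        return None, response["error"]
    # A missing or null result means the node had nothing to return.
    return response.get("result"), None


result, error = unpack_rpc_response(
    {"jsonrpc": "2.0", "id": 1, "error": {"code": -32007, "message": "Slot skipped"}}
)
print(result, error)
```

Returning the error alongside the result lets the caller decide whether to retry, rather than collapsing every failure into `None`.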

#### Detailed logging

<CodeGroup>
  ```python Python theme={"system"}
  print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")
  # ...
  print(f"Max retries reached for block {slot}")
  ```
</CodeGroup>

The script logs enough detail to aid debugging and monitoring:

* Error messages with specific error codes and descriptions
* Retry counts and wait times
* Final outcomes (success or failure)
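
The script uses `print` for simplicity. In a long-running service, the same messages could go through Python's standard `logging` module so they carry levels and timestamps. The sketch below assumes that setup; the logger name `getblock` and the helper functions are hypothetical.

```python
import logging

# Assumption: "getblock" is an arbitrary logger name chosen for this sketch.
logger = logging.getLogger("getblock")


def log_retry(slot, retry_count, max_retries, wait_time, error):
    """Log a failed attempt that will be retried (same wording as the script)."""
    logger.warning(
        "Error fetching block %s, retry %s/%s after %ss: %s",
        slot, retry_count, max_retries, wait_time, error,
    )


def log_give_up(slot):
    """Log that no attempts are left for this slot."""
    logger.error("Max retries reached for block %s", slot)


# Configure output once at program start; the format string is only an example.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log_retry(329849011, 1, 3, 2, "timeout")
log_give_up(329849011)
```

Using warning for retries and error for the final failure makes it possible to alert only on blocks that were never fetched.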

#### Parameterized retry configuration

<CodeGroup>
  ```python Python theme={"system"}
  def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
  ```
</CodeGroup>

The retry mechanism can be tuned to network conditions or application requirements:

* Configurable maximum retries
* Adjustable backoff factor
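
If several call sites need different settings, the parameters can be grouped into one object instead of being passed individually. The following is a sketch, not part of the original script: `RetryConfig` and the two profiles are hypothetical, and it also exposes the base wait, which the function above hard-codes to 1 second.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryConfig:
    """Hypothetical container for the retry settings used in this guide."""
    max_retries: int = 3
    backoff_factor: float = 2
    base_wait_time: float = 1  # seconds; hard-coded in the original function

    def wait_for(self, retry_count: int) -> float:
        """Wait before the given retry, using the script's formula."""
        return self.base_wait_time * self.backoff_factor ** retry_count


# Example profiles; the numbers are illustrative, not recommendations.
DEFAULT = RetryConfig()
BULK_BACKFILL = RetryConfig(max_retries=5, backoff_factor=3, base_wait_time=0.5)

print(DEFAULT.wait_for(1), BULK_BACKFILL.wait_for(2))
```

Freezing the dataclass keeps a shared profile from being modified by accident at one call site.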

These techniques together create a resilient implementation that gracefully handles various network issues, temporary failures, and rate limiting while providing clear feedback about what's happening during the process.

### Client-side caching

Store previously fetched blocks in an in-memory cache on the client. A finalized block does not change, so a cached copy stays valid and repeated reads cost no RPC calls. A cache like the one below lets an application:

* Retrieve frequently accessed blocks without making additional RPC calls
* Track cache performance with hit/miss statistics
* Maintain a configurable maximum cache size

Example:

<CodeGroup>
  ```python Python theme={"system"}
  #!/usr/bin/env python3
  """
  Solana Block Cache

  A utility for efficiently fetching and caching Solana blocks to reduce RPC calls.
  """

  import time
  import json
  import requests
  from typing import Dict, Any, Optional, Union
  from solana.rpc.api import Client

  class BlockCache:
      """A simple cache for Solana blocks to reduce redundant RPC calls."""

      def __init__(self, client: Union[Client, str], max_size: int = 100):
          """
          Initialize the block cache.

          Args:
              client: A Solana RPC client instance or URL string
              max_size: Maximum number of blocks to keep in cache
          """
          if isinstance(client, str):
              self.rpc_url = client
              self.client = None
          else:
              self.rpc_url = client._provider.endpoint_uri
              self.client = client

          self.session = requests.Session()
          self.session.headers.update({
              "Content-Type": "application/json",
              "Accept-Encoding": "gzip"
          })
          self.cache: Dict[str, Any] = {}
          self.max_size = max_size
          self.hits = 0
          self.misses = 0
          self.request_id = 0

      def get_block(self, slot: int, encoding: str = "json", tx_details: str = "full",
                    max_supported_transaction_version: Optional[int] = 0) -> Optional[Dict[str, Any]]:
          """
          Fetch a block, using cache if available.

          Args:
              slot: The slot number of the block to fetch
              encoding: Block encoding format ("json", "jsonParsed", "base64", etc.)
              tx_details: Transaction details level ("full", "signatures", "accounts", "none")
              max_supported_transaction_version: Maximum supported transaction version

          Returns:
              The block data or None if not found/error
          """
          cache_key = f"{slot}:{encoding}:{tx_details}"

          # Return from cache if available
          if cache_key in self.cache:
              self.hits += 1
              print(f"Cache hit for block {slot}")
              return self.cache[cache_key]

          # Fetch from RPC
          self.misses += 1
          print(f"Cache miss for block {slot}, fetching from RPC...")

          try:

              self.request_id += 1

              # Prepare parameters for the getBlock method
              params = {
                  "encoding": encoding,
                  "transactionDetails": tx_details,
                  "maxSupportedTransactionVersion": max_supported_transaction_version
              }

              # Create the JSON-RPC payload
              payload = {
                  "jsonrpc": "2.0",
                  "id": self.request_id,
                  "method": "getBlock",
                  "params": [slot, params]
              }

              # Make the HTTP request
              response = self.session.post(self.rpc_url, json=payload, timeout=30)
              block_resp = response.json()

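              # Get the result; surface any JSON-RPC error instead of silently returning None
              result = block_resp.get('result')
              if block_resp.get('error'):
                  print(f"RPC error for block {slot}: {block_resp['error']}")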

              # Store in cache if we got a result
              if result:
                  # If cache is full, remove oldest entry
                  if len(self.cache) >= self.max_size:
                      oldest_key = next(iter(self.cache))
                      del self.cache[oldest_key]

                  self.cache[cache_key] = result

              return result
          except Exception as e:
              print(f"Error fetching block {slot}: {e}")
              return None

      def clear_cache(self) -> None:
          """Clear the entire cache."""
          self.cache = {}
          print("Cache cleared")

      def remove_from_cache(self, slot: int, encoding: str = "json", tx_details: str = "full") -> bool:
          """
          Remove a specific block from the cache.

          Returns:
              True if the block was in the cache and removed, False otherwise
          """
          cache_key = f"{slot}:{encoding}:{tx_details}"
          if cache_key in self.cache:
              del self.cache[cache_key]
              print(f"Removed block {slot} from cache")
              return True
          return False

      def get_stats(self) -> Dict[str, Union[int, float]]:
          """Get cache performance statistics."""
          total_requests = self.hits + self.misses
          hit_rate = self.hits / total_requests if total_requests > 0 else 0

          return {
              "size": len(self.cache),
              "max_size": self.max_size,
              "hits": self.hits,
              "misses": self.misses,
              "hit_rate": hit_rate,
          }

  def main():
      """Example usage of BlockCache."""

      rpc_url = "CHAINSTACK_SOLANA_URL"

      print("Initializing Solana Block Cache")
      block_cache = BlockCache(rpc_url)

      # Example: Fetch a block (cache miss)
      print("\nFetching block first time (should be cache miss):")
      block1 = block_cache.get_block(329849011)
      if block1:
          print(f"Block hash: {block1.get('blockhash')}")
          print(f"Block time: {block1.get('blockTime')}")
          print(f"Transaction count: {len(block1.get('transactions', []))}")

      # Example: Fetch the same block again (cache hit)
      print("\nFetching same block again (should be cache hit):")
      block2 = block_cache.get_block(329849011)

      # Fetch with different parameters (should be a cache miss)
      print("\nFetching same block with different parameters (should be cache miss):")
      block3 = block_cache.get_block(329849011, tx_details="signatures")

      # Print cache statistics
      print("\nCache statistics:")
      stats = block_cache.get_stats()
      for key, value in stats.items():
          if key == "hit_rate":
              print(f"{key}: {value:.2%}")
          else:
              print(f"{key}: {value}")

  if __name__ == "__main__":
      main()
  ```
</CodeGroup>
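
The `BlockCache` above evicts the entry that was inserted first, regardless of how recently it was read. If recently read blocks should stay cached longer, a least-recently-used policy is one alternative. The sketch below shows only the storage part, built on `collections.OrderedDict`; `LRUBlockStore` is a hypothetical name, and the string keys stand in for the cache keys used above.

```python
from collections import OrderedDict


class LRUBlockStore:
    """In-memory store that evicts the least recently used entry when full."""

    def __init__(self, max_size=100):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        # Mark the entry as most recently used.
        self._data.move_to_end(key)
        return self._data[key]

    def put(self, key, block):
        self._data[key] = block
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            # Drop the entry at the front, which is the least recently used.
            self._data.popitem(last=False)

    def __len__(self):
        return len(self._data)


store = LRUBlockStore(max_size=2)
store.put("100:json:full", {"blockhash": "a"})
store.put("101:json:full", {"blockhash": "b"})
store.get("100:json:full")                      # touch slot 100
store.put("102:json:full", {"blockhash": "c"})  # evicts slot 101
print(store.get("101:json:full"))  # None
```

Whether this is worth the extra bookkeeping depends on the access pattern: for a one-pass backfill, insertion-order eviction behaves the same.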

### Block ranges

Process block ranges range-first: call `getBlocks` (or `getBlocksWithLimit` when you only know the starting slot) to list the slots that actually contain blocks, then fetch those blocks in batches. This avoids spending `getBlock` calls on skipped slots.

<CodeGroup>
  ```python Python theme={"system"}
  #!/usr/bin/env python3
  """
  Solana Block Range Processing

  This script demonstrates how to process ranges of Solana blocks
  efficiently with batching and proper error handling.
  """

  import time
  from solana.rpc.api import Client

  def get_block_with_retries(client, slot, max_retries=3, backoff_factor=2):
      """Fetch a block with exponential backoff retries

      Args:
          client: Solana RPC client
          slot: The slot number to fetch
          max_retries: Maximum number of retry attempts
          backoff_factor: Multiplier for exponential backoff

      Returns:
          Block data or None if failed after retries
      """
      retry_count = 0
      base_wait_time = 1  # Start with 1 second wait

      while retry_count < max_retries:
          try:
              # Get the block response
              block_resp = client.get_block(
                  slot,
                  encoding="json",
                  max_supported_transaction_version=0
              )

              # The response is a solders.rpc.responses.GetBlockResp object
              # We need to check if it has a value (success) or error
              if hasattr(block_resp, 'value') and block_resp.value is not None:
                  return block_resp.value
              elif hasattr(block_resp, 'error'):
                  error = block_resp.error

                  # Handle specific error codes if possible
                  if hasattr(error, 'code'):
                      code = error.code
                      message = getattr(error, 'message', 'Unknown error')

                      if code == -32007:  # Slot skipped (or missing after a ledger jump to a snapshot)
                          print(f"Slot {slot} was skipped (no block produced)")
                          return None
                      elif code == -32004:  # Block not available for this slot
                          print(f"Block {slot} not available in ledger storage")
                          return None
                      elif code == -32603:  # Internal error
                          # This might be temporary, retry
                          print(f"Internal error, will retry: {message}")
                      elif code == 429:  # Rate limit
                          print("Rate limited, backing off significantly")
                          retry_count += 1
                          time.sleep(base_wait_time * backoff_factor ** retry_count * 2)  # Extra backoff for rate limits
                          continue
                      else:
                          print(f"RPC error code {code}: {message}")
                  else:
                      # If we can't get a specific error code
                      print(f"Error in response: {error}")
              else:
                  # Null result
                  print(f"Block {slot} returned null (not found or not confirmed)")
                  return None

          except Exception as e:
              retry_count += 1
              wait_time = base_wait_time * backoff_factor ** retry_count

              print(f"Error fetching block {slot}, retry {retry_count}/{max_retries} after {wait_time}s: {e}")

              if retry_count < max_retries:
                  time.sleep(wait_time)
                  continue  # Skip the fallthrough below so this failure is counted and slept only once
              else:
                  print(f"Max retries reached for block {slot}")
                  return None

          # If we get here, there was an error but not one we want to immediately retry on
          retry_count += 1
          wait_time = base_wait_time * backoff_factor ** retry_count

          if retry_count < max_retries:
              print(f"Retrying after {wait_time}s...")
              time.sleep(wait_time)
          else:
              print(f"Max retries reached for block {slot}")
              return None

      return None  # Shouldn't reach here, but just in case

  def get_block_range(client, start_slot, end_slot=None, limit=500):
      """
      Get a list of available block slots in a range.
      If end_slot is None, will use getBlocksWithLimit instead of getBlocks.

      Args:
          client: Solana RPC client
          start_slot: The starting slot number
          end_slot: The ending slot number (optional)
          limit: Maximum number of blocks to return when end_slot is None

      Returns:
          List of available block slots in the range
      """
      try:
          if end_slot is None:
              # Use getBlocksWithLimit
              blocks_resp = client.get_blocks_with_limit(start_slot, limit)
          else:
              # Use getBlocks
              blocks_resp = client.get_blocks(start_slot, end_slot)

          # Handle the response which could be an object or a dict
          if hasattr(blocks_resp, 'value'):
              # Object response
              return blocks_resp.value or []
          else:
              # Dict response
              return blocks_resp.get('result', [])

      except Exception as e:
          print(f"Error getting block range: {e}")
          return []

  def process_block_range(client, start_slot, end_slot, batch_size=10):
      """Process blocks in a range in batches

      Args:
          client: Solana RPC client
          start_slot: The starting slot number
          end_slot: The ending slot number
          batch_size: Number of blocks to process in each batch

      Returns:
          Total number of transactions processed
      """
      # First get all available block slots in the range
      print(f"Fetching block slots from {start_slot} to {end_slot}...")
      block_slots = get_block_range(client, start_slot, end_slot)
      print(f"Found {len(block_slots)} blocks to process")

      # Process in batches to avoid overwhelming the RPC node
      total_blocks = len(block_slots)
      total_transactions = 0

      for i in range(0, total_blocks, batch_size):
          batch = block_slots[i:i+batch_size]
          batch_num = i//batch_size + 1
          total_batches = (total_blocks + batch_size - 1)//batch_size
          print(f"Processing batch {batch_num}/{total_batches}...")

          # Process each block in the batch
          for slot in batch:
              block = get_block_with_retries(client, slot)
              if block:
                  # Get transaction count safely
                  transactions = getattr(block, 'transactions', [])
                  tx_count = len(transactions)
                  total_transactions += tx_count
                  print(f"  Slot {slot}: {tx_count} transactions")

              # Add a small delay between blocks to be nice to the RPC node
              time.sleep(0.1)

          # Add a delay between batches
          if i + batch_size < total_blocks:
              print(f"Waiting before next batch...")
              time.sleep(1)

      print(f"Processed {total_blocks} blocks with {total_transactions} total transactions")
      return total_transactions

  def process_recent_blocks(client, count=100):
      """Process the most recent blocks

      Args:
          client: Solana RPC client
          count: Number of recent blocks to process

      Returns:
          Total number of transactions processed
      """
      # Get the current slot
      current_slot_resp = client.get_slot()

      # Handle either object or dict response
      if hasattr(current_slot_resp, 'value'):
          # Object response
          current_slot = current_slot_resp.value
      else:
          # Dict response
          current_slot = current_slot_resp.get('result')

      if current_slot:
          # Calculate the starting slot (count blocks back)
          start_slot = max(0, current_slot - count)
          print(f"Processing {count} recent blocks from {start_slot} to {current_slot}...")
          return process_block_range(client, start_slot, current_slot)
      else:
          print("Failed to get current slot")
          return 0

  def main():
      # Example usage
      rpc_url = "CHAINSTACK_SOLANA_URL"
      print(f"Connecting to Solana RPC: {rpc_url}")

      client = Client(rpc_url)

      # Process the 20 most recent blocks
      count = 20
      print(f"Processing {count} recent blocks...")

      tx_count = process_recent_blocks(client, count=count)
      print(f"Total transactions in last {count} blocks: {tx_count}")

  if __name__ == "__main__":
      main()
  ```
</CodeGroup>
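
For ranges much larger than the one in this example, the slot listing itself may need to be split, because nodes cap how wide a single `getBlocks` range can be. The sketch below shows one way to cut a range into inclusive sub-ranges. `split_slot_range` is a hypothetical helper, and the default `max_span` of 500,000 slots is an assumption based on the limit stated in the Solana RPC documentation; check it against your node.

```python
def split_slot_range(start_slot, end_slot, max_span=500_000):
    """Yield inclusive (start, end) sub-ranges of at most max_span slots each."""
    if max_span < 1:
        raise ValueError("max_span must be at least 1")
    current = start_slot
    while current <= end_slot:
        chunk_end = min(current + max_span - 1, end_slot)
        yield current, chunk_end
        current = chunk_end + 1


# Small numbers to keep the output readable.
for chunk_start, chunk_end in split_slot_range(0, 9, max_span=4):
    print(chunk_start, chunk_end)
```

Each pair can then be passed to `get_block_range(client, chunk_start, chunk_end)` from the script above, and the returned slot lists concatenated.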

## Conclusion

Working with Solana's `getBlock` RPC method efficiently requires understanding both what data you need and how to optimize your requests. The practices in this guide (compression, limited concurrency, block ranges, requesting only what you need, and proper error handling) help you build robust applications that read Solana blocks efficiently.

Remember these key takeaways:

1. Choose the encoding (`json`, `jsonParsed`, `base58`, or `base64`) and transaction detail level that fit your use case.
2. Always enable HTTP compression.
3. Implement client-side caching for frequently accessed blocks.
4. Use controlled concurrency and throttling for bulk operations.
5. Handle errors gracefully with retries and backoff.

<CardGroup>
  <Card title="Ake">
    <img src="https://mintcdn.com/chainstack/UN3rP7zhB69idvnC/images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg?fit=max&auto=format&n=UN3rP7zhB69idvnC&q=85&s=792a24ab1b4682406fa589c0ecd88e5d" alt="Ake" style={{width: '80px', height: '80px', borderRadius: '50%', objectFit: 'cover', display: 'block', margin: '0 auto'}} noZoom width="400" height="400" data-path="images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg" />

    <Icon icon="code" iconType="solid" /> Director of Developer Experience @ Chainstack
    <br /><Icon icon="screwdriver-wrench" iconType="solid" /> Talk to me all things Web3
    <br />20 years in technology | 8+ years full time in Web3

    <div style={{display: "flex", justifyContent: "center", gap: "12px"}}>
      <a href="https://github.com/akegaviar/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="github" iconType="brands" />
      </a>

      <a href="https://twitter.com/akegaviar" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="twitter" iconType="brands" />
      </a>

      <a href="https://www.linkedin.com/in/ake/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="linkedin" iconType="brands" />
      </a>
    </div>
  </Card>
</CardGroup>
