# Migrate from Helius getTokenAccounts to Solana RPC

> Replace Helius-specific getTokenAccounts calls with standard Solana RPC methods like getProgramAccounts for provider-portable token data queries.

**TLDR:**

* Helius’ `getTokenAccounts` can be replaced with standard Solana JSON-RPC calls like `getProgramAccounts` and `getTokenAccountBalance`.
* `getProgramAccounts` with `dataSize` and `memcmp` filters gathers all token accounts for a mint; process the results in batches and be mindful of rate limits and response size.
* For larger tokens (like USDC), consider chunking requests and storing results in a database for performance.
* Overall, the standard RPC approach is fully portable and avoids provider lock-in.

## Main article

For starters, this is not a poke at Helius.

Builders keep asking for this, so here's a simple tutorial.

## Overview

This tutorial shows how to migrate from Helius' custom `getTokenAccounts` method to standard Solana JSON-RPC methods when switching node providers. We'll demonstrate how to achieve the same functionality using `getProgramAccounts` with account filters, data slicing, and batched processing.

## Implementation

For illustration purposes, we'll provide you with a Helius script, a standard methods script, and a sample script to print token holders using standard methods.

We'll be using the [ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx](https://solscan.io/token/ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx) token in all examples. Feel free to replace it with any other token. Do note, though, that tokens with a higher holder count, like USDC, will most likely need a more optimized approach.

Also be sure to check [Limits](/docs/limits), as `getProgramAccounts` is a heavy call. In general, scanning every token account for a mint is a heavyweight operation for Solana infrastructure.
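For tokens with very large holder counts, one possible way to chunk the scan is to add a second `memcmp` filter on the first byte of the owner field, so each request covers roughly 1/256 of the accounts. The sketch below only builds the request payloads; `chunked_requests` and `b58encode` are illustrative names, not part of any library, and whether 256 smaller calls fit your plan's limits is something to verify for your own endpoint.

```python
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58encode(raw: bytes) -> str:
    """Encode raw bytes as base58, the encoding memcmp filters expect."""
    num = int.from_bytes(raw, "big")
    encoded = ""
    while num > 0:
        num, rem = divmod(num, 58)
        encoded = B58_ALPHABET[rem] + encoded
    # Each leading zero byte maps to a leading "1"
    pad = len(raw) - len(raw.lstrip(b"\0"))
    return "1" * pad + encoded


def chunked_requests(mint: str):
    """Yield one getProgramAccounts payload per possible first owner byte."""
    for first_byte in range(256):
        yield {
            "jsonrpc": "2.0",
            "id": first_byte,
            "method": "getProgramAccounts",
            "params": [
                TOKEN_PROGRAM_ID,
                {
                    "encoding": "base64",
                    # Return only the 32-byte owner field of each account
                    "dataSlice": {"offset": 32, "length": 32},
                    "filters": [
                        {"dataSize": 165},  # SPL token account size
                        {"memcmp": {"offset": 0, "bytes": mint}},  # mint field
                        # Owner field starts at offset 32; match its first byte
                        {"memcmp": {"offset": 32, "bytes": b58encode(bytes([first_byte]))}},
                    ],
                },
            ],
        }
```

Each payload can be posted with `requests.post(url, json=payload)` and the results merged or written to a database as they arrive, with a pause between calls to respect rate limits.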

<Check>
  ### Get your own node endpoint today

  [Start for free](https://console.chainstack.com/) and get your app to production levels immediately. No credit card required.

  You can sign up with your GitHub, X, Google, or Microsoft account.
</Check>

### Helius

Here's a simple Helius example:

Remember to replace `ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx` with your token if you want.

<CodeGroup>
  ```python Python theme={"system"}
  import requests
  import logging

  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s',
      handlers=[logging.StreamHandler()]
  )

  url = "HELIUS_RPC"

  def get_token_accounts(contract_address: str, batch_size: int = 1000):
      cursor = None
      while True:
          try:
              params = {
                  "limit": batch_size,
                  "mint": contract_address
              }
              if cursor:
                  params["cursor"] = cursor

              response = requests.post(
                  url,
                  headers={"Content-Type": "application/json"},
                  json={
                      "jsonrpc": "2.0",
                      "id": "helius-test",
                      "method": "getTokenAccounts",
                      "params": params
                  },
                  timeout=30
              )

              response.raise_for_status()
              data = response.json()

              if not data.get("result"):
                  logging.error("Invalid response format - missing 'result' field")
                  break

              token_accounts = data["result"].get("token_accounts", [])
              if not token_accounts:
                  logging.info("No more token accounts to process")
                  break

              for account in token_accounts:
                  logging.info(f"Account owner: {account['owner']}, Amount: {account['amount']}")

              cursor = data["result"].get("cursor")

              if not cursor:
                  logging.info("Reached end of pagination - all accounts processed")
                  break

          except requests.exceptions.RequestException as e:
              logging.error(f"API request failed: {str(e)}")
              break

  if __name__ == "__main__":
      atlas_contract_address = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
      get_token_accounts(atlas_contract_address)
  ```
</CodeGroup>

### Standard JSON-RPC example

Here's a standard one:

Remember to replace `ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx` with your token if you want.

<CodeGroup>
  ```python Python theme={"system"}
  import requests
  import logging
  from base64 import b64decode

  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s',
      handlers=[logging.StreamHandler()]
  )

  url = "CHAINSTACK_RPC"

  TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
  B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

  def b58encode(raw: bytes) -> str:
      """Encode raw bytes as base58, the format Solana uses for addresses."""
      num = int.from_bytes(raw, "big")
      encoded = ""
      while num > 0:
          num, rem = divmod(num, 58)
          encoded = B58_ALPHABET[rem] + encoded
      # Each leading zero byte maps to a leading "1"
      pad = len(raw) - len(raw.lstrip(b"\0"))
      return "1" * pad + encoded

  def get_token_accounts(contract_address: str, batch_size: int = 25):
      total_processed = 0
      try:
          # Standard getProgramAccounts has no limit or cursor parameter, so
          # fetch every matching account once and keep the payload small:
          # dataSlice returns only the 32-byte owner field at offset 32.
          response = requests.post(
              url,
              headers={"Content-Type": "application/json"},
              json={
                  "jsonrpc": "2.0",
                  "id": 1,
                  "method": "getProgramAccounts",
                  "params": [
                      TOKEN_PROGRAM_ID,
                      {
                          "encoding": "base64",
                          "dataSlice": {
                              "offset": 32,
                              "length": 32
                          },
                          "filters": [
                              {
                                  # SPL token accounts are 165 bytes long
                                  "dataSize": 165
                              },
                              {
                                  # The mint address sits at offset 0
                                  "memcmp": {
                                      "offset": 0,
                                      "bytes": contract_address
                                  }
                              }
                          ]
                      }
                  ]
              },
              timeout=60
          )

          response.raise_for_status()
          data = response.json()

          if "result" not in data:
              logging.error(f"Invalid response format - missing 'result' field: {data.get('error')}")
              return

          accounts = data["result"]
          total_accounts = len(accounts)
          logging.info(f"Total accounts to process: {total_accounts}")

          # Process the fetched accounts client-side in batches
          for offset in range(0, total_accounts, batch_size):
              logging.info(f"Processing batch starting at offset {offset}")

              for account in accounts[offset:offset + batch_size]:
                  try:
                      raw_data = account["account"]["data"][0]
                      owner = b58encode(b64decode(raw_data))
                      logging.info(f"Token account: {account['pubkey']}, Account owner: {owner}")
                      total_processed += 1
                  except (KeyError, IndexError, TypeError, ValueError) as e:
                      logging.error(f"Error processing account data: {e}")
                      continue

              logging.info(f"Processed {total_processed}/{total_accounts} accounts")

      except requests.exceptions.RequestException as e:
          logging.error(f"API request failed: {str(e)}")

  if __name__ == "__main__":

      atlas_token = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
      get_token_accounts(atlas_token, batch_size=25)
  ```
</CodeGroup>
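The Helius response also carries an `amount` per account. With standard methods, one way to get the raw balance without a separate call per account is to read it from the account data itself: in the SPL token account layout the amount is a little-endian `u64` at offset 64, right after the 32-byte mint and 32-byte owner. The sketch below assumes the request used `"dataSlice": {"offset": 64, "length": 8}` with `base64` encoding; `decode_amount` and `to_ui_amount` are illustrative helper names, and the `decimals` value must come from the mint (for example via `getTokenSupply`).

```python
from base64 import b64decode
from decimal import Decimal

# dataSlice that returns only the 8-byte amount field of a token account
AMOUNT_SLICE = {"offset": 64, "length": 8}


def decode_amount(b64_slice: str) -> int:
    """Decode the raw token amount from a base64-encoded 8-byte slice."""
    raw = b64decode(b64_slice)
    if len(raw) != 8:
        raise ValueError(f"expected 8 bytes, got {len(raw)}")
    # The amount is stored as an unsigned 64-bit little-endian integer
    return int.from_bytes(raw, "little")


def to_ui_amount(amount: int, decimals: int) -> Decimal:
    """Convert a raw amount to a human-readable value using the mint decimals."""
    return Decimal(amount) / (Decimal(10) ** decimals)
```

For each entry in the `getProgramAccounts` result, `decode_amount(account["account"]["data"][0])` then gives the same raw integer that `getTokenAccountBalance` reports in its `amount` field.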

### Standard JSON-RPC example with DB connection

Here's a *sample* of how you'd use the standard [getProgramAccounts](/reference/solana-getprogramaccounts) method to monitor the holders of a token, dump the data to a PostgreSQL database, and keep a key-value store of the accounts holding the token and their token amounts up to date.

<CodeGroup>
  ```python Python theme={"system"}
  import requests
  from decimal import getcontext, Decimal, InvalidOperation
  import time
  from datetime import datetime
  from typing import Dict, List, Tuple, Any, Optional
  from base64 import b64decode
  import psycopg2

  # PostgreSQL connection configuration
  db_params = {
      'host': '',
      'database': '',
      'user': '',
      'password': '',
      'port': '5432'
  }

  url = "CHAINSTACK_RPC"
  getcontext().prec = 18
  TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"

  B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

  def b58encode(raw: bytes) -> str:
      """Encode raw bytes as base58, the format Solana uses for addresses."""
      num = int.from_bytes(raw, "big")
      encoded = ""
      while num > 0:
          num, rem = divmod(num, 58)
          encoded = B58_ALPHABET[rem] + encoded
      # Each leading zero byte maps to a leading "1"
      pad = len(raw) - len(raw.lstrip(b"\0"))
      return "1" * pad + encoded

  def get_token_accounts(contract_address: str, batch_size: int = 25) -> Dict[str, int]:
      """
      Gets token holders using standard Solana RPC methods and returns holder data
      """
      holder_data = {}

      try:
          # Standard getProgramAccounts has no limit or cursor parameter, so fetch
          # all matching token accounts once; dataSlice keeps only the owner field
          response = requests.post(
              url,
              headers={"Content-Type": "application/json"},
              json={
                  "jsonrpc": "2.0",
                  "id": 1,
                  "method": "getProgramAccounts",
                  "params": [
                      TOKEN_PROGRAM_ID,
                      {
                          "encoding": "base64",
                          "dataSlice": {
                              "offset": 32,
                              "length": 32
                          },
                          "filters": [
                              {
                                  "dataSize": 165
                              },
                              {
                                  "memcmp": {
                                      "offset": 0,
                                      "bytes": contract_address
                                  }
                              }
                          ]
                      }
                  ]
              },
              timeout=60
          )

          response.raise_for_status()
          accounts = response.json().get("result") or []
          total_accounts = len(accounts)

          # Query balances in batches to stay within rate limits
          for offset in range(0, total_accounts, batch_size):
              for account in accounts[offset:offset + batch_size]:
                  try:
                      raw_data = account["account"]["data"][0]
                      owner = b58encode(b64decode(raw_data))

                      # getTokenAccountBalance takes the token account address,
                      # not the address of the wallet that owns it
                      balance_response = requests.post(
                          url,
                          headers={"Content-Type": "application/json"},
                          json={
                              "jsonrpc": "2.0",
                              "id": 1,
                              "method": "getTokenAccountBalance",
                              "params": [account["pubkey"]]
                          },
                          timeout=30
                      )

                      balance_data = balance_response.json()
                      if "result" in balance_data and "value" in balance_data["result"]:
                          amount = int(balance_data["result"]["value"]["amount"])
                          if amount > 0:  # Only count non-zero balances
                              # One owner can hold several token accounts for a mint
                              holder_data[owner] = holder_data.get(owner, 0) + amount

                  except (KeyError, IndexError, TypeError, ValueError):
                      continue

              if offset + batch_size < total_accounts:
                  time.sleep(0.5)

      except requests.exceptions.RequestException as e:
          print(f"RPC request failed: {str(e)}")

      return holder_data

  def update_holder_data(contract_address: str, holder_data: Dict[str, int]):
      """
      Updates the database with new holder data, tracking changes
      """
      conn = None
      try:
          conn = psycopg2.connect(**db_params)
          cursor = conn.cursor()

          # Get existing holder data
          cursor.execute("""
              SELECT holder_address, amount
              FROM token_holders
              WHERE contract_address = %s
          """, (contract_address,))

          existing_holders = {row[0]: row[1] for row in cursor.fetchall()}

          # Calculate changes
          current_time = int(time.time())
          date_time = datetime.fromtimestamp(current_time)

          # Insert new/updated holders
          for holder, amount in holder_data.items():
              if holder not in existing_holders or existing_holders[holder] != amount:
                  cursor.execute("""
                      INSERT INTO token_holders
                      (contract_address, holder_address, amount, timestamp, date_time)
                      VALUES (%s, %s, %s, %s, %s)
                      ON CONFLICT (contract_address, holder_address)
                      DO UPDATE SET amount = EXCLUDED.amount,
                                  timestamp = EXCLUDED.timestamp,
                                  date_time = EXCLUDED.date_time
                  """, (contract_address, holder, amount, current_time, date_time))

          # Record total holder count
          total_holders = len([amount for amount in holder_data.values() if amount > 0])
          cursor.execute("""
              INSERT INTO holder_counts
              (contract_address, timestamp, date_time, count)
              VALUES (%s, %s, %s, %s)
          """, (contract_address, current_time, date_time, total_holders))

          conn.commit()

      except (psycopg2.Error, Exception) as e:
          if conn:
              conn.rollback()
          raise
      finally:
          if conn:
              conn.close()

  def main():
      try:
          conn = psycopg2.connect(**db_params)
          cursor = conn.cursor()

          # Get contracts to process
          cursor.execute("""
              SELECT contract_address
              FROM contracts
              WHERE active = true
          """)

          contracts = [row[0] for row in cursor.fetchall()]

          for contract in contracts:
              holder_data = get_token_accounts(contract)
              update_holder_data(contract, holder_data)
              time.sleep(1)  # Rate limiting between contracts

      except Exception as e:
          print(f"Error in main process: {str(e)}")
      finally:
          if 'conn' in locals() and conn:
              conn.close()

  if __name__ == "__main__":
      # Example token addresses
      tokens = [
          "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx",  # ATLAS token
          # Add more token addresses here
      ]

      for token in tokens:
          try:
              holder_data = get_token_accounts(token)
              update_holder_data(token, holder_data)
              time.sleep(1)  # Rate limiting between tokens
          except Exception as e:
              print(f"Error processing token {token}: {str(e)}")


  ```
</CodeGroup>
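The script above reads from and writes to the `contracts`, `token_holders`, and `holder_counts` tables, which this tutorial does not define. Below is a minimal schema sketch inferred from the queries: the table and column names come from the script, while the column types and the `create_tables` helper are assumptions. The composite primary key on `token_holders` is what the `ON CONFLICT (contract_address, holder_address)` clause relies on.

```python
# Hypothetical schema inferred from the queries in the script above.
# Table and column names come from the script; the types are assumptions.
SCHEMA_STATEMENTS = [
    """
    CREATE TABLE IF NOT EXISTS contracts (
        contract_address TEXT PRIMARY KEY,
        active BOOLEAN NOT NULL DEFAULT TRUE
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS token_holders (
        contract_address TEXT NOT NULL,
        holder_address TEXT NOT NULL,
        amount NUMERIC(20, 0) NOT NULL,
        timestamp BIGINT NOT NULL,
        date_time TIMESTAMP NOT NULL,
        PRIMARY KEY (contract_address, holder_address)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS holder_counts (
        contract_address TEXT NOT NULL,
        timestamp BIGINT NOT NULL,
        date_time TIMESTAMP NOT NULL,
        count INTEGER NOT NULL
    )
    """,
]


def create_tables(conn) -> None:
    """Create the tables on an open DB-API connection if they don't exist yet."""
    cursor = conn.cursor()
    for statement in SCHEMA_STATEMENTS:
        cursor.execute(statement)
    conn.commit()
```

With psycopg2, this could be called once before the first run as `create_tables(psycopg2.connect(**db_params))`. `NUMERIC(20, 0)` is chosen here because raw token amounts are unsigned 64-bit values and can exceed the range of a signed `BIGINT`.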

<CardGroup>
  <Card title="Ake">
    <img src="https://mintcdn.com/chainstack/UN3rP7zhB69idvnC/images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg?fit=max&auto=format&n=UN3rP7zhB69idvnC&q=85&s=792a24ab1b4682406fa589c0ecd88e5d" alt="Ake" style={{width: '80px', height: '80px', borderRadius: '50%', objectFit: 'cover', display: 'block', margin: '0 auto'}} noZoom width="400" height="400" data-path="images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg" />

    <Icon icon="code" iconType="solid" /> Director of Developer Experience @ Chainstack
    <br /><Icon icon="screwdriver-wrench" iconType="solid" /> Talk to me all things Web3
    <br />20 years in technology | 8+ years of full-time Web3 experience

    <div style={{display: "flex", justifyContent: "center", gap: "12px"}}>
      <a href="https://github.com/akegaviar/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="github" iconType="brands" />
      </a>

      <a href="https://twitter.com/akegaviar" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="twitter" iconType="brands" />
      </a>

      <a href="https://www.linkedin.com/in/ake/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="linkedin" iconType="brands" />
      </a>
    </div>
  </Card>
</CardGroup>
