Migrating from Helius getTokenAccounts to standard Solana RPC methods

For starters, this is not a poke at Helius.

It's simply something that builders ask for, so we put together a quick tutorial.

Overview

This tutorial shows how to migrate from Helius' custom getTokenAccounts method to standard Solana JSON-RPC methods when switching node providers. We'll demonstrate how to achieve the same functionality using getProgramAccounts with data filtering, a dataSlice to keep responses small, and client-side batching.
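
At the protocol level, the change comes down to the request shape: Helius' getTokenAccounts takes a mint plus cursor-based pagination parameters, while standard getProgramAccounts takes the SPL Token program ID plus filters that match the token accounts of that mint. As a rough sketch, here are the two request payloads side by side, using the ATLAS mint from the examples below; treat it as an illustration of the shape rather than a drop-in snippet.

# Helius custom method: paginate over a mint's token accounts with limit/cursor
helius_payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getTokenAccounts",
    "params": {"mint": "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx", "limit": 1000}
}

# Standard JSON-RPC: query the SPL Token program and filter token accounts
# by size and by the mint stored in the first 32 bytes of account data
standard_payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "getProgramAccounts",
    "params": [
        "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",  # SPL Token program
        {
            "encoding": "base64",
            "filters": [
                {"dataSize": 165},  # SPL Token account size in bytes
                {"memcmp": {"offset": 0, "bytes": "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"}}
            ]
        }
    ]
}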

Implementation

For illustration purposes, we'll provide you with a Helius script, an equivalent standard-methods script that prints token holders, and a sample script that uses standard methods to track holders in a PostgreSQL database.

We'll be using the ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx token in all examples, so feel free to replace it with any other token. Do note, however, that tokens with a higher holder count, like USDC, will most likely need a more optimized approach.

Also be sure to check Limits, as getProgramAccounts is a heavy call; in general, this sort of operation sits firmly in the heavyweight class as far as Solana infrastructure is concerned.
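
The standard-method scripts below lean on the SPL Token account layout: every token account is 165 bytes, with the mint in bytes 0-31, the owner in bytes 32-63, and the raw amount as a little-endian u64 at bytes 64-71. That's why they filter on dataSize 165, memcmp the mint at offset 0, and slice the owner out at offset 32. For reference, here's a small decoding sketch; the decode_token_account helper is illustrative, and the base58 package comes from pip install base58.

from base64 import b64decode

import base58  # pip install base58


def decode_token_account(data_b64: str) -> dict:
    """Decode the first three fields of a 165-byte SPL Token account."""
    raw = b64decode(data_b64)
    return {
        "mint": base58.b58encode(raw[0:32]).decode(),    # bytes 0-31
        "owner": base58.b58encode(raw[32:64]).decode(),  # bytes 32-63
        "amount": int.from_bytes(raw[64:72], "little")   # u64, little-endian
    }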

Helius

Here's a simple Helius example:

Remember to replace ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx with your token if you want.

import requests
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

url = "HELIUS_RPC"  # Replace with your Helius RPC endpoint

def get_token_accounts(contract_address: str, batch_size: int = 1000):
    cursor = None
    while True:
        try:
            params = {
                "limit": batch_size,
                "mint": contract_address
            }
            if cursor:
                params["cursor"] = cursor

            response = requests.post(
                url,
                headers={"Content-Type": "application/json"},
                json={
                    "jsonrpc": "2.0",
                    "id": "helius-test",
                    "method": "getTokenAccounts",
                    "params": params
                },
                timeout=30
            )

            response.raise_for_status()
            data = response.json()

            if not data.get("result"):
                logging.error("Invalid response format - missing 'result' field")
                break

            token_accounts = data["result"].get("token_accounts", [])
            if not token_accounts:
                logging.info("No more token accounts to process")
                break

            for account in token_accounts:
                logging.info(f"Account owner: {account['owner']}, Amount: {account['amount']}")

            cursor = data["result"].get("cursor")

            if not cursor:
                logging.info("Reached end of pagination - all accounts processed")
                break

        except requests.exceptions.RequestException as e:
            logging.error(f"API request failed: {str(e)}")
            break

if __name__ == "__main__":
    atlas_contract_address = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
    get_token_accounts(atlas_contract_address)

Standard JSON-RPC example

Here's the same routine using standard Solana JSON-RPC methods:

Remember to replace ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx with your token if you want.

import requests
import logging
from base64 import b64decode

import base58  # pip install base58

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)


url = "CHAINSTACK_RPC"  # Replace with your Chainstack Solana node endpoint

TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"  # SPL Token program

def get_token_accounts(contract_address: str, batch_size: int = 25):
    total_processed = 0
    try:
        # First request: count the matching token accounts without
        # fetching any account data (dataSlice length 0)
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getProgramAccounts",
                "params": [
                    TOKEN_PROGRAM_ID,
                    {
                        "dataSlice": {
                            "offset": 0,
                            "length": 0
                        },
                        "filters": [
                            {
                                "dataSize": 165  # SPL Token account size in bytes
                            },
                            {
                                "memcmp": {
                                    "offset": 0,  # mint address is stored in the first 32 bytes
                                    "bytes": contract_address
                                }
                            }
                        ]
                    }
                ]
            },
            timeout=60
        )

        response.raise_for_status()
        data = response.json()
        total_accounts = len(data.get("result", []))
        logging.info(f"Total accounts to process: {total_accounts}")

        # getProgramAccounts has no server-side pagination, so fetch the
        # 32-byte owner field of every matching account in a single call
        # and process the results in client-side batches
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getProgramAccounts",
                "params": [
                    TOKEN_PROGRAM_ID,
                    {
                        "encoding": "base64",
                        "filters": [
                            {
                                "dataSize": 165
                            },
                            {
                                "memcmp": {
                                    "offset": 0,
                                    "bytes": contract_address
                                }
                            }
                        ],
                        "dataSlice": {
                            "offset": 32,  # owner pubkey starts at byte 32
                            "length": 32
                        }
                    }
                ]
            },
            timeout=60
        )

        response.raise_for_status()
        data = response.json()

        if "result" not in data:
            logging.error("Invalid response format - missing 'result' field")
            return

        all_accounts = data["result"]

        # Process the accounts in smaller client-side batches
        for offset in range(0, len(all_accounts), batch_size):
            logging.info(f"Processing batch starting at offset {offset}")

            accounts = all_accounts[offset:offset + batch_size]
            if not accounts:
                break

            for account in accounts:
                try:
                    # The dataSlice leaves only the raw 32-byte owner pubkey
                    raw_data = account["account"]["data"][0]
                    owner_bytes = b64decode(raw_data)
                    owner = base58.b58encode(owner_bytes).decode()
                    logging.info(f"Account owner: {owner}")
                    total_processed += 1
                except (KeyError, IndexError, TypeError) as e:
                    logging.error(f"Error processing account data: {e}")
                    continue

            logging.info(f"Processed {total_processed}/{total_accounts} accounts")

    except requests.exceptions.RequestException as e:
        logging.error(f"API request failed: {str(e)}")

if __name__ == "__main__":

    atlas_token = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"  
    get_token_accounts(atlas_token, batch_size=25)

Standard JSON-RPC example with DB connection

Here's a sample of how you'd use the standard getProgramAccounts method to monitor a token's holders, dump the data into a PostgreSQL database, and keep an up-to-date record of the accounts holding the token and their balances.
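
The script assumes the tables it reads and writes already exist. Here's a minimal schema sketch inferred from the queries in the script; the table and column names match the script, while the types and constraints are assumptions you may want to adjust (for example, NUMERIC comfortably holds raw u64 token amounts):

import psycopg2

# Fill in your own connection details
db_params = {
    'host': '',
    'database': '',
    'user': '',
    'password': '',
    'port': '5432'
}

# Schema sketch inferred from the script's queries; types are assumptions.
# The composite primary key on token_holders is what the ON CONFLICT
# clause in the script relies on.
SCHEMA = """
CREATE TABLE IF NOT EXISTS token_holders (
    contract_address TEXT NOT NULL,
    holder_address   TEXT NOT NULL,
    amount           NUMERIC NOT NULL,
    timestamp        BIGINT NOT NULL,
    date_time        TIMESTAMP NOT NULL,
    PRIMARY KEY (contract_address, holder_address)
);

CREATE TABLE IF NOT EXISTS holder_counts (
    contract_address TEXT NOT NULL,
    timestamp        BIGINT NOT NULL,
    date_time        TIMESTAMP NOT NULL,
    count            INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS contracts (
    contract_address TEXT PRIMARY KEY,
    active           BOOLEAN NOT NULL DEFAULT true
);
"""

conn = psycopg2.connect(**db_params)
try:
    with conn.cursor() as cursor:
        cursor.execute(SCHEMA)
    conn.commit()
finally:
    conn.close()

With the schema in place, here's the script itself: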

import requests
import time
from datetime import datetime
from typing import Dict
from base64 import b64decode

import base58  # pip install base58
import psycopg2

# PostgreSQL connection configuration
db_params = {
    'host': '',
    'database': '',
    'user': '',
    'password': '',
    'port': '5432'
}

url = "CHAINSTACK_RPC"  # Replace with your Chainstack Solana node endpoint
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"  # SPL Token program

def get_token_accounts(contract_address: str, batch_size: int = 25) -> Dict[str, int]:
    """
    Gets token holders using standard Solana RPC methods and returns holder data
    """
    holder_data = {}
    total_processed = 0
    
    try:
        # Initial request to get total count
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getProgramAccounts",
                "params": [
                    TOKEN_PROGRAM_ID,
                    {
                        "dataSlice": {
                            "offset": 0,
                            "length": 0
                        },
                        "filters": [
                            {
                                "dataSize": 165
                            },
                            {
                                "memcmp": {
                                    "offset": 0,
                                    "bytes": contract_address
                                }
                            }
                        ]
                    }
                ]
            },
            timeout=60
        )

        response.raise_for_status()
        data = response.json()
        total_accounts = len(data.get("result", []))

        # getProgramAccounts has no server-side pagination, so fetch the
        # 32-byte owner field of every matching account in a single call
        # and process the results in client-side batches
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getProgramAccounts",
                "params": [
                    TOKEN_PROGRAM_ID,
                    {
                        "encoding": "base64",
                        "filters": [
                            {
                                "dataSize": 165
                            },
                            {
                                "memcmp": {
                                    "offset": 0,
                                    "bytes": contract_address
                                }
                            }
                        ],
                        "dataSlice": {
                            "offset": 32,  # owner pubkey starts at byte 32
                            "length": 32
                        }
                    }
                ]
            },
            timeout=60
        )

        response.raise_for_status()
        data = response.json()

        if "result" not in data:
            return holder_data

        all_accounts = data["result"]

        # Process the accounts in smaller batches to spread out the
        # per-account getTokenAccountBalance calls below
        for offset in range(0, len(all_accounts), batch_size):
            accounts = all_accounts[offset:offset + batch_size]
            if not accounts:
                break

            for account in accounts:
                try:
                    # The dataSlice leaves only the raw 32-byte owner pubkey
                    raw_data = account["account"]["data"][0]
                    owner_bytes = b64decode(raw_data)
                    owner = base58.b58encode(owner_bytes).decode()

                    # Get the token balance of this token account;
                    # account["pubkey"] is the token account address,
                    # not the owner wallet
                    balance_response = requests.post(
                        url,
                        headers={"Content-Type": "application/json"},
                        json={
                            "jsonrpc": "2.0",
                            "id": 1,
                            "method": "getTokenAccountBalance",
                            "params": [account["pubkey"]]
                        },
                        timeout=30
                    )

                    balance_data = balance_response.json()
                    if "result" in balance_data and "value" in balance_data["result"]:
                        amount = int(balance_data["result"]["value"]["amount"])
                        if amount > 0:  # Only count non-zero balances
                            # An owner can hold more than one token account for the same mint
                            holder_data[owner] = holder_data.get(owner, 0) + amount

                    total_processed += 1

                except (KeyError, IndexError, TypeError):
                    continue

            if offset + batch_size < total_accounts:
                time.sleep(0.5)

    except requests.exceptions.RequestException:
        pass

    return holder_data

def update_holder_data(contract_address: str, holder_data: Dict[str, int]):
    """
    Updates the database with new holder data, tracking changes
    """
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()

        # Get existing holder data
        cursor.execute("""
            SELECT holder_address, amount 
            FROM token_holders 
            WHERE contract_address = %s
        """, (contract_address,))
        
        existing_holders = {row[0]: row[1] for row in cursor.fetchall()}
        
        # Calculate changes
        current_time = int(time.time())
        date_time = datetime.fromtimestamp(current_time)
        
        # Insert new/updated holders
        for holder, amount in holder_data.items():
            if holder not in existing_holders or existing_holders[holder] != amount:
                cursor.execute("""
                    INSERT INTO token_holders 
                    (contract_address, holder_address, amount, timestamp, date_time)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (contract_address, holder_address) 
                    DO UPDATE SET amount = EXCLUDED.amount,
                                timestamp = EXCLUDED.timestamp,
                                date_time = EXCLUDED.date_time
                """, (contract_address, holder, amount, current_time, date_time))
        
        # Record total holder count
        total_holders = len([amount for amount in holder_data.values() if amount > 0])
        cursor.execute("""
            INSERT INTO holder_counts 
            (contract_address, timestamp, date_time, count)
            VALUES (%s, %s, %s, %s)
        """, (contract_address, current_time, date_time, total_holders))
        
        conn.commit()
        
    except Exception:
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()

def main():
    """
    Alternative entry point: reads the contract addresses to track
    from the contracts table instead of using a hardcoded list
    """
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()
        
        # Get contracts to process
        cursor.execute("""
            SELECT contract_address 
            FROM contracts 
            WHERE active = true
        """)
        
        contracts = [row[0] for row in cursor.fetchall()]
        
        for contract in contracts:
            holder_data = get_token_accounts(contract)
            update_holder_data(contract, holder_data)
            time.sleep(1)  # Rate limiting between contracts
            
    except Exception as e:
        print(f"Error in main process: {str(e)}")
    finally:
        if 'conn' in locals() and conn:
            conn.close()

if __name__ == "__main__":
    # Example token addresses
    tokens = [
        "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx",  # ATLAS token
        # Add more token addresses here
    ]
    
    for token in tokens:
        try:
            holder_data = get_token_accounts(token)
            update_holder_data(token, holder_data)
            time.sleep(1)  # Rate limiting between tokens
        except Exception as e:
            print(f"Error processing token {token}: {str(e)}")
        
        

Ake

🛠️ Developer Experience Director @ Chainstack
💸 Talk to me about all things Web3 infrastructure and I'll save you the costs
Ake | Warpcast Ake | GitHub Ake | Twitter Ake | LinkedIn