Migrating from Helius getTokenAccounts to standard Solana RPC methods
For starters, this is not a poke at Helius.
It's something that builders ask for and we provide a simple tutorial.
Overview
This tutorial shows how to migrate from Helius' custom getTokenAccounts
method to standard Solana JSON-RPC methods when switching node providers. We'll demonstrate how to achieve the same functionality using getProgramAccounts
with proper pagination and data filtering.
Implementation
For illustration purposes, we'll provide you with a Helius script, a standard methods script, and a sample script to print token holders using standard methods.
We'll be using the ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx token in all examples. So feel free to replace is with any other token. Although do note that tokens with higher holder count USDC will most likely need a more optimized approach.
Also be sure to check Limitsas getProgramAccounts
is a heavy call and in general this sort of operation on Solana infrastructurally is definitely not even in the cruiserweight class.
Helius
Here's a simple Helius example:
Remember to replace ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx
with your token if want.
import requests
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
url = "HELIUS_RPC"
def get_token_accounts(contract_address: str, batch_size: int = 1000):
cursor = None
while True:
try:
params = {
"limit": batch_size,
"mint": contract_address
}
if cursor:
params["cursor"] = cursor
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": "helius-test",
"method": "getTokenAccounts",
"params": params
},
timeout=30
)
response.raise_for_status()
data = response.json()
if not data.get("result"):
logging.error("Invalid response format - missing 'result' field")
break
token_accounts = data["result"].get("token_accounts", [])
if not token_accounts:
logging.info("No more token accounts to process")
break
for account in token_accounts:
logging.info(f"Account owner: {account['owner']}, Amount: {account['amount']}")
cursor = data["result"].get("cursor")
if not cursor:
logging.info("Reached end of pagination - all accounts processed")
break
except requests.exceptions.RequestException as e:
logging.error(f"API request failed: {str(e)}")
break
if __name__ == "__main__":
atlas_contract_address = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
get_token_accounts(atlas_contract_address)
Standard JSON-RPC example
Here's a standard one:
Remember to replace ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx
with your token if want.
import requests
import logging
import time
from base64 import b64decode
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
url = "CHAINSTACK_RPC"
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
def get_token_accounts(contract_address: str, batch_size: int = 25):
total_processed = 0
try:
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"dataSlice": {
"offset": 0,
"length": 0
},
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
]
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
total_accounts = len(data.get("result", []))
logging.info(f"Total accounts to process: {total_accounts}")
# Now fetch accounts in smaller batches with minimal data
for offset in range(0, total_accounts, batch_size):
logging.info(f"Fetching batch starting at offset {offset}")
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"encoding": "base64",
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
],
"dataSlice": {
"offset": 32,
"length": 32
},
"limit": batch_size
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
if "result" not in data:
logging.error("Invalid response format - missing 'result' field")
break
accounts = data["result"]
if not accounts:
break
for account in accounts:
try:
raw_data = account["account"]["data"][0]
owner_bytes = b64decode(raw_data)
owner = ''.join([f'{b:02x}' for b in owner_bytes])
logging.info(f"Account owner: {owner}")
total_processed += 1
except (KeyError, IndexError, TypeError) as e:
logging.error(f"Error processing account data: {e}")
continue
logging.info(f"Processed {total_processed}/{total_accounts} accounts")
if offset + batch_size < total_accounts:
time.sleep(0.5)
except requests.exceptions.RequestException as e:
logging.error(f"API request failed: {str(e)}")
if __name__ == "__main__":
atlas_token = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
get_token_accounts(atlas_token, batch_size=25)
Standard JSON-RPC example with DB connection
Here's a sample of how you'd be using the standard getProgramAccounts method to monitor the holders of a token and then dumping the data to a PostgreSQL DB and keeping key-value store of accounts holding the token and the number of tokens up to date.
import requests
from decimal import getcontext, Decimal, InvalidOperation
import time
from datetime import datetime
from typing import Dict, List, Tuple, Any, Optional
from base64 import b64decode
import psycopg2
# PostgreSQL connection configuration
db_params = {
'host': '',
'database': '',
'user': '',
'password': '',
'port': '5432'
}
url = "CHAINSTACK_RPC"
getcontext().prec = 18
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
def get_token_accounts(contract_address: str, batch_size: int = 25) -> Dict[str, int]:
"""
Gets token holders using standard Solana RPC methods and returns holder data
"""
holder_data = {}
total_processed = 0
try:
# Initial request to get total count
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"dataSlice": {
"offset": 0,
"length": 0
},
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
]
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
total_accounts = len(data.get("result", []))
# Process accounts in batches
for offset in range(0, total_accounts, batch_size):
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"encoding": "base64",
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
],
"dataSlice": {
"offset": 32,
"length": 32
},
"limit": batch_size
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
if "result" not in data:
continue
accounts = data["result"]
if not accounts:
break
for account in accounts:
try:
raw_data = account["account"]["data"][0]
owner_bytes = b64decode(raw_data)
owner = ''.join([f'{b:02x}' for b in owner_bytes])
# Get token balance
balance_response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getTokenAccountBalance",
"params": [owner]
},
timeout=30
)
balance_data = balance_response.json()
if "result" in balance_data and "value" in balance_data["result"]:
amount = int(balance_data["result"]["value"]["amount"])
if amount > 0: # Only count non-zero balances
holder_data[owner] = amount
total_processed += 1
except (KeyError, IndexError, TypeError) as e:
continue
if offset + batch_size < total_accounts:
time.sleep(0.5)
except requests.exceptions.RequestException:
pass
return holder_data
def update_holder_data(contract_address: str, holder_data: Dict[str, int]):
"""
Updates the database with new holder data, tracking changes
"""
conn = None
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
# Get existing holder data
cursor.execute("""
SELECT holder_address, amount
FROM token_holders
WHERE contract_address = %s
""", (contract_address,))
existing_holders = {row[0]: row[1] for row in cursor.fetchall()}
# Calculate changes
current_time = int(time.time())
date_time = datetime.fromtimestamp(current_time)
# Insert new/updated holders
for holder, amount in holder_data.items():
if holder not in existing_holders or existing_holders[holder] != amount:
cursor.execute("""
INSERT INTO token_holders
(contract_address, holder_address, amount, timestamp, date_time)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (contract_address, holder_address)
DO UPDATE SET amount = EXCLUDED.amount,
timestamp = EXCLUDED.timestamp,
date_time = EXCLUDED.date_time
""", (contract_address, holder, amount, current_time, date_time))
# Record total holder count
total_holders = len([amount for amount in holder_data.values() if amount > 0])
cursor.execute("""
INSERT INTO holder_counts
(contract_address, timestamp, date_time, count)
VALUES (%s, %s, %s, %s)
""", (contract_address, current_time, date_time, total_holders))
conn.commit()
except (psycopg2.Error, Exception) as e:
if conn:
conn.rollback()
raise
finally:
if conn:
conn.close()
def main():
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
# Get contracts to process
cursor.execute("""
SELECT contract_address
FROM contracts
WHERE active = true
""")
contracts = [row[0] for row in cursor.fetchall()]
for contract in contracts:
holder_data = get_token_accounts(contract)
update_holder_data(contract, holder_data)
time.sleep(1) # Rate limiting between contracts
except Exception as e:
print(f"Error in main process: {str(e)}")
finally:
if 'conn' in locals() and conn:
conn.close()
if __name__ == "__main__":
# Example token addresses
tokens = [
"ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx", # ATLAS token
# Add more token addresses here
]
for token in tokens:
try:
holder_data = get_token_accounts(token)
update_holder_data(token, holder_data)
time.sleep(1) # Rate limiting between tokens
except Exception as e:
print(f"Error processing token {token}: {str(e)}")
Updated about 10 hours ago