getTokenAccounts
can be replaced with standard Solana JSON-RPC calls like getProgramAccounts
and getTokenAccountBalance
.getProgramAccounts
with filters and pagination helps gather all token accounts for a mint, though be mindful of rate limits and data size.getTokenAccounts
method to standard Solana JSON-RPC methods when switching node providers. We’ll demonstrate how to achieve the same functionality using getProgramAccounts
with proper pagination and data filtering.
getProgramAccounts
is a heavy call and in general this sort of operation on Solana infrastructurally is definitely not even in the cruiserweight class.
ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx
with your token if want.
import requests
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
url = "HELIUS_RPC"
def get_token_accounts(contract_address: str, batch_size: int = 1000):
cursor = None
while True:
try:
params = {
"limit": batch_size,
"mint": contract_address
}
if cursor:
params["cursor"] = cursor
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": "helius-test",
"method": "getTokenAccounts",
"params": params
},
timeout=30
)
response.raise_for_status()
data = response.json()
if not data.get("result"):
logging.error("Invalid response format - missing 'result' field")
break
token_accounts = data["result"].get("token_accounts", [])
if not token_accounts:
logging.info("No more token accounts to process")
break
for account in token_accounts:
logging.info(f"Account owner: {account['owner']}, Amount: {account['amount']}")
cursor = data["result"].get("cursor")
if not cursor:
logging.info("Reached end of pagination - all accounts processed")
break
except requests.exceptions.RequestException as e:
logging.error(f"API request failed: {str(e)}")
break
if __name__ == "__main__":
atlas_contract_address = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
get_token_accounts(atlas_contract_address)
ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx
with your token if want.
import requests
import logging
import time
from base64 import b64decode
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
url = "CHAINSTACK_RPC"
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
def get_token_accounts(contract_address: str, batch_size: int = 25):
total_processed = 0
try:
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"dataSlice": {
"offset": 0,
"length": 0
},
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
]
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
total_accounts = len(data.get("result", []))
logging.info(f"Total accounts to process: {total_accounts}")
# Now fetch accounts in smaller batches with minimal data
for offset in range(0, total_accounts, batch_size):
logging.info(f"Fetching batch starting at offset {offset}")
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"encoding": "base64",
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
],
"dataSlice": {
"offset": 32,
"length": 32
},
"limit": batch_size
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
if "result" not in data:
logging.error("Invalid response format - missing 'result' field")
break
accounts = data["result"]
if not accounts:
break
for account in accounts:
try:
raw_data = account["account"]["data"][0]
owner_bytes = b64decode(raw_data)
owner = ''.join([f'{b:02x}' for b in owner_bytes])
logging.info(f"Account owner: {owner}")
total_processed += 1
except (KeyError, IndexError, TypeError) as e:
logging.error(f"Error processing account data: {e}")
continue
logging.info(f"Processed {total_processed}/{total_accounts} accounts")
if offset + batch_size < total_accounts:
time.sleep(0.5)
except requests.exceptions.RequestException as e:
logging.error(f"API request failed: {str(e)}")
if __name__ == "__main__":
atlas_token = "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx"
get_token_accounts(atlas_token, batch_size=25)
import requests
from decimal import getcontext, Decimal, InvalidOperation
import time
from datetime import datetime
from typing import Dict, List, Tuple, Any, Optional
from base64 import b64decode
import psycopg2
# PostgreSQL connection configuration
db_params = {
'host': '',
'database': '',
'user': '',
'password': '',
'port': '5432'
}
url = "CHAINSTACK_RPC"
getcontext().prec = 18
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
def get_token_accounts(contract_address: str, batch_size: int = 25) -> Dict[str, int]:
"""
Gets token holders using standard Solana RPC methods and returns holder data
"""
holder_data = {}
total_processed = 0
try:
# Initial request to get total count
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"dataSlice": {
"offset": 0,
"length": 0
},
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
]
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
total_accounts = len(data.get("result", []))
# Process accounts in batches
for offset in range(0, total_accounts, batch_size):
response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getProgramAccounts",
"params": [
TOKEN_PROGRAM_ID,
{
"encoding": "base64",
"filters": [
{
"dataSize": 165
},
{
"memcmp": {
"offset": 0,
"bytes": contract_address
}
}
],
"dataSlice": {
"offset": 32,
"length": 32
},
"limit": batch_size
}
]
},
timeout=60
)
response.raise_for_status()
data = response.json()
if "result" not in data:
continue
accounts = data["result"]
if not accounts:
break
for account in accounts:
try:
raw_data = account["account"]["data"][0]
owner_bytes = b64decode(raw_data)
owner = ''.join([f'{b:02x}' for b in owner_bytes])
# Get token balance
balance_response = requests.post(
url,
headers={"Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"id": 1,
"method": "getTokenAccountBalance",
"params": [owner]
},
timeout=30
)
balance_data = balance_response.json()
if "result" in balance_data and "value" in balance_data["result"]:
amount = int(balance_data["result"]["value"]["amount"])
if amount > 0: # Only count non-zero balances
holder_data[owner] = amount
total_processed += 1
except (KeyError, IndexError, TypeError) as e:
continue
if offset + batch_size < total_accounts:
time.sleep(0.5)
except requests.exceptions.RequestException:
pass
return holder_data
def update_holder_data(contract_address: str, holder_data: Dict[str, int]):
"""
Updates the database with new holder data, tracking changes
"""
conn = None
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
# Get existing holder data
cursor.execute("""
SELECT holder_address, amount
FROM token_holders
WHERE contract_address = %s
""", (contract_address,))
existing_holders = {row[0]: row[1] for row in cursor.fetchall()}
# Calculate changes
current_time = int(time.time())
date_time = datetime.fromtimestamp(current_time)
# Insert new/updated holders
for holder, amount in holder_data.items():
if holder not in existing_holders or existing_holders[holder] != amount:
cursor.execute("""
INSERT INTO token_holders
(contract_address, holder_address, amount, timestamp, date_time)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (contract_address, holder_address)
DO UPDATE SET amount = EXCLUDED.amount,
timestamp = EXCLUDED.timestamp,
date_time = EXCLUDED.date_time
""", (contract_address, holder, amount, current_time, date_time))
# Record total holder count
total_holders = len([amount for amount in holder_data.values() if amount > 0])
cursor.execute("""
INSERT INTO holder_counts
(contract_address, timestamp, date_time, count)
VALUES (%s, %s, %s, %s)
""", (contract_address, current_time, date_time, total_holders))
conn.commit()
except (psycopg2.Error, Exception) as e:
if conn:
conn.rollback()
raise
finally:
if conn:
conn.close()
def main():
try:
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()
# Get contracts to process
cursor.execute("""
SELECT contract_address
FROM contracts
WHERE active = true
""")
contracts = [row[0] for row in cursor.fetchall()]
for contract in contracts:
holder_data = get_token_accounts(contract)
update_holder_data(contract, holder_data)
time.sleep(1) # Rate limiting between contracts
except Exception as e:
print(f"Error in main process: {str(e)}")
finally:
if 'conn' in locals() and conn:
conn.close()
if __name__ == "__main__":
# Example token addresses
tokens = [
"ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx", # ATLAS token
# Add more token addresses here
]
for token in tokens:
try:
holder_data = get_token_accounts(token)
update_holder_data(token, holder_data)
time.sleep(1) # Rate limiting between tokens
except Exception as e:
print(f"Error processing token {token}: {str(e)}")