import requests
from decimal import getcontext, Decimal, InvalidOperation
import time
from datetime import datetime
from typing import Dict, List, Tuple, Any, Optional
from base64 import b64decode
import psycopg2
# Keyword arguments passed straight to psycopg2.connect(); the credential
# fields are blank here and have to be filled in before the script can connect.
db_params = dict(
    host='',
    database='',
    user='',
    password='',
    port='5432',
)

# Endpoint that every JSON-RPC request in this file is POSTed to.
# NOTE(review): "CHAINSTACK_RPC" looks like a placeholder rather than a real
# URL -- confirm it is substituted before running.
url = "CHAINSTACK_RPC"

# Program id sent as the first getProgramAccounts parameter.
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"

# Limit Decimal arithmetic in the current context to 18 significant digits.
getcontext().prec = 18
# Base58 alphabet (Bitcoin variant) that Solana uses to render public keys.
_B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

# Layout of the slice requested from each SPL token account: the account data
# is mint (bytes 0-31), owner (bytes 32-63), amount as u64 little-endian
# (bytes 64-71). Slicing from offset 32 for 40 bytes yields owner + amount.
_OWNER_AMOUNT_SLICE_OFFSET = 32
_OWNER_LEN = 32
_AMOUNT_LEN = 8


def _b58encode(raw: bytes) -> str:
    """Return the base58 (Bitcoin alphabet) encoding of *raw*."""
    number = int.from_bytes(raw, "big")
    digits = []
    while number:
        number, remainder = divmod(number, 58)
        digits.append(_B58_ALPHABET[remainder])
    # Each leading zero byte is represented by one leading '1'.
    leading_zeros = len(raw) - len(raw.lstrip(b"\x00"))
    return "1" * leading_zeros + "".join(reversed(digits))


def get_token_accounts(contract_address: str, batch_size: int = 25) -> Dict[str, int]:
    """
    Gets token holders using standard Solana RPC methods and returns holder data.

    Issues a single ``getProgramAccounts`` request against the SPL Token
    program, filtered to 165-byte accounts whose first 32 bytes (the mint)
    match ``contract_address``. Only the owner and amount fields of every
    account are downloaded (``dataSlice``) and the balance is decoded locally,
    so no per-account ``getTokenAccountBalance`` call is needed.

    Args:
        contract_address: Base58 mint address of the token.
        batch_size: Unused. ``getProgramAccounts`` returns the whole result
            set in one response and offers no limit/offset pagination, so
            there is nothing to batch. The parameter is kept so existing
            callers that pass it keep working.

    Returns:
        Mapping of base58 owner (wallet) address to the raw token amount in
        base units. Balances of several token accounts that belong to the
        same owner are summed; owners whose total is zero are omitted.
        An empty dict is returned when the request fails -- the failure is
        printed rather than raised, matching the original best-effort
        behaviour.
    """
    holder_data: Dict[str, int] = {}
    slice_length = _OWNER_LEN + _AMOUNT_LEN

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getProgramAccounts",
        "params": [
            TOKEN_PROGRAM_ID,
            {
                "encoding": "base64",
                "dataSlice": {
                    "offset": _OWNER_AMOUNT_SLICE_OFFSET,
                    "length": slice_length,
                },
                "filters": [
                    {"dataSize": 165},
                    {"memcmp": {"offset": 0, "bytes": contract_address}},
                ],
            },
        ],
    }

    try:
        response = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print(f"Error fetching token accounts for {contract_address}: {exc}")
        return holder_data

    if not isinstance(data, dict):
        print(f"Unexpected RPC response for {contract_address}: {data!r}")
        return holder_data
    if "error" in data:
        print(f"RPC error fetching token accounts for {contract_address}: {data['error']}")
        return holder_data

    for account in data.get("result") or []:
        try:
            # With base64 encoding the data field is [payload, "base64"].
            raw = b64decode(account["account"]["data"][0])
        except (KeyError, IndexError, TypeError, ValueError):
            # Skip a malformed entry instead of losing the whole snapshot.
            continue
        if len(raw) < slice_length:
            continue

        amount = int.from_bytes(raw[_OWNER_LEN:slice_length], "little")
        if amount <= 0:  # Only count non-zero balances
            continue

        owner = _b58encode(raw[:_OWNER_LEN])
        holder_data[owner] = holder_data.get(owner, 0) + amount

    return holder_data
def update_holder_data(contract_address: str, holder_data: Dict[str, int]):
    """
    Write one holder snapshot for a contract to the database.

    Rows in token_holders are upserted for every holder whose amount is new
    or differs from the stored value, and one row carrying the number of
    holders with a positive amount is appended to holder_counts. Everything
    runs in a single transaction: any exception rolls it back and is
    re-raised to the caller; the connection is closed in all cases.

    NOTE(review): holders that are stored in token_holders but absent from
    holder_data are left untouched by this function -- confirm that keeping
    their last known amount is intended.
    """
    select_existing_sql = (
        "\nSELECT holder_address, amount\n"
        "FROM token_holders\n"
        "WHERE contract_address = %s\n"
    )
    upsert_holder_sql = (
        "\nINSERT INTO token_holders\n"
        "(contract_address, holder_address, amount, timestamp, date_time)\n"
        "VALUES (%s, %s, %s, %s, %s)\n"
        "ON CONFLICT (contract_address, holder_address)\n"
        "DO UPDATE SET amount = EXCLUDED.amount,\n"
        "timestamp = EXCLUDED.timestamp,\n"
        "date_time = EXCLUDED.date_time\n"
    )
    insert_count_sql = (
        "\nINSERT INTO holder_counts\n"
        "(contract_address, timestamp, date_time, count)\n"
        "VALUES (%s, %s, %s, %s)\n"
    )

    # A failed connect has nothing to roll back or close, so it can simply
    # propagate from outside the try block.
    conn = psycopg2.connect(**db_params)
    try:
        cur = conn.cursor()

        # Amounts currently stored for this contract, keyed by holder address.
        cur.execute(select_existing_sql, (contract_address,))
        stored = {address: stored_amount for address, stored_amount in cur.fetchall()}

        # One shared timestamp for every row written by this snapshot.
        now_ts = int(time.time())
        now_dt = datetime.fromtimestamp(now_ts)

        for holder, amount in holder_data.items():
            if holder in stored and stored[holder] == amount:
                continue  # unchanged since the last snapshot
            cur.execute(
                upsert_holder_sql,
                (contract_address, holder, amount, now_ts, now_dt),
            )

        positive_holders = sum(1 for amt in holder_data.values() if amt > 0)
        cur.execute(
            insert_count_sql,
            (contract_address, now_ts, now_dt, positive_holders),
        )
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def main():
    """
    Snapshot holder data for every active contract listed in the database.

    Reads the contract_address of every row in the contracts table with
    active = true, then runs get_token_accounts followed by
    update_holder_data for each address, pausing one second between
    contracts.

    The connection used to read the contract list is closed before the
    per-contract work starts (update_holder_data opens its own connection).
    Failures are printed rather than raised: a failure while loading the
    list ends the run, while a failure on one contract only skips that
    contract, mirroring the per-token handling of the script entry point.

    NOTE(review): nothing in this file calls main(); the ``__main__`` block
    iterates a hard-coded token list instead.
    """
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()
        # Get contracts to process
        cursor.execute(
            "\nSELECT contract_address\n"
            "FROM contracts\n"
            "WHERE active = true\n"
        )
        contracts = [row[0] for row in cursor.fetchall()]
    except Exception as e:
        print(f"Error in main process: {str(e)}")
        return
    finally:
        if conn:
            conn.close()

    for contract in contracts:
        try:
            holder_data = get_token_accounts(contract)
            update_holder_data(contract, holder_data)
        except Exception as e:
            # Keep going: one bad contract should not block the others.
            print(f"Error processing token {contract}: {str(e)}")
        time.sleep(1)  # Rate limiting between contracts
if __name__ == "__main__":
    # Mint addresses snapshotted when the file is executed as a script.
    # NOTE(review): main() is defined in this file but is not invoked here --
    # confirm the hard-coded list is the intended entry point.
    tokens = [
        # ATLAS token
        "ATLASXmbPQxBUYbxPsV97usA3fPQYEqzQBUHgiFCUsXx",
        # Add more token addresses here
    ]
    for token in tokens:
        try:
            # Fetch the current holders, then persist them.
            update_holder_data(token, get_token_accounts(token))
            # Rate limiting between tokens
            time.sleep(1)
        except Exception as e:
            print(f"Error processing token {token}: {str(e)}")