Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.chainstack.com/llms.txt

Use this file to discover all available pages before exploring further.

TLDR:
  • The Solana Subscriptions & Allowances program emits its events (subscription created, cancelled, resumed, and the three transfer types) as Anchor-compatible self-CPI inner instructions — not log lines — tagged with the 8-byte e445a52e51cb9a1d prefix plus a 1-byte event type.
  • Decoding those events gives you off-chain billing state — revenue per plan, active vs. cancelled subscribers, every pull and its amount — without scanning account state.
  • You can pull events from any transaction over standard RPC, or stream them in real time and low latency through a Chainstack Yellowstone gRPC (Geyser) node.
  • This guide gives you a complete Python decoder for all six event types, plus a live Yellowstone gRPC listener, validated against live mainnet activity.

Why index the events

If you build anything on the Subscriptions & Allowances program — a SaaS billing backend, a creator-payouts dashboard, a payment gateway — you need an off-chain view of what happened: who subscribed, who cancelled, which pulls succeeded, and how much revenue each plan produced. The program makes this clean because it emits a structured event on every state change. You don’t reconstruct billing history by diffing account snapshots; you read the events. The question is only how you get them — by fetching a transaction over RPC, or by streaming them live through Geyser. We’ll do both, on top of one shared decoder.

How the program emits events

Most Solana programs that emit events either write base64 blobs into program logs or, like Anchor, emit them through a self-CPI: the program calls its own no-op instruction, and the event payload rides along as that inner instruction’s data. The Subscriptions program uses the self-CPI approach, which is more reliable than log parsing (logs get truncated; inner instructions don’t). Each event’s inner-instruction data is laid out as:
[0:8]  e445a52e51cb9a1d   # Sha256("anchor:event")[:8], little-endian — the same tag Anchor uses
[8]    event type         # 1 byte (see table)
[9:]   payload            # fixed C-packed, little-endian fields
The program emits six event types:
TypeBytePayload fields
SubscriptionCreated0plan, subscriber, mint, created_ts
SubscriptionCancelled1plan, subscriber, expires_at_ts
SubscriptionTransfer2subscription, plan, delegator, mint, amount, period_start_ts, period_end_ts, amount_pulled_in_period, receiver
FixedTransfer3delegation, delegator, delegatee, mint, amount, remaining_amount, receiver
RecurringTransfer4delegation, delegator, delegatee, mint, amount, period_start_ts, period_end_ts, amount_pulled_in_period, receiver
SubscriptionResumed5plan, subscriber, resumed_ts
All pubkeys are 32 raw bytes; all integers are little-endian (u64 for amounts, i64 for timestamps). To find these events in a transaction, you look at its inner instructions, keep the ones whose program is the subscriptions program, and check whether the data starts with the tag.
SubscriptionTransfer (model: subscription plan), FixedTransfer (allowance), and RecurringTransfer (recurring delegation) are the three “money moved” events — the ones you sum for revenue. Created, Cancelled, and Resumed track the subscription lifecycle for churn.

Prerequisites

  • Python 3.10+
  • pip install solders base58 requests for the decoder and RPC path
  • pip install grpcio grpcio-tools for the Geyser streaming path
  • A Chainstack Solana node — any node for the RPC path; a Trader or dedicated node with a Yellowstone gRPC endpoint for streaming

Run Solana mainnet and devnet nodes on Chainstack

Start for free and get your app to production levels immediately. No credit card required. You can sign up with your GitHub, X, Google, or Microsoft account.

The decoder

This is the shared core — it turns a raw inner-instruction byte string into a typed event dict. Both the RPC and the Geyser paths feed bytes into decode_event.
# events.py
from solders.pubkey import Pubkey

PROGRAM_ID = "De1egAFMkMWZSN5rYXRj9CAdheBamobVNubTsi9avR44"
EVENT_TAG = bytes.fromhex("e445a52e51cb9a1d")  # Sha256("anchor:event")[:8], little-endian

EVENT_NAMES = {0: "SubscriptionCreated", 1: "SubscriptionCancelled", 2: "SubscriptionTransfer",
               3: "FixedTransfer", 4: "RecurringTransfer", 5: "SubscriptionResumed"}


def _pk(b):  # 32 raw bytes -> base58 string
    return str(Pubkey.from_bytes(b))

def _u64(b):
    return int.from_bytes(b, "little")

def _i64(b):
    return int.from_bytes(b, "little", signed=True)


def decode_event(data: bytes):
    """data = full inner-instruction data: 8-byte tag + 1-byte type + payload."""
    if len(data) < 9 or data[:8] != EVENT_TAG:
        return None
    disc = data[8]
    p = data[9:]
    name = EVENT_NAMES.get(disc, f"Unknown({disc})")
    if disc == 0:  # SubscriptionCreated
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "mint": _pk(p[64:96]), "created_ts": _i64(p[96:104])}
    if disc == 1:  # SubscriptionCancelled
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "expires_at_ts": _i64(p[64:72])}
    if disc in (2, 4):  # SubscriptionTransfer / RecurringTransfer (identical shape)
        first = "subscription" if disc == 2 else "delegation"
        second = "plan" if disc == 2 else "delegator"
        third = "delegator" if disc == 2 else "delegatee"
        return {"event": name, first: _pk(p[0:32]), second: _pk(p[32:64]),
                third: _pk(p[64:96]), "mint": _pk(p[96:128]), "amount": _u64(p[128:136]),
                "period_start_ts": _i64(p[136:144]), "period_end_ts": _i64(p[144:152]),
                "amount_pulled_in_period": _u64(p[152:160]), "receiver": _pk(p[160:192])}
    if disc == 3:  # FixedTransfer
        return {"event": name, "delegation": _pk(p[0:32]), "delegator": _pk(p[32:64]),
                "delegatee": _pk(p[64:96]), "mint": _pk(p[96:128]), "amount": _u64(p[128:136]),
                "remaining_amount": _u64(p[136:144]), "receiver": _pk(p[144:176])}
    if disc == 5:  # SubscriptionResumed
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "resumed_ts": _i64(p[64:72])}
    return {"event": name, "raw_len": len(p)}

Option A: pull events from a transaction over RPC

For backfills, webhooks, or a quick look at one transaction, fetch it with getTransaction and walk its inner instructions. In jsonParsed encoding, an inner instruction for a program the parser doesn’t recognize comes back with a base58 data field and a programId.
import base58
import requests
from events import PROGRAM_ID, decode_event

CHAINSTACK_RPC = "YOUR_CHAINSTACK_SOLANA_ENDPOINT"


def events_in_tx(signature: str):
    body = {"jsonrpc": "2.0", "id": 1, "method": "getTransaction",
            "params": [signature, {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}]}
    result = requests.post(CHAINSTACK_RPC, json=body, timeout=30).json()["result"]
    events = []
    for group in (result["meta"].get("innerInstructions") or []):
        for ix in group["instructions"]:
            if ix.get("programId") != PROGRAM_ID:
                continue
            ev = decode_event(base58.b58decode(ix["data"]))
            if ev:
                events.append(ev)
    return events


if __name__ == "__main__":
    for ev in events_in_tx("PASTE_A_SUBSCRIPTION_TX_SIGNATURE"):
        print(ev)
Run it against a collect transaction and you get the decoded transfer:
{'event': 'SubscriptionTransfer', 'subscription': 'BCnjQmw2…', 'plan': '6zNfyVQK…',
 'delegator': '9VUegdUs…', 'mint': 'EPjFWdd5…', 'amount': 1000000,
 'period_start_ts': 1780447663, 'period_end_ts': 1780534063,
 'amount_pulled_in_period': 1000000, 'receiver': 'GGFFGrDR…'}
period_end_ts - period_start_ts is exactly 86400 here — a 24-hour billing period — and amount is the tokens pulled this cycle. That’s a billing row, straight off the chain. RPC polling is fine for backfills, but for a live dashboard you don’t want to poll getSignaturesForAddress in a loop. That’s what Geyser is for.

Option B: stream events live with Yellowstone gRPC

Chainstack’s Yellowstone gRPC (Geyser) endpoint pushes transactions to you as they’re confirmed. You subscribe once to transactions that mention the program, and decode events from each one with the same decode_event.
1

Generate the Geyser client stubs

Fetch the Yellowstone proto files and generate Python stubs.
mkdir proto
curl -s -o proto/geyser.proto \
  https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
curl -s -o proto/solana-storage.proto \
  https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/solana-storage.proto

python -m grpc_tools.protoc -Iproto \
  --python_out=. --grpc_python_out=. \
  proto/geyser.proto proto/solana-storage.proto
This writes geyser_pb2.py, geyser_pb2_grpc.py, and the solana_storage_pb2.py it depends on into your working directory.
2

Connect, subscribe, and decode

Chainstack Yellowstone endpoints use TLS and an x-token passed as gRPC metadata. Get both the gRPC endpoint and the token from your node’s details in the Chainstack console.The one subtlety on the streaming side: an inner instruction references its program by index into the transaction’s account keys, and for versioned transactions that list is message.account_keys plus the addresses loaded from lookup tables. Resolve the full list before checking the program.
# stream.py
import asyncio
import grpc
from solders.pubkey import Pubkey

import geyser_pb2
import geyser_pb2_grpc
from events import PROGRAM_ID, decode_event

GRPC_ENDPOINT = "YOUR_NODE.core.chainstack.com:443"
X_TOKEN = "YOUR_X_TOKEN"
PROGRAM_ID_BYTES = bytes(Pubkey.from_string(PROGRAM_ID))


def make_channel():
    auth = grpc.metadata_call_credentials(
        lambda ctx, cb: cb((("x-token", X_TOKEN),), None)
    )
    creds = grpc.composite_channel_credentials(grpc.ssl_channel_credentials(), auth)
    return grpc.aio.secure_channel(GRPC_ENDPOINT, creds)


def build_request():
    req = geyser_pb2.SubscribeRequest()
    f = req.transactions["subs"]
    f.account_include.append(PROGRAM_ID)
    f.vote = False
    f.failed = False
    req.commitment = geyser_pb2.CONFIRMED
    return req


def decode_tx(info):
    # Resolve account keys, including addresses loaded from lookup tables.
    keys = list(info.transaction.message.account_keys)
    keys += list(info.meta.loaded_writable_addresses)
    keys += list(info.meta.loaded_readonly_addresses)
    for group in info.meta.inner_instructions:
        for ix in group.instructions:
            if keys[ix.program_id_index] != PROGRAM_ID_BYTES:
                continue
            ev = decode_event(bytes(ix.data))
            if ev:
                yield ev


async def main():
    async def requests():
        yield build_request()
        while True:
            await asyncio.sleep(3600)  # keep the stream open

    while True:
        try:
            async with make_channel() as channel:
                stub = geyser_pb2_grpc.GeyserStub(channel)
                async for update in stub.Subscribe(requests()):
                    if update.WhichOneof("update_oneof") != "transaction":
                        continue
                    for ev in decode_tx(update.transaction.transaction):
                        print(f"[slot {update.transaction.slot}] {ev}")
        except grpc.aio.AioRpcError as e:
            print(f"stream dropped ({e.code().name}); reconnecting")
            await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(main())
Point it at a mainnet Yellowstone node and you see live subscription activity stream in:
[slot 423913010] {'event': 'SubscriptionCancelled', 'plan': '…', 'subscriber': '…', 'expires_at_ts': …}
[slot 423912992] {'event': 'SubscriptionTransfer', 'mint': 'EPjFWdd5…', 'amount': 19800, …}
[slot 423905157] {'event': 'RecurringTransfer', 'mint': 'So111111…', 'amount': 1000000, …}
The reconnect loop matters: long-lived gRPC streams drop occasionally, and you want to resume rather than miss events. For production you’d also persist the last processed slot and replay from it on reconnect using the request’s from_slot field.

What you can build

Once events are flowing into your own store, the billing views write themselves:
  • Revenue per plan — sum amount on SubscriptionTransfer grouped by plan. The mint tells you the currency; on mainnet you’ll see USDC and wrapped SOL.
  • Monthly recurring revenue and active subscriber counts — count SubscriptionCreated minus SubscriptionCancelled (net of SubscriptionResumed) per plan.
  • Churn — SubscriptionCancelled events with their expires_at_ts, so you know when each subscriber actually lapses.
  • Failed or missed collections — expected a pull this period and saw no SubscriptionTransfer? That subscriber’s payment didn’t go through; flag it for retry or dunning.
  • Fee splits — a single collect can emit multiple SubscriptionTransfer events to different receivers (for example a 99% / 1% split between a merchant and a platform fee account), and each one decodes cleanly.
All of it from a single subscription to one program, decoded with one function, with no billing database to reconcile against the chain — the chain is the billing database.

Conclusion

The Subscriptions & Allowances program was built to be indexed: every lifecycle change and every pull emits a typed event as a self-CPI inner instruction, tagged exactly like an Anchor event. Decode the tag and the fixed payload and you have your billing feed. Pull it from RPC for backfills, stream it from a Chainstack Yellowstone gRPC node for live dashboards, and reuse the same decoder for both. Next in this series: handing an AI agent a capped onchain budget with a fixed-delegation allowance, and where that meets x402 for per-call agentic commerce.

About the author

Ake

Ake Director of Developer Experience @ Chainstack
Talk to me all things Web3
20 years in technology | 8+ years in Web3 full time years experience
Last modified on June 3, 2026