# Solana: Indexing subscription events with Geyser

> Decode and stream Solana Subscriptions & Allowances events in Python — created, cancelled, and pull transfers — from RPC or a Chainstack Yellowstone gRPC node, to build billing, revenue, and churn dashboards.

**TLDR:**

* The Solana Subscriptions & Allowances program emits its events (subscription created, cancelled, resumed, and the three transfer types) as **Anchor-compatible self-CPI inner instructions** — not log lines — tagged with the 8-byte `e445a52e51cb9a1d` prefix plus a 1-byte event type.
* Decoding those events gives you off-chain billing state — revenue per plan, active vs. cancelled subscribers, every pull and its amount — without scanning account state.
* You can pull events from any transaction over standard RPC, or stream them in real time, with low latency, through a Chainstack Yellowstone gRPC (Geyser) node.
* This guide gives you a complete Python decoder for all six event types, plus a live Yellowstone gRPC listener, validated against live mainnet activity.

## Why index the events

If you build anything on the [Subscriptions & Allowances program](/docs/solana-subscriptions-and-allowances) — a SaaS billing backend, a creator-payouts dashboard, a payment gateway — you need an off-chain view of what happened: who subscribed, who cancelled, which pulls succeeded, and how much revenue each plan produced.

The program makes this clean because it emits a structured event on every state change. You don't reconstruct billing history by diffing account snapshots; you read the events. The question is only how you get them — by fetching a transaction over RPC, or by streaming them live through Geyser. We'll do both, on top of one shared decoder.

## How the program emits events

Most Solana programs that emit events either write base64 blobs into program logs or, like Anchor, emit them through a **self-CPI**: the program calls its own no-op instruction, and the event payload rides along as that inner instruction's data. The Subscriptions program uses the self-CPI approach, which is more reliable than log parsing (logs get truncated; inner instructions don't).

Each event's inner-instruction data is laid out as:

```text theme={"system"}
[0:8]  e445a52e51cb9a1d   # Sha256("anchor:event")[:8], little-endian — the same tag Anchor uses
[8]    event type         # 1 byte (see table)
[9:]   payload            # fixed C-packed, little-endian fields
```

The program emits six event types:

| Type                    | Byte | Payload fields                                                                                                           |
| ----------------------- | ---- | ------------------------------------------------------------------------------------------------------------------------ |
| `SubscriptionCreated`   | 0    | plan, subscriber, mint, created\_ts                                                                                      |
| `SubscriptionCancelled` | 1    | plan, subscriber, expires\_at\_ts                                                                                        |
| `SubscriptionTransfer`  | 2    | subscription, plan, delegator, mint, amount, period\_start\_ts, period\_end\_ts, amount\_pulled\_in\_period, receiver    |
| `FixedTransfer`         | 3    | delegation, delegator, delegatee, mint, amount, remaining\_amount, receiver                                              |
| `RecurringTransfer`     | 4    | delegation, delegator, delegatee, mint, amount, period\_start\_ts, period\_end\_ts, amount\_pulled\_in\_period, receiver |
| `SubscriptionResumed`   | 5    | plan, subscriber, resumed\_ts                                                                                            |

All pubkeys are 32 raw bytes; all integers are little-endian (`u64` for amounts, `i64` for timestamps). To find these events in a transaction, you look at its **inner instructions**, keep the ones whose program is the subscriptions program, and check whether the data starts with the tag.
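
To make the layout concrete, here is a minimal sketch that assembles a synthetic `SubscriptionCancelled` event and reads it back with the standard library's `struct` module. The helper names `build_cancelled` and `parse_cancelled` are hypothetical, and the keys and timestamp are made up for illustration; only the tag, the type byte, and the field order come from the layout and table above.

```python
import struct

EVENT_TAG = bytes.fromhex("e445a52e51cb9a1d")  # tag from the layout above

# "<" = little-endian, no padding; 32s = raw pubkey bytes; q = i64.
CANCELLED_FMT = "<32s32sq"  # plan, subscriber, expires_at_ts


def build_cancelled(plan: bytes, subscriber: bytes, expires_at_ts: int) -> bytes:
    """Assemble synthetic inner-instruction data: tag + type byte 1 + payload."""
    return EVENT_TAG + bytes([1]) + struct.pack(CANCELLED_FMT, plan, subscriber, expires_at_ts)


def parse_cancelled(data: bytes):
    """Return (plan, subscriber, expires_at_ts), or None if data is not this event."""
    if len(data) < 9 + struct.calcsize(CANCELLED_FMT):
        return None  # too short to hold the full payload
    if data[:8] != EVENT_TAG or data[8] != 1:
        return None  # wrong tag or a different event type
    return struct.unpack_from(CANCELLED_FMT, data, 9)


# Made-up keys and timestamp, for illustration only.
sample = build_cancelled(b"\x01" * 32, b"\x02" * 32, 1_700_000_000)
plan, subscriber, expires_at_ts = parse_cancelled(sample)
```

The same pattern extends to the other five types by changing the format string to match each row of the table.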

<Note>
  `SubscriptionTransfer` (model: subscription plan), `FixedTransfer` (allowance), and `RecurringTransfer` (recurring delegation) are the three "money moved" events — the ones you sum for revenue. `Created`, `Cancelled`, and `Resumed` track the subscription lifecycle for churn.
</Note>

## Prerequisites

* Python 3.10+
* `pip install solders base58 requests` for the decoder and RPC path
* `pip install grpcio grpcio-tools` for the Geyser streaming path
* A Chainstack Solana node — any node for the RPC path; a Trader or dedicated node with a **Yellowstone gRPC** endpoint for streaming

<Check>
  ### Run Solana mainnet and devnet nodes on Chainstack

  [Start for free](https://console.chainstack.com/) and get your app to production levels immediately. No credit card required. You can sign up with your GitHub, X, Google, or Microsoft account.
</Check>

## The decoder

This is the shared core — it turns a raw inner-instruction byte string into a typed event dict. Both the RPC and the Geyser paths feed bytes into `decode_event`.

```python theme={"system"}
# events.py
from solders.pubkey import Pubkey

PROGRAM_ID = "De1egAFMkMWZSN5rYXRj9CAdheBamobVNubTsi9avR44"
EVENT_TAG = bytes.fromhex("e445a52e51cb9a1d")  # Sha256("anchor:event")[:8], little-endian

EVENT_NAMES = {0: "SubscriptionCreated", 1: "SubscriptionCancelled", 2: "SubscriptionTransfer",
               3: "FixedTransfer", 4: "RecurringTransfer", 5: "SubscriptionResumed"}


def _pk(b):  # 32 raw bytes -> base58 string
    return str(Pubkey.from_bytes(b))

def _u64(b):
    return int.from_bytes(b, "little")

def _i64(b):
    return int.from_bytes(b, "little", signed=True)


def decode_event(data: bytes):
    """data = full inner-instruction data: 8-byte tag + 1-byte type + payload."""
    if len(data) < 9 or data[:8] != EVENT_TAG:
        return None
    disc = data[8]
    p = data[9:]
    name = EVENT_NAMES.get(disc, f"Unknown({disc})")
    if disc == 0:  # SubscriptionCreated
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "mint": _pk(p[64:96]), "created_ts": _i64(p[96:104])}
    if disc == 1:  # SubscriptionCancelled
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "expires_at_ts": _i64(p[64:72])}
    if disc in (2, 4):  # SubscriptionTransfer / RecurringTransfer (identical shape)
        first = "subscription" if disc == 2 else "delegation"
        second = "plan" if disc == 2 else "delegator"
        third = "delegator" if disc == 2 else "delegatee"
        return {"event": name, first: _pk(p[0:32]), second: _pk(p[32:64]),
                third: _pk(p[64:96]), "mint": _pk(p[96:128]), "amount": _u64(p[128:136]),
                "period_start_ts": _i64(p[136:144]), "period_end_ts": _i64(p[144:152]),
                "amount_pulled_in_period": _u64(p[152:160]), "receiver": _pk(p[160:192])}
    if disc == 3:  # FixedTransfer
        return {"event": name, "delegation": _pk(p[0:32]), "delegator": _pk(p[32:64]),
                "delegatee": _pk(p[64:96]), "mint": _pk(p[96:128]), "amount": _u64(p[128:136]),
                "remaining_amount": _u64(p[136:144]), "receiver": _pk(p[144:176])}
    if disc == 5:  # SubscriptionResumed
        return {"event": name, "plan": _pk(p[0:32]), "subscriber": _pk(p[32:64]),
                "resumed_ts": _i64(p[64:72])}
    return {"event": name, "raw_len": len(p)}
```
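
One thing the decoder does not do is check that the payload is long enough for its event type. Slicing past the end of a short payload yields empty bytes, which `int.from_bytes` reads as `0`, so a truncated event could decode to misleading values. The following is a sketch of a guard you could call before `decode_event`; `PAYLOAD_FORMATS` and `payload_is_complete` are hypothetical names, and the sizes are derived from the field table above rather than from the program's source.

```python
import struct

# Expected payload layouts, derived from the field table: 32 bytes per pubkey,
# 8 bytes per u64 (Q) or i64 (q). "<" means little-endian with no padding.
PAYLOAD_FORMATS = {
    0: "<32s32s32sq",            # SubscriptionCreated
    1: "<32s32sq",               # SubscriptionCancelled
    2: "<32s32s32s32sQqqQ32s",   # SubscriptionTransfer
    3: "<32s32s32s32sQQ32s",     # FixedTransfer
    4: "<32s32s32s32sQqqQ32s",   # RecurringTransfer
    5: "<32s32sq",               # SubscriptionResumed
}
PAYLOAD_SIZES = {disc: struct.calcsize(fmt) for disc, fmt in PAYLOAD_FORMATS.items()}


def payload_is_complete(data: bytes) -> bool:
    """True if data (tag + type + payload) is long enough for its event type.

    Unknown type bytes return True so the caller can still report them.
    """
    if len(data) < 9:
        return False
    expected = PAYLOAD_SIZES.get(data[8])
    return expected is None or len(data) - 9 >= expected
```

The computed sizes (104, 72, 192, 176, 192, and 72 bytes) match the slice offsets used in `decode_event`, which is a quick way to cross-check the decoder against the table.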

## Option A: pull events from a transaction over RPC

For backfills, webhooks, or a quick look at one transaction, fetch it with `getTransaction` and walk its inner instructions. In `jsonParsed` encoding, an inner instruction for a program the parser doesn't recognize comes back with a base58 `data` field and a `programId`.

```python theme={"system"}
import base58
import requests
from events import PROGRAM_ID, decode_event

CHAINSTACK_RPC = "YOUR_CHAINSTACK_SOLANA_ENDPOINT"


def events_in_tx(signature: str):
    body = {"jsonrpc": "2.0", "id": 1, "method": "getTransaction",
            "params": [signature, {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}]}
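    result = requests.post(CHAINSTACK_RPC, json=body, timeout=30).json().get("result")
    if not result or not result.get("meta"):
        return []  # unknown signature, or the transaction is not available on this node
    events = []
    for group in (result["meta"].get("innerInstructions") or []):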
        for ix in group["instructions"]:
            if ix.get("programId") != PROGRAM_ID:
                continue
            ev = decode_event(base58.b58decode(ix["data"]))
            if ev:
                events.append(ev)
    return events


if __name__ == "__main__":
    for ev in events_in_tx("PASTE_A_SUBSCRIPTION_TX_SIGNATURE"):
        print(ev)
```

Run it against a collect transaction and you get the decoded transfer:

```python theme={"system"}
{'event': 'SubscriptionTransfer', 'subscription': 'BCnjQmw2…', 'plan': '6zNfyVQK…',
 'delegator': '9VUegdUs…', 'mint': 'EPjFWdd5…', 'amount': 1000000,
 'period_start_ts': 1780447663, 'period_end_ts': 1780534063,
 'amount_pulled_in_period': 1000000, 'receiver': 'GGFFGrDR…'}
```

`period_end_ts - period_start_ts` is exactly `86400` here — a 24-hour billing period — and `amount` is the tokens pulled this cycle. That's a billing row, straight off the chain.
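
To backfill more than one transaction, you first need the signatures. Here is a sketch of paging through `getSignaturesForAddress` for the program address, newest first, using the `before` cursor. The `rpc_call(method, params)` helper is hypothetical: it stands for any function that posts a JSON-RPC request to your endpoint and returns the `result` field.

```python
from typing import Callable, Iterator, Optional


def iter_signatures(rpc_call: Callable[[str, list], list], address: str,
                    page_size: int = 1000) -> Iterator[str]:
    """Yield successful transaction signatures for address, newest first.

    rpc_call(method, params) is a hypothetical helper that returns the
    JSON-RPC "result" field.
    """
    before: Optional[str] = None
    while True:
        opts = {"limit": page_size}
        if before is not None:
            opts["before"] = before  # continue from the oldest signature seen
        page = rpc_call("getSignaturesForAddress", [address, opts])
        if not page:
            return
        for entry in page:
            if entry.get("err") is None:  # skip failed transactions
                yield entry["signature"]
        before = page[-1]["signature"]
        if len(page) < page_size:
            return  # a short page means the history is exhausted
```

Each yielded signature can be passed to `events_in_tx`. How far back the history reaches depends on the node you query, so treat the depth of a backfill as something to verify against your own endpoint.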

RPC polling is fine for backfills, but for a live dashboard you don't want to poll `getSignaturesForAddress` in a loop. That's what Geyser is for.

## Option B: stream events live with Yellowstone gRPC

Chainstack's Yellowstone gRPC (Geyser) endpoint pushes transactions to you as they're confirmed. You subscribe once to transactions that mention the program, and decode events from each one with the same `decode_event`.

<Steps>
  <Step title="Generate the Geyser client stubs">
    Fetch the Yellowstone proto files and generate Python stubs.

    ```bash theme={"system"}
    mkdir -p proto
    curl -s -o proto/geyser.proto \
      https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/geyser.proto
    curl -s -o proto/solana-storage.proto \
      https://raw.githubusercontent.com/rpcpool/yellowstone-grpc/master/yellowstone-grpc-proto/proto/solana-storage.proto

    python -m grpc_tools.protoc -Iproto \
      --python_out=. --grpc_python_out=. \
      proto/geyser.proto proto/solana-storage.proto
    ```

    This writes `geyser_pb2.py`, `geyser_pb2_grpc.py`, and the `solana_storage_pb2.py` it depends on into your working directory.
  </Step>

  <Step title="Connect, subscribe, and decode">
    Chainstack Yellowstone endpoints use TLS and an `x-token` passed as gRPC metadata. Get both the gRPC endpoint and the token from your node's details in the [Chainstack console](https://console.chainstack.com/).

    The one subtlety on the streaming side: an inner instruction references its program by **index** into the transaction's account keys, and for versioned transactions that list is `message.account_keys` plus the addresses loaded from lookup tables. Resolve the full list before checking the program.

    ```python theme={"system"}
    # stream.py
    import asyncio
    import grpc
    from solders.pubkey import Pubkey

    import geyser_pb2
    import geyser_pb2_grpc
    from events import PROGRAM_ID, decode_event

    GRPC_ENDPOINT = "YOUR_NODE.core.chainstack.com:443"
    X_TOKEN = "YOUR_X_TOKEN"
    PROGRAM_ID_BYTES = bytes(Pubkey.from_string(PROGRAM_ID))


    def make_channel():
        auth = grpc.metadata_call_credentials(
            lambda ctx, cb: cb((("x-token", X_TOKEN),), None)
        )
        creds = grpc.composite_channel_credentials(grpc.ssl_channel_credentials(), auth)
        return grpc.aio.secure_channel(GRPC_ENDPOINT, creds)


    def build_request():
        req = geyser_pb2.SubscribeRequest()
        f = req.transactions["subs"]
        f.account_include.append(PROGRAM_ID)
        f.vote = False
        f.failed = False
        req.commitment = geyser_pb2.CONFIRMED
        return req


    def decode_tx(info):
        # Resolve account keys, including addresses loaded from lookup tables.
        keys = list(info.transaction.message.account_keys)
        keys += list(info.meta.loaded_writable_addresses)
        keys += list(info.meta.loaded_readonly_addresses)
        for group in info.meta.inner_instructions:
            for ix in group.instructions:
                if keys[ix.program_id_index] != PROGRAM_ID_BYTES:
                    continue
                ev = decode_event(bytes(ix.data))
                if ev:
                    yield ev


    async def main():
        async def requests():
            yield build_request()
            while True:
                await asyncio.sleep(3600)  # keep the stream open

        while True:
            try:
                async with make_channel() as channel:
                    stub = geyser_pb2_grpc.GeyserStub(channel)
                    async for update in stub.Subscribe(requests()):
                        if update.WhichOneof("update_oneof") != "transaction":
                            continue
                        for ev in decode_tx(update.transaction.transaction):
                            print(f"[slot {update.transaction.slot}] {ev}")
            except grpc.aio.AioRpcError as e:
                print(f"stream dropped ({e.code().name}); reconnecting")
                await asyncio.sleep(2)


    if __name__ == "__main__":
        asyncio.run(main())
    ```

    Point it at a mainnet Yellowstone node and you see live subscription activity stream in:

    ```text theme={"system"}
    [slot 423913010] {'event': 'SubscriptionCancelled', 'plan': '…', 'subscriber': '…', 'expires_at_ts': …}
    [slot 423912992] {'event': 'SubscriptionTransfer', 'mint': 'EPjFWdd5…', 'amount': 19800, …}
    [slot 423905157] {'event': 'RecurringTransfer', 'mint': 'So111111…', 'amount': 1000000, …}
    ```

    The reconnect loop matters: long-lived gRPC streams drop occasionally, and you want to resume rather than miss events. For production you'd also persist the last processed slot and replay from it on reconnect using the request's `from_slot` field.
  </Step>
</Steps>
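
For the slot persistence mentioned in the last step, a minimal sketch of a file-based checkpoint could look like this. The function names and the JSON file format are assumptions for illustration; a production indexer would more likely store the slot in the same database as the events.

```python
import json
import os
import tempfile
from typing import Optional


def load_checkpoint(path: str) -> Optional[int]:
    """Return the last saved slot, or None if no checkpoint exists yet."""
    try:
        with open(path) as f:
            return int(json.load(f)["slot"])
    except FileNotFoundError:
        return None


def save_checkpoint(path: str, slot: int) -> None:
    """Write the slot atomically so a crash cannot leave a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"slot": slot}, f)
    os.replace(tmp, path)  # atomic rename over the old checkpoint
```

In `stream.py`, you would call `save_checkpoint` after handling each update and, when `load_checkpoint` returns a slot, set the request's `from_slot` field in `build_request`. Replaying from a saved slot can deliver transactions you have already processed, so it is worth deduplicating on the transaction signature when you store events.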

## What you can build

Once events are flowing into your own store, the billing views write themselves:

* Revenue per plan — sum `amount` on `SubscriptionTransfer` grouped by `plan`. The mint tells you the currency; on mainnet you'll see USDC and wrapped SOL.
* Active subscriber counts and monthly recurring revenue — per plan, `SubscriptionCreated` plus `SubscriptionResumed` minus `SubscriptionCancelled` gives the active count, which is the base figure for MRR.
* Churn — `SubscriptionCancelled` events with their `expires_at_ts`, so you know when each subscriber actually lapses.
* Failed or missed collections — expected a pull this period and saw no `SubscriptionTransfer`? That subscriber's payment didn't go through; flag it for retry or dunning.
* Fee splits — a single collect can emit multiple `SubscriptionTransfer` events to different receivers (for example a 99% / 1% split between a merchant and a platform fee account), and each one decodes cleanly.
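
As a sketch of the first two views, the functions below aggregate a list of dicts in the shape `decode_event` returns. The names `revenue_per_plan` and `active_subscribers` are hypothetical, and the code assumes the events are ordered oldest first.

```python
from collections import defaultdict


def revenue_per_plan(events):
    """Sum SubscriptionTransfer amounts per (plan, mint), in raw token units."""
    totals = defaultdict(int)
    for ev in events:
        if ev["event"] == "SubscriptionTransfer":
            totals[(ev["plan"], ev["mint"])] += ev["amount"]
    return dict(totals)


def active_subscribers(events):
    """Replay lifecycle events in order and return {plan: set of subscribers}.

    Assumes events are ordered oldest first. A cancelled subscriber is dropped
    immediately here; a stricter view would wait until expires_at_ts.
    """
    active = defaultdict(set)
    for ev in events:
        if ev["event"] in ("SubscriptionCreated", "SubscriptionResumed"):
            active[ev["plan"]].add(ev["subscriber"])
        elif ev["event"] == "SubscriptionCancelled":
            active[ev["plan"]].discard(ev["subscriber"])
    return {plan: subs for plan, subs in active.items() if subs}
```

Grouping by `(plan, mint)` keeps totals in different currencies separate, and because fee splits arrive as separate `SubscriptionTransfer` events, adding `receiver` to the key would break revenue down by recipient as well.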

All of it from a single subscription to one program, decoded with one function, with no billing database to reconcile against the chain — the chain *is* the billing database.

## Conclusion

The Subscriptions & Allowances program was built to be indexed: every lifecycle change and every pull emits a typed event as a self-CPI inner instruction, tagged exactly like an Anchor event. Decode the tag and the fixed payload and you have your billing feed. Pull it from RPC for backfills, stream it from a Chainstack Yellowstone gRPC node for live dashboards, and reuse the same decoder for both.

Next in this series: handing an AI agent a capped onchain budget with a fixed-delegation allowance, and where that meets x402 for per-call agentic commerce.

### About the author

<CardGroup>
  <Card title="Ake">
    <img src="https://mintcdn.com/chainstack/UN3rP7zhB69idvnC/images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg?fit=max&auto=format&n=UN3rP7zhB69idvnC&q=85&s=792a24ab1b4682406fa589c0ecd88e5d" alt="Ake" style={{width: '80px', height: '80px', borderRadius: '50%', objectFit: 'cover', display: 'block', margin: '0 auto'}} noZoom width="400" height="400" data-path="images/docs/profile_images/1719912994363326464/8_Bi4fdM_400x400.jpg" />

    <Icon icon="code" iconType="solid" /> Director of Developer Experience @ Chainstack
    <br /><Icon icon="screwdriver-wrench" iconType="solid" /> Talk to me all things Web3
    <br />20 years in technology | 8+ years full time in Web3

    <div style={{display: "flex", justifyContent: "center", gap: "12px"}}>
      <a href="https://github.com/akegaviar/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="github" iconType="brands" />
      </a>

      <a href="https://twitter.com/akegaviar" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="twitter" iconType="brands" />
      </a>

      <a href="https://www.linkedin.com/in/ake/" style={{textDecoration: "none", borderBottom: "none"}}>
        <Icon icon="linkedin" iconType="brands" />
      </a>
    </div>
  </Card>
</CardGroup>
