Skip to main content
Every twelve seconds the largest wallets on four chains get classified — and an hour before the spot move shows up on a chart, your Slack already knows where the flow is going.

What This Example Shows

  • A ConcurrentWorkflow that runs four per-chain monitors (ETH, BTC, SOL, Base) in parallel — one swarm call per block, every block
  • MCP wiring for Alchemy / Infura / Helius RPC endpoints so each chain agent reads its native mempool and block stream directly
  • Function tools that classify every whale tx: CEX inflow/outflow, OTC, DEX swap, or bridge — with wallet-label enrichment
  • Sustained polling rate-limit math: ETH block time of 12 seconds compounds to ~7,200 swarm calls/day, blowing through Free and Pro per-hour caps inside the first hour
  • Real-time alert routing into Slack and Discord with severity scoring, including programmatic muting on quiet hours
Continuous block-by-block polling breaches the Free tier per-hour cap within the first hour of running. Observability for missed alerts — execution traces, retry queues, and per-agent latency — is Premium-only. See the Rate Limits documentation before you flip this loop on in production.

Why This Matters

Whale wallets historically lead the tape by 15 to 60 minutes — a $40M wBTC pull off Coinbase Prime, a fresh deposit into a Binance OTC sub-account, a bridge from Base to Solana — these are the movements that show up on a price chart later as a green or red candle. The alpha is not in seeing the transaction (any block explorer does that). The alpha is in classifying it inside the same block: is this a market maker rebalancing, an OTC settlement that will hit spot in 20 minutes, or a known fund unwinding into a bridge? Sub-block latency on that classification — and the wallet label that makes it tradeable — is the entire edge. A ConcurrentWorkflow lets you process all four major L1/L2 chains in the same swarm call, so when the ETH monitor sees a whale deposit on block N, the Base monitor on that same swarm call has already seen the bridge ack on the other side.

The Architecture

                    ┌──────────────────────┐
                    │  Block Trigger Loop  │
                    │  (every 12 seconds)  │
                    └──────────┬───────────┘


        ┌──────────────────────────────────────────┐
        │           ConcurrentWorkflow             │
        │                                          │
        │  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐
        │  │  ETH   │  │  BTC   │  │  SOL   │  │  Base  │
        │  │Monitor │  │Monitor │  │Monitor │  │Monitor │
        │  └───┬────┘  └───┬────┘  └───┬────┘  └───┬────┘
        └──────┼───────────┼───────────┼───────────┼─────┘
               │           │           │           │
               └───────────┴─────┬─────┴───────────┘

                  ┌──────────────────────────┐
                  │   Movement Classifier    │
                  │      (grok-4)            │
                  └──────────────┬───────────┘

                  ┌──────────────────────────┐
                  │     Alert Generator      │
                  │   (claude-sonnet-4.5)    │
                  └──────────────┬───────────┘

                  ┌──────────────────────────┐
                  │     Slack / Discord      │
                  └──────────────────────────┘

Step 1: Setup

Install the dependencies and pull keys for Swarms, your RPC provider, an explorer for label lookups, and your alert webhook.
pip install requests python-dotenv
export SWARMS_API_KEY="your-swarms-key"
export ALCHEMY_API_KEY="your-alchemy-key"
export ETHERSCAN_API_KEY="your-etherscan-key"
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/..."
import json
import os
import time

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
The four per-chain agents each point to a different MCP server via the mcp_url field on the agent config. The Alchemy SSE endpoint is the simplest path for ETH and Base; Helius covers Solana; an Esplora/mempool.space MCP covers BTC. You can run all four as separate containers or terminate them behind a single proxy — the swarm only cares about the URL.
MCP_URLS = {
    "ETH":  "https://your-alchemy-mcp.internal/eth/sse",
    "BTC":  "https://your-mempool-mcp.internal/btc/sse",
    "SOL":  "https://your-helius-mcp.internal/sol/sse",
    "BASE": "https://your-alchemy-mcp.internal/base/sse",
}

Step 2: Define the Function Tools

Function tools are OpenAI-schema JSON. Define them once, attach them per-agent. The classifier and chain agents share the lookup tools; the alert generator only gets post_whale_alert.
FETCH_RECENT_BLOCK = {
    "type": "function",
    "function": {
        "name": "fetch_recent_block",
        "description": "Return the full block (header + transactions) for a given chain at a given block number.",
        "parameters": {
            "type": "object",
            "properties": {
                "chain": {
                    "type": "string",
                    "enum": ["ETH", "BTC", "SOL", "BASE"],
                },
                "block_number": {"type": "integer"},
            },
            "required": ["chain", "block_number"],
        },
    },
}

GET_WALLET_TRANSACTIONS = {
    "type": "function",
    "function": {
        "name": "get_wallet_transactions",
        "description": "Return all transactions involving a specific wallet address since a given block.",
        "parameters": {
            "type": "object",
            "properties": {
                "address": {"type": "string"},
                "since_block": {"type": "integer"},
            },
            "required": ["address", "since_block"],
        },
    },
}

LOOKUP_WALLET_LABEL = {
    "type": "function",
    "function": {
        "name": "lookup_wallet_label",
        "description": "Resolve a wallet address to a known label (e.g. 'Binance 14', 'Jump Trading', 'a16z fund 2', 'Wintermute OTC').",
        "parameters": {
            "type": "object",
            "properties": {"address": {"type": "string"}},
            "required": ["address"],
        },
    },
}

IS_CEX_ADDRESS = {
    "type": "function",
    "function": {
        "name": "is_cex_address",
        "description": "Return whether the address is a known centralized exchange hot or cold wallet, and which exchange.",
        "parameters": {
            "type": "object",
            "properties": {"address": {"type": "string"}},
            "required": ["address"],
        },
    },
}

CLASSIFY_MOVEMENT = {
    "type": "function",
    "function": {
        "name": "classify_movement",
        "description": "Classify a single transaction into one of: CEX_INFLOW, CEX_OUTFLOW, OTC_SETTLEMENT, DEX_SWAP, BRIDGE_OUT, BRIDGE_IN, WALLET_REBALANCE, UNKNOWN.",
        "parameters": {
            "type": "object",
            "properties": {
                "tx": {
                    "type": "object",
                    "description": "Raw transaction object including from, to, value, input data, and chain.",
                },
            },
            "required": ["tx"],
        },
    },
}

IS_DEX_SWAP = {
    "type": "function",
    "function": {
        "name": "is_dex_swap",
        "description": "Return whether a transaction's `to` field matches a known DEX router (Uniswap V2/V3/V4, Jupiter, Raydium, Aerodrome).",
        "parameters": {
            "type": "object",
            "properties": {
                "tx": {"type": "object"},
                "dex_routers": {
                    "type": "array",
                    "items": {"type": "string"},
                },
            },
            "required": ["tx", "dex_routers"],
        },
    },
}

IS_BRIDGE_TX = {
    "type": "function",
    "function": {
        "name": "is_bridge_tx",
        "description": "Return whether a transaction interacts with a known canonical bridge (Wormhole, LayerZero, Across, Stargate, Base bridge).",
        "parameters": {
            "type": "object",
            "properties": {
                "tx": {"type": "object"},
                "bridge_contracts": {
                    "type": "array",
                    "items": {"type": "string"},
                },
            },
            "required": ["tx", "bridge_contracts"],
        },
    },
}

POST_WHALE_ALERT = {
    "type": "function",
    "function": {
        "name": "post_whale_alert",
        "description": "Post a formatted whale alert to the configured Slack and Discord webhooks.",
        "parameters": {
            "type": "object",
            "properties": {
                "text": {"type": "string"},
                "severity": {
                    "type": "string",
                    "enum": ["INFO", "WATCH", "ALERT", "CRITICAL"],
                },
                "ticker": {"type": "string"},
            },
            "required": ["text", "severity", "ticker"],
        },
    },
}

CHAIN_TOOLS = [
    FETCH_RECENT_BLOCK,
    GET_WALLET_TRANSACTIONS,
    LOOKUP_WALLET_LABEL,
    IS_CEX_ADDRESS,
    IS_DEX_SWAP,
    IS_BRIDGE_TX,
]

CLASSIFIER_TOOLS = [CLASSIFY_MOVEMENT, LOOKUP_WALLET_LABEL, IS_CEX_ADDRESS]
ALERT_TOOLS = [POST_WHALE_ALERT]

Step 3: Define the Concurrent Per-Chain Agents

Each chain monitor runs on gpt-4.1-mini — fast and cheap is the only thing that matters when you are firing this swarm every 12 seconds. The Movement Classifier runs on grok-4 for its real-time bias and willingness to commit to a classification under uncertainty. The Alert Generator runs on claude-sonnet-4.5 because the natural-language quality of the Slack message is what makes the desk actually read it.
CHAIN_MONITOR_PROMPT = (
    "You are an on-chain whale monitor for {chain}. "
    "For the given block, pull the transactions involving any address in the "
    "top-100 whale list. For each tx, return a structured row with: "
    "from, to, value_native, value_usd_estimate, raw_input_summary. "
    "Use the available tools to enrich with wallet labels and CEX/bridge/DEX flags. "
    "Output strict JSON. Do not editorialize. Speed matters — your output feeds the classifier "
    "in the same swarm call."
)

MOVEMENT_CLASSIFIER_PROMPT = (
    "You are the Movement Classifier. You receive per-chain whale tx rows from four monitors "
    "running concurrently. For each row, call classify_movement and assign one of: "
    "CEX_INFLOW, CEX_OUTFLOW, OTC_SETTLEMENT, DEX_SWAP, BRIDGE_OUT, BRIDGE_IN, "
    "WALLET_REBALANCE, UNKNOWN. "
    "Cross-correlate across chains — a BRIDGE_OUT on ETH and a matching BRIDGE_IN on BASE "
    "in the same swarm window should be linked. Output strict JSON, one row per movement. "
    "Bias toward committing — UNKNOWN is the last resort."
)

ALERT_GENERATOR_PROMPT = (
    "You are the Alert Generator. You receive classified whale movements from the Movement "
    "Classifier. For each one, decide severity (INFO/WATCH/ALERT/CRITICAL) based on "
    "value_usd, target_label, and movement_type. Write a one-line, desk-readable Slack message "
    "in this style: "
    "':whale: ALERT — 1,250 BTC moved from Tether Treasury 7 to Binance 14 (~$92M, BRIDGE_IN, block 879314)'. "
    "Then call post_whale_alert. Be precise, be brief, never speculate on price impact."
)


def build_whale_swarm(block_targets: dict[str, int]) -> dict:
    """block_targets maps chain → block number to inspect, e.g. {'ETH': 21000123, ...}"""

    chain_agents = []
    for chain, block_num in block_targets.items():
        chain_agents.append({
            "agent_name": f"{chain} Whale Monitor",
            "description": f"Pulls whale-relevant transactions on {chain} for a specific block.",
            "system_prompt": CHAIN_MONITOR_PROMPT.format(chain=chain),
            "model_name": "gpt-4.1-mini",
            "role": "worker",
            "max_loops": 1,
            "max_tokens": 4096,
            "temperature": 0.1,
            "tools_dictionary": CHAIN_TOOLS,
            "mcp_url": MCP_URLS[chain],
        })

    classifier = {
        "agent_name": "Movement Classifier",
        "description": "Classifies whale movements across chains in real time.",
        "system_prompt": MOVEMENT_CLASSIFIER_PROMPT,
        "model_name": "grok-4",
        "role": "worker",
        "max_loops": 1,
        "max_tokens": 6144,
        "temperature": 0.2,
        "tools_dictionary": CLASSIFIER_TOOLS,
    }

    alert_generator = {
        "agent_name": "Alert Generator",
        "description": "Writes severity-scored Slack/Discord alerts.",
        "system_prompt": ALERT_GENERATOR_PROMPT,
        "model_name": "claude-sonnet-4.5",
        "role": "worker",
        "max_loops": 1,
        "max_tokens": 2048,
        "temperature": 0.3,
        "tools_dictionary": ALERT_TOOLS,
    }

    return {
        "name": "On-Chain Whale Tracker",
        "description": "Per-chain concurrent monitors → classifier → alerter, one swarm call per block.",
        "swarm_type": "ConcurrentWorkflow",
        "max_loops": 1,
        "task": (
            "For the given block on each chain, identify and classify every whale-relevant "
            "transaction and emit structured alerts. Block targets: "
            + json.dumps(block_targets)
        ),
        "agents": chain_agents + [classifier, alert_generator],
        "output_type": "dict",
    }
The classifier and alert generator are inside the same ConcurrentWorkflow call as the chain monitors. The swarm scheduler still gives the chain monitors their parallel fan-out — the later-stage agents simply run with the aggregated outputs as input context. If you want a strict pipeline (fan-out then sequential), wrap the chain monitors in an inner ConcurrentWorkflow and the classifier/alerter in an outer SequentialWorkflow. For a hot loop you want the simpler single-swarm shape.

Step 4: Process One Block End-to-End

Before flipping on the watcher, pin to a known block on each chain and dry-run the swarm. This validates MCP wiring, tool schemas, and the classifier’s commitment behavior on a static input.
def run_one_block(block_targets: dict[str, int]) -> dict:
    payload = build_whale_swarm(block_targets)
    response = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        timeout=180,
    )
    response.raise_for_status()
    return response.json()


# Replay a known whale block
result = run_one_block({
    "ETH":  21000123,
    "BTC":  879314,
    "SOL":  301288010,
    "BASE": 22001456,
})

for output in result.get("output", []):
    print("=" * 60)
    print(output["role"])
    print("=" * 60)
    content = output["content"]
    if isinstance(content, list):
        content = " ".join(str(c) for c in content)
    print(str(content)[:600])

print(f"\nSwarm cost: ${result['usage']['billing_info']['total_cost']:.4f}")
print(f"Wall time:  {result['execution_time']:.2f}s")
If wall time on the replay exceeds ETH block time (12 seconds), you have a latency problem and the watcher will fall behind under live load. Lower max_tokens on the chain monitors and tighten the classifier prompt before going live.

Step 5: The Continuous Block Watcher

This is the loop that bills. Every ~12 seconds — ETH block time, the slowest of the four — fire a fresh swarm against the latest block on each chain.
def get_latest_blocks() -> dict[str, int]:
    """Return the most recent block number on each chain via your RPC layer."""
    # Implementation depends on your RPC client; sketch:
    return {
        "ETH":  rpc_eth.block_number(),
        "BTC":  rpc_btc.block_height(),
        "SOL":  rpc_sol.slot(),
        "BASE": rpc_base.block_number(),
    }


def fire_swarm(block_targets: dict[str, int]) -> None:
    payload = build_whale_swarm(block_targets)
    requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        timeout=60,
    )


def whale_watcher_loop() -> None:
    last_seen = {"ETH": 0, "BTC": 0, "SOL": 0, "BASE": 0}
    while True:
        latest = get_latest_blocks()
        # Only fire if at least one chain has advanced
        if any(latest[c] > last_seen[c] for c in latest):
            fire_swarm(latest)
            last_seen = latest
        time.sleep(12)  # ETH block time pacing


if __name__ == "__main__":
    whale_watcher_loop()
The math is brutal:
  • 12-second cadence → 300 calls/hour, 7,200 calls/day
  • Free tier per-day cap on swarm calls is roughly 100/day — breached in 20 minutes
  • Pro per-hour caps are breached in the first hour
  • Premium is the only tier that holds this loop without backpressure, and it is the only tier with the execution traces you need to debug missed blocks
Live whale tracking is a Premium workload by construction. If you try to run this loop on Free or Pro you will hit 429s, miss blocks, and have no observability to tell which blocks you missed. See Rate Limits for the per-tier numbers and upgrade at https://swarms.world/platform/account.

Step 6: The Alert Output Schema

The Alert Generator emits structured rows alongside the Slack post — persist these to your warehouse so you can correlate alerts against spot moves later and grade the classifier.
{
  "chain": "ETH",
  "block": 21000123,
  "whale_address": "0xab5801a7d398351b8be11c439e05c5b3259aec9b",
  "movement_type": "CEX_INFLOW",
  "value_usd": 92_400_000,
  "target_address": "0x28c6c06298d514db089934071355e5743bf21d60",
  "target_label": "Binance 14",
  "severity": "CRITICAL",
  "narrative": "1,250 BTC moved from a long-dormant cold wallet to Binance hot wallet 14 (~$92M). Last activity on source wallet: 187 days ago."
}
A simple persistence pattern:
def persist_alerts(swarm_response: dict, path: str = "whale_alerts.jsonl") -> None:
    alerts_block = next(
        (o["content"] for o in swarm_response.get("output", []) if "Alert Generator" in o.get("role", "")),
        None,
    )
    if not alerts_block:
        return
    if isinstance(alerts_block, list):
        alerts_block = " ".join(str(c) for c in alerts_block)
    # Alerts arrive as JSON lines from the Alert Generator
    with open(path, "a") as f:
        for line in str(alerts_block).splitlines():
            line = line.strip()
            if line.startswith("{") and line.endswith("}"):
                f.write(line + "\n")

Real Cost vs. Whale Alert Service Subscriptions

StackMonthly costWhat you actually get
This stack (gpt-4.1-mini chains + grok-4 classifier + sonnet-4.5 alerter, night-mode discount on quiet 02:00–06:00 UTC)~$750/mo (~$25/day)Programmable layer you own, four chains, custom whale list, structured outputs to your warehouse
Retail whale-alert Twitter / Telegram services$50 – $500/moRead-only Twitter feed, no classification, no programmatic access
Nansen Alpha (single chain)$1,500/moBest-in-class labels, read-only dashboard, no programmable layer
Arkham Intelligence Pro$1,000/moStrong labels, dashboard + alerts, no per-block classification
Building this in-house with engineers$40,000+/moOne mid backend + one ML engineer plus infra; what you would actually do without this
Night-mode tip: between 02:00 and 06:00 UTC most chains are quiet. Drop the watcher to a 60-second cadence in that window — call cost falls ~5x and you almost never miss meaningful flow. The point is not that this is cheaper than a Twitter bot. The point is that a Twitter bot is read-only. This is a programmable layer with structured outputs that route into your own systems — execution, risk, research database — at whale-feed latency.

Next Steps

  • See Crypto Quant Agent for the single-agent quant analyst that consumes these alerts as a signal stream
  • See AI Hedge Fund Research Pipeline for the hierarchical pattern when you want a PM agent grading the whale alerts against a fundamental thesis
  • Read MCP Integration for the full Alchemy / Helius / mempool MCP wiring
  • Check Rate Limits before going live — the per-tier caps decide what cadence you can sustain