
What This Covers

  • How the four observability signals fit together: per-request logs, daily usage rollup, credit balance, and live rate-limit headers
  • A small Python script that pulls the last 24 hours of activity, aggregates by swarm_type, and prints a cost-and-error breakdown
  • The audit-trail story for enterprise, healthcare, finance, and other regulated workloads
  • Which signal answers which operational question — and which one you should be pulling first

Why This Matters

Most teams discover their observability gap the day a customer asks “what did the agent see on Tuesday at 2pm and how much did it cost us?” The Swarms API exposes everything you need to answer that, but the data is split across three endpoints and a set of response headers, each with its own shape, time grain, and refresh cadence. This guide is the operator narrative on top of the Swarm Logs, Usage Report, and Account Credits reference pages: which signal to use when, how to combine them, and the minimum production-grade script for a daily dashboard.

The Four Signals

| Signal | Endpoint / Source | Time grain | Answers |
|---|---|---|---|
| Per-request logs | GET /v1/swarm/logs | Per request | “What ran? Did it succeed? How much did it cost?” |
| Daily usage rollup | GET /v1/usage/report | Per day | “What’s our trend? Are we forecasting under budget?” |
| Credit balance | GET /v1/account/credits | Snapshot | “Do we have headroom for the next batch job?” |
| Rate-limit headers | X-RateLimit-* on every response | Per request | “Are we about to get 429’d? Do we need to back off?” |

The rule of thumb: headers for the next millisecond, logs for the last hour, usage report for the last month, credits before you submit a batch.

Step 1: Configure the Client

import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

Step 2: Pull Last-24h Logs, Aggregated by Swarm Type

The single most useful operator script: what ran in the last 24 hours, grouped by swarm_type, with cost and error counts per group.
def fetch_logs():
    resp = requests.get(f"{BASE_URL}/v1/swarm/logs", headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()


def last_24h_breakdown():
    data = fetch_logs()
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)

    by_type = defaultdict(lambda: {"count": 0, "errors": 0, "cost": 0.0, "tokens": 0})
    untyped = {"count": 0, "errors": 0, "cost": 0.0, "tokens": 0}

    for log in data.get("logs", []):
        ts_raw = log.get("timestamp")
        if not ts_raw:
            continue
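        ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
        if ts.tzinfo is None:  # assume UTC when the timestamp carries no offset
            ts = ts.replace(tzinfo=timezone.utc)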
        if ts < cutoff:
            continue

        # swarm_type lives inside the request payload echoed in the log
        swarm_type = (
            (log.get("data") or {}).get("swarm_type")  # tolerate "data": null
            or log.get("swarm_type")
        )
        bucket = by_type[swarm_type] if swarm_type else untyped

        bucket["count"] += 1
        bucket["cost"] += float(log.get("cost", 0) or 0)
        bucket["tokens"] += int(log.get("tokens_used", 0) or 0)
        if (log.get("status_code") or 200) >= 400:
            bucket["errors"] += 1

    print("Last 24 hours, by swarm_type")
    print("-" * 70)
    print(f"{'swarm_type':<22} {'reqs':>6} {'errs':>6} {'tokens':>10} {'cost':>10}")
    print("-" * 70)
    for swarm_type, b in sorted(by_type.items(), key=lambda kv: kv[1]["cost"], reverse=True):
        print(
            f"{(swarm_type or '-'):<22} "
            f"{b['count']:>6} {b['errors']:>6} "
            f"{b['tokens']:>10,} ${b['cost']:>8.4f}"
        )
    if untyped["count"]:
        print(
            f"{'(single-agent)':<22} "
            f"{untyped['count']:>6} {untyped['errors']:>6} "
            f"{untyped['tokens']:>10,} ${untyped['cost']:>8.4f}"
        )

    total_cost = sum(b["cost"] for b in by_type.values()) + untyped["cost"]
    total_err = sum(b["errors"] for b in by_type.values()) + untyped["errors"]
    total_req = sum(b["count"] for b in by_type.values()) + untyped["count"]
    print("-" * 70)
    print(f"Total: {total_req} requests, {total_err} errors, ${total_cost:.4f}")


if __name__ == "__main__":
    last_24h_breakdown()
The exact shape of each log entry can vary slightly — swarm_type, agent_name, and model_name may appear at the top level or nested under data. The code above is defensive against both. See the Swarm Logs reference for the full schema.
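The same nested-then-top-level lookup applies to agent_name and model_name, so it can be worth factoring into one helper. The sketch below is a minimal version under the assumption stated above (a field sits either under data or at the top level); pick_field and the sample entries are hypothetical, not part of the Swarms API. Unlike the `or` chain in the script, it only falls through on a missing or null value.

```python
from typing import Any, Optional


def pick_field(log: dict, name: str, default: Optional[Any] = None) -> Any:
    """Return a field from the nested "data" payload, else from the top level."""
    nested = log.get("data")
    # "data" may be absent or null, so only read from it when it is a dict
    if isinstance(nested, dict) and nested.get(name) is not None:
        return nested[name]
    value = log.get(name)
    return default if value is None else value


# Hypothetical entries showing both layouts
nested_entry = {"data": {"swarm_type": "SequentialWorkflow"}, "cost": 0.02}
flat_entry = {"swarm_type": "ConcurrentWorkflow", "data": None}

print(pick_field(nested_entry, "swarm_type"))
print(pick_field(flat_entry, "swarm_type"))
print(pick_field({}, "model_name", default="(unknown)"))
```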

Step 3: Reconcile Logs Against the Daily Rollup

Logs are per-request; the usage report is the authoritative daily rollup. They should agree within rounding. Use this to catch missing log entries from high-volume periods.
def reconcile_today():
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # Logs side: sum all of today's entries
    logs = fetch_logs().get("logs", [])
    logs_cost = sum(
        float(log.get("cost", 0) or 0)
        for log in logs
        if (log.get("timestamp") or "").startswith(today)
    )

    # Rollup side: pull the usage report and find today
    resp = requests.get(
        f"{BASE_URL}/v1/usage/report",
        headers=headers,
        params={"period": "day"},
        timeout=30,
    )
    resp.raise_for_status()
    report = resp.json()

    # .get() keeps the lookup from raising if a row lacks "day" or "total_cost"
    rollup_cost = next(
        (
            float(d.get("total_cost") or 0)
            for d in report.get("results", [])
            if d.get("day") == today
        ),
        0.0,
    )

    drift = abs(logs_cost - rollup_cost)
    print(f"Logs total ({today}):    ${logs_cost:.4f}")
    print(f"Rollup total ({today}):  ${rollup_cost:.4f}")
    print(f"Drift:                   ${drift:.4f}")
    if drift > 0.05 and rollup_cost > 0:
        print("WARN: drift > $0.05 — investigate missing log entries.")
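A fixed $0.05 threshold works at low spend but gets noisy once daily totals reach hundreds of dollars, where float accumulation alone can exceed a few cents. One possible refinement, sketched here as an assumption rather than anything the API prescribes, is to allow whichever is larger: an absolute floor or a small fraction of the rollup total. The function name and both default tolerances are illustrative.

```python
def drift_exceeds(
    logs_cost: float,
    rollup_cost: float,
    abs_tol: float = 0.05,   # dollars; same floor as the check above
    rel_tol: float = 0.01,   # 1% of the rollup total (illustrative)
) -> bool:
    """True when the totals differ by more than the larger of the two tolerances."""
    drift = abs(logs_cost - rollup_cost)
    allowed = max(abs_tol, rel_tol * abs(rollup_cost))
    return drift > allowed


for logs_total, rollup_total in [(0.10, 0.20), (1000.0, 1004.0), (1000.0, 1020.0)]:
    flag = "WARN" if drift_exceeds(logs_total, rollup_total) else "ok"
    print(f"logs=${logs_total:.2f} rollup=${rollup_total:.2f} -> {flag}")
```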

Step 4: Check Credits Before a Batch Job

The cheapest production incident to avoid is “the batch job stopped halfway because credits ran out.” One call before the submit loop is enough.
def credits_remaining() -> float:
    resp = requests.get(f"{BASE_URL}/v1/account/credits", headers=headers, timeout=10)
    resp.raise_for_status()
    return float(resp.json().get("total_credits", 0))


def guard_batch(estimated_cost: float, safety_margin: float = 1.5):
    """Refuse to start a batch if credits < estimated_cost * safety_margin."""
    available = credits_remaining()
    required = estimated_cost * safety_margin
    if available < required:
        raise RuntimeError(
            f"Insufficient credits: have ${available:.2f}, "
            f"need ${required:.2f} (estimated ${estimated_cost:.2f} x {safety_margin})."
        )
    print(f"Credit check OK: ${available:.2f} available, ${required:.2f} required.")
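guard_batch needs an estimated_cost from somewhere. A rough way to get one, assuming the upcoming batch resembles recent traffic, is to multiply the mean cost of recent priced log entries by the number of requests you plan to submit. estimate_batch_cost below is a hypothetical helper, and the cost field follows the log shape used in Step 2.

```python
from statistics import mean


def estimate_batch_cost(recent_logs: list, n_requests: int) -> float:
    """Estimate batch cost as (mean cost of recent priced requests) * n_requests."""
    costs = [float(log.get("cost") or 0) for log in recent_logs]
    costs = [c for c in costs if c > 0]  # skip entries with no recorded cost
    if not costs:
        raise ValueError("no priced log entries to estimate from")
    return mean(costs) * n_requests


# Hypothetical recent entries; two of them carry no cost and are ignored
sample = [{"cost": 0.02}, {"cost": 0.04}, {"cost": 0}, {"cost": None}]
print(f"Estimated cost for 100 requests: ${estimate_batch_cost(sample, 100):.2f}")
```

The result can be passed straight to guard_batch as estimated_cost. The safety margin there absorbs some of the error when the batch turns out heavier than the recent average.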

Step 5: Watch Rate-Limit Headers in Flight

The headers are on every authenticated response, including errors. You do not need a separate call. Log them after every request and feed the data into your throttling logic.
def call_with_headers_logged(payload: dict):
    resp = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        timeout=600,
    )

    # resp.headers is case-insensitive; a plain dict copy is not, so read from it directly
    rl = resp.headers
    print(
        f"tier={rl.get('X-RateLimit-Tier')} "
        f"min={rl.get('X-RateLimit-Remaining-Minute')}/{rl.get('X-RateLimit-Limit-Minute')} "
        f"day={rl.get('X-RateLimit-Remaining-Day')}/{rl.get('X-RateLimit-Limit-Day')}"
    )

    if resp.status_code == 429:
        raw = resp.headers.get("Retry-After", "60")
        retry_after = int(raw) if raw.isdigit() else 60  # header may also be an HTTP date
        print(f"Rate limited. Retry after {retry_after}s.")
        return None

    resp.raise_for_status()
    return resp.json()
See the Rate Limit Headers reference for the full header schema and tier thresholds.
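The function above reports a 429 but leaves the waiting to the caller. A minimal sketch of the delay calculation follows, assuming you want to honor Retry-After when it is present and fall back to capped exponential backoff with jitter otherwise. retry_delay and its defaults are hypothetical. Whether the Swarms API sends Retry-After as seconds or as an HTTP date is not confirmed here, so the sketch accepts both forms.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_delay(retry_after, attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after:
        try:
            return max(0.0, float(retry_after))  # delta-seconds form, e.g. "30"
        except ValueError:
            pass
        try:
            when = parsedate_to_datetime(retry_after)  # HTTP-date form
            return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
        except (TypeError, ValueError):
            pass  # unparseable header: fall through to backoff
    # Capped exponential backoff with full jitter
    return random.uniform(0, min(cap, base * 2 ** attempt))


print(retry_delay("30", attempt=0))
print(round(retry_delay(None, attempt=3), 2))
```

In a retry loop you would call time.sleep(retry_delay(resp.headers.get("Retry-After"), attempt)) after each 429 and stop after a fixed number of attempts.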

The Audit-Trail Value

For enterprise and regulated workloads — healthcare, financial services, legal, defense — the per-request log is not a nice-to-have. It’s the artifact your compliance team needs to answer post-hoc questions like:
  • “Show every agent invocation that touched patient X’s data between March 1 and March 15.”
  • “Reconstruct the chain of agent outputs that produced this trade recommendation.”
  • “Produce the model name, system prompt, and output for the decision made at 14:32 UTC.”
The /v1/swarm/logs endpoint is filtered to your API key and excludes client IP addresses for privacy, but otherwise retains the request shape, the model invoked, the response time, and the cost. Combined with deterministic agent configs (low temperature, pinned model names, fixed max_loops), it gives you a reproducible record per agent call — which is what most regulators actually want.
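A date-bounded audit question like the first one above reduces to a window filter over log entries. The sketch below assumes ISO 8601 timestamps, as in Step 2, and uses a crude substring match over the serialized entry to find a record identifier. logs_in_window, the identifier format, and the sample entries are all hypothetical; a real audit query would match on a structured field.

```python
import json
from datetime import datetime, timezone


def parse_ts(raw: str) -> datetime:
    """Parse an ISO 8601 timestamp, treating offset-less values as UTC."""
    ts = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)


def logs_in_window(logs, start, end, needle=None):
    """Entries with start <= timestamp < end, optionally containing `needle`."""
    hits = []
    for log in logs:
        raw = log.get("timestamp")
        if not raw or not (start <= parse_ts(raw) < end):
            continue
        # Substring search over the whole entry: simple, but can over-match
        if needle is not None and needle not in json.dumps(log, default=str):
            continue
        hits.append(log)
    return hits


# Hypothetical entries
audit_logs = [
    {"timestamp": "2025-03-01T14:32:00Z", "data": {"task": "summarize case-1042"}},
    {"timestamp": "2025-03-10T09:00:00Z", "data": {"task": "summarize case-2000"}},
    {"timestamp": "2025-03-20T09:00:00Z", "data": {"task": "summarize case-1042"}},
]
window_start = datetime(2025, 3, 1, tzinfo=timezone.utc)
window_end = datetime(2025, 3, 16, tzinfo=timezone.utc)  # exclusive, covers all of March 15

for entry in logs_in_window(audit_logs, window_start, window_end, needle="case-1042"):
    print(entry["timestamp"], entry["data"]["task"])
```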
The platform’s existing log retention is suitable for debugging and operational analytics. For workloads with formal retention requirements (GxP, HIPAA, SOX, SR 11-7), export logs to your own storage on a daily cadence — the Swarm Logs examples show CSV/JSON/compressed export patterns.
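As one illustration of a daily export, the sketch below writes a single day's entries to a gzip-compressed JSON Lines file. It is not the pattern from the Swarm Logs examples; archive_logs, the file naming, and the sample entries are assumptions, and in production the output directory would be a synced or uploaded location rather than a temporary one.

```python
import gzip
import json
import tempfile
from pathlib import Path


def archive_logs(logs, day: str, out_dir: str = "log-archive"):
    """Write entries whose timestamp starts with `day` (YYYY-MM-DD) to gzip JSONL."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    path = out / f"swarm-logs-{day}.jsonl.gz"
    count = 0
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for log in logs:
            if (log.get("timestamp") or "").startswith(day):
                fh.write(json.dumps(log, sort_keys=True) + "\n")
                count += 1
    return path, count


# Hypothetical entries; in practice these come from fetch_logs()
sample_logs = [
    {"timestamp": "2025-03-01T10:00:00Z", "cost": 0.02},
    {"timestamp": "2025-03-01T23:59:59Z", "cost": 0.01},
    {"timestamp": "2025-03-02T00:00:01Z", "cost": 0.03},
]

with tempfile.TemporaryDirectory() as tmp:
    path, count = archive_logs(sample_logs, "2025-03-01", out_dir=tmp)
    print(f"wrote {count} entries to {path.name}")
```

One line per entry keeps the archive appendable and easy to load into most warehouses, and sort_keys gives a stable byte layout if you later hash files for integrity checks.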

Putting It Together: Daily Operator Cron

A pragmatic daily cron looks like this:
  1. 00:05 UTC — pull /v1/usage/report?period=day for yesterday; record total cost in your warehouse
  2. 00:10 UTC — pull /v1/swarm/logs; archive yesterday’s entries to S3 / your log lake; aggregate by swarm_type and model_name for finance
  3. 00:15 UTC — pull /v1/account/credits; alert if total_credits < daily_budget * 7
  4. Continuously — every production request logs its X-RateLimit-Remaining-Minute; alert if a rolling 5-minute average drops below 20% of X-RateLimit-Limit-Minute
That’s the full observability story — three scheduled pulls and one inline log line per request.
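The continuous check in item 4 can be kept in process with a small rolling window. The sketch below is one way to do it, assuming you feed it the integer values of X-RateLimit-Remaining-Minute and X-RateLimit-Limit-Minute after each response. The class name is hypothetical, and the explicit now argument exists only to make the example deterministic.

```python
import time
from collections import deque


class RemainingMinuteMonitor:
    """Rolling average of remaining/limit over a time window."""

    def __init__(self, window_s: float = 300.0, threshold: float = 0.20):
        self.window_s = window_s
        self.threshold = threshold
        self.samples = deque()  # (timestamp, remaining / limit)

    def record(self, remaining: int, limit: int, now: float = None) -> bool:
        """Add one sample; return True when the rolling average is below threshold."""
        now = time.monotonic() if now is None else now
        if limit > 0:
            self.samples.append((now, remaining / limit))
        # Drop samples that have aged out of the window
        while self.samples and now - self.samples[0][0] > self.window_s:
            self.samples.popleft()
        if not self.samples:
            return False
        avg = sum(ratio for _, ratio in self.samples) / len(self.samples)
        return avg < self.threshold


monitor = RemainingMinuteMonitor()
print(monitor.record(90, 100, now=0))    # plenty of headroom
print(monitor.record(10, 100, now=10))   # average still 50%
print(monitor.record(5, 100, now=400))   # older samples expired, average 5%
```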

Next Steps