
What This Example Shows

  • The enterprise pattern: kick off a long batch of independent multi-agent swarms at end-of-day, have results waiting at 7am
  • A real due-diligence workload across 50 portfolio companies
  • How to schedule with cron, persist results, and recover from partial failures
  • Concrete cost framing versus a human research desk
  • Why this is materially different from running each swarm one at a time
Premium-only endpoint. /v1/swarm/batch/completions is restricted to Pro, Ultra, and Premium subscribers. Free-tier keys will get a 403. Upgrade your account to unlock enterprise batch swarm execution.

Why This Matters

A single multi-agent swarm — director plus three specialists — does the work of an analyst team over a 30-minute call. But the real enterprise pattern isn’t one swarm. It’s fifty swarms running while everyone sleeps: one per portfolio company, one per RFP response, one per region for a market scan, one per candidate in a final-round panel. Schedule a batch swarm job at 8pm, have a folder of completed due-diligence memos waiting at 7am. This tutorial shows how to build that job.

Step 1: Setup

pip install requests python-dotenv
export SWARMS_API_KEY="your-api-key-here"
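Because the job will eventually run unattended, it can help to fail fast when the key is missing. Cron starts jobs with a minimal environment, so an export made in an interactive shell is generally not visible to a crontab entry; the .env file read by load_dotenv() is the more dependable source there. The sketch below is one way to check the key up front. require_api_key is a hypothetical helper name used for illustration and is not part of any SDK.

```python
import os
from typing import Mapping


def require_api_key(
    env: Mapping[str, str] = os.environ,
    var_name: str = "SWARMS_API_KEY",
) -> str:
    """Return the API key from `env`, or raise if it is missing.

    Hypothetical helper for illustration; not part of any SDK.
    """
    key = (env.get(var_name) or "").strip()
    # Treat the tutorial's placeholder value as "not configured".
    if not key or key == "your-api-key-here":
        raise RuntimeError(
            f"{var_name} is not set. Export it or add it to the .env file "
            "that load_dotenv() reads before scheduling the job."
        )
    return key
```

In the script, calling require_api_key() right after load_dotenv() could stand in for the bare os.getenv lookup, so a misconfigured VM fails at startup rather than after building the whole batch.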

Step 2: Define a Reusable Due-Diligence Swarm Template

Every swarm in the batch is a full SwarmSpec. We define one template (a four-agent HierarchicalSwarm) and stamp it out per company.
import json
import os
from datetime import datetime
from pathlib import Path

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}


def due_diligence_swarm(company: str, ticker: str) -> dict:
    return {
        "name": f"Due Diligence - {company}",
        "description": f"Hierarchical due diligence swarm for {company} ({ticker}).",
        "swarm_type": "HierarchicalSwarm",
        "max_loops": 1,
        "task": (
            f"Produce a board-ready due diligence memo on {company} (ticker: {ticker}). "
            "Cover: business model, competitive moat, financial health, key risks, "
            "and a one-paragraph recommendation. Maximum 800 words."
        ),
        "agents": [
            {
                "agent_name": "DD Director",
                "description": "Coordinates specialists and writes the final memo.",
                "system_prompt": (
                    "You are a senior due diligence director at a tier-1 investment firm. "
                    "Synthesize specialist analyses into a board-ready memo with a clear "
                    "recommendation (Invest / Pass / Watchlist)."
                ),
                "model_name": "gpt-4.1",
                "role": "coordinator",
                "max_loops": 1,
                "max_tokens": 4096,
                "temperature": 0.3,
            },
            {
                "agent_name": "Financial Analyst",
                "description": "Analyzes revenue, margins, balance sheet, and cash flow.",
                "system_prompt": (
                    "You are a sell-side financial analyst. Evaluate revenue growth, "
                    "gross and operating margins, cash position, and debt load. Cite the "
                    "specific metrics you would underwrite the investment against."
                ),
                "model_name": "gpt-4.1",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 2048,
                "temperature": 0.3,
            },
            {
                "agent_name": "Market Analyst",
                "description": "Sizes the market and assesses competitive position.",
                "system_prompt": (
                    "You are a market research analyst. Estimate TAM/SAM, name the top "
                    "three competitors, and call out the structural moat (or lack of one)."
                ),
                "model_name": "gpt-4.1",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 2048,
                "temperature": 0.4,
            },
            {
                "agent_name": "Risk Analyst",
                "description": "Identifies regulatory, operational, and execution risks.",
                "system_prompt": (
                    "You are a risk officer. List the top 5 risks (regulatory, operational, "
                    "execution, macro, key-person). For each, give a likelihood and a "
                    "mitigation."
                ),
                "model_name": "gpt-4.1",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 2048,
                "temperature": 0.3,
            },
        ],
    }

Step 3: Build the 50-Company Batch

In a real workload, the watchlist comes from a portfolio database. Here we hard-code it; ten entries are shown, and you extend the list to 50.
WATCHLIST = [
    ("NVIDIA", "NVDA"),
    ("Microsoft", "MSFT"),
    ("Apple", "AAPL"),
    ("Alphabet", "GOOGL"),
    ("Meta Platforms", "META"),
    ("Amazon", "AMZN"),
    ("Tesla", "TSLA"),
    ("Broadcom", "AVGO"),
    ("AMD", "AMD"),
    ("Salesforce", "CRM"),
    # ... fill out to 50 entries from your portfolio system
]

batch_payload = [due_diligence_swarm(name, ticker) for name, ticker in WATCHLIST]
print(f"Prepared batch of {len(batch_payload)} due-diligence swarms.")
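If the portfolio system can export a CSV, the hard-coded list can be replaced by a small loader. This is a minimal sketch that assumes a header row with columns named company and ticker; both column names and the parse_watchlist helper are hypothetical, so adjust them to whatever your export actually contains.

```python
import csv
from typing import Iterable


def parse_watchlist(lines: Iterable[str]) -> list[tuple[str, str]]:
    """Turn CSV lines into (company, ticker) pairs.

    Assumes a header row with 'company' and 'ticker' columns (an
    assumption for this sketch). Blank rows and repeated tickers are skipped.
    """
    pairs: list[tuple[str, str]] = []
    seen: set[str] = set()
    for row in csv.DictReader(lines):
        company = (row.get("company") or "").strip()
        ticker = (row.get("ticker") or "").strip().upper()
        if not company or not ticker or ticker in seen:
            continue  # incomplete row or duplicate ticker
        seen.add(ticker)
        pairs.append((company, ticker))
    return pairs


# Inline sample standing in for an exported file.
sample = [
    "company,ticker",
    "NVIDIA,NVDA",
    "Microsoft,msft",
    "NVIDIA Corp,NVDA",  # duplicate ticker, dropped
    ",",                 # empty fields, dropped
]
print(parse_watchlist(sample))  # [('NVIDIA', 'NVDA'), ('Microsoft', 'MSFT')]
```

Since csv.DictReader accepts any iterable of lines, an open file handle (opened with newline="") can be passed directly in place of the sample list.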

Step 4: Submit the Overnight Job

One POST. The server parallelizes the swarms behind the gateway.
def run_overnight_batch(payload: list[dict]) -> list[dict]:
    response = requests.post(
        f"{BASE_URL}/v1/swarm/batch/completions",
        headers=headers,
        json=payload,
        timeout=3600,  # one hour; large overnight runs may need more
    )
    response.raise_for_status()
    return response.json()


results = run_overnight_batch(batch_payload)
The endpoint returns one entry per swarm in the same order as the request, each with its own job_id, status, usage, and output. Persist the array to disk before you start parsing — partial network failures are easier to recover from when you have the raw response saved.
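One way to save the raw response before any parsing is sketched below. The file naming and the save_raw_response helper are assumptions made for illustration; the only idea taken from the text is that the untouched JSON lands on disk first.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def save_raw_response(results: object, out_root: Path = Path("dd_runs")) -> Path:
    """Write the unparsed batch response to a timestamped JSON file.

    Hypothetical helper; the directory layout is an assumption.
    """
    out_root.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ")
    raw_path = out_root / f"raw_response_{stamp}.json"

    # Write under a temporary name, then rename, so a crash mid-write
    # never leaves a truncated file under the final name.
    tmp_path = raw_path.with_name(raw_path.name + ".tmp")
    tmp_path.write_text(json.dumps(results, indent=2), encoding="utf-8")
    tmp_path.replace(raw_path)
    return raw_path
```

Calling save_raw_response(results) immediately after run_overnight_batch returns means a later bug in the memo-writing step can be fixed and rerun against the saved file without paying for the batch again.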

Step 5: Persist Per-Memo Files

Drop one Markdown file per company into a dated folder so your team finds the memos waiting in the morning.
def persist_results(results: list[dict], watchlist: list[tuple[str, str]]) -> Path:
    out_dir = Path("dd_runs") / datetime.utcnow().strftime("%Y-%m-%d")
    out_dir.mkdir(parents=True, exist_ok=True)

    total_cost = 0.0
    for (name, ticker), swarm_result in zip(watchlist, results):
        status = swarm_result.get("status", "unknown")
        usage = swarm_result.get("usage", {}) or {}
        # billing_info may be missing or null; fall back to an empty dict either way
        cost = usage.get("token_cost") or (usage.get("billing_info") or {}).get("total_cost", 0.0)
        total_cost += float(cost or 0.0)

        memo_path = out_dir / f"{ticker}_{name.replace(' ', '_')}.md"
        output = swarm_result.get("output") or swarm_result.get("outputs") or {}
        memo_path.write_text(
            f"# {name} ({ticker})\n\n"
            f"Status: {status}\n\n"
            f"Cost: ${cost}\n\n"
            f"---\n\n"
            f"{json.dumps(output, indent=2)}\n"
        )

    (out_dir / "_summary.json").write_text(
        json.dumps({"total_cost": total_cost, "count": len(results)}, indent=2)
    )
    return out_dir


folder = persist_results(results, WATCHLIST)
print(f"Memos written to: {folder}")

Step 6: Schedule the Overnight Run

The simplest production deploy is a cron job on a small VM or a scheduled GitHub Action. Save the script above as nightly_dd.py and add to crontab:
# Run at 8pm every weekday
0 20 * * 1-5 /usr/bin/python3 /opt/dd/nightly_dd.py >> /var/log/dd.log 2>&1
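For zero-infrastructure scheduling, use a serverless cron platform (GitHub Actions with a schedule cron trigger, AWS EventBridge, Vercel Cron). The job itself is a single Python script, so anywhere that can run Python on a schedule will work. Check which time zone the scheduler uses: a crontab entry fires in the machine's local time, while GitHub Actions schedules are evaluated in UTC, so 0 20 may not mean 8pm for your team.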

Recovering From Partial Failures

Each entry in the response carries its own status. Retry only the failed ones — never resubmit the full batch.
def retryable(swarm_result: dict) -> bool:
    return swarm_result.get("status") not in {"completed", "success"}


failed = [
    swarm
    for swarm, result in zip(batch_payload, results)
    if retryable(result)
]

if failed:
    print(f"Retrying {len(failed)} failed swarms...")
    retry_results = run_overnight_batch(failed)
    # merge retry_results back into results by job_id / index
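The merge step left as a comment above can be sketched by index. It relies on the statement in Step 4 that entries come back in request order, so the i-th retry result belongs to the i-th failed entry. merge_retries is a hypothetical helper, and retryable is repeated here only to keep the block self-contained.

```python
def retryable(swarm_result: dict) -> bool:
    # Same check as in the retry snippet above.
    return swarm_result.get("status") not in {"completed", "success"}


def merge_retries(results: list[dict], retry_results: list[dict]) -> list[dict]:
    """Return a copy of `results` with failed entries replaced by their retries.

    Assumes retry_results is in the same order as the failed entries
    that were resubmitted.
    """
    failed_indices = [i for i, result in enumerate(results) if retryable(result)]
    if len(retry_results) != len(failed_indices):
        raise ValueError(
            f"expected {len(failed_indices)} retry results, got {len(retry_results)}"
        )
    merged = list(results)  # leave the original list untouched
    for index, retry in zip(failed_indices, retry_results):
        merged[index] = retry
    return merged


first_pass = [
    {"job_id": "a", "status": "completed"},
    {"job_id": "b", "status": "failed"},
    {"job_id": "c", "status": "completed"},
]
second_pass = [{"job_id": "b2", "status": "completed"}]
print([r["job_id"] for r in merge_retries(first_pass, second_pass)])  # ['a', 'b2', 'c']
```

Because the merged list keeps the original positions, it can be passed to persist_results alongside the unchanged WATCHLIST.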

The Cost Math

Pricing varies by model and current token rates, so treat these numbers as illustrative.

| Approach | Wall time | Direct cost | Burdened cost |
| --- | --- | --- | --- |
| Junior analyst team writing 50 DD memos | ~250 hours (5 hrs per memo) | $25,000 at $100/hr | $40,000+ with overhead |
| One swarm at a time, sequentially | ~5-8 hours | ~$75 | A workday of analyst babysitting |
| Batch swarm overnight, one POST | ~30-60 minutes server-side | ~$75 | $75 + you were asleep |

This overnight batch costs roughly $50-$100 to run for 50 four-agent due-diligence swarms. A junior analyst team would take ~250 hours at $100/hour to produce the same fifty memos, about $25,000 in direct labor. Run it nightly across your watchlist and you have a permanent overnight research desk for the cost of a single analyst’s coffee budget.
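The arithmetic behind those figures can be reproduced in a few lines. The inputs are the illustrative numbers from this section (50 memos, 5 hours each, $100/hour, a ~$75 batch), not measured prices, so swap in your own rates.

```python
MEMOS = 50
HOURS_PER_MEMO = 5        # illustrative analyst effort per memo
ANALYST_RATE_USD = 100    # illustrative hourly rate
BATCH_COST_USD = 75       # illustrative midpoint of the $50-$100 range

analyst_hours = MEMOS * HOURS_PER_MEMO
analyst_cost = analyst_hours * ANALYST_RATE_USD
analyst_cost_per_memo = analyst_cost / MEMOS
swarm_cost_per_memo = BATCH_COST_USD / MEMOS
cost_ratio = analyst_cost / BATCH_COST_USD

print(f"Analyst hours:         {analyst_hours}")                # 250
print(f"Analyst direct cost:   ${analyst_cost:,}")              # $25,000
print(f"Analyst cost per memo: ${analyst_cost_per_memo:,.0f}")  # $500
print(f"Swarm cost per memo:   ${swarm_cost_per_memo:.2f}")     # $1.50
print(f"Direct-cost ratio:     {cost_ratio:.0f}x")              # 333x
```

The ratio compares direct costs only; it leaves out review time, since someone still has to read the memos in the morning.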

Adapting the Pattern

The same overnight-batch shape applies to any “N independent multi-agent jobs” workload:

| Workload | One swarm per… | Agents inside each swarm |
| --- | --- | --- |
| M&A pipeline review | target company | DD director + financial + legal + technical |
| RFP response factory | open RFP | proposal lead + writer + pricing + compliance |
| Clinical literature scan | indication or molecule | senior MD + biostatistician + safety reviewer |
| Marketing market scan | geography or persona | brand lead + copywriter + designer + performance |
| Vendor security review | vendor | CISO + AppSec engineer + privacy counsel |

Only the swarm template and the input list change. The batch envelope, scheduling, persistence, and retry pattern stay identical.
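As a sketch of that swap, here is the vendor security review workload written in the same shape as the due-diligence template. The field names mirror the template in Step 2; the prompts, the vendor names, and the build_batch and make_agent helpers are hypothetical placeholders.

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def build_batch(template: Callable[[T], dict], inputs: Iterable[T]) -> list[dict]:
    """Stamp out one swarm spec per input item (hypothetical helper)."""
    return [template(item) for item in inputs]


def make_agent(name: str, prompt: str, role: str) -> dict:
    # Same agent fields as the due-diligence template in Step 2.
    return {
        "agent_name": name,
        "system_prompt": prompt,
        "model_name": "gpt-4.1",
        "role": role,
        "max_loops": 1,
    }


def vendor_review_swarm(vendor: str) -> dict:
    return {
        "name": f"Vendor Security Review - {vendor}",
        "swarm_type": "HierarchicalSwarm",
        "max_loops": 1,
        "task": f"Assess the security and privacy posture of {vendor}.",
        "agents": [
            make_agent("CISO", "Synthesize the reviews into an approve/reject call.", "coordinator"),
            make_agent("AppSec Engineer", "Review application security controls.", "worker"),
            make_agent("Privacy Counsel", "Review data handling and privacy terms.", "worker"),
        ],
    }


VENDORS = ["Vendor A", "Vendor B"]  # placeholder input list
vendor_batch = build_batch(vendor_review_swarm, VENDORS)
print(f"Prepared batch of {len(vendor_batch)} vendor-review swarms.")
```

The resulting list has the same structure as batch_payload, so it can be handed to run_overnight_batch without changing the submit, persist, or retry code.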

Next Steps