
What This Example Shows

  • A GraphWorkflow modeling the analyst workflow used inside intelligence shops: ingest, parallel analysis, red team, synthesis
  • Three parallel domain analysts (Geopolitical, Cyber, Open-Source HUMINT) running concurrently against the same packet
  • A dedicated Red Team agent whose only job is to attack the analysts’ conclusions before they reach the brief
  • A Synthesis Editor that produces a structured PIR-style executive brief tagged with confidence levels (HIGH / MODERATE / LOW)
  • How to scale the same pipeline overnight across a 200-report inbox using /v1/swarm/batch/completions
Premium tier required. GraphWorkflow and /v1/swarm/batch/completions are available on Pro, Ultra, and Premium plans. Upgrade or manage your subscription at https://swarms.world/platform/account.
Unclassified use only. The Swarms API runs on commercial cloud infrastructure. It is suitable for OSINT, PAI, and CUI-appropriate workflows where your data classification policy permits commercial LLM processing. It is not an accredited environment for classified information. Do not pass classified, SCI, or export-controlled material through this API.

Why This Matters

Every morning, analysts in defense, intelligence, and corporate-security shops face the same job: triage an overnight firehose of open-source reporting — news wires, regional press, vendor threat feeds, social signal, technical indicators — and turn it into a single brief that a principal will read in under five minutes. Most of the day is spent reading, not reasoning. This pipeline does the reading in parallel, runs a red team against itself, and hands the analyst a draft brief with confidence levels they can edit and sign — turning a six-hour triage cycle into a fifteen-minute review.

Step 1: Setup

pip install requests python-dotenv
export SWARMS_API_KEY="your-api-key-here"
import json
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
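If the environment variable is missing, `os.getenv` returns `None` and the first request fails with an authentication error that is easy to misread. A minimal sketch of a fail-fast check follows; `build_headers` is a hypothetical helper name, not part of the Swarms API, and it assumes the same `x-api-key` header used above.

```python
import os
from typing import Mapping, Optional


def build_headers(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return request headers, or raise if the API key is missing or blank."""
    # Hypothetical helper: reads from os.environ unless a mapping is passed in.
    env = os.environ if env is None else env
    api_key = env.get("SWARMS_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError(
            "SWARMS_API_KEY is not set; export it or add it to your .env file."
        )
    return {"x-api-key": api_key, "Content-Type": "application/json"}
```

Calling `headers = build_headers()` after `load_dotenv()` surfaces a configuration problem before any workflow is submitted.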

Step 2: Define the Pipeline

The graph models how a real watch floor works. The intake agent normalizes the raw packet. Three analysts attack it in parallel from different tradecraft angles. A red team agent challenges them. A synthesis editor produces the final brief.
[IntakeNormalizer] ──┬──> [GeopoliticalAnalyst] ──┐
                     ├──> [CyberAnalyst]          ├──> [RedTeam] ──> [SynthesisEditor]
                     └──> [OSINTAnalyst]          ┘
def build_triage_workflow(packet: str) -> dict:
    return {
        "name": "Intelligence-Triage-Pipeline",
        "description": (
            "Four-stage triage: normalize packet, three parallel analysts, "
            "red team challenge, executive brief with confidence levels."
        ),
        "task": packet,
        "agents": [
            {
                "agent_name": "IntakeNormalizer",
                "description": "Normalizes raw multi-source reporting into a structured packet.",
                "system_prompt": (
                    "You are an intelligence intake officer. Read the raw reporting packet "
                    "and produce a normalized brief with: (1) source list with reliability "
                    "rating A-F and credibility 1-6 (Admiralty Code), (2) extracted entities "
                    "(persons, organizations, locations, capabilities), (3) timeline of events "
                    "in ISO-8601, (4) initial topic tags (e.g., GEO-EAP, CYBER-APT, ECON-SANC). "
                    "Be terse. No analytic conclusions."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 4000,
                "temperature": 0.2,
                "max_loops": 1,
            },
            {
                "agent_name": "GeopoliticalAnalyst",
                "description": "Regional and political analysis.",
                "system_prompt": (
                    "You are a geopolitical analyst. From the normalized packet, identify "
                    "state-actor intent, regional power dynamics, alliance signals, and "
                    "near-term escalation indicators. Tie each judgment to a specific source "
                    "from the packet. Use ICD 203 estimative language ('almost certainly', "
                    "'likely', 'roughly even chance', 'unlikely'). Avoid speculation past "
                    "what the packet supports."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 3500,
                "temperature": 0.3,
                "max_loops": 1,
            },
            {
                "agent_name": "CyberAnalyst",
                "description": "Technical indicators and threat-actor analysis.",
                "system_prompt": (
                    "You are a cyber threat intelligence analyst. From the packet, extract "
                    "TTPs (map to MITRE ATT&CK where supported), IOCs, suspected actor "
                    "clusters, and targeting patterns. Distinguish observed activity from "
                    "inferred attribution. Flag any attribution as confidence LOW unless "
                    "the packet provides corroborating technical evidence from two or more "
                    "independent sources."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 3500,
                "temperature": 0.2,
                "max_loops": 1,
            },
            {
                "agent_name": "OSINTAnalyst",
                "description": "Open-source HUMINT-style pattern analysis from public reporting.",
                "system_prompt": (
                    "You are an open-source analyst. From the packet, identify behavioral "
                    "patterns, network associations, public statements, and corroborating "
                    "ground signal (imagery cues, social posts, local press). Distinguish "
                    "first-hand reporting from aggregator restatements. Surface any single-"
                    "source claims that the other analysts may treat as confirmed."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 3500,
                "temperature": 0.3,
                "max_loops": 1,
            },
            {
                "agent_name": "RedTeam",
                "description": "Adversarial review of the three analytic lines.",
                "system_prompt": (
                    "You are a red team analyst. You have all three analytic lines in front "
                    "of you. Your job is to attack them. For each major judgment: identify "
                    "(1) the strongest competing hypothesis the analysts did not consider, "
                    "(2) any source the analysts over-weighted relative to its Admiralty "
                    "rating, (3) any deception or denial scenario consistent with the same "
                    "evidence. Do not produce a final answer — produce a list of specific "
                    "challenges the synthesis editor must address."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 4000,
                "temperature": 0.4,
                "max_loops": 1,
            },
            {
                "agent_name": "SynthesisEditor",
                "description": "Produces the final executive brief with confidence levels.",
                "system_prompt": (
                    "You are the senior editor producing the morning brief. Synthesize the "
                    "three analytic lines and explicitly address each red team challenge "
                    "before reaching your judgments. Output format:\n\n"
                    "TITLE: <one-line topic>\n"
                    "TAGS: <region codes, topic codes>\n"
                    "BOTTOM LINE (2-3 sentences):\n"
                    "KEY JUDGMENTS (3-5 bullets, each tagged HIGH/MODERATE/LOW confidence):\n"
                    "OUTLOOK (next 30 days):\n"
                    "INTELLIGENCE GAPS (what would change the assessment):\n"
                    "SOURCING NOTE (which analytic line(s) drove each judgment).\n\n"
                    "Confidence levels follow ICD 203. Do not exceed 400 words total."
                ),
                "model_name": "gpt-4.1",
                "max_tokens": 4000,
                "temperature": 0.3,
                "max_loops": 1,
            },
        ],
        "edges": [
            {"source": "IntakeNormalizer", "target": "GeopoliticalAnalyst"},
            {"source": "IntakeNormalizer", "target": "CyberAnalyst"},
            {"source": "IntakeNormalizer", "target": "OSINTAnalyst"},
            {"source": "GeopoliticalAnalyst", "target": "RedTeam"},
            {"source": "CyberAnalyst", "target": "RedTeam"},
            {"source": "OSINTAnalyst", "target": "RedTeam"},
            {"source": "RedTeam", "target": "SynthesisEditor"},
        ],
        "entry_points": ["IntakeNormalizer"],
        "end_points": ["SynthesisEditor"],
        "max_loops": 1,
    }
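A misspelled agent name in `edges` is easy to miss in a payload this long. The sketch below is a local sanity check, not a description of how the server schedules work: it confirms every edge points at a defined agent and groups the agents into dependency layers using Kahn's algorithm. `execution_stages` and `demo_spec` are hypothetical names; the demo mirrors the edge list above with the prompts omitted.

```python
from collections import defaultdict


def execution_stages(spec: dict) -> list:
    """Group agent names into layers; each layer depends only on earlier ones."""
    names = [agent["agent_name"] for agent in spec["agents"]]
    known = set(names)
    indegree = {name: 0 for name in names}
    children = defaultdict(list)

    for edge in spec["edges"]:
        src, dst = edge["source"], edge["target"]
        if src not in known or dst not in known:
            raise ValueError(f"edge references unknown agent: {src} -> {dst}")
        children[src].append(dst)
        indegree[dst] += 1

    stages = []
    ready = [name for name in names if indegree[name] == 0]
    placed = 0
    while ready:
        stages.append(ready)
        placed += len(ready)
        next_ready = []
        for name in ready:
            for child in children[name]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = next_ready

    if placed != len(names):
        raise ValueError("graph contains a cycle")
    return stages


# Hypothetical stand-in for the payload above (same agents and edges, no prompts).
demo_spec = {
    "agents": [
        {"agent_name": n}
        for n in [
            "IntakeNormalizer", "GeopoliticalAnalyst", "CyberAnalyst",
            "OSINTAnalyst", "RedTeam", "SynthesisEditor",
        ]
    ],
    "edges": [
        {"source": "IntakeNormalizer", "target": "GeopoliticalAnalyst"},
        {"source": "IntakeNormalizer", "target": "CyberAnalyst"},
        {"source": "IntakeNormalizer", "target": "OSINTAnalyst"},
        {"source": "GeopoliticalAnalyst", "target": "RedTeam"},
        {"source": "CyberAnalyst", "target": "RedTeam"},
        {"source": "OSINTAnalyst", "target": "RedTeam"},
        {"source": "RedTeam", "target": "SynthesisEditor"},
    ],
}

for index, stage in enumerate(execution_stages(demo_spec), start=1):
    print(f"Stage {index}: {', '.join(stage)}")
```

The same function can be called on the output of `build_triage_workflow(packet)` before submitting it, since it only reads the `agents` and `edges` keys.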

Step 3: Run a Single Report

A realistic input packet is a concatenation of normalized headlines, vendor advisories, and social signal pulled from your collection tooling. For this tutorial we use a synthetic packet so the example is runnable end-to-end.
packet = """
RAW REPORTING PACKET — 2026-05-27 0500Z

[1] Reuters (A2): Two regional carriers in Country X grounded after reported
    GPS spoofing along the eastern corridor. No casualties. 2026-05-26 1840Z.
[2] Local press (C3, translated): Defense ministry spokesperson denies any
    related exercise. 2026-05-26 2110Z.
[3] Vendor advisory (B2): Threat group cluster TA-Kestrel observed deploying
    new ICS-focused implant against aviation telemetry vendors in adjacent
    region. TTPs include T1190, T1133. 2026-05-25.
[4] Social signal (D4): Multiple accounts in-region posting imagery of
    mobile EW vehicles near border. Geolocation pending.
[5] Wire (A1): Foreign minister of Country Y issues statement reiterating
    "all options on the table" re: airspace incidents. 2026-05-27 0200Z.
"""

payload = build_triage_workflow(packet)

response = requests.post(
    f"{BASE_URL}/v1/graph-workflow/completions",
    headers=headers,
    json=payload,
    timeout=300,
)

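# Fail fast on HTTP errors (bad key, wrong plan, malformed payload)
# instead of hitting a KeyError on the error body below.
response.raise_for_status()
result = response.json()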

print(f"Workflow: {result['name']}")
print(f"Status: {result['status']}\n")

stages = [
    ("STAGE 1 — Intake", ["IntakeNormalizer"]),
    ("STAGE 2 — Parallel Analysis", ["GeopoliticalAnalyst", "CyberAnalyst", "OSINTAnalyst"]),
    ("STAGE 3 — Red Team", ["RedTeam"]),
    ("STAGE 4 — Executive Brief", ["SynthesisEditor"]),
]

for label, names in stages:
    print(f"\n{'=' * 60}\n{label}\n{'=' * 60}")
    for name in names:
        if name in result.get("outputs", {}):
            output = result["outputs"][name]
            if isinstance(output, list):
                output = " ".join(str(item) for item in output)
            print(f"\n[{name}]\n{str(output)[:500]}...")

usage = result.get("usage", {})
billing = usage.get("billing_info", {})
print(f"\nTotal cost: ${billing.get('total_cost', usage.get('total_cost', 0)):.4f}")
print(f"Execution time: {result.get('execution_time', 'n/a')}s")
The Red Team agent never produces a final judgment. It produces a list of challenges that the Synthesis Editor must explicitly address. This mirrors red-team analysis, one of the “structured analytic techniques” IC analysts use: a judgment cannot reach the brief without first surviving its strongest critic.
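Because the Synthesis Editor is asked to tag each key judgment HIGH, MODERATE, or LOW, a reviewer may want a quick tally before reading the brief. The following is a rough sketch that assumes the model followed the requested layout (a `KEY JUDGMENTS` header, one judgment per line, then an `OUTLOOK` header); real output can drift from that, so treat the counts as a hint. `confidence_counts` and the sample brief are hypothetical.

```python
import re
from collections import Counter

CONFIDENCE_TAG = re.compile(r"\b(HIGH|MODERATE|LOW)\b")


def confidence_counts(brief: str) -> Counter:
    """Count confidence tags on the lines between KEY JUDGMENTS and OUTLOOK."""
    # Skip the rest of the header line, which may itself contain the tag names.
    match = re.search(
        r"KEY JUDGMENTS[^\n]*\n(.*?)(?=\nOUTLOOK|\Z)", brief, flags=re.DOTALL
    )
    section = match.group(1) if match else ""
    counts = Counter()
    for line in section.splitlines():
        tag = CONFIDENCE_TAG.search(line)  # first tag on the line wins
        if tag:
            counts[tag.group(1)] += 1
    return counts


# Hypothetical brief text, for illustration only.
sample_brief = """TITLE: Airspace interference in Country X
KEY JUDGMENTS:
- (MODERATE) The groundings likely reflect deliberate GPS interference.
- (LOW) Attribution to TA-Kestrel is not corroborated.
- (MODERATE) Further incidents are likely in the near term.
OUTLOOK (next 30 days): continued disruption is likely.
"""

print(dict(confidence_counts(sample_brief)))
```

A brief whose judgments are all LOW, or one with no tags at all, is a reasonable signal to read the red team output before the brief itself.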

Step 4: Run the Full Overnight Inbox via Batch

A real watch floor does not run one packet at a time. It runs the whole inbox between midnight and 0500Z so the brief is on desks at 0700. Use /v1/swarm/batch/completions to fan out the same pipeline across every packet in parallel.
def build_batch_payload(packets: list[str]) -> list[dict]:
    return [build_triage_workflow(p) for p in packets]

# Imagine `inbox` is 200 packets pulled from your collection pipeline.
inbox = [packet]  # replace with your real list

batch_payload = build_batch_payload(inbox)

batch_response = requests.post(
    f"{BASE_URL}/v1/swarm/batch/completions",
    headers=headers,
    json=batch_payload,
    timeout=1800,
)

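# Fail fast on HTTP errors; otherwise len() below would count the keys
# of an error body rather than the briefs.
batch_response.raise_for_status()
batch_result = batch_response.json()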
print(f"Briefs produced: {len(batch_result)}")
The batch endpoint accepts a list of full swarm specifications and runs them in parallel server-side. For 200 packets this typically completes in well under an hour — overnight is more than enough buffer for the morning brief cycle.
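Sending all 200 specifications in one request means a single timeout or network error loses the whole run. One option is to split the inbox into smaller requests. The sketch below shows only the splitting step; `chunked` is a hypothetical helper, and the chunk size of 25 is an arbitrary assumption, not a documented limit of the batch endpoint.

```python
from typing import Iterator, List


def chunked(items: List, size: int) -> Iterator[List]:
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Placeholder packets standing in for a 200-report inbox.
demo_inbox = [f"packet-{i}" for i in range(200)]
sizes = [len(chunk) for chunk in chunked(demo_inbox, 25)]
print(f"{len(sizes)} requests, sizes: {sizes}")
```

Each chunk can then be passed through `build_batch_payload` and posted to `/v1/swarm/batch/completions` in a loop, so a failed request only requires resubmitting that chunk.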

Step 5: Audit Trail and Cost Reconciliation

Procurement and IG reviews almost always ask the same two questions: who ran what, and what did it cost. Two endpoints answer both.
# Full request history with timestamps, endpoints, and execution metadata
logs = requests.get(f"{BASE_URL}/v1/swarm/logs", headers=headers, timeout=60).json()

# Aggregated usage and cost for billing reconciliation
usage = requests.get(f"{BASE_URL}/v1/usage", headers=headers, timeout=60).json()
Every swarm run is logged with its inputs, agent outputs, token counts, and the model used per agent. This is the same trail your auditor will ask for during a contract review.
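For a review that happens months later, it can help to keep your own dated copy of what these endpoints returned. The sketch below makes no assumption about the response schema: it writes whatever JSON it is given to a timestamped file and puts a SHA-256 prefix in the filename so later edits are detectable. `archive_response` and the `audit` directory are hypothetical choices.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path


def archive_response(payload, label: str, directory: str = "audit") -> Path:
    """Write `payload` as JSON to <directory>/<label>-<UTC stamp>-<hash>.json."""
    # sort_keys makes the serialized bytes (and the digest) deterministic.
    body = json.dumps(payload, sort_keys=True, indent=2).encode("utf-8")
    digest = hashlib.sha256(body).hexdigest()
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"{label}-{stamp}-{digest[:12]}.json"
    path.write_bytes(body)
    return path
```

With the variables from the snippet above, `archive_response(logs, "swarm-logs")` and `archive_response(usage, "usage")` would each produce one file per morning.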

Cost vs. a Human Triage Cycle

Rough numbers. A typical analyst (fully-loaded GS-13 or contractor equivalent at roughly $120/hour) spends six hours triaging an overnight inbox of 200 open-source reports: about $720 of labor per morning, per analyst. Most shops staff two to three analysts on the cycle, which puts the daily labor cost at roughly $1,440 to $2,160. Running this pipeline across the same 200-report inbox via /v1/swarm/batch/completions typically lands in the single-digit dollars range total, and produces a draft brief the analyst signs in fifteen minutes instead of writing from scratch. The analyst is still in the loop, but the loop is review-and-sign, not read-and-write.
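The labor side of that comparison is simple arithmetic and can be rerun with your own rate and staffing. The sketch below uses only the figures quoted above ($120/hour, six hours of triage, fifteen minutes of review, one to three analysts); `labor_cost` is a hypothetical helper, and the pipeline's own API cost is left out because it depends on your actual usage.

```python
HOURLY_RATE = 120.0    # fully-loaded rate quoted above, USD per hour
TRIAGE_HOURS = 6.0     # manual triage of the overnight inbox
REVIEW_HOURS = 0.25    # fifteen-minute review-and-sign


def labor_cost(hours: float, analysts: int = 1, rate: float = HOURLY_RATE) -> float:
    """Labor cost in USD for `analysts` people each working `hours`."""
    return hours * analysts * rate


for analysts in (1, 2, 3):
    manual = labor_cost(TRIAGE_HOURS, analysts)
    review = labor_cost(REVIEW_HOURS, analysts)
    print(f"{analysts} analyst(s): manual ${manual:,.0f}, review ${review:,.0f}")
```

Adding the batch run's cost from `/v1/usage` to the review figure gives the per-morning total to set against the manual figure.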

Next Steps