What This Example Shows

  • A HierarchicalSwarm with a Research Director coordinating three specialist workers — Rating Tracker, Price Target Tracker, and Key Driver Extractor — running in parallel against a single PDF note
  • OpenAI-format function tools for PDF parsing, ratings/target regex extraction, consensus lookup, and Slack delivery
  • A mixed-provider agent team: Claude Sonnet 4.5 directing, GPT-4.1 / GPT-4.1-mini for surgical extraction, Gemini 2.5 Pro for long-document grounding
  • A consensus-check step that flags every new target as above, in line with, or below the street
  • End-of-day batch processing across 150+ notes via /v1/swarm/batch/completions, scheduled for 4:30pm ET and producing a ranked delta sheet
  • The structured signal payload your OMS, EMS, or research database actually wants — not free-form text
This pipeline processes a high volume of PDF documents per day. The batch endpoint and parallel function-tool execution sit on the Premium tier — Pro will rate-limit you once daily note volume crosses ~50. Read the Cost Optimization Playbook before turning the cron on, and upgrade at https://swarms.world/platform/account.

Why This Matters

Every PM in the firm wakes up to a ranked sheet of rating + target deltas — already cross-referenced against consensus — for the price of one Bloomberg lunch. A multi-strategy PM gets roughly 150 sell-side notes hitting their inbox between 5am and 9am ET on any given trading day. Reading every page is physically impossible and almost entirely a waste — 95% of any sell-side note is restated boilerplate and an unchanged thesis. The actionable content is in the deltas: a Goldman analyst going from Buy to Hold, JPM lifting their target by 18%, Morgan Stanley swapping in a new bear case on China data center demand. Those three things move books. This pipeline does nothing except find those deltas, score them against street consensus, and put a structured signal in front of the PM before the bell.

The Architecture

                Email inbox / S3 drop
                         |
                         v
                 +---------------+
                 |  PDF Parser   |  (parse_pdf_note tool)
                 +---------------+
                         |
                         v
   +-------------------------------------------------+
   |              HierarchicalSwarm                  |
   |                                                 |
   |        +-----------------------------+          |
   |        |   Research Director         |          |
   |        |   (claude-sonnet-4.5)       |          |
   |        +-------------+---------------+          |
   |                      |                          |
   |        +-------------+-------------+            |
   |        |             |             |            |
   |        v             v             v            |
   |  +-----------+ +-----------+ +-----------+      |
   |  | Rating    | | Price     | | Key       |      |
   |  | Tracker   | | Target    | | Driver    |      |
   |  | gpt-4.1   | | Tracker   | | Extractor |      |
   |  | -mini     | | gpt-4.1   | | gemini    |      |
   |  +-----------+ +-----------+ +-----------+      |
   +-------------------------------------------------+
                         |
                         v
                +-----------------+
                | Consensus Check |  (lookup_consensus_target)
                +-----------------+
                         |
                         v
                +-----------------+
                | Signal Payload  |  (structured JSON)
                +-----------------+
                         |
                +--------+--------+
                |                 |
                v                 v
          +----------+      +----------+
          |    DB    |      |  Slack   |
          +----------+      +----------+

Step 1: Setup

Install dependencies and configure credentials. The pipeline needs your Swarms API key plus either IMAP credentials for an inbox sweep or an S3 bucket where your prime broker drops PDFs.
pip install requests python-dotenv
export SWARMS_API_KEY="your-api-key-here"
export RESEARCH_INBOX_HOST="imap.your-firm.com"
export RESEARCH_INBOX_USER="research-inbox@your-firm.com"
export RESEARCH_INBOX_PASS="..."
export RESEARCH_S3_BUCKET="firm-sell-side-notes"
export RESEARCH_S3_PREFIX="2026/incoming/"
export CONSENSUS_API_URL="https://your-internal-consensus-service/v1"
export SLACK_RESEARCH_WEBHOOK="https://hooks.slack.com/services/..."
import json
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
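Because os.getenv returns None for an unset variable, a missing key only surfaces later as a rejected request. A minimal sketch of a fail-fast check follows. The helper names (missing_env, require_env) are hypothetical, and which variables count as mandatory is an assumption to adjust for your deployment.

```python
import os

# Assumption: these three are mandatory for every run. The names come from the
# export lines above; the "required" grouping is illustrative only.
REQUIRED_ENV = ("SWARMS_API_KEY", "CONSENSUS_API_URL", "SLACK_RESEARCH_WEBHOOK")


def missing_env(required=REQUIRED_ENV, env=None) -> list[str]:
    """Return the names of required variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not (env.get(name) or "").strip()]


def require_env(required=REQUIRED_ENV, env=None) -> None:
    """Raise before any API call is made if configuration is incomplete."""
    missing = missing_env(required, env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling require_env() right after load_dotenv() keeps a misconfigured cron run from sending a batch with an empty API key.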

Step 2: Define the Function Tools

Every worker agent gets the tools it actually needs — nothing more. Tools are OpenAI-format function schemas; your runtime resolves the calls server-side or replays them locally after the swarm finishes, depending on your tool host.
PARSE_PDF_TOOL = {
    "type": "function",
    "function": {
        "name": "parse_pdf_note",
        "description": (
            "Download a sell-side research PDF (URL or base64) and return its "
            "extracted plaintext, page count, and detected broker."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "pdf_url_or_b64": {
                    "type": "string",
                    "description": "HTTPS URL to the PDF or a base64-encoded PDF payload.",
                },
            },
            "required": ["pdf_url_or_b64"],
        },
    },
}

EXTRACT_RATING_TOOL = {
    "type": "function",
    "function": {
        "name": "extract_rating",
        "description": (
            "Parse the broker rating (Buy / Overweight / Hold / Neutral / Sell / "
            "Underweight) and any change vs. the prior published rating."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "Full note plaintext."},
            },
            "required": ["text"],
        },
    },
}

EXTRACT_TARGET_TOOL = {
    "type": "function",
    "function": {
        "name": "extract_price_target",
        "description": (
            "Pull the new 12-month price target and the prior target from the note. "
            "Return both numerics plus the percent change."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "Full note plaintext."},
            },
            "required": ["text"],
        },
    },
}

EXTRACT_DRIVERS_TOOL = {
    "type": "function",
    "function": {
        "name": "extract_thesis_drivers",
        "description": (
            "Identify the 3-5 key thesis drivers the analyst leans on in this note. "
            "Each driver should be a short noun phrase grounded in a quoted line."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "Full note plaintext."},
            },
            "required": ["text"],
        },
    },
}

LOOKUP_CONSENSUS_TOOL = {
    "type": "function",
    "function": {
        "name": "lookup_consensus_target",
        "description": (
            "Fetch the current street-mean 12-month price target for a ticker from "
            "the firm's internal consensus service."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "ticker": {"type": "string", "description": "Equity ticker, e.g. NVDA."},
            },
            "required": ["ticker"],
        },
    },
}

IS_ABOVE_CONSENSUS_TOOL = {
    "type": "function",
    "function": {
        "name": "is_above_consensus",
        "description": (
            "Compare a new broker target against street consensus and return a "
            "categorical flag: 'above' / 'in_line' / 'below' plus the percent gap."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target": {"type": "number", "description": "Broker's new 12-month target."},
                "ticker": {"type": "string", "description": "Equity ticker."},
            },
            "required": ["target", "ticker"],
        },
    },
}

POST_SIGNAL_TOOL = {
    "type": "function",
    "function": {
        "name": "post_signal_to_slack",
        "description": (
            "Publish a structured signal payload to the #research-signals Slack "
            "channel. Used only after the Research Director has validated the JSON."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "payload": {
                    "type": "object",
                    "description": "The signal JSON object — see Step 6 for schema.",
                },
            },
            "required": ["payload"],
        },
    },
}
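The schemas above only describe the tools; something still has to execute them. If your tool host replays calls locally, the handlers could look roughly like the sketch below. Everything in it is an assumption made for illustration: the regex, the 2% "in line" band, the helper names (classify_vs_consensus, make_handlers, dispatch), and the shape of the tool call, which follows the OpenAI convention of a function object with a name and JSON-encoded arguments.

```python
import json
import re

# Assumption: targets are phrased like "price target to $195 from $165" or
# "PT of $195". Broker phrasing varies, so treat this pattern as a starting point.
_TARGET_RE = re.compile(
    r"\b(?:price target|PT)\b[^$]{0,40}\$(?P<new>\d+(?:,\d{3})*(?:\.\d+)?)"
    r"(?:\s+from\s+\$(?P<prior>\d+(?:,\d{3})*(?:\.\d+)?))?",
    re.IGNORECASE,
)


def _to_float(raw):
    return float(raw.replace(",", "")) if raw else None


def extract_price_target(text: str) -> dict:
    """Regex handler for the extract_price_target tool."""
    match = _TARGET_RE.search(text)
    new = _to_float(match.group("new")) if match else None
    prior = _to_float(match.group("prior")) if match else None
    change = round((new - prior) / prior * 100, 2) if new is not None and prior else None
    return {"new_target": new, "prior_target": prior, "percent_change": change}


def classify_vs_consensus(target: float, consensus: float, band_pct: float = 2.0) -> dict:
    """Flag a target as above / in_line / below. band_pct is a hypothetical tolerance."""
    gap = (target - consensus) / consensus * 100
    flag = "above" if gap > band_pct else "below" if gap < -band_pct else "in_line"
    return {"flag": flag, "pct_gap": round(gap, 2)}


def make_handlers(lookup_consensus) -> dict:
    """Build a tool-name -> callable registry. lookup_consensus(ticker) returns a float."""

    def is_above_consensus(target, ticker):
        return classify_vs_consensus(target, lookup_consensus(ticker))

    def lookup_consensus_target(ticker):
        return {"ticker": ticker, "consensus_target": lookup_consensus(ticker)}

    return {
        "extract_price_target": extract_price_target,
        "lookup_consensus_target": lookup_consensus_target,
        "is_above_consensus": is_above_consensus,
    }


def dispatch(tool_call: dict, handlers: dict) -> dict:
    """Run one OpenAI-style tool call: {"function": {"name": ..., "arguments": "<json>"}}."""
    fn = tool_call["function"]
    args = fn["arguments"]
    if isinstance(args, str):  # arguments usually arrive as a JSON string
        args = json.loads(args)
    return handlers[fn["name"]](**args)
```

In production, lookup_consensus would call the service behind CONSENSUS_API_URL; passing it in as a function keeps the handlers testable with a stub.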

Step 3: Define the Four Agents

The Research Director is the only agent with the coordinator role — it owns synthesis, consensus reasoning, and the final signal payload. The three workers each get exactly the tools their job requires.
RESEARCH_DIRECTOR_PROMPT = (
    "You are the Director of Research at a multi-strategy hedge fund. "
    "Your three specialists hand you (1) the broker's new rating + any change, "
    "(2) the new and prior price targets with percent move, and (3) the key "
    "thesis drivers. Your job: synthesize these into a single signal payload. "
    "Always call lookup_consensus_target and is_above_consensus on the new target. "
    "Reject the signal if no rating, target, or drivers can be extracted with "
    "confidence — better to skip than to publish noise. Output the final signal "
    "as strict JSON matching the schema in your system context."
)

RATING_TRACKER_PROMPT = (
    "You track sell-side rating changes. Given the plaintext of one research note, "
    "call extract_rating exactly once. Return: broker, ticker, prior_rating, "
    "new_rating, and a 'changed' boolean. If the note is a reiteration with no "
    "rating change, say so explicitly — do not fabricate a change."
)

PRICE_TARGET_PROMPT = (
    "You track sell-side price target changes. Given the plaintext of one research "
    "note, call extract_price_target exactly once. Return: prior_target, new_target, "
    "currency, and percent_change. If only a new target is published with no prior, "
    "mark prior_target as null — do not guess."
)

KEY_DRIVER_PROMPT = (
    "You extract the substantive thesis drivers from a sell-side note. Call "
    "extract_thesis_drivers exactly once. Return 3-5 drivers, each as a short noun "
    "phrase with a one-sentence quote that grounds it in the note. Ignore "
    "boilerplate, disclosure text, and prior-period restatements."
)


def build_swarm_for_note(pdf_ref: str, ticker: str) -> dict:
    return {
        "name": f"Sell-Side Note Signal — {ticker}",
        "description": "Research Director coordinating Rating, Target, and Driver workers.",
        "swarm_type": "HierarchicalSwarm",
        "max_loops": 1,
        "task": (
            f"A sell-side research note on {ticker} just landed at {pdf_ref}. "
            f"Call parse_pdf_note first, then dispatch the three workers in parallel. "
            f"Synthesize their output into the structured signal payload. Cross-check "
            f"the new target against street consensus before emitting the signal."
        ),
        "agents": [
            {
                "agent_name": "Research Director",
                "description": "Director — synthesizes worker output into a signal.",
                "system_prompt": RESEARCH_DIRECTOR_PROMPT,
                "model_name": "anthropic/claude-sonnet-4-5",
                "role": "coordinator",
                "max_loops": 1,
                "max_tokens": 4096,
                "temperature": 0.1,
                "tools_dictionary": [
                    PARSE_PDF_TOOL,
                    LOOKUP_CONSENSUS_TOOL,
                    IS_ABOVE_CONSENSUS_TOOL,
                    POST_SIGNAL_TOOL,
                ],
            },
            {
                "agent_name": "Rating Tracker",
                "description": "Extracts rating + rating change.",
                "system_prompt": RATING_TRACKER_PROMPT,
                "model_name": "openai/gpt-4.1-mini",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 1024,
                "temperature": 0.0,
                "tools_dictionary": [EXTRACT_RATING_TOOL],
            },
            {
                "agent_name": "Price Target Tracker",
                "description": "Extracts new vs. prior price target.",
                "system_prompt": PRICE_TARGET_PROMPT,
                "model_name": "openai/gpt-4.1",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 1024,
                "temperature": 0.0,
                "tools_dictionary": [EXTRACT_TARGET_TOOL],
            },
            {
                "agent_name": "Key Driver Extractor",
                "description": "Pulls the 3-5 thesis drivers from the note body.",
                "system_prompt": KEY_DRIVER_PROMPT,
                "model_name": "gemini/gemini-2.5-pro",
                "role": "worker",
                "max_loops": 1,
                "max_tokens": 2048,
                "temperature": 0.2,
                "tools_dictionary": [EXTRACT_DRIVERS_TOOL],
            },
        ],
    }
The model mix is deliberate. Rating extraction is a small classification problem, so gpt-4.1-mini is plenty and roughly 5x cheaper than gpt-4.1. Target extraction needs careful number handling, which is where gpt-4.1 earns its keep. Driver extraction is the only step that reads the full multi-page document end to end, and Gemini 2.5 Pro has been the strongest of the three on long-document grounding in our testing. The Director uses Claude Sonnet 4.5 because synthesis plus tool-call orchestration plays to its strengths.
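The Director prompt refers to "the schema in your system context", yet the agent definition above never passes a schema in. One way to close that gap, sketched here under assumptions, is to append a template to the system prompt. SIGNAL_TEMPLATE and with_schema are hypothetical names, and the template is a trimmed version of the Step 6 payload with type hints in place of values.

```python
import json

# Assumption: a type-hint template derived from the Step 6 payload. The
# conviction values mirror the ones the Step 5 ranking code looks for.
SIGNAL_TEMPLATE = {
    "ticker": "string",
    "broker": "string",
    "prior_rating": "string or null",
    "new_rating": "string",
    "rating_changed": "boolean",
    "prior_target": "number or null",
    "new_target": "number",
    "target_pct_change": "number or null",
    "currency": "string",
    "vs_consensus": {
        "consensus_target": "number",
        "flag": "above | in_line | below",
        "pct_gap": "number",
    },
    "drivers": [{"label": "string", "quote": "string"}],
    "conviction": "HIGH | MEDIUM | LOW",
}


def with_schema(prompt: str, template: dict) -> str:
    """Append the JSON template so the model sees the exact keys it must emit."""
    return (
        prompt
        + "\n\nEmit exactly one JSON object with these keys and value types:\n"
        + json.dumps(template, indent=2)
    )
```

The Director entry would then use "system_prompt": with_schema(RESEARCH_DIRECTOR_PROMPT, SIGNAL_TEMPLATE) in place of the bare prompt.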

Step 4: Process One Note End-to-End

Start with a single note. This is the loop you scale.
def run_single_note(pdf_ref: str, ticker: str) -> dict:
    payload = build_swarm_for_note(pdf_ref, ticker)
    response = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        timeout=300,
    )
    response.raise_for_status()
    return response.json()


result = run_single_note(
    pdf_ref="s3://firm-sell-side-notes/2026/incoming/gs_nvda_2026_05_28.pdf",
    ticker="NVDA",
)

for output in result.get("output", []):
    print("=" * 60)
    print(output["role"])
    print("=" * 60)
    content = output["content"]
    if isinstance(content, list):
        content = " ".join(str(c) for c in content)
    print(str(content)[:600])

print(f"\nTotal cost: ${result['usage']['billing_info']['total_cost']:.4f}")
print(f"Execution time: {result['execution_time']:.1f}s")
The Research Director’s last message is the signal payload you persist. The three worker outputs are the audit trail — every signal is fully reproducible from the source PDF plus the worker briefs.
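Pulling that last Director message out of the response can be sketched as follows. It assumes the response shape used in the loop above (an "output" list of objects with "role" and "content" keys) and that the model may wrap its JSON in extra prose or a Markdown fence. The function name extract_director_signal is hypothetical.

```python
import json
from typing import Optional


def extract_director_signal(
    result: dict, director_name: str = "Research Director"
) -> Optional[dict]:
    """Return the Director's final message parsed as a dict, or None if unusable."""
    messages = [
        o for o in result.get("output", [])
        if director_name in str(o.get("role", ""))
    ]
    if not messages:
        return None

    content = messages[-1].get("content", "")  # last message = final payload
    if isinstance(content, list):
        content = " ".join(str(c) for c in content)
    text = str(content)

    # Keep only the outermost {...} so surrounding prose or fences are ignored.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(text[start : end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None
```

A None result covers both a rejected note and a malformed reply, so the caller can skip it without a separate error path.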

Step 5: End-of-Day Batch Across All Notes

The real value shows up when you sweep the entire inbox in one shot. Build a payload list keyed by the day’s PDF drops and hand the whole thing to /v1/swarm/batch/completions.
def sweep_inbox_for_today() -> list[tuple[str, str]]:
    """Return [(pdf_ref, ticker), ...] for every note that arrived today."""
    # Replace with your IMAP/S3 implementation. Each entry is a tuple of
    # (s3_uri_or_url, primary_ticker_detected_from_subject_or_filename).
    return [
        ("s3://firm-sell-side-notes/2026/incoming/gs_nvda_2026_05_28.pdf", "NVDA"),
        ("s3://firm-sell-side-notes/2026/incoming/jpm_amd_2026_05_28.pdf", "AMD"),
        ("s3://firm-sell-side-notes/2026/incoming/ms_avgo_2026_05_28.pdf", "AVGO"),
        # ... ~150 of these per day on a real desk
    ]


def run_eod_batch() -> list[dict]:
    notes = sweep_inbox_for_today()
    payload = [build_swarm_for_note(pdf_ref, ticker) for pdf_ref, ticker in notes]

    response = requests.post(
        f"{BASE_URL}/v1/swarm/batch/completions",
        headers=headers,
        json=payload,
        timeout=1800,
    )
    response.raise_for_status()
    return response.json()


results = run_eod_batch()

signals: list[dict] = []
for r in results:
    # The Director's last message carries the final signal payload.
    director_msgs = [
        o for o in r.get("output", []) if "Research Director" in o.get("role", "")
    ]
    if not director_msgs:
        continue
    content = director_msgs[-1]["content"]
    if isinstance(content, list):
        content = " ".join(str(c) for c in content)
    try:
        signal = json.loads(str(content))
    except json.JSONDecodeError:
        continue  # the Director rejected the note or replied in prose
    if isinstance(signal, dict):
        signals.append(signal)

# Rank by conviction, then by absolute gap between the new target and consensus.
# The "or" fallbacks keep the sort from failing on null vs_consensus / pct_gap.
signals.sort(
    key=lambda s: (
        {"HIGH": 3, "MEDIUM": 2, "LOW": 1}.get(s.get("conviction", "LOW"), 0),
        abs((s.get("vs_consensus") or {}).get("pct_gap") or 0),
    ),
    reverse=True,
)

with open("eod_signals.jsonl", "w") as f:
    for sig in signals:
        f.write(json.dumps(sig) + "\n")

total_cost = sum(
    r.get("usage", {}).get("billing_info", {}).get("total_cost", 0) for r in results
)
print(f"Processed {len(results)} notes — emitted {len(signals)} signals for ${total_cost:.2f}")
Schedule this script as a cron job for 4:30pm ET on weekdays. The post-close window means every note that hit the inbox during regular trading hours is captured, the consensus service has finished its end-of-day refresh, and the PM sees the ranked delta sheet on Slack before they leave the desk — not in their pre-market inbox the next morning.
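Cron normally evaluates schedules in the host's timezone, so a server running in UTC would drift by an hour against 4:30pm ET whenever daylight saving changes. One hedged way around that is to trigger the job at both candidate UTC times and let the script decide whether the window is open. The sketch below uses the standard-library zoneinfo module; should_run_eod is a hypothetical helper, and it does not know about market holidays.

```python
from datetime import datetime, time, timezone
from zoneinfo import ZoneInfo

NY = ZoneInfo("America/New_York")
RUN_AFTER = time(16, 30)  # 4:30pm New York time


def should_run_eod(now: datetime) -> bool:
    """True on Mon-Fri at or after 16:30 New York time.

    `now` must be timezone-aware. Market holidays are not handled here.
    """
    local = now.astimezone(NY)
    return local.weekday() < 5 and local.time() >= RUN_AFTER


if __name__ == "__main__":
    if should_run_eod(datetime.now(timezone.utc)):
        print("Window open: run the end-of-day batch")
    else:
        print("Outside the end-of-day window: skipping")
```

A guard like this also makes manual reruns safe: starting the script at noon simply exits instead of sweeping a half-day inbox.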

Step 6: The Output Schema

The Research Director is constrained to emit signals in this exact shape. This is the contract your OMS, EMS, research database, and PM Slack channel all consume.
{
  "ticker": "NVDA",
  "broker": "Goldman Sachs",
  "analyst": "Toshiya Hari",
  "note_id": "gs_nvda_2026_05_28",
  "published_at": "2026-05-28T06:42:00Z",
  "prior_rating": "Buy",
  "new_rating": "Buy",
  "rating_changed": false,
  "prior_target": 165.0,
  "new_target": 195.0,
  "target_pct_change": 18.18,
  "currency": "USD",
  "vs_consensus": {
    "consensus_target": 178.45,
    "flag": "above",
    "pct_gap": 9.27
  },
  "drivers": [
    {
      "label": "Blackwell ramp ahead of plan",
      "quote": "Channel checks indicate Blackwell production is tracking 12-15% above the prior Street model into Q3."
    },
    {
      "label": "Sovereign AI pipeline conversion",
      "quote": "Three new sovereign customers in EMEA moved from MoU to firm orders this quarter."
    },
    {
      "label": "Networking attach rate expansion",
      "quote": "Spectrum-X attach rates inside HGX clusters now exceed 70% versus our prior 55% assumption."
    }
  ],
  "conviction": "HIGH",
  "audit_trail": {
    "rating_tracker_output": "...",
    "price_target_tracker_output": "...",
    "key_driver_extractor_output": "..."
  }
}
The audit_trail block is what makes this defensible in a compliance review — every field in the signal traces back to a specific worker output, which traces back to a specific line in the PDF.
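Since downstream systems treat this shape as a contract, it may be worth recomputing the derived fields before persisting a signal. The sketch below checks that target_pct_change equals (new_target - prior_target) / prior_target, that pct_gap equals (new_target - consensus_target) / consensus_target, and that the flags agree with the numbers. validate_signal, the list of required keys, and the 0.05 rounding tolerance are all assumptions.

```python
def _is_num(x) -> bool:
    return isinstance(x, (int, float)) and not isinstance(x, bool)


def _pct(new: float, base: float) -> float:
    return round((new - base) / base * 100, 2)


def validate_signal(sig: dict, tol: float = 0.05) -> list[str]:
    """Return a list of problems; an empty list means the payload is consistent."""
    problems = []
    # Assumption: these keys are the minimum a usable signal needs.
    for key in ("ticker", "broker", "new_rating", "new_target", "vs_consensus", "drivers"):
        if sig.get(key) in (None, "", [], {}):
            problems.append(f"missing {key}")

    new, prior = sig.get("new_target"), sig.get("prior_target")
    if _is_num(new) and _is_num(prior) and prior:
        reported = sig.get("target_pct_change")
        if not _is_num(reported) or abs(_pct(new, prior) - reported) > tol:
            problems.append("target_pct_change does not match the targets")

    if "rating_changed" in sig and sig.get("prior_rating") is not None:
        if sig["rating_changed"] != (sig["prior_rating"] != sig.get("new_rating")):
            problems.append("rating_changed contradicts the ratings")

    vc = sig.get("vs_consensus") or {}
    consensus = vc.get("consensus_target")
    if _is_num(new) and _is_num(consensus) and consensus:
        gap = _pct(new, consensus)
        if not _is_num(vc.get("pct_gap")) or abs(gap - vc["pct_gap"]) > tol:
            problems.append("pct_gap does not match consensus_target")
        flag = vc.get("flag")
        if (flag == "above" and gap <= 0) or (flag == "below" and gap >= 0):
            problems.append("flag contradicts the sign of the gap")
    return problems
```

For the NVDA example, (195 - 165) / 165 gives 18.18% and (195 - 178.45) / 178.45 gives roughly 9.27%, so the payload passes with an empty problem list.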

Real Cost vs. Junior Analyst Reading Notes

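Scenario                                             | Per note | Per trading day (150 notes) | Annualized (~250 days)
-----------------------------------------------------|----------|-----------------------------|-------------------------------------
HierarchicalSwarm pipeline (mixed-provider, batched) | ~$0.25   | ~$38                        | ~$9,500
Junior analyst (fully loaded $150k) reading notes    | n/a      | Maxes out at ~20 notes/day  | $150,000 (and they skip dinner)
Two-analyst rotation just for note triage            | n/a      | ~40 notes/day               | $300,000 (and they still miss 110)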
The swarm is not reading notes for fun — it is producing a structured, ranked signal sheet that ties every delta back to the underlying quote. Your humans stop being PDF janitors and start spending their time on the names where the signal is actually interesting.
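The arithmetic behind the table is simple enough to rerun with your own volumes. The sketch below takes its inputs from the table (about $0.25 per note, 150 notes a day, roughly 250 trading days); the function names are hypothetical, and the per-note analyst figure is derived from those inputs, not a number the table states.

```python
NOTES_PER_DAY = 150    # from the table header
TRADING_DAYS = 250     # from the table header
COST_PER_NOTE = 0.25   # approximate pipeline cost per note, from the table


def pipeline_costs(per_note=COST_PER_NOTE, notes_per_day=NOTES_PER_DAY, days=TRADING_DAYS):
    """Daily and annual pipeline spend in dollars."""
    daily = per_note * notes_per_day
    return {"daily": daily, "annual": daily * days}


def analyst_cost_per_note(salary, notes_per_day, days=TRADING_DAYS):
    """Implied cost per note for a human reader at a given throughput."""
    return salary / (notes_per_day * days)


if __name__ == "__main__":
    costs = pipeline_costs()
    print(f"Pipeline: ${costs['daily']:.2f}/day, ${costs['annual']:,.0f}/year")
    print(f"Analyst:  ${analyst_cost_per_note(150_000, 20):.2f}/note")
```

With the table's inputs this works out to $37.50 per day and $9,375 per year, which the table rounds to ~$38 and ~$9,500, against an implied $30 per note for an analyst covering 20 notes a day.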

Next Steps