
What This Example Shows

  • A real production DAG: an SDR (sales development) enrichment pipeline with three parallel research branches that converge into synthesis and scoring nodes
  • How to express fan-out → fan-in graphs with edges, entry_points, and end_points
  • Why this beats stacking SequentialWorkflow + ConcurrentWorkflow (single round-trip, server-side concurrency, single billing event)
  • How to read per-node outputs from the response so you can persist branch outputs to your CRM
  • Patterns for retries and conditional paths via downstream gating
The Graph Workflow endpoint (POST /v1/graph-workflow/completions) is a premium-only feature available on Pro, Ultra, and Premium plans. Free tier keys receive a 403. Upgrade your account to run production DAGs on the Swarms API.

Why This Matters

Sales teams burn the first 20 minutes of every cold outreach reading three different tabs — LinkedIn for the buyer, Crunchbase for the company, and a news search for the trigger event. Then somebody distills it into a one-line pitch hook, somebody else scores the lead, and only then does the SDR send. That sequence is a DAG, not a chain: the three lookups are independent of each other, but they all block the synthesis step, which blocks the scoring step. A Graph Workflow lets you describe that DAG once and have the platform run the three lookups in parallel, fan them into the synthesis node, then feed scoring — in a single API call with one billing event. You stop maintaining glue code that orchestrates async tasks, and your SDR pipeline runs 3-5x faster than the sequential version.

Step 1: Setup

pip install requests python-dotenv

import json
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
}
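
If SWARMS_API_KEY is missing from the environment, os.getenv returns None and the request only fails later with an authentication error. A minimal sketch of a fail-fast check follows; the helper name require_api_key is hypothetical and not part of any Swarms client.

```python
import os


def require_api_key(env=os.environ, var="SWARMS_API_KEY"):
    """Return the API key from the environment, or raise if it is unset or blank.

    `require_api_key` is an illustrative helper, not a Swarms API function.
    """
    key = (env.get(var) or "").strip()
    if not key:
        raise RuntimeError(f"{var} is not set; add it to your .env file or shell.")
    return key


# Usage (after load_dotenv()): API_KEY = require_api_key()
```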

Step 2: Sketch the Graph Before You Code It

The graph for an SDR enrichment looks like this:
[BuyerResearcher]   ──┐
[CompanyResearcher] ──┼──> [PitchSynthesizer] ──> [LeadScorer]
[TriggerResearcher] ──┘
Three independent researchers (parallel) → synthesizer (waits for all three) → scorer (waits for synthesis). Three edges into the synthesizer, one edge out to the scorer.
Always draw the graph on paper before encoding it. The edges and entry_points/end_points fields are mechanical once the picture is right, and almost always wrong when it isn’t.
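
The paper sketch can also be checked mechanically. The sketch below uses Python's standard-library graphlib to turn the picture into execution layers; the waits_on mapping and the execution_layers helper are illustrative names, and this runs locally rather than calling the Swarms API.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it waits on (taken from the diagram above).
waits_on = {
    "PitchSynthesizer": {"BuyerResearcher", "CompanyResearcher", "TriggerResearcher"},
    "LeadScorer": {"PitchSynthesizer"},
}


def execution_layers(waits_on):
    """Group nodes into layers; every node in a layer can run in parallel."""
    sorter = TopologicalSorter(waits_on)
    sorter.prepare()  # raises graphlib.CycleError if the sketch contains a cycle
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose dependencies are all done
        layers.append(ready)
        sorter.done(*ready)
    return layers


layers = execution_layers(waits_on)
for depth, layer in enumerate(layers):
    print(depth, layer)
```

For this graph the first layer holds the three researchers, the second the synthesizer, and the third the scorer, which matches the diagram.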

Step 3: Define the Agents

Each node is a specialist. Researchers are tuned low-temperature for factual recall; the synthesizer runs slightly warmer to write copy; the scorer is the coldest of all because we want a deterministic verdict.
agents = [
    {
        "agent_name": "BuyerResearcher",
        "description": "Researches the individual buyer and their role.",
        "system_prompt": (
            "You are a sales researcher specializing in buyer personas. Given a "
            "name, title, and company, produce: (1) role responsibilities, "
            "(2) likely KPIs, (3) recent public statements or content, "
            "(4) seniority and likely budget authority. Be concise."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 3000,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "CompanyResearcher",
        "description": "Researches the target company.",
        "system_prompt": (
            "You are a company research analyst. Given a company name, produce: "
            "(1) what they do in one sentence, (2) headcount and stage, "
            "(3) likely tech stack, (4) recent funding or M&A. Be concise."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 3000,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "TriggerResearcher",
        "description": "Finds a recent trigger event worth referencing.",
        "system_prompt": (
            "You are a trigger-event researcher. Given a company, find ONE "
            "recent (last 60 days) public event that creates a natural opening "
            "for outbound: a hire, a launch, a layoff, a funding round, a "
            "press release. Output a single trigger with date and source."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 2000,
        "temperature": 0.3,
        "max_loops": 1,
    },
    {
        "agent_name": "PitchSynthesizer",
        "description": "Synthesizes buyer + company + trigger into a 1-line hook.",
        "system_prompt": (
            "You are a senior SDR. Given buyer research, company research, and "
            "a trigger event, write a single outbound opener of at most 35 "
            "words that (a) references the trigger, (b) connects it to a KPI "
            "the buyer owns, (c) ends with a clear question. No fluff, no "
            "'hope you're doing well.'"
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 1500,
        "temperature": 0.5,
        "max_loops": 1,
    },
    {
        "agent_name": "LeadScorer",
        "description": "Scores the lead 1-100 with a confidence band.",
        "system_prompt": (
            "You are a revenue operations analyst. Given the buyer profile, "
            "the company profile, the trigger event, and the synthesized "
            "pitch, return a JSON object with fields: score (1-100), "
            "confidence ('low'|'medium'|'high'), tier ('A'|'B'|'C'), and "
            "one_line_rationale. Return ONLY the JSON object."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 800,
        "temperature": 0.1,
        "max_loops": 1,
    },
]
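
The five definitions above share most of their fields. If you prefer less repetition, a small factory can fill in the common ones. This is only a sketch: make_agent is a hypothetical local helper, and it emits exactly the keys used in the list above.

```python
def make_agent(name, description, system_prompt, *, temperature, max_tokens,
               model_name="gpt-4.1", max_loops=1):
    """Build one agent dict with the same keys as the hand-written list above."""
    return {
        "agent_name": name,
        "description": description,
        "system_prompt": system_prompt,
        "model_name": model_name,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "max_loops": max_loops,
    }


# Example: the scorer, the coldest node in the pipeline.
scorer = make_agent(
    "LeadScorer",
    "Scores the lead 1-100 with a confidence band.",
    "You are a revenue operations analyst. Return ONLY the JSON object.",
    temperature=0.1,
    max_tokens=800,
)
```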

Step 4: Wire the Edges

Three fan-in edges, one straight edge. The entry_points are the three researchers (no upstream dependencies); the end_point is the scorer (no downstream dependencies).
edges = [
    {"source": "BuyerResearcher",   "target": "PitchSynthesizer"},
    {"source": "CompanyResearcher", "target": "PitchSynthesizer"},
    {"source": "TriggerResearcher", "target": "PitchSynthesizer"},
    {"source": "PitchSynthesizer",  "target": "LeadScorer"},
]
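
Because entry points are the nodes with no incoming edge and end points are the nodes with no outgoing edge, both lists can be derived from the edges and compared with what you plan to submit. A minimal local check, assuming the edge shape shown above (derive_entry_and_end_points is a hypothetical helper name):

```python
def derive_entry_and_end_points(agent_names, edges):
    """Return (entry_points, end_points) implied by the edges.

    Raises ValueError if an edge names a node that is not a defined agent.
    """
    known = set(agent_names)
    unknown = {
        node
        for edge in edges
        for node in (edge["source"], edge["target"])
        if node not in known
    }
    if unknown:
        raise ValueError(f"Edges reference undefined agents: {sorted(unknown)}")

    targets = {edge["target"] for edge in edges}
    sources = {edge["source"] for edge in edges}
    entry_points = [n for n in agent_names if n not in targets]  # no upstream
    end_points = [n for n in agent_names if n not in sources]    # no downstream
    return entry_points, end_points


names = ["BuyerResearcher", "CompanyResearcher", "TriggerResearcher",
         "PitchSynthesizer", "LeadScorer"]
demo_edges = [
    {"source": "BuyerResearcher", "target": "PitchSynthesizer"},
    {"source": "CompanyResearcher", "target": "PitchSynthesizer"},
    {"source": "TriggerResearcher", "target": "PitchSynthesizer"},
    {"source": "PitchSynthesizer", "target": "LeadScorer"},
]
entry, end = derive_entry_and_end_points(names, demo_edges)
```

A typo in an agent name shows up here as a ValueError before any request is sent.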

Step 5: Submit the Workflow

LEAD = {
    "buyer_name": "Jamie Chen",
    "buyer_title": "VP of Engineering",
    "company": "Northwind Analytics",
}

task = (
    f"Lead: {LEAD['buyer_name']}, {LEAD['buyer_title']} at {LEAD['company']}. "
    "Run the full enrichment pipeline: buyer research, company research, "
    "trigger event search, pitch synthesis, and final lead scoring."
)

workflow_input = {
    "name": "SDR-Enrichment-Pipeline",
    "description": (
        "Fan-out enrichment DAG. Three researchers run in parallel, converge "
        "into a pitch synthesizer, then feed a lead scorer."
    ),
    "agents": agents,
    "edges": edges,
    "entry_points": ["BuyerResearcher", "CompanyResearcher", "TriggerResearcher"],
    "end_points": ["LeadScorer"],
    "max_loops": 1,
    "task": task,
    "auto_compile": True,
    "verbose": False,
}

response = requests.post(
    f"{BASE_URL}/v1/graph-workflow/completions",
    headers=headers,
    json=workflow_input,
    timeout=300,
)
response.raise_for_status()
result = response.json()
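
Since free-tier keys receive a 403 on this endpoint, it can help to surface that case with a clearer message than a generic HTTP error. The wrapper below is a sketch: submit_workflow is a hypothetical helper, and the post argument is expected to be requests.post (it is passed in so the function can be exercised without network access).

```python
def submit_workflow(payload, headers, post,
                    base_url="https://api.swarms.world", timeout=300):
    """POST a graph workflow and return the parsed JSON body.

    `post` should behave like requests.post; pass requests.post in real use.
    """
    response = post(
        f"{base_url}/v1/graph-workflow/completions",
        headers=headers,
        json=payload,
        timeout=timeout,
    )
    if response.status_code == 403:
        # Per the plan note at the top of this page, free-tier keys get a 403 here.
        raise PermissionError("403 from graph-workflow: this endpoint requires a paid plan.")
    response.raise_for_status()  # any other 4xx/5xx
    return response.json()


# Usage: result = submit_workflow(workflow_input, headers, post=requests.post)
```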

Step 6: Read the Per-Node Output

Every node’s output is keyed by agent_name under outputs. Persist them individually — the buyer profile belongs on the contact record, the company profile on the account record, the pitch in the sequencer, the score in the CRM lead-scoring column.
outputs = result.get("outputs", {})

stages = [
    ("PARALLEL RESEARCH", ["BuyerResearcher", "CompanyResearcher", "TriggerResearcher"]),
    ("SYNTHESIS",         ["PitchSynthesizer"]),
    ("SCORING",           ["LeadScorer"]),
]

for stage_label, names in stages:
    print(f"\n{'=' * 60}\n{stage_label}\n{'=' * 60}")
    for name in names:
        out = outputs.get(name, "")
        if isinstance(out, list):
            out = "\n".join(str(x) for x in out)
        print(f"\n[{name}]\n{str(out)[:400]}")

# The scorer returned JSON — parse it
import re
score_raw = outputs.get("LeadScorer", "")
if isinstance(score_raw, list):
    score_raw = " ".join(str(x) for x in score_raw)

match = re.search(r"\{.*\}", str(score_raw), re.DOTALL)
if match:
    lead_score = json.loads(match.group(0))
    print(f"\nFinal score: {lead_score['score']} ({lead_score['tier']}-tier, "
          f"{lead_score['confidence']} confidence)")

usage = result.get("usage", {})
print(f"\nTotal cost: ${usage.get('token_cost', 0):.4f}")
print(f"Execution time: {result.get('execution_time', 0):.1f}s")
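
The greedy regex above can over-match if the scorer wraps its JSON in extra text containing braces, and json.loads will then raise. A slightly more defensive sketch, using only the standard library, scans for the first JSON object that carries the four fields requested in the LeadScorer prompt. The parse_lead_score name is hypothetical.

```python
import json

REQUIRED_FIELDS = {"score", "confidence", "tier", "one_line_rationale"}


def parse_lead_score(raw):
    """Return the first JSON object in `raw` that has all required fields, else None."""
    if isinstance(raw, list):
        raw = " ".join(str(x) for x in raw)
    text = str(raw)
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and ignores the rest.
            obj, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict) and REQUIRED_FIELDS.issubset(obj):
            return obj
        start = text.find("{", start + 1)
    return None


sample = ('Here is the verdict: {"score": 82, "confidence": "high", '
          '"tier": "A", "one_line_rationale": "Recent funding."} Thanks!')
lead_score = parse_lead_score(sample)
```

Returning None instead of raising lets the caller decide whether to retry the node or route the lead to manual review.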

Why This Beats Sequential + Concurrent Stacking

You could build the same pipeline by calling ConcurrentWorkflow for the three researchers, manually stitching their outputs into a prompt, then calling SequentialWorkflow for synthesis → scoring. That works. It’s also worse:
| Concern | Sequential + Concurrent (DIY) | Graph Workflow |
| --- | --- | --- |
| API calls | 2 (concurrent, then sequential) | 1 |
| Billing events | 2 separate cost records | 1 unified record |
| Glue code you maintain | “wait for all 3, then build prompt, then call next workflow” | None (declared in edges) |
| Server-side parallelism | Yes for first hop only | Yes across the whole DAG |
| Wall clock | Sum of (slowest researcher + synth + scorer) plus 2 round-trips | Slowest researcher + synth + scorer, single round-trip |
| Failure handling | You retry whichever hop failed | Server-side retry on individual nodes |
| Auditability | Two job IDs in your logs | One job_id you can replay |

Adding Conditional Paths and Retries

Two production patterns to layer on top:

Conditional gating (early exit on cold leads). Add a Qualifier node before the heavy research and let its prompt instruct it to output “SKIP” if the lead’s company profile fails a hard filter (e.g., < 50 employees, US-only). Downstream nodes still run, but their prompts can be written to no-op when they see “SKIP” upstream.

Retries on a flaky node. The graph compiler retries failed nodes automatically when auto_compile: true. For nodes that touch external services (search, scrapers, CRM enrichment), set max_loops: 2 on just that node; the upstream and downstream nodes will only see the successful run.
agents[2]["max_loops"] = 2  # TriggerResearcher gets one retry
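
The gating pattern can be sketched as a pure transformation of the graph: put a Qualifier in front of the current entry points and make it the only entry point. The helper name add_qualifier_gate and the Qualifier prompt text below are illustrative assumptions; only the “SKIP” convention comes from the description above.

```python
def add_qualifier_gate(agents, edges, entry_points, qualifier):
    """Return (agents, edges, entry_points) with `qualifier` ahead of the old entry points."""
    gate_name = qualifier["agent_name"]
    gated_agents = [qualifier] + list(agents)
    # One new edge from the gate to every former entry point.
    gated_edges = [{"source": gate_name, "target": name} for name in entry_points]
    gated_edges += list(edges)
    return gated_agents, gated_edges, [gate_name]


qualifier = {
    "agent_name": "Qualifier",
    "description": "Applies hard filters before research runs.",
    # Hypothetical prompt; tune the filter to your own ICP.
    "system_prompt": "If the company fails the hard filter, output exactly SKIP.",
    "model_name": "gpt-4.1",
    "max_tokens": 500,
    "temperature": 0.1,
    "max_loops": 1,
}

demo_agents = [{"agent_name": "BuyerResearcher"}, {"agent_name": "PitchSynthesizer"}]
demo_edges = [{"source": "BuyerResearcher", "target": "PitchSynthesizer"}]
gated_agents, gated_edges, gated_entry = add_qualifier_gate(
    demo_agents, demo_edges, ["BuyerResearcher"], qualifier
)
```

The downstream prompts still need a line telling each node to no-op when it sees “SKIP”, since this sketch changes only the wiring.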

Scaling the Pattern

Drop in any production DAG by swapping the node prompts:
| Pipeline | Entry points (parallel) | Middle | End point |
| --- | --- | --- | --- |
| SDR enrichment | Buyer / Company / Trigger | Pitch Synth | Lead Scorer |
| Underwriting | Credit / Income / Collateral | Risk Synth | Approver |
| Incident response | Logs / Metrics / Alerts | Root-Cause Synth | Runbook Picker |
| CI/CD code review | Lint / Security / Perf | Review Synth | Merge Gate |
| Clinical triage | History / Vitals / Imaging | Differential Synth | Disposition |
Everything else — request shape, response shape, billing, retries — stays identical.
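
Every row in the table has the same shape (N parallel entry nodes, one middle node, one end node), so the wiring can be generated rather than typed. A sketch, with fan_in_graph as a hypothetical helper and the underwriting agent names invented for illustration:

```python
def fan_in_graph(entry_names, middle_name, end_name):
    """Build edges, entry_points, and end_points for a fan-out/fan-in DAG."""
    edges = [{"source": name, "target": middle_name} for name in entry_names]
    edges.append({"source": middle_name, "target": end_name})
    return {
        "edges": edges,
        "entry_points": list(entry_names),
        "end_points": [end_name],
    }


# Hypothetical agent names for the underwriting row.
underwriting = fan_in_graph(
    ["CreditAnalyst", "IncomeAnalyst", "CollateralAnalyst"],
    "RiskSynthesizer",
    "Approver",
)
```

The returned dict can be merged into the request body alongside agents and task, for example with {**base_payload, **underwriting}.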

Cost vs. Building It Yourself

Concrete numbers from a real SDR team enriching 1,000 leads per week:
| Approach | Time per lead | Cost per lead | Engineering owned |
| --- | --- | --- | --- |
| Manual SDR research (LinkedIn + Crunchbase + news) | 18-25 min | ~$22 in SDR time | None |
| Custom async orchestration in-house (Lambda + Step Functions + LLM glue) | ~6-8 sec | ~$0.03 in API + ~$0.005 in infra | 2 engineers, ~$300k/yr to maintain |
| Graph Workflow endpoint | ~3-5 sec | ~$0.02-$0.04 in API | None (declared in JSON) |
Math: 1,000 leads/week at $22 manual vs. $0.03 on the endpoint is about $21,970/week saved (roughly $1.1M/year over 52 weeks), before counting the engineering team you don’t need to staff for the DIY path. The pipeline goes live in a day instead of a quarter.
Per-lead cost will vary with model choice and token volume — gpt-4.1 on five short nodes is the cheap path; swap to anthropic/claude-opus-4-8 if your synthesizer needs deeper reasoning.
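
To rerun the savings arithmetic with your own volumes and per-lead costs, a short calculation is enough. The figures below are the ones quoted in this section; the 52-week year and the savings helper are assumptions of this sketch, and Decimal is used to avoid floating-point rounding on currency.

```python
from decimal import Decimal

LEADS_PER_WEEK = 1000
MANUAL_COST = Decimal("22")      # ~$22 of SDR time per lead (from the table)
ENDPOINT_COST = Decimal("0.03")  # midpoint of the ~$0.02-$0.04 API range
WEEKS_PER_YEAR = 52              # assumption: no weeks off


def savings(leads_per_week, manual_cost, endpoint_cost, weeks=WEEKS_PER_YEAR):
    """Return (weekly_savings, annual_savings) in dollars."""
    weekly = leads_per_week * (manual_cost - endpoint_cost)
    return weekly, weekly * weeks


weekly, annual = savings(LEADS_PER_WEEK, MANUAL_COST, ENDPOINT_COST)
print(f"Weekly savings: ${weekly:,.2f}")
print(f"Annual savings: ${annual:,.2f}")
```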

Next Steps