What This Example Shows

  • A real production use case: a content moderation graph where edges carry severity and priority tags
  • The shape of edge metadata in the Graph Workflow request payload
  • How to encode severity (low/medium/high/critical), routing target, and audit labels at the edge level
  • How the metadata flows through the response so downstream systems can read it
  • Concrete ops patterns — log filters, SLO grouping, audit queries — that the metadata enables
The Graph Workflow endpoint (POST /v1/graph-workflow/completions) is a premium-only feature available on Pro, Ultra, and Premium plans. Free-tier keys receive a 403. Upgrade your account to run production DAGs.

Why This Matters

Content moderation is a routing problem masquerading as a classification problem. A post comes in, a classifier labels it, and the label has to get to the right place — critical to on-call humans inside an SLA, medium to a review queue, low to passive logging — each with different retry policies, audit requirements, and downstream tooling. When this routing logic lives inside agent prompts, it’s invisible to ops; when it lives in glue code, it’s not auditable. Edge metadata moves the routing semantics to where they belong: the edges of the DAG. Once ops can read severity: "critical" straight off the edge, dashboards group correctly, audit queries become one-liners, and the agents stay focused on the classification job. This tutorial shows the pattern in a content moderation DAG and then shows how downstream systems consume the metadata that comes back in the response.

Step 1: Setup

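Install the two dependencies from your shell:
pip install requests python-dotenv

Then load the API key from your environment and build the request headers: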
import json
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
}
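Because os.getenv returns None when the variable is missing, a request built from the headers above would go out with an empty key and fail only at the API. A minimal sketch of a fail-fast alternative follows; build_headers is a hypothetical helper name, not part of any SDK.

```python
def build_headers(api_key):
    """Return the request headers, or raise early if the key is missing."""
    if not api_key:
        raise ValueError(
            "SWARMS_API_KEY is not set; add it to your environment or .env file."
        )
    return {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }
```

Calling build_headers(os.getenv("SWARMS_API_KEY")) in place of the literal dict surfaces a missing key at startup rather than at request time.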

Step 2: Sketch the Moderation DAG

A single classifier fans out to three downstream routes, each handling a different severity tier. Each edge carries the routing semantics ops cares about.
                              ┌── severity:critical ──> [HumanEscalation]
[ContentClassifier] ──────────┼── severity:medium ────> [ReviewQueueRouter]
                              └── severity:low ───────> [PassiveLogger]
Three edges, three different metadata blocks. The classifier itself is metadata-agnostic — it just labels and writes; the metadata on the edges tells the platform and downstream tools how each label should be handled.
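The fan-out in the diagram can be sketched as a plain edge list before any agents exist. The helpers below are illustrative only (their names are assumptions). They derive the adjacency map and the entry and end points from the edges, which is a cheap way to check that the values you later pass as entry_points and end_points match the graph you drew.

```python
from collections import defaultdict

# Edge list mirroring the diagram above (metadata omitted for brevity).
EDGES = [
    {"source": "ContentClassifier", "target": "HumanEscalation"},
    {"source": "ContentClassifier", "target": "ReviewQueueRouter"},
    {"source": "ContentClassifier", "target": "PassiveLogger"},
]


def build_adjacency(edges):
    """Map each source node to the list of targets it fans out to."""
    adjacency = defaultdict(list)
    for edge in edges:
        adjacency[edge["source"]].append(edge["target"])
    return dict(adjacency)


def find_entry_and_end_points(edges):
    """Entry points have no incoming edge; end points have no outgoing edge."""
    sources = {edge["source"] for edge in edges}
    targets = {edge["target"] for edge in edges}
    return sorted(sources - targets), sorted(targets - sources)


adjacency = build_adjacency(EDGES)
entry_points, end_points = find_entry_and_end_points(EDGES)
print(adjacency)
print(entry_points, end_points)
```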

Step 3: Define the Agents

agents = [
    {
        "agent_name": "ContentClassifier",
        "description": "Classifies user-generated content by moderation severity.",
        "system_prompt": (
            "You are a content moderation classifier. Read the user-generated "
            "content and emit a JSON object with these fields:\n"
            "  severity: 'critical' | 'high' | 'medium' | 'low'\n"
            "  categories: array of {hate, harassment, self_harm, csam, spam, "
            "spam_link, sexual_minor, violence, none}\n"
            "  confidence: float between 0 and 1\n"
            "  rationale: one sentence explaining the classification\n"
            "Return ONLY the JSON object."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 2000,
        "temperature": 0.1,
        "max_loops": 1,
    },
    {
        "agent_name": "HumanEscalation",
        "description": "Drafts an escalation packet for an on-call human reviewer.",
        "system_prompt": (
            "You receive a content moderation classification with a 'critical' "
            "severity label. Draft a 4-bullet escalation packet for an on-call "
            "human reviewer: (1) the content fragment, (2) the rationale, "
            "(3) the category, (4) recommended immediate action. Be terse — "
            "this lands on a pager."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 1500,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "ReviewQueueRouter",
        "description": "Formats a medium-severity item for the asynchronous review queue.",
        "system_prompt": (
            "You receive a moderation classification with 'medium' or 'high' "
            "severity. Produce a JSON object suitable for the review queue: "
            "{queue: 'standard'|'priority', sla_hours: int, tags: [..], "
            "summary: string}. Return ONLY the JSON object."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 1500,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "PassiveLogger",
        "description": "Emits a compact analytics record for low-severity content.",
        "system_prompt": (
            "You receive a 'low' severity classification. Produce a single-line "
            "JSON record with: {ts_unix, category, rationale_short}. No prose, "
            "JSON only."
        ),
        "model_name": "gpt-4.1",
        "max_tokens": 600,
        "temperature": 0.1,
        "max_loops": 1,
    },
]

Step 4: Attach Severity and Routing Metadata to Edges

This is the load-bearing step. Each edge from the classifier into a downstream route gets a metadata block that encodes severity, priority, target queue, SLA budget, and audit label. The keys you pick become the keys your dashboards and audit scripts filter on, so standardize them across your team.
edges = [
    {
        "source": "ContentClassifier",
        "target": "HumanEscalation",
        "metadata": {
            "severity": "critical",
            "priority": "p0",
            "route": "on_call_human",
            "sla_minutes": 15,
            "retry_on_failure": True,
            "audit_tag": "trust_and_safety",
            "cost_center": "ts_critical",
        },
    },
    {
        "source": "ContentClassifier",
        "target": "ReviewQueueRouter",
        "metadata": {
            "severity": "medium",
            "priority": "p2",
            "route": "review_queue",
            "sla_hours": 24,
            "retry_on_failure": True,
            "audit_tag": "trust_and_safety",
            "cost_center": "ts_standard",
        },
    },
    {
        "source": "ContentClassifier",
        "target": "PassiveLogger",
        "metadata": {
            "severity": "low",
            "priority": "p4",
            "route": "analytics_sink",
            "sla_hours": 168,
            "retry_on_failure": False,
            "audit_tag": "analytics",
            "cost_center": "ts_passive",
        },
    },
]
Three edges from the same source, each with different metadata, is the canonical pattern for moderation routing. The classifier doesn’t need to know about severity routing — every downstream node receives its label and the platform knows which edge it crossed and what tags that edge carried.
The standard metadata fields production moderation teams converge on:
| Field | Type | What it’s for |
| --- | --- | --- |
| severity | "critical", "high", "medium", or "low" | The primary classification label, mirrored to the edge for filtering |
| priority | "p0", "p1", "p2", ... | SLO grouping in dashboards |
| route | string | Logical destination: on_call_human, review_queue, analytics_sink |
| sla_minutes / sla_hours | integer | Per-edge SLA budget surfaced in alerting |
| retry_on_failure | boolean | Operational signal that this edge crosses an external boundary |
| audit_tag | string | Regulatory/compliance label: "trust_and_safety", "pii", "sox" |
| cost_center | string | Billing attribution for chargeback across teams |
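Since dashboards and audit scripts key on these fields, it can help to lint the metadata before submission. The sketch below is one possible client-side check, not platform behavior. The validate_edge_metadata name, the required-key set, and the "exactly one SLA field" rule are assumptions drawn from the example edges in this tutorial.

```python
import re

ALLOWED_SEVERITIES = {"critical", "high", "medium", "low"}
REQUIRED_KEYS = {
    "severity", "priority", "route",
    "retry_on_failure", "audit_tag", "cost_center",
}


def validate_edge_metadata(meta):
    """Return a list of problems found in one metadata block (empty if clean)."""
    problems = []
    missing = sorted(REQUIRED_KEYS - set(meta))
    if missing:
        problems.append("missing keys: " + ", ".join(missing))
    if "severity" in meta and meta["severity"] not in ALLOWED_SEVERITIES:
        problems.append(f"unknown severity: {meta['severity']!r}")
    if "priority" in meta and not re.fullmatch(r"p\d+", str(meta["priority"])):
        problems.append(f"priority should look like 'p0': {meta['priority']!r}")
    # Assumed team convention: each edge declares its SLA in one unit only.
    if ("sla_minutes" in meta) == ("sla_hours" in meta):
        problems.append("set exactly one of sla_minutes or sla_hours")
    for key in ("sla_minutes", "sla_hours"):
        if key in meta:
            value = meta[key]
            if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
                problems.append(f"{key} must be a positive integer")
    if "retry_on_failure" in meta and not isinstance(meta["retry_on_failure"], bool):
        problems.append("retry_on_failure must be a boolean")
    return problems
```

Running this over every edge in a pre-submit hook keeps key names from drifting between teams.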

Step 5: Submit the Workflow

workflow_input = {
    "name": "Content-Moderation-Routing",
    "description": (
        "Classifier fans out to three severity-routed handlers, with edge "
        "metadata carrying routing, SLA, and audit context."
    ),
    "agents": agents,
    "edges": edges,
    "entry_points": ["ContentClassifier"],
    "end_points": ["HumanEscalation", "ReviewQueueRouter", "PassiveLogger"],
    "max_loops": 1,
    "task": (
        "Classify the following user-generated content and route it to the "
        "appropriate downstream handler: 'Hey @user just shared the address "
        "of that scammer — link in bio.'"
    ),
    "auto_compile": True,
    "verbose": False,
}

response = requests.post(
    f"{BASE_URL}/v1/graph-workflow/completions",
    headers=headers,
    json=workflow_input,
    timeout=300,
)
response.raise_for_status()
result = response.json()
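response.raise_for_status() raises requests.HTTPError on any 4xx or 5xx status, including the 403 that free-tier keys receive. A small sketch of turning status codes into operator-facing messages is shown below. describe_failure is a hypothetical helper, and apart from the 403 case described earlier, which other codes this API actually returns is an assumption based on general HTTP semantics.

```python
def describe_failure(status_code):
    """Map an HTTP status code to an actionable message; None means success."""
    if 200 <= status_code < 300:
        return None
    if status_code == 403:
        # Documented above: free-tier keys are rejected on this endpoint.
        return "403 Forbidden: Graph Workflow requires a paid plan; check the key's tier."
    if status_code == 401:
        # Assumption: a missing or invalid key is reported as 401.
        return "401 Unauthorized: check that SWARMS_API_KEY is set and valid."
    if status_code == 429:
        # Assumption: rate limiting uses the standard 429 status.
        return "429 Too Many Requests: back off before retrying."
    if 500 <= status_code < 600:
        return f"{status_code} server error: safe to retry with backoff."
    return f"{status_code}: unexpected status; inspect the response body."
```

With a response in hand, message = describe_failure(response.status_code) gives a log line to emit before deciding whether to retry.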

Step 6: How the Metadata Flows Through the Response

Edge metadata is preserved with the compiled graph, surfaced in the workflow’s audit log, and made available to downstream systems in two places: the per-agent outputs (so a downstream consumer can correlate a result back to the edge that fed it) and the usage telemetry (so cost can be attributed to the edge’s cost_center). A downstream system reading the response typically does this:
# 1. Persist the per-edge metadata alongside each downstream node's output
#    so your moderation pipeline can later answer "which edge produced this
#    escalation packet?" without re-running the graph.
edge_index = {(e["source"], e["target"]): e.get("metadata", {}) for e in edges}

outputs = result.get("outputs", {})

for downstream in ["HumanEscalation", "ReviewQueueRouter", "PassiveLogger"]:
    out = outputs.get(downstream)
    if not out:
        continue
    meta = edge_index.get(("ContentClassifier", downstream), {})
    record = {
        "node": downstream,
        "severity": meta.get("severity"),
        "priority": meta.get("priority"),
        "route": meta.get("route"),
        "sla_minutes": meta.get("sla_minutes"),
        "sla_hours": meta.get("sla_hours"),
        "audit_tag": meta.get("audit_tag"),
        "cost_center": meta.get("cost_center"),
        "output_preview": str(out)[:300],
    }
    print(json.dumps(record, indent=2))

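# 2. Read the usage telemetry. Mapping cost onto each edge's cost_center is
#    left to your own code; cost_per_agent is read defensively in case it is
#    a per-agent mapping rather than a single number.
usage = result.get("usage", {})
print(f"\nTotal token cost: ${usage.get('token_cost', 0):.4f}")
cost_per_agent = usage.get("cost_per_agent", 0)
if isinstance(cost_per_agent, (int, float)):
    print(f"Cost per agent: ${cost_per_agent:.4f}")
else:
    print(f"Cost per agent: {json.dumps(cost_per_agent)}")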
The edge metadata you submitted is the same metadata your downstream code keys on. The platform doesn’t transform it — what you put in is what you get back to correlate against. Treat the metadata block as a forward contract between the graph and your moderation infrastructure.
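One way to attribute cost is to charge each downstream agent's cost to the cost_center on the edge that feeds it. The sketch below assumes you already have a per-agent cost mapping. The agent_costs dict and its values are invented for illustration, and the response is not guaranteed to provide costs in this shape.

```python
from collections import defaultdict

edges = [
    {"source": "ContentClassifier", "target": "HumanEscalation",
     "metadata": {"cost_center": "ts_critical"}},
    {"source": "ContentClassifier", "target": "ReviewQueueRouter",
     "metadata": {"cost_center": "ts_standard"}},
    {"source": "ContentClassifier", "target": "PassiveLogger",
     "metadata": {"cost_center": "ts_passive"}},
]

# Hypothetical per-agent costs in dollars (illustrative values only).
agent_costs = {
    "HumanEscalation": 0.5,
    "ReviewQueueRouter": 0.25,
    "PassiveLogger": 0.125,
}


def cost_by_cost_center(edges, agent_costs):
    """Sum each target agent's cost under the cost_center of its inbound edge."""
    totals = defaultdict(float)
    for edge in edges:
        center = edge.get("metadata", {}).get("cost_center", "unattributed")
        totals[center] += agent_costs.get(edge["target"], 0.0)
    return dict(totals)


totals = cost_by_cost_center(edges, agent_costs)
print(totals)
```

Edges without a cost_center fall into an "unattributed" bucket, so gaps in tagging show up in the chargeback report instead of disappearing.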

Step 7: Critical-Severity Escalation — Read the Tagged Output

For the on-call path specifically, you want to fast-path the metadata-tagged record into your pager:
critical_meta = edge_index.get(("ContentClassifier", "HumanEscalation"), {})

if outputs.get("HumanEscalation") and critical_meta.get("severity") == "critical":
    pager_payload = {
        "service": critical_meta.get("audit_tag", "trust_and_safety"),
        "severity": critical_meta.get("severity"),
        "priority": critical_meta.get("priority"),
        "sla_minutes": critical_meta.get("sla_minutes"),
        "summary": str(outputs["HumanEscalation"])[:500],
        "source_job_id": result.get("job_id"),
    }
    # send_to_pagerduty(pager_payload)
    print("\nPAGER PAYLOAD:")
    print(json.dumps(pager_payload, indent=2))
The on-call human gets the severity, the SLA budget, and a job ID they can replay if they need to see the full graph context — all from fields that originated on a single edge.
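A pager usually wants an absolute deadline rather than a budget. As a sketch, the SLA fields on the edge can be converted into a timestamp as follows. sla_deadline is a hypothetical helper, and the start time is whatever your pipeline records as the moment the content was received.

```python
from datetime import datetime, timedelta, timezone


def sla_deadline(meta, started_at):
    """Return the absolute SLA deadline for an edge, or None if no SLA is set."""
    if "sla_minutes" in meta:
        return started_at + timedelta(minutes=meta["sla_minutes"])
    if "sla_hours" in meta:
        return started_at + timedelta(hours=meta["sla_hours"])
    return None


received = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
critical_deadline = sla_deadline({"sla_minutes": 15}, received)
passive_deadline = sla_deadline({"sla_hours": 168}, received)
print(critical_deadline.isoformat())
print(passive_deadline.isoformat())
```

Adding the ISO-formatted deadline to the pager payload lets the on-call reviewer see how much of the 15-minute budget is left without doing arithmetic.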

How Ops Teams Actually Use This

Three concrete patterns become available once metadata is on every edge.

Pattern 1: Audit grep. Compliance asks “show me every moderation decision tagged trust_and_safety in the last 7 days.” With audit_tag on the edges that cross the regulated boundary, this becomes a one-line log query rather than a substring search across prompts.

Pattern 2: SLO grouping. Dashboards group p95 latency by priority (p0, p2, p4). You can see at a glance which on-call paths are missing their 15-minute SLA and which p4 paths can absorb slowdowns without paging anyone.

Pattern 3: Retry budget enforcement. Edges tagged retry_on_failure: true are the ones that should be retried. When the retry budget for the day is consumed, you can selectively disable retries on priority: "p4" edges while keeping them on p0 ones, which is a one-line config change rather than a code deploy.
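The three patterns can be sketched in a few lines each. Everything here is illustrative: the record shape (one log record per edge traversal, with latency_ms and age_days fields) is an assumption about your own logging pipeline rather than something the platform emits, and the helper names are hypothetical.

```python
import copy
import math
from collections import defaultdict

# Hypothetical log records, one per edge traversal, tagged with edge metadata.
RECORDS = [
    {"audit_tag": "trust_and_safety", "priority": "p0", "latency_ms": 900, "age_days": 1},
    {"audit_tag": "trust_and_safety", "priority": "p2", "latency_ms": 4000, "age_days": 3},
    {"audit_tag": "analytics", "priority": "p4", "latency_ms": 12000, "age_days": 2},
    {"audit_tag": "trust_and_safety", "priority": "p0", "latency_ms": 1500, "age_days": 9},
]


def audit_query(records, audit_tag, max_age_days):
    """Pattern 1: select records by audit tag within a time window."""
    return [
        r for r in records
        if r["audit_tag"] == audit_tag and r["age_days"] <= max_age_days
    ]


def p95_latency_by_priority(records):
    """Pattern 2: nearest-rank p95 latency, grouped by priority."""
    grouped = defaultdict(list)
    for r in records:
        grouped[r["priority"]].append(r["latency_ms"])
    result = {}
    for priority, values in grouped.items():
        values.sort()
        rank = math.ceil(0.95 * len(values))  # 1-based nearest rank
        result[priority] = values[rank - 1]
    return result


def disable_retries(edges, priority):
    """Pattern 3: return a copy of edges with retries off for one priority."""
    updated = copy.deepcopy(edges)
    for edge in updated:
        meta = edge.get("metadata", {})
        if meta.get("priority") == priority:
            meta["retry_on_failure"] = False
    return updated
```

In practice the first two would be queries in your log store or dashboard tool; the Python versions only show which metadata keys each pattern filters or groups on.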

When Downstream Agents Should Inspect Upstream Metadata

The standard case is the one above: metadata is for ops and downstream systems, not the agent. There is one situation where the agent itself should read it: when a single downstream node receives edges from heterogeneous upstreams and needs to branch on which one fired. For that case, include the metadata in the prompt template you assemble before submission:
def build_synthesis_prompt(upstream_outputs: dict, edges: list, target: str) -> str:
    sections = []
    for edge in edges:
        if edge["target"] != target:
            continue
        source = edge["source"]
        meta = edge.get("metadata", {})
        section = f"## Upstream: {source}\n"
        if "severity" in meta:
            section += f"_Severity: {meta['severity']}_\n"
        if "audit_tag" in meta:
            section += f"_Audit tag: {meta['audit_tag']}_\n"
        section += "\n" + str(upstream_outputs.get(source, ""))
        sections.append(section)
    return "\n\n---\n\n".join(sections)
Most production DAGs do not need this — they get more mileage from the dashboard side of metadata than the prompt side.

Putting It All Together

import json
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

agents = [
    {
        "agent_name": "ContentClassifier",
        "description": "Classifies user-generated content by moderation severity.",
        "system_prompt": "You are a content moderation classifier. Emit a JSON object with severity, categories, confidence, rationale. JSON only.",
        "model_name": "gpt-4.1",
        "max_tokens": 2000,
        "temperature": 0.1,
        "max_loops": 1,
    },
    {
        "agent_name": "HumanEscalation",
        "description": "Drafts an escalation packet for on-call humans.",
        "system_prompt": "Draft a terse 4-bullet escalation packet for on-call review.",
        "model_name": "gpt-4.1",
        "max_tokens": 1500,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "ReviewQueueRouter",
        "description": "Routes medium-severity items to the async review queue.",
        "system_prompt": "Produce a JSON record for the review queue. JSON only.",
        "model_name": "gpt-4.1",
        "max_tokens": 1500,
        "temperature": 0.2,
        "max_loops": 1,
    },
    {
        "agent_name": "PassiveLogger",
        "description": "Logs low-severity content for analytics.",
        "system_prompt": "Produce a one-line JSON analytics record. JSON only.",
        "model_name": "gpt-4.1",
        "max_tokens": 600,
        "temperature": 0.1,
        "max_loops": 1,
    },
]

edges = [
    {
        "source": "ContentClassifier",
        "target": "HumanEscalation",
        "metadata": {
            "severity": "critical",
            "priority": "p0",
            "route": "on_call_human",
            "sla_minutes": 15,
            "retry_on_failure": True,
            "audit_tag": "trust_and_safety",
            "cost_center": "ts_critical",
        },
    },
    {
        "source": "ContentClassifier",
        "target": "ReviewQueueRouter",
        "metadata": {
            "severity": "medium",
            "priority": "p2",
            "route": "review_queue",
            "sla_hours": 24,
            "retry_on_failure": True,
            "audit_tag": "trust_and_safety",
            "cost_center": "ts_standard",
        },
    },
    {
        "source": "ContentClassifier",
        "target": "PassiveLogger",
        "metadata": {
            "severity": "low",
            "priority": "p4",
            "route": "analytics_sink",
            "sla_hours": 168,
            "retry_on_failure": False,
            "audit_tag": "analytics",
            "cost_center": "ts_passive",
        },
    },
]

workflow_input = {
    "name": "Content-Moderation-Routing",
    "description": "Classifier fans out to three severity-routed handlers.",
    "agents": agents,
    "edges": edges,
    "entry_points": ["ContentClassifier"],
    "end_points": ["HumanEscalation", "ReviewQueueRouter", "PassiveLogger"],
    "max_loops": 1,
    "task": (
        "Classify and route: 'Hey @user just shared the address of that "
        "scammer — link in bio.'"
    ),
    "auto_compile": True,
    "verbose": False,
}

response = requests.post(
    f"{BASE_URL}/v1/graph-workflow/completions",
    headers=headers,
    json=workflow_input,
    timeout=300,
)

if response.status_code == 200:
    result = response.json()
    edge_index = {(e["source"], e["target"]): e.get("metadata", {}) for e in edges}
    print(f"Job ID: {result.get('job_id')}")
    print(f"Status: {result.get('status')}")
    outputs = result.get("outputs", {})
    for downstream in ["HumanEscalation", "ReviewQueueRouter", "PassiveLogger"]:
        meta = edge_index.get(("ContentClassifier", downstream), {})
        if downstream in outputs:
            print(f"\n[{downstream}] severity={meta.get('severity')} "
                  f"priority={meta.get('priority')}")
            print(str(outputs[downstream])[:300])
else:
    print(f"Error: {response.status_code}")
    print(response.text)

Next Steps