What This Example Shows

  • How to enable swarm-level streaming with stream: true on /v1/swarm/completions
  • The full SSE event taxonomy for swarms: metadata, start, agent_start, chunk, agent_end, usage, end
  • How to parse the SSE stream into typed events with agent attribution on every chunk
  • How to detect parallel-phase interleaving in AgentRearrange (when two agents stream concurrently)
  • Why this differs from single-agent streaming on /v1/agent/completions
Single-agent streaming (covered in Streaming Responses) streams one model’s tokens. Swarm streaming is harder: tokens come from multiple agents, sometimes overlapping in time. Every chunk carries an agent field so you know which worker produced it.

Why This Matters

Multi-agent swarms are powerful but feel slow without streaming: users stare at a spinner while three agents take turns thinking. Per-token swarm streaming fixes that. The moment any agent in the pipeline starts producing output, you can render it. For SequentialWorkflow this gives a “live whiteboard” of one agent finishing before the next picks up. For AgentRearrange with a parallel phase (A, B -> C), you watch two agents type simultaneously into the same UI, then a third synthesize their output once both finish. This enables chat-style UX for swarm products instead of the batch-job UX you get from non-streaming calls.

Step 1: Setup

import json
import os
import time

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

# X-Accel-Buffering: no disables proxy buffering so tokens flush immediately.
headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

Step 2: Understand the SSE Event Types

When stream: true is set in the request body for a SequentialWorkflow or AgentRearrange swarm, the API responds with a Server-Sent Events stream. Each event has an event: line naming the event type and a data: line carrying a JSON payload.
| Event       | Emitted                                          | Payload (key fields)                                          |
|-------------|--------------------------------------------------|---------------------------------------------------------------|
| metadata    | Once, at the start                               | job_id, swarm_name, swarm_type, number_of_agents              |
| start       | Once, before agents run                          | Swarm-level status                                            |
| agent_start | Once per agent, when it begins                   | agent (name)                                                  |
| chunk       | Many per agent, one per token (or small group)   | agent, content (the token text), output (running aggregate)   |
| agent_end   | Once per agent, when it finishes                 | agent, final per-agent content                                |
| usage       | Once, near the end                               | input_tokens, output_tokens, total_tokens, billing_info       |
| end         | Once, terminal                                   | execution_time, final aggregated output                       |
The defining feature of swarm streaming is that every chunk event carries an agent field. That’s how you attribute a token to the correct worker when multiple agents are running. Single-agent streaming on /v1/agent/completions does not need this — there is only one agent.
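To make the table concrete, here is a minimal sketch of what a few frames might look like on the wire, wrapped in a small typed record. Only the event names and field names come from the table above. The payload values, the SAMPLE_STREAM text, and the SwarmEvent dataclass are illustrative assumptions, not part of the API.

```python
import json
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical frames: field names follow the table, values are invented.
SAMPLE_STREAM = (
    'event: agent_start\n'
    'data: {"agent": "Researcher"}\n'
    '\n'
    'event: chunk\n'
    'data: {"agent": "Researcher", "content": "Solid", "output": "Solid"}\n'
    '\n'
    'event: agent_end\n'
    'data: {"agent": "Researcher"}\n'
    '\n'
)


@dataclass
class SwarmEvent:
    """Illustrative typed wrapper around one SSE frame (not an official type)."""
    event: str
    agent: Optional[str]
    data: dict


def to_typed_events(text: str) -> List[SwarmEvent]:
    """Pair each data: line with the event: line that preceded it."""
    events = []
    name = None
    for line in text.splitlines():
        if line.startswith("event:"):
            name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data = json.loads(line[len("data:"):].strip())
            events.append(
                SwarmEvent(event=name or "message", agent=data.get("agent"), data=data)
            )
            name = None
    return events


typed = to_typed_events(SAMPLE_STREAM)
for e in typed:
    print(e.event, e.agent)
```

Pulling agent up to a top-level attribute is a design choice for this sketch: it keeps attribution one attribute access away wherever chunks are routed.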

Step 3: A Reusable SSE Parser

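This helper turns the raw stream into a list of {event, data, received_at} dicts. The received_at timestamp records when each event arrived at the client, which lets you inspect timing in the parallel case. Because the helper consumes the entire stream before returning, it suits after-the-fact analysis rather than live rendering. (This mirrors the parser in the test suite; see tests/test_sequential_streaming.py and tests/test_rearrange_streaming.py.)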
def parse_sse_events(response):
    """Parse an SSE stream into a list of {event, data, received_at} dicts."""
    events = []
    current_event = None
    for line in response.iter_lines(decode_unicode=True):
        if line is None:
            continue
        if line.startswith("event:"):
            current_event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            raw = line[len("data:"):].strip()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                data = {"raw": raw}
            events.append({
                "event": current_event or "message",
                "data": data,
                "received_at": time.perf_counter(),
            })
            current_event = None
        elif line == "":
            current_event = None
    return events
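If you need events as they arrive rather than as a finished list, a generator variant is one option. The sketch below is an assumption-laden alternative, not the test suite's parser: iter_sse_events is a hypothetical name, it emits a frame when the blank line that terminates it arrives (as the SSE format specifies), it joins multiple data: lines within one frame, and it skips comment lines that begin with a colon.

```python
import json
import time
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one {event, data, received_at} dict per SSE frame as it completes."""
    event_name = None
    data_lines = []

    def build():
        raw = "\n".join(data_lines)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = {"raw": raw}  # keep unparseable payloads for debugging
        return {
            "event": event_name or "message",
            "data": data,
            "received_at": time.perf_counter(),
        }

    for line in lines:
        if line is None or line.startswith(":"):
            continue  # skip keep-alive comments
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "":
            # A blank line terminates the current frame.
            if data_lines:
                yield build()
            event_name, data_lines = None, []

    # Flush a trailing frame if the stream ended without a final blank line.
    if data_lines:
        yield build()
```

With a streaming response you would pass response.iter_lines(decode_unicode=True) as the lines argument and handle each event inside the for loop.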

Step 4: Stream a SequentialWorkflow

In a sequential swarm, agents run one after another. Tokens from agent A all arrive before any tokens from agent B. You’ll see this clearly in the chunk stream — the agent field on each chunk stays constant for a long run, then flips to the next agent and stays there.
def make_agent(name, system_prompt):
    return {
        "agent_name": name,
        "system_prompt": system_prompt,
        "model_name": "gpt-4.1-mini",
        "role": "worker",
        "max_loops": 1,
    }


def stream_sequential():
    payload = {
        "name": "streaming-sequential-example",
        "swarm_type": "SequentialWorkflow",
        "task": "List two short bullets about solid-state batteries.",
        "stream": True,
        "max_loops": 1,
        "agents": [
            make_agent("Researcher", "List two short bullets on the topic."),
            make_agent("Writer", "Combine the bullets into one short paragraph."),
        ],
    }

    response = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        stream=True,
        timeout=120,
    )
    response.raise_for_status()

    current_agent = None
    event_name = None
    for line in response.iter_lines(decode_unicode=True):
        if not line:
            continue
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
            continue
        if line.startswith("data:"):
            try:
                data = json.loads(line[len("data:"):].strip())
            except json.JSONDecodeError:
                continue
            if event_name == "agent_start":
                current_agent = data.get("agent")
                print(f"\n\n--- {current_agent} starting ---")
            elif event_name == "chunk":
                token = data.get("content") or ""
                print(token, end="", flush=True)
            elif event_name == "agent_end":
                print(f"\n--- {data.get('agent')} done ---")
            elif event_name == "usage":
                print(f"\n[usage] total_tokens={data.get('total_tokens')}")
            elif event_name == "end":
                elapsed = data.get("execution_time")
                print(f"\n[end] {elapsed:.2f}s" if elapsed else "\n[end]")


if __name__ == "__main__":
    stream_sequential()
You’ll see the Researcher’s tokens stream out completely, then the Writer’s tokens. No overlap — that’s the contract of SequentialWorkflow.
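To check the no-overlap behavior programmatically, you can collapse the per-chunk agent sequence into contiguous runs and confirm that each agent appears in exactly one run. This is a small sketch on synthetic data; agent_runs and is_strictly_sequential are hypothetical helper names, and the input is assumed to be the list of agent values taken from chunk events in arrival order.

```python
from itertools import groupby


def agent_runs(chunk_agents):
    """Collapse a per-chunk agent sequence into [(agent, run_length), ...]."""
    return [(agent, sum(1 for _ in group)) for agent, group in groupby(chunk_agents)]


def is_strictly_sequential(chunk_agents):
    """True if every agent appears in exactly one contiguous run."""
    names = [agent for agent, _ in agent_runs(chunk_agents)]
    return len(names) == len(set(names))


# Synthetic example: three Researcher chunks followed by two Writer chunks.
observed = ["Researcher"] * 3 + ["Writer"] * 2
print(agent_runs(observed))              # [('Researcher', 3), ('Writer', 2)]
print(is_strictly_sequential(observed))  # True
```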

Step 5: Stream AgentRearrange and Detect Parallel Interleaving

AgentRearrange lets you express a flow with parallel branches using the syntax "A, B -> C" — A and B run concurrently, then C runs after both finish. With streaming on, you’ll see A’s and B’s tokens interleaved in the chunk stream. Detecting that interleaving is how you confirm parallel execution.
def stream_rearrange_and_detect_interleaving():
    payload = {
        "name": "streaming-rearrange-example",
        "swarm_type": "AgentRearrange",
        "rearrange_flow": "Optimist, Pessimist -> Summary",
        "task": "AI in healthcare",
        "stream": True,
        "max_loops": 1,
        "agents": [
            make_agent("Optimist", "Write 4-6 upbeat sentences about the topic. Be detailed."),
            make_agent("Pessimist", "Write 4-6 cautious sentences about the topic. Be detailed."),
            make_agent("Summary", "Write one short summary line of the prior views."),
        ],
    }

    response = requests.post(
        f"{BASE_URL}/v1/swarm/completions",
        headers=headers,
        json=payload,
        stream=True,
        timeout=120,
    )
    response.raise_for_status()

    events = parse_sse_events(response)
    chunk_events = [e for e in events if e["event"] == "chunk"]

    # Look only at the parallel-phase agents.
    parallel_agents_seen = [
        e["data"].get("agent")
        for e in chunk_events
        if e["data"].get("agent") in ("Optimist", "Pessimist")
    ]

    # A "flip" is consecutive chunks coming from different agents.
    flips = sum(
        1
        for i in range(1, len(parallel_agents_seen))
        if parallel_agents_seen[i] != parallel_agents_seen[i - 1]
    )

    print(f"Optimist chunks:  {parallel_agents_seen.count('Optimist')}")
    print(f"Pessimist chunks: {parallel_agents_seen.count('Pessimist')}")
    print(f"Agent flips during parallel phase: {flips}")

    if flips >= 3:
        print("Interleaving confirmed — Optimist and Pessimist streamed concurrently.")
    else:
        print("No interleaving detected — agents ran sequentially.")
A high flips count is your signal that the parallel branch is actually parallel. After both finish, the Summary agent’s chunks will follow in a single contiguous run.
This is the same detection logic used in the test suite. See tests/test_rearrange_streaming.py for the canonical reference — it asserts flips >= 3 to verify true parallel streaming.
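Flip counting looks only at ordering. As a complementary check, the received_at timestamps recorded by parse_sse_events can be used to estimate how long two agents were streaming at the same time. The sketch below is an illustration on synthetic events. agent_windows and overlap_seconds are hypothetical helpers, and the timestamps reflect arrival at the client, so network or proxy buffering can distort them.

```python
def agent_windows(events):
    """Map each agent to (first_chunk_time, last_chunk_time) from received_at."""
    windows = {}
    for e in events:
        if e["event"] != "chunk":
            continue
        agent = e["data"].get("agent")
        t = e["received_at"]
        first, last = windows.get(agent, (t, t))
        windows[agent] = (min(first, t), max(last, t))
    return windows


def overlap_seconds(windows, a, b):
    """Length of the interval during which both agents were streaming."""
    if a not in windows or b not in windows:
        return 0.0
    start = max(windows[a][0], windows[b][0])
    end = min(windows[a][1], windows[b][1])
    return max(0.0, end - start)


def _chunk(agent, t):
    # Build a synthetic chunk event shaped like the parser's output.
    return {"event": "chunk", "data": {"agent": agent}, "received_at": t}


# Synthetic timeline: Optimist and Pessimist alternate, then Summary runs alone.
synthetic = [
    _chunk("Optimist", 0.0), _chunk("Pessimist", 0.25), _chunk("Optimist", 0.5),
    _chunk("Pessimist", 0.75), _chunk("Optimist", 1.0),
    _chunk("Summary", 1.25), _chunk("Summary", 1.5),
]
windows = agent_windows(synthetic)
print(overlap_seconds(windows, "Optimist", "Pessimist"))  # 0.5
print(overlap_seconds(windows, "Optimist", "Summary"))    # 0.0
```

A positive overlap between the two parallel agents, together with zero overlap against the downstream agent, is consistent with the "A, B -> C" flow described above.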

Step 6: Rendering Chunks Per-Agent in a UI

In a real UI you want each agent’s tokens to land in its own panel even when they interleave. The pattern is to bucket chunks by agent and append:
def render_per_agent(response):
    panels: dict[str, str] = {}
    current_event = None
    for line in response.iter_lines(decode_unicode=True):
        if not line:
            continue
        if line.startswith("event:"):
            current_event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            try:
                data = json.loads(line[len("data:"):].strip())
            except json.JSONDecodeError:
                continue  # skip non-JSON payloads instead of crashing mid-stream
            if current_event == "agent_start":
                panels[data["agent"]] = ""
            elif current_event == "chunk":
                agent = data.get("agent", "unknown")
                panels[agent] = panels.get(agent, "") + (data.get("content") or "")
                # In a real UI, push (agent, content) into a websocket / SSE here.
            elif current_event == "end":
                break
    return panels
The interleaved chunk stream becomes N independent, growing text buffers — exactly what a multi-panel chat UI needs.
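One way to connect the buckets to a front end is to pass a callback that fires on every chunk. The sketch below works on already-parsed event dicts and uses synthetic data; bucket_chunks and the on_update(agent, delta, full_text) signature are assumptions made for illustration, standing in for whatever websocket or SSE push your UI layer actually uses.

```python
from typing import Callable, Dict, Iterable


def bucket_chunks(
    events: Iterable[dict],
    on_update: Callable[[str, str, str], None],
) -> Dict[str, str]:
    """Append each chunk to its agent's buffer and notify the caller."""
    panels: Dict[str, str] = {}
    for e in events:
        data = e["data"]
        if e["event"] == "agent_start":
            panels.setdefault(data.get("agent", "unknown"), "")
        elif e["event"] == "chunk":
            agent = data.get("agent", "unknown")
            delta = data.get("content") or ""
            panels[agent] = panels.get(agent, "") + delta
            on_update(agent, delta, panels[agent])
        elif e["event"] == "end":
            break
    return panels


# Synthetic interleaved stream from two parallel agents.
demo_events = [
    {"event": "agent_start", "data": {"agent": "Optimist"}},
    {"event": "agent_start", "data": {"agent": "Pessimist"}},
    {"event": "chunk", "data": {"agent": "Optimist", "content": "AI "}},
    {"event": "chunk", "data": {"agent": "Pessimist", "content": "Risk "}},
    {"event": "chunk", "data": {"agent": "Optimist", "content": "helps."}},
    {"event": "chunk", "data": {"agent": "Pessimist", "content": "remains."}},
    {"event": "end", "data": {}},
]

updates = []
demo_panels = bucket_chunks(demo_events, lambda a, d, full: updates.append((a, d)))
print(demo_panels)  # {'Optimist': 'AI helps.', 'Pessimist': 'Risk remains.'}
```

Sending only the delta keeps each message small, while the full text argument lets a late-joining client catch up.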
Per-token streaming is currently supported for SequentialWorkflow and AgentRearrange. Other swarm types either don’t expose per-token output or stream at the agent level only — check /v1/swarms/available for the current list.
If tokens arrive all at once instead of incrementally, there are two common causes: (1) you forgot stream=True on the requests.post call (the API streams but your client buffers the whole response), or (2) a reverse proxy is buffering. Set X-Accel-Buffering: no in your request headers and make sure your client reads with response.iter_lines(), not response.text.
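A rough way to tell whether buffering is happening is to compare how spread out the chunk arrival times are against the total time since the request started. The sketch below is a heuristic only: looks_buffered is a hypothetical helper, the 0.05 threshold is an arbitrary assumption, and the timestamps are expected to come from the same clock as received_at (time.perf_counter(), captured just before the requests.post call).

```python
def looks_buffered(request_started, chunk_times, threshold=0.05):
    """Heuristic: True if all chunks landed in a tiny slice at the end of the request."""
    if len(chunk_times) < 2:
        return False  # not enough data to judge
    total = max(chunk_times) - request_started
    if total <= 0:
        return False
    spread = max(chunk_times) - min(chunk_times)
    return (spread / total) < threshold


# Synthetic cases: chunks bunched at the end vs. spread across the request.
print(looks_buffered(0.0, [9.98, 9.99, 10.0]))  # True
print(looks_buffered(0.0, [1.0, 5.0, 10.0]))    # False
```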
agent_end fires once per agent. end fires once for the entire swarm after the last agent finishes and usage is reported. If you want to update per-agent UI state, key off agent_end; if you want to dismiss a global spinner, key off end.
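As a minimal sketch of that split, the tracker below keeps per-agent status keyed off agent_start and agent_end, and a single spinner flag keyed off end. SwarmUIState and its attribute names are hypothetical; only the event names and the agent field come from the API description above.

```python
class SwarmUIState:
    """Illustrative UI state: per-agent status plus one global spinner flag."""

    def __init__(self):
        self.agent_status = {}       # agent name -> "running" or "done"
        self.spinner_visible = True  # global spinner shown until the swarm ends

    def handle(self, event, data):
        if event == "agent_start":
            self.agent_status[data.get("agent")] = "running"
        elif event == "agent_end":
            self.agent_status[data.get("agent")] = "done"
        elif event == "end":
            self.spinner_visible = False


state = SwarmUIState()
for event, data in [
    ("agent_start", {"agent": "Researcher"}),
    ("agent_end", {"agent": "Researcher"}),
    ("agent_start", {"agent": "Writer"}),
]:
    state.handle(event, data)

print(state.agent_status)     # {'Researcher': 'done', 'Writer': 'running'}
print(state.spinner_visible)  # True

state.handle("agent_end", {"agent": "Writer"})
state.handle("end", {"execution_time": 4.2})
print(state.spinner_visible)  # False
```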

Next Steps