Skip to main content

Streaming API

The Swarms API supports real-time streaming responses, allowing you to receive agent outputs as they’re generated. This provides immediate feedback and a better user experience for long-running tasks.
Streaming is enabled by setting "streaming_on": true in your agent configuration.

Quick Start

Enable streaming by adding the streaming_on parameter to your agent configuration:
  • Python
  • JavaScript
  • cURL
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

# Read the API key from the SWARMS_API_KEY environment variable.
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

payload = {
    "agent_config": {
        "agent_name": "Research Analyst",
        "model_name": "claude-sonnet-4-20250514",
        "max_tokens": 8192,
        # Ask the API to stream the agent output as it is generated.
        "streaming_on": True,
    },
    "task": "What are the key trends in AI development?",
}

# stream=True lets the body be consumed incrementally instead of being
# downloaded up front; the timeout keeps the call from blocking forever
# if the server stops responding.
response = requests.post(
    f"{BASE_URL}/v1/agent/completions",
    headers=headers,
    json=payload,
    stream=True,
    timeout=60,
)

Stream Format

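The API uses the Server-Sent Events (SSE) format. Each event in the stream consists of an event: line naming the event type and a data: line carrying a JSON payload, and events are separated by a blank line: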
event: metadata
data: {"job_id": "abc123", "name": "Research Analyst"}

event: chunk
data: {"content": "Based on current research"}

event: chunk
data: {"content": ", AI development shows"}

event: usage
data: {"tokens_used": 150, "cost": 0.003}

event: done
data: {"status": "finished"}

Parsing Streams

Here’s how to parse streaming responses in different languages:
  • Python
  • JavaScript
  • Go
def parse_streaming_response(response):
    """Parse a Server-Sent Events stream, print progress, and return the text.

    Each ``event:`` line sets the type applied to the ``data:`` lines that
    follow it. ``metadata``, ``chunk``, ``usage``, ``done`` and ``error``
    events are reported on stdout; any other line is ignored.

    Args:
        response: Object exposing ``iter_lines()`` that yields the lines of
            the stream as ``bytes`` or ``str`` (for example a
            ``requests.Response`` obtained with ``stream=True``).

    Returns:
        The concatenation of the ``content`` field of every ``chunk`` event.
    """
    parts = []
    current_event = None

    for raw_line in response.iter_lines():
        if not raw_line:
            continue

        # iter_lines() yields bytes by default but str when the caller asked
        # for decode_unicode=True; accept both.
        if isinstance(raw_line, (bytes, bytearray)):
            line = raw_line.decode("utf-8")
        else:
            line = raw_line

        # Parse event type (the space after the colon is optional in SSE)
        if line.startswith("event:"):
            current_event = line[len("event:"):].strip()
            continue

        # Anything that is not a data line (comments, id:, retry:) is ignored
        if not line.startswith("data:"):
            continue

        try:
            data = json.loads(line[len("data:"):])
        except json.JSONDecodeError:
            # Skip malformed payloads instead of aborting the whole stream
            continue

        # Every handler below reads fields by key, so only JSON objects
        # are meaningful here.
        if not isinstance(data, dict):
            continue

        if current_event == "metadata":
            print(f"Job ID: {data.get('job_id')}")
            print(f"Agent: {data.get('name')}")
            print("-" * 40)

        elif current_event == "chunk":
            content = data.get("content")
            if content is None:
                continue
            content = str(content)
            parts.append(content)
            # Flush so the text appears as soon as each chunk arrives
            print(content, end="", flush=True)

        elif current_event == "usage":
            # A missing or null cost is shown as zero rather than crashing
            # the float format below.
            cost = data.get("cost") or 0
            print(f"\nTokens used: {data.get('tokens_used')}")
            print(f"Cost: ${cost:.4f}")

        elif current_event == "done":
            print("\n✅ Complete!")

        elif current_event == "error":
            print(f"\n❌ Error: {data.get('error')}")

    return "".join(parts)

Complete Examples

  • Python
  • JavaScript
  • cURL
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

def run_streaming_agent():
    """Send one streaming completion request and print the streamed answer.

    Reads the API key from the ``SWARMS_API_KEY`` environment variable, posts
    the example task to ``/v1/agent/completions`` with streaming enabled, and
    hands the response to ``parse_streaming_response``.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    api_key = os.getenv("SWARMS_API_KEY")
    base_url = "https://swarms-api-285321057562.us-east1.run.app"

    # Fail fast with a clear message when no key is configured.
    if not api_key:
        print("❌ Error: SWARMS_API_KEY is not set")
        return

    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    payload = {
        "agent_config": {
            "agent_name": "Research Analyst",
            "model_name": "claude-sonnet-4-20250514",
            "max_tokens": 8192,
            "streaming_on": True,
        },
        "task": "What are the best ways to find samples of diabetes from blood samples?",
    }

    print("🚀 Starting streaming request...")

    # With stream=True the connection stays checked out until the body is
    # fully read or the response is closed; the with-block guarantees the
    # latter on every exit path.
    with requests.post(
        f"{base_url}/v1/agent/completions",
        headers=headers,
        json=payload,
        stream=True,
        timeout=60,
    ) as response:
        if response.status_code != 200:
            print(f"❌ Error: {response.status_code} - {response.text}")
            return

        # Parse the streaming response
        full_content = parse_streaming_response(response)

    print(f"\n📝 Total content: {len(full_content)} characters")


# Run the example
if __name__ == "__main__":
    run_streaming_agent()

Event Types

| Event | Description | Data Fields |
| --- | --- | --- |
| metadata | Job information | job_id, name, temperature |
| chunk | Content piece | content |
| usage | Token usage | tokens_used, cost |
| done | Completion | status |
| error | Error info | error, message |

Best Practices

Error Handling

Always handle potential errors in your stream processing:
def stream_with_error_handling(url, payload, headers=None):
    """Post a streaming request and return the streamed text, or None on failure.

    Args:
        url: Full URL of the completions endpoint.
        payload: JSON-serializable request body.
        headers: Optional request headers (for example the ``x-api-key``
            header used in the other examples).

    Returns:
        The value returned by ``parse_streaming_response``, or ``None`` when
        the request fails or the server answers with a non-200 status.
    """
    try:
        # The with-block closes the streamed response on every exit path.
        with requests.post(
            url,
            headers=headers,
            json=payload,
            stream=True,
            timeout=60,
        ) as response:
            if response.status_code != 200:
                print(f"Error: {response.status_code} - {response.text}")
                return None

            return parse_streaming_response(response)

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
    except json.JSONDecodeError as e:
        # Only reachable when the parser in use lets decode errors propagate.
        print(f"JSON decode error: {e}")
    return None

Timeout Management

Set appropriate timeouts for your use case:
# The two calls differ only in the timeout value passed to requests.post.

# For quick responses
response = requests.post(
    url,
    json=payload,
    stream=True,
    timeout=30,
)

# For long-running tasks
response = requests.post(
    url,
    json=payload,
    stream=True,
    timeout=300,
)

Benefits

  • Real-time Feedback: See results as they’re generated
  • Better UX: Reduced perceived latency
  • Progress Tracking: Monitor long-running operations
  • Error Handling: Immediate error feedback

Troubleshooting

Increase timeout values for long-running tasks. Set appropriate timeouts based on your expected response time.
Handle malformed data gracefully by wrapping JSON parsing in try/except (Python) or try/catch (JavaScript) blocks.
Always check for done or error events to ensure the stream completed successfully.
Process chunks incrementally for large responses to avoid memory issues.

Debug Mode

Enable debug logging to troubleshoot stream issues:
import logging

# Configure the root logger at DEBUG level; the level is given by name.
logging.basicConfig(level="DEBUG")
⌘I