Documentation Index Fetch the complete documentation index at: https://docs.swarms.ai/llms.txt
Use this file to discover all available pages before exploring further.
The Swarms API supports real-time streaming responses, allowing you to receive agent outputs as they're generated. This provides immediate feedback and a better user experience for long-running tasks.
Streaming is enabled by setting "streaming_on": true in your agent configuration.
Quick Start
Enable streaming by adding the streaming_on parameter to your agent configuration:
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"

# "X-Accel-Buffering: no" asks reverse proxies (e.g. nginx) not to buffer
# the response, so streamed chunks arrive immediately.
headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}

payload = {
    "agent_config": {
        "agent_name": "Research Analyst",
        "model_name": "claude-sonnet-4-20250514",
        "max_tokens": 8192,
        "streaming_on": True,  # enables server-sent-event streaming
    },
    "task": "What are the key trends in AI development?",
}

# stream=True keeps the connection open so the body can be read incrementally.
response = requests.post(
    f"{BASE_URL}/v1/agent/completions",
    headers=headers,
    json=payload,
    stream=True,
)
const fetch = require('node-fetch');
require('dotenv').config();

const API_KEY = process.env.SWARMS_API_KEY;
const BASE_URL = "https://api.swarms.world";

// "X-Accel-Buffering: no" asks reverse proxies not to buffer the response,
// so streamed chunks arrive immediately.
const headers = {
  "x-api-key": API_KEY,
  "Content-Type": "application/json",
  "Connection": "keep-alive",
  "X-Accel-Buffering": "no"
};

const payload = {
  agent_config: {
    agent_name: "Research Analyst",
    model_name: "claude-sonnet-4-20250514",
    max_tokens: 8192,
    streaming_on: true  // enables server-sent-event streaming
  },
  task: "What are the key trends in AI development?"
};

// `await` is only valid inside an async function in CommonJS modules.
async function main() {
  const response = await fetch(`${BASE_URL}/v1/agent/completions`, {
    method: 'POST',
    headers: headers,
    body: JSON.stringify(payload)
  });
}

main();
# -N is shorthand for --no-buffer (disables curl's output buffering so
# chunks print as they arrive); only one of the two flags is needed.
curl -X POST "https://api.swarms.world/v1/agent/completions" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -H "Connection: keep-alive" \
  -H "X-Accel-Buffering: no" \
  -d '{
    "agent_config": {
      "agent_name": "Research Analyst",
      "model_name": "claude-sonnet-4-20250514",
      "max_tokens": 8192,
      "streaming_on": true
    },
    "task": "What are the key trends in AI development?"
  }' \
  --no-buffer
The API uses Server-Sent Events (SSE) format. Each response contains event types and data:
event: metadata
data: {"job_id": "abc123", "name": "Research Analyst"}
event: chunk
data: {"content": "Based on current research"}
event: chunk
data: {"content": ", AI development shows"}
event: usage
data: {"tokens_used": 150, "cost": 0.003}
event: done
data: {"status": "finished"}
Parsing Streams
Here's how to parse streaming responses in different languages:
def parse_streaming_response(response):
    """Parse an SSE streaming response, printing events as they arrive.

    Args:
        response: A streaming ``requests.Response`` (or any object whose
            ``iter_lines()`` yields raw ``bytes`` lines).

    Returns:
        str: The full concatenated content of all ``chunk`` events.
    """
    full_content = ""
    current_event = None

    for line in response.iter_lines():
        if not line:  # SSE events are separated by blank lines
            continue
        line = line.decode("utf-8")

        # "event: <type>" names the event; the following "data:" line
        # carries its JSON payload.
        if line.startswith("event: "):
            current_event = line[7:].strip()
            continue

        if line.startswith("data: "):
            try:
                data = json.loads(line[6:])
            except json.JSONDecodeError:
                # Skip malformed payloads rather than aborting the stream.
                continue

            if current_event == "metadata":
                print(f"Job ID: {data.get('job_id')}")
                print(f"Agent: {data.get('name')}")
                print("-" * 40)
            elif current_event == "chunk":
                content = data.get("content", "")
                full_content += content
                print(content, end="", flush=True)
            elif current_event == "usage":
                print(f"\nTokens used: {data.get('tokens_used')}")
                print(f"Cost: ${data.get('cost', 0):.4f}")
            elif current_event == "done":
                print("\n✅ Complete!")
            elif current_event == "error":
                print(f"\n❌ Error: {data.get('error')}")

    return full_content
/**
 * Parse an SSE streaming response, printing events as they arrive.
 *
 * @param {Response} response - A fetch Response whose body is an SSE stream.
 * @returns {Promise<string>} The full concatenated content of all chunk events.
 */
async function parseStreamingResponse(response) {
  let fullContent = "";
  let currentEvent = null;
  let buffer = ""; // carries a partial line that spans two network chunks

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A read() may end mid-line (and even mid-character); {stream: true}
    // handles split multibyte sequences, and the trailing fragment is kept
    // in `buffer` until the rest of the line arrives.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop();

    for (const line of lines) {
      // "event: <type>" names the event; the following "data:" line
      // carries its JSON payload.
      if (line.startsWith('event: ')) {
        currentEvent = line.substring(7).trim();
        continue;
      }
      if (!line.startsWith('data: ')) continue;

      let data;
      try {
        data = JSON.parse(line.substring(6));
      } catch (e) {
        continue; // skip malformed JSON rather than aborting the stream
      }

      if (currentEvent === 'metadata') {
        console.log(`Job ID: ${data.job_id}`);
        console.log(`Agent: ${data.name}`);
        console.log("-".repeat(40));
      } else if (currentEvent === 'chunk') {
        const content = data.content || "";
        fullContent += content;
        process.stdout.write(content);
      } else if (currentEvent === 'usage') {
        console.log(`\nTokens used: ${data.tokens_used}`);
        console.log(`Cost: $${data.cost?.toFixed(4) || 0}`);
      } else if (currentEvent === 'done') {
        console.log("\n✅ Complete!");
      } else if (currentEvent === 'error') {
        console.log(`\n❌ Error: ${data.error}`);
      }
    }
  }
  return fullContent;
}
func parseStreamingResponse ( body io . Reader ) {
scanner := bufio . NewScanner ( body )
var currentEvent string
var fullContent strings . Builder
for scanner . Scan () {
line := scanner . Text ()
if strings . HasPrefix ( line , "event: " ) {
currentEvent = strings . TrimSpace ( line [ 7 :])
continue
}
if strings . HasPrefix ( line , "data: " ) {
var data StreamData
if err := json . Unmarshal ([] byte ( line [ 6 :]), & data ); err != nil {
continue
}
switch currentEvent {
case "metadata" :
fmt . Printf ( "Job ID: %s \n " , data . JobID )
fmt . Printf ( "Agent: %s \n " , data . Name )
fmt . Println ( strings . Repeat ( "-" , 40 ))
case "chunk" :
fullContent . WriteString ( data . Content )
fmt . Print ( data . Content )
case "usage" :
fmt . Printf ( " \n Tokens used: %d \n " , data . TokensUsed )
fmt . Printf ( "Cost: $ %.4f \n " , data . Cost )
case "done" :
fmt . Println ( " \n β
Complete!" )
case "error" :
fmt . Printf ( " \n β Error: %s \n " , data . Error )
}
}
}
fmt . Printf ( " \n π Total content: %d characters \n " , fullContent . Len ())
}
Complete Examples
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()


def run_streaming_agent():
    """Run a complete streaming agent request and print the streamed output."""
    API_KEY = os.getenv("SWARMS_API_KEY")
    BASE_URL = "https://api.swarms.world"

    # "X-Accel-Buffering: no" asks reverse proxies not to buffer the
    # response, so streamed chunks arrive immediately.
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    payload = {
        "agent_config": {
            "agent_name": "Research Analyst",
            "model_name": "claude-sonnet-4-20250514",
            "max_tokens": 8192,
            "streaming_on": True,  # enables server-sent-event streaming
        },
        "task": "What are the best ways to find samples of diabetes from blood samples?",
    }

    print("🚀 Starting streaming request...")
    response = requests.post(
        f"{BASE_URL}/v1/agent/completions",
        headers=headers,
        json=payload,
        stream=True,
        timeout=60,
    )

    if response.status_code != 200:
        print(f"❌ Error: {response.status_code} - {response.text}")
        return

    # Parse the streaming response (parse_streaming_response is defined in
    # the "Parsing Streams" section).
    full_content = parse_streaming_response(response)
    print(f"\n📊 Total content: {len(full_content)} characters")


# Run the example
if __name__ == "__main__":
    run_streaming_agent()
const fetch = require('node-fetch');
require('dotenv').config();

/**
 * Run a complete streaming agent request and print the streamed output.
 */
async function runStreamingAgent() {
  const API_KEY = process.env.SWARMS_API_KEY;
  const BASE_URL = "https://api.swarms.world";

  // "X-Accel-Buffering: no" asks reverse proxies not to buffer the
  // response, so streamed chunks arrive immediately.
  const headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no"
  };
  const payload = {
    agent_config: {
      agent_name: "Research Analyst",
      model_name: "claude-sonnet-4-20250514",
      max_tokens: 8192,
      streaming_on: true  // enables server-sent-event streaming
    },
    task: "What are the best ways to find samples of diabetes from blood samples?"
  };

  console.log("🚀 Starting streaming request...");
  try {
    const response = await fetch(`${BASE_URL}/v1/agent/completions`, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify(payload)
    });

    if (!response.ok) {
      console.error(`❌ Error: ${response.status} - ${await response.text()}`);
      return;
    }

    // Parse streaming response (parseStreamingResponse is defined in the
    // "Parsing Streams" section).
    const fullContent = await parseStreamingResponse(response);
    console.log(`\n📊 Total content: ${fullContent.length} characters`);
  } catch (error) {
    console.error("Request failed:", error);
  }
}

// Run the example
runStreamingAgent();
#!/bin/bash
# Stream an agent completion from the Swarms API, printing chunks as they arrive.

# Bash variable assignments must not have spaces around '='.
API_KEY="your-api-key-here"
BASE_URL="https://api.swarms.world"

echo "🚀 Starting streaming request..."

# --no-buffer (same as -N) disables curl's output buffering so streamed
# chunks print immediately.
curl -X POST "${BASE_URL}/v1/agent/completions" \
  -H "x-api-key: ${API_KEY}" \
  -H "Content-Type: application/json" \
  -H "Connection: keep-alive" \
  -H "X-Accel-Buffering: no" \
  -d '{
    "agent_config": {
      "agent_name": "Research Analyst",
      "model_name": "claude-sonnet-4-20250514",
      "max_tokens": 8192,
      "streaming_on": true
    },
    "task": "What are the best ways to find samples of diabetes from blood samples?"
  }' \
  --no-buffer
Event Types
| Event | Description | Data Fields |
| --- | --- | --- |
| metadata | Job information | job_id, name, temperature |
| chunk | Content piece | content |
| usage | Token usage | tokens_used, cost |
| done | Completion | status |
| error | Error info | error, message |
Best Practices
Error Handling
Always handle potential errors in your stream processing:
def stream_with_error_handling(url, payload):
    """Stream a request, handling HTTP, network, and JSON errors explicitly.

    (The bare ``return`` in the original fragment only works inside a
    function, so the example is shown as one.)
    """
    try:
        response = requests.post(url, json=payload, stream=True, timeout=60)
        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            return
        full_content = parse_streaming_response(response)
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts, and other transport failures.
        print(f"Request failed: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
Timeout Management
Set appropriate timeouts for your use case:
# For quick responses
response = requests.post(url, json=payload, stream=True, timeout=30)

# For long-running tasks
response = requests.post(url, json=payload, stream=True, timeout=300)
Benefits
Real-time Feedback : See results as they're generated
Better UX : Reduced perceived latency
Progress Tracking : Monitor long-running operations
Error Handling : Immediate error feedback
Troubleshooting
Increase timeout values for long-running tasks. Set appropriate timeouts based on your expected response time.
Handle malformed data gracefully by wrapping JSON parsing in try-catch blocks.
Always check for done or error events to ensure the stream completed successfully.
Process chunks incrementally for large responses to avoid memory issues.
Debug Mode
Enable debug logging to troubleshoot stream issues:
import logging

# DEBUG level surfaces connection and request details useful when
# diagnosing streaming problems.
logging.basicConfig(level=logging.DEBUG)