Streaming API
The Swarms API supports real-time streaming responses, allowing you to receive agent outputs as they're generated. This provides immediate feedback and a better user experience for long-running tasks.
Streaming is enabled by setting "streaming_on": true in your agent configuration.
Quick Start
Enable streaming by adding the streaming_on parameter to your agent configuration:
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no"
}

payload = {
    "agent_config": {
        "agent_name": "Research Analyst",
        "model_name": "claude-sonnet-4-20250514",
        "max_tokens": 8192,
        "streaming_on": True
    },
    "task": "What are the key trends in AI development?"
}

response = requests.post(
    f"{BASE_URL}/v1/agent/completions",
    headers=headers,
    json=payload,
    stream=True
)
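Because stream=True is set, the response body arrives incrementally. As a quick sanity check, you can print the raw event lines as they come in; this minimal sketch reuses the response object created above:

# Print the raw SSE lines as they arrive (uses the `response` object from above)
for line in response.iter_lines():
    if line:
        print(line.decode("utf-8"))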
The API uses the Server-Sent Events (SSE) format. Each event carries an event type and a JSON data payload:
event: metadata
data: {"job_id": "abc123", "name": "Research Analyst"}
event: chunk
data: {"content": "Based on current research"}
event: chunk
data: {"content": ", AI development shows"}
event: usage
data: {"tokens_used": 150, "cost": 0.003}
event: done
data: {"status": "finished"}
Parsing Streams
Here's how to parse a streaming response in Python:
def parse_streaming_response(response):
    """Parse a streaming response and handle each event type."""
    full_content = ""
    current_event = None

    for line in response.iter_lines():
        if not line:
            continue
        line = line.decode("utf-8")

        # Parse event type
        if line.startswith("event: "):
            current_event = line[7:].strip()
            continue

        # Parse event data
        elif line.startswith("data: "):
            try:
                data = json.loads(line[6:])

                if current_event == "metadata":
                    print(f"Job ID: {data.get('job_id')}")
                    print(f"Agent: {data.get('name')}")
                    print("-" * 40)
                elif current_event == "chunk":
                    content = data.get("content", "")
                    full_content += content
                    print(content, end="", flush=True)
                elif current_event == "usage":
                    print(f"\nTokens used: {data.get('tokens_used')}")
                    print(f"Cost: ${data.get('cost', 0):.4f}")
                elif current_event == "done":
                    print("\nComplete!")
                elif current_event == "error":
                    print(f"\nError: {data.get('error')}")
            except json.JSONDecodeError:
                continue

    return full_content
Complete Example
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

def run_streaming_agent():
    """Complete example of a streaming agent request."""
    API_KEY = os.getenv("SWARMS_API_KEY")
    BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no"
    }

    payload = {
        "agent_config": {
            "agent_name": "Research Analyst",
            "model_name": "claude-sonnet-4-20250514",
            "max_tokens": 8192,
            "streaming_on": True
        },
        "task": "What are the best ways to find samples of diabetes from blood samples?"
    }

    print("Starting streaming request...")

    response = requests.post(
        f"{BASE_URL}/v1/agent/completions",
        headers=headers,
        json=payload,
        stream=True,
        timeout=60
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return

    # Parse the streaming response
    full_content = parse_streaming_response(response)
    print(f"\nTotal content: {len(full_content)} characters")

# Run the example
if __name__ == "__main__":
    run_streaming_agent()
Event Types
| Event | Description | Data Fields |
| --- | --- | --- |
| metadata | Job information | job_id, name, temperature |
| chunk | Content piece | content |
| usage | Token usage | tokens_used, cost |
| done | Completion status | status |
| error | Error info | error, message |
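The parser above handles these events with an if/elif chain; an alternative is a small dispatch table that maps event names to handlers. This is just a sketch; the handler functions below are illustrative, not part of the API:

# Illustrative dispatch table mapping SSE event names to handler functions.
# Adapt the handlers to your application; they are not part of the Swarms API.
def on_metadata(data):
    print(f"Job ID: {data.get('job_id')} | Agent: {data.get('name')}")

def on_chunk(data):
    print(data.get("content", ""), end="", flush=True)

def on_usage(data):
    print(f"\nTokens used: {data.get('tokens_used')}, cost: ${data.get('cost', 0):.4f}")

def on_done(data):
    print("\nComplete!")

def on_error(data):
    print(f"\nError: {data.get('error')}")

EVENT_HANDLERS = {
    "metadata": on_metadata,
    "chunk": on_chunk,
    "usage": on_usage,
    "done": on_done,
    "error": on_error,
}

# Inside the parsing loop shown earlier:
# handler = EVENT_HANDLERS.get(current_event)
# if handler:
#     handler(data)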
Best Practices
Error Handling
Always handle potential errors in your stream processing:
# Inside your request function:
try:
    response = requests.post(url, json=payload, stream=True, timeout=60)
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return
    full_content = parse_streaming_response(response)
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"JSON decode error: {e}")
Timeout Management
Set appropriate timeouts for your use case:
# For quick responses
response = requests.post(url, json=payload, stream=True, timeout=30)

# For long-running tasks
response = requests.post(url, json=payload, stream=True, timeout=300)
Benefits
Real-time Feedback: See results as they're generated
Better UX: Reduced perceived latency
Progress Tracking: Monitor long-running operations
Error Handling: Immediate error feedback
Troubleshooting
Increase timeout values for long-running tasks. Set appropriate timeouts based on your expected response time.
Handle malformed data gracefully by wrapping JSON parsing in try-catch blocks.
Always check for done or error events to ensure the stream completed successfully.
Process chunks incrementally for large responses to avoid memory issues; a sketch covering both points follows below.
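As a sketch of those last two points, you can write chunks straight to disk as they arrive and confirm that a done or error event was seen. The output path and the response object here are illustrative assumptions:

# Sketch: stream chunks straight to disk and confirm the stream finished.
# "agent_output.txt" and the `response` object are illustrative assumptions.
import json

completed = False
current_event = None

with open("agent_output.txt", "w", encoding="utf-8") as f:
    for line in response.iter_lines():
        if not line:
            continue
        line = line.decode("utf-8")
        if line.startswith("event: "):
            current_event = line[7:].strip()
        elif line.startswith("data: "):
            try:
                data = json.loads(line[6:])
            except json.JSONDecodeError:
                continue
            if current_event == "chunk":
                f.write(data.get("content", ""))  # no in-memory accumulation
            elif current_event in ("done", "error"):
                completed = True

if not completed:
    print("Stream ended without a done or error event; treat the output as incomplete.")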
Debug Mode
Enable debug logging to troubleshoot stream issues:
import logging

logging.basicConfig(level=logging.DEBUG)