The `/v1/rate/limits` endpoint provides comprehensive information about your API usage and limits.
Rate limits are tier-based and automatically enforced. Monitor your usage to avoid hitting limits and optimize your API calls.
Quick Start
- Python
- JavaScript
- cURL
Copy
import requests
import json
import os
from dotenv import load_dotenv
load_dotenv()
# --- Configuration ----------------------------------------------------------
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
}


def get_rate_limits():
    """Fetch the current rate-limit status and usage from the API.

    Returns:
        dict | None: the parsed JSON body on HTTP 200; otherwise None,
        after printing the status code and response text for diagnosis.
    """
    response = requests.get(f"{BASE_URL}/v1/rate/limits", headers=headers)
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None
    return response.json()


# Fetch and pretty-print the current limits.
limits_data = get_rate_limits()
if limits_data:
    print("✅ Rate limits retrieved successfully!")
    print(json.dumps(limits_data, indent=2))
Understanding Rate Limit Response
Copy
{
"success": true,
"rate_limits": {
"minute": {
"count": 45, // Requests made in current minute
"limit": 100, // Maximum requests per minute
"exceeded": false, // Whether limit is exceeded
"remaining": 55, // Requests remaining
"reset_time": "2024-01-01T12:05:00Z" // When limit resets
},
"hour": { /* hourly limits */ },
"day": { /* daily limits */ }
},
"limits": {
"maximum_requests_per_minute": 100,
"maximum_requests_per_hour": 1000,
"maximum_requests_per_day": 5000,
"tokens_per_agent": 10000
},
"tier": "premium", // Your subscription tier
"timestamp": "2024-01-01T12:03:30Z"
}
Rate Limit Monitoring
- Python
- JavaScript
Copy
def monitor_rate_limits():
    """Print a per-period (minute/hour/day) usage report for the account.

    Relies on get_rate_limits(); prints a failure notice and returns early
    when the lookup fails or the API reports success == False.
    """
    limits_data = get_rate_limits()
    if not limits_data or not limits_data.get("success"):
        print("❌ Failed to retrieve rate limits")
        return

    rate_limits = limits_data.get("rate_limits", {})
    limits = limits_data.get("limits", {})
    tier = limits_data.get("tier", "unknown")

    print(f"📊 Subscription Tier: {tier}")
    print("=" * 50)

    for window, stats in rate_limits.items():
        used = stats.get("count", 0)
        cap = stats.get("limit", 0)
        left = stats.get("remaining", 0)
        exceeded = stats.get("exceeded", False)
        reset_at = stats.get("reset_time", "")

        # Percentage of the window's budget consumed (0 when cap unknown).
        pct = (used / cap * 100) if cap > 0 else 0

        # Traffic-light status from worst to best case.
        status = "🟢 LOW"
        if exceeded:
            status = "🔴 EXCEEDED"
        elif pct > 80:
            status = "🟡 HIGH USAGE"
        elif pct > 50:
            status = "🟠 MODERATE"

        print(f"{window.upper()} LIMITS:")
        print(f"  Status: {status}")
        print(f"  Used: {used}/{cap} ({pct:.1f}%)")
        print(f"  Remaining: {left}")
        print(f"  Reset: {reset_at}")
        print()


# Monitor usage
monitor_rate_limits()
Rate Limit Handling
- Python
- JavaScript
Copy
import time
import random
class RateLimitHandler:
    """Wraps API calls with a pre-flight rate-limit check and 429 retries."""

    def __init__(self, base_delay=1, max_delay=60):
        # base_delay seeds the exponential backoff; max_delay caps each wait.
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute_with_retry(self, func, max_retries=3, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying under rate-limit pressure.

        Before each attempt the current limits are fetched via
        get_rate_limits(); if the per-minute limit is already exceeded the
        attempt is skipped after a short sleep. An HTTP 429 from ``func``
        triggers exponential backoff, up to ``max_retries`` attempts; the
        final 429 (and any non-429 HTTPError) is re-raised.

        NOTE(review): if every attempt is skipped because the limit stays
        exceeded, the loop ends and None is returned implicitly — callers
        should check for that. Also, positional extras after ``func`` bind
        to ``max_retries`` first; pass task arguments as keywords.
        """
        for attempt in range(max_retries):
            try:
                status = get_rate_limits()
                if status:
                    minute = status.get("rate_limits", {}).get("minute", {})
                    if minute.get("exceeded"):
                        print("🔴 Rate limit exceeded! Waiting for reset...")
                        time.sleep(self.base_delay * 2)
                        continue
                    # Slow down pre-emptively when fewer than 5 calls remain.
                    if minute.get("remaining", 0) < 5:
                        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                        print(f"🟡 Approaching rate limit. Waiting {delay}s...")
                        time.sleep(delay)
                return func(*args, **kwargs)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code != 429:
                    raise
                if attempt >= max_retries - 1:
                    print("❌ Max retries exceeded")
                    raise
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                print(f"⏳ Rate limited. Retrying in {delay}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(delay)
# Usage example: wrap a simple GET in the rate-limit handler.
handler = RateLimitHandler()


def safe_api_call():
    """Fetch the available-models list, raising on any HTTP error status."""
    resp = requests.get(f"{BASE_URL}/v1/models/available", headers=headers)
    resp.raise_for_status()
    return resp.json()


# Execute with rate limit handling
result = handler.execute_with_retry(safe_api_call)
print("Request completed successfully!")
Best Practices
Rate Limit Management
- Monitor Regularly: Check rate limits before making requests
- Implement Backoff: Use exponential backoff for retries
- Batch Requests: Combine multiple operations when possible
- Handle 429 Errors: Implement proper retry logic for rate limit errors
- Cost Optimization: Monitor usage costs and optimize model selection
Rate Limit Tiers
| Tier | Requests/Minute | Requests/Hour | Requests/Day | Tokens/Agent |
|------------|-------|-------|--------|-------|
| Free | 100 | 500 | 1000 | 1000 |
| Premium | 1000 | 5000 | 10000 | 10000 |
| Enterprise | 10000 | 50000 | 100000 | 50000 |
Cost Optimization
- Batch Processing
- Model Selection
Copy
def optimize_batch_processing(tasks):
    """Process multiple tasks efficiently within rate limits.

    Args:
        tasks: list of task strings; each becomes one agent completion
            in a batch request.

    Returns:
        list: accumulated results from every successful batch; empty when
        ``tasks`` is empty or the rate-limit lookup fails.
    """
    if not tasks:
        # Nothing to do — also keeps len(tasks) out of the batch-size math.
        return []

    limits_data = get_rate_limits()
    if not limits_data:
        return []

    rate_limits = limits_data.get("rate_limits", {})
    minute_remaining = rate_limits.get("minute", {}).get("remaining", 10)

    # Size batches to fit the per-minute budget, capped at 10 per batch.
    # max(1, ...) fixes a crash in the original: when minute_remaining is 0
    # the step would be 0 and range(0, len(tasks), 0) raises ValueError.
    batch_size = max(1, min(minute_remaining, len(tasks), 10))
    results = []

    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        # One payload entry per task; a small model keeps per-call cost low.
        batch_payload = [
            {
                "agent_config": {
                    "agent_name": f"Batch Agent {j+1}",
                    "model_name": "gpt-4o-mini",  # Use cost-effective model
                    "max_tokens": 512
                },
                "task": task
            }
            for j, task in enumerate(batch)
        ]
        # Execute batch; failures are reported but do not abort later batches.
        try:
            response = requests.post(
                f"{BASE_URL}/v1/agent/batch/completions",
                headers=headers,
                json=batch_payload,
                timeout=60
            )
            if response.status_code == 200:
                results.extend(response.json())
                print(f"✅ Processed batch {i//batch_size + 1}")
            else:
                print(f"❌ Batch {i//batch_size + 1} failed: {response.status_code}")
        except Exception as e:
            print(f"❌ Batch {i//batch_size + 1} error: {e}")
        # Wait between batches to respect rate limits
        time.sleep(1)

    return results