Monitor your Swarms API usage, check current rate limits, and implement proper rate limit handling. The /v1/rate/limits endpoint provides comprehensive information about your API usage and limits.
Rate limits are tier-based and automatically enforced. Monitor your usage to avoid hitting limits and optimize your API calls.

Quick Start

import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json"
}

def get_rate_limits():
    """Get current rate limits and usage"""
    response = requests.get(
        f"{BASE_URL}/v1/rate/limits",
        headers=headers
    )

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Get rate limits
limits_data = get_rate_limits()
if limits_data:
    print("✅ Rate limits retrieved successfully!")
    print(json.dumps(limits_data, indent=2))

Understanding Rate Limit Response

{
  "success": true,
  "rate_limits": {
    "minute": {
      "count": 45,           // Requests made in current minute
      "limit": 100,          // Maximum requests per minute
      "exceeded": false,     // Whether limit is exceeded
      "remaining": 55,       // Requests remaining
      "reset_time": "2024-01-01T12:05:00Z"  // When limit resets
    },
    "hour": { /* hourly limits */ },
    "day": { /* daily limits */ }
  },
  "limits": {
    "maximum_requests_per_minute": 100,
    "maximum_requests_per_hour": 1000,
    "maximum_requests_per_day": 5000,
    "tokens_per_agent": 10000
  },
  "tier": "premium",         // Your subscription tier
  "timestamp": "2024-01-01T12:03:30Z"
}
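
Most callers only need count, remaining, exceeded, and reset_time for each window. Below is a minimal parsing sketch, assuming reset_time is the ISO 8601 UTC timestamp shown in the example above and reusing limits_data from the Quick Start; the helper name is ours, not part of the API:

from datetime import datetime, timezone

def seconds_until_reset(limits_data, period="minute"):
    """Return (remaining requests, seconds until the given window resets).

    Assumes reset_time is an ISO 8601 UTC timestamp like the example above.
    """
    window = limits_data["rate_limits"][period]
    remaining = window.get("remaining", 0)

    # fromisoformat() on older Python versions does not accept a trailing "Z"
    reset_at = datetime.fromisoformat(window["reset_time"].replace("Z", "+00:00"))
    wait = max(0.0, (reset_at - datetime.now(timezone.utc)).total_seconds())
    return remaining, wait

if limits_data:
    remaining, wait = seconds_until_reset(limits_data)
    print(f"{remaining} requests left this minute; window resets in {wait:.0f}s")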

Rate Limit Monitoring

def monitor_rate_limits():
    """Monitor rate limits and provide usage insights"""
    limits_data = get_rate_limits()

    if not limits_data or not limits_data.get("success"):
        print("❌ Failed to retrieve rate limits")
        return

    rate_limits = limits_data.get("rate_limits", {})
    limits = limits_data.get("limits", {})
    tier = limits_data.get("tier", "unknown")

    print(f"📊 Subscription Tier: {tier}")
    print("=" * 50)

    for period, data in rate_limits.items():
        count = data.get("count", 0)
        limit = data.get("limit", 0)
        remaining = data.get("remaining", 0)
        exceeded = data.get("exceeded", False)
        reset_time = data.get("reset_time", "")

        # Calculate usage percentage
        usage_percent = (count / limit * 100) if limit > 0 else 0

        # Determine status
        if exceeded:
            status = "🔴 EXCEEDED"
        elif usage_percent > 80:
            status = "🟡 HIGH USAGE"
        elif usage_percent > 50:
            status = "🟠 MODERATE"
        else:
            status = "🟢 LOW"

        print(f"{period.upper()} LIMITS:")
        print(f"  Status: {status}")
        print(f"  Used: {count}/{limit} ({usage_percent:.1f}%)")
        print(f"  Remaining: {remaining}")
        print(f"  Reset: {reset_time}")
        print()

# Monitor usage
monitor_rate_limits()

Rate Limit Handling

import time

class RateLimitHandler:
    def __init__(self, base_delay=1, max_delay=60):
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute_with_retry(self, func, max_retries=3, *args, **kwargs):
        """Execute a function with automatic retry on rate limit errors"""
        for attempt in range(max_retries):
            try:
                # Check rate limits before executing
                limits_data = get_rate_limits()
                if limits_data:
                    rate_limits = limits_data.get("rate_limits", {})
                    minute_data = rate_limits.get("minute", {})

                    if minute_data.get("exceeded"):
                        print("🔴 Rate limit exceeded! Waiting for reset...")
                        time.sleep(self.base_delay * 2)
                        continue

                    # Check if we're close to the limit
                    remaining = minute_data.get("remaining", 0)
                    if remaining < 5:  # Less than 5 requests remaining
                        wait_time = min(self.base_delay * (2 ** attempt), self.max_delay)
                        print(f"🟡 Approaching rate limit. Waiting {wait_time}s...")
                        time.sleep(wait_time)

                return func(*args, **kwargs)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:  # Rate limit exceeded
                    if attempt < max_retries - 1:
                        wait_time = min(self.base_delay * (2 ** attempt), self.max_delay)
                        print(f"⏳ Rate limited. Retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
                        time.sleep(wait_time)
                        continue
                    else:
                        print("❌ Max retries exceeded")
                        raise
                else:
                    raise

# Usage example
handler = RateLimitHandler()

def safe_api_call():
    response = requests.get(f"{BASE_URL}/v1/models/available", headers=headers)
    response.raise_for_status()
    return response.json()

# Execute with rate limit handling
result = handler.execute_with_retry(safe_api_call)
print("Request completed successfully!")

Best Practices

Rate Limit Management

  1. Monitor Regularly: Check rate limits before making requests (see the pre-flight sketch after this list)
  2. Implement Backoff: Use exponential backoff for retries
  3. Batch Requests: Combine multiple operations when possible
  4. Handle 429 Errors: Implement proper retry logic for rate limit errors
  5. Cost Optimization: Monitor usage costs and optimize model selection
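
A minimal pre-flight sketch for item 1, building on get_rate_limits from the Quick Start and the seconds_until_reset helper sketched earlier; the 5-request threshold is an arbitrary safety margin, not an API requirement:

def wait_if_needed(min_remaining=5):
    """Sleep until the minute window resets if it is exhausted or nearly so."""
    limits_data = get_rate_limits()
    if not limits_data:
        return  # could not check limits; fall back to 429 retry handling

    minute = limits_data.get("rate_limits", {}).get("minute", {})
    if minute.get("exceeded") or minute.get("remaining", 0) < min_remaining:
        remaining, wait = seconds_until_reset(limits_data, "minute")
        print(f"Throttling: {remaining} requests left, sleeping {wait:.0f}s")
        time.sleep(wait)

# Call wait_if_needed() before each request, e.g. at the top of safe_api_call().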

Rate Limit Tiers

Tier       | Requests/Minute | Requests/Hour | Requests/Day | Tokens/Agent
Free       | 100             | 500           | 1000         | 1000
Premium    | 1000            | 5000          | 10000        | 10000
Enterprise | 10000           | 50000         | 100000       | 50000

Cost Optimization

Two levers keep costs and request counts down: batch processing and model selection (a model-selection sketch follows the batch example below).
def optimize_batch_processing(tasks):
    """Process multiple tasks efficiently within rate limits"""
    limits_data = get_rate_limits()
    if not limits_data:
        return []

    rate_limits = limits_data.get("rate_limits", {})
    minute_remaining = rate_limits.get("minute", {}).get("remaining", 10)

    # Process in batches that fit within rate limits
    # Guard against a zero step when no requests remain in the current window
    batch_size = max(1, min(minute_remaining, len(tasks), 10))
    results = []

    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]

        # Create batch payload
        batch_payload = [
            {
                "agent_config": {
                    "agent_name": f"Batch Agent {j+1}",
                    "model_name": "gpt-4o-mini",  # Use cost-effective model
                    "max_tokens": 512
                },
                "task": task
            }
            for j, task in enumerate(batch)
        ]

        # Execute batch
        try:
            response = requests.post(
                f"{BASE_URL}/v1/agent/batch/completions",
                headers=headers,
                json=batch_payload,
                timeout=60
            )

            if response.status_code == 200:
                batch_results = response.json()
                results.extend(batch_results)
                print(f"✅ Processed batch {i//batch_size + 1}")
            else:
                print(f"❌ Batch {i//batch_size + 1} failed: {response.status_code}")

        except Exception as e:
            print(f"❌ Batch {i//batch_size + 1} error: {e}")

        # Wait between batches to respect rate limits
        time.sleep(1)

    return results
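
For model selection, one common pattern is to route simple tasks to a cheaper model and reserve a larger one for complex work. The rough sketch below builds entries in the same shape as the batch payload above; the length heuristic and the "gpt-4o" model name are illustrative assumptions, not Swarms recommendations:

def pick_model(task, complex_threshold=400):
    """Crude heuristic: long tasks go to a larger model, short ones to a cheap one.

    The threshold and the "gpt-4o" name are assumptions; substitute whichever
    models are available on your tier.
    """
    return "gpt-4o" if len(task) > complex_threshold else "gpt-4o-mini"

def build_agent_config(task, index=0):
    """Build a batch entry in the same shape as optimize_batch_processing uses."""
    return {
        "agent_config": {
            "agent_name": f"Cost-Aware Agent {index + 1}",
            "model_name": pick_model(task),
            "max_tokens": 512
        },
        "task": task
    }

Dropping build_agent_config into optimize_batch_processing in place of the fixed "gpt-4o-mini" keeps simple tasks on the cheaper model while giving harder tasks more capable ones.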