Access comprehensive logs of all your API requests and swarm executions. The /v1/swarm/logs endpoint provides detailed information about your API usage history, including request timestamps, status codes, and execution details.
Logs are filtered to exclude any entries containing client IP addresses for privacy protection. Access is limited to logs associated with your API key.

Quick Start

  • Python
  • JavaScript
  • cURL
import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json"
}

def get_swarm_logs():
    """Get all API request logs"""
    response = requests.get(
        f"{BASE_URL}/v1/swarm/logs",
        headers=headers
    )

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")
        return None

# Get logs
logs_data = get_swarm_logs()
if logs_data:
    print("✅ Logs retrieved successfully!")
    print(f"Total logs: {logs_data.get('count', 0)}")
    print(json.dumps(logs_data, indent=2))

Understanding Log Response

The logs endpoint returns structured information about your API usage:
{
  "status": "success",
  "count": 150,
  "logs": [
    {
      "timestamp": "2024-01-01T10:30:00Z",
      "endpoint": "/v1/agent/completions",
      "method": "POST",
      "status_code": 200,
      "response_time": 2.5,
      "tokens_used": 150,
      "cost": 0.003,
      "agent_name": "Research Assistant",
      "model_name": "gpt-4o-mini"
    }
  ],
  "timestamp": "2024-01-01T12:00:00Z"
}
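
The fields above can be consumed directly once the response is parsed. As a minimal sketch (reusing the get_swarm_logs() helper from the Quick Start and the field names shown in the sample response), you can summarize requests per status code:

from collections import Counter

logs_data = get_swarm_logs()
if logs_data and logs_data.get("logs"):
    # Tally requests per HTTP status code using the fields from the sample response
    status_counts = Counter(log.get("status_code", "unknown") for log in logs_data["logs"])
    for status, count in status_counts.most_common():
        print(f"HTTP {status}: {count} requests")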

Log Analysis and Filtering

  • Python
  • JavaScript
from datetime import datetime, timedelta
from collections import defaultdict, Counter

def analyze_logs(logs_data):
    """Analyze API usage logs"""
    if not logs_data or not logs_data.get('logs'):
        print("No logs available for analysis")
        return

    logs = logs_data['logs']

    # Basic statistics
    total_requests = len(logs)
    successful_requests = len([log for log in logs if log.get('status_code') == 200])
    failed_requests = total_requests - successful_requests

    print("📊 API Usage Analysis"        print("=" * 50)
    print(f"Total Requests: {total_requests}")
    print(f"Successful: {successful_requests} ({successful_requests/total_requests*100:.1f}%)")
    print(f"Failed: {failed_requests} ({failed_requests/total_requests*100:.1f}%)")
    print()

    # Endpoint usage
    endpoint_counts = Counter(log.get('endpoint', 'unknown') for log in logs)
    print("🔗 Endpoint Usage:"        for endpoint, count in endpoint_counts.most_common():
        print(f"  {endpoint}: {count} requests")
    print()

    # Model usage
    model_counts = Counter(log.get('model_name', 'unknown') for log in logs if 'model_name' in log)
    print("🤖 Model Usage:"        for model, count in model_counts.most_common():
        print(f"  {model}: {count} requests")
    print()

    # Cost analysis
    total_cost = sum(log.get('cost', 0) for log in logs)
    total_tokens = sum(log.get('tokens_used', 0) for log in logs)

    print("💰 Cost Analysis:"        print(f"  Total Cost: ${total_cost:.4f}")
    print(f"  Total Tokens: {total_tokens}")
    print(".4f"        print()

    # Response time analysis
    response_times = [log.get('response_time', 0) for log in logs if log.get('response_time')]
    if response_times:
        avg_response_time = sum(response_times) / len(response_times)
        max_response_time = max(response_times)
        min_response_time = min(response_times)

        print("⏱️  Response Time Analysis:"            print(".2f"            print(".2f"            print(".2f"
def filter_logs_by_date(logs_data, days=7):
    """Filter logs by date range"""
    if not logs_data or not logs_data.get('logs'):
        return logs_data

    cutoff_date = datetime.now() - timedelta(days=days)
    filtered_logs = []

    for log in logs_data['logs']:
        log_timestamp = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))
        if log_timestamp >= cutoff_date:
            filtered_logs.append(log)

    return {
        **logs_data,
        'logs': filtered_logs,
        'count': len(filtered_logs)
    }

def filter_logs_by_status(logs_data, status_codes):
    """Filter logs by HTTP status codes"""
    if not logs_data or not logs_data.get('logs'):
        return logs_data

    filtered_logs = [
        log for log in logs_data['logs']
        if log.get('status_code') in status_codes
    ]

    return {
        **logs_data,
        'logs': filtered_logs,
        'count': len(filtered_logs)
    }

# Example usage
logs_data = get_swarm_logs()
if logs_data:
    # Analyze all logs
    analyze_logs(logs_data)

    # Filter for last 7 days
    recent_logs = filter_logs_by_date(logs_data, days=7)
    print(f"\n📅 Recent Logs (7 days): {recent_logs['count']} requests")

    # Filter for errors
    error_logs = filter_logs_by_status(logs_data, [400, 401, 403, 404, 429, 500, 502, 503])
    print(f"❌ Error Logs: {error_logs['count']} requests")

Log Export and Backup

  • Python
  • JavaScript
import csv
import json
from datetime import datetime

def export_logs_to_csv(logs_data, filename=None):
    """Export logs to CSV format"""
    if not logs_data or not logs_data.get('logs'):
        print("No logs to export")
        return

    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_logs_{timestamp}.csv"

    logs = logs_data['logs']

    # Define CSV columns
    fieldnames = [
        'timestamp', 'endpoint', 'method', 'status_code',
        'response_time', 'tokens_used', 'cost', 'agent_name', 'model_name'
    ]

    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for log in logs:
            # Flatten nested data if needed
            row = {
                'timestamp': log.get('timestamp', ''),
                'endpoint': log.get('endpoint', ''),
                'method': log.get('method', ''),
                'status_code': log.get('status_code', ''),
                'response_time': log.get('response_time', ''),
                'tokens_used': log.get('tokens_used', ''),
                'cost': log.get('cost', ''),
                'agent_name': log.get('agent_name', ''),
                'model_name': log.get('model_name', '')
            }
            writer.writerow(row)

    print(f"✅ Logs exported to {filename}")
    return filename

def export_logs_to_json(logs_data, filename=None):
    """Export logs to JSON format"""
    if not logs_data:
        print("No logs to export")
        return

    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_logs_{timestamp}.json"

    with open(filename, 'w', encoding='utf-8') as jsonfile:
        json.dump(logs_data, jsonfile, indent=2, ensure_ascii=False)

    print(f"✅ Logs exported to {filename}")
    return filename

def create_log_backup(logs_data, compress=True):
    """Create a compressed backup of logs"""
    import gzip

    if not logs_data:
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"swarm_logs_backup_{timestamp}.json"

    if compress:
        filename += '.gz'
        with gzip.open(filename, 'wt', encoding='utf-8') as f:
            json.dump(logs_data, f, indent=2, ensure_ascii=False)
    else:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(logs_data, f, indent=2, ensure_ascii=False)

    print(f"✅ Backup created: {filename}")
    return filename

# Example usage
logs_data = get_swarm_logs()
if logs_data:
    # Export to different formats
    export_logs_to_csv(logs_data)
    export_logs_to_json(logs_data)
    create_log_backup(logs_data, compress=True)

Log Monitoring Dashboard

  • Python
  • JavaScript
import time
from datetime import datetime, timedelta

class LogMonitor:
    def __init__(self, check_interval=300):  # 5 minutes default
        self.check_interval = check_interval
        self.last_log_count = 0
        self.error_counts = {}
        self.cost_accumulator = 0

    def monitor_logs(self):
        """Monitor logs continuously"""
        print("🚀 Starting log monitoring... (Press Ctrl+C to stop)")

        try:
            while True:
                logs_data = get_swarm_logs()

                if logs_data:
                    self.analyze_recent_activity(logs_data)
                    self.check_for_anomalies(logs_data)

                time.sleep(self.check_interval)

        except KeyboardInterrupt:
            print("\n⏹️  Monitoring stopped")
            self.generate_monitoring_report()

    def analyze_recent_activity(self, logs_data):
        """Analyze recent API activity"""
        if not logs_data.get('logs'):
            return

        current_count = logs_data['count']

        if self.last_log_count > 0:
            new_logs = current_count - self.last_log_count
            if new_logs > 0:
                print(f"📈 {new_logs} new requests in the last {self.check_interval}s")

        self.last_log_count = current_count

        # Analyze recent logs (last hour)
        recent_logs = self.get_recent_logs(logs_data, hours=1)

        if recent_logs:
            # Error analysis
            errors = [log for log in recent_logs if log.get('status_code', 200) >= 400]
            if errors:
                print(f"⚠️  {len(errors)} errors in the last hour")

            # Cost analysis
            recent_cost = sum(log.get('cost', 0) for log in recent_logs)
            self.cost_accumulator += recent_cost

            print(".4f"                print(".4f"
    def check_for_anomalies(self, logs_data):
        """Check for unusual patterns or anomalies"""
        if not logs_data.get('logs'):
            return

        recent_logs = self.get_recent_logs(logs_data, hours=1)

        # Check for high error rates
        if recent_logs:
            error_rate = len([log for log in recent_logs if log.get('status_code', 200) >= 400]) / len(recent_logs)
            if error_rate > 0.1:  # More than 10% errors
                print(".1%"                # Check for unusual response times
            response_times = [log.get('response_time', 0) for log in recent_logs if log.get('response_time')]
            if response_times:
                avg_response_time = sum(response_times) / len(response_times)
                if avg_response_time > 10:  # More than 10 seconds average
                    print(".2f"
    def get_recent_logs(self, logs_data, hours=1):
        """Get logs from the last N hours"""
        if not logs_data.get('logs'):
            return []

        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_logs = []

        for log in logs_data['logs']:
            log_time = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))
            if log_time >= cutoff_time:
                recent_logs.append(log)

        return recent_logs

    def generate_monitoring_report(self):
        """Generate a final monitoring report"""
        print("\n📊 Monitoring Report"            print("=" * 50)
        print(".4f"            print(f"Monitoring Duration: {self.check_interval}s intervals")

        if self.error_counts:
            print("\nError Summary:"                for error_code, count in self.error_counts.items():
                print(f"  HTTP {error_code}: {count} occurrences")

# Usage
monitor = LogMonitor(check_interval=300)  # Check every 5 minutes
monitor.monitor_logs()

Privacy and Security

Data Protection

  • IP Address Filtering: All client IP addresses are automatically filtered from logs
  • PII Protection: Personal identifiable information is not logged
  • Secure Storage: Logs are stored securely with encryption at rest
  • Access Control: Only accessible via your API key

Compliance

  • GDPR Compliant: Adheres to data protection regulations
  • Audit Trail: Maintains complete audit trail of API usage
  • Data Retention: Logs retained for compliance and debugging purposes
  • Access Logging: All log access is itself logged for security

Best Practices

Log Management

  1. Regular Monitoring: Check logs regularly for unusual patterns
  2. Error Analysis: Investigate error spikes promptly
  3. Cost Tracking: Monitor API costs and optimize usage
  4. Performance Analysis: Track response times and identify bottlenecks
  5. Security Monitoring: Watch for unauthorized access attempts

Data Analysis

  1. Trend Analysis: Identify usage patterns and growth trends (see the sketch after this list)
  2. Error Pattern Recognition: Detect recurring issues
  3. Cost Optimization: Find opportunities to reduce API costs
  4. Performance Optimization: Identify slow endpoints and optimize
  5. Capacity Planning: Plan for future usage growth
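
As a starting point for trend analysis, the following sketch (reusing get_swarm_logs() from the Quick Start and assuming the timestamp field shown in the sample response) counts requests per calendar day:

from collections import Counter
from datetime import datetime

def daily_request_counts(logs_data):
    """Count requests per calendar day to reveal usage trends."""
    counts = Counter()
    for log in logs_data.get("logs", []):
        day = datetime.fromisoformat(log["timestamp"].replace("Z", "+00:00")).date()
        counts[day] += 1
    return counts

logs_data = get_swarm_logs()
if logs_data:
    for day, count in sorted(daily_request_counts(logs_data).items()):
        print(f"{day}: {count} requests")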

Automation

  1. Alert Setup: Set up alerts for high error rates or costs (see the sketch after this list)
  2. Automated Reports: Generate regular usage reports
  3. Anomaly Detection: Automatically detect unusual patterns
  4. Cost Controls: Implement automatic cost limiting
  5. Performance Monitoring: Continuous performance tracking
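
A simple alert check is sketched below; it reuses get_swarm_logs() from the Quick Start, and the thresholds are illustrative placeholders rather than recommended values:

# Illustrative thresholds -- tune these to your own usage and budget
ERROR_RATE_THRESHOLD = 0.10   # alert if more than 10% of requests failed
TOTAL_COST_THRESHOLD = 5.00   # alert if accumulated cost exceeds $5

def check_alerts(logs_data):
    """Print simple alerts based on error rate and total cost."""
    logs = logs_data.get("logs", [])
    if not logs:
        return
    errors = [log for log in logs if log.get("status_code", 200) >= 400]
    error_rate = len(errors) / len(logs)
    total_cost = sum(log.get("cost", 0) for log in logs)
    if error_rate > ERROR_RATE_THRESHOLD:
        print(f"🚨 ALERT: error rate {error_rate:.1%} exceeds {ERROR_RATE_THRESHOLD:.0%}")
    if total_cost > TOTAL_COST_THRESHOLD:
        print(f"🚨 ALERT: total cost ${total_cost:.2f} exceeds ${TOTAL_COST_THRESHOLD:.2f}")

logs_data = get_swarm_logs()
if logs_data:
    check_alerts(logs_data)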

Troubleshooting

Common Issues

Empty Logs Response
# Check if API key is correct
curl -I "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/logs" \
  -H "x-api-key: your-api-key"
Missing Log Entries
  • Logs are filtered for privacy (no IP addresses)
  • Some requests may not be logged due to high volume
  • Check your API key permissions
Performance Issues
# Analyze response time patterns
logs_data = get_swarm_logs()
if logs_data:
    response_times = [log['response_time'] for log in logs_data['logs'] if 'response_time' in log]
    if response_times:
        avg_time = sum(response_times) / len(response_times)
        print(f"Average response time: {avg_time:.2f}s")
Cost Analysis
# Calculate cost per endpoint
from collections import defaultdict

endpoint_costs = defaultdict(float)
for log in logs_data['logs']:
    endpoint_costs[log['endpoint']] += log.get('cost', 0)

for endpoint, cost in sorted(endpoint_costs.items(), key=lambda x: x[1], reverse=True):
    print(f"{endpoint}: ${cost:.4f}")