The `/v1/swarm/logs` endpoint provides detailed information about your API usage history, including request timestamps, status codes, and execution details.
Logs are filtered to exclude any entries containing client IP addresses for privacy protection. Access is limited to logs associated with your API key.
Quick Start
- Python
- JavaScript
- cURL
import requests
import json
import os
from dotenv import load_dotenv
# Read SWARMS_API_KEY from a local .env file, if one is present.
load_dotenv()
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"
# Common headers for every request: API-key auth plus JSON content type.
headers = {
"x-api-key": API_KEY,
"Content-Type": "application/json"
}
def get_swarm_logs(timeout=30):
    """Fetch all API request logs for the configured API key.

    Args:
        timeout: Request timeout in seconds. `requests` has no default
            timeout, so without this the call could hang indefinitely.

    Returns:
        Parsed JSON response dict on success, None on any non-200 response.
    """
    response = requests.get(
        f"{BASE_URL}/v1/swarm/logs",
        headers=headers,
        timeout=timeout
    )
    if response.status_code == 200:
        return response.json()
    print(f"Error: {response.status_code} - {response.text}")
    return None
# Get logs (indentation restored; the published snippet had it stripped)
logs_data = get_swarm_logs()
if logs_data:
    print("✅ Logs retrieved successfully!")
    print(f"Total logs: {logs_data.get('count', 0)}")
    print(json.dumps(logs_data, indent=2))
// Read the API key from the environment (e.g. populated via dotenv in Node).
const API_KEY = process.env.SWARMS_API_KEY;
const BASE_URL = "https://api.swarms.world";
// Common headers for every request: API-key auth plus JSON content type.
const headers = {
"x-api-key": API_KEY,
"Content-Type": "application/json"
};
/**
 * Fetch all API request logs for the configured API key.
 * Logs a short summary on success and returns the parsed payload,
 * or null when the request or parsing fails.
 */
async function getSwarmLogs() {
  try {
    const url = `${BASE_URL}/v1/swarm/logs`;
    const response = await fetch(url, { method: 'GET', headers: headers });
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }
    const data = await response.json();
    console.log("✅ Logs retrieved successfully!");
    console.log(`Total logs: ${data.count || 0}`);
    console.log(JSON.stringify(data, null, 2));
    return data;
  } catch (error) {
    console.error('Error:', error);
    return null;
  }
}

// Get logs
getSwarmLogs();
# Get API request logs
# Bug fix: the Content-Type header previously had the colon outside the
# quoted string (-H "Content-Type": "application/json"), which curl would
# send as a malformed header.
curl -X GET "https://api.swarms.world/v1/swarm/logs" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json"
# Example response:
# {
# "status": "success",
# "count": 150,
# "logs": [
# {
# "timestamp": "2024-01-01T10:30:00Z",
# "endpoint": "/v1/agent/completions",
# "method": "POST",
# "status_code": 200,
# "response_time": 2.5,
# "tokens_used": 150,
# "cost": 0.003
# }
# ],
# "timestamp": "2024-01-01T12:00:00Z"
# }
Understanding Log Response
The logs endpoint returns structured information about your API usage:
{
"status": "success",
"count": 150,
"logs": [
{
"timestamp": "2024-01-01T10:30:00Z",
"endpoint": "/v1/agent/completions",
"method": "POST",
"status_code": 200,
"response_time": 2.5,
"tokens_used": 150,
"cost": 0.003,
"agent_name": "Research Assistant",
"model_name": "gpt-4o-mini"
}
],
"timestamp": "2024-01-01T12:00:00Z"
}
Log Analysis and Filtering
- Python
- JavaScript
from datetime import datetime, timedelta
from collections import defaultdict, Counter
def analyze_logs(logs_data):
    """Print a usage summary for a logs API response.

    Covers request counts, endpoint usage, model usage, cost, and
    response times. Several print statements below were reconstructed
    from a garbled source, mirroring the intact JavaScript example.

    Args:
        logs_data: Response dict from the logs endpoint; expects a
            'logs' list of per-request dicts.
    """
    if not logs_data or not logs_data.get('logs'):
        print("No logs available for analysis")
        return
    logs = logs_data['logs']
    # Basic statistics (logs is non-empty here, so division is safe)
    total_requests = len(logs)
    successful_requests = len([log for log in logs if log.get('status_code') == 200])
    failed_requests = total_requests - successful_requests
    print("📊 API Usage Analysis")
    print("=" * 50)
    print(f"Total Requests: {total_requests}")
    print(f"Successful: {successful_requests} ({successful_requests/total_requests*100:.1f}%)")
    print(f"Failed: {failed_requests} ({failed_requests/total_requests*100:.1f}%)")
    print()
    # Endpoint usage, most requested first
    endpoint_counts = Counter(log.get('endpoint', 'unknown') for log in logs)
    print("🔗 Endpoint Usage:")
    for endpoint, count in endpoint_counts.most_common():
        print(f"  {endpoint}: {count} requests")
    print()
    # Model usage (only entries that actually report a model name)
    model_counts = Counter(log.get('model_name', 'unknown') for log in logs if 'model_name' in log)
    print("🤖 Model Usage:")
    for model, count in model_counts.most_common():
        print(f"  {model}: {count} requests")
    print()
    # Cost analysis
    total_cost = sum(log.get('cost', 0) for log in logs)
    total_tokens = sum(log.get('tokens_used', 0) for log in logs)
    print("💰 Cost Analysis:")
    print(f"  Total Cost: ${total_cost:.4f}")
    print(f"  Total Tokens: {total_tokens}")
    print(f"  Avg Cost per Request: ${total_cost/total_requests:.4f}")
    print()
    # Response time analysis (skip entries with no/zero response_time)
    response_times = [log.get('response_time', 0) for log in logs if log.get('response_time')]
    if response_times:
        avg_response_time = sum(response_times) / len(response_times)
        max_response_time = max(response_times)
        min_response_time = min(response_times)
        print("⏱️ Response Time Analysis:")
        print(f"  Average: {avg_response_time:.2f}s")
        print(f"  Max: {max_response_time:.2f}s")
        print(f"  Min: {min_response_time:.2f}s")
def filter_logs_by_date(logs_data, days=7):
    """Return a copy of logs_data keeping only logs from the last `days` days.

    Bug fix: the parsed timestamps are timezone-aware (the 'Z' suffix is
    rewritten to '+00:00'), but the cutoff used naive datetime.now();
    comparing aware and naive datetimes raises TypeError. The cutoff is
    now computed in UTC.

    Args:
        logs_data: Response dict from the logs endpoint (or falsy).
        days: Age limit in days.

    Returns:
        A shallow copy with 'logs' and 'count' updated, or logs_data
        unchanged when it has no logs.
    """
    from datetime import timezone  # local: snippet-level import lacks timezone
    if not logs_data or not logs_data.get('logs'):
        return logs_data
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
    filtered_logs = []
    for log in logs_data['logs']:
        log_timestamp = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))
        if log_timestamp >= cutoff_date:
            filtered_logs.append(log)
    return {
        **logs_data,
        'logs': filtered_logs,
        'count': len(filtered_logs)
    }
def filter_logs_by_status(logs_data, status_codes):
    """Return a copy of logs_data keeping only entries whose HTTP
    status code appears in `status_codes`.

    Args:
        logs_data: Response dict from the logs endpoint (or falsy).
        status_codes: Collection of status codes to keep.

    Returns:
        A shallow copy with 'logs' and 'count' updated, or logs_data
        unchanged when it has no logs.
    """
    if not logs_data or not logs_data.get('logs'):
        return logs_data
    kept = [entry for entry in logs_data['logs']
            if entry.get('status_code') in status_codes]
    result = dict(logs_data)
    result['logs'] = kept
    result['count'] = len(kept)
    return result
# Example usage (indentation restored; the published snippet had it stripped)
logs_data = get_swarm_logs()
if logs_data:
    # Analyze all logs
    analyze_logs(logs_data)
    # Filter for last 7 days
    recent_logs = filter_logs_by_date(logs_data, days=7)
    print(f"\n📅 Recent Logs (7 days): {recent_logs['count']} requests")
    # Filter for errors
    error_logs = filter_logs_by_status(logs_data, [400, 401, 403, 404, 429, 500, 502, 503])
    print(f"❌ Error Logs: {error_logs['count']} requests")
/**
 * Print a usage summary (request counts, endpoint usage, and cost)
 * for a logs API response. Output goes to the console; no return value.
 */
function analyzeLogs(logsData) {
  if (!logsData || !logsData.logs) {
    console.log("No logs available for analysis");
    return;
  }
  const logs = logsData.logs;

  // Basic statistics
  const totalRequests = logs.length;
  let successfulRequests = 0;
  for (const log of logs) {
    if (log.status_code === 200) successfulRequests += 1;
  }
  const failedRequests = totalRequests - successfulRequests;
  console.log("📊 API Usage Analysis");
  console.log("=".repeat(50));
  console.log(`Total Requests: ${totalRequests}`);
  console.log(`Successful: ${successfulRequests} (${(successfulRequests/totalRequests*100).toFixed(1)}%)`);
  console.log(`Failed: ${failedRequests} (${(failedRequests/totalRequests*100).toFixed(1)}%)`);
  console.log();

  // Endpoint usage, most requested first
  const endpointCounts = {};
  for (const log of logs) {
    const endpoint = log.endpoint || 'unknown';
    endpointCounts[endpoint] = (endpointCounts[endpoint] || 0) + 1;
  }
  console.log("🔗 Endpoint Usage:");
  const ranked = Object.entries(endpointCounts).sort((a, b) => b[1] - a[1]);
  for (const [endpoint, count] of ranked) {
    console.log(`  ${endpoint}: ${count} requests`);
  }
  console.log();

  // Cost analysis
  let totalCost = 0;
  let totalTokens = 0;
  for (const log of logs) {
    totalCost += log.cost || 0;
    totalTokens += log.tokens_used || 0;
  }
  console.log("💰 Cost Analysis:");
  console.log(`  Total Cost: $${totalCost.toFixed(4)}`);
  console.log(`  Total Tokens: ${totalTokens}`);
  console.log(`  Avg Cost per Request: $${(totalCost/totalRequests).toFixed(6)}`);
  console.log();
}
/**
 * Return a copy of logsData containing only logs from the last `days` days,
 * with `count` updated to match. Falsy/empty input is returned unchanged.
 */
function filterLogsByDate(logsData, days = 7) {
  if (!logsData || !logsData.logs) return logsData;
  const cutoffMs = Date.now() - days * 24 * 60 * 60 * 1000;
  const filteredLogs = logsData.logs.filter(
    (log) => new Date(log.timestamp).getTime() >= cutoffMs
  );
  return { ...logsData, logs: filteredLogs, count: filteredLogs.length };
}
/**
 * Return a copy of logsData containing only entries whose HTTP status
 * code appears in `statusCodes`, with `count` updated to match.
 */
function filterLogsByStatus(logsData, statusCodes) {
  if (!logsData || !logsData.logs) return logsData;
  // Set.has uses the same SameValueZero comparison as Array.includes.
  const wanted = new Set(statusCodes);
  const kept = logsData.logs.filter((log) => wanted.has(log.status_code));
  return { ...logsData, logs: kept, count: kept.length };
}
// Example usage
(async () => {
  const logsData = await getSwarmLogs();
  if (!logsData) return;
  // Analyze all logs
  analyzeLogs(logsData);
  // Filter for last 7 days
  const recentLogs = filterLogsByDate(logsData, 7);
  console.log(`\n📅 Recent Logs (7 days): ${recentLogs.count} requests`);
  // Filter for errors
  const errorLogs = filterLogsByStatus(logsData, [400, 401, 403, 404, 429, 500, 502, 503]);
  console.log(`❌ Error Logs: ${errorLogs.count} requests`);
})();
Log Export and Backup
- Python
- JavaScript
import csv
import json
from datetime import datetime
def export_logs_to_csv(logs_data, filename=None):
    """Export logs to a CSV file.

    Args:
        logs_data: Response dict from the logs endpoint (expects a 'logs' list).
        filename: Target path; generated from the current timestamp when omitted.

    Returns:
        The path written to, or None when there was nothing to export.
    """
    if not logs_data or not logs_data.get('logs'):
        print("No logs to export")
        return
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_logs_{timestamp}.csv"
    logs = logs_data['logs']
    # Define CSV columns
    fieldnames = [
        'timestamp', 'endpoint', 'method', 'status_code',
        'response_time', 'tokens_used', 'cost', 'agent_name', 'model_name'
    ]
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for log in logs:
            # Pick out only the known columns; missing keys become blanks.
            writer.writerow({field: log.get(field, '') for field in fieldnames})
    # Bug fix: the success message previously printed a literal "(unknown)"
    # placeholder instead of the actual path.
    print(f"✅ Logs exported to {filename}")
    return filename
def export_logs_to_json(logs_data, filename=None):
    """Export logs to a JSON file.

    Args:
        logs_data: Response dict from the logs endpoint.
        filename: Target path; generated from the current timestamp when omitted.

    Returns:
        The path written to, or None when there was nothing to export.
    """
    if not logs_data:
        print("No logs to export")
        return
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_logs_{timestamp}.json"
    with open(filename, 'w', encoding='utf-8') as jsonfile:
        json.dump(logs_data, jsonfile, indent=2, ensure_ascii=False)
    # Bug fix: the success message previously printed a literal "(unknown)"
    # placeholder instead of the actual path.
    print(f"✅ Logs exported to {filename}")
    return filename
def create_log_backup(logs_data, compress=True, filename=None):
    """Create a JSON backup of logs_data, optionally gzip-compressed.

    Args:
        logs_data: Response dict from the logs endpoint.
        compress: When True, write gzip and append '.gz' to the filename.
        filename: Target path (new, optional); generated from the current
            timestamp when omitted — backward compatible with the old
            two-argument call.

    Returns:
        The path written to (including any '.gz' suffix), or None when
        there was nothing to back up.
    """
    import gzip
    if not logs_data:
        return
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"swarm_logs_backup_{timestamp}.json"
    if compress:
        filename += '.gz'
        with gzip.open(filename, 'wt', encoding='utf-8') as f:
            json.dump(logs_data, f, indent=2, ensure_ascii=False)
    else:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(logs_data, f, indent=2, ensure_ascii=False)
    # Bug fix: the message previously printed a literal "(unknown)"
    # placeholder instead of the actual path.
    print(f"✅ Backup created: {filename}")
    return filename
# Example usage (indentation restored; the published snippet had it stripped)
logs_data = get_swarm_logs()
if logs_data:
    # Export to different formats
    export_logs_to_csv(logs_data)
    export_logs_to_json(logs_data)
    create_log_backup(logs_data, compress=True)
/**
 * Export logs to CSV and trigger a browser download.
 * Fixes: the success message printed a literal "$(unknown)" placeholder,
 * and embedded double quotes in field values were not escaped (producing
 * invalid CSV).
 *
 * @param {Object} logsData  Response dict from the logs endpoint.
 * @param {?string} filename Target filename; timestamp-based when omitted.
 * @returns {?string} The filename used, or undefined when nothing to export.
 */
function exportLogsToCSV(logsData, filename = null) {
  if (!logsData || !logsData.logs) {
    console.log("No logs to export");
    return;
  }
  if (!filename) {
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-').slice(0, -5);
    filename = `swarm_logs_${timestamp}.csv`;
  }
  const logs = logsData.logs;
  const headers = ['timestamp', 'endpoint', 'method', 'status_code', 'response_time', 'tokens_used', 'cost', 'agent_name', 'model_name'];
  let csvContent = headers.join(',') + '\n';
  logs.forEach(log => {
    // Missing/falsy fields become blanks, matching the column list above.
    const row = headers.map(key => log[key] || '');
    // Escape embedded quotes by doubling them (RFC 4180).
    csvContent += row.map(field => `"${String(field).replace(/"/g, '""')}"`).join(',') + '\n';
  });
  // Download CSV (browser environment)
  const blob = new Blob([csvContent], { type: 'text/csv;charset=utf-8;' });
  const link = document.createElement('a');
  link.href = URL.createObjectURL(blob);
  link.download = filename;
  link.click();
  console.log(`✅ Logs exported to ${filename}`);
  return filename;
}
/**
 * Export logs to JSON and trigger a browser download.
 * Fixes: `href` was assigned as a bare global instead of `link.href`
 * (the download link never got a URL), and the success message printed
 * a literal "$(unknown)" placeholder.
 *
 * @param {Object} logsData  Response dict from the logs endpoint.
 * @param {?string} filename Target filename; timestamp-based when omitted.
 * @returns {?string} The filename used, or undefined when nothing to export.
 */
function exportLogsToJSON(logsData, filename = null) {
  if (!logsData) {
    console.log("No logs to export");
    return;
  }
  if (!filename) {
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-').slice(0, -5);
    filename = `swarm_logs_${timestamp}.json`;
  }
  const jsonContent = JSON.stringify(logsData, null, 2);
  // Download JSON (browser environment)
  const blob = new Blob([jsonContent], { type: 'application/json;charset=utf-8;' });
  const link = document.createElement('a');
  link.href = URL.createObjectURL(blob);
  link.download = filename;
  link.click();
  console.log(`✅ Logs exported to ${filename}`);
  return filename;
}
// Example usage
(async () => {
  const logsData = await getSwarmLogs();
  if (logsData) {
    exportLogsToCSV(logsData);
    exportLogsToJSON(logsData);
  }
})();
Log Monitoring Dashboard
- Python
- JavaScript
import time
from datetime import datetime, timedelta
class LogMonitor:
    """Polls the logs endpoint on a fixed interval and reports new
    activity, anomalies, and accumulated cost.

    Fixes relative to the published snippet: several print statements were
    reconstructed from garbled source (mirroring the JavaScript version);
    `get_recent_logs` compared timezone-aware timestamps against naive
    `datetime.now()` (TypeError); and `error_counts` was declared but
    never populated.
    """

    def __init__(self, check_interval=300):  # seconds; 5 minutes default
        self.check_interval = check_interval
        self.last_log_count = 0
        self.error_counts = {}      # HTTP status code -> occurrence count
        self.cost_accumulator = 0   # running cost total across polls

    def monitor_logs(self):
        """Monitor logs continuously until interrupted with Ctrl+C."""
        print("🚀 Starting log monitoring... (Press Ctrl+C to stop)")
        try:
            while True:
                logs_data = get_swarm_logs()
                if logs_data:
                    self.analyze_recent_activity(logs_data)
                    self.check_for_anomalies(logs_data)
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            print("\n⏹️ Monitoring stopped")
            self.generate_monitoring_report()

    def analyze_recent_activity(self, logs_data):
        """Report request-count growth since the last poll and the last
        hour's error count and cost."""
        if not logs_data.get('logs'):
            return
        current_count = logs_data['count']
        if self.last_log_count > 0:
            new_logs = current_count - self.last_log_count
            if new_logs > 0:
                print(f"📈 {new_logs} new requests in the last {self.check_interval}s")
        self.last_log_count = current_count
        # Analyze recent logs (last hour)
        recent_logs = self.get_recent_logs(logs_data, hours=1)
        if recent_logs:
            # Error analysis
            errors = [log for log in recent_logs if log.get('status_code', 200) >= 400]
            if errors:
                print(f"⚠️ {len(errors)} errors in the last hour")
            # Cost analysis
            recent_cost = sum(log.get('cost', 0) for log in recent_logs)
            self.cost_accumulator += recent_cost
            print(f"💰 Recent Cost (1h): ${recent_cost:.4f}")
            print(f"💰 Total Accumulated Cost: ${self.cost_accumulator:.4f}")

    def check_for_anomalies(self, logs_data):
        """Flag high error rates and slow responses over the last hour."""
        if not logs_data.get('logs'):
            return
        recent_logs = self.get_recent_logs(logs_data, hours=1)
        if not recent_logs:
            return
        # Check for high error rates
        errors = [log for log in recent_logs if log.get('status_code', 200) >= 400]
        error_rate = len(errors) / len(recent_logs)
        if error_rate > 0.1:  # More than 10% errors
            print(f"⚠️ High error rate: {error_rate:.1%}")
        # Track error codes for the final report (parity with the JS version).
        for log in errors:
            code = log.get('status_code')
            self.error_counts[code] = self.error_counts.get(code, 0) + 1
        # Check for unusual response times
        response_times = [log.get('response_time', 0) for log in recent_logs if log.get('response_time')]
        if response_times:
            avg_response_time = sum(response_times) / len(response_times)
            if avg_response_time > 10:  # More than 10 seconds average
                print(f"⚠️ High average response time: {avg_response_time:.2f}s")

    def get_recent_logs(self, logs_data, hours=1):
        """Return logs whose timestamp is within the last `hours` hours."""
        from datetime import timezone  # timestamps carry a 'Z' (UTC) suffix
        if not logs_data.get('logs'):
            return []
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent_logs = []
        for log in logs_data['logs']:
            log_time = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))
            if log_time >= cutoff_time:
                recent_logs.append(log)
        return recent_logs

    def generate_monitoring_report(self):
        """Print a summary of accumulated cost and observed error codes."""
        print("\n📊 Monitoring Report")
        print("=" * 50)
        print(f"Total Accumulated Cost: ${self.cost_accumulator:.4f}")
        print(f"Monitoring Duration: {self.check_interval}s intervals")
        if self.error_counts:
            print("\nError Summary:")
            for error_code, count in self.error_counts.items():
                print(f"  HTTP {error_code}: {count} occurrences")
# Usage: poll the logs endpoint every 5 minutes until interrupted
log_monitor = LogMonitor(check_interval=300)
log_monitor.monitor_logs()
/**
 * Polls getSwarmLogs() on a fixed interval, reports new activity and
 * cost, and tallies error codes for a final report. The loop runs until
 * stopMonitoring() clears the isMonitoring flag.
 */
class LogMonitor {
// checkInterval is in milliseconds (300000 = 5 minutes).
constructor(checkInterval = 300000) { // 5 minutes default
this.checkInterval = checkInterval;
this.lastLogCount = 0;
// HTTP status code -> number of occurrences seen while monitoring.
this.errorCounts = {};
// Running cost total across all polling cycles.
this.costAccumulator = 0;
// Checked by the polling loop; cleared by stopMonitoring().
this.isMonitoring = false;
}
// Poll until stopped; errors in one cycle are logged and do not end the loop.
async startMonitoring() {
console.log("🚀 Starting log monitoring... (Call stopMonitoring() to stop)");
this.isMonitoring = true;
while (this.isMonitoring) {
try {
const logsData = await getSwarmLogs();
if (logsData) {
await this.analyzeRecentActivity(logsData);
this.checkForAnomalies(logsData);
}
} catch (error) {
console.error("Monitoring error:", error);
}
// Sleep between polls without blocking the event loop.
await new Promise(resolve => setTimeout(resolve, this.checkInterval));
}
}
// End the polling loop and print the final report.
stopMonitoring() {
this.isMonitoring = false;
console.log("⏹️ Monitoring stopped");
this.generateMonitoringReport();
}
// Report request-count growth since the previous poll and the last hour's cost.
async analyzeRecentActivity(logsData) {
if (!logsData.logs) return;
const currentCount = logsData.count;
if (this.lastLogCount > 0) {
const newLogs = currentCount - this.lastLogCount;
if (newLogs > 0) {
console.log(`📈 ${newLogs} new requests in the last ${this.checkInterval/1000}s`);
}
}
this.lastLogCount = currentCount;
// Analyze recent logs (last hour)
const recentLogs = this.getRecentLogs(logsData, 1);
if (recentLogs.length > 0) {
// Cost analysis
const recentCost = recentLogs.reduce((sum, log) => sum + (log.cost || 0), 0);
this.costAccumulator += recentCost;
console.log(`💰 Recent Cost (1h): $${recentCost.toFixed(4)}`);
console.log(`💰 Total Accumulated Cost: $${this.costAccumulator.toFixed(4)}`);
}
}
// Warn when more than 10% of the last hour's requests failed (status >= 400)
// and tally error codes for the final report.
checkForAnomalies(logsData) {
if (!logsData.logs) return;
const recentLogs = this.getRecentLogs(logsData, 1);
if (recentLogs.length === 0) return;
// Check for high error rates
const errors = recentLogs.filter(log => (log.status_code || 200) >= 400);
const errorRate = errors.length / recentLogs.length;
if (errorRate > 0.1) { // More than 10% errors
console.log(`⚠️ High error rate: ${(errorRate * 100).toFixed(1)}%`);
}
// Track error types
errors.forEach(log => {
const code = log.status_code;
this.errorCounts[code] = (this.errorCounts[code] || 0) + 1;
});
}
// Return only the entries whose timestamp is within the last `hours` hours.
getRecentLogs(logsData, hours = 1) {
if (!logsData.logs) return [];
const cutoffTime = new Date(Date.now() - (hours * 60 * 60 * 1000));
return logsData.logs.filter(log => {
const logTime = new Date(log.timestamp);
return logTime >= cutoffTime;
});
}
// Print total accumulated cost and a per-status-code error summary.
generateMonitoringReport() {
console.log("\n📊 Monitoring Report");
console.log("=".repeat(50));
console.log(`Total Accumulated Cost: $${this.costAccumulator.toFixed(4)}`);
if (Object.keys(this.errorCounts).length > 0) {
console.log("\nError Summary:");
Object.entries(this.errorCounts).forEach(([code, count]) => {
console.log(` HTTP ${code}: ${count} occurrences`);
});
}
}
}
// Usage: poll every 5 minutes, then shut down after half an hour.
const monitor = new LogMonitor(5 * 60 * 1000);
monitor.startMonitoring();
setTimeout(() => monitor.stopMonitoring(), 30 * 60 * 1000);
Privacy and Security
Data Protection
- IP Address Filtering: All client IP addresses are automatically filtered from logs
- PII Protection: Personal identifiable information is not logged
- Secure Storage: Logs are stored securely with encryption at rest
- Access Control: Only accessible via your API key
Compliance
- GDPR Compliant: Adheres to data protection regulations
- Audit Trail: Maintains complete audit trail of API usage
- Data Retention: Logs retained for compliance and debugging purposes
- Access Logging: All log access is itself logged for security
Best Practices
Log Management
- Regular Monitoring: Check logs regularly for unusual patterns
- Error Analysis: Investigate error spikes promptly
- Cost Tracking: Monitor API costs and optimize usage
- Performance Analysis: Track response times and identify bottlenecks
- Security Monitoring: Watch for unauthorized access attempts
Data Analysis
- Trend Analysis: Identify usage patterns and growth trends
- Error Pattern Recognition: Detect recurring issues
- Cost Optimization: Find opportunities to reduce API costs
- Performance Optimization: Identify slow endpoints and optimize
- Capacity Planning: Plan for future usage growth
Automation
- Alert Setup: Set up alerts for high error rates or costs
- Automated Reports: Generate regular usage reports
- Anomaly Detection: Automatically detect unusual patterns
- Cost Controls: Implement automatic cost limiting
- Performance Monitoring: Continuous performance tracking
Troubleshooting
Common Issues
Empty Logs Response
# Check if API key is correct
curl -I "https://api.swarms.world/v1/swarm/logs" \
-H "x-api-key: your-api-key"
- Logs are filtered for privacy (no IP addresses)
- Some requests may not be logged due to high volume
- Check your API key permissions
# Analyze response time patterns
logs_data = get_swarm_logs()
if logs_data:
    response_times = [log['response_time'] for log in logs_data['logs'] if 'response_time' in log]
    # Guard the average: an empty list would raise ZeroDivisionError.
    if response_times:
        avg_time = sum(response_times) / len(response_times)
        print(f"Average response time: {avg_time:.2f}s")
    # Calculate cost per endpoint (use .get to tolerate entries without
    # an 'endpoint' key, consistent with analyze_logs above)
    from collections import defaultdict
    endpoint_costs = defaultdict(float)
    for log in logs_data['logs']:
        endpoint_costs[log.get('endpoint', 'unknown')] += log.get('cost', 0)
    for endpoint, cost in sorted(endpoint_costs.items(), key=lambda x: x[1], reverse=True):
        print(f"{endpoint}: ${cost:.4f}")