import os
import requests
from dotenv import load_dotenv
# Load settings via python-dotenv before reading the environment, so that
# SWARMS_API_KEY can be supplied that way instead of being exported by hand.
load_dotenv()

# API credential; this is None when SWARMS_API_KEY is not set.
API_KEY = os.environ.get("SWARMS_API_KEY")

# Root URL that every endpoint path in this script is appended to.
BASE_URL = "https://api.swarms.world"

# Headers sent with each request: the API key plus a JSON content type.
headers = {
    "x-api-key": API_KEY,
    "Content-Type": "application/json",
}
def create_quant_agent_with_mcp():
    """Build the request payload for an MCP-enabled quantitative analyst agent.

    Returns:
        A dict with two keys: ``"agent_config"`` (the agent's name,
        description, system prompt, model and sampling settings, and the MCP
        server URL) and ``"task"`` (the instruction the agent is asked to
        carry out).
    """
    # The prompt and task are assembled from adjacent string literals; the
    # trailing spaces inside the literals keep the sentences separated.
    analyst_prompt = (
        "You are a Quantitative Data Analyst with direct access to MCP servers. "
        "Use the MCP tools to fetch real-time financial data, perform statistical analysis, "
        "and provide actionable insights. Always verify data quality and include source attribution."
    )
    analysis_task = (
        "Connect to the MCP server and fetch the latest market data for AAPL, TSLA, and MSFT. "
        "Calculate volatility, Sharpe ratio, and correlation coefficients. "
        "Provide portfolio optimization recommendations based on the analysis."
    )

    agent_settings = {
        "agent_name": "MCP-Enabled Quantitative Analyst",
        "description": "Quantitative analyst with MCP server integration for real-time data access",
        "system_prompt": analyst_prompt,
        "model_name": "gpt-4o",
        "role": "quantitative_analyst",
        "max_loops": 3,
        "max_tokens": 16384,
        "temperature": 0.3,
        # Placeholder address; point this at a real MCP server before running.
        "mcp_url": "https://your-mcp-server.com/financial-data",
        "streaming_on": False,
    }

    return {"agent_config": agent_settings, "task": analysis_task}
def run_mcp_agent(timeout=300):
    """Execute the MCP-enabled quantitative agent.

    Builds the payload with ``create_quant_agent_with_mcp`` and POSTs it as
    JSON to ``{BASE_URL}/v1/agent/completions`` using the module-level
    ``headers``.

    Args:
        timeout: Value forwarded to ``requests.post`` as ``timeout`` (seconds,
            or a ``(connect, read)`` tuple). The default is deliberately
            generous. NOTE(review): tune it to how long agent runs actually
            take against your server.

    Returns:
        The decoded JSON body of the response on success, or ``None`` when the
        request fails, the server replies with an HTTP error status, or the
        body is not valid JSON. Failures are printed rather than raised.
    """
    payload = create_quant_agent_with_mcp()
    try:
        response = requests.post(
            f"{BASE_URL}/v1/agent/completions",
            headers=headers,
            json=payload,
            # Without a timeout, requests waits forever on a stalled server.
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"Error: {e}")
        return None
if __name__ == "__main__":
    # Script entry point: run the agent once and report the outcome.
    result = run_mcp_agent()
    if not result:
        # run_mcp_agent returns None on failure (an empty body is also falsy).
        print("Failed to execute MCP agent")
    else:
        print("MCP Agent executed successfully!")
        # Show a fallback marker when the response carries no job_id key.
        print(f"Job ID: {result.get('job_id', 'N/A')}")