What This Example Shows
- The
BatchedGridWorkflow endpoint, which runs every agent against every task in parallel
- A practical financial-analysis use case: two ETF analysts (risk + quant) across two sector queries (energy + semis)
- How to parse the per-task, per-agent output grid
BatchedGridWorkflow is a fan-out × fan-out primitive. If you give it N agents and M tasks, you get N × M independent agent runs in parallel — perfect for “analyze each of these tickers using each of these specialists” workloads.
Step 1: Setup
import asyncio
import os
from typing import Any, Dict
import httpx
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"
ENDPOINT = f"{BASE_URL}/v1/batched-grid-workflow/completions"
HEADERS = {"x-api-key": API_KEY, "Content-Type": "application/json"}
Step 2: Build the Grid
Two specialists, two tasks → four parallel agent runs.
def build_payload() -> Dict[str, Any]:
return {
"name": "ETF Analysis Grid",
"description": "Risk and quant analysis across energy and semiconductor ETFs.",
"max_loops": 1,
"agent_completions": [
{
"agent_name": "Risk Analyst",
"description": "Risk assessment and portfolio risk metrics.",
"system_prompt": (
"You are a risk analyst specializing in ETF analysis. "
"Evaluate ETFs based on volatility, downside risk, correlation, "
"concentration risk, and risk-adjusted returns. Report Sharpe "
"ratio, maximum drawdown, beta, and Value at Risk (VaR)."
),
"model_name": "gpt-4.1",
"max_loops": 1,
"temperature": 0.3,
},
{
"agent_name": "Quantitative Analyst",
"description": "Quantitative metrics and performance analysis.",
"system_prompt": (
"You are a quantitative analyst specializing in ETF analysis. "
"Evaluate ETFs on performance, expense ratios, tracking error, "
"liquidity, and holdings composition. Provide returns, Sharpe "
"ratio, information ratio, and factor exposures."
),
"model_name": "gpt-4.1",
"max_loops": 1,
"temperature": 0.3,
},
],
"tasks": [
(
"Analyze the top energy ETFs including XLE, VDE, and IYE. Provide "
"detailed risk and performance metrics, holdings analysis, and "
"investment considerations."
),
(
"Analyze the top semiconductor ETFs including SMH, SOXX, and XSD. "
"Provide detailed risk and performance metrics, holdings analysis, "
"and investment considerations."
),
],
}
Step 3: Call the Endpoint
The /v1/batched-grid-workflow/completions endpoint is async-friendly. Use httpx.AsyncClient to avoid blocking your event loop.
async def run_grid() -> Dict[str, Any]:
async with httpx.AsyncClient(timeout=300.0) as client:
response = await client.post(ENDPOINT, headers=HEADERS, json=build_payload())
response.raise_for_status()
return response.json()
Step 4: Parse the Output
The response carries one entry per task; each entry is a dict keyed by agent name.
def show(response_data: Dict[str, Any]) -> None:
for task_idx, task_outputs in enumerate(response_data.get("outputs", []), start=1):
print("=" * 60)
print(f"Task {task_idx}")
print("=" * 60)
if isinstance(task_outputs, dict):
for agent_name, agent_response in task_outputs.items():
print(f"\n--- {agent_name} ---")
print(str(agent_response)[:400] + "...")
if __name__ == "__main__":
response = asyncio.run(run_grid())
show(response)
Output shape: outputs[task_index][agent_name] = response. If you flatten this into a 2-D grid (rows = tasks, columns = agents), every cell is an independent analyst opinion you can compare side-by-side.
When To Use Batched Grid
| If you need… | Use |
|---|
| Each agent to see the previous agent’s output | SequentialWorkflow |
| All agents to react to the same task in parallel | ConcurrentWorkflow |
| Same agents applied to many independent tasks | BatchedGridWorkflow |
| A director to synthesize specialist opinions | HierarchicalSwarm |
Batched Grid is the right choice when each task is independent — no shared state, no inter-agent communication, just maximum parallelism.
BatchedGridWorkflow is a premium endpoint. See the pricing page for cost details.