Create a Python file
run_teams.py
Copy
Ask AI
import asyncio
from agno.client import AgentOSClient
from agno.run.team import RunCompletedEvent, RunContentEvent
async def run_team_non_streaming():
"""Execute a non-streaming team run."""
print("=" * 60)
print("Non-Streaming Team Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
# Get available teams
config = await client.aget_config()
if not config.teams:
print("No teams available")
return
team_id = config.teams[0].id
print(f"Running team: {team_id}")
# Execute the team
result = await client.run_team(
team_id=team_id,
message="What is the capital of France and what is 15 * 7?",
)
print(f"\nRun ID: {result.run_id}")
print(f"Content: {result.content}")
async def run_team_streaming():
"""Execute a streaming team run."""
print("\n" + "=" * 60)
print("Streaming Team Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
# Get available teams
config = await client.aget_config()
if not config.teams:
print("No teams available")
return
team_id = config.teams[0].id
print(f"Streaming from team: {team_id}")
print("\nResponse: ", end="", flush=True)
async for event in client.run_team_stream(
team_id=team_id,
message="Tell me about Python programming in 2 sentences.",
):
if isinstance(event, RunContentEvent):
print(event.content, end="", flush=True)
elif isinstance(event, RunCompletedEvent):
pass
print("\n")
async def main():
await run_team_non_streaming()
await run_team_streaming()
if __name__ == "__main__":
asyncio.run(main())
Start an AgentOS Server
Make sure you have an AgentOS server running with teams configured. See Creating Your First OS for setup instructions.