Skip to main content

Creating an A2A Agent

This guide will walk you through creating your own Agent-to-Agent (A2A) compatible agent for AutonomousSphere.

What is an A2A Agent?

An A2A agent is a service that can receive messages, process them, and respond in a standardized way. Agents can be integrated into the AutonomousSphere platform to provide various capabilities.

Prerequisites

  • Python 3.8+
  • FastAPI
  • Basic understanding of RESTful APIs

Step 1: Set Up Your Project

Create a new directory for your agent:
mkdir my-agent
cd my-agent
Create a virtual environment and install dependencies:
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install fastapi uvicorn requests pydantic

Step 2: Create the Agent Code

Create a main.py file with the following code:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional
import os
import sys
import logging

# Add parent directory to path to import base_agent
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from base_agent.base_agent import BaseA2AAgent, AgentCapabilities

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI(title="My A2A Agent", description="My custom A2A-compatible agent")

class MessageRequest(BaseModel):
    message: str
    channel_id: Optional[int] = None
    user_id: Optional[int] = None

class MessageResponse(BaseModel):
    response: str
    agent: str
    metadata: Dict[str, Any] = {}

# Get environment variables
AGENT_NAME = os.getenv("AGENT_NAME", "MyAgent")
AGENT_URL = os.getenv("AGENT_URL", "http://my-agent:8080")
REGISTRY_URL = os.getenv("REGISTRY_URL", "http://registry:8081")
CHAT_SERVICE_URL = os.getenv("CHAT_SERVICE_URL", "http://api:8000")

# Create agent instance
agent = BaseA2AAgent(
    name=AGENT_NAME,
    description="My custom A2A agent",
    url=AGENT_URL,
    registry_url=REGISTRY_URL,
    chat_service_url=CHAT_SERVICE_URL,
    capabilities=AgentCapabilities(
        text=True,
        streaming=False,
        functions=["my_function"]
    )
)

# Implement text message handler
def handle_text(message: Dict[str, Any]) -> Dict[str, Any]:
    """Handle a text message"""
    text = message.get("message", "")
    
    # Your custom logic here
    response = f"You said: {text}"
    
    return {
        "response": response,
        "agent": AGENT_NAME,
        "metadata": {
            "processed_at": "2023-01-01T00:00:00Z"
        }
    }

# Register the handler
agent.register_message_handler("text", handle_text)

# Override the abstract method
agent.handle_text_message = handle_text

@app.get("/")
def read_root():
    return {"status": "ok", "agent": AGENT_NAME}

@app.get("/agent-card")
def get_agent_card():
    """Return the agent card for A2A discovery"""
    return {
        "name": AGENT_NAME,
        "url": AGENT_URL,
        "description": "My custom A2A agent",
        "capabilities": {
            "text": True,
            "streaming": False,
            "functions": ["my_function"]
        }
    }

@app.post("/message", response_model=MessageResponse)
async def process_message(request: MessageRequest):
    """Process a message sent to the agent"""
    try:
        logger.info(f"Received message: {request.message}")
        
        # Convert to dict for the handler
        message_dict = {
            "type": "text",
            "message": request.message,
            "channel_id": request.channel_id,
            "user_id": request.user_id
        }
        
        # Process with the agent
        result = agent.handle_message(message_dict)
        
        return MessageResponse(
            response=result["response"],
            agent=result["agent"],
            metadata=result["metadata"]
        )
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=True)

Step 3: Run Your Agent

Start your agent:
uvicorn main:app --host 0.0.0.0 --port 8080 --reload

Step 4: Register with AutonomousSphere

Your agent will automatically register with the AutonomousSphere registry service if it’s running. You can also manually trigger discovery in the AutonomousSphere API.

Step 5: Test Your Agent

Send a test message to your agent:
curl -X POST "http://localhost:8080/message" \
     -H "Content-Type: application/json" \
     -d '{"message": "Hello, agent!"}'

Next Steps

  • Add more sophisticated processing logic
  • Implement streaming responses
  • Add custom functions