"""
Reminix Server
FastAPI-based HTTP server for serving Reminix agents.
"""
from typing import Any
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
import reminix
from .agent import Agent
from .streaming import stream_to_sse
from .types import DiscoverResponse, HealthResponse
def create_app(agents: list[Agent]) -> FastAPI:
"""
Create a FastAPI application for serving agents.
Args:
agents: List of agents to serve.
Returns:
Configured FastAPI application.
"""
app = FastAPI(
title="Reminix Agent Server",
description="Serving Reminix AI agents",
)
# Create agent lookup by name
agent_map: dict[str, Agent] = {agent.name: agent for agent in agents}
@app.get("/health")
async def health() -> HealthResponse:
"""Health check endpoint."""
return HealthResponse(
status="healthy",
agents=list(agent_map.keys()),
)
@app.get("/_discover")
async def discover() -> DiscoverResponse:
"""Discovery endpoint for Reminix platform."""
return DiscoverResponse(
runtime={
"version": reminix.__version__,
"language": "python",
"framework": "fastapi",
},
agents=[agent.to_info() for agent in agents],
)
@app.get("/agent/{name}/health")
async def agent_health(name: str) -> dict[str, Any]:
"""Per-agent health check."""
if name not in agent_map:
raise HTTPException(status_code=404, detail=f"Agent '{name}' not found")
agent = agent_map[name]
return {
"name": agent.name,
"invoke": agent.has_invoke or agent.has_invoke_stream,
"chat": agent.has_chat or agent.has_chat_stream,
}
@app.post("/agent/{name}/invoke")
async def invoke_agent(name: str, request: Request) -> Any:
"""Invoke an agent with input data."""
if name not in agent_map:
raise HTTPException(status_code=404, detail=f"Agent '{name}' not found")
agent = agent_map[name]
# Parse request body
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
input_data = body.get("input")
stream = body.get("stream", False)
context = body.get("context")
if input_data is None or not isinstance(input_data, dict):
raise HTTPException(status_code=400, detail="'input' must be an object")
# Check handler availability before calling
if stream:
if not agent.has_invoke_stream:
raise HTTPException(
status_code=501,
detail=f"Agent '{name}' does not implement invoke_stream",
)
else:
if not agent.has_invoke:
raise HTTPException(
status_code=501,
detail=f"Agent '{name}' does not implement invoke",
)
try:
if stream:
return StreamingResponse(
stream_to_sse(await agent.handle_invoke_stream(input_data, context)),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
else:
return JSONResponse(content=await agent.handle_invoke(input_data, context))
except NotImplementedError as e:
raise HTTPException(status_code=501, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=502, detail=f"Agent error: {e}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
@app.post("/agent/{name}/chat")
async def chat_agent(name: str, request: Request) -> Any:
"""Chat with an agent."""
if name not in agent_map:
raise HTTPException(status_code=404, detail=f"Agent '{name}' not found")
agent = agent_map[name]
# Parse request body
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
messages = body.get("messages")
stream = body.get("stream", False)
context = body.get("context")
if messages is None or not isinstance(messages, list):
raise HTTPException(status_code=400, detail="'messages' must be an array")
if len(messages) == 0:
raise HTTPException(status_code=400, detail="'messages' must not be empty")
# Validate each message has required fields
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
raise HTTPException(
status_code=400, detail=f"Message at index {i} must be an object"
)
if "role" not in msg:
raise HTTPException(status_code=400, detail=f"Message at index {i} missing 'role'")
if msg["role"] not in ("system", "user", "assistant", "tool"):
raise HTTPException(
status_code=400,
detail=f"Message at index {i} has invalid role: {msg['role']}",
)
# Check handler availability before calling
if stream:
if not agent.has_chat_stream:
raise HTTPException(
status_code=501,
detail=f"Agent '{name}' does not implement chat_stream",
)
else:
if not agent.has_chat:
raise HTTPException(
status_code=501,
detail=f"Agent '{name}' does not implement chat",
)
try:
if stream:
return StreamingResponse(
stream_to_sse(await agent.handle_chat_stream(messages, context)),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
else:
return JSONResponse(content=await agent.handle_chat(messages, context))
except NotImplementedError as e:
raise HTTPException(status_code=501, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=502, detail=f"Agent error: {e}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
return app
[docs]
def serve(
agents: Agent | list[Agent],
*,
host: str = "0.0.0.0",
port: int = 8080,
) -> None:
"""
Start the HTTP server to serve agents.
Args:
agents: Single agent or list of agents to serve.
host: Host to bind to (default: "0.0.0.0").
port: Port to listen on (default: 8080).
Example::
from reminix.runtime import Agent, serve
agent = Agent(name="my-agent")
@agent.invoke
async def handle_invoke(input: dict):
return {"output": "Hello!"}
serve([agent], port=8080)
"""
import uvicorn
# Normalize to list
if isinstance(agents, Agent):
agents = [agents]
if not agents:
raise ValueError("At least one agent must be provided")
# Validate all agents have unique names
names = [a.name for a in agents]
if len(names) != len(set(names)):
raise ValueError("All agents must have unique names")
# Create and run the app
app = create_app(agents)
print(f"Starting Reminix agent server on http://{host}:{port}")
print(f"Serving agents: {', '.join(names)}")
print(f"Health check: http://{host}:{port}/health")
uvicorn.run(app, host=host, port=port)