"""
Reminix Agent
The Agent class for building AI agents that conform to the Reminix API contract.
"""
import asyncio
import inspect
from collections.abc import AsyncGenerator, Callable
from typing import Any, cast
from .types import (
AgentInfo,
ChatMessage,
ChatResponse,
Context,
InvokeResponse,
StreamChunk,
)
[docs]
class Agent:
"""
An AI agent that can handle invoke and chat requests.
Use separate handlers for non-streaming and streaming responses:
- ``@agent.invoke`` / ``@agent.chat`` for non-streaming
- ``@agent.invoke_stream`` / ``@agent.chat_stream`` for streaming
Example::
agent = Agent(name="my-agent")
@agent.invoke
async def handle_invoke(input: dict, ctx: Context):
return {"output": f"Processed: {input}"}
@agent.invoke_stream
async def handle_invoke_stream(input: dict, ctx: Context):
for word in ["Hello", " ", "World"]:
yield {"chunk": word}
@agent.chat
async def handle_chat(messages: list, ctx: Context):
return {"message": {"role": "assistant", "content": "Hello!"}}
@agent.chat_stream
async def handle_chat_stream(messages: list, ctx: Context):
for word in ["Hello", "!", " How can I help?"]:
yield {"chunk": word}
"""
[docs]
def __init__(
self,
name: str,
*,
metadata: dict[str, Any] | None = None,
) -> None:
"""
Initialize an agent with a name and optional metadata.
Args:
name: Unique name for the agent (used in URL routing).
Must be URL-safe (alphanumeric, hyphens, underscores).
metadata: Optional user-defined metadata (e.g., framework, model, description).
"""
if not name or not isinstance(name, str):
raise ValueError("Agent name must be a non-empty string")
# Validate name is URL-safe
import re
if not re.match(r"^[a-zA-Z0-9_-]+$", name):
raise ValueError(
"Agent name must be URL-safe (alphanumeric, hyphens, underscores only)"
)
self.name = name
self._metadata = metadata or {}
self._invoke_handler: Callable[..., Any] | None = None
self._invoke_stream_handler: Callable[..., Any] | None = None
self._chat_handler: Callable[..., Any] | None = None
self._chat_stream_handler: Callable[..., Any] | None = None
@property
def metadata(self) -> dict[str, Any]:
"""Get the agent's metadata."""
return self._metadata
[docs]
def to_info(self) -> AgentInfo:
"""
Return agent information for discovery.
Returns:
AgentInfo dict with name, capabilities, and metadata.
"""
return {
"name": self.name,
"invoke": self.has_invoke or self.has_invoke_stream,
"chat": self.has_chat or self.has_chat_stream,
"metadata": self._metadata,
}
[docs]
def invoke(self, handler: Callable[[dict[str, Any]], Any]) -> Callable[[dict[str, Any]], Any]:
"""
Decorator to register a non-streaming invoke handler.
The handler receives the input dict and should return {"output": ...}.
Example::
@agent.invoke
async def handle_invoke(input: dict):
return {"output": f"Result: {input['query']}"}
"""
self._invoke_handler = handler
return handler
[docs]
def invoke_stream(
self, handler: Callable[[dict[str, Any]], Any]
) -> Callable[[dict[str, Any]], Any]:
"""
Decorator to register a streaming invoke handler.
The handler receives the input dict and should yield {"chunk": "..."}.
Example::
@agent.invoke_stream
async def handle_invoke_stream(input: dict):
for word in ["Hello", " ", "World"]:
yield {"chunk": word}
"""
self._invoke_stream_handler = handler
return handler
[docs]
def chat(
self, handler: Callable[[list[ChatMessage]], Any]
) -> Callable[[list[ChatMessage]], Any]:
"""
Decorator to register a non-streaming chat handler.
The handler receives the messages list and should return
{"message": {"role": "assistant", "content": "..."}}.
Example::
@agent.chat
async def handle_chat(messages: list):
return {
"message": {
"role": "assistant",
"content": "Hello! How can I help?"
}
}
"""
self._chat_handler = handler
return handler
[docs]
def chat_stream(
self, handler: Callable[[list[ChatMessage]], Any]
) -> Callable[[list[ChatMessage]], Any]:
"""
Decorator to register a streaming chat handler.
The handler receives the messages list and should yield {"chunk": "..."}.
Example::
@agent.chat_stream
async def handle_chat_stream(messages: list):
for word in ["Hello", "!", " How can I help?"]:
yield {"chunk": word}
"""
self._chat_stream_handler = handler
return handler
@property
def has_invoke(self) -> bool:
"""Check if non-streaming invoke handler is registered."""
return self._invoke_handler is not None
@property
def has_invoke_stream(self) -> bool:
"""Check if streaming invoke handler is registered."""
return self._invoke_stream_handler is not None
@property
def has_chat(self) -> bool:
"""Check if non-streaming chat handler is registered."""
return self._chat_handler is not None
@property
def has_chat_stream(self) -> bool:
"""Check if streaming chat handler is registered."""
return self._chat_stream_handler is not None
def _handler_accepts_context(self, handler: Callable[..., Any]) -> bool:
"""Check if a handler function accepts a context parameter (2+ params)."""
try:
sig = inspect.signature(handler)
return len(sig.parameters) >= 2
except (ValueError, TypeError):
return False
[docs]
async def handle_invoke(
self,
input_data: dict[str, Any],
ctx: Context | None = None,
) -> InvokeResponse:
"""
Execute the non-streaming invoke handler.
Args:
input_data: The input data from the request.
ctx: Optional request context (passed if handler accepts it).
Returns:
An InvokeResponse dict with output.
Raises:
NotImplementedError: If no invoke handler is registered.
"""
if self._invoke_handler is None:
raise NotImplementedError(f"Agent '{self.name}' does not implement invoke")
# Pass context if handler accepts it
if self._handler_accepts_context(self._invoke_handler):
result = self._invoke_handler(input_data, ctx or {})
else:
result = self._invoke_handler(input_data)
# Handle async functions
if asyncio.iscoroutine(result):
result = await result
# Validate response
if not isinstance(result, dict) or "output" not in result:
raise ValueError(f"Invoke handler must return {{'output': ...}}, got: {type(result)}")
return cast(InvokeResponse, result)
[docs]
async def handle_invoke_stream(
self,
input_data: dict[str, Any],
ctx: Context | None = None,
) -> AsyncGenerator[StreamChunk, None]:
"""
Execute the streaming invoke handler.
Args:
input_data: The input data from the request.
ctx: Optional request context (passed if handler accepts it).
Returns:
An async generator yielding StreamChunks.
Raises:
NotImplementedError: If no streaming invoke handler is registered.
"""
if self._invoke_stream_handler is None:
raise NotImplementedError(f"Agent '{self.name}' does not implement invoke_stream")
# Pass context if handler accepts it
if self._handler_accepts_context(self._invoke_stream_handler):
result = self._invoke_stream_handler(input_data, ctx or {})
else:
result = self._invoke_stream_handler(input_data)
# Handle async functions that return generators
if asyncio.iscoroutine(result):
result = await result
if not inspect.isasyncgen(result):
raise ValueError("invoke_stream handler must be an async generator (use yield)")
return result
[docs]
async def handle_chat(
self,
messages: list[ChatMessage],
ctx: Context | None = None,
) -> ChatResponse:
"""
Execute the non-streaming chat handler.
Args:
messages: The chat messages from the request.
ctx: Optional request context (passed if handler accepts it).
Returns:
A ChatResponse dict with message.
Raises:
NotImplementedError: If no chat handler is registered.
"""
if self._chat_handler is None:
raise NotImplementedError(f"Agent '{self.name}' does not implement chat")
# Pass context if handler accepts it
if self._handler_accepts_context(self._chat_handler):
result = self._chat_handler(messages, ctx or {})
else:
result = self._chat_handler(messages)
# Handle async functions
if asyncio.iscoroutine(result):
result = await result
# Validate response
if not isinstance(result, dict) or "message" not in result:
raise ValueError(f"Chat handler must return {{'message': ...}}, got: {type(result)}")
message = result["message"]
if not isinstance(message, dict) or message.get("role") != "assistant":
raise ValueError(
"Chat handler must return {'message': {'role': 'assistant', 'content': ...}}"
)
return cast(ChatResponse, result)
[docs]
async def handle_chat_stream(
self,
messages: list[ChatMessage],
ctx: Context | None = None,
) -> AsyncGenerator[StreamChunk, None]:
"""
Execute the streaming chat handler.
Args:
messages: The chat messages from the request.
ctx: Optional request context (passed if handler accepts it).
Returns:
An async generator yielding StreamChunks.
Raises:
NotImplementedError: If no streaming chat handler is registered.
"""
if self._chat_stream_handler is None:
raise NotImplementedError(f"Agent '{self.name}' does not implement chat_stream")
# Pass context if handler accepts it
if self._handler_accepts_context(self._chat_stream_handler):
result = self._chat_stream_handler(messages, ctx or {})
else:
result = self._chat_stream_handler(messages)
# Handle async functions that return generators
if asyncio.iscoroutine(result):
result = await result
if not inspect.isasyncgen(result):
raise ValueError("chat_stream handler must be an async generator (use yield)")
return result
def __repr__(self) -> str:
caps = []
if self.has_invoke:
caps.append("invoke")
if self.has_invoke_stream:
caps.append("invoke_stream")
if self.has_chat:
caps.append("chat")
if self.has_chat_stream:
caps.append("chat_stream")
return f"Agent(name={self.name!r}, handlers=[{', '.join(caps)}])"