Source code for reminix.runtime.agent

"""
Reminix Agent

The Agent class for building AI agents that conform to the Reminix API contract.
"""

import asyncio
import inspect
from collections.abc import AsyncGenerator, Callable
from typing import Any, cast

from .types import (
    AgentInfo,
    ChatMessage,
    ChatResponse,
    Context,
    InvokeResponse,
    StreamChunk,
)


class Agent:
    """
    An AI agent that can handle invoke and chat requests.

    Use separate handlers for non-streaming and streaming responses:

    - ``@agent.invoke`` / ``@agent.chat`` for non-streaming
    - ``@agent.invoke_stream`` / ``@agent.chat_stream`` for streaming

    Example::

        agent = Agent(name="my-agent")

        @agent.invoke
        async def handle_invoke(input: dict, ctx: Context):
            return {"output": f"Processed: {input}"}

        @agent.invoke_stream
        async def handle_invoke_stream(input: dict, ctx: Context):
            for word in ["Hello", " ", "World"]:
                yield {"chunk": word}

        @agent.chat
        async def handle_chat(messages: list, ctx: Context):
            return {"message": {"role": "assistant", "content": "Hello!"}}

        @agent.chat_stream
        async def handle_chat_stream(messages: list, ctx: Context):
            for word in ["Hello", "!", " How can I help?"]:
                yield {"chunk": word}
    """

    def __init__(
        self,
        name: str,
        *,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize an agent with a name and optional metadata.

        Args:
            name: Unique name for the agent (used in URL routing).
                Must be URL-safe (alphanumeric, hyphens, underscores).
            metadata: Optional user-defined metadata (e.g., framework, model, description).
        """
        if not name or not isinstance(name, str):
            raise ValueError("Agent name must be a non-empty string")

        # Validate name is URL-safe
        import re

        if not re.match(r"^[a-zA-Z0-9_-]+$", name):
            raise ValueError(
                "Agent name must be URL-safe (alphanumeric, hyphens, underscores only)"
            )

        self.name = name
        self._metadata = metadata or {}
        self._invoke_handler: Callable[..., Any] | None = None
        self._invoke_stream_handler: Callable[..., Any] | None = None
        self._chat_handler: Callable[..., Any] | None = None
        self._chat_stream_handler: Callable[..., Any] | None = None

    @property
    def metadata(self) -> dict[str, Any]:
        """Get the agent's metadata."""
        return self._metadata

    def to_info(self) -> AgentInfo:
        """
        Return agent information for discovery.

        Returns:
            AgentInfo dict with name, capabilities, and metadata.
        """
        return {
            "name": self.name,
            "invoke": self.has_invoke or self.has_invoke_stream,
            "chat": self.has_chat or self.has_chat_stream,
            "metadata": self._metadata,
        }

    def invoke(self, handler: Callable[[dict[str, Any]], Any]) -> Callable[[dict[str, Any]], Any]:
        """
        Decorator to register a non-streaming invoke handler.

        The handler receives the input dict and should return {"output": ...}.

        Example::

            @agent.invoke
            async def handle_invoke(input: dict):
                return {"output": f"Result: {input['query']}"}
        """
        self._invoke_handler = handler
        return handler

    def invoke_stream(
        self, handler: Callable[[dict[str, Any]], Any]
    ) -> Callable[[dict[str, Any]], Any]:
        """
        Decorator to register a streaming invoke handler.

        The handler receives the input dict and should yield {"chunk": "..."}.

        Example::

            @agent.invoke_stream
            async def handle_invoke_stream(input: dict):
                for word in ["Hello", " ", "World"]:
                    yield {"chunk": word}
        """
        self._invoke_stream_handler = handler
        return handler

    def chat(
        self, handler: Callable[[list[ChatMessage]], Any]
    ) -> Callable[[list[ChatMessage]], Any]:
        """
        Decorator to register a non-streaming chat handler.

        The handler receives the messages list and should return
        {"message": {"role": "assistant", "content": "..."}}.

        Example::

            @agent.chat
            async def handle_chat(messages: list):
                return {
                    "message": {
                        "role": "assistant",
                        "content": "Hello! How can I help?"
                    }
                }
        """
        self._chat_handler = handler
        return handler

    def chat_stream(
        self, handler: Callable[[list[ChatMessage]], Any]
    ) -> Callable[[list[ChatMessage]], Any]:
        """
        Decorator to register a streaming chat handler.

        The handler receives the messages list and should yield {"chunk": "..."}.

        Example::

            @agent.chat_stream
            async def handle_chat_stream(messages: list):
                for word in ["Hello", "!", " How can I help?"]:
                    yield {"chunk": word}
        """
        self._chat_stream_handler = handler
        return handler

    @property
    def has_invoke(self) -> bool:
        """Check if non-streaming invoke handler is registered."""
        return self._invoke_handler is not None

    @property
    def has_invoke_stream(self) -> bool:
        """Check if streaming invoke handler is registered."""
        return self._invoke_stream_handler is not None

    @property
    def has_chat(self) -> bool:
        """Check if non-streaming chat handler is registered."""
        return self._chat_handler is not None

    @property
    def has_chat_stream(self) -> bool:
        """Check if streaming chat handler is registered."""
        return self._chat_stream_handler is not None

    def _handler_accepts_context(self, handler: Callable[..., Any]) -> bool:
        """Check if a handler function accepts a context parameter (2+ params)."""
        try:
            sig = inspect.signature(handler)
            return len(sig.parameters) >= 2
        except (ValueError, TypeError):
            return False

    async def handle_invoke(
        self,
        input_data: dict[str, Any],
        ctx: Context | None = None,
    ) -> InvokeResponse:
        """
        Execute the non-streaming invoke handler.

        Args:
            input_data: The input data from the request.
            ctx: Optional request context (passed if handler accepts it).

        Returns:
            An InvokeResponse dict with output.

        Raises:
            NotImplementedError: If no invoke handler is registered.
        """
        if self._invoke_handler is None:
            raise NotImplementedError(f"Agent '{self.name}' does not implement invoke")

        # Pass context if handler accepts it
        if self._handler_accepts_context(self._invoke_handler):
            result = self._invoke_handler(input_data, ctx or {})
        else:
            result = self._invoke_handler(input_data)

        # Handle async functions
        if asyncio.iscoroutine(result):
            result = await result

        # Validate response
        if not isinstance(result, dict) or "output" not in result:
            raise ValueError(f"Invoke handler must return {{'output': ...}}, got: {type(result)}")

        return cast(InvokeResponse, result)

    async def handle_invoke_stream(
        self,
        input_data: dict[str, Any],
        ctx: Context | None = None,
    ) -> AsyncGenerator[StreamChunk, None]:
        """
        Execute the streaming invoke handler.

        Args:
            input_data: The input data from the request.
            ctx: Optional request context (passed if handler accepts it).

        Returns:
            An async generator yielding StreamChunks.

        Raises:
            NotImplementedError: If no streaming invoke handler is registered.
        """
        if self._invoke_stream_handler is None:
            raise NotImplementedError(f"Agent '{self.name}' does not implement invoke_stream")

        # Pass context if handler accepts it
        if self._handler_accepts_context(self._invoke_stream_handler):
            result = self._invoke_stream_handler(input_data, ctx or {})
        else:
            result = self._invoke_stream_handler(input_data)

        # Handle async functions that return generators
        if asyncio.iscoroutine(result):
            result = await result

        if not inspect.isasyncgen(result):
            raise ValueError("invoke_stream handler must be an async generator (use yield)")

        return result

    async def handle_chat(
        self,
        messages: list[ChatMessage],
        ctx: Context | None = None,
    ) -> ChatResponse:
        """
        Execute the non-streaming chat handler.

        Args:
            messages: The chat messages from the request.
            ctx: Optional request context (passed if handler accepts it).

        Returns:
            A ChatResponse dict with message.

        Raises:
            NotImplementedError: If no chat handler is registered.
        """
        if self._chat_handler is None:
            raise NotImplementedError(f"Agent '{self.name}' does not implement chat")

        # Pass context if handler accepts it
        if self._handler_accepts_context(self._chat_handler):
            result = self._chat_handler(messages, ctx or {})
        else:
            result = self._chat_handler(messages)

        # Handle async functions
        if asyncio.iscoroutine(result):
            result = await result

        # Validate response
        if not isinstance(result, dict) or "message" not in result:
            raise ValueError(f"Chat handler must return {{'message': ...}}, got: {type(result)}")

        message = result["message"]
        if not isinstance(message, dict) or message.get("role") != "assistant":
            raise ValueError(
                "Chat handler must return {'message': {'role': 'assistant', 'content': ...}}"
            )

        return cast(ChatResponse, result)

    async def handle_chat_stream(
        self,
        messages: list[ChatMessage],
        ctx: Context | None = None,
    ) -> AsyncGenerator[StreamChunk, None]:
        """
        Execute the streaming chat handler.

        Args:
            messages: The chat messages from the request.
            ctx: Optional request context (passed if handler accepts it).

        Returns:
            An async generator yielding StreamChunks.

        Raises:
            NotImplementedError: If no streaming chat handler is registered.
        """
        if self._chat_stream_handler is None:
            raise NotImplementedError(f"Agent '{self.name}' does not implement chat_stream")

        # Pass context if handler accepts it
        if self._handler_accepts_context(self._chat_stream_handler):
            result = self._chat_stream_handler(messages, ctx or {})
        else:
            result = self._chat_stream_handler(messages)

        # Handle async functions that return generators
        if asyncio.iscoroutine(result):
            result = await result

        if not inspect.isasyncgen(result):
            raise ValueError("chat_stream handler must be an async generator (use yield)")

        return result

    def __repr__(self) -> str:
        caps = []
        if self.has_invoke:
            caps.append("invoke")
        if self.has_invoke_stream:
            caps.append("invoke_stream")
        if self.has_chat:
            caps.append("chat")
        if self.has_chat_stream:
            caps.append("chat_stream")
        return f"Agent(name={self.name!r}, handlers=[{', '.join(caps)}])"
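

# Illustrative usage sketch, not part of the library above: it shows how handlers
# registered with the decorators are exercised through handle_invoke and
# handle_chat_stream. The agent name, metadata, handler bodies, and messages
# below are made up for demonstration purposes.
if __name__ == "__main__":
    demo = Agent(name="demo-agent", metadata={"framework": "none"})

    @demo.invoke
    async def _echo(input: dict[str, Any], ctx: Context):
        # Two parameters, so handle_invoke passes the request context through.
        return {"output": f"echo: {input.get('query', '')}"}

    @demo.chat_stream
    async def _stream(messages: list[ChatMessage]):
        # One parameter, so the context is omitted when the handler is called.
        for word in ["Hello", "!"]:
            yield {"chunk": word}

    async def _demo() -> None:
        print(demo.to_info())
        print(await demo.handle_invoke({"query": "ping"}))
        async for chunk in await demo.handle_chat_stream(
            [{"role": "user", "content": "hi"}]
        ):
            print(chunk)

    asyncio.run(_demo())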