Source code for reminix.adapters.openai.client

"""
OpenAI SDK Client Adapter

Wraps an OpenAI AsyncOpenAI client for use with the Reminix runtime.

Compatibility:
    openai >= 2.0.0
"""

from collections.abc import AsyncIterator
from typing import Any

from ..protocols import OpenAIClientProtocol
from ...runtime import Agent, Context


def from_openai(
    client: OpenAIClientProtocol,
    *,
    name: str,
    model: str = "gpt-4o",
    system: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Agent:
    """
    Create a Reminix Agent from an OpenAI AsyncOpenAI client.

    Args:
        client: An OpenAI AsyncOpenAI client instance.
        name: Name for the Reminix agent.
        model: The model to use (default: "gpt-4o").
        system: Optional system prompt.
        metadata: Optional metadata for the agent.

    Returns:
        A Reminix Agent that wraps the OpenAI client.

    Example::

        from openai import AsyncOpenAI
        from reminix.adapters.openai import from_openai
        from reminix.runtime import serve

        client = AsyncOpenAI()
        agent = from_openai(client, name="gpt4-agent", model="gpt-4o")
        serve(agent)
    """
    agent = Agent(
        name=name,
        metadata={
            "framework": "openai",
            "adapter": "client",
            "model": model,
            **(metadata or {}),
        },
    )

    def _build_messages(
        messages: list[dict[str, Any]], system_prompt: str | None = None
    ) -> list[dict[str, Any]]:
        """Prepend the optional system prompt to a list of chat messages."""
        result: list[dict[str, Any]] = []
        if system_prompt:
            result.append({"role": "system", "content": system_prompt})
        result.extend(messages)
        return result

    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(input_data: dict[str, Any], ctx: Context) -> dict[str, Any]:
        # Accept either a "prompt" or "input" key; fall back to the raw payload.
        prompt = input_data.get("prompt") or input_data.get("input") or str(input_data)
        messages = _build_messages([{"role": "user", "content": prompt}], system)
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            stream=False,
        )
        return {"output": response.choices[0].message.content}  # type: ignore[union-attr]

    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(
        input_data: dict[str, Any], ctx: Context
    ) -> AsyncIterator[dict[str, str]]:
        prompt = input_data.get("prompt") or input_data.get("input") or str(input_data)
        messages = _build_messages([{"role": "user", "content": prompt}], system)
        stream = await client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
        )
        async for chunk in stream:  # type: ignore[union-attr]
            # Skip keep-alive/empty chunks; yield only content deltas.
            if chunk.choices and chunk.choices[0].delta.content:
                yield {"chunk": chunk.choices[0].delta.content}

    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(messages: list[dict[str, Any]], ctx: Context) -> dict[str, Any]:
        chat_messages = _build_messages(
            [{"role": m["role"], "content": m["content"]} for m in messages],
            system,
        )
        response = await client.chat.completions.create(
            model=model,
            messages=chat_messages,
            stream=False,
        )
        return {
            "message": {
                "role": "assistant",
                "content": response.choices[0].message.content,  # type: ignore[union-attr]
            }
        }

    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(
        messages: list[dict[str, Any]], ctx: Context
    ) -> AsyncIterator[dict[str, str]]:
        chat_messages = _build_messages(
            [{"role": m["role"], "content": m["content"]} for m in messages],
            system,
        )
        stream = await client.chat.completions.create(
            model=model,
            messages=chat_messages,
            stream=True,
        )
        async for chunk in stream:  # type: ignore[union-attr]
            if chunk.choices and chunk.choices[0].delta.content:
                yield {"chunk": chunk.choices[0].delta.content}

    return agent
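
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the adapter). This mirrors what
# handle_invoke_stream/handle_chat_stream do internally: consume an OpenAI
# streaming chat completion and emit content deltas. The model and prompt are
# example values; the calls themselves are the public OpenAI SDK API.
#
#     import asyncio
#     from openai import AsyncOpenAI
#
#     async def main() -> None:
#         client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
#         stream = await client.chat.completions.create(
#             model="gpt-4o",
#             messages=[{"role": "user", "content": "Say hello."}],
#             stream=True,
#         )
#         async for chunk in stream:
#             if chunk.choices and chunk.choices[0].delta.content:
#                 print(chunk.choices[0].delta.content, end="", flush=True)
#
#     asyncio.run(main())
#
# The adapter wraps each such delta as {"chunk": "<text>"}; the non-streaming
# handlers return {"output": ...} (invoke) or {"message": {...}} (chat).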