Source code for reminix.adapters.langchain.agent_executor

"""
LangChain AgentExecutor Adapter

Wraps a LangChain AgentExecutor for use with the Reminix runtime.
"""

from __future__ import annotations

from typing import Any

from reminix.runtime import Agent


[docs] def from_agent_executor( executor: Any, *, name: str, metadata: dict[str, Any] | None = None, ) -> Agent: """ Create a Reminix Agent from a LangChain AgentExecutor. Args: executor: A LangChain AgentExecutor instance. name: Name for the Reminix agent. metadata: Optional metadata for the agent. Returns: A Reminix Agent that wraps the LangChain executor. Example:: from langchain.agents import AgentExecutor, create_react_agent from langchain_openai import ChatOpenAI from reminix.adapters.langchain import from_agent_executor from reminix.runtime import serve # Create LangChain agent llm = ChatOpenAI(model="gpt-4o") agent = create_react_agent(llm, tools, prompt) executor = AgentExecutor(agent=agent, tools=tools) # Wrap and serve reminix_agent = from_agent_executor(executor, name="react-agent") serve(reminix_agent) """ agent = Agent( name, metadata={ "framework": "langchain", "adapter": "agent_executor", **(metadata or {}), }, ) @agent.invoke # type: ignore[arg-type] async def handle_invoke(input_data: dict[str, Any], ctx: dict[str, Any]) -> dict[str, Any]: """Non-streaming invoke via LangChain AgentExecutor.""" result = await executor.ainvoke(input_data) # AgentExecutor returns {"output": "..."} by default if isinstance(result, dict) and "output" in result: return {"output": result["output"]} # Handle other response shapes return {"output": str(result)} @agent.invoke_stream # type: ignore[arg-type] async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]): """Streaming invoke via LangChain AgentExecutor.""" async for event in executor.astream_events(input_data, version="v2"): if event["event"] == "on_chat_model_stream": chunk = event["data"].get("chunk") if chunk and hasattr(chunk, "content") and chunk.content: yield {"chunk": chunk.content} @agent.chat # type: ignore[arg-type] async def handle_chat(messages: list[dict[str, Any]], ctx: dict[str, Any]) -> dict[str, Any]: """Non-streaming chat via LangChain AgentExecutor.""" # Convert Reminix messages to LangChain format lc_messages = _convert_messages(messages) result = await executor.ainvoke({"messages": lc_messages}) # Extract output if isinstance(result, dict) and "output" in result: output = result["output"] else: output = str(result) return {"message": {"role": "assistant", "content": output}} @agent.chat_stream # type: ignore[arg-type] async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]): """Streaming chat via LangChain AgentExecutor.""" lc_messages = _convert_messages(messages) async for event in executor.astream_events({"messages": lc_messages}, version="v2"): if event["event"] == "on_chat_model_stream": chunk = event["data"].get("chunk") if chunk and hasattr(chunk, "content") and chunk.content: yield {"chunk": chunk.content} return agent
def _convert_messages(messages: list[dict[str, Any]]) -> list[Any]: """Convert Reminix messages to LangChain message format.""" # Import here to avoid requiring langchain as a dependency try: from langchain_core.messages import ( # type: ignore[import-not-found] AIMessage, HumanMessage, SystemMessage, ) except ImportError as e: raise ImportError( "langchain-core is required for the LangChain adapter. " "Install it with: pip install langchain-core" ) from e lc_messages: list[Any] = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if role == "system": lc_messages.append(SystemMessage(content=content)) elif role == "user": lc_messages.append(HumanMessage(content=content)) elif role == "assistant": lc_messages.append(AIMessage(content=content)) # Skip tool messages for now return lc_messages