# Source code for reminix.adapters.langchain.agent_executor
"""
LangChain AgentExecutor Adapter
Wraps a LangChain AgentExecutor for use with the Reminix runtime.
"""
from __future__ import annotations
from typing import Any
from reminix.runtime import Agent
# [docs]
def from_agent_executor(
    executor: Any,
    *,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> Agent:
    """
    Create a Reminix Agent from a LangChain AgentExecutor.

    Args:
        executor: A LangChain AgentExecutor instance.
        name: Name for the Reminix agent.
        metadata: Optional metadata for the agent. Merged over the adapter's
            default metadata, so caller-supplied keys win.

    Returns:
        A Reminix Agent that wraps the LangChain executor.

    Example::

        from langchain.agents import AgentExecutor, create_react_agent
        from langchain_openai import ChatOpenAI
        from reminix.adapters.langchain import from_agent_executor
        from reminix.runtime import serve

        # Create LangChain agent
        llm = ChatOpenAI(model="gpt-4o")
        agent = create_react_agent(llm, tools, prompt)
        executor = AgentExecutor(agent=agent, tools=tools)

        # Wrap and serve
        reminix_agent = from_agent_executor(executor, name="react-agent")
        serve(reminix_agent)
    """
    agent = Agent(
        name,
        metadata={
            "framework": "langchain",
            "adapter": "agent_executor",
            **(metadata or {}),
        },
    )

    def _extract_output(result: Any) -> Any:
        """Pull the text output from an AgentExecutor result.

        AgentExecutor returns {"output": "..."} by default; fall back to
        stringifying any other response shape.
        """
        if isinstance(result, dict) and "output" in result:
            return result["output"]
        return {"output": str(result)}["output"] if False else str(result)

    async def _stream_chunks(payload: dict[str, Any]):
        """Yield {"chunk": text} items from the executor's v2 event stream.

        Only chat-model token events with non-empty content are forwarded;
        tool/chain events are filtered out.
        """
        async for event in executor.astream_events(payload, version="v2"):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"].get("chunk")
                # chunk may be None or have empty content between tokens
                if chunk and hasattr(chunk, "content") and chunk.content:
                    yield {"chunk": chunk.content}

    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(input_data: dict[str, Any], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming invoke via LangChain AgentExecutor."""
        result = await executor.ainvoke(input_data)
        return {"output": _extract_output(result)}

    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]):
        """Streaming invoke via LangChain AgentExecutor."""
        async for item in _stream_chunks(input_data):
            yield item

    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(messages: list[dict[str, Any]], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming chat via LangChain AgentExecutor."""
        # Convert Reminix messages to LangChain format
        lc_messages = _convert_messages(messages)
        result = await executor.ainvoke({"messages": lc_messages})
        return {"message": {"role": "assistant", "content": _extract_output(result)}}

    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]):
        """Streaming chat via LangChain AgentExecutor."""
        lc_messages = _convert_messages(messages)
        async for item in _stream_chunks({"messages": lc_messages}):
            yield item

    return agent
def _convert_messages(messages: list[dict[str, Any]]) -> list[Any]:
    """Translate Reminix chat messages into LangChain message objects."""
    # Deferred import so langchain-core remains an optional dependency of
    # the package; only the adapter path requires it.
    try:
        from langchain_core.messages import (  # type: ignore[import-not-found]
            AIMessage,
            HumanMessage,
            SystemMessage,
        )
    except ImportError as e:
        raise ImportError(
            "langchain-core is required for the LangChain adapter. "
            "Install it with: pip install langchain-core"
        ) from e

    # Map Reminix roles onto LangChain message classes. Roles without a
    # mapping (e.g. tool messages) are skipped for now.
    role_to_cls: dict[str, Any] = {
        "system": SystemMessage,
        "user": HumanMessage,
        "assistant": AIMessage,
    }

    converted: list[Any] = []
    for msg in messages:
        cls = role_to_cls.get(msg.get("role", "user"))
        if cls is not None:
            converted.append(cls(content=msg.get("content", "")))
    return converted