Source code for reminix.adapters.llamaindex.chat_engine
"""
LlamaIndex ChatEngine Adapter
Wraps a LlamaIndex ChatEngine for use with the Reminix runtime.
Compatibility:
llama-index-core >= 0.14.0
"""
from __future__ import annotations

from typing import Any

from reminix.runtime import Agent

from ..protocols import LlamaIndexChatEngineProtocol
def from_chat_engine(
    chat_engine: LlamaIndexChatEngineProtocol,
    *,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> Agent:
"""
Create a Reminix Agent from a LlamaIndex ChatEngine.
ChatEngine maintains conversation history and is ideal for
conversational RAG applications.
Args:
chat_engine: A LlamaIndex ChatEngine instance.
name: Name for the Reminix agent.
metadata: Optional metadata for the agent.
Returns:
A Reminix Agent that wraps the ChatEngine.
Example::
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from reminix.adapters.llamaindex import from_chat_engine
from reminix.runtime import serve
# Load and index documents
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
# Create ChatEngine and wrap it
chat_engine = index.as_chat_engine(chat_mode="condense_question")
agent = from_chat_engine(chat_engine, name="docs-chat")
serve(agent)
"""
    agent = Agent(
        name,
        metadata={
            "framework": "llamaindex",
            "adapter": "chat-engine",
            **(metadata or {}),
        },
    )
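    # Caller-supplied metadata is merged last, so duplicate keys override the
    # adapter defaults. Illustrative call (hypothetical values):
    #   from_chat_engine(engine, name="docs-chat", metadata={"team": "qa"})
    #   -> {"framework": "llamaindex", "adapter": "chat-engine", "team": "qa"}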
    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(input_data: dict[str, Any], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming invoke via LlamaIndex ChatEngine."""
        message = _extract_message(input_data)
        response = await chat_engine.achat(message)
        return {"output": str(response)}
    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]):
        """Streaming invoke via LlamaIndex ChatEngine."""
        message = _extract_message(input_data)
        response = await chat_engine.astream_chat(message)  # type: ignore[attr-defined]
        async for token in response.async_response_gen():
            yield {"chunk": token}
    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(messages: list[dict[str, Any]], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming chat via LlamaIndex ChatEngine."""
        # Reset the engine's internal history, then answer only the most
        # recent user message; earlier turns are not replayed into the engine.
        chat_engine.reset()
        response = await chat_engine.achat(_last_user_message(messages))
        return {"message": {"role": "assistant", "content": str(response)}}
    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]):
        """Streaming chat via LlamaIndex ChatEngine."""
        chat_engine.reset()
        response = await chat_engine.astream_chat(_last_user_message(messages))  # type: ignore[attr-defined]
        async for token in response.async_response_gen():
            yield {"chunk": token}

    return agent
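
# A minimal sketch of the chat payload shape the handlers above expect
# (illustrative values; only the last "user" entry is answered):
#
#   messages = [
#       {"role": "user", "content": "What is in chapter 2?"},
#       {"role": "assistant", "content": "Chapter 2 covers indexing."},
#       {"role": "user", "content": "And chapter 3?"},
#   ]
#   -> {"message": {"role": "assistant", "content": "<engine response>"}}
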
def _extract_message(input_data: dict[str, Any]) -> str:
    """Extract the message string from an input payload."""
    for key in ("message", "input", "query", "prompt"):
        if key in input_data:
            return str(input_data[key])
    return str(input_data)


def _last_user_message(messages: list[dict[str, Any]]) -> str:
    """Return the content of the most recent user message, or an empty string."""
    last_message = ""
    for msg in messages:
        if msg.get("role") == "user":
            last_message = str(msg.get("content", ""))
    return last_message
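

if __name__ == "__main__":
    # Quick, dependency-free sanity checks of the helpers above
    # (illustrative payloads only; the Reminix runtime is not exercised here).
    assert _extract_message({"message": "hi", "query": "x"}) == "hi"
    assert _extract_message({"prompt": "draft a summary"}) == "draft a summary"
    assert _extract_message({"unknown": 1}) == str({"unknown": 1})
    assert (
        _last_user_message(
            [
                {"role": "user", "content": "first"},
                {"role": "assistant", "content": "reply"},
                {"role": "user", "content": "second"},
            ]
        )
        == "second"
    )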