Source code for reminix.adapters.llamaindex.chat_engine

"""
LlamaIndex ChatEngine Adapter

Wraps a LlamaIndex ChatEngine for use with the Reminix runtime.

Compatibility:
    llama-index-core >= 0.14.0
"""

from __future__ import annotations

from typing import Any

from reminix.runtime import Agent

from ..protocols import LlamaIndexChatEngineProtocol


def from_chat_engine(
    chat_engine: LlamaIndexChatEngineProtocol,
    *,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> Agent:
    """
    Create a Reminix Agent from a LlamaIndex ChatEngine.

    ChatEngine maintains conversation history and is ideal for
    conversational RAG applications.

    Args:
        chat_engine: A LlamaIndex ChatEngine instance.
        name: Name for the Reminix agent.
        metadata: Optional metadata for the agent.

    Returns:
        A Reminix Agent that wraps the ChatEngine.

    Example::

        from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

        from reminix.adapters.llamaindex import from_chat_engine
        from reminix.runtime import serve

        # Load and index documents
        documents = SimpleDirectoryReader("data").load_data()
        index = VectorStoreIndex.from_documents(documents)

        # Create ChatEngine and wrap it
        chat_engine = index.as_chat_engine(chat_mode="condense_question")
        agent = from_chat_engine(chat_engine, name="docs-chat")

        serve(agent)
    """
    agent = Agent(
        name,
        metadata={
            "framework": "llamaindex",
            "adapter": "chat-engine",
            **(metadata or {}),
        },
    )

    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(
        input_data: dict[str, Any], ctx: dict[str, Any]
    ) -> dict[str, Any]:
        """Non-streaming invoke via LlamaIndex ChatEngine."""
        message = _extract_message(input_data)
        response = await chat_engine.achat(message)
        return {"output": str(response)}

    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]):
        """Streaming invoke via LlamaIndex ChatEngine."""
        message = _extract_message(input_data)
        response = await chat_engine.astream_chat(message)  # type: ignore[attr-defined]
        async for token in response.async_response_gen():
            yield {"chunk": token}

    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(
        messages: list[dict[str, Any]], ctx: dict[str, Any]
    ) -> dict[str, Any]:
        """Non-streaming chat via LlamaIndex ChatEngine."""
        # Reset the engine's internal history, then send only the latest
        # user message; earlier turns in `messages` are not replayed, since
        # the engine rebuilds context from its own state.
        chat_engine.reset()
        last_message = ""
        for msg in messages:
            if msg.get("role") == "user":
                last_message = msg.get("content", "")
        response = await chat_engine.achat(last_message)
        return {"message": {"role": "assistant", "content": str(response)}}

    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]):
        """Streaming chat via LlamaIndex ChatEngine."""
        chat_engine.reset()
        last_message = ""
        for msg in messages:
            if msg.get("role") == "user":
                last_message = msg.get("content", "")
        response = await chat_engine.astream_chat(last_message)  # type: ignore[attr-defined]
        async for token in response.async_response_gen():
            yield {"chunk": token}

    return agent
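
# Note: handle_chat and handle_chat_stream above reset the engine and forward
# only the latest user message, so earlier turns in `messages` are dropped. A
# hedged sketch of a variant that replays the transcript instead, assuming
# llama-index-core's ChatMessage type and its achat(message, chat_history=...)
# signature (neither is required by this adapter's protocol), written as if it
# lived inside the from_chat_engine closure:
#
#     from llama_index.core.llms import ChatMessage
#
#     async def chat_with_history(messages: list[dict[str, Any]]) -> str:
#         history = [
#             ChatMessage(role=m.get("role", "user"), content=m.get("content", ""))
#             for m in messages[:-1]
#         ]
#         response = await chat_engine.achat(
#             messages[-1].get("content", ""), chat_history=history
#         )
#         return str(response)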

def _extract_message(input_data: dict[str, Any]) -> str:
    """Extract the message string from input data."""
    if "message" in input_data:
        return str(input_data["message"])
    if "input" in input_data:
        return str(input_data["input"])
    if "query" in input_data:
        return str(input_data["query"])
    if "prompt" in input_data:
        return str(input_data["prompt"])
    return str(input_data)
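
# Illustrative smoke check of _extract_message key precedence (derived from
# the function body above): "message" wins over "query", and unknown shapes
# fall back to str() of the whole payload.
if __name__ == "__main__":  # pragma: no cover
    assert _extract_message({"message": "hi", "query": "x"}) == "hi"
    assert _extract_message({"input": "hello"}) == "hello"
    assert _extract_message({"foo": "bar"}) == "{'foo': 'bar'}"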