Source code for reminix.adapters.llamaindex.query_engine

"""
LlamaIndex QueryEngine Adapter

Wraps a LlamaIndex QueryEngine for use with the Reminix runtime.

Compatibility:
    llama-index-core >= 0.14.0
"""

from __future__ import annotations

from typing import Any

from ..protocols import LlamaIndexQueryEngineProtocol
from reminix.runtime import Agent


def from_query_engine(
    query_engine: LlamaIndexQueryEngineProtocol,
    *,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> Agent:
    """
    Create a Reminix Agent from a LlamaIndex QueryEngine.

    QueryEngine is best for simple question-answering over documents.
    For conversational use cases, consider using from_chat_engine instead.

    Args:
        query_engine: A LlamaIndex QueryEngine instance.
        name: Name for the Reminix agent.
        metadata: Optional metadata for the agent.

    Returns:
        A Reminix Agent that wraps the QueryEngine.

    Example::

        from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

        from reminix.adapters.llamaindex import from_query_engine
        from reminix.runtime import serve

        # Load and index documents
        documents = SimpleDirectoryReader("data").load_data()
        index = VectorStoreIndex.from_documents(documents)

        # Create QueryEngine and wrap it
        query_engine = index.as_query_engine()
        agent = from_query_engine(query_engine, name="docs-qa")

        serve(agent)
    """
    agent = Agent(
        name,
        metadata={
            "framework": "llamaindex",
            "adapter": "query-engine",
            **(metadata or {}),
        },
    )

    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(input_data: dict[str, Any], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming invoke via LlamaIndex QueryEngine."""
        query = _extract_query(input_data)
        response = await query_engine.aquery(query)
        return {"output": str(response)}

    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]):
        """Streaming invoke via LlamaIndex QueryEngine."""
        # QueryEngine doesn't have native streaming, so we return the full response
        query = _extract_query(input_data)
        response = await query_engine.aquery(query)
        yield {"chunk": str(response)}

    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(messages: list[dict[str, Any]], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming chat via LlamaIndex QueryEngine."""
        # QueryEngine doesn't maintain conversation history;
        # use the last user message as the query
        query = messages[-1]["content"] if messages else ""
        response = await query_engine.aquery(query)
        return {"message": {"role": "assistant", "content": str(response)}}

    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]):
        """Streaming chat via LlamaIndex QueryEngine."""
        query = messages[-1]["content"] if messages else ""
        response = await query_engine.aquery(query)
        yield {"chunk": str(response)}

    return agent
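
# Illustrative note (not part of the adapter's API): QueryEngine keeps no
# conversation memory, so the chat handlers above reduce the history to the
# final message before querying. A hypothetical payload such as
#
#     [{"role": "user", "content": "What formats does the loader accept?"},
#      {"role": "assistant", "content": "PDF and Markdown."},
#      {"role": "user", "content": "Which one indexes faster?"}]
#
# reaches the engine only as "Which one indexes faster?"; earlier turns are
# dropped. For history-aware conversations, use from_chat_engine instead.
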
def _extract_query(input_data: dict[str, Any]) -> str:
    """Extract the query string from input data."""
    if "query" in input_data:
        return str(input_data["query"])
    if "input" in input_data:
        return str(input_data["input"])
    if "prompt" in input_data:
        return str(input_data["prompt"])
    if "question" in input_data:
        return str(input_data["question"])
    return str(input_data)
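
# Illustrative examples of _extract_query's key precedence (the dict contents
# below are hypothetical, not values the runtime produces):
#
#     _extract_query({"query": "What is Reminix?"})      -> "What is Reminix?"
#     _extract_query({"prompt": "Summarize the docs"})   -> "Summarize the docs"
#     _extract_query({"note": "no recognized key"})      -> "{'note': 'no recognized key'}"
#
# Unrecognized payloads fall back to str(input_data), so callers should prefer
# the "query" key when invoking the wrapped agent.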