Source code for reminix.adapters.llamaindex.query_engine
"""
LlamaIndex QueryEngine Adapter
Wraps a LlamaIndex QueryEngine for use with the Reminix runtime.
Compatibility:
llama-index-core >= 0.14.0
"""
from __future__ import annotations
from typing import Any
from ..protocols import LlamaIndexQueryEngineProtocol
from reminix.runtime import Agent

def from_query_engine(
    query_engine: LlamaIndexQueryEngineProtocol,
    *,
    name: str,
    metadata: dict[str, Any] | None = None,
) -> Agent:
    """Create a Reminix Agent from a LlamaIndex QueryEngine.

    A QueryEngine is best for simple question-answering over documents.
    For conversational use cases, consider using ``from_chat_engine`` instead.

    Args:
        query_engine: A LlamaIndex QueryEngine instance.
        name: Name for the Reminix agent.
        metadata: Optional metadata for the agent.

    Returns:
        A Reminix Agent that wraps the QueryEngine.

    Example::

        from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
        from reminix.adapters.llamaindex import from_query_engine
        from reminix.runtime import serve

        # Load and index documents
        documents = SimpleDirectoryReader("data").load_data()
        index = VectorStoreIndex.from_documents(documents)

        # Create a QueryEngine and wrap it
        query_engine = index.as_query_engine()
        agent = from_query_engine(query_engine, name="docs-qa")
        serve(agent)
    """
    agent = Agent(
        name,
        metadata={
            "framework": "llamaindex",
            "adapter": "query-engine",
            **(metadata or {}),
        },
    )
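
    # Caller-supplied metadata is merged on top of the adapter defaults
    # (caller keys win on collision). For a hypothetical call such as
    # from_query_engine(engine, name="docs-qa", metadata={"team": "search"}),
    # the resulting agent metadata would be:
    #   {"framework": "llamaindex", "adapter": "query-engine", "team": "search"}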

    @agent.invoke  # type: ignore[arg-type]
    async def handle_invoke(input_data: dict[str, Any], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming invoke via LlamaIndex QueryEngine."""
        query = _extract_query(input_data)
        response = await query_engine.aquery(query)
        return {"output": str(response)}

    @agent.invoke_stream  # type: ignore[arg-type]
    async def handle_invoke_stream(input_data: dict[str, Any], ctx: dict[str, Any]):
        """Streaming invoke via LlamaIndex QueryEngine."""
        # The wrapped engine is not configured for token streaming here,
        # so the full response is yielded as a single chunk.
        query = _extract_query(input_data)
        response = await query_engine.aquery(query)
        yield {"chunk": str(response)}

    @agent.chat  # type: ignore[arg-type]
    async def handle_chat(messages: list[dict[str, Any]], ctx: dict[str, Any]) -> dict[str, Any]:
        """Non-streaming chat via LlamaIndex QueryEngine."""
        # A QueryEngine does not maintain conversation history, so the
        # content of the last message is used as the query.
        query = messages[-1]["content"] if messages else ""
        response = await query_engine.aquery(query)
        return {"message": {"role": "assistant", "content": str(response)}}

    @agent.chat_stream  # type: ignore[arg-type]
    async def handle_chat_stream(messages: list[dict[str, Any]], ctx: dict[str, Any]):
        """Streaming chat via LlamaIndex QueryEngine."""
        query = messages[-1]["content"] if messages else ""
        response = await query_engine.aquery(query)
        yield {"chunk": str(response)}

    return agent

def _extract_query(input_data: dict[str, Any]) -> str:
    """Extract query string from input data."""
    if "query" in input_data:
        return str(input_data["query"])
    if "input" in input_data:
        return str(input_data["input"])
    if "prompt" in input_data:
        return str(input_data["prompt"])
    if "question" in input_data:
        return str(input_data["question"])
    return str(input_data)
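
# Illustrative behavior of _extract_query (keys are checked in the order
# query, input, prompt, question, then it falls back to str() of the dict):
#
#     _extract_query({"query": "What is X?", "prompt": "ignored"})  # -> "What is X?"
#     _extract_query({"question": "Why?"})                          # -> "Why?"
#     _extract_query({"text": "hello"})                             # -> "{'text': 'hello'}"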