百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

LlamaIndex的QueryPipeline支持Agent应用开发(附开发示例)

zzlvtu 2024-09-04 23:01 3 浏览

在前面文章《应用编排的未来是Pipeline,LlamaIndex开发预览版推出Query Pipeline,提升应用开发的灵活性》里,我们提到了llamaindex出了一个新的实验feature,支持通过声明式的方式定义QueryPipeline从而形成个性化的应用流程,并且给出了对于RAG类应用的实现案例。

那么,Agent应用可以通过QueryPipeline的方式实现吗?如果可以,它将能统一整个应用的构建模式,对于开发者来讲,会带来一致的开发体验。这个答案是肯定的,最近官方放出了一个通过QueryPipeline构建ReAct Agent示例。实际上,实现思路很直接,就是定义了AgentWorker(类似于langchain的AgentExecutor的实现,支持ReAct模式的循环)在其上实现了一些Agent特有的组件,进而基于此实现了AgentPipeline,但langchain现在已经发现了这样设计在生产条件下的限制,推出了langgraph。

延伸阅读:LangChain 0.1.0版本正式发布,One More Thing将成了Agent落地生产的福音

本文以官方提供的“Text2SQL的ReAct Agent”为例来了解基本的开发过程。

1.导入数据,构建待查询的数据库。

!curl "https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip" -O ./chinook.zip
!unzip ./chinook.zip


 % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  298k  100  298k    0     0  2327k      0 --:--:-- --:--:-- --:--:-- 2387k
curl: (6) Could not resolve host: .
Archive:  ./chinook.zip
  inflating: chinook.db    
 
from llama_index import SQLDatabase
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
    select,
    column,
)


engine = create_engine("sqlite:///chinook.db")
sql_database = SQLDatabase(engine)

2.安装可观测性工具,官方推荐使用Arize Phoenix。

# setup Arize Phoenix for logging/observability
import phoenix as px
import llama_index


px.launch_app()
llama_index.set_global_handler("arize_phoenix")


 To view the Phoenix app in your browser, visit http://127.0.0.1:6006/
 To view the Phoenix app in a notebook, run `px.active_session().view()`
 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix

3.构建Text2SQL工具。

from llama_index.query_engine import NLSQLTableQueryEngine
from llama_index.tools.query_engine import QueryEngineTool


sql_query_engine = NLSQLTableQueryEngine(
    sql_database=sql_database,
    tables=["albums", "tracks", "artists"],
    verbose=True,
)
sql_tool = QueryEngineTool.from_defaults(
    query_engine=sql_query_engine,
    name="sql_tool",
    description=(
        "Useful for translating a natural language query into a SQL query"
    ),
)

4.构建ReAct Agent Pipeline,这是整个构建过程的关键。整个执行过程分为四步:

a)获取Agent输入

b)利用ReAct Prompt 调用 LLM 生成下一个操作/工具(或返回响应)。

c)如果LLM选择了工具/操作,则调用Tool Pipeline来执行工具 并收集工具执行的结果。

d)直到Pipeline最终生成返回结果。

在整个过程在QueryPipelineAgentWorker中执行(循环能力,未来可能会开放自定义,如LangGraph)。

   #QueryPipelineAgentWorker的部分实现
   def _get_task_step_response(
        self, agent_response: AGENT_CHAT_RESPONSE_TYPE, step: TaskStep, is_done: bool
    ) -> TaskStepOutput:
        """Get task step response."""
        if is_done:
            new_steps = []
        else:
            new_steps = [
                step.get_next_step(
                    step_id=str(uuid.uuid4()),
                    # NOTE: input is unused
                    input=None,
                )
            ]


        return TaskStepOutput(
            output=agent_response,
            task_step=step,
            is_last=is_done,
            next_steps=new_steps,
        )

下面是一些构建Agent会用到的专门组件:

  • AgentInputComponent 允许将Agent输入(任务、状态字典)转换为一组查询管道输入。
  • AgentFnComponent:一个通用处理器,允许您获取当前任务、状态以及任何任意输入,并返回输出。在这个例子中,定义了一个函数组件来格式化 ReAct 提示。当然,也可以在任何地方放置它。
  • CustomAgentComponent:类似于 AgentFnComponent,可以实现 _run_component 来定义自己的逻辑,并访问任务和状态。它比 AgentFnComponent 更复杂,但更灵活(例如,可以定义初始化变量,并且回调位于基类中)。请注意,传递给 AgentFnComponent 和 AgentInputComponent 的任何函数都必须包含任务和状态作为输入变量,因为这些是Ageent传递的输入。

另外,AgentQueryPipeline的输出必须是 Tuple[AgentChatResponse, bool]。

1)定义AgentInputComponent。在每个Agent步骤的开始调用。除了传递输入之外,还执行初始化/状态修改。

from llama_index.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
    ResponseReasoningStep,
)
from llama_index.agent import Task, AgentChatResponse
from llama_index.query_pipeline import (
    AgentInputComponent,
    AgentFnComponent,
    CustomAgentComponent,
    ToolRunnerComponent,
    QueryComponent,
)
from llama_index.llms import MessageRole
from typing import Dict, Any, Optional, Tuple, List, cast




## Agent Input Component
## This is the component that produces agent inputs to the rest of the components
## Can also put initialization logic here.
def agent_input_fn(task: Task, state: Dict[str, Any]) -> Dict[str, Any]:
    """Agent input function.


    Returns:
        A Dictionary of output keys and values. If you are specifying
        src_key when defining links between this component and other
        components, make sure the src_key matches the specified output_key.


    """
    # initialize current_reasoning
    if "current_reasoning" not in state:
        state["current_reasoning"] = []
    reasoning_step = ObservationReasoningStep(observation=task.input)
    state["current_reasoning"].append(reasoning_step)
    return {"input": task.input}




agent_input_component = AgentInputComponent(fn=agent_input_fn)

2)定义Agent Prompt。定义可生成 ReAct 提示的Agent组件,并在 LLM 生成的输出后,将其解析到结构化对象中。

from llama_index.agent.react.formatter import ReActChatFormatter
from llama_index.query_pipeline import InputComponent, Link
from llama_index.llms import ChatMessage
from llama_index.tools import BaseTool




## define prompt function
def react_prompt_fn(
    task: Task, state: Dict[str, Any], input: str, tools: List[BaseTool]
) -> List[ChatMessage]:
    # Add input to reasoning
    chat_formatter = ReActChatFormatter()
    return chat_formatter.format(
        tools,
        chat_history=task.memory.get() + state["memory"].get_all(),
        current_reasoning=state["current_reasoning"],
    )




react_prompt_component = AgentFnComponent(
    fn=react_prompt_fn, partial_dict={"tools": [sql_tool]}
)

3)定义Agent Output Parser 及 Tool Pipeline。Agent Output Parser整个处理过程简单归纳为两种情况。即:

  • 如果LLM给出了最终答案,那么只需要处理输出即可。
  • 如果LLM给出了工具操作,需要使用指定参数执行指定的工具,然后处理输出。

工具调用可以利用ToolRunnerComponent 模块来完成。它接收一个工具列表,并且可以使用指定的工具名称(每个工具都有一个名称)和工具操作来“执行”。它实际上是CustomAgentComponent的特定实现。同时也实现了 sub_query_components 来将更高级别的回调管理器传递给工具运行器子模块。

from typing import Set, Optional
from llama_index.agent.react.output_parser import ReActOutputParser




## Agent Output Component
## Process reasoning step/tool outputs, and return agent response
def finalize_fn(
    task: Task,
    state: Dict[str, Any],
    reasoning_step: Any,
    is_done: bool = False,
    tool_output: Optional[Any] = None,
) -> Tuple[AgentChatResponse, bool]:
    """Finalize function.


    Here we take the latest reasoning step, and a tool output (if provided),
    and return the agent output (and decide if agent is done).


    This function returns an `AgentChatResponse` and `is_done` tuple. and
    is the last component of the query pipeline. This is the expected
    return type for any query pipeline passed to `QueryPipelineAgentWorker`.


    """
    current_reasoning = state["current_reasoning"]
    current_reasoning.append(reasoning_step)
    # if tool_output is not None, add to current reasoning
    if tool_output is not None:
        observation_step = ObservationReasoningStep(
            observation=str(tool_output)
        )
        current_reasoning.append(observation_step)
    if isinstance(current_reasoning[-1], ResponseReasoningStep):
        response_step = cast(ResponseReasoningStep, current_reasoning[-1])
        response_str = response_step.response
    else:
        response_str = current_reasoning[-1].get_content()


    # if is_done, add to memory
    # NOTE: memory is a reserved keyword in `state`, but you can add your own too
    if is_done:
        state["memory"].put(
            ChatMessage(content=task.input, role=MessageRole.USER)
        )
        state["memory"].put(
            ChatMessage(content=response_str, role=MessageRole.ASSISTANT)
        )


    return AgentChatResponse(response=response_str), is_done




class OutputAgentComponent(CustomAgentComponent):
    """Output agent component."""


    tool_runner_component: ToolRunnerComponent
    output_parser: ReActOutputParser


    def __init__(self, tools, **kwargs):
        tool_runner_component = ToolRunnerComponent(tools)
        super().__init__(
            tool_runner_component=tool_runner_component,
            output_parser=ReActOutputParser(),
            **kwargs
        )


    def _run_component(self, **kwargs: Any) -> Any:
        """Run component."""
        chat_response = kwargs["chat_response"]
        task = kwargs["task"]
        state = kwargs["state"]
        reasoning_step = self.output_parser.parse(
            chat_response.message.content
        )
        if reasoning_step.is_done:
            return {
                "output": finalize_fn(
                    task, state, reasoning_step, is_done=True
                )
            }
        else:
            tool_output = self.tool_runner_component.run_component(
                tool_name=reasoning_step.action,
                tool_input=reasoning_step.action_input,
            )
            return {
                "output": finalize_fn(
                    task,
                    state,
                    reasoning_step,
                    is_done=False,
                    tool_output=tool_output,
                )
            }


    @property
    def _input_keys(self) -> Set[str]:
        return {"chat_response"}


    @property
    def _optional_input_keys(self) -> Set[str]:
        return {"is_done", "tool_output"}


    @property
    def _output_keys(self) -> Set[str]:
        return {"output"}


    @property
    def sub_query_components(self) -> List[QueryComponent]:
        return [self.tool_runner_component]




react_output_component = OutputAgentComponent([sql_tool])

4)构建Pipeline流程,形成agent_input -> react_prompt -> llm -> react_output这样执行流程,注:对于简单的顺序流程,在QueryPipeline中可以直接写为Chain的形式。

from llama_index.query_pipeline import QueryPipeline as QP
from llama_index.llms import OpenAI


qp = QP(
    modules={
        "agent_input": agent_input_component,
        "react_prompt": react_prompt_component,
        "llm": OpenAI(model="gpt-4-1106-preview"),
        "react_output": react_output_component,
    },
    verbose=True,
)
qp.add_chain(["agent_input", "react_prompt", "llm", "react_output"])

这里还可以使用pyvis来可视化Pipeline拓扑。

rom pyvis.network import Network


net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(qp.dag)
net.show("agent_dag.html")

5)装载pipeline。

from llama_index.agent import QueryPipelineAgentWorker, AgentRunner
from llama_index.callbacks import CallbackManager


agent_worker = QueryPipelineAgentWorker(qp)
agent = AgentRunner(agent_worker, callback_manager=CallbackManager([]))

6)运行Pipeline。

# start task
task = agent.create_task(
    "What are some tracks from the artist AC/DC? Limit it to 3"
)


step_output = agent.run_step(task.task_id)  #单步输出


step_output.is_last    #检查是否完成


response = agent.finalize_response(task.task_id)
print(str(response))


The top 3 tracks by AC/DC are "For Those About To Rock (We Salute You)", "Put The Finger On You", and "Let's Get It Up".

另外,官方还给了一个简化实现,无需选择工具,直接执行text2sql,并支持根据正确性多次重试生成的Agent Pipeline。

1.构造Pipeline。

from llama_index.llms import OpenAI


# llm = OpenAI(model="gpt-3.5-turbo")
llm = OpenAI(model="gpt-4-1106-preview")
from llama_index.agent import Task, AgentChatResponse
from typing import Dict, Any
from llama_index.query_pipeline import AgentInputComponent, AgentFnComponent




def agent_input_fn(task: Task, state: Dict[str, Any]) -> Dict:
    """Agent input function."""
    # initialize current_reasoning
    if "convo_history" not in state:
        state["convo_history"] = []
        state["count"] = 0
    state["convo_history"].append(f"User: {task.input}")
    convo_history_str = "\n".join(state["convo_history"]) or "None"
    return {"input": task.input, "convo_history": convo_history_str}




agent_input_component = AgentInputComponent(fn=agent_input_fn)
from llama_index.prompts import PromptTemplate


retry_prompt_str = """\
You are trying to generate a proper natural language query given a user input.


This query will then be interpreted by a downstream text-to-SQL agent which
will convert the query to a SQL statement. If the agent triggers an error,
then that will be reflected in the current conversation history (see below).


If the conversation history is None, use the user input. If its not None,
generate a new SQL query that avoids the problems of the previous SQL query.


Input: {input}
Convo history (failed attempts): 
{convo_history}


New input: """
retry_prompt = PromptTemplate(retry_prompt_str)
from llama_index.response import Response
from typing import Tuple


validate_prompt_str = """\
Given the user query, validate whether the inferred SQL query and response from executing the query is correct and answers the query.


Answer with YES or NO.


Query: {input}
Inferred SQL query: {sql_query}
SQL Response: {sql_response}


Result: """
validate_prompt = PromptTemplate(validate_prompt_str)


MAX_ITER = 3




def agent_output_fn(
    task: Task, state: Dict[str, Any], output: Response
) -> Tuple[AgentChatResponse, bool]:
    """Agent output component."""
    print(f"> Inferred SQL Query: {output.metadata['sql_query']}")
    print(f"> SQL Response: {str(output)}")
    state["convo_history"].append(
        f"Assistant (inferred SQL query): {output.metadata['sql_query']}"
    )
    state["convo_history"].append(f"Assistant (response): {str(output)}")


    # run a mini chain to get response
    validate_prompt_partial = validate_prompt.as_query_component(
        partial={
            "sql_query": output.metadata["sql_query"],
            "sql_response": str(output),
        }
    )
    qp = QP(chain=[validate_prompt_partial, llm])
    validate_output = qp.run(input=task.input)


    state["count"] += 1
    is_done = False
    if state["count"] >= MAX_ITER:
        is_done = True
    if "YES" in validate_output.message.content:
        is_done = True


    return AgentChatResponse(response=str(output)), is_done




agent_output_component = AgentFnComponent(fn=agent_output_fn)
from llama_index.query_pipeline import (
    QueryPipeline as QP,
    Link,
    InputComponent,
)


qp = QP(
    modules={
        "input": agent_input_component,
        "retry_prompt": retry_prompt,
        "llm": llm,
        "sql_query_engine": sql_query_engine,
        "output_component": agent_output_component,
    },
    verbose=True,
)
qp.add_link("input", "retry_prompt", src_key="input", dest_key="input")
qp.add_link(
    "input", "retry_prompt", src_key="convo_history", dest_key="convo_history"
)
qp.add_chain(["retry_prompt", "llm", "sql_query_engine", "output_component"])

2.装载执行:

from llama_index.agent import QueryPipelineAgentWorker, AgentRunner
from llama_index.callbacks import CallbackManager


agent_worker = QueryPipelineAgentWorker(qp)
agent = AgentRunner(agent_worker, callback_manager=CallbackManager([]))
response = agent.chat(
    "How many albums did the artist who wrote 'Restless and Wild' release? (answer should be non-zero)?"
)
print(str(response))


小结

由于该特性仍处于研发预览状态,无官方详细的解读,单从案例看,当前对于自定义流程的Agent实现上尚存在一定的限制,这将是其改进的方向,进而实现复杂的Agent应用模式。另一方面,受限于AgentWorker的设计,Agent与RAG混排可能也是未来值得官方改进的方向。

参考:https://github.com/run-llama/llama_index/blob/main/docs/examples/agent/agent_runner/query_pipeline_agent.ipynb

相关推荐

什么是DPDK?DPDK的原理及学习学习路线总结

一、什么是DPDK  对于用户来说,它可能是一个性能出色的包数据处理加速软件库;对于开发者来说,它可能是一个实践包处理新想法的创新工场;对于性能调优者来说,它可能又是一个绝佳的成果分享平台。 ...

每天进步一点:两分钟解决kvm下windows虚拟机鼠标不跟随

跟随昨天文章做测试的朋友们应该和我一样遇到了vnc连接windows鼠标不跟随的问题,经过一番查找有两种解决办法:1.编辑配置文件命令virshedittest或者直接vi/etc/libvir...

PC虚拟化主流:KVM、XEN、OpenVZ详解

目前,PC的虚拟化逐渐成为互联网发展的大趋势,我们知道,KVM、XEN、OpenVZ是虚拟化的三种方式,今天我们就来探讨这三种虚拟化的优势和劣势。1、pc虚拟化——KVMKVM是完整的硬件虚拟化,可在...

Windows上使用QEMU创建aarch64(ARM64)虚拟机

前言随着国产化的推进,现在采用ARM、MIPS的机器越来越多,作为开发、运维人员要调测软件总不能每种架构的机器都去买一台吧?主要像博主这样的穷B,实在也是承受不起。。需要的工具...

高度致敬Windows!开源优麒麟20.04 LTS发布:支持5年

优麒麟团队宣布,优麒麟(UbuntuKylin)开源操作系统20.04LTS正式版已经发布,代号FocalFossa,全球同步发布的还有Ubuntu20.04、Lubuntu20.04、Xub...

极空间虚拟机上线了,一学就会!小白保姆级使用教程

友情提示本文涉及内容较多,篇幅在4500字左右,为了对小白用户更加友好,图片示例多达60张。整个文章部分为三个阶段,准备-初探-实战。其中实战部分包含Windows系统,ikuai软路由系统,iSto...

Windows Subsystem for Linux现以应用形式上架Microsoft Store

微软今天宣布WindowsSubsystemforLinux(WSL)作为一款应用上架Windows11端的MicrosoftStore。也就是说,现在WSL以应用的方式通过...

Windows Server 2019 Core 虚拟机系统镜像制作

WindowsServer2019Core简介WindowsServer2019是微软于2018年11月13日发布的新一代WindowsServer服务器操作系统,基于Win10180...

微软商店中的WSL预览版现已可用!Windows 11用户狂喜

...

在NAS上安装Win10,24小时待命的云电脑达成√

#头条创作挑战赛#引子...

免费开源虚拟机VirtualBox 7.0.12发布:修复TPM和黑屏问题

IT之家10月18日消息,甲骨文近日发布了VirtualBox7.0.12维护版本更新,重点修复此前版本中用户反馈和官方发现的BUG,改善了对LinuxKernel6.4/6.5...

KVM Cloud 虚拟机管理系统安装部署

KVMCloud介绍KVMCloud是一款基于KVM实现的适用于小微企业的虚拟机管理系统,支持如下功能:基于KVM的VM基础功能(创建、启动、停止、重装、webVNC等功能)使用NFS作为磁盘...

个人KVM 虚拟化学习笔记(kvm虚拟化管理平台)

一、KVM原理二、KVM基础功能2.1CPU2.2内存2.3存储2.4网络三、KVM高级功能...

kvm虚拟化之ESXi到KVM之v2v迁移(esxi虚拟机迁移到另一个esxi)

1.ESXi到KVM之v2v情况说明(1).配置任务列表:1)VMwareESXi虚拟平台下linux系统迁移到KVM虚拟平台。2)VMwareESXi虚拟平台下windows系统迁移到KVM虚拟平台...

unraid下虚拟机安装Windows(vmware安装unraid)

unraid下虚拟机安装Windows使用unraid也有一段时间了,主要是做数据备份,以及docker容器的安装测试,今天有空测试一下VMS虚拟机的使用,用在unraid上安装windows7操作系...