首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

【Dify(v1.2) 核心源码深入解析】Agent 模块

  • 25-04-17 01:43
  • 2913
  • 8561
juejin.cn

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》

本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展

引言

Dify 是一个强大的 AI 应用开发平台,它提供了丰富的功能来帮助开发者构建智能代理(Agent)。在 Dify 中,Agent 模块是核心组件之一,它负责处理用户请求、调用工具、生成响应以及管理整个交互流程。本文将深入解析 Dify 的 Agent 模块,从架构设计到代码实现,帮助你全面理解这个模块的工作原理。

1. Agent 模块概述

Agent 模块是 Dify 中负责处理用户请求的核心组件。它的主要功能包括:

  • 接收用户输入并解析请求。
  • 根据请求内容调用合适的工具或模型。
  • 生成响应并返回给用户。
  • 管理整个交互流程,包括工具调用、消息处理和状态管理。

1.1 核心功能

功能模块描述
策略(Strategy)定义 Agent 的行为逻辑,如 Chain-of-Thought 和 Function-Calling。
工具(Tools)提供各种功能,如数据检索、文件处理等。
消息处理处理用户输入和生成响应。
状态管理管理 Agent 的状态和上下文。

1.2 设计目标

  • 灵活性:支持多种策略和工具,适应不同的应用场景。
  • 可扩展性:方便开发者扩展新的功能和工具。
  • 高效性:优化性能,确保快速响应用户请求。

2. Agent 模块架构

Agent 模块的架构设计如下图所示:

Chain-of-Thought
Function-Calling
用户请求
Agent 策略
选择策略
CoT 处理器
FC 处理器
调用工具
生成响应
返回用户

Agent 模块主要由以下几个部分组成:

  1. 策略(Strategy):定义 Agent 的行为逻辑。
  2. 工具(Tools):提供各种功能。
  3. 消息处理(Message Processing):处理用户输入和生成响应。
  4. 状态管理(State Management):管理 Agent 的状态和上下文。

3. 策略(Strategy)

策略是 Agent 模块的核心,它决定了 Agent 如何处理用户请求。Dify 提供了两种主要的策略:

  • Chain-of-Thought (CoT):通过逐步推理解决问题。
  • Function-Calling (FC):直接调用工具或函数解决问题。

3.1 Chain-of-Thought (CoT)

CoT 策略通过逐步推理解决问题,它的工作流程如下:

用户Agent工具模型loop[逐步推理]发送请求生成初步思考调用工具返回结果更新思考返回最终响应用户Agent工具模型

3.1.1 关键代码解析

python
代码解读
复制代码
class CotAgentRunner(BaseAgentRunner, ABC): def run(self, message: Message, query: str, inputs: Mapping[str, str]) -> Generator: """ 运行 CoT 策略 """ # 初始化 Agent 状态 self._init_react_state(query) # 循环处理用户请求 while function_call_state and iteration_step <= max_iteration_steps: # 组织提示消息 prompt_messages = self._organize_prompt_messages() # 调用模型生成响应 chunks = model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=app_generate_entity.model_conf.parameters, tools=[], stop=app_generate_entity.model_conf.stop, stream=True, user=self.user_id, callbacks=[], ) # 处理模型输出 for chunk in chunks: # 解析模型输出 scratchpad = AgentScratchpadUnit( agent_response="", thought="", action_str="", observation="", action=None, ) # 检查是否需要调用工具 if scratchpad.action and not scratchpad.is_final(): # 调用工具 tool_invoke_response, tool_invoke_meta = self._handle_invoke_action( action=scratchpad.action, tool_instances=tool_instances, message_file_ids=message_file_ids, trace_manager=trace_manager, ) # 更新 Agent 状态 self.save_agent_thought( agent_thought=agent_thought, tool_name=scratchpad.action.action_name, tool_input={scratchpad.action.action_name: scratchpad.action.action_input}, thought=scratchpad.thought or "", observation={scratchpad.action.action_name: tool_invoke_response}, tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()}, answer=scratchpad.agent_response, messages_ids=message_file_ids, llm_usage=usage_dict["usage"], )

3.1.2 CoT 的优势

优势描述
逐步推理通过多步思考解决问题,适合复杂任务。
灵活性高可以根据需要调整思考步骤和工具调用。
透明性好每一步思考和工具调用都有记录,便于调试和优化。

3.2 Function-Calling (FC)

FC 策略通过直接调用工具或函数解决问题,它的工作流程如下:

用户Agent工具模型发送请求生成工具调用调用工具返回结果返回响应用户Agent工具模型

3.2.1 关键代码解析

python
代码解读
复制代码
class FunctionCallAgentRunner(BaseAgentRunner): def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]: """ 运行 Function-Calling 策略 """ # 初始化工具和提示消息 tool_instances, prompt_messages_tools = self._init_prompt_tools() # 循环处理用户请求 while function_call_state and iteration_step <= max_iteration_steps: # 组织提示消息 prompt_messages = self._organize_prompt_messages() # 调用模型生成响应 chunks = model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=app_generate_entity.model_conf.parameters, tools=prompt_messages_tools, stop=app_generate_entity.model_conf.stop, stream=self.stream_tool_call, user=self.user_id, callbacks=[], ) # 处理模型输出 for chunk in chunks: # 检查是否需要调用工具 if self.check_tool_calls(chunk): # 提取工具调用 tool_calls = self.extract_tool_calls(chunk) # 调用工具 for tool_call_id, tool_call_name, tool_call_args in tool_calls: tool_instance = tool_instances.get(tool_call_name) if tool_instance: tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke( tool=tool_instance, tool_parameters=tool_call_args, user_id=self.user_id, tenant_id=self.tenant_id, message=self.message, invoke_from=self.application_generate_entity.invoke_from, agent_tool_callback=self.agent_callback, trace_manager=trace_manager, app_id=self.application_generate_entity.app_config.app_id, message_id=self.message.id, conversation_id=self.conversation.id, ) # 更新 Agent 状态 self.save_agent_thought( agent_thought=agent_thought, tool_name=tool_call_name, tool_input=tool_call_args, thought=response, observation=tool_invoke_response, tool_invoke_meta=tool_invoke_meta.to_dict(), answer=response, messages_ids=message_file_ids, llm_usage=current_llm_usage, )

3.2.2 FC 的优势

优势描述
高效性直接调用工具,减少中间步骤,适合简单任务。
响应速度快减少推理步骤,提高响应速度。
简单易用适合直接调用工具的场景,易于实现。

4. 工具(Tools)

工具是 Agent 模块的重要组成部分,它提供了各种功能来帮助 Agent 完成任务。Dify 提供了多种工具,包括数据检索、文件处理、模型调用等。

4.1 工具的分类

工具分类描述
数据检索工具用于从数据集中检索信息。
文件处理工具用于处理用户上传的文件。
模型调用工具用于调用其他 AI 模型。
自定义工具用户可以自定义工具来扩展功能。

4.2 工具的实现

工具的实现基于 Tool 类,它定义了工具的基本行为。以下是一个简单的工具实现示例:

python
代码解读
复制代码
class ExampleTool(Tool): def __init__(self, name: str, description: str): super().__init__(name, description) def run(self, parameters: dict) -> str: """ 运行工具 """ # 处理参数 input_text = parameters.get("input_text", "") # 执行工具逻辑 result = self.process_input(input_text) return result def process_input(self, input_text: str) -> str: """ 处理输入文本 """ # 示例逻辑:将输入文本转换为大写 return input_text.upper()

4.3 工具的调用

工具的调用流程如下:

Agent工具管理器工具请求工具初始化工具调用工具返回结果Agent工具管理器工具

5. 消息处理

消息处理是 Agent 模块的核心功能之一,它负责处理用户输入和生成响应。消息处理的主要步骤包括:

  • 解析用户输入:将用户输入转换为内部表示。
  • 生成提示消息:根据用户输入和上下文生成提示消息。
  • 调用模型:将提示消息传递给模型,生成响应。
  • 处理模型输出:解析模型输出,生成最终响应。

5.1 消息处理流程

用户输入
解析输入
生成提示消息
调用模型
处理输出
生成响应
返回用户

5.2 消息处理代码示例

python
代码解读
复制代码
class BaseAgentRunner(AppRunner): def _organize_prompt_messages(self) -> list[PromptMessage]: """ 组织提示消息 """ # 初始化系统消息 system_message = self._organize_system_prompt() # 组织用户查询 query_messages = self._organize_user_query(self._query, []) # 组织历史消息 historic_messages = self._organize_historic_prompt_messages([system_message, *query_messages]) # 组织当前 Agent 思考过程 assistant_messages = [] if self._agent_scratchpad: assistant_message = AssistantPromptMessage(content="") for unit in self._agent_scratchpad: if unit.is_final(): assistant_message.content += f"Final Answer: {unit.agent_response}" else: assistant_message.content += f"Thought: {unit.thought}\n\n" if unit.action_str: assistant_message.content += f"Action: {unit.action_str}\n\n" if unit.observation: assistant_message.content += f"Observation: {unit.observation}\n\n" assistant_messages = [assistant_message] # 合并所有消息 messages = [ system_message, *historic_messages, *query_messages, *assistant_messages, UserPromptMessage(content="continue"), ] return messages

6. 状态管理

状态管理是 Agent 模块的重要功能,它负责记录和管理 Agent 的状态和上下文。状态管理的主要任务包括:

  • 记录 Agent 思考过程:保存 Agent 的每一步思考和工具调用结果。
  • 管理上下文:维护用户会话的上下文信息。
  • 优化性能:通过状态管理减少重复计算和资源浪费。

6.1 状态管理流程

用户请求
记录初始状态
处理请求
更新状态
保存状态
返回响应

6.2 状态管理代码示例

python
代码解读
复制代码
class BaseAgentRunner(AppRunner): def save_agent_thought( self, agent_thought: MessageAgentThought, tool_name: str | None, tool_input: Union[str, dict, None], thought: str | None, observation: Union[str, dict, None], tool_invoke_meta: Union[str, dict, None], answer: str | None, messages_ids: list[str], llm_usage: LLMUsage | None = None, ): """ 保存 Agent 思考过程 """ # 更新 Agent 思考记录 updated_agent_thought = ( db.session.query(MessageAgentThought).filter(MessageAgentThought.id == agent_thought.id).first() ) if not updated_agent_thought: raise ValueError("agent thought not found") # 更新思考内容 if thought: updated_agent_thought.thought += thought # 更新工具信息 if tool_name: updated_agent_thought.tool = tool_name if tool_input: if isinstance(tool_input, dict): try: tool_input = json.dumps(tool_input, ensure_ascii=False) except Exception: tool_input = json.dumps(tool_input) updated_agent_thought.tool_input = tool_input # 更新观察结果 if observation: if isinstance(observation, dict): try: observation = json.dumps(observation, ensure_ascii=False) except Exception: observation = json.dumps(observation) updated_agent_thought.observation = observation # 更新回答内容 if answer: updated_agent_thought.answer = answer # 更新文件信息 if messages_ids is not None and len(messages_ids) > 0: updated_agent_thought.message_files = json.dumps(messages_ids) # 更新 LLM 使用情况 if llm_usage: updated_agent_thought.message_token = llm_usage.prompt_tokens updated_agent_thought.message_price_unit = llm_usage.prompt_price_unit updated_agent_thought.message_unit_price = llm_usage.prompt_unit_price updated_agent_thought.answer_token = llm_usage.completion_tokens updated_agent_thought.answer_price_unit = llm_usage.completion_price_unit updated_agent_thought.answer_unit_price = llm_usage.completion_unit_price updated_agent_thought.tokens = llm_usage.total_tokens updated_agent_thought.total_price = llm_usage.total_price # 提交更改 db.session.commit() db.session.close()

7. 实际应用示例

为了更好地理解 Agent 模块的工作原理,我们通过一个简单的例子来展示如何使用 Dify 的 Agent 模块。

7.1 示例场景

假设我们有一个问答应用,用户可以上传文件并提问。Agent 需要解析用户问题,调用文件处理工具提取信息,并生成回答。

7.2 示例代码

python
代码解读
复制代码
from core.agent.cot_agent_runner import CotAgentRunner from core.model_runtime.entities import Message, PromptMessage, UserPromptMessage, AssistantPromptMessage class ExampleAgentRunner(CotAgentRunner): def __init__(self, app_config, model_config, conversation, message): super().__init__( tenant_id="example_tenant", application_generate_entity=app_config, conversation=conversation, app_config=app_config, model_config=model_config, config=app_config.agent, queue_manager=None, message=message, user_id="example_user", model_instance=None, memory=None, prompt_messages=None, ) def run(self, query: str, inputs: dict): # 初始化 Agent 状态 self._init_react_state(query) # 组织提示消息 prompt_messages = self._organize_prompt_messages() # 调用模型生成响应 response = self.model_instance.invoke_llm( prompt_messages=prompt_messages, model_parameters=self.app_config.model_conf.parameters, tools=[], stop=self.app_config.model_conf.stop, stream=True, user=self.user_id, callbacks=[], ) # 处理模型输出 for chunk in response: # 解析模型输出 scratchpad = AgentScratchpadUnit( agent_response="", thought="", action_str="", observation="", action=None, ) # 检查是否需要调用工具 if scratchpad.action and not scratchpad.is_final(): # 调用工具 tool_invoke_response, tool_invoke_meta = self._handle_invoke_action( action=scratchpad.action, tool_instances=self._tool_instances, message_file_ids=[], trace_manager=None, ) # 更新 Agent 状态 self.save_agent_thought( agent_thought=self.create_agent_thought( message_id=self.message.id, message="", tool_name="", tool_input="", messages_ids=[], ), tool_name=scratchpad.action.action_name, tool_input={scratchpad.action.action_name: scratchpad.action.action_input}, thought=scratchpad.thought or "", observation={scratchpad.action.action_name: tool_invoke_response}, tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()}, answer=scratchpad.agent_response, messages_ids=[], llm_usage=None, ) return scratchpad.agent_response

8. 总结

通过本文的详细解析,我们深入了解了 Dify 的 Agent 模块的架构设计和实现细节。Agent 模块通过灵活的策略、丰富的工具、高效的消息处理和状态管理,为开发者提供了一个强大的 AI 应用开发平台。希望本文能帮助你更好地理解和使用 Dify 的 Agent 模块。

Dify 的 Agent 模块仍在不断发展,未来可能会引入以下新功能:

  • 更智能的策略:支持更多复杂的推理和决策逻辑。
  • 更丰富的工具:扩展更多实用工具,如自然语言处理和图像识别。
  • 更好的性能优化:进一步提升响应速度和资源利用率。
注:本文转载自juejin.cn的小爷毛毛_卓寿杰的文章"https://juejin.cn/post/7493037770954473510"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

109
人工智能
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top