Agent Loop 系统详解
什么是 Agent Loop?
想象一下,你有一个超级智能的助手,它能:
- 听懂你说的话
- 使用各种工具(比如搜索网页、读写文件、执行命令)
- 记住你们之前的对话
- 给你智能的回答
Agent Loop 就是这个"超级助手"的大脑! 它是一个不断循环运行的程序,负责处理用户发送的每一条消息。
整体流程图
┌─────────────────────────────────────────────────────────────┐
│ Agent Loop 主循环 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│ 等待消息到来 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 收到一条消息 │
└────────┬────────┘
│
▼
┌────────────────────────────────────┐
│ 处理这条消息(核心逻辑) │
└────────────────────────────────────┘
│
▼
┌─────────────────┐
│ 发送回复给用户 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 回到开始,继续等│
└─────────────────┘
代码结构拆解
1. 类的定义和初始化
class AgentLoop:
"""
Agent Loop 是核心处理引擎。
它的工作是:
1. 从消息总线接收消息
2. 构建上下文(历史记录、记忆、技能)
3. 调用 LLM(大语言模型)
4. 执行工具调用
5. 发送回复
"""
这是什么?
- 这是一个 Python 类,就像一个"模板"或"蓝图"
class关键字定义了一个新的数据类型AgentLoop是这个类的名字
为什么要用类?
- 类可以把相关的数据和功能打包在一起
- 就像一个"工具箱",里面有很多工具,还有使用这些工具的说明书
2. 初始化函数 __init__
def __init__(
self,
bus: MessageBus, # 消息总线(用来收发消息)
provider: LLMProvider, # LLM 提供商(比如 OpenAI、Claude)
workspace: Path, # 工作目录(文件操作的地方)
model: str | None = None, # 使用的模型名称
max_iterations: int = 20, # 最多循环多少次
brave_api_key: str | None = None, # 搜索引擎的 API 密钥
exec_config: "ExecToolConfig | None" = None, # 执行命令的配置
cron_service: "CronService | None" = None, # 定时任务服务
restrict_to_workspace: bool = False, # 是否限制在工作目录内
session_manager: SessionManager | None = None, # 会话管理器
):
这是什么?
__init__是一个特殊的函数,叫做"构造函数"- 当你创建一个
AgentLoop对象时,这个函数会自动运行 - 它的作用是"初始化"这个对象,给它准备好所有需要的东西
参数解释:
self: 代表这个对象自己,所有函数都要有这个参数bus: 消息总线,就像快递公司,负责收发消息provider: LLM 提供商,就像你的"大脑"(ChatGPT、Claude 等)workspace: 工作目录,就像你的"桌子",所有文件操作都在这里进行max_iterations: 最多循环 20 次,防止死循环
初始化过程:
# 保存传入的参数
self.bus = bus
self.provider = provider
self.workspace = workspace
self.model = model or provider.get_default_model()
self.max_iterations = max_iterations
# 创建各种辅助对象
self.context = ContextBuilder(workspace) # 上下文构建器
self.sessions = session_manager or SessionManager(workspace) # 会话管理器
self.tools = ToolRegistry() # 工具注册表
# 创建子代理管理器(用于后台任务)
self.subagents = SubagentManager(
provider=provider,
workspace=workspace,
bus=bus,
model=self.model,
brave_api_key=brave_api_key,
exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace,
)
# 标记为未运行
self._running = False
# 注册默认工具
self._register_default_tools()
这是什么意思?
self.xxx = xxx把传入的参数保存到对象内部ContextBuilder: 负责构建发送给 AI 的完整提示SessionManager: 负责管理对话历史ToolRegistry: 负责管理所有可用的工具SubagentManager: 负责管理后台任务
3. 注册默认工具
def _register_default_tools(self) -> None:
"""注册默认的工具集合。"""
# 文件工具(如果配置了限制,只能在工作目录内操作)
allowed_dir = self.workspace if self.restrict_to_workspace else None
self.tools.register(ReadFileTool(allowed_dir=allowed_dir)) # 读文件
self.tools.register(WriteFileTool(allowed_dir=allowed_dir)) # 写文件
self.tools.register(EditFileTool(allowed_dir=allowed_dir)) # 编辑文件
self.tools.register(ListDirTool(allowed_dir=allowed_dir)) # 列出目录
# Shell 工具(执行命令)
self.tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
))
# Web 工具
self.tools.register(WebSearchTool(api_key=self.brave_api_key)) # 网页搜索
self.tools.register(WebFetchTool()) # 获取网页内容
# 消息工具(发送消息)
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
self.tools.register(message_tool)
# 生成工具(用于子代理)
spawn_tool = SpawnTool(manager=self.subagents)
self.tools.register(spawn_tool)
# 定时任务工具
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
这是什么?
- 这个函数注册了所有默认可用的工具
- 每个工具都是一个独立的"功能模块"
- AI 可以调用这些工具来完成各种任务
工具有哪些?
| 工具名称 | 功能 | 例子 |
|---|---|---|
ReadFileTool |
读取文件 | “帮我看看这个文件的内容” |
WriteFileTool |
写入文件 | “把这段话保存到文件里” |
EditFileTool |
编辑文件 | “把文件里的某个词替换掉” |
ListDirTool |
列出目录 | “看看这个文件夹里有什么” |
ExecTool |
执行命令 | “运行这个 Python 脚本” |
WebSearchTool |
网页搜索 | “帮我搜索最新的 AI 新闻” |
WebFetchTool |
获取网页 | “获取这个网页的内容” |
MessageTool |
发送消息 | “给这个群发个通知” |
SpawnTool |
创建子任务 | “后台运行这个任务” |
CronTool |
定时任务 | “每天早上 9 点提醒我” |
4. 主循环函数 run
async def run(self) -> None:
"""运行 agent 循环,从总线处理消息。"""
self._running = True
logger.info("Agent loop started")
while self._running: # 只要 _running 是 True,就一直循环
try:
# 等待下一条消息(最多等 1 秒)
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
# 处理消息
try:
response = await self._process_message(msg)
if response:
await self.bus.publish_outbound(response)
except Exception as e:
logger.error(f"Error processing message: {e}")
# 发送错误响应
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=f"Sorry, I encountered an error: {str(e)}"
))
except asyncio.TimeoutError:
continue # 超时了,继续等待
这是什么?
- 这是 Agent Loop 的"主循环"
- 它会一直运行,直到程序停止
详细解释:
while self._running: # 无限循环,就像一个"永不停止"的循环
while是一个循环关键字- 只要
self._running是True,就会一直执行循环体 - 这就像一个"永不疲倦"的工人,一直在等待工作
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
await: 等待某个操作完成(异步编程)asyncio.wait_for: 最多等 1 秒self.bus.consume_inbound(): 从消息总线获取一条消息- 如果 1 秒内没有消息,会抛出
TimeoutError异常
response = await self._process_message(msg)
- 调用
_process_message函数处理消息 - 这个函数会返回 AI 的回复
if response:
await self.bus.publish_outbound(response)
- 如果有回复,就发送到消息总线
- 消息总线会把回复发送给用户
except asyncio.TimeoutError:
continue
- 如果超时了(1 秒内没有消息)
- 就继续下一次循环(继续等待)
为什么要用异步?
- 异步可以让程序在等待的时候去做其他事情
- 比如等待网络请求时,可以处理其他消息
- 这样可以提高效率,不用一直傻等
5. 停止函数 stop
def stop(self) -> None:
"""停止 agent 循环。"""
self._running = False
logger.info("Agent loop stopping")
这是什么?
- 这是一个简单的函数,用来停止主循环
- 把
self._running设置为False - 主循环的
while self._running:就会退出
6. 处理消息函数 _process_message(核心!)
这是整个 Agent Loop 最重要的函数!让我们详细拆解:
async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
处理单条入站消息。
参数:
msg: 要处理的入站消息。
返回:
响应消息,如果不需要响应则返回 None。
"""
6.1 处理系统消息
# 处理系统消息(子代理通知)
# chat_id 包含原始的 "channel:chat_id" 以便路由回去
if msg.channel == "system":
return await self._process_system_message(msg)
这是什么?
- 如果消息来自 “system” 通道
- 说明这是后台任务的完成通知
- 调用
_process_system_message单独处理
6.2 记录日志
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}")
这是什么?
- 打印一条日志,显示正在处理的消息
- 如果消息太长(超过 80 字符),就只显示前 80 字符
- 方便调试和监控
6.3 获取或创建会话
# 获取或创建会话
session = self.sessions.get_or_create(msg.session_key)
这是什么?
msg.session_key: 会话键,格式是 “channel:chat_id”- 比如 “telegram:123456789”
get_or_create: 如果会话存在就获取,不存在就创建新的
会话是什么?
- 会话保存了对话历史
- 这样 AI 能记住你们之前说了什么
- 每个用户都有独立的会话
6.4 更新工具上下文
# 更新工具上下文
message_tool = self.tools.get("message")
if isinstance(message_tool, MessageTool):
message_tool.set_context(msg.channel, msg.chat_id)
spawn_tool = self.tools.get("spawn")
if isinstance(spawn_tool, SpawnTool):
spawn_tool.set_context(msg.channel, msg.chat_id)
cron_tool = self.tools.get("cron")
if isinstance(cron_tool, CronTool):
cron_tool.set_context(msg.channel, msg.chat_id)
这是什么?
- 告诉某些工具当前的消息来源
- 这样工具就知道该往哪里发送消息或创建任务
为什么需要上下文?
- 比如
message_tool需要知道往哪个 Telegram 群发送消息 spawn_tool需要知道任务完成后通知哪里
6.5 构建消息列表
# 构建初始消息(使用 get_history 获取 LLM 格式的消息)
messages = self.context.build_messages(
history=session.get_history(), # 对话历史
current_message=msg.content, # 当前消息
media=msg.media if msg.media else None, # 媒体文件(图片、视频等)
channel=msg.channel, # 通道
chat_id=msg.chat_id, # 聊天 ID
)
这是什么?
- 构建发送给 LLM 的完整消息列表
- 包括:系统提示、历史对话、当前消息
消息列表是什么样子的?
messages = [
{
"role": "system",
"content": "你是一个 AI 助手,可以使用以下工具..."
},
{
"role": "user",
"content": "你好"
},
{
"role": "assistant",
"content": "你好!有什么我可以帮你的吗?"
},
{
"role": "user",
"content": "帮我搜索最新的 AI 新闻"
}
]
6.6 Agent 循环(最重要的部分!)
# Agent 循环
iteration = 0
final_content = None
while iteration < self.max_iterations: # 最多循环 20 次
iteration += 1
# 调用 LLM
response = await self.provider.chat(
messages=messages, # 对话历史
tools=self.tools.get_definitions(), # 可用工具定义
model=self.model
)
# 处理工具调用
if response.has_tool_calls:
# 添加包含工具调用的助手消息
tool_call_dicts = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments) # 必须是 JSON 字符串
}
}
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
)
# 执行工具
for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
else:
# 没有工具调用,处理完成
final_content = response.content
break
这是什么?
- 这是 Agent Loop 的核心循环
- 它会不断调用 LLM,直到 LLM 不再需要调用工具
详细解释:
第一步:调用 LLM
response = await self.provider.chat(
messages=messages, # 对话历史
tools=self.tools.get_definitions(), # 可用工具定义
model=self.model
)
- 发送消息给 LLM(比如 ChatGPT)
- 告诉 LLM 有哪些工具可用
- LLM 会决定:
- 只是回答问题
- 或者调用某个工具
第二步:检查是否有工具调用
if response.has_tool_calls:
- 如果 LLM 想调用工具
has_tool_calls就是True
第三步:添加助手消息
tool_call_dicts = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments)
}
}
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
)
- 把 LLM 的响应(包括工具调用)添加到消息列表
- 这样 LLM 就能记得自己调用了什么工具
第四步:执行工具
for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
- 遍历所有工具调用
- 执行每个工具
- 把工具的执行结果添加到消息列表
举个例子:
假设用户问:“帮我搜索最新的 AI 新闻”
- LLM 决定调用
web_search工具 - 执行
web_search工具,搜索得到结果 - 把搜索结果添加到消息列表
- 再次调用 LLM,让它根据搜索结果生成回答
- LLM 不再调用工具,直接返回答案
第五步:没有工具调用,完成
else:
# 没有工具调用,处理完成
final_content = response.content
break
- 如果 LLM 不再调用工具
- 就保存最终答案
- 退出循环
为什么要循环?
- 因为 LLM 可能需要调用多个工具
- 或者需要根据工具的结果再次调用其他工具
- 比如:先搜索 → 再读取某个文件 → 再执行某个命令
6.7 处理无结果的情况
if final_content is None:
final_content = "I've completed processing but have no response to give."
这是什么?
- 如果循环结束但没有得到答案
- 就返回一个默认消息
6.8 记录响应日志
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}")
这是什么?
- 打印一条日志,显示 AI 的回复
- 方便调试和监控
6.9 保存会话
# 保存到会话
session.add_message("user", msg.content)
session.add_message("assistant", final_content)
self.sessions.save(session)
这是什么?
- 把用户的消息和 AI 的回复保存到会话
- 这样下次对话时能记住历史
6.10 返回响应
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content
)
这是什么?
- 创建一个出站消息
- 包含:通道、聊天 ID、回复内容
- 返回这个消息,主循环会把它发送给用户
7. 处理系统消息函数 _process_system_message
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
处理系统消息(比如子代理通知)。
chat_id 字段包含 "original_channel:original_chat_id" 以便
将响应路由回正确的目的地。
"""
这是什么?
- 处理来自后台任务的通知
- 比如子代理完成了某个任务,会发送系统消息通知主代理
流程与 _process_message 类似:
- 解析原始通道和聊天 ID
- 获取会话
- 更新工具上下文
- 构建消息列表
- Agent 循环
- 保存会话
- 返回响应
8. 直接处理消息函数 process_direct
async def process_direct(
self,
content: str,
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
) -> str:
"""
直接处理消息(用于 CLI 或 cron 使用)。
参数:
content: 消息内容。
session_key: 会话标识符。
channel: 来源通道(用于上下文)。
chat_id: 来源聊天 ID(用于上下文)。
返回:
Agent 的响应。
"""
msg = InboundMessage(
channel=channel,
sender_id="user",
chat_id=chat_id,
content=content
)
response = await self._process_message(msg)
return response.content if response else ""
这是什么?
- 这是一个便捷函数
- 用于命令行界面(CLI)或定时任务(cron)
- 直接传入消息内容,返回 AI 的回复
完整工作流程示例
让我们用一个具体的例子来说明整个流程:
场景:用户在 Telegram 上问"帮我搜索最新的 AI 新闻"
1. 用户在 Telegram 发送消息
↓
2. Telegram 通道接收消息,创建 InboundMessage
↓
3. 消息发布到消息总线
↓
4. Agent Loop 的主循环从队列获取消息
↓
5. 调用 _process_message 处理消息
↓
6. 获取或创建会话("telegram:123456789")
↓
7. 更新工具上下文
↓
8. 构建消息列表(包括历史对话)
↓
9. 进入 Agent 循环
↓
10. 第一次迭代:
- 调用 LLM
- LLM 决定调用 web_search 工具
- 执行 web_search 工具
- 得到搜索结果
↓
11. 第二次迭代:
- 再次调用 LLM(带上搜索结果)
- LLM 根据搜索结果生成答案
- LLM 不再调用工具
- 保存最终答案
↓
12. 保存会话(用户消息 + AI 回复)
↓
13. 返回 OutboundMessage
↓
14. 主循环发送响应到消息总线
↓
15. Telegram 通道接收并发送给用户
↓
16. 用户看到 AI 的回复
关键概念总结
1. 异步编程 (Async/Await)
什么是异步?
- 普通函数:执行时必须等待完成才能做其他事
- 异步函数:执行时可以"等待"某个操作,同时可以做其他事
举个例子:
# 同步(普通)
def sync_function():
result = network_request() # 必须等网络请求完成
return result
# 异步
async def async_function():
result = await network_request() # 等待网络请求,但可以处理其他任务
return result
为什么要用异步?
- 提高效率
- 可以同时处理多个消息
- 不会因为等待网络请求而阻塞
2. 消息队列
什么是消息队列?
- 一个先进先出(FIFO)的队列
- 生产者(通道)把消息放入队列
- 消费者(Agent Loop)从队列取出消息
为什么要用消息队列?
- 解耦:通道和 Agent Loop 不直接依赖
- 缓冲:消息可以暂时存储,不会丢失
- 并发:可以同时处理多个消息
3. 会话 (Session)
什么是会话?
- 保存对话历史的容器
- 每个用户有独立的会话
会话包含什么?
- 对话历史(用户消息 + AI 回复)
- 会话键(channel:chat_id)
- 元数据(创建时间、最后更新时间等)
为什么要用会话?
- 记住上下文
- AI 能理解"之前说了什么"
- 提供连续的对话体验
4. 工具 (Tool)
什么是工具?
- AI 可以调用的"功能模块"
- 每个工具都有特定的功能
工具的特点:
- 有名称和描述
- 有参数定义(JSON Schema)
- 有执行逻辑
为什么要用工具?
- 扩展 AI 的能力
- 让 AI 能做更多事情
- 比如搜索网页、读写文件、执行命令
5. Function Calling
什么是 Function Calling?
- LLM 的一种能力
- LLM 可以决定"调用哪个函数"和"用什么参数"
举个例子:
用户:帮我搜索最新的 AI 新闻
LLM:我需要调用 web_search 函数,参数是 {"query": "AI 新闻"}
系统:执行 web_search 函数,得到结果
LLM:根据搜索结果,生成答案
为什么要用 Function Calling?
- 让 AI 更智能
- AI 能主动使用工具
- 不需要人工判断何时使用工具
常见问题
Q1: 为什么需要循环 20 次?
A: 因为 AI 可能需要多次调用工具才能完成任务。比如:
- 搜索网页
- 读取搜索结果中的某个文件
- 根据文件内容执行某个命令
- 再次搜索补充信息
每次工具调用都需要一次循环。20 次是为了防止死循环。
Q2: 什么是 await?
A: await 是异步编程的关键字。它的意思是"等待这个操作完成,但期间可以做其他事情"。比如:
result = await network_request() # 等待网络请求,但可以处理其他消息
Q3: 为什么要有系统消息 (system channel)?
A: 系统消息用于后台任务的通知。比如:
- 子代理完成了某个任务
- 定时任务触发了
- 心跳检测
这些消息不需要用户触发,而是系统自动生成的。
Q4: 会话保存在哪里?
A: 会话保存在 ~/.nanobot/sessions/ 目录下,每个会话是一个 JSON 文件。比如:
~/.nanobot/sessions/telegram_123456789.json
Q5: 如何添加新的工具?
A: 需要做三件事:
- 创建一个新的工具类,继承
Tool基类 - 实现
name、description、parameters属性 - 实现
execute方法 - 在
_register_default_tools中注册
总结
Agent Loop 是 nanobot 的核心,它的工作流程可以概括为:
- 等待消息 - 从消息队列获取用户消息
- 获取会话 - 获取用户的对话历史
- 构建上下文 - 准备发送给 LLM 的完整消息
- 调用 LLM - 让 AI 思考如何回答
- 执行工具 - 如果 AI 需要调用工具,就执行工具
- 重复循环 - 直到 AI 不再需要调用工具
- 保存会话 - 把对话保存下来
- 返回响应 - 把答案发送给用户
这个设计使得 nanobot 能够:
- 处理复杂的任务(通过多次工具调用)
- 记住对话历史(通过会话管理)
- 扩展功能(通过工具系统)
- 高效运行(通过异步编程)
希望这个解释能帮助你理解 Agent Loop 的工作原理!
文档生成时间:2026-02-09