一路狂飙

Agent Loop 系统详解

Agent Loop 系统详解

什么是 Agent Loop?

想象一下,你有一个超级智能的助手,它能:

  • 听懂你说的话
  • 使用各种工具(比如搜索网页、读写文件、执行命令)
  • 记住你们之前的对话
  • 给你智能的回答

Agent Loop 就是这个"超级助手"的大脑! 它是一个不断循环运行的程序,负责处理用户发送的每一条消息。


整体流程图

┌─────────────────────────────────────────────────────────────┐
│                    Agent Loop 主循环                          │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  等待消息到来    │
                    └────────┬────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  收到一条消息    │
                    └────────┬────────┘
                             │
                             ▼
        ┌────────────────────────────────────┐
        │        处理这条消息(核心逻辑)        │
        └────────────────────────────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  发送回复给用户  │
                    └────────┬────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  回到开始,继续等│
                    └─────────────────┘

代码结构拆解

1. 类的定义和初始化

class AgentLoop:
    """
    Agent Loop 是核心处理引擎。
    它的工作是:
    1. 从消息总线接收消息
    2. 构建上下文(历史记录、记忆、技能)
    3. 调用 LLM(大语言模型)
    4. 执行工具调用
    5. 发送回复
    """

这是什么?

  • 这是一个 Python 类,就像一个"模板"或"蓝图"
  • class 关键字定义了一个新的数据类型
  • AgentLoop 是这个类的名字

为什么要用类?

  • 类可以把相关的数据和功能打包在一起
  • 就像一个"工具箱",里面有很多工具,还有使用这些工具的说明书

2. 初始化函数 __init__

def __init__(
    self,
    bus: MessageBus,                    # 消息总线(用来收发消息)
    provider: LLMProvider,              # LLM 提供商(比如 OpenAI、Claude)
    workspace: Path,                    # 工作目录(文件操作的地方)
    model: str | None = None,           # 使用的模型名称
    max_iterations: int = 20,           # 最多循环多少次
    brave_api_key: str | None = None,   # 搜索引擎的 API 密钥
    exec_config: "ExecToolConfig | None" = None,  # 执行命令的配置
    cron_service: "CronService | None" = None,     # 定时任务服务
    restrict_to_workspace: bool = False,          # 是否限制在工作目录内
    session_manager: SessionManager | None = None, # 会话管理器
):

这是什么?

  • __init__ 是一个特殊的函数,叫做"构造函数"
  • 当你创建一个 AgentLoop 对象时,这个函数会自动运行
  • 它的作用是"初始化"这个对象,给它准备好所有需要的东西

参数解释:

  • self: 代表这个对象自己,所有函数都要有这个参数
  • bus: 消息总线,就像快递公司,负责收发消息
  • provider: LLM 提供商,就像你的"大脑"(ChatGPT、Claude 等)
  • workspace: 工作目录,就像你的"桌子",所有文件操作都在这里进行
  • max_iterations: 最多循环 20 次,防止死循环

初始化过程:

# 保存传入的参数
self.bus = bus
self.provider = provider
self.workspace = workspace
self.model = model or provider.get_default_model()
self.max_iterations = max_iterations

# 创建各种辅助对象
self.context = ContextBuilder(workspace)      # 上下文构建器
self.sessions = session_manager or SessionManager(workspace)  # 会话管理器
self.tools = ToolRegistry()                  # 工具注册表

# 创建子代理管理器(用于后台任务)
self.subagents = SubagentManager(
    provider=provider,
    workspace=workspace,
    bus=bus,
    model=self.model,
    brave_api_key=brave_api_key,
    exec_config=self.exec_config,
    restrict_to_workspace=restrict_to_workspace,
)

# 标记为未运行
self._running = False

# 注册默认工具
self._register_default_tools()

这是什么意思?

  • self.xxx = xxx 把传入的参数保存到对象内部
  • ContextBuilder: 负责构建发送给 AI 的完整提示
  • SessionManager: 负责管理对话历史
  • ToolRegistry: 负责管理所有可用的工具
  • SubagentManager: 负责管理后台任务

3. 注册默认工具

def _register_default_tools(self) -> None:
    """注册默认的工具集合。"""
    # 文件工具(如果配置了限制,只能在工作目录内操作)
    allowed_dir = self.workspace if self.restrict_to_workspace else None
    self.tools.register(ReadFileTool(allowed_dir=allowed_dir))      # 读文件
    self.tools.register(WriteFileTool(allowed_dir=allowed_dir))     # 写文件
    self.tools.register(EditFileTool(allowed_dir=allowed_dir))      # 编辑文件
    self.tools.register(ListDirTool(allowed_dir=allowed_dir))       # 列出目录

    # Shell 工具(执行命令)
    self.tools.register(ExecTool(
        working_dir=str(self.workspace),
        timeout=self.exec_config.timeout,
        restrict_to_workspace=self.restrict_to_workspace,
    ))

    # Web 工具
    self.tools.register(WebSearchTool(api_key=self.brave_api_key))  # 网页搜索
    self.tools.register(WebFetchTool())                            # 获取网页内容

    # 消息工具(发送消息)
    message_tool = MessageTool(send_callback=self.bus.publish_outbound)
    self.tools.register(message_tool)

    # 生成工具(用于子代理)
    spawn_tool = SpawnTool(manager=self.subagents)
    self.tools.register(spawn_tool)

    # 定时任务工具
    if self.cron_service:
        self.tools.register(CronTool(self.cron_service))

这是什么?

  • 这个函数注册了所有默认可用的工具
  • 每个工具都是一个独立的"功能模块"
  • AI 可以调用这些工具来完成各种任务

工具有哪些?

工具名称 功能 例子
ReadFileTool 读取文件 “帮我看看这个文件的内容”
WriteFileTool 写入文件 “把这段话保存到文件里”
EditFileTool 编辑文件 “把文件里的某个词替换掉”
ListDirTool 列出目录 “看看这个文件夹里有什么”
ExecTool 执行命令 “运行这个 Python 脚本”
WebSearchTool 网页搜索 “帮我搜索最新的 AI 新闻”
WebFetchTool 获取网页 “获取这个网页的内容”
MessageTool 发送消息 “给这个群发个通知”
SpawnTool 创建子任务 “后台运行这个任务”
CronTool 定时任务 “每天早上 9 点提醒我”

4. 主循环函数 run

async def run(self) -> None:
    """运行 agent 循环,从总线处理消息。"""
    self._running = True
    logger.info("Agent loop started")

    while self._running:  # 只要 _running 是 True,就一直循环
        try:
            # 等待下一条消息(最多等 1 秒)
            msg = await asyncio.wait_for(
                self.bus.consume_inbound(),
                timeout=1.0
            )

            # 处理消息
            try:
                response = await self._process_message(msg)
                if response:
                    await self.bus.publish_outbound(response)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # 发送错误响应
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel,
                    chat_id=msg.chat_id,
                    content=f"Sorry, I encountered an error: {str(e)}"
                ))
        except asyncio.TimeoutError:
            continue  # 超时了,继续等待

这是什么?

  • 这是 Agent Loop 的"主循环"
  • 它会一直运行,直到程序停止

详细解释:

while self._running:  # 无限循环,就像一个"永不停止"的循环
  • while 是一个循环关键字
  • 只要 self._runningTrue,就会一直执行循环体
  • 这就像一个"永不疲倦"的工人,一直在等待工作
msg = await asyncio.wait_for(
    self.bus.consume_inbound(),
    timeout=1.0
)
  • await: 等待某个操作完成(异步编程)
  • asyncio.wait_for: 最多等 1 秒
  • self.bus.consume_inbound(): 从消息总线获取一条消息
  • 如果 1 秒内没有消息,会抛出 TimeoutError 异常
response = await self._process_message(msg)
  • 调用 _process_message 函数处理消息
  • 这个函数会返回 AI 的回复
if response:
    await self.bus.publish_outbound(response)
  • 如果有回复,就发送到消息总线
  • 消息总线会把回复发送给用户
except asyncio.TimeoutError:
    continue
  • 如果超时了(1 秒内没有消息)
  • 就继续下一次循环(继续等待)

为什么要用异步?

  • 异步可以让程序在等待的时候去做其他事情
  • 比如等待网络请求时,可以处理其他消息
  • 这样可以提高效率,不用一直傻等

5. 停止函数 stop

def stop(self) -> None:
    """停止 agent 循环。"""
    self._running = False
    logger.info("Agent loop stopping")

这是什么?

  • 这是一个简单的函数,用来停止主循环
  • self._running 设置为 False
  • 主循环的 while self._running: 就会退出

6. 处理消息函数 _process_message(核心!)

这是整个 Agent Loop 最重要的函数!让我们详细拆解:

async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """
    处理单条入站消息。

    参数:
        msg: 要处理的入站消息。

    返回:
        响应消息,如果不需要响应则返回 None。
    """

6.1 处理系统消息

# 处理系统消息(子代理通知)
# chat_id 包含原始的 "channel:chat_id" 以便路由回去
if msg.channel == "system":
    return await self._process_system_message(msg)

这是什么?

  • 如果消息来自 “system” 通道
  • 说明这是后台任务的完成通知
  • 调用 _process_system_message 单独处理

6.2 记录日志

preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}")

这是什么?

  • 打印一条日志,显示正在处理的消息
  • 如果消息太长(超过 80 字符),就只显示前 80 字符
  • 方便调试和监控

6.3 获取或创建会话

# 获取或创建会话
session = self.sessions.get_or_create(msg.session_key)

这是什么?

  • msg.session_key: 会话键,格式是 “channel:chat_id”
  • 比如 “telegram:123456789”
  • get_or_create: 如果会话存在就获取,不存在就创建新的

会话是什么?

  • 会话保存了对话历史
  • 这样 AI 能记住你们之前说了什么
  • 每个用户都有独立的会话

6.4 更新工具上下文

# 更新工具上下文
message_tool = self.tools.get("message")
if isinstance(message_tool, MessageTool):
    message_tool.set_context(msg.channel, msg.chat_id)

spawn_tool = self.tools.get("spawn")
if isinstance(spawn_tool, SpawnTool):
    spawn_tool.set_context(msg.channel, msg.chat_id)

cron_tool = self.tools.get("cron")
if isinstance(cron_tool, CronTool):
    cron_tool.set_context(msg.channel, msg.chat_id)

这是什么?

  • 告诉某些工具当前的消息来源
  • 这样工具就知道该往哪里发送消息或创建任务

为什么需要上下文?

  • 比如 message_tool 需要知道往哪个 Telegram 群发送消息
  • spawn_tool 需要知道任务完成后通知哪里

6.5 构建消息列表

# 构建初始消息(使用 get_history 获取 LLM 格式的消息)
messages = self.context.build_messages(
    history=session.get_history(),           # 对话历史
    current_message=msg.content,             # 当前消息
    media=msg.media if msg.media else None,  # 媒体文件(图片、视频等)
    channel=msg.channel,                     # 通道
    chat_id=msg.chat_id,                     # 聊天 ID
)

这是什么?

  • 构建发送给 LLM 的完整消息列表
  • 包括:系统提示、历史对话、当前消息

消息列表是什么样子的?

messages = [
    {
        "role": "system",
        "content": "你是一个 AI 助手,可以使用以下工具..."
    },
    {
        "role": "user",
        "content": "你好"
    },
    {
        "role": "assistant",
        "content": "你好!有什么我可以帮你的吗?"
    },
    {
        "role": "user",
        "content": "帮我搜索最新的 AI 新闻"
    }
]

6.6 Agent 循环(最重要的部分!)

# Agent 循环
iteration = 0
final_content = None

while iteration < self.max_iterations:  # 最多循环 20 次
    iteration += 1

    # 调用 LLM
    response = await self.provider.chat(
        messages=messages,                # 对话历史
        tools=self.tools.get_definitions(),  # 可用工具定义
        model=self.model
    )

    # 处理工具调用
    if response.has_tool_calls:
        # 添加包含工具调用的助手消息
        tool_call_dicts = [
            {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.name,
                    "arguments": json.dumps(tc.arguments)  # 必须是 JSON 字符串
                }
            }
            for tc in response.tool_calls
        ]
        messages = self.context.add_assistant_message(
            messages, response.content, tool_call_dicts,
            reasoning_content=response.reasoning_content,
        )

        # 执行工具
        for tool_call in response.tool_calls:
            args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
            logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
            result = await self.tools.execute(tool_call.name, tool_call.arguments)
            messages = self.context.add_tool_result(
                messages, tool_call.id, tool_call.name, result
            )
    else:
        # 没有工具调用,处理完成
        final_content = response.content
        break

这是什么?

  • 这是 Agent Loop 的核心循环
  • 它会不断调用 LLM,直到 LLM 不再需要调用工具

详细解释:

第一步:调用 LLM

response = await self.provider.chat(
    messages=messages,                # 对话历史
    tools=self.tools.get_definitions(),  # 可用工具定义
    model=self.model
)
  • 发送消息给 LLM(比如 ChatGPT)
  • 告诉 LLM 有哪些工具可用
  • LLM 会决定:
    1. 只是回答问题
    2. 或者调用某个工具

第二步:检查是否有工具调用

if response.has_tool_calls:
  • 如果 LLM 想调用工具
  • has_tool_calls 就是 True

第三步:添加助手消息

tool_call_dicts = [
    {
        "id": tc.id,
        "type": "function",
        "function": {
            "name": tc.name,
            "arguments": json.dumps(tc.arguments)
        }
    }
    for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
    messages, response.content, tool_call_dicts,
    reasoning_content=response.reasoning_content,
)
  • 把 LLM 的响应(包括工具调用)添加到消息列表
  • 这样 LLM 就能记得自己调用了什么工具

第四步:执行工具

for tool_call in response.tool_calls:
    args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
    logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
    result = await self.tools.execute(tool_call.name, tool_call.arguments)
    messages = self.context.add_tool_result(
        messages, tool_call.id, tool_call.name, result
    )
  • 遍历所有工具调用
  • 执行每个工具
  • 把工具的执行结果添加到消息列表

举个例子:

假设用户问:“帮我搜索最新的 AI 新闻”

  1. LLM 决定调用 web_search 工具
  2. 执行 web_search 工具,搜索得到结果
  3. 把搜索结果添加到消息列表
  4. 再次调用 LLM,让它根据搜索结果生成回答
  5. LLM 不再调用工具,直接返回答案

第五步:没有工具调用,完成

else:
    # 没有工具调用,处理完成
    final_content = response.content
    break
  • 如果 LLM 不再调用工具
  • 就保存最终答案
  • 退出循环

为什么要循环?

  • 因为 LLM 可能需要调用多个工具
  • 或者需要根据工具的结果再次调用其他工具
  • 比如:先搜索 → 再读取某个文件 → 再执行某个命令

6.7 处理无结果的情况

if final_content is None:
    final_content = "I've completed processing but have no response to give."

这是什么?

  • 如果循环结束但没有得到答案
  • 就返回一个默认消息

6.8 记录响应日志

preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}")

这是什么?

  • 打印一条日志,显示 AI 的回复
  • 方便调试和监控

6.9 保存会话

# 保存到会话
session.add_message("user", msg.content)
session.add_message("assistant", final_content)
self.sessions.save(session)

这是什么?

  • 把用户的消息和 AI 的回复保存到会话
  • 这样下次对话时能记住历史

6.10 返回响应

return OutboundMessage(
    channel=msg.channel,
    chat_id=msg.chat_id,
    content=final_content
)

这是什么?

  • 创建一个出站消息
  • 包含:通道、聊天 ID、回复内容
  • 返回这个消息,主循环会把它发送给用户

7. 处理系统消息函数 _process_system_message

async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """
    处理系统消息(比如子代理通知)。

    chat_id 字段包含 "original_channel:original_chat_id" 以便
    将响应路由回正确的目的地。
    """

这是什么?

  • 处理来自后台任务的通知
  • 比如子代理完成了某个任务,会发送系统消息通知主代理

流程与 _process_message 类似:

  1. 解析原始通道和聊天 ID
  2. 获取会话
  3. 更新工具上下文
  4. 构建消息列表
  5. Agent 循环
  6. 保存会话
  7. 返回响应

8. 直接处理消息函数 process_direct

async def process_direct(
    self,
    content: str,
    session_key: str = "cli:direct",
    channel: str = "cli",
    chat_id: str = "direct",
) -> str:
    """
    直接处理消息(用于 CLI 或 cron 使用)。

    参数:
        content: 消息内容。
        session_key: 会话标识符。
        channel: 来源通道(用于上下文)。
        chat_id: 来源聊天 ID(用于上下文)。

    返回:
        Agent 的响应。
    """
    msg = InboundMessage(
        channel=channel,
        sender_id="user",
        chat_id=chat_id,
        content=content
    )

    response = await self._process_message(msg)
    return response.content if response else ""

这是什么?

  • 这是一个便捷函数
  • 用于命令行界面(CLI)或定时任务(cron)
  • 直接传入消息内容,返回 AI 的回复

完整工作流程示例

让我们用一个具体的例子来说明整个流程:

场景:用户在 Telegram 上问"帮我搜索最新的 AI 新闻"

1. 用户在 Telegram 发送消息
   ↓
2. Telegram 通道接收消息,创建 InboundMessage
   ↓
3. 消息发布到消息总线
   ↓
4. Agent Loop 的主循环从队列获取消息
   ↓
5. 调用 _process_message 处理消息
   ↓
6. 获取或创建会话("telegram:123456789")
   ↓
7. 更新工具上下文
   ↓
8. 构建消息列表(包括历史对话)
   ↓
9. 进入 Agent 循环
   ↓
10. 第一次迭代:
    - 调用 LLM
    - LLM 决定调用 web_search 工具
    - 执行 web_search 工具
    - 得到搜索结果
    ↓
11. 第二次迭代:
    - 再次调用 LLM(带上搜索结果)
    - LLM 根据搜索结果生成答案
    - LLM 不再调用工具
    - 保存最终答案
    ↓
12. 保存会话(用户消息 + AI 回复)
    ↓
13. 返回 OutboundMessage
    ↓
14. 主循环发送响应到消息总线
    ↓
15. Telegram 通道接收并发送给用户
    ↓
16. 用户看到 AI 的回复

关键概念总结

1. 异步编程 (Async/Await)

什么是异步?

  • 普通函数:执行时必须等待完成才能做其他事
  • 异步函数:执行时可以"等待"某个操作,同时可以做其他事

举个例子:

# 同步(普通)
def sync_function():
    result = network_request()  # 必须等网络请求完成
    return result

# 异步
async def async_function():
    result = await network_request()  # 等待网络请求,但可以处理其他任务
    return result

为什么要用异步?

  • 提高效率
  • 可以同时处理多个消息
  • 不会因为等待网络请求而阻塞

2. 消息队列

什么是消息队列?

  • 一个先进先出(FIFO)的队列
  • 生产者(通道)把消息放入队列
  • 消费者(Agent Loop)从队列取出消息

为什么要用消息队列?

  • 解耦:通道和 Agent Loop 不直接依赖
  • 缓冲:消息可以暂时存储,不会丢失
  • 并发:可以同时处理多个消息

3. 会话 (Session)

什么是会话?

  • 保存对话历史的容器
  • 每个用户有独立的会话

会话包含什么?

  • 对话历史(用户消息 + AI 回复)
  • 会话键(channel:chat_id)
  • 元数据(创建时间、最后更新时间等)

为什么要用会话?

  • 记住上下文
  • AI 能理解"之前说了什么"
  • 提供连续的对话体验

4. 工具 (Tool)

什么是工具?

  • AI 可以调用的"功能模块"
  • 每个工具都有特定的功能

工具的特点:

  • 有名称和描述
  • 有参数定义(JSON Schema)
  • 有执行逻辑

为什么要用工具?

  • 扩展 AI 的能力
  • 让 AI 能做更多事情
  • 比如搜索网页、读写文件、执行命令

5. Function Calling

什么是 Function Calling?

  • LLM 的一种能力
  • LLM 可以决定"调用哪个函数"和"用什么参数"

举个例子:

用户:帮我搜索最新的 AI 新闻
LLM:我需要调用 web_search 函数,参数是 {"query": "AI 新闻"}
系统:执行 web_search 函数,得到结果
LLM:根据搜索结果,生成答案

为什么要用 Function Calling?

  • 让 AI 更智能
  • AI 能主动使用工具
  • 不需要人工判断何时使用工具

常见问题

Q1: 为什么需要循环 20 次?

A: 因为 AI 可能需要多次调用工具才能完成任务。比如:

  1. 搜索网页
  2. 读取搜索结果中的某个文件
  3. 根据文件内容执行某个命令
  4. 再次搜索补充信息

每次工具调用都需要一次循环。20 次是为了防止死循环。

Q2: 什么是 await

A: await 是异步编程的关键字。它的意思是"等待这个操作完成,但期间可以做其他事情"。比如:

result = await network_request()  # 等待网络请求,但可以处理其他消息

Q3: 为什么要有系统消息 (system channel)?

A: 系统消息用于后台任务的通知。比如:

  • 子代理完成了某个任务
  • 定时任务触发了
  • 心跳检测

这些消息不需要用户触发,而是系统自动生成的。

Q4: 会话保存在哪里?

A: 会话保存在 ~/.nanobot/sessions/ 目录下,每个会话是一个 JSON 文件。比如:

~/.nanobot/sessions/telegram_123456789.json

Q5: 如何添加新的工具?

A: 需要做三件事:

  1. 创建一个新的工具类,继承 Tool 基类
  2. 实现 namedescriptionparameters 属性
  3. 实现 execute 方法
  4. _register_default_tools 中注册

总结

Agent Loop 是 nanobot 的核心,它的工作流程可以概括为:

  1. 等待消息 - 从消息队列获取用户消息
  2. 获取会话 - 获取用户的对话历史
  3. 构建上下文 - 准备发送给 LLM 的完整消息
  4. 调用 LLM - 让 AI 思考如何回答
  5. 执行工具 - 如果 AI 需要调用工具,就执行工具
  6. 重复循环 - 直到 AI 不再需要调用工具
  7. 保存会话 - 把对话保存下来
  8. 返回响应 - 把答案发送给用户

这个设计使得 nanobot 能够:

  • 处理复杂的任务(通过多次工具调用)
  • 记住对话历史(通过会话管理)
  • 扩展功能(通过工具系统)
  • 高效运行(通过异步编程)

希望这个解释能帮助你理解 Agent Loop 的工作原理!


文档生成时间:2026-02-09