Don’t Build Multi-Agents

Cognition
构建长期运行的AI智能体系统,需要解决“可靠性”问题:

  1. 上下文丢失、过长
  2. 状态混乱
  3. 错误累积

例如,Multi-Agent思路需要构建规划Agent、解释Agent、执行Agent、SOP Agent。
然而,如果仅仅使用两个独立Agent,其生成结果会更加独立、隔绝,而不是相关联。
整体大于局部。局部的完整性不能保证整体的一致性。

Shared Data

Principle 1: Share context, and share full agent traces, not just individual messages.

我们希望通过共享上下文解决一致性问题,但是不行。
Agent1和Agent2都是基于自己对Shared Data的理解工作,而不知道对方在做什么。
因此我们需要共享Traces,让一个Agent(例如解释Agent)对另一个Agent(例如执行Agent)进行Revision校正。
可是,只有垂直矫正,水平的Agent(多个执行Agents)之间仍然不知道对方在做什么。

Actions mean desisons

Principle 2: Actions carry implicit decisions, and conflicting decisions carry bad results.

每个Agent的行为都必须基于同样的预期结果,而不能基于不清楚、有歧义的预期结果;否则整体很难保持风格统一。

Single-threaded Linear Agent

鉴于上面两条,作者选择使用单Agent线性解决问题。
然而,这样做容易产生context windows overflow上下文溢出(因为线性Agent其实就是不断附带上一次的上下文进行下一次chat)。
我们引入总结压缩LLM解决上下文问题。

Claude Code设计模式

Calude Code的智能体有两个特点:

  1. 主Agent与子Agent不会并行运行
  2. 子Agent只回答简单问题,而不会编写代码
    这样做有几个优点
  • 避免上下文冲突:子Agent不包括主Agent的上下文,只回答清晰、具体的问题。
  • 节省上下文:子Agent的操作也不保存在主Agent的上下文中。他们是解耦合的。

How we built our multi-agent research system

How we built our multi-agent research system
三种AI模式:

  1. Chat AI
  2. Single Agent
  3. Multiple Agents
    Multi-Agents的优势在于回答开放、不确定的问题。传统的单Agent不适合研究,而多Agent并行搜索,最终总结出来的信息压缩性更强。

The essence of search is compression.

Anthropic团队区分了两种模式:

  1. 垂直模式:容易并行处理的任务,Leader Agent与多个Sub Agent交互
  2. 水平模式:不容易并行的任务、需要上下文共享的任务、Agent依赖强的任务,如编程,Leader Agent一步一步执行Sub stage

Agent模式的token使用量是Chat模式的4倍;而Multi-Agent则是Chat模式的15倍。
Multi-Agent让token用量增加,因此更可能解决问题。同时也带来的高成本。

Prompt Engineering

  1. Think like your agents.
  2. Teach the orchestartor how to delegate.
    例如,子问题如何划分?怎么确定它就是任务的最小可执行单元?
    可以使用 明确预期结果-example输出格式-可用资源tools-任务边界不要做什么 这一套指令。
  3. Scale effort to query complexity.
    为prompt嵌入scaling rules,明确指出简单-中等-复杂任务分别分配多少subagents。这一条主要是做减法,对简单任务指定少agent,节省成本。
  4. Tool design and selection are critical.
    Tool Description要够好,否则Agent可能不会调用需要的MCP工具。
  5. Let agents improve themselves.
    使用tool-testing agent,让agent改进失败的prompt和流程、重写工具描述等。
  6. Start wide, then narrow down.
    这一条是因为Agent自己的搜索词写的比较AI,太长了,返回的结果很少。需要提示AI使用宽泛的提示词,然后再窄化范围精确搜索。
  7. Guide the thinking process.
    这一步是打印日志,让AI把思考过程打成标记、大纲、ToDoList,这样方便修改。
  8. Parallel tool calling transforms speed and performance.
    主Agent平行分派任务给子Agent;子Agent并行调用Tools。

Eval Agents

Multi-Agents的过程可能每一次都不同,因此不能使用传统的评估方法。

  1. 小样本评估。不要等到测试用例足够多才开始测试,边测试边修改效果更好。
  2. LLM评估。给出判断标准(事实/引用准确性、完整性、来源质量、多余/无效工具调用…),让LLM量化评估(0.0~1.0打分)
  3. 人工检查遗漏。如AI是不是只使用SEO靠前的,而不是权威的网站。

需要注意,Multi-Agents会产生涌现(Emergent Behaviors),对Leader Agent的改动会影响Sub Agent。

Multi-Agent框架最好考虑下面几个方面:

  1. 工作分工(规划、解释、执行、自愈、总结)
  2. 问题如何分割成子问题(确定可执行标准)
  3. 效率(时间预期、工具调用次数限制、Scaling rules)

Production challenges

  1. Agent有状态,重构Agent影响很大,最好加上自愈Agent、错误处理系统。
    此外还可以加上check point,一步一步来,失败了从这一步开始重新生成;而不是丢失上下文从头开始。
  2. Agent的错误是“复利”的,前面错了会导致最后错得离谱。

Debugging

监控Agent的决策模式和交互结构,做到生产级追踪,更能系统性诊断和解决问题。

Deploy

使用彩虹部署。旧会话分配到旧机器上,逐渐分配流量到新机器上,渐进替代,减少prompt改动的影响。

Sync and Block

Leader Agent并行地分配任务给Sub tasks,但是实际上是以最后一个执行完的Sub Agent为准进行信息交互。这会造成等待与阻塞。但是如果Sub Agent分别处理每一个Sub Agent,又会出现上下文不共享的问题,局部扰乱整体。

Remote Procedure Call

本地函数放到服务器运行,会出现若干问题:

  1. 我怎么知道是哪个函数?Call Id
    本地函数调用,可以直接用指针找到函数;但是远程过程调用不行。
    因此我们需要分别在Client和Server维护一个“函数 <-> Call Id”的映射来确定所调用的函数。
  2. Client如何将参数传送到Server?序列化与反序列化
    本地函数调用,参数会压入栈;然而在远程过程调用中,Client与Server是不同的进程、处理器、操作系统、大小端,而且链表、对象这样的数据内存不分配在一处,加上网络传输必须要有容错机制,不能通过内存传递参数。
    因此我们需要使用网络传输,Client要将参数转换为字节流,传输到Server后,再反序列化还原为参数。
    这里还会涉及到数据格式的问题,JSON(性能不高)、XML、Protobuf、Thrift都是数据格式。
  3. 不使用内存,如何传输?网络传输
    网络传输层需要将Call Id与字节流传输给Server,因此RPC基于传输层TCP协议,gRPC基于HTTP2协议(同样基于TCP)。

早期的RPC不使用HTTP,是因为当时HTTP不能建立长连接,并且HTTP头部过长且不能压缩。HTTP2解决了上述问题。

一个HTTP请求

http://localhost:8080/add?a=1&b=2
"Content-Type": "application/json"

这个请求指定了方法add、协议http、数据格式JSON

阅读全文 »

数据分页优化

select * from your_table where type = ? limit start, end;

limit的分页方式是查出select的所有数据,然后舍弃start之前的数据。因此对于大数据量,性能很低。

优化方案

偏移ID

-- 深分页慢sql,51sec
select * from emp where ename='svZLER' limit 1000000, 10;

-- 使用id回表优化查询,44sec
select * from emp where id in (select id from emp where ename='eMxdWz') limit 1000000, 10;

-- 子查询使用二级索引深分页,然后回表,37sec
select * from emp inner join (select id from emp where ename='eMxdWz' limit 1000000, 10) b using(id) ;
-- b using(id) 相当于 on b.id = emp.id

分段查询

消息队列3大目标

异步

在生产者-消费者速度不匹配的情况下,使用异步可以减少等待,提高效率。

解耦

多个生产者可以通过消息队列管道集合成1条链路;也可以将1个生产者的消息负载均衡给多个消费者(只发送1条消息给MQ,MQ广播多份)。例如,增加了一个数据分析业务,这时候不需要修改业务代码,只需要配置MQ发送相应消息到大数据系统Server即可。
同时,生产者只需要关心将消息发送给MQ,无需关心后续处理(消费者挂了怎么办);MQ会负责和消费者通信。

削峰(生产者-消费者速度不同步)

由于队列本身是一条管道,拥有一定容量,因此可以削峰填谷,解决一些瞬时高并发流量。

消息队列的关键问题

C 系统一致性

A系统通过MQ将消息发送给B、C完成后续业务,B成功而C失败,这时如何保证一致性?

A 系统可用性

MQ宕机,依赖MQ管道的服务就不可用。MQ应该有高可用性和稳定性,不应该成为系统薄弱环节。
因此需要MQ集群,这时候又需要新的中间层NameSrv来管理维护MQ集群。

系统复杂度

  • 如何保证消费不丢失?
  • 如何避免重复消费?
  • 如何保证消息顺序?

幂等性

多次消费结果相当于只消费一次。

可以用业务id作为消息key,对key校验有没有消费过。
如果重复消费,确保多次消费和1次消费的结果相同。

  • 发送消息重复:发送后,网络断开,没收到ACK,导致重复发送
  • 消费消息重复:Consumer收到消息并处理完成,但是由于网络问题,Consumer应答没有发送到Broker;Broker遵从至少消费一次原则,重新发送。
  • Rebalance消息重复:Consumer Group的Consumer数量发生变化,触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

Message Queue产品

产品 优势 劣势 场景
Kafaka 吞吐量大、性能高、集群高可用 丢数据、功能单一 MapReduce大数据采集、日志分析
RabbitMQ 消息可靠、功能全面 erlang语言不容易定制,吞吐量较低 小规模服务调用
Pulsar Bookeeper,消息可靠性高 使用较少、生态有差距 大规模服务调用
RocketMQ 高吞吐、高性能、高可用。Java语言容易定制。 Java服务加载慢 功能全面,尤其适合金融、电商、互联网场景

消息队列工作方式

RocketMQ和Kafka都使用Topic,每个Topic的内容会分发到多个管道(Partition或MessageQueue)。而Kafka在Topic过多的情况下,吞吐量会严重下降;RocketMQ解决了这个问题。

RocketMQ集群

在RocketMQ集群中,多台NameSrv是平等的,而Broker会组成多个主-从结构。
Slave只负责备份,只有Master(brokerId=0)才会发送消息。
然而主从结构的Slave,由于brokerId不为0,不会自动切换为Master,需要人工介入。

Dledger高可用集群

Dleger是一种Raft算法,实现了Leader选举。
Dledger会从Followers中自动选举Leader,从而保证高可用。

三种发送方式

单向发送

Producer只发送消息、不处理ACK;MQ也不发送ACK。消息可靠性没有保障。

// 返回值为null,不处理ACK。
public void sendOneWay(Message msg) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneWay(msg);
}

同步发送

Producer等待MQ ACK,才继续操作。同步发送可能会发生阻塞。

public SendResult sendResult(
Collection<Message> msgs) throws ...Exception {
return this.defaultMQProducerImpl.send(batch(msgs));
}

异步发送

Producer不等待MQ ACK(异步ACK,也能保证不丢失消息),直接发送消息。
但是异步发送也有代价,我们不能发送完立刻producer.shutdown(),而需要设置一段延迟,使producer能够捕捉Exception并重发消息。

// send方法本身没有返回值,不会阻塞;但是能够处理Exception
public void send(Message msg,
SendCallBack sendCallBack) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
try {
if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
sendByAccumulator(msg, null, sendCallBack);
} else {
sendDirect(msg, null, sendCallBack);
}
} catch (Throwable e) {
sendCallBack.onException(e);
}
}

producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
...
}

@Override
public void onException(Throwable e) {
...
}
});

两种消费方式

Consumer拉取

Consumer维护一个轮询拉取,Broker收到拉取请求后发送消息。

Broker推送

一般只用推模式,因为Consumer需要轮询(即使Broker不一定有消息),会消耗部分资源。

消息类型

顺序消息

局部有序,实际上是序号相同的消息发送到同一个队列管道,然后消费者从一个管道中拿消息,从而保证有序性。

广播消息

正常情况下,多个Consumer是负载均衡模式,一条消息只会发到其中一个Consumer消费;而在广播模式下,所有的Consumer都会收到消息。
在代码层面,正常情况下服务端统一维护消费者位点;而在广播模式下客户端本地.rocket_offsets维护消费者位点

消息重试

顺序消息

顺序消息要拿到ACK才会发送下一条消息,否则会重发消息

无序消息

为了保障无需消息的消费,MQ设置了一个消息重试间隔时间。如果没有回复,间隔10s-30s-1m-2m…来重发消息,最多重试16次(默认)。
如果达到重试上限还未消费,该消息称为死信消息。死信消息会进入死信队列

死信队列

死信队列不归属于Topic、Consumer,而是归属于Group Id。
死信队列的消息不会被再次重复消费,有效期为3天,过期删除。
可以手工在监控平台里处理死信,获取messageId后自己处理。

重复消费

网络闪断(成功执行,MQ没收到ACK)、生产者宕机(成功发送到MQ,生产者没收到ACK)会引发重复消费。

Hash

字母异位词

排序每一个单词,就知道是不是异位词。

两数之和

从数组中,找到nums[i] + nums[j] == target,并返回{ i, j }
思路是双重循环,遍历每一个元素,求和是否为target。
然而,双重循环需要O(N2)O(N^2)的复杂度。因此,可以使用一张表,使用containsKey方法识别是否存在当前i的target - nums[i],即可减少一重循环。

关键思想

用Map高效率查找,减少一重循环。

最长连续序列

从乱序数组中,找到最长连续(数组中不一定连续)的序列。要求O(N)O(N)
首先用数组的值存入哈希表,然后遍历数组,判断map.constains(curNum++)
然而,即使这样还是效率不够高。

优化

  1. 中间值不进入循环,序列开始值才进入,使用!contains(curNum - 1)判断是否为序列开始值
  2. 去重,不要哈希表,不需要键值对,使用哈希Set,只存储值。

关键思想

去重;不处理中间值

阅读全文 »

NJU gdb六步走

  1. 启动gdb,加载可执行文件
  2. 设置断点 break main 入口处设置断点
  3. 启动程序 run (参数)
  4. 查看程序当然状态
    • info register (EIP): 显示所有寄存器(或只有EIP寄存器)的内容
    • 栈:保存过程执行时的数据信息
  5. 继续下一条指令
    • stepsi(机器指令)
  6. 退出 quit

1.0 Cprograming

原网址: https://www.cprogramming.com/gdb.html

gcc main.c -g -Wall -Werror -o main    启动编译

gdb main 开始debug
list 列出代码
break 行 设置断点
info break 断点信息
run 运行程序
next或step 进行单步编译(next跳过函数)
print <value> 打印变量的值
continue 跳到下一个断点位置
quit 退出
阅读全文 »

GMP

协程

协程是用户态的概念。多个协程实际上映射为1个线程。

协程是用户态概念,因此创建、销毁、调度都在用户态完成,不需要切换内核态。
由于协程从属于同一个内核级线程,因此实际上无法并行;而一个协程的阻塞最终也会导致整个线程下的所有协程阻塞。

Goroutine

Go解耦了协程和线程的绑定关系,从而使线程变为一个中间层,协程可以灵活地映射到不同的线程上,相当于“虚拟线程”。

好处如下:

  • 可以利用多个线程,实现并行
  • 通过调度器,实现灵活的映射
  • 栈空间动态扩展(线程大小固定,会产生内存浪费)

GMP

Goroutine Machine Processor
GMP就是协程调度器。
GMP有一个全局队列存储Goroutine;不过实际上Processor都会优先在自己的本地队列调度Goroutine(没有则向全局队列获取),并映射Goroutine到Machine上执行。
如果全局队列没有Goroutine,那么会尝试获取就绪态(正在IO)的协程。
如果仍然失败,那么会从其他Processor中窃取一半的Goroutine,实现负载均衡。

全局队列是互斥的,获取Goroutine要防止获取多次。

type schedt struct {
...
lock mutex
runq gQueue
runqsize int32
}
阅读全文 »

Hello World

func main() {
h := server.Default()

h.GET("/hello", func(c context.Context, ctx *app.RequestContext) {
ctx.Data(consts.StatusOK, consts.MIMETextPlain, []byte("Hello World!"))
})

h.Spin()
}
$ go run main.go

IDL

Thrift

# echo.thrift
namespace go api

struct Request {
1: string message
}

struct Response {
1: string message
}

service Echo {
Response echo(1: Request req)
}

CloudweGo代码生成

go install github.com/cloudwego/thriftgo@latest

mkdir -p demo/demo_thrift
cd demo/demo_thrift
cwgo server --type RPC \
--module demo/demo_thrift \
--service demo_thrift \
--idl ../../echo.thrift

Protobuf

syntax = "proto3"

package pbapi;

option go_package = "/pbapi";

message Request {
string msg = 1;
}

message Response {
string msg = 1;
}

service EchoService {
rpc Echo (Request) returns (Response) {}
}

CloudweGo代码生成

mkdir -p demo/demo_proto
cd demo/demo_proto

cwgo server -I ../../idl
--type RPC \
--module demo/demo_proto \
--service demo_proto \
--idl ../../echo.thrift

MakeFile自动cwgo代码生成

.PHONY: gen-demo-proto
gen-demo-proto:
@cd demo/demo_proto && cwgo server -I ../../idl --type RPC --module demo/demo_proto --service demo_proto --idl ../../echo.thrift

Consul服务注册、发现

服务注册用于为服务集群提供统一接口,自动处理集群loadbalance和宕机

// r, err := consul.NewConsulRegister("localhost:8500")
r, err := consul.NewConsulRegister(conf.Getconf().Registry.RegistryAddress[0])
if err != nil {
log.Fatal(err)
}
opts = append(opts, server.WithRegistry(r))
version: '3'
services:
consul:
ports:
- 8500:8500

Gorm操作数据库

package model

import "gorm.io/gorm"

type User struct {
gorm.Model
Email string `gorm:"uniqueIndex;type:varchar(128) not null"`
Password string `gorm:"type:varchar(64) not null"`

}

新增页面

  • 路由
// main.go
func main() {
...
h.GET("/your-page", func(c context.Context, ctx *app.RequestContext) {
ctx.HTML(consts.StatusOK, "your-page.tmpl", utils.H("Title: Your Title"))
})
}
  • 模板
// your-page.tmpl
{{ define "your-page" }}
<div>
...
</div>
{{ end }}
  • Hertz生成IDL接口代码
syntax = "proto3"

package pbapi;

option go_package = "/pbapi";

message Request {
string msg = 1;
}

message Response {
string msg = 1;
}

service EchoService {
rpc Echo (Request) returns (Response) {}
}

MCP

RAG的局限性

对于AI来说,RAG仅仅是外部知识库,AI只起到一个总结效果。而总结的效果取决于向量相似度匹配,可能遗漏关键信息。

direction: right
结构化数据 -> 文本块
非结构化数据 -> 文本块

文本块 -> 向量数据库 -> 检索文本块 -> 生成最终响应
  • 生成内容不完整:RAG处理的是文本的切片,因此无法看到整篇文档信息。
  • RAG无法判断需要多少切片才能解决问题。
  • 多轮检索能力弱。

MCP基础

Function Calling

Coze的Agent就是基于Function Calling思路封装的。

不过Function Calling成本比较高,需要模型经过专门训练微调才能稳定支持。

这导致有些模型不支持某些插件的调用(例如Trae只有选择Sonnet、GPT等模型才可以处理图片)。

另外,Function Calling不是一项标准,许多模型的实现细节不一样。

Model Context Protocol

MCP是一项标准协议,简单来说就是通用的接口,使AI-外部工具/数据源交互标准化、可复用。

Claude Desktop、Cursor这样的工具在内部实现MCP Client,这个Client通过MCP协议与MCP Server(由服务提供公司自己开发,实现访问数据、浏览器、本地文件等功能,最终通过MCP返回标准格式)交互,最终在MCP Host上展示。

MCP 传输方式

STDIO,本地环境
SSE,并发量不高,单向通信
Streamable HTTP,高并发,需要维护长连接

指标 Function Calling Model Context Portocol
协议 私有协议 开放协议
场景 单次函数调用 多工具协同 + 数据交互
接入方式 函数直接接入 需要MCP Server + MCP Client
耦合度 工具与模型绑定 工具开发与Agent开发解耦
调用方式 API Stdio/SSE
阅读全文 »

文本格式转换

pandoc --from markdown --to docx source.md -o dest.docx
pandoc -f markdown source.md -t docx -o dest.docx
pandoc source.md -o dest.docx --ignore-args # 忽略参数

注意:为了最佳转换效果,markdown文件每行后都要空行

模板

pandoc --reference-doc template.docx source.md -o dest.docx

md2epub

# 首先把所有的md文件列出来
## 递归查找所有 .md 文件(排除 README.md 和 SUMMARY.md)
find . -name "*.md" ! -name "README.md" ! -name "SUMMARY.md" | sort > filelist.txt
## 然后编辑 `filelist.txt`,确保文件顺序正确(例如按 `SUMMARY.md` 的目录结构排序)。

pandoc --standalone --toc \
--metadata title="MIT6.824 分布式系统" \
--metadata author="Robert Morris" \
-o output.epub $(cat filelist.txt)

注意:对于gitbook,pandoc可能不能正确处理路径,推荐使用honkit。

honkit

// book.json
{
"title": "MIT6.824 分布式系统",
"author": "Robert Morris",
"plugins": ["hints"],
"pluginsConfig": {
"hints": {
"info": "fa fa-info-circle",
"warning": "fa fa-exclamation-triangle"
}
}
}
# 安装honkit
npm install honkit --save-dev
# 需要calibre转换
ebook-convert --version

npm init -y
npx honkit epub ./ ./mybook.epub
0%