source venv/bin/activate
pip install langgraph python-dotenv openai

LangGraph Demo

1.0 State

State就是AI流转的全局变量

class InputState(TypedDict):
question: str
llm_answer: Optional[str] # None or str

class OutputState(TypedDict):
answer: str

class OverallState(InputState, OutputState):
pass

2.0 Node

def llm_node(state: InputeState):
msg = [
("system", readMd("system.md")),
("human", state["question"])
]

llm = ChatOpenAI(model="gpt-4o")

resp = llm.invoke(msg)
return {"answer": resp.content}

3.0 Graph Compile

builder = StateGraph(OverallState, input=InputState, output=OutputState)

builder.add_node("llm", llm_node)
builder.add_edge(START, "llm")
builder.add_edge("llm", END)

graph = builder.compile()

Draw Graph

display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

Messages History

class State(TypedDict):
msgs: Annotated[list, operator.add]

MessageGraph

使用Reducer追加消息,但是可以对已有消息做更新、合并、删除操作(Context Engine)

class MessageGraph(StateGraph):
def __init__(self) -> None:
super().__init__(Annotated[list[AnyMessage], add_message])
builder = MessageGraph()
# ...
graph = builder.compile()

msgs2 = [HumanMessage(content="xxx", id=msg1.id)]
# ID相同,覆盖消息
add_messages(msgs1, msgs2)

Structured Output

class UserInfo(BaseModel):
name: str = Field(description="The name of the user")
# ...

# Runnable对象
structured_llm = llm.with_structured_output(UserInfo)

# UserInfo对象
resp = structured_llm.invoke(msg)

消息队列3大目标

异步

在生产者-消费者速度不匹配的情况下,使用异步可以减少等待,提高效率。

解耦

多个生产者可以通过消息队列管道集合成1条链路;也可以将1个生产者的消息负载均衡给多个消费者(只发送1条消息给MQ,MQ广播多份)。例如,增加了一个数据分析业务,这时候不需要修改业务代码,只需要配置MQ发送相应消息到大数据系统Server即可。
同时,生产者只需要关心将消息发送给MQ,无需关心后续处理(消费者挂了怎么办);MQ会负责和消费者通信。

削峰(生产者-消费者速度不同步)

由于队列本身是一条管道,拥有一定容量,因此可以削峰填谷,解决一些瞬时高并发流量。

消息队列的关键问题

C 系统一致性

A系统通过MQ将消息发送给B、C完成后续业务,B成功而C失败,这时如何保证一致性?

A 系统可用性

MQ宕机,依赖MQ管道的服务就不可用。MQ应该有高可用性和稳定性,不应该成为系统薄弱环节。
因此需要MQ集群,这时候又需要新的中间层NameSrv来管理维护MQ集群。

系统复杂度

  • 如何保证消费不丢失?
  • 如何避免重复消费?
  • 如何保证消息顺序?

幂等性

多次消费结果相当于只消费一次。

可以用业务id作为消息key,对key校验有没有消费过。
如果重复消费,确保多次消费和1次消费的结果相同。

  • 发送消息重复:发送后,网络断开,没收到ACK,导致重复发送
  • 消费消息重复:Consumer收到消息并处理完成,但是由于网络问题,Consumer应答没有发送到Broker;Broker遵从至少消费一次原则,重新发送。
  • Rebalance消息重复:Consumer Group的Consumer数量发生变化,触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

Message Queue产品

产品 优势 劣势 场景
Kafaka 吞吐量大、性能高、集群高可用 丢数据、功能单一 MapReduce大数据采集、日志分析
RabbitMQ 消息可靠、功能全面 erlang语言不容易定制,吞吐量较低 小规模服务调用
Pulsar Bookeeper,消息可靠性高 使用较少、生态有差距 大规模服务调用
RocketMQ 高吞吐、高性能、高可用。Java语言容易定制。 Java服务加载慢 功能全面,尤其适合金融、电商、互联网场景

消息队列工作方式

RocketMQ和Kafka都使用Topic,每个Topic的内容会分发到多个管道(Partition或MessageQueue)。而Kafka在Topic过多的情况下,吞吐量会严重下降;RocketMQ解决了这个问题。

RocketMQ集群

在RocketMQ集群中,多台NameSrv是平等的,而Broker会组成多个主-从结构。
Slave只负责备份,只有Master(brokerId=0)才会发送消息。
然而主从结构的Slave,由于brokerId不为0,不会自动切换为Master,需要人工介入。

Dledger高可用集群

Dleger是一种Raft算法,实现了Leader选举。
Dledger会从Followers中自动选举Leader,从而保证高可用。

三种发送方式

单向发送

Producer只发送消息、不处理ACK;MQ也不发送ACK。消息可靠性没有保障。

// 返回值为null,不处理ACK。
public void sendOneWay(Message msg) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneWay(msg);
}

同步发送

Producer等待MQ ACK,才继续操作。同步发送可能会发生阻塞。

public SendResult sendResult(
Collection<Message> msgs) throws ...Exception {
return this.defaultMQProducerImpl.send(batch(msgs));
}

异步发送

Producer不等待MQ ACK(异步ACK,也能保证不丢失消息),直接发送消息。
但是异步发送也有代价,我们不能发送完立刻producer.shutdown(),而需要设置一段延迟,使producer能够捕捉Exception并重发消息。

// send方法本身没有返回值,不会阻塞;但是能够处理Exception
public void send(Message msg,
SendCallBack sendCallBack) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
try {
if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
sendByAccumulator(msg, null, sendCallBack);
} else {
sendDirect(msg, null, sendCallBack);
}
} catch (Throwable e) {
sendCallBack.onException(e);
}
}

producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
...
}

@Override
public void onException(Throwable e) {
...
}
});

两种消费方式

Consumer拉取

Consumer维护一个轮询拉取,Broker收到拉取请求后发送消息。

Broker推送

一般只用推模式,因为Consumer需要轮询(即使Broker不一定有消息),会消耗部分资源。

消息类型

顺序消息

局部有序,实际上是序号相同的消息发送到同一个队列管道,然后消费者从一个管道中拿消息,从而保证有序性。

广播消息

正常情况下,多个Consumer是负载均衡模式,一条消息只会发到其中一个Consumer消费;而在广播模式下,所有的Consumer都会收到消息。
在代码层面,正常情况下服务端统一维护消费者位点;而在广播模式下客户端本地.rocket_offsets维护消费者位点

消息重试

顺序消息

顺序消息要拿到ACK才会发送下一条消息,否则会重发消息

无序消息

为了保障无需消息的消费,MQ设置了一个消息重试间隔时间。如果没有回复,间隔10s-30s-1m-2m…来重发消息,最多重试16次(默认)。
如果达到重试上限还未消费,该消息称为死信消息。死信消息会进入死信队列

死信队列

死信队列不归属于Topic、Consumer,而是归属于Group Id。
死信队列的消息不会被再次重复消费,有效期为3天,过期删除。
可以手工在监控平台里处理死信,获取messageId后自己处理。

重复消费

网络闪断(成功执行,MQ没收到ACK)、生产者宕机(成功发送到MQ,生产者没收到ACK)会引发重复消费。

什么是消息事务

消息事务基于消息队列的两阶段提交,将本地事务和发放消息放在了一个分布式事务里。保证原子性。
用法:将一个分布式事务拆分成一个消息事务(A系统本地操作+发消息)+ B系统本地操作。
B系统操作由消息驱动。只要消息事务成功,那么A操作一定成功;这时B系统收到消息执行本地操作,如果本地操作失败,消息会重新投放,直到B操作成功。

上面的方法满足BASE:B基本A可用;S软状态;E最终一致性
BASE是对于CAP中的AP系统的拓展。牺牲强一致性来保证Available和Performance。
满足BASE的事务称为“柔性事务”

什么是Exactly Once

At Least Once

Producer接受Broker ACK来确保信息成功写入Topic。
如果Producer接收ACK超时、或Broker出错时,会重复发送消息。

但是如果Broker已经写入Topic,但是没有来得及发送ACK或ACK超时,Producer重新发送的消息会第二次写入Topic,导致最终Consumer收到重复消息。

At Most Once

Producer接收ACK超时,或Broker出错时没有重复发消息,会导致消息丢失,没有写入Topic,也没有被Consumer消费。
有些时候我们为了避免重复消费,允许这种情况发生。

Exactly Once

Exactly Once是说,即使重复发送了消息,Consumer只消费一次。需要消息队列Serv、Producer、Consumer协同才能实现。

RocketMQ事务消息

  1. MQ开启一个事务Topic
  2. 事务中第一个执行的服务发送1.5条消息(0.5是因为,这条消息在事务提交前,对Consumer不可见)
  3. 1.5发送成功后,发送0.5消息的服务开始本地事务;并决定事务提交/回滚。
    RocketMQ保证最终一致性

如何做到写入消息但是对用户不可见呢?

0.5消息,备份原消息Topic和MQ,然后改变Topic为HALF_TOPIC,由于Consumer没有订阅这个Topic,所以无法消费。
然后RocketMQ开始定时任务,从HALF_TOPIC中拉取消息消费,并决定提交事务还是回滚。

Kafka幂等

Kafka不确定是否成功发送,就一直重试,Broker保证只消费一次。

幂等Producer

Kafka为了保证幂等性,引入ProducerID和SequenceNumber。
new_seq = old_seq+1: 正常消息; new_seq <= old_seq : 重复消息; new_seq > old_seq+1: 消息丢失;

跳表

ZSet的实现方式有跳表、压缩列表。

  • 压缩列表:比较方便地搜索头节点和尾节点。数量<128,所有元素长度<64B时使用。
  • 跳表:就是链表二分搜索的数据结构。多级链表,最高级链接的节点最稀疏。可以从高到低寻找,加快效率。

同样,跳表对范围查询支持较好,二分找到开头,然后遍历即可。

Redis为什么不用b+树?MySQL为什么不用跳表?

这个问题在于 Redis是直接操作内存的并不需要磁盘io而MySQL需要去读取io,所以mysql要使用b+树的方式减少磁盘io,B+树的原理是 叶子节点存储数据,非叶子节点存储索引,每次读取磁盘页时就会读取一整个节点,每个叶子节点还有指向前后节点的指针,为的是最大限度的降低磁盘的IO;因为数据在内存中读取耗费的时间是从磁盘的IO读取的百万分之一 而Redis是 内存中读取数据,不涉及IO,因此使用了跳表,跳表明显是更快更简单的方式。

单线程网络IO、KV读写

Redis的网络IO和KeyValue读写是由一个线程来完成的。
而Redis的持久化、异步删除、集群数据同步是额外的线程执行。

也由于Redis是单线程的,所以要特别小心耗时的操作,这些操作会阻塞后续指令。

简单来说就是处理事务一套、前台接待一套。不会因为前面办事导致人均等待时间太久。

Redis使用IO多路复用(epoll),将连接信息、事件放到队列中,使其能够处理并发的客户端连接。

socket: {
s0
s1
s2
s3
"..."
}

IO多路复用: {
s3 -> s2 -> s1 -> s0
}

事件处理器: {
连接处理器
命令请求处理器
命令回复处理器
}

socket -> IO多路复用 -> 文件事件分派器 -> 事件处理器

详解GET key

Redis相当于HashMap,也由于Hash是无序的,因此scan这样的流式查询,在查改场景中,可能会漏扫中途插入到前面下标的元素。

Redis持久化

RDB Snapshot

默认情况下,Redis将内存数据快照保存为dump.rdb,可以使用

save <time_duration> <row_insertion>

指示Redis多少秒内插入多少条数据后持久化到数据库
也可以直接用savebgsave命令写入数据库

bgsave 异步持久化

bgsave使用写时复制COW。bgsave从主线程fork出来,当主线程修改数据时,bgsave线程会将写入数据拷贝一份,然后写入rdb

Append-Only File

快照不能做到完全持久,假如服务宕机,可能会丢失几条写入。
这时候我们直接做个命令日志AOF,将执行的修改指令写入appendonly.aof

appendonly yes
appendfilename "appendonly.aof"

aof有三种模式appendfsync

  • always:立刻写入磁盘
  • everysec:每秒写一次
  • no:交给OS调度
    但是,由于aof是记录命令,需要执行时间,对于持久化大量数据比较耗时间。
    对于连续操作(如自增)aof会优化为1条命令,可以用bgrewriteaof命令手动重写
# 最小重构大小
auto-aof-rewrite-min-size 64mb
# 增长了100%,即128mb就重构
auto-aof-rewrite-percentage 100

Redis4 混合持久化

由于Redis重启时优先使用aof恢复数据,rdb利用率不高。因此出现了混合持久化

# 必须同时开启aof
aof-use-rdb-preamle yes
# 可以直接把快照关掉,因为混合持久化都写在aof里面

开启后,当aof重写时,会直接写入rdb,将rdb快照和aof增量存储在一起。
于是Redis重启可以先读rdb,再执行增量aof恢复数据,提高效率。

Redis主从

# redis-<your_port>.conf
pidfile /var/run/redis_<your_port>.pid
logfile "<your_port>.log"
# 数据存放目录
dir /usr/local/redis/data/<your_port>

### 主从复制
replicaof <main_redis_ip> <port>
# 从节点,只读
replica-read-only yes


### 启动
# 启动从节点
redis-server redis-<your_port>.conf
# 连接到从节点
redis-cli -p <minor_redis_port>

主从原理

master: {
rdb data
repl buffer
}
slave

slave -> master: 1. psync全量复制同步数据(通过socket长连接)
master.rdb data -> master.rdb data: 2.1 收到psync命令,执行bgsave生成最新rdb快照
master.repl buffer -> master.repl buffer: 2.2 主节点将增量写语句更新到buffer
master.rdb data -> slave: 3. 发送rdb数据
slave -> slave: 4. 清空旧数据,加载主节点rdb
master.repl buffer -> slave: 5. 发送缓冲区写命令
slave -> slave: 6. 执行主节点buffer写命令
master -> slave: 7. 主节点通过socket长连接,持续发送写命令给从节点,保持数据一致

断点续传

master: {
repl backlog buffer
}
slave

slave -> master: 1. 连接断开
master.repl backlog buffer -> master.repl backlog buffer: 2. 主节点增量写命令写入buffer
slave -> master: 3. 恢复socket长连接
slave -> master: 4. psync(offset)带偏移量
master -> slave: 5. 若offset在buffer中,断点以后的数据发送给从节点;否则,全量发送
master -> slave: 6. 持续发送buffer写命令,保持数据一致

如果存在很多从节点,那么主节点传输压力会比较大。可以采用树型架构,让从节点再给它的子节点传输数据。

哨兵高可用

sentinel_cluster: {
sentinel1 <-> sentinel2 <-> sentinel3 <-> sentinel1
}

client -> master <-> sentinel_cluster
master -> slave1
master -> slave2
client -> sentinel_cluster
sentinel_cluster <-> slave1
sentinel_cluster <-> slave2

哨兵会动态监听redis主节点,如果主节点挂了,哨兵会选择一个新redis示例作为主节点(通知给client端)

开启哨兵

# sentinel.conf

port 26379
pidfile <your_file>
logfile <your_file>
dir "<your_dir>"

# quorm是指多少个sentinel同时认为主节点挂了,才让master失效,一般设置为一半以上
sentinel monitor mymaster <redis_ip> <redis_port> <quorm>

启动哨兵./redis-sentinel sentinel.conf

Redis Cluster

当哨兵集群选举新节点的时候,服务会宕机几秒钟。因此我们需要Cluster

client1 -> RedisCluster
client2 -> RedisCluster
RedisCluster: Hash slot: CRC16(key) % 16384
RedisCluster -> Redis集群
Redis集群: {
master1 -> slave1-1
master1 -> slave1-2

master2 -> slave2-1
master2 -> slave2-2

master3 -> slave3-1
master3 -> slave3-2
}

在Cluster中,每个master数据是不重叠的,数据会被分片储存。通过Hash算法来决定存储数据到哪一个master节点。
使用Cluster,可以避免Redis服务完全宕机。
2的幂次取模小技巧:

Xmod2n=X & (2n1)X \mod 2^n = X \text{ \& } (2^n - 1)

Redis集群搭建

redis-cluster/
|-- 8000
| `-- redis.conf
|-- 8010
`-- 8020
  1. Redis配置
# ...其他配置

daemonize yes
port 8000
dir /path/to/redis-cluster/8000/
# 启用集群
cluster-enabled yes
cluster-config-file nodes-8000.conf
cluster-node-timeout 5000
# 密码
requirepass <your_password>
masterauth <your_auth_password>
  1. 启动所有master和slave节点
redis-server /path/to/redis-cluster/80*/redis.conf
ps aux | grep redis
  1. 开启集群
# replicas表示节点的副本,配置为1,则1主1从
redis-cli -a <your_auth_password> --cluster create --cluster-replicas 1 \
localhost:8000 localhost:8001 localhost:8002 ...

注意,第二次启动集群后,就不需要这一步了。节点会自动读取nodes-8000.conf文件,恢复上次集群状态。

  1. 进入redis节点验证配置
cluster info
cluster nodes

Redission原理

Thread1: {
Redission
}
Thread2: {
Redission
}

Thread1.Redission -> Try Lock
Try Lock -> 守护线程: 加锁成功
守护线程 -> Redis(Master): lock,每隔10s检查线程是否仍持有锁。如果持有,则延长锁失效时间

Thread2.Redission -> Try Lock
Try Lock -> Thread2.Redission: 加锁失败,使用while自旋尝试加锁

Redission利用了Redis Lua脚本保证原子操作。

Don’t Build Multi-Agents

Cognition
构建长期运行的AI智能体系统,需要解决“可靠性”问题:

  1. 上下文丢失、过长
  2. 状态混乱
  3. 错误累积

例如,Multi-Agent思路需要构建规划Agent、解释Agent、执行Agent、SOP Agent。
然而,如果仅仅使用两个独立Agent,其生成结果会更加独立、隔绝,而不是相关联。
整体大于局部。局部的完整性不能保证整体的一致性。

Shared Data

Principle 1: Share context, and share full agent traces, not just individual messages.

我们希望通过共享上下文解决一致性问题,但是不行。
Agent1和Agent2都是基于自己对Shared Data的理解工作,而不知道对方在做什么。
因此我们需要共享Traces,让一个Agent(例如解释Agent)对另一个Agent(例如执行Agent)进行Revision校正。
可是,只有垂直矫正,水平的Agent(多个执行Agents)之间仍然不知道对方在做什么。

Actions mean desisons

Principle 2: Actions carry implicit decisions, and conflicting decisions carry bad results.

每个Agent的行为都必须基于同样的预期结果,而不能基于不清楚、有歧义的预期结果;否则整体很难保持风格统一。

Single-threaded Linear Agent

鉴于上面两条,作者选择使用单Agent线性解决问题。
然而,这样做容易产生context windows overflow上下文溢出(因为线性Agent其实就是不断附带上一次的上下文进行下一次chat)。
我们引入总结压缩LLM解决上下文问题。

Claude Code设计模式

Calude Code的智能体有两个特点:

  1. 主Agent与子Agent不会并行运行
  2. 子Agent只回答简单问题,而不会编写代码
    这样做有几个优点
  • 避免上下文冲突:子Agent不包括主Agent的上下文,只回答清晰、具体的问题。
  • 节省上下文:子Agent的操作也不保存在主Agent的上下文中。他们是解耦合的。

How we built our multi-agent research system

How we built our multi-agent research system
三种AI模式:

  1. Chat AI
  2. Single Agent
  3. Multiple Agents
    Multi-Agents的优势在于回答开放、不确定的问题。传统的单Agent不适合研究,而多Agent并行搜索,最终总结出来的信息压缩性更强。

The essence of search is compression.

Anthropic团队区分了两种模式:

  1. 垂直模式:容易并行处理的任务,Leader Agent与多个Sub Agent交互
  2. 水平模式:不容易并行的任务、需要上下文共享的任务、Agent依赖强的任务,如编程,Leader Agent一步一步执行Sub stage

Agent模式的token使用量是Chat模式的4倍;而Multi-Agent则是Chat模式的15倍。
Multi-Agent让token用量增加,因此更可能解决问题。同时也带来的高成本。

Prompt Engineering

  1. Think like your agents.
  2. Teach the orchestartor how to delegate.
    例如,子问题如何划分?怎么确定它就是任务的最小可执行单元?
    可以使用 明确预期结果-example输出格式-可用资源tools-任务边界不要做什么 这一套指令。
  3. Scale effort to query complexity.
    为prompt嵌入scaling rules,明确指出简单-中等-复杂任务分别分配多少subagents。这一条主要是做减法,对简单任务指定少agent,节省成本。
  4. Tool design and selection are critical.
    Tool Description要够好,否则Agent可能不会调用需要的MCP工具。
  5. Let agents improve themselves.
    使用tool-testing agent,让agent改进失败的prompt和流程、重写工具描述等。
  6. Start wide, then narrow down.
    这一条是因为Agent自己的搜索词写的比较AI,太长了,返回的结果很少。需要提示AI使用宽泛的提示词,然后再窄化范围精确搜索。
  7. Guide the thinking process.
    这一步是打印日志,让AI把思考过程打成标记、大纲、ToDoList,这样方便修改。
  8. Parallel tool calling transforms speed and performance.
    主Agent平行分派任务给子Agent;子Agent并行调用Tools。

Eval Agents

Multi-Agents的过程可能每一次都不同,因此不能使用传统的评估方法。

  1. 小样本评估。不要等到测试用例足够多才开始测试,边测试边修改效果更好。
  2. LLM评估。给出判断标准(事实/引用准确性、完整性、来源质量、多余/无效工具调用…),让LLM量化评估(0.0~1.0打分)
  3. 人工检查遗漏。如AI是不是只使用SEO靠前的,而不是权威的网站。

需要注意,Multi-Agents会产生涌现(Emergent Behaviors),对Leader Agent的改动会影响Sub Agent。

Multi-Agent框架最好考虑下面几个方面:

  1. 工作分工(规划、解释、执行、自愈、总结)
  2. 问题如何分割成子问题(确定可执行标准)
  3. 效率(时间预期、工具调用次数限制、Scaling rules)

Production challenges

  1. Agent有状态,重构Agent影响很大,最好加上自愈Agent、错误处理系统。
    此外还可以加上check point,一步一步来,失败了从这一步开始重新生成;而不是丢失上下文从头开始。
  2. Agent的错误是“复利”的,前面错了会导致最后错得离谱。

Debugging

监控Agent的决策模式和交互结构,做到生产级追踪,更能系统性诊断和解决问题。

Deploy

使用彩虹部署。旧会话分配到旧机器上,逐渐分配流量到新机器上,渐进替代,减少prompt改动的影响。

Sync and Block

Leader Agent并行地分配任务给Sub tasks,但是实际上是以最后一个执行完的Sub Agent为准进行信息交互。这会造成等待与阻塞。但是如果Sub Agent分别处理每一个Sub Agent,又会出现上下文不共享的问题,局部扰乱整体。

Remote Procedure Call

本地函数放到服务器运行,会出现若干问题:

  1. 我怎么知道是哪个函数?Call Id
    本地函数调用,可以直接用指针找到函数;但是远程过程调用不行。
    因此我们需要分别在Client和Server维护一个“函数 <-> Call Id”的映射来确定所调用的函数。
  2. Client如何将参数传送到Server?序列化与反序列化
    本地函数调用,参数会压入栈;然而在远程过程调用中,Client与Server是不同的进程、处理器、操作系统、大小端,而且链表、对象这样的数据内存不分配在一处,加上网络传输必须要有容错机制,不能通过内存传递参数。
    因此我们需要使用网络传输,Client要将参数转换为字节流,传输到Server后,再反序列化还原为参数。
    这里还会涉及到数据格式的问题,JSON(性能不高)、XML、Protobuf、Thrift都是数据格式。
  3. 不使用内存,如何传输?网络传输
    网络传输层需要将Call Id与字节流传输给Server,因此RPC基于传输层TCP协议,gRPC基于HTTP2协议(同样基于TCP)。

早期的RPC不使用HTTP,是因为当时HTTP不能建立长连接,并且HTTP头部过长且不能压缩。HTTP2解决了上述问题。

一个HTTP请求

http://localhost:8080/add?a=1&b=2
"Content-Type": "application/json"

这个请求指定了方法add、协议http、数据格式JSON

阅读全文 »

数据分页优化

select * from your_table where type = ? limit start, end;

limit的分页方式是查出select的所有数据,然后舍弃start之前的数据。因此对于大数据量,性能很低。

优化方案

偏移ID

-- 深分页慢sql,51sec
select * from emp where ename='svZLER' limit 1000000, 10;

-- 使用id回表优化查询,44sec
select * from emp where id in (select id from emp where ename='eMxdWz') limit 1000000, 10;

-- 子查询使用二级索引深分页,然后回表,37sec
select * from emp inner join (select id from emp where ename='eMxdWz' limit 1000000, 10) b using(id) ;
-- b using(id) 相当于 on b.id = emp.id

分段查询

Hash

字母异位词

排序每一个单词,就知道是不是异位词。

两数之和

从数组中,找到nums[i] + nums[j] == target,并返回{ i, j }
思路是双重循环,遍历每一个元素,求和是否为target。
然而,双重循环需要O(N2)O(N^2)的复杂度。因此,可以使用一张表,使用containsKey方法识别是否存在当前i的target - nums[i],即可减少一重循环。

关键思想

用Map高效率查找,减少一重循环。

最长连续序列

从乱序数组中,找到最长连续(数组中不一定连续)的序列。要求O(N)O(N)
首先用数组的值存入哈希表,然后遍历数组,判断map.constains(curNum++)
然而,即使这样还是效率不够高。

优化

  1. 中间值不进入循环,序列开始值才进入,使用!contains(curNum - 1)判断是否为序列开始值
  2. 去重,不要哈希表,不需要键值对,使用哈希Set,只存储值。

关键思想

去重;不处理中间值

阅读全文 »

NJU gdb六步走

  1. 启动gdb,加载可执行文件
  2. 设置断点 break main 入口处设置断点
  3. 启动程序 run (参数)
  4. 查看程序当然状态
    • info register (EIP): 显示所有寄存器(或只有EIP寄存器)的内容
    • 栈:保存过程执行时的数据信息
  5. 继续下一条指令
    • stepsi(机器指令)
  6. 退出 quit

1.0 Cprograming

原网址: https://www.cprogramming.com/gdb.html

gcc main.c -g -Wall -Werror -o main    启动编译

gdb main 开始debug
list 列出代码
break 行 设置断点
info break 断点信息
run 运行程序
next或step 进行单步编译(next跳过函数)
print <value> 打印变量的值
continue 跳到下一个断点位置
quit 退出
阅读全文 »

GMP

协程

协程是用户态的概念。多个协程实际上映射为1个线程。

协程是用户态概念,因此创建、销毁、调度都在用户态完成,不需要切换内核态。
由于协程从属于同一个内核级线程,因此实际上无法并行;而一个协程的阻塞最终也会导致整个线程下的所有协程阻塞。

Goroutine

Go解耦了协程和线程的绑定关系,从而使线程变为一个中间层,协程可以灵活地映射到不同的线程上,相当于“虚拟线程”。

好处如下:

  • 可以利用多个线程,实现并行
  • 通过调度器,实现灵活的映射
  • 栈空间动态扩展(线程大小固定,会产生内存浪费)

GMP

Goroutine Machine Processor
GMP就是协程调度器。
GMP有一个全局队列存储Goroutine;不过实际上Processor都会优先在自己的本地队列调度Goroutine(没有则向全局队列获取),并映射Goroutine到Machine上执行。
如果全局队列没有Goroutine,那么会尝试获取就绪态(正在IO)的协程。
如果仍然失败,那么会从其他Processor中窃取一半的Goroutine,实现负载均衡。

全局队列是互斥的,获取Goroutine要防止获取多次。

type schedt struct {
...
lock mutex
runq gQueue
runqsize int32
}
阅读全文 »

Hello World

func main() {
h := server.Default()

h.GET("/hello", func(c context.Context, ctx *app.RequestContext) {
ctx.Data(consts.StatusOK, consts.MIMETextPlain, []byte("Hello World!"))
})

h.Spin()
}
$ go run main.go

IDL

Thrift

# echo.thrift
namespace go api

struct Request {
1: string message
}

struct Response {
1: string message
}

service Echo {
Response echo(1: Request req)
}

CloudweGo代码生成

go install github.com/cloudwego/thriftgo@latest

mkdir -p demo/demo_thrift
cd demo/demo_thrift
cwgo server --type RPC \
--module demo/demo_thrift \
--service demo_thrift \
--idl ../../echo.thrift

Protobuf

syntax = "proto3"

package pbapi;

option go_package = "/pbapi";

message Request {
string msg = 1;
}

message Response {
string msg = 1;
}

service EchoService {
rpc Echo (Request) returns (Response) {}
}

CloudweGo代码生成

mkdir -p demo/demo_proto
cd demo/demo_proto

cwgo server -I ../../idl
--type RPC \
--module demo/demo_proto \
--service demo_proto \
--idl ../../echo.thrift

MakeFile自动cwgo代码生成

.PHONY: gen-demo-proto
gen-demo-proto:
@cd demo/demo_proto && cwgo server -I ../../idl --type RPC --module demo/demo_proto --service demo_proto --idl ../../echo.thrift

Consul服务注册、发现

服务注册用于为服务集群提供统一接口,自动处理集群loadbalance和宕机

// r, err := consul.NewConsulRegister("localhost:8500")
r, err := consul.NewConsulRegister(conf.Getconf().Registry.RegistryAddress[0])
if err != nil {
log.Fatal(err)
}
opts = append(opts, server.WithRegistry(r))
version: '3'
services:
consul:
ports:
- 8500:8500

Gorm操作数据库

package model

import "gorm.io/gorm"

type User struct {
gorm.Model
Email string `gorm:"uniqueIndex;type:varchar(128) not null"`
Password string `gorm:"type:varchar(64) not null"`

}

新增页面

  • 路由
// main.go
func main() {
...
h.GET("/your-page", func(c context.Context, ctx *app.RequestContext) {
ctx.HTML(consts.StatusOK, "your-page.tmpl", utils.H("Title: Your Title"))
})
}
  • 模板
// your-page.tmpl
{{ define "your-page" }}
<div>
...
</div>
{{ end }}
  • Hertz生成IDL接口代码
syntax = "proto3"

package pbapi;

option go_package = "/pbapi";

message Request {
string msg = 1;
}

message Response {
string msg = 1;
}

service EchoService {
rpc Echo (Request) returns (Response) {}
}
0%