LangGraph

source venv/bin/activate
pip install langgraph langchain-openai python-dotenv openai

LangGraph Demo

1.0 State

State就是AI流转的全局变量

class InputState(TypedDict):
    """Graph input schema: the keys a caller must provide."""

    question: str
    llm_answer: Optional[str]  # None or str

class OutputState(TypedDict):
    """Graph output schema: the keys returned to the caller."""

    answer: str

class OverallState(InputState, OutputState):
    """Internal working state: the union of the input and output keys."""

    pass

2.0 Node

def llm_node(state: InputState):
    """LLM node: answer the user's question with a file-based system prompt.

    Reads ``question`` from the input state and returns a partial state
    update containing ``answer``, which LangGraph merges into the state.
    """
    # Fixed: annotation was misspelled "InputeState".
    # (role, content) message tuples; system prompt loaded from system.md.
    msg = [
        ("system", readMd("system.md")),
        ("human", state["question"]),
    ]

    # NOTE(review): the client is rebuilt on every node invocation — consider
    # hoisting it to module scope if this node runs frequently.
    llm = ChatOpenAI(model="gpt-4o")

    resp = llm.invoke(msg)
    # Return only the keys to update, not the whole state.
    return {"answer": resp.content}

3.0 Graph Compile

# Build the graph: OverallState is the internal state, while the input/output
# schemas restrict what callers may send in and what they get back.
builder = StateGraph(OverallState, input=InputState, output=OutputState)

builder.add_node("llm", llm_node)
builder.add_edge(START, "llm")  # entry point
builder.add_edge("llm", END)    # single-node linear graph

graph = builder.compile()

Draw Graph

# Render the compiled graph as a Mermaid PNG (notebook environments only).
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

Messages History

class State(TypedDict):
    """Message-history state: operator.add concatenates lists, so each node's
    returned msgs are appended rather than replacing the existing history."""

    msgs: Annotated[list, operator.add]

MessageGraph

使用Reducer追加消息,但是可以对已有消息做更新、合并、删除操作(Context Engine)

class MessageGraph(StateGraph):
    """StateGraph specialized to a single message-list state.

    The entire state is ``list[AnyMessage]`` reduced with ``add_messages``,
    which appends new messages and overwrites existing ones with the same id.
    """

    def __init__(self) -> None:
        # Fixed: the LangGraph reducer is named add_messages (plural),
        # consistent with its direct use later in these notes.
        super().__init__(Annotated[list[AnyMessage], add_messages])


builder = MessageGraph()
# ...
graph = builder.compile()

msgs2 = [HumanMessage(content="xxx", id=msg1.id)]
# Same id: add_messages overwrites the existing message instead of appending.
add_messages(msgs1, msgs2)

Structured Output

class UserInfo(BaseModel):
    """Pydantic schema describing the structured output the LLM must emit."""

    name: str = Field(description="The name of the user")
    # ...


# A Runnable that forces the model to emit output matching UserInfo.
structured_llm = llm.with_structured_output(UserInfo)

# resp is a parsed UserInfo instance, not a plain message.
resp = structured_llm.invoke(msg)

Tool Calling Agent

# Tool注解会拿到函数名、函数入参与函数注释
# The @tool decorator captures the function name, its parameters, and its
# docstring as the tool schema/description presented to the LLM.
@tool
def your_tool(args):
    """Description"""


tools = [your_tool]
tool_node = ToolNode(tools)

ReAct Agent

# Uses the default agent state schema.
# Note: create_react_agent returns an already-compiled graph.
graph = create_react_agent(llm, tools=tools)

ReAct Graph

ReAct = 有条件调用 + 调用必返回

workflow = StateGraph(State)

workflow.add_node("agent", call_model)
# NOTE(review): the ToolNode snippet earlier names this value tool_node —
# confirm tool_model is actually defined, or this should be tool_node.
workflow.add_node("tools", tool_model)

workflow.add_edge(START, "agent")

# After "agent", route via should_continue: either run tools or finish.
workflow.add_conditional_edges(
    "agent",
    should_continue,
    ["tools", END],
)

# Loop back: every tool execution returns control to the agent.
workflow.add_edge("tools", "agent")

app = workflow.compile()

should_continue

def should_continue(state: State):
    """Routing predicate for the conditional edge after the agent node.

    Returns "tools" when the last model message requested tool calls,
    otherwise END to terminate the graph.
    """
    # Fixed: key was misspelled "message", and the original mixed three
    # different names (message / last_mesaage / last_message).
    messages = state["messages"]
    last_message = messages[-1]
    if not last_message.tool_calls:
        return END
    else:
        return "tools"

call_model

async def call_model(state: State, config: RunnableConfig):
    """Async agent node: run the model over the accumulated messages.

    Returns a partial state update; the "messages" reducer appends the
    model's response to the history.
    """
    msgs = state["messages"]
    # Fixed: use ainvoke — invoke is the synchronous entry point and its
    # return value is not awaitable.
    resp = await model.ainvoke(msgs, config)
    return {"messages": resp}

Stream Output

def print_stream(stream):
    """Echo each chunk yielded by a graph stream to stdout, one per line."""
    for chunk in stream:
        print(chunk)