一个探索LLMs Realtime Serarch写的chatAgents

Yuan.Sn

前端时间,有一个很火的 舆情分析Agent 。闲来无事,研究了 信息检索的源码 发现有个叫 Tavil 的 framework很有意思,萌生了给之前半吊子 chatBot 接入 Realtime Search的念头。

何为Tavily?

Tavil 是一个 ReadyToAgent 的 Serch API, 更够提供 Real-time 的 web access。当然,说成废话就是LLMs的搜索爬虫。

如何Tavily?

Tavily 有3个重要的工具 SearchExtractCrawl

  • Search 能够根据 Prompt 检索出的相关网页
  • Extract 能够检索并清洗具体的一个URL页面
  • Crawl 把 site 内的所有相关链接也同时检索并处理

这几个工具通过组合可以形成一个完整的 RAG Pipeline

app-blue-lg.4226db43
app-blue-lg.4226db43

Let's Build Agents!

万事具备, 那就把半吊子 chatBot 接入Internet吧!回顾一下chatBot,是用 Stremlit 构建前端,通过前端直接 格式调用 LLM 进行的chat。这样的模式耦合度太高了,因此为了接入Tavily,需要优化一下项目架构。

ChatAgent ‘s 架构

为了增加对多种 LLMs调用支持 及其 上下文/对话的持久化,引入了 LangGraph(https://www.langchain.com/langgraph ) 框架。同时,为了解耦前后端 添加了 FastApi进行通讯。

Untitled-2025-12-21-00381
Untitled-2025-12-21-00381

webAgent (agent.py) — ReAct 循环构建

整个 ChatAgent 调用的核心实现 就是这个 webAgent 类, 构建出了一个优雅的 ReAct (Reasoning and Acting) 循环

当获取到 Promp 后,调用这个 ReAct 循环,build_graph 可用让LLM根据 Promp 的类型 精准地进行 实时搜索 (Reasoning)。这个搜索的行动(Acting), 就用到了 Tavily的三个工具 Search、Extract 、Crawl。与此同时,还构建了一个 Summarizing 工具,在返回结果给 LLM 之前,通过 summary_llm 对网页内容进行清洗和总结。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import logging
from typing import Callable, Any
from langchain_core.language_models import BaseChatModel
from langchain_tavily import TavilyCrawl, TavilyExtract, TavilySearch
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
import json
import ast

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_output_summarizer(summary_llm: BaseChatModel) -> Callable[[str, str], dict]:
"""
创建输出摘要器

Args:
summary_llm: 用于生成摘要的语言模型

Returns:
摘要函数
"""
def summarize_output(tool_output: str, user_message: str = "") -> dict:
"""
对工具输出进行摘要处理

Args:
tool_output: 工具原始输出
user_message: 用户消息(用于上下文)

Returns:
包含摘要、URL 和 favicon 的字典
"""
if not tool_output or tool_output.strip() == "":
return {"summary": tool_output, "urls": []}

# 尝试解析 JSON 格式的工具输出
try:
parsed_output = json.loads(tool_output)
except (json.JSONDecodeError, TypeError):
try:
parsed_output = ast.literal_eval(tool_output)
except (ValueError, SyntaxError):
return {"summary": tool_output, "urls": []}

# 提取 URL、favicon 和内容
urls = []
favicons = []
content = ""

if isinstance(parsed_output, dict) and 'results' in parsed_output:
items = parsed_output['results']
elif isinstance(parsed_output, list):
items = parsed_output
else:
return {"summary": tool_output, "urls": [], "favicons": []}

# 从结果中提取信息
for item in items:
if isinstance(item, dict):
if 'url' in item:
urls.append(item['url'])
if 'favicon' in item:
favicons.append(item['favicon'])
if 'raw_content' in item:
content += item['raw_content'] + "\n\n"

# 生成摘要
if content:
summary_prompt = f"""请将以下内容总结为相关格式,以帮助回答用户的问题。
重点关注对回答以下问题最有用的关键信息:{user_message}
删除冗余信息并突出最重要的发现。

内容:
{content[:3000]} # 限制内容长度

请提供一个清晰、有组织的摘要,捕捉与用户问题相关的基本信息:
"""
try:
summary = summary_llm.invoke(summary_prompt).content
except Exception as e:
logger.error(f"摘要生成失败: {e}")
summary = content[:500] # 回退到截断内容
else:
summary = tool_output

return {"summary": summary, "urls": urls, "favicons": favicons}

return summarize_output


class WebAgent:
"""
Web智能体 类,集成 Tavily 搜索、提取和爬取功能
"""

def __init__(self, checkpointer: MemorySaver = None):
"""
初始化 Web 智能体

Args:
checkpointer: LangGraph 检查点存储器(用于对话记忆)
"""
self.checkpointer = checkpointer

def build_graph(
self,
api_key: str,
llm: BaseChatModel,
prompt: str,
summary_llm: BaseChatModel,
user_message: str = "",
mode: str = "fast",
topic: str = "general",
time_range: str = None
):
"""
构建并编译 LangGraph 工作流

Args:
api_key: Tavily API 密钥
llm: 主要语言模型(用于智能体推理)
prompt: 系统提示词
summary_llm: 用于摘要的语言模型
user_message: 用户原始消息(用于摘要上下文)
mode: 搜索模式,"fast"(快速模式)或 "deep"(深度思考模式),默认为 "fast"
topic: 搜索主题,"general"(通用)、"news"(新闻)或 "finance"(财经),默认为 "general"
time_range: 时间范围过滤,可选 "day"、"week"、"month"、"year",默认不限制

Returns:
编译后的 LangGraph 智能体
"""
if not api_key:
raise ValueError("错误:未提供 Tavily API 密钥")

# 根据模式设置搜索参数
# fast: 快速模式,使用 basic 深度,3 条结果,速度快、成本低
# deep: 深度思考模式,使用 advanced 深度,5 条结果,包含图片,结果更全面但成本更高
depth = "basic" if mode == "fast" else "advanced"
max_results = 3 if mode == "fast" else 5
include_images = False if mode == "fast" else True
crawl_limit = 5 if mode == "fast" else 15

# 构建 TavilySearch 参数
search_params = {
"max_results": max_results,
"tavily_api_key": api_key,
"include_favicon": True,
"search_depth": depth,
"include_answer": False,
"topic": topic,
"include_images": include_images,
}

# 添加时间范围参数(如果指定)
if time_range:
search_params["time_range"] = time_range

# 创建 Tavily 工具
search = TavilySearch(**search_params)

extract = TavilyExtract(
extract_depth=depth,
tavily_api_key=api_key,
include_favicon=True,
include_images=include_images,
)

crawl = TavilyCrawl(
tavily_api_key=api_key,
include_favicon=True,
limit=crawl_limit
)

# 创建输出摘要器
output_summarizer = create_output_summarizer(summary_llm)

# 为 Extract 工具添加摘要功能
class SummarizingTavilyExtract(TavilyExtract):
def _run(self, *args, **kwargs):
kwargs.pop('run_manager', None)
result = super()._run(*args, **kwargs)
return output_summarizer(str(result), user_message)

async def _arun(self, *args, **kwargs):
kwargs.pop('run_manager', None)
result = await super()._arun(*args, **kwargs)
return output_summarizer(str(result), user_message)

# 为 Crawl 工具添加摘要功能
class SummarizingTavilyCrawl(TavilyCrawl):
def _run(self, *args, **kwargs):
kwargs.pop('run_manager', None)
result = super()._run(*args, **kwargs)
return output_summarizer(str(result), user_message)

async def _arun(self, *args, **kwargs):
kwargs.pop('run_manager', None)
result = await super()._arun(*args, **kwargs)
return output_summarizer(str(result), user_message)

# 创建带摘要的工具实例
extract_with_summary = SummarizingTavilyExtract(
extract_depth=extract.extract_depth,
tavily_api_key=api_key,
include_favicon=extract.include_favicon,
description=extract.description
)

crawl_with_summary = SummarizingTavilyCrawl(
tavily_api_key=api_key,
include_favicon=crawl.include_favicon,
limit=crawl.limit,
description=crawl.description
)

# 创建 ReAct 智能体
return create_react_agent(
prompt=prompt,
model=llm,
tools=[search, extract_with_summary, crawl_with_summary],
checkpointer=self.checkpointer,
)

FastApi (app.py) — 前后端的连接器

这个其实没有什么好讲的,就是给前端提供了一些Stream接口定义。需要注意的是,传输给前端的是流式响应格式 如果需要nginx转发需要配置配置 Upgrade 协议头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
FastAPI 后端服务器
提供智能体聊天流式接口
"""

import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from contextlib import asynccontextmanager

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain.schema import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

# 添加项目路径
sys.path.append(str(Path(__file__).parent))

# 导入后端模块
from backend.agent import WebAgent
from backend.prompts import REASONING_PROMPT, SIMPLE_PROMPT
from backend.utils import check_api_key
from backend.llm_config import LLMConfig, LLMProvider
from backend.session_manager import get_session_manager

# 加载环境变量
load_dotenv()

# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
...


# 创建 FastAPI 应用
app = FastAPI(
title="智能聊天机器人 API",
description="基于 LangGraph 和 Tavily 的智能体聊天接口",
version="1.0.0",
lifespan=lifespan
)

# 配置 CORS
app.add_middleware()


class AgentRequest(BaseModel):
"""智能体请求模型"""
input: str
thread_id: str
agent_type: str
llm_provider: str = LLMProvider.CLAUDE
llm_model: str = "sonnet"


@app.get("/")
async def root():
"""健康检查接口"""
...


@app.get("/health")
async def health():
"""健康检查接口(前端使用)"""
...


@app.post("/stream_agent")
async def stream_agent(body: AgentRequest, request: Request):
"""
流式智能体接口

Args:
body: 请求体(包含用户输入、会话 ID、智能体类型等)
request: FastAPI 请求对象

Returns:
StreamingResponse: 流式响应
"""
...


# ==================== 会话管理 API ====================

@app.get("/api/sessions")
async def get_sessions():
"""
获取所有会话列表

Returns:
会话列表(仅元数据)
"""
...


@app.get("/api/sessions/{session_id}")
async def get_session(session_id: str):
"""
获取指定会话的详细信息

Args:
session_id: 会话ID

Returns:
会话详情(包含完整消息列表)
"""
...


class CreateSessionRequest(BaseModel):
"""创建会话请求模型"""
session_id: str
title: str = None


@app.post("/api/sessions")
async def create_session(body: CreateSessionRequest):
"""
创建新会话

Args:
body: 包含 session_id 和可选的 title

Returns:
新创建的会话数据
"""
...


class RenameSessionRequest(BaseModel):
"""重命名会话请求模型"""
title: str


@app.put("/api/sessions/{session_id}")
async def rename_session(session_id: str, body: RenameSessionRequest):
"""
重命名会话

Args:
session_id: 会话ID
body: 包含新标题

Returns:
操作结果
"""
...


@app.delete("/api/sessions/{session_id}")
async def delete_session(session_id: str):
"""
删除会话

Args:
session_id: 会话ID

Returns:
操作结果
"""
...


if __name__ == "__main__":
# 启动服务器
port = int(os.getenv("PORT", 8080))
uvicorn.run(
app=app,
host="0.0.0.0",
port=port,
log_level="info"
)

streamlit_app.py — 前端显示

前端主要实现了一些功能选项的切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Streamlit 智能聊天机器人前端
集成 Tavily Web Agent 功能,支持深度思考模式
"""

import streamlit as st
import requests
import json
import datetime
import time
import uuid
import os
from typing import Dict, List, Optional
from dotenv import load_dotenv
import re

# 加载环境变量
load_dotenv()

# ==================== 页面配置 ====================
st.set_page_config(
page_title="Yuan's Chat Agents",
page_icon="./favicon.ico",
layout="wide",
initial_sidebar_state="expanded"
)

# ==================== 配置常量 ====================
BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8080")
MIN_TIME_BETWEEN_REQUESTS = datetime.timedelta(seconds=1)
HISTORY_LENGTH = 10 # 保留最近10条消息用于上下文

# ==================== 样式配置 ====================
st.markdown("""
<style>
.stButton>button {
width: 100%;
}
.tool-card {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
background-color: #f0f2f6;
}
.tool-search {
border-left: 3px solid #4CAF50;
}
.tool-extract {
border-left: 3px solid #2196F3;
}
.tool-crawl {
border-left: 3px solid #FF9800;
}
</style>
""", unsafe_allow_html=True)


# ==================== 工具函数 ====================

def detect_api_key_type(api_key: str) -> str:
"""
自动识别 API 密钥类型

Returns:
"claude" | "openai" | "unknown"
"""
...


def get_default_model(provider: str) -> str:
"""根据提供商获取默认模型"""
...


def initialize_session():
"""初始化会话状态"""
...


def format_time(timestamp: datetime.datetime) -> str:
"""格式化时间戳"""
...


def check_backend_health() -> bool:
"""检查后端服务健康状态"""
...


# ==================== 会话管理功能 ====================

def load_sessions_list():
"""从后端加载会话列表"""
...


def load_session(session_id: str):
"""加载指定会话的详细信息"""
...


def switch_session(session_id: str):
"""切换到指定会话"""
...


def create_new_session():
"""创建新会话"""
...


def delete_session_ui(session_id: str):
"""删除会话"""
...


def rename_session_ui(session_id: str, new_title: str):
"""重命名会话"""
...


def stream_agent_response(user_input: str, config: Dict) -> tuple:
"""
调用后端智能体并流式接收响应

Args:
user_input: 用户输入
config: 配置字典(API 密钥、智能体类型等)

Returns:
(完整响应文本, 工具调用列表)
"""
...


def render_tool_call(tool_event: Dict):
"""渲染工具调用卡片"""
...


# ==================== 侧边栏配置 ====================

def render_sidebar():
"""渲染侧边栏"""
...


# ==================== 主应用 ====================

def main():
"""主应用程序"""
...


if __name__ == "__main__":
main()

Let's CHAT!!!

项目的源码开源到了 Github

Comments