Two tasks, executed together:
1. Smart proxy pool: HTTP requests from every channel get automatic proxy-pool fallback, and the pool learns the best proxy per domain
2. Overlapping-route refactor: searches on overlapping sources (GitHub/YouTube/Bilibili/Reddit) go through SearXNG; the Channels focus on read
Design doc (required reading): /root/.openclaw/workspace/tasks/T038-P7-proxy-pool-design.md
Flow: coder → reviewer → 爱衣 QA
coder (agent:coder:main) — required reading before starting:
1. /root/.openclaw/workspace/tasks/T038-P7-proxy-pool-design.md — full design doc
2. /srv/projects/agent-reach/ai_search/channels/base.py — current base class
3. /srv/projects/agent-reach/ai_search/core.py — current routing logic
4. /srv/projects/agent-reach/ai_search/config.py — current configuration
ai_search/proxy_pool.py (~200 lines) — implement the SmartProxyPool class with this core surface:
```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

@dataclass
class ProxyStat:
    success: int = 0
    fail: int = 0
    total_latency_ms: float = 0.0
    last_success_ts: float = 0.0
    last_fail_ts: float = 0.0

class SmartProxyPool:
    STATS_FILE = Path.home() / ".ai-search" / "proxy-stats.json"
    DIRECT = "__direct__"

    def __init__(self, proxies: List[str]): ...
    def get_ranked(self, domain: str) -> List[str]: ...
    def record(self, proxy: str, domain: str, success: bool, latency_ms: float = 0): ...
    def summary(self) -> Dict: ...
    def reset_domain(self, domain: str): ...
    def _load(self) -> None: ...
    def _save(self) -> None: ...  # debounced: writes at most once every 30 s

# Module-level singleton
_pool: Optional[SmartProxyPool] = None

def init_pool(proxies: List[str]) -> SmartProxyPool: ...
def get_pool() -> Optional[SmartProxyPool]: ...
```
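Since AISearch may be constructed from more than one code path, init_pool should be idempotent, returning the existing pool on repeat calls. A minimal sketch of the singleton pair (the lock is an implementation suggestion, not part of the design doc; the stub class stands in for the real SmartProxyPool):

```python
import threading
from typing import List, Optional

class SmartProxyPool:
    """Stub standing in for the real class sketched above."""
    def __init__(self, proxies: List[str]):
        self.proxies = list(proxies)

_pool: Optional[SmartProxyPool] = None
_pool_lock = threading.Lock()

def init_pool(proxies: List[str]) -> SmartProxyPool:
    """Create the singleton once; later calls return the existing pool."""
    global _pool
    with _pool_lock:
        if _pool is None:
            _pool = SmartProxyPool(proxies)
        return _pool

def get_pool() -> Optional[SmartProxyPool]:
    """Return the pool, or None if init_pool was never called."""
    return _pool
```

The lock makes concurrent first calls safe; reads of `_pool` after initialization need no lock because the reference never changes again.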
Scoring formula (inside get_ranked):

```python
score = success_rate * speed_factor * recency_bonus
# success_rate:  success / attempts (untested = 0.5)
# speed_factor:  min(1000 / avg_latency_ms, 2.0) (no success yet = 0.1)
# recency_bonus: 1.2 if a success in the last 30 min, 0.8 if a failure in the last 30 min, else 1.0
# completely untested proxy: exploration_score = 0.4
```
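A minimal executable sketch of the formula (the helper name `compute_score` is illustrative; averaging latency over successes only, and preferring the success branch when both a success and a failure fall within 30 minutes, are assumptions the design doc leaves open):

```python
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class ProxyStat:
    success: int = 0
    fail: int = 0
    total_latency_ms: float = 0.0
    last_success_ts: float = 0.0
    last_fail_ts: float = 0.0

def compute_score(stat: ProxyStat, now: Optional[float] = None) -> float:
    """Score one proxy for one domain per the formula above."""
    now = time.time() if now is None else now
    attempts = stat.success + stat.fail
    if attempts == 0:
        return 0.4                      # completely untested: exploration score
    success_rate = stat.success / attempts
    if stat.success == 0:
        speed_factor = 0.1              # never succeeded: pessimistic speed
    else:
        avg_latency = (stat.total_latency_ms / stat.success) or 1.0
        speed_factor = min(1000 / avg_latency, 2.0)
    if now - stat.last_success_ts < 1800:
        recency = 1.2                   # success within the last 30 min
    elif now - stat.last_fail_ts < 1800:
        recency = 0.8                   # failure within the last 30 min
    else:
        recency = 1.0
    return success_rate * speed_factor * recency
```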
Stats persist to ~/.ai-search/proxy-stats.json. _save() is debounced: skip the write if the last one was < 30 s ago, except when triggered by reset_domain.
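The debounce can be sketched like this (a standalone illustration; the `force` flag and `write_count` counter are assumed helper names, with `force=True` being what a reset_domain-triggered save would pass):

```python
import json
import time
from pathlib import Path

class DebouncedSaver:
    """Write JSON stats to disk at most once every 30 seconds."""
    SAVE_INTERVAL = 30.0

    def __init__(self, path: Path):
        self.path = path
        self._last_save_ts = 0.0
        self.write_count = 0          # for illustration/testing only

    def save(self, stats: dict, force: bool = False) -> bool:
        """Persist stats; return True if written, False if debounced."""
        now = time.time()
        if not force and now - self._last_save_ts < self.SAVE_INTERVAL:
            return False              # too soon since the last write
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(stats))
        self._last_save_ts = now
        self.write_count += 1
        return True
```

Forcing the write on reset_domain means an explicit reset is never silently swallowed by the debounce window.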
ai_search/channels/base.py — add a fetch method to the Channel base class:
```python
import time
from typing import Optional
from urllib.parse import urlparse

import httpx

class Channel:
    PROXY_MODE = "fallback"  # "fallback" | "always" | "never"

    async def fetch(self, url: str, method: str = "GET",
                    headers: Optional[dict] = None, timeout: float = 10,
                    **kwargs) -> httpx.Response:
        """Unified HTTP request that routes through the smart proxy pool."""
        from ai_search.proxy_pool import get_pool
        pool = get_pool()
        domain = urlparse(url).netloc
        if self.PROXY_MODE == "never" or pool is None:
            async with httpx.AsyncClient(timeout=timeout, headers=headers) as c:
                return await c.request(method, url, **kwargs)
        ranked = pool.get_ranked(domain)
        if self.PROXY_MODE == "always":
            ranked = [p for p in ranked if p != pool.DIRECT]
        last_error = None
        for proxy in ranked[:4]:  # try at most 4 candidates
            proxy_url = None if proxy == pool.DIRECT else proxy
            t0 = time.time()
            try:
                async with httpx.AsyncClient(
                    proxy=proxy_url, timeout=timeout, headers=headers
                ) as c:
                    resp = await c.request(method, url, **kwargs)
                    latency = (time.time() - t0) * 1000
                    if resp.status_code in (403, 429, 503):
                        pool.record(proxy, domain, False, latency)
                        last_error = f"HTTP {resp.status_code}"
                        continue
                    pool.record(proxy, domain, True, latency)
                    return resp
            except (httpx.TimeoutException, httpx.ProxyError,
                    httpx.ConnectError, OSError) as e:
                latency = (time.time() - t0) * 1000
                pool.record(proxy, domain, False, latency)
                last_error = str(e)
                continue
        raise ConnectionError(f"All proxies failed for {domain}: {last_error}")
```
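Note that fetch keys the pool's stats by urlparse(url).netloc, so the learned domain keeps any www. prefix unless get_ranked normalizes it:

```python
from urllib.parse import urlparse

# The fetch() helper keys proxy stats by the URL's netloc.
domain = urlparse("https://www.reddit.com/r/Python/top/.json?limit=2").netloc
print(domain)  # www.reddit.com, not reddit.com
```

Worth keeping in mind when reading proxy-status output: the www. variant is what will appear.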
Note: do not change any existing method signatures in base.py; only add. If base.py defines classes such as ReadResult/SearchResult, leave them untouched.
Set PROXY_MODE in each channel (at the top of the class definition):
| Channel | PROXY_MODE |
|---|---|
| searxng | never |
| exa_search | never |
| xiaohongshu | never |
| bosszhipin | never |
| reddit | always |
| bilibili | always |
| web | fallback |
| github | fallback |
| youtube | fallback |
| … | fallback |
| … | fallback |
| … | fallback |
| rss | fallback |
Replace HTTP calls:
Replace the httpx.get() / httpx.AsyncClient() / requests.get() calls inside each channel with self.fetch().
Steps:
1. Run grep -rn "httpx\.\|requests\." ai_search/channels/ to locate every HTTP call
2. Replace each one with await self.fetch(url, headers=..., timeout=...)
3. For reddit.py and bilibili.py: delete the existing config.get("reddit_proxy") / config.get("bilibili_proxy") logic; proxy handling moves entirely into self.fetch()'s pool
4. Make sure non-HTTP functionality is untouched (e.g. subprocess calls to gh/bird/yt-dlp)
⚠️ Important: some channels invoke CLI tools via subprocess (gh, bird, yt-dlp, mcporter). Those do NOT go through self.fetch() and must stay as-is; only replace Python HTTP library calls.
config.py — add proxy pool configuration. Inside the Config class, add:
```python
PROXY_POOL_DEFAULT = [
    "socks5h://127.0.0.1:50002",
    "socks5h://127.0.0.1:50004",
    "socks5h://127.0.0.1:50005",
    "socks5h://127.0.0.1:50006",
    "socks5h://127.0.0.1:50007",
    "socks5h://127.0.0.1:50008",
    "socks5h://127.0.0.1:50009",
    "socks5h://127.0.0.1:50010",
    "socks5h://127.0.0.1:50013",
]

def get_proxy_pool(self) -> List[str]:
    return self.get("proxy_pool", self.PROXY_POOL_DEFAULT)
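Because get_proxy_pool reads `self.get("proxy_pool", ...)`, a user override presumably lives in the existing config store under that key. Assuming the store is a JSON config file (its location and format are assumptions, not stated here), an override would look like:

```json
{
  "proxy_pool": [
    "socks5h://127.0.0.1:50002",
    "socks5h://127.0.0.1:50004"
  ]
}
```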
core.py — initialize the pool in AISearch.__init__:

```python
class AISearch:
    def __init__(self):
        self.config = Config()
        # Initialize the smart proxy pool
        from ai_search.proxy_pool import init_pool
        proxies = self.config.get_proxy_pool()
        if proxies:
            init_pool(proxies)
```
cli.py — add two commands, proxy-status and proxy-reset:
```python
# proxy-status: show each domain's best proxy and stats
async def _cmd_proxy_status(args):
    s = AISearch()
    from ai_search.proxy_pool import get_pool
    pool = get_pool()
    if not pool:
        print("Proxy pool not configured")
        return
    summary = pool.summary()
    if not summary:
        print("No stats yet; they accumulate automatically as ai-search is used")
        return
    print(f"{'Domain':<30} {'Best proxy':<25} {'Success':>6} {'Latency':>8}")
    print("-" * 72)
    for domain, info in sorted(summary.items()):
        print(f"{domain:<30} {info['best_proxy']:<25} {info['success_rate']:>6} {info['avg_latency']:>8}")

# proxy-reset: reset the learned stats
async def _cmd_proxy_reset(args):
    s = AISearch()
    from ai_search.proxy_pool import get_pool
    pool = get_pool()
    if args.domain:
        pool.reset_domain(args.domain)
        print(f"Reset proxy stats for {args.domain}")
    else:
        pool.stats.clear()
        pool._save()
        print("Reset all proxy stats")
```
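For reference, the proxy-status formatting above implies summary() returns a per-domain mapping with best_proxy, success_rate, and avg_latency keys. A shape like the following would satisfy it (values are illustrative; whether they are pre-formatted strings or raw numbers is an assumption):

```python
summary = {
    "www.reddit.com": {
        "best_proxy": "socks5h://127.0.0.1:50004",
        "success_rate": "92%",
        "avg_latency": "850ms",
    },
    "www.bilibili.com": {
        "best_proxy": "socks5h://127.0.0.1:50007",
        "success_rate": "88%",
        "avg_latency": "620ms",
    },
}
```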
pyproject.toml — add socksio (httpx needs it to speak to socks5h:// proxies):

```toml
dependencies = [
    ...
    "socksio>=1.0.0",
]
```
core.py search routing — switch the 4 overlapping sources' search methods to SearXNG-first, with the Channel's own search as fallback:
```python
async def search_github(self, query, language=None, limit=5):
    """GitHub search — SearXNG first, Channel as fallback."""
    try:
        results = await self.find(query, engines="github", limit=limit)
        if results:
            return results
    except Exception:
        pass
    # Fallback: the original channel search
    ch = get_channel("github")
    return await ch.search(query, config=self.config, limit=limit)

async def search_youtube(self, query, limit=5):
    """YouTube search — SearXNG first."""
    try:
        results = await self.find(query, engines="youtube", limit=limit)
        if results:
            return results
    except Exception:
        pass
    ch = get_channel("youtube")
    return await ch.search(query, config=self.config, limit=limit)

async def search_bilibili(self, query, limit=5):
    """Bilibili search — SearXNG first."""
    try:
        results = await self.find(query, engines="bilibili", limit=limit)
        if results:
            return results
    except Exception:
        pass
    ch = get_channel("bilibili")
    return await ch.search(query, config=self.config, limit=limit)

async def search_reddit(self, query, subreddit=None, limit=10):
    """Reddit search — SearXNG google + site:reddit.com."""
    site_query = f"{query} site:reddit.com"
    if subreddit:
        site_query = f"{query} site:reddit.com/r/{subreddit}"
    try:
        results = await self.find(site_query, engines="google", limit=limit)
        if results:
            return results
    except Exception:
        pass
    # Fallback to exa (original logic)
    ch = get_channel("exa_search")
    if ch:
        return await ch.search(query, config=self.config, limit=limit)
    return []
```
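The subreddit-scoped query construction above is simple enough to pin down with a pure helper (the name `build_reddit_query` is illustrative, not from the task; search_reddit inlines this logic):

```python
from typing import Optional

def build_reddit_query(query: str, subreddit: Optional[str] = None) -> str:
    """Mirror search_reddit's site-restriction logic."""
    if subreddit:
        return f"{query} site:reddit.com/r/{subreddit}"
    return f"{query} site:reddit.com"
```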
```bash
# GitHub — should route through SearXNG
python3 -c "
import asyncio
from ai_search.core import AISearch
s = AISearch()
r = asyncio.run(s.search_github('pytorch'))
print(f'search_github: {len(r)} results')
if r: print(f' {r[0].get(\"title\",\"\")[:50]}')
"
# Reddit — should route through SearXNG google + site:reddit.com
python3 -c "
import asyncio
from ai_search.core import AISearch
s = AISearch()
r = asyncio.run(s.search_reddit('python'))
print(f'search_reddit: {len(r)} results')
if r: print(f' {r[0].get(\"title\",\"\")[:50]}')
"
```
```bash
cd /srv/projects/agent-reach
pip install -e . --break-system-packages 2>&1 | tail -3

# 1. Proxy pool basics
ai-search version
ai-search proxy-status   # should report "no stats yet"

# 2. Reddit read (should use the proxy pool, always mode)
ai-search read "https://www.reddit.com/r/Python/top/.json?limit=2" 2>&1 | head -10

# 3. Bilibili read (should use the proxy pool, always mode)
ai-search read "https://www.bilibili.com/video/BV1GJ411x7h7" 2>&1 | head -5

# 4. Proxy pool learning check
ai-search proxy-status   # should now show stats for the reddit.com and bilibili.com domains

# 5. Overlapping routes
ai-search search-github "machine learning" -n 3
ai-search search-reddit "python" -n 3

# 6. Regression on existing features
ai-search web "test" -n 3
ai-search find "diabetes" --engine clinicaltrials -n 3
ai-search doctor

# 7. Run existing tests
pytest tests/test_searxng.py -v --tb=short 2>&1 | tail -10
```
On start:

```bash
/root/.openclaw/workspace/scripts/log-to-channel.sh coder receive "T038-P7 代理池+路由重构" task-t038p7-3c0199
```

On completion:
1. Record every change and its verification in session.md
2. Run:
   ```bash
   /root/.openclaw/workspace/scripts/log-to-channel.sh coder handoff "T038-P7 代理池+路由重构" reviewer task-t038p7-3c0199
   ```
3. Notify the reviewer via sessions_send (agent:reviewer:main, timeoutSeconds=0)
reviewer (agent:reviewer:main) — review checklist:
- Thread safety of the singleton
- The base.py fetch method
- All three PROXY_MODE modes behave correctly
- Each channel's PROXY_MODE is correct

```bash
grep -rn "PROXY_MODE" /srv/projects/agent-reach/ai_search/channels/
```

Overlapping-route verification:

```bash
# search_github should go through SearXNG
ai-search search-github "pytorch" -n 3
# search_reddit should go through google + site:reddit.com
ai-search search-reddit "python" -n 3
```

Proxy pool verification:

```bash
# A Reddit read should succeed through a proxy
ai-search read "https://www.reddit.com/r/Python/top/.json?limit=2"
# Inspect what was learned
ai-search proxy-status
```

Regression tests:

```bash
pytest tests/test_searxng.py -v --tb=short
```

No leftover legacy proxy logic:

```bash
grep -rn "reddit_proxy\|bilibili_proxy" /srv/projects/agent-reach/ai_search/
# should come back empty (old single-proxy logic removed)
```

On start / on completion: the standard log-to-channel + sessions_send flow.
Does ai-search proxy-status produce output?