Task ID: task-t038p7-3c0199  |  File: task.md  |  Last modified: 2026-03-01 21:26:52

Task task-t038p7-3c0199 — Smart proxy pool + overlapping-route refactor (T038-P7)

Original requirements

Two pieces of work, executed together:
1. Smart proxy pool: HTTP requests from every channel get automatic proxy-pool fallback and learn the best proxy per domain
2. Overlapping-route refactor: searches for the overlapping sources (GitHub/YouTube/Bilibili/Reddit) go through SearXNG; the channels focus on read

Design doc (required reading): /root/.openclaw/workspace/tasks/T038-P7-proxy-pool-design.md


Execution chain

coder → reviewer → 爱衣 QA


Ai.Dev 💻 (session key: agent:coder:main)

Required reading before starting
1. /root/.openclaw/workspace/tasks/T038-P7-proxy-pool-design.md — full design doc
2. /srv/projects/agent-reach/ai_search/channels/base.py — current base class
3. /srv/projects/agent-reach/ai_search/core.py — current routing logic
4. /srv/projects/agent-reach/ai_search/config.py — current config


Part A: Smart proxy pool (7 steps)

A1: Create ai_search/proxy_pool.py (~200 lines)

Implement a SmartProxyPool class with this core interface:

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

@dataclass
class ProxyStat:
    success: int = 0
    fail: int = 0
    total_latency_ms: float = 0.0
    last_success_ts: float = 0.0
    last_fail_ts: float = 0.0

class SmartProxyPool:
    STATS_FILE = Path.home() / ".ai-search" / "proxy-stats.json"
    DIRECT = "__direct__"

    def __init__(self, proxies: List[str]): ...
    def get_ranked(self, domain: str) -> List[str]: ...
    def record(self, proxy: str, domain: str, success: bool, latency_ms: float = 0): ...
    def summary(self) -> Dict: ...
    def reset_domain(self, domain: str): ...
    def _load(self) -> None: ...
    def _save(self) -> None: ...  # debounced: write at most once every 30 s

# module-level singleton
_pool: Optional[SmartProxyPool] = None

def init_pool(proxies: List[str]) -> SmartProxyPool: ...
def get_pool() -> Optional[SmartProxyPool]: ...
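
One way to fill in the singleton accessors is a module-level lock around first initialization, which also covers the thread-safety point in the review checklist below. A sketch, not a mandated implementation; it reuses the names declared in the skeleton above:

import threading

_pool_lock = threading.Lock()

def init_pool(proxies: List[str]) -> SmartProxyPool:
    """Create the pool exactly once; later calls return the same instance."""
    global _pool
    with _pool_lock:
        if _pool is None:
            _pool = SmartProxyPool(proxies)
    return _pool

def get_pool() -> Optional[SmartProxyPool]:
    # reads are safe without the lock: _pool is only ever assigned once
    return _pool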

Scoring formula (inside get_ranked):

score = success_rate × speed_factor × recency_bonus
# success_rate: success / attempts (untested = 0.5)
# speed_factor: min(1000 / avg_latency_ms, 2.0) (no successes yet = 0.1)
# recency_bonus: 1.2 if a success within the last 30 min, 0.8 if a failure within the last 30 min, 1.0 otherwise
# completely untested: exploration_score = 0.4

Stats persist to ~/.ai-search/proxy-stats.json. _save() implements the debounce (skip if the last write was < 30 s ago, except when triggered by reset_domain).
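
A minimal sketch of the two pieces above, assuming stats are stored as self.stats: Dict[domain, Dict[proxy, ProxyStat]] and that record() accumulates latency_ms into total_latency_ms on every attempt; the force flag is one way to implement the reset_domain exception. All of these details are up to the implementer:

import json
import time
from dataclasses import asdict

def _score(self, stat: ProxyStat) -> float:
    attempts = stat.success + stat.fail
    if attempts == 0:
        return 0.4  # exploration score: untested proxies still get a chance
    success_rate = stat.success / attempts
    if stat.success > 0 and stat.total_latency_ms > 0:
        speed_factor = min(1000 / (stat.total_latency_ms / attempts), 2.0)
    else:
        speed_factor = 0.1  # no successful request yet
    now = time.time()
    if now - stat.last_success_ts < 1800:    # success within 30 min
        recency_bonus = 1.2
    elif now - stat.last_fail_ts < 1800:     # failure within 30 min
        recency_bonus = 0.8
    else:
        recency_bonus = 1.0
    return success_rate * speed_factor * recency_bonus

def _save(self, force: bool = False) -> None:
    # debounce: skip if the last write was < 30 s ago; reset paths pass force=True
    now = time.time()
    if not force and now - getattr(self, "_last_save_ts", 0.0) < 30:
        return
    self._last_save_ts = now
    self.STATS_FILE.parent.mkdir(parents=True, exist_ok=True)
    data = {d: {p: asdict(s) for p, s in per_proxy.items()}
            for d, per_proxy in self.stats.items()}
    self.STATS_FILE.write_text(json.dumps(data))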

A2: Modify ai_search/channels/base.py — add a fetch method

Add to the Channel base class:

import time
from urllib.parse import urlparse

import httpx

class Channel:
    PROXY_MODE = "fallback"  # "fallback" | "always" | "never"

    async def fetch(self, url: str, method: str = "GET",
                    headers: dict | None = None, timeout: float = 10,
                    **kwargs) -> httpx.Response:
        """Unified HTTP request that routes through the smart proxy pool."""
        from ai_search.proxy_pool import get_pool
        pool = get_pool()
        domain = urlparse(url).netloc

        if self.PROXY_MODE == "never" or pool is None:
            async with httpx.AsyncClient(timeout=timeout, headers=headers) as c:
                return await c.request(method, url, **kwargs)

        ranked = pool.get_ranked(domain)
        if self.PROXY_MODE == "always":
            ranked = [p for p in ranked if p != pool.DIRECT]

        last_error = None
        for proxy in ranked[:4]:  # try at most 4 candidates
            proxy_url = None if proxy == pool.DIRECT else proxy
            t0 = time.time()
            try:
                async with httpx.AsyncClient(
                    proxy=proxy_url, timeout=timeout, headers=headers
                ) as c:
                    resp = await c.request(method, url, **kwargs)
                latency = (time.time() - t0) * 1000
                if resp.status_code in (403, 429, 503):
                    pool.record(proxy, domain, False, latency)
                    last_error = f"HTTP {resp.status_code}"
                    continue
                pool.record(proxy, domain, True, latency)
                return resp
            except (httpx.TimeoutException, httpx.ProxyError,
                    httpx.ConnectError, OSError) as e:
                latency = (time.time() - t0) * 1000
                pool.record(proxy, domain, False, latency)
                last_error = str(e)
                continue

        raise ConnectionError(f"All proxies failed for {domain}: {last_error}")

Note: do not change existing method signatures in base.py; only add. If base.py contains classes such as ReadResult/SearchResult, leave them untouched.

A3: Rework each channel — set PROXY_MODE + use self.fetch()

Set PROXY_MODE at the top of each channel class definition (see the sketch after the table):

Channel       PROXY_MODE
searxng       never
exa_search    never
xiaohongshu   never
bosszhipin    never
reddit        always
bilibili      always
web           fallback
github        fallback
youtube       fallback
twitter       fallback
instagram     fallback
linkedin      fallback
rss           fallback
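
For example, in a channel module (class names here are illustrative; apply the same one-liner to the real classes under ai_search/channels/):

class RedditChannel(Channel):
    PROXY_MODE = "always"    # never attempt a direct connection for reddit

class WebChannel(Channel):
    PROXY_MODE = "fallback"  # direct and proxies compete on learned score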

Replace the HTTP calls

Replace the httpx.get() / httpx.AsyncClient() / requests.get() calls inside each channel with self.fetch().

Concrete steps:
1. Run grep -rn "httpx\.\|requests\." ai_search/channels/ to find every HTTP call
2. Replace each one with await self.fetch(url, headers=..., timeout=...) (see the before/after sketch below)
3. For reddit.py and bilibili.py: delete the old config.get("reddit_proxy") / config.get("bilibili_proxy") logic; the proxy pool inside self.fetch() now handles all of it
4. Make sure non-HTTP functionality is untouched (e.g. subprocess calls to gh/bird/yt-dlp)

⚠️ Important: some channels invoke CLI tools via subprocess (gh, bird, yt-dlp, mcporter). Those do not go through self.fetch(); leave them as-is. Only replace Python HTTP library calls.
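
A hypothetical before/after for a single call site (the method name _get_json, the URL handling, and the headers are illustrative, not taken from the real channels; assumes httpx is imported at module top):

# Before: a direct httpx call inside a channel method
async def _get_json(self, url: str) -> dict:
    async with httpx.AsyncClient(timeout=10) as c:
        resp = await c.get(url, headers={"User-Agent": "ai-search"})
        return resp.json()

# After: routed through the base-class fetch(), which applies the proxy pool
async def _get_json(self, url: str) -> dict:
    resp = await self.fetch(url, headers={"User-Agent": "ai-search"}, timeout=10)
    return resp.json()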

A4: Modify config.py — add proxy-pool configuration

# add inside the Config class
PROXY_POOL_DEFAULT = [
    "socks5h://127.0.0.1:50002",
    "socks5h://127.0.0.1:50004",
    "socks5h://127.0.0.1:50005",
    "socks5h://127.0.0.1:50006",
    "socks5h://127.0.0.1:50007",
    "socks5h://127.0.0.1:50008",
    "socks5h://127.0.0.1:50009",
    "socks5h://127.0.0.1:50010",
    "socks5h://127.0.0.1:50013",
]

def get_proxy_pool(self) -> List[str]:
    return self.get("proxy_pool", self.PROXY_POOL_DEFAULT)

A5: Proxy-pool initialization — in core.py, inside AISearch.__init__

class AISearch:
    def __init__(self):
        self.config = Config()
        # initialize the proxy pool
        from ai_search.proxy_pool import init_pool
        proxies = self.config.get_proxy_pool()
        if proxies:
            init_pool(proxies)

A6: New CLI commands — proxy-status and proxy-reset

Add two commands to cli.py:

# proxy-status: show the best proxy and stats for each domain
async def _cmd_proxy_status(args):
    s = AISearch()  # constructing AISearch initializes the pool (A5)
    from ai_search.proxy_pool import get_pool
    pool = get_pool()
    if not pool:
        print("Proxy pool not configured")
        return
    summary = pool.summary()
    if not summary:
        print("No stats yet; they are learned automatically as ai-search runs")
        return
    print(f"{'Domain':<30} {'Best proxy':<25} {'Success':>8} {'Latency':>8}")
    print("-" * 74)
    for domain, info in sorted(summary.items()):
        print(f"{domain:<30} {info['best_proxy']:<25} {info['success_rate']:>8} {info['avg_latency']:>8}")

# proxy-reset: reset the learned stats
async def _cmd_proxy_reset(args):
    s = AISearch()
    from ai_search.proxy_pool import get_pool
    pool = get_pool()
    if not pool:
        print("Proxy pool not configured")
        return
    if args.domain:
        pool.reset_domain(args.domain)
        print(f"Reset proxy stats for {args.domain}")
    else:
        pool.stats.clear()
        pool._save()  # must bypass the A1 debounce so the cleared stats actually reach disk
        print("Reset all proxy stats")

A7: Add the socksio dependency to pyproject.toml (httpx needs it to speak to socks5h:// proxies)

dependencies = [
    ...
    "socksio>=1.0.0",
]

Part B: Overlapping-route refactor (2 steps)

B1: Modify the search routing in core.py

Change the search methods for the 4 overlapping sources to go through SearXNG first, with the channel search as fallback:

async def search_github(self, query, language=None, limit=5):
    """GitHub 搜索 — 优先 SearXNG,fallback Channel"""
    try:
        results = await self.find(query, engines="github", limit=limit)
        if results:
            return results
    except Exception:
        pass
    # Fallback: the original channel search
    ch = get_channel("github")
    return await ch.search(query, config=self.config, limit=limit)

async def search_youtube(self, query, limit=5):
    """YouTube 搜索 — 优先 SearXNG"""
    try:
        results = await self.find(query, engines="youtube", limit=limit)
        if results:
            return results
    except Exception:
        pass
    ch = get_channel("youtube")
    return await ch.search(query, config=self.config, limit=limit)

async def search_bilibili(self, query, limit=5):
    """Bilibili 搜索 — 优先 SearXNG"""
    try:
        results = await self.find(query, engines="bilibili", limit=limit)
        if results:
            return results
    except Exception:
        pass
    ch = get_channel("bilibili")
    return await ch.search(query, config=self.config, limit=limit)

async def search_reddit(self, query, subreddit=None, limit=10):
    """Reddit 搜索 — SearXNG google + site:reddit.com"""
    site_query = f"{query} site:reddit.com"
    if subreddit:
        site_query = f"{query} site:reddit.com/r/{subreddit}"
    try:
        results = await self.find(site_query, engines="google", limit=limit)
        if results:
            return results
    except Exception:
        pass
    # Fall back to exa (original logic)
    ch = get_channel("exa_search")
    if ch:
        return await ch.search(query, config=self.config, limit=limit)
    return []

B2: Verify the overlapping routes

# GitHub — should go through SearXNG
python3 -c "
import asyncio
from ai_search.core import AISearch
s = AISearch()
r = asyncio.run(s.search_github('pytorch'))
print(f'search_github: {len(r)} results')
if r: print(f'  {r[0].get(\"title\",\"\")[:50]}')
"

# Reddit — should go through SearXNG google + site:reddit.com
python3 -c "
import asyncio
from ai_search.core import AISearch
s = AISearch()
r = asyncio.run(s.search_reddit('python'))
print(f'search_reddit: {len(r)} results')
if r: print(f'  {r[0].get(\"title\",\"\")[:50]}')
"

Final verification

cd /srv/projects/agent-reach
pip install -e . --break-system-packages 2>&1 | tail -3

# 1. Proxy-pool basics
ai-search version
ai-search proxy-status  # should report "No stats yet"

# 2. Reddit read (should use the proxy pool: PROXY_MODE=always)
ai-search read "https://www.reddit.com/r/Python/top/.json?limit=2" 2>&1 | head -10

# 3. Bilibili read (should use the proxy pool: PROXY_MODE=always)
ai-search read "https://www.bilibili.com/video/BV1GJ411x7h7" 2>&1 | head -5

# 4. Verify the pool is learning
ai-search proxy-status  # should now show stats for reddit.com and bilibili.com

# 5. Overlapping routes
ai-search search-github "machine learning" -n 3
ai-search search-reddit "python" -n 3

# 6. Regression on existing features
ai-search web "test" -n 3
ai-search find "diabetes" --engine clinicaltrials -n 3
ai-search doctor

# 7. Run the existing tests
pytest tests/test_searxng.py -v --tb=short 2>&1 | tail -10

On start:

/root/.openclaw/workspace/scripts/log-to-channel.sh coder receive "T038-P7 proxy pool + routing refactor" task-t038p7-3c0199

On completion:
1. Record every change and verification step in session.md
2. bash /root/.openclaw/workspace/scripts/log-to-channel.sh coder handoff "T038-P7 proxy pool + routing refactor" reviewer task-t038p7-3c0199
3. Notify the reviewer via sessions_send (agent:reviewer:main, timeoutSeconds=0)


Ai.Rev 📋 (session key: agent:reviewer:main)

Review checklist

  1. proxy_pool.py code quality
  2. Is the scoring formula sensible?
  3. Is the persistence debounce implemented?
  4. Thread safety (singleton)

  5. The base.py fetch method

  6. Complete error handling (timeouts, proxy errors, HTTP status codes)
  7. At most 4 attempts
  8. All three PROXY_MODE modes behave correctly

  9. Correct PROXY_MODE on every channel
    grep -rn "PROXY_MODE" /srv/projects/agent-reach/ai_search/channels/

  10. Overlapping-route verification
    # search_github should go through SearXNG
    ai-search search-github "pytorch" -n 3
    # search_reddit should go through google + site:reddit.com
    ai-search search-reddit "python" -n 3

  11. Proxy-pool functional verification
    # a Reddit read should succeed through a proxy
    ai-search read "https://www.reddit.com/r/Python/top/.json?limit=2"
    # inspect what the pool has learned
    ai-search proxy-status

  12. Regression tests
    pytest tests/test_searxng.py -v --tb=short

  13. No leftover legacy proxy logic
    grep -rn "reddit_proxy\|bilibili_proxy" /srv/projects/agent-reach/ai_search/
    # should return nothing (the old single-proxy logic is gone)

On start / on completion: the standard log-to-channel + sessions_send flow.


⚠️ 爱衣 QA