diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..7f9dca5197f583249dccb142263e77b2abbf4589
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+dump/
\ No newline at end of file
diff --git a/src/mcp_crawl/Crawldso.py b/src/mcp_crawl/Crawldso.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbc10e29b83c3d2a77cb0a24a776ec94a53f91fc
--- /dev/null
+++ b/src/mcp_crawl/Crawldso.py
@@ -0,0 +1,548 @@
+import re
+import requests
+import subprocess
+from bs4 import BeautifulSoup
+import json
+import os
+import csv
+from typing import Dict, List, Optional
+import time
+import pandas as pd
+from urllib.parse import urljoin, urlparse
+from dotenv import load_dotenv
+
+class McpSoFetcher:
+    """
+    Fetch MCP server names and detail links from https://mcp.so/servers
+    """
+
+ def __init__(self, timeout: int = 10, delay: float = 2.0, github_token: str = None):
+        """
+        Initialize the fetcher
+
+        Args:
+            timeout: request timeout in seconds (default: 10)
+            delay: delay between page requests in seconds (default: 2, to avoid being rate limited)
+            github_token: GitHub access token (optional, for private repositories and higher rate limits)
+        """
+ self.base_url = "https://mcp.so/servers"
+ self.github_base_url = "https://api.github.com"
+ self.timeout = timeout
+ self.delay = delay
+ self.headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+            'Referer': 'https://mcp.so/',
+            'DNT': '1',
+            'Connection': 'keep-alive',
+            'Upgrade-Insecure-Requests': '1'
+        }
+ self.github_token = github_token
+
+ def _get_page_soup(self, url: str) -> Optional[BeautifulSoup]:
+        """
+        Internal helper: fetch a page and parse it into a BeautifulSoup object
+
+        Args:
+            url: URL of the page to request
+
+        Returns:
+            Parsed BeautifulSoup object, or None on failure
+        """
+        try:
+            # Honor the configured delay between requests to avoid being rate limited
+            time.sleep(self.delay)
+            response = requests.get(
+                url,
+                headers=self.headers,
+                timeout=self.timeout,
+                allow_redirects=True
+            )
+ response.raise_for_status()
+            # Parse the HTML
+ soup = BeautifulSoup(response.text, 'html.parser')
+ return soup
+ except requests.exceptions.RequestException as e:
+            print(f"Request failed for {url}: {str(e)}")
+ return None
+
+ def extract_meta_description(self, html_content: str) -> Optional[str]:
+        """
+        Extract the meta description from raw HTML (handles both attribute orders)
+
+        Args:
+            html_content: HTML source string
+
+        Returns:
+            The description text, or None if not found
+        """
+        # name attribute first, content attribute second
+        pattern1 = r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\'][^>]*>'
+        match = re.search(pattern1, html_content, re.IGNORECASE | re.DOTALL)
+        if match:
+            return match.group(1)
+
+        # Looser match with the content attribute first
+        pattern2 = r'<meta[^>]*content=["\']([^"\']+)["\'][^>]*name=["\']description["\'][^>]*>'
+        match = re.search(pattern2, html_content, re.IGNORECASE | re.DOTALL)
+        if match:
+            return match.group(1)
+
+ return None
+
+
+ def extract_categories(self, html_content: str) -> List[str]:
+        """
+        Extract all category slugs linked from the page
+
+        Args:
+            html_content: HTML source string
+
+        Returns:
+            De-duplicated list of category names
+        """
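+        # Match hrefs of the form /category/<slug>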
+ pattern = r'href=["\']\/category\/([^"\'\/]+)["\']'
+ matches = re.findall(pattern, html_content, re.IGNORECASE)
+
+ categories = []
+ seen = set()
+ for cat in matches:
+ if cat not in seen:
+ categories.append(cat)
+ seen.add(cat)
+
+ return categories
+
+ def _extract_page_servers(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
+        """
+        Internal helper: extract server information from a single listing page
+
+        Args:
+            soup: parsed BeautifulSoup object of the listing page
+
+        Returns:
+            List of server info dicts (name, url, github_url, language, type, description, star)
+        """
+ servers = []
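+        # Each server card links to a /server/<slug> detail page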
+ server_cards = soup.select('a[href*="/server/"]')
+
+ for card in server_cards:
+ name_elem = card.find('h3', class_='font-semibold')
+ if not name_elem:
+ continue
+
+ server_name = name_elem.get_text(strip=True)
+ relative_url = card.get('href', '').strip()
+ if not relative_url:
+ continue
+
+ absolute_url = urljoin(self.base_url, relative_url)
+            print(f"Extracted server name: {server_name}")
+            detail_soup = self._get_page_soup(absolute_url)
+            if not detail_soup:
+                print("Failed to fetch the server detail page")
+                continue
+            git_url = self._extract_github_url(detail_soup, absolute_url)
+            print(f"Extracted GitHub URL: {git_url}")
+            if not git_url:
+                print("No GitHub URL found")
+                continue
+
+            # Extract the MCP category names
+            mcp_type = self.extract_categories(str(detail_soup))
+
+            # Extract the meta description
+            mcp_desc = self.extract_meta_description(str(detail_soup))
+
+            repo_info = self.get_source_from_server(git_url)
+            if not repo_info:
+                print(f"Failed to fetch repository info: {git_url}")
+                continue
+
+            if not any(s['url'] == absolute_url for s in servers):
+                servers.append({
+                    'name': server_name,
+                    'url': absolute_url,
+                    'github_url': git_url,
+                    'language': repo_info.get('github_repo', {}).get('language', ""),
+                    'type': mcp_type,
+                    'description': mcp_desc,
+                    'star': repo_info.get('github_repo', {}).get('stargazers_count', -1),
+                })
+
+ return servers
+
+ def _get_total_pages(self, soup: BeautifulSoup) -> int:
+        """
+        Internal helper: read the total page count from the listing page
+
+        Args:
+            soup: parsed BeautifulSoup object of the first listing page
+
+        Returns:
+            Total number of pages; falls back to 1 on failure
+        """
+ try:
+            # Find the last page button in the pagination control
+ pagination = soup.select('nav[aria-label="pagination"] ul li')
+ if not pagination:
+ return 1
+
+            # Collect the numeric page labels (ignoring the "Next" button)
+ page_numbers = []
+ for li in pagination:
+ a_tag = li.find('a')
+ if a_tag and a_tag.get_text(strip=True).isdigit():
+                    page_numbers.append(int(a_tag.get_text(strip=True)))
+
+ return max(page_numbers) if page_numbers else 1
+ except Exception as e:
+            print(f"Failed to determine total page count: {str(e)}")
+ return 1
+
+    def _extract_github_url(self, soup: BeautifulSoup, detail_url: str) -> str:
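+        """Find the first GitHub repository link on a server detail page (issue links are skipped)."""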
+ try:
+            github_link = soup.find(
+                'a',
+                href=lambda h: h and
+                'github.com' in h and   # must be a GitHub link
+                '/issues' not in h      # skip issue links
+            )
+ if github_link:
+ github_url = github_link.get('href', '').strip()
+ return urljoin(detail_url, github_url) if github_url else ""
+ return ""
+ except Exception as e:
+            print(f"Failed to extract GitHub link from {detail_url}: {str(e)}")
+ return ""
+
+    def fetch_all_servers(self, current_page: Optional[int] = 1, csv_path: str = None) -> List[Dict[str, str]]:
+        """
+        Crawl a single listing page of MCP servers
+
+        Note: despite the name, this method only crawls the page given by
+        current_page; the caller is expected to loop over pages.
+
+        Args:
+            current_page: page number to crawl (default: 1)
+            csv_path: optional CSV path (currently unused; export is handled by the caller)
+
+        Returns:
+            List of server info dicts collected from this page
+        """
+        all_servers = []
+
+        page_url = f"{self.base_url}?page={current_page}"
+        print(f"\nCrawling page {current_page} ...")
+
+        page_soup = self._get_page_soup(page_url)
+        if not page_soup:
+            print(f"Failed to crawl page {current_page}, skipping")
+            return all_servers
+
+        page_servers = self._extract_page_servers(page_soup)
+
+        all_servers.extend(page_servers)
+
+        print(f"Page {current_page}: extracted {len(page_servers)} servers, {len(all_servers)} collected in total")
+
+        print(f"Finished page {current_page}: {len(all_servers)} unique MCP servers")
+ return all_servers
+
+ def load_progress(self, progress_path: str) -> Dict:
+        """Load crawl progress (used to resume interrupted runs)"""
+ if os.path.exists(progress_path):
+ with open(progress_path, 'r', encoding='utf-8') as f:
+ return json.load(f)
+ return {'current_page': 0, 'total_servers': 0}
+
+ def save_progress(self, progress_path: str, current_page: int, total_servers: int) -> bool:
+        """Persist crawl progress"""
+ try:
+ with open(progress_path, 'w', encoding='utf-8') as f:
+ json.dump({
+                    'current_page': current_page,
+                    'total_servers': total_servers
+                }, f, ensure_ascii=False, indent=2)
+ return True
+ except Exception as e:
+            print(f"Failed to save progress: {e}")
+ return False
+
+ def export_to_csv(self, servers: List[Dict[str, str]], csv_path: str) -> bool:
+        """
+        Append results to a CSV file
+
+        Args:
+            servers: list of server info dicts
+            csv_path: output CSV path
+
+        Returns:
+            True on success, False on failure
+        """
+        try:
+            save_dir = os.path.dirname(csv_path)
+            if save_dir and not os.path.exists(save_dir):
+                os.makedirs(save_dir)
+
+            # Only write the header when the file is new or empty, so repeated appends do not duplicate it
+            write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
+            with open(csv_path, 'a', encoding='utf-8', newline='') as f:
+                writer = csv.DictWriter(f, fieldnames=['name', 'url', 'github_url', 'language', 'type', 'description', 'star'])
+                if write_header:
+                    writer.writeheader()
+                writer.writerows(servers)
+
+            print(f"CSV saved to: {os.path.abspath(csv_path)}")
+            return True
+        except Exception as e:
+            print(f"Failed to export CSV: {str(e)}")
+ return False
+
+ def load_existing_servers(self, csv_path: str) -> List[Dict]:
+        """Load previously crawled server records (used for de-duplication)"""
+ existing = []
+ if os.path.exists(csv_path):
+ with open(csv_path, 'r', encoding='utf-8') as f:
+ reader = csv.DictReader(f)
+ existing = [row for row in reader]
+ return existing
+
+    def parse_github_url(self, url: str) -> Optional[tuple]:
+        """
+        Parse a GitHub repository URL into owner and repo name
+
+        Args:
+            url: GitHub repository URL
+
+        Returns:
+            (owner, repo) tuple, or None if the URL cannot be parsed
+        """
+        parts = url.rstrip('/').split('/')
+        if len(parts) < 2:
+            print(f"Invalid GitHub URL format: {url}")
+            return None
+        return parts[-2], parts[-1]
+
+    def get_github_repo_info(self, owner: str, repo: str) -> Optional[Dict]:
+        """
+        Fetch GitHub repository metadata from the REST API
+
+        Args:
+            owner: repository owner
+            repo: repository name
+
+        Returns:
+            Repository info dict, or None on request failure
+        """
+        url = f"{self.github_base_url}/repos/{owner}/{repo}"
+        headers = {'Accept': 'application/vnd.github.v3+json'}
+
+        if self.github_token:
+            headers['Authorization'] = f'token {self.github_token}'
+        try:
+            response = requests.get(url, headers=headers, timeout=self.timeout)
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            print(f"Failed to fetch repo info for {owner}/{repo}: {str(e)}")
+            return None
+        return response.json()
+
+ def get_repo_contents(self, owner: str, repo: str, path: str = "") -> List[Dict]:
+        """
+        List files in a GitHub repository directory
+
+        Args:
+            owner: repository owner
+            repo: repository name
+            path: directory path (defaults to the repository root)
+
+        Returns:
+            List of file/directory entries
+        """
+ url = f"{self.github_base_url}/repos/{owner}/{repo}/contents/{path}"
+ headers = {'Accept': 'application/vnd.github.v3+json'}
+
+ if self.github_token:
+ headers['Authorization'] = f'token {self.github_token}'
+
+ response = requests.get(url, headers=headers)
+ response.raise_for_status()
+ return response.json()
+
+    def github_web_to_api(self, github_url: str) -> tuple:
+        """
+        Extract the owner and repo name from a GitHub repository web URL
+
+        Args:
+            github_url: GitHub repository web URL (e.g. https://github.com/owner/repo)
+
+        Returns:
+            (owner, repo) tuple; raises ValueError for URLs that are not GitHub repositories
+        """
+        # Strip /tree/..., /blob/... and /commit/... suffixes to get the bare repository URL
+        clean_url = re.sub(r"/tree/.*|/blob/.*|/commit/.*", "", github_url)
+
+        # Extract the owner and repository name
+        match = re.match(r"https://github\.com/([^/]+)/([^/]+)", clean_url)
+        if not match:
+            raise ValueError(f"Invalid GitHub URL: {github_url}")
+        owner, repo = match.groups()
+        return owner, repo
+
+    def get_source_from_server(self, url: str) -> Optional[Dict]:
+        """
+        Resolve a server's GitHub URL into repository metadata
+
+        Args:
+            url: GitHub repository URL of the server
+
+        Returns:
+            Dict with owner, repo and repository info, or None on failure
+        """
+        return_res = self.github_web_to_api(url)
+        if not return_res:
+            print("Could not resolve the GitHub repository from the URL")
+            return None
+
+        owner, repo = return_res
+        repo_info = self.get_github_repo_info(owner, repo)
+
+        if not repo_info:
+            print("Failed to fetch repository info")
+            return None
+
+        result = {
+            'owner': owner,
+            'repo': repo,
+            'github_repo': repo_info,
+            'clone_url': repo_info.get('clone_url', ""),
+            'html_url': repo_info.get('html_url', "")
+        }
+ return result
+
+ def clone_repo(self, clone_url: str, local_path: str) -> bool:
+        """
+        Download a repository with git clone
+
+        Args:
+            clone_url: repository clone URL (e.g. https://github.com/owner/repo.git)
+            local_path: local destination path
+
+        Returns:
+            True if the clone succeeded, False otherwise
+        """
+ os.makedirs(os.path.dirname(local_path) or '.', exist_ok=True)
+ if os.path.exists(local_path):
+            print(f"Repository already exists at {local_path}, skipping clone")
+ return True
+
+ try:
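+            # Embed the access token in the clone URL so private repositories can be cloned over HTTPS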
+ if self.github_token and "github.com" in clone_url:
+ clone_url = clone_url.replace("https://", f"https://{self.github_token}@")
+
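+            # Shallow clone (depth 1) to keep downloads small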
+ result = subprocess.run(
+                ["git", "clone", "--progress", "--depth", "1", clone_url, local_path],
+                check=True,
+                capture_output=True,
+                text=True
+            )
+            print(f"Clone succeeded: {clone_url} -> {local_path}")
+ return True
+ except subprocess.CalledProcessError as e:
+            print(f"Clone failed: {e.stderr}")
+ return False
+
+ def deduplicate_servers(self, new_servers: List[Dict], existing_servers: List[Dict]) -> List[Dict]:
+        """De-duplicate by url, keeping only records not already crawled"""
+ existing_urls = {s['url'] for s in existing_servers}
+ return [s for s in new_servers if s['url'] not in existing_urls]
+
+if __name__ == "__main__":
+ load_dotenv()
+ GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
+ fetcher = McpSoFetcher(
+        timeout=15,
+        delay=2.5,
+        github_token=GITHUB_TOKEN
+    )
+    save_path = "./crawl_mcps/mcp_servers1.csv"
+    progress_path = "./crawl_mcps/progress.json"
+
+ progress = fetcher.load_progress(progress_path)
+ cnt = progress['current_page']
+ total_servers = progress['total_servers']
+ existing_servers = fetcher.load_existing_servers(save_path)
+ res = existing_servers
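+    # Resume state: start from the last recorded page and de-duplicate against previously saved rows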
+ if not os.path.exists(save_path):
+ try:
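+            # 276 is the hard-coded number of listing pages on mcp.so/servers (presumably the page count at crawl time)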
+ while cnt <= 276:
+                print(f"=== Crawling https://mcp.so/servers page {cnt} ===")
+ all_mcp_servers = fetcher.fetch_all_servers(current_page=cnt, csv_path=save_path)
+
+ if all_mcp_servers:
+ new_servers = fetcher.deduplicate_servers(all_mcp_servers, existing_servers)
+ if new_servers:
+ res.extend(new_servers)
+ total_servers += len(new_servers)
+                        fetcher.export_to_csv(new_servers, csv_path=save_path)
+                        fetcher.save_progress(progress_path, cnt + 1, total_servers)
+                        existing_servers = res
+                    else:
+                        print(f"Page {cnt}: no new data, skipping")
+                else:
+                    print(f"Page {cnt}: crawl failed, skipping")
+
+                print("\n=== Page done ===")
+ cnt += 1
+
+ except Exception as e:
+            print(f"Crawl interrupted: {e}")
+            fetcher.save_progress(progress_path, cnt, total_servers)
+
+    print(f"\n=== All done! Crawled {total_servers} records in total ===")
+
+    # Filter for Python developer-tools servers
+ server_csv = pd.read_csv(save_path)
+ server_csv = server_csv.fillna({"type": "", "language": "", "star": 0})
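+    # "type" is stored in the CSV as the repr of a Python list, so match that literal string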
+ server_csv1 = server_csv[(server_csv["type"]=="['developer-tools']") & (server_csv["language"]=="Python")]
+ server_csv1 = server_csv1.sort_values(
+ by="star",
+ ascending=False,
+ na_position="last"
+ ).reset_index(drop=True)
+ for github_url in server_csv1['github_url']:
+ if pd.isna(github_url) or github_url.strip() == "":
+            print("Skipping empty GitHub URL")
+ continue
+ try:
+ owner, repo = fetcher.github_web_to_api(github_url)
+ clone_url = f"https://github.com/{owner}/{repo}.git"
+            save_dir = os.path.expanduser("~/repos/")
+
+            os.makedirs(save_dir, exist_ok=True)
+            folder_path = os.path.join(save_dir, owner)
+            os.makedirs(folder_path, exist_ok=True)
+
+            fetcher.clone_repo(clone_url, os.path.join(folder_path, repo))
+            print(f"Finished processing {owner}/{repo}")
+ except Exception as e:
+ print(f"处理 URL {github_url} 失败:{str(e)}")
+ continue
diff --git a/src/reporter/Reporter.py b/src/reporter/Reporter.py
index 7613b33d77b5977092f6f18f0ec752754496968e..558f71136b21767f33ecfe70574dca8e215ba598 100644
--- a/src/reporter/Reporter.py
+++ b/src/reporter/Reporter.py
@@ -323,4 +323,7 @@ class Reporter:
f.write(f" 总用例数: {tool_stats['total_cases']}\n")
f.write(f" 工具验证通过率: {tool_stats['tool_validation_pass_rate']:.2f}%\n")
f.write(f" 评估验证通过率: {tool_stats['eval_validation_pass_rate']:.2f}%\n")
- print(f"报告已成功写入文件: {logpath}")
\ No newline at end of file
+        print(f"Report successfully written to: {logpath}")
diff --git a/src/test_generator/TestGenerator.py b/src/test_generator/TestGenerator.py
index 366785be52690187444c4b7a2bb670f5741d0049..a05534af9bd76f209950b33adb70b03e83d6a1cb 100644
--- a/src/test_generator/TestGenerator.py
+++ b/src/test_generator/TestGenerator.py
@@ -17,7 +17,7 @@ class TestGenerator:
Generator for test cases using Large Language Model
"""
- def __init__(self, api_key: str = None, config_path: str = None):
+ def __init__(self, api_key: str = None, config_path: str = None, log_name: str = None):
"""
Create a new test generator
@@ -28,10 +28,11 @@ class TestGenerator:
self.config = Config_class.load_config(config_path)
self.llm = LLMClient(api_key)
self.readsc = ReadSourceCode(config_path)
+        self.log_name = log_name if log_name else ".logs"
+        self.config_path = config_path
async def run(self):
# load config
- servers = [MCPClient(name, srv_config) for name, srv_config in self.config["mcpServers"].items()]
+        servers = [MCPClient(name, srv_config, env_script="", use_docker=True) for name, srv_config in self.config["mcpServers"].items()]
tests_per_tool = self.config.get("numTestsPerTool",2)
for server in servers:
@@ -274,15 +275,22 @@ class TestGenerator:
current_timestamp = datetime.datetime.utcnow().isoformat()
safe_timestamp = current_timestamp.replace(":", "-").replace(".", "-")
- folerpath = os.path.join(".logs",f'{server_name}_{safe_timestamp}')
- if not os.path.exists(folerpath):
- os.mkdir(folerpath)
+            folderpath = os.path.join(self.log_name, f'{server_name}_{safe_timestamp}')
+            if not os.path.exists(folderpath):
+                os.makedirs(folderpath)
filename = f"testcases.json"
- filepath = os.path.join(folerpath,filename)
+ filepath = os.path.join(folderpath,filename)
with open(filepath, 'w', encoding='utf-8') as file:
json.dump(testcases, file, ensure_ascii=False, indent=4)
print(f"{server_name} test cases are successfully saved into {filepath}")
+
+ txt_filename = "config_path.txt"
+ txt_filepath = os.path.join(folderpath, txt_filename)
+ with open(txt_filepath, 'w', encoding='utf-8') as txt_file:
+                txt_file.write(f"Config file path: {self.config_path}\n")
+                txt_file.write(f"Generated at (UTC): {current_timestamp}\n")
+                txt_file.write(f"Server name: {server_name}\n")
return True
except IOError as e:
diff --git a/src/utils/read_source_code.py b/src/utils/read_source_code.py
index 1be6775bd09eb4f34dde8202d7c8a085d57f1b14..30b8e3afa9ea11221466c983426bec6bb8eb3dd1 100644
--- a/src/utils/read_source_code.py
+++ b/src/utils/read_source_code.py
@@ -75,24 +75,31 @@ class ReadSourceCode:
tree = ast.parse(source_code)
for node in ast.walk(tree):
- if isinstance(node, ast.FunctionDef):
+ if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
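+                # Handle both sync and async function definitions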
for decorator in node.decorator_list:
# 处理两种装饰器形式:@mcp.tool() 或 @mcp.tool(name="xxx")
is_mcp_tool = False
if isinstance(decorator, ast.Call):
if (isinstance(decorator.func, ast.Attribute)
+ and isinstance(decorator.func.value, ast.Name)
and decorator.func.value.id == "mcp"
- and decorator.func.attr == "tool"):
+                            and decorator.func.attr == "tool"):  # only match the tool decorator
is_mcp_tool = True
elif isinstance(decorator, ast.Attribute):
- if decorator.value.id == "mcp" and decorator.attr == "tool":
+                        if (isinstance(decorator.value, ast.Name)
+                            and decorator.value.id == "mcp"
+                            and decorator.attr == "tool"):
is_mcp_tool = True
-
+
if is_mcp_tool:
- # 提取函数完整代码(包括装饰器、文档字符串和函数体)
+                        # Extract the function source (works for async functions too)
function_code = ast.get_source_segment(source_code, node)
- if function_code:
- tool_functions[node.name] = function_code.strip()
+ if not function_code:
+                            # Fallback: slice the source by line numbers
+ lines = source_code.splitlines()
+ start_line = node.lineno - 1
+ end_line = node.end_lineno - 1 if hasattr(node, 'end_lineno') else start_line
+ function_code = '\n'.join(lines[start_line:end_line+1]).strip()
+ tool_functions[node.name] = function_code.strip()
break
-
return tool_functions