diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..7f9dca5197f583249dccb142263e77b2abbf4589
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+dump/
\ No newline at end of file
diff --git a/src/mcp_crawl/Crawldso.py b/src/mcp_crawl/Crawldso.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbc10e29b83c3d2a77cb0a24a776ec94a53f91fc
--- /dev/null
+++ b/src/mcp_crawl/Crawldso.py
@@ -0,0 +1,548 @@
+import re
+import requests
+import subprocess
+from bs4 import BeautifulSoup
+import json
+import os
+import csv
+from typing import Dict, List, Optional
+import time
+import pandas as pd
+from urllib.parse import urljoin, urlparse
+from dotenv import load_dotenv
+
+class McpSoFetcher:
+    """
+    Extract MCP server names and detail links from https://mcp.so/servers
+    """
+
+    def __init__(self, timeout: int = 10, delay: float = 2.0, github_token: str = None):
+        """
+        Initialize the fetcher.
+
+        Args:
+            timeout: request timeout in seconds (default: 10)
+            delay: delay between page requests in seconds (default: 2, to avoid being rate limited)
+            github_token: GitHub access token (optional, for private repositories and higher rate limits)
+        """
+        self.base_url = "https://mcp.so/servers"
+        self.github_base_url = "https://api.github.com"
+        self.timeout = timeout
+        self.delay = delay
+        self.headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
+            'Referer': 'https://mcp.so/',
+            'DNT': '1',
+            'Connection': 'keep-alive',
+            'Upgrade-Insecure-Requests': '1'
+        }
+        self.github_token = github_token
+
+    def _get_page_soup(self, url: str) -> Optional[BeautifulSoup]:
+        """
+        Internal helper: fetch a page and parse it into a BeautifulSoup object.
+
+        Args:
+            url: URL of the page to request
+
+        Returns:
+            Parsed BeautifulSoup object, or None on failure
+        """
+        try:
+            response = requests.get(
+                url,
+                headers=self.headers,
+                timeout=self.timeout,
+                allow_redirects=True
+            )
+            response.raise_for_status()
+            # Parse the HTML
+            soup = BeautifulSoup(response.text, 'html.parser')
+            return soup
+        except requests.exceptions.RequestException as e:
+            print(f"Failed to request page {url}: {str(e)}")
+            return None
+
+    def extract_meta_description(self, html_content: str) -> Optional[str]:
+        """
+        Extract the meta description content (supports both attribute orders).
+
+        Args:
+            html_content: raw HTML source string
+
+        Returns:
+            The description text, or None if not found
+        """
+        # name attribute first, content attribute second
+        pattern1 = r'<meta[^>]*name=["\']description["\'][^>]*content=["\']([^"\']+)["\'][^>]*>'
+        match = re.search(pattern1, html_content, re.IGNORECASE | re.DOTALL)
+        if match:
+            return match.group(1)
+
+        # content attribute first (looser match)
+        pattern2 = r'<meta[^>]*content=["\']([^"\']+)["\'][^>]*name=["\']description["\'][^>]*>'
+        match = re.search(pattern2, html_content, re.IGNORECASE | re.DOTALL)
+        if match:
+            return match.group(1)
+
+        return None
+
+    def extract_categories(self, html_content: str) -> List[str]:
+        """
+        Extract all category links.
+
+        Args:
+            html_content: raw HTML source string
+
+        Returns:
+            List of categories (deduplicated)
+        """
+        pattern = r'href=["\']\/category\/([^"\'\/]+)["\']'
+        matches = re.findall(pattern, html_content, re.IGNORECASE)
+
+        categories = []
+        seen = set()
+        for cat in matches:
+            if cat not in seen:
+                categories.append(cat)
+                seen.add(cat)
+
+        return categories
+
+    def _extract_page_servers(self, soup: BeautifulSoup) -> List[Dict[str, str]]:
+        """
+        Internal helper: extract server information from a single listing page.
+
+        Args:
+            soup: parsed BeautifulSoup object of the listing page
+
+        Returns:
+            List of server info dicts, each containing name and url
+        """
+        servers = []
+        server_cards = soup.select('a[href*="/server/"]')
+
+        for card in server_cards:
+            name_elem = card.find('h3', class_='font-semibold')
+            if not name_elem:
+                continue
+
+            server_name = name_elem.get_text(strip=True)
+            relative_url = card.get('href', '').strip()
+            if not relative_url:
+                continue
+
+            absolute_url = urljoin(self.base_url, relative_url)
+            print(f"Found server: {server_name}")
+            detail_soup = self._get_page_soup(absolute_url)
+            if not detail_soup:
+                print("Failed to fetch the server detail page")
+                continue
+            git_url = self._extract_github_url(detail_soup, absolute_url)
+            print(f"Found GitHub URL: {git_url}")
+            if not git_url:
+                print("No GitHub URL found")
+                continue
+
+            # Extract the MCP category names
+            mcp_type = self.extract_categories(str(detail_soup))
+
+            # Extract the description
+            mcp_desc = self.extract_meta_description(str(detail_soup))
+
+            repo_info = self.get_source_from_server(git_url)
+            if not repo_info:
+                print(f"Failed to fetch repository info for {git_url}")
+                continue
+
+            if not any(s['url'] == absolute_url for s in servers) and git_url:
+                servers.append({
+                    'name': server_name,
+                    'url': absolute_url,
+                    'github_url': git_url,
+                    'language': repo_info.get('github_repo', {}).get('language', ""),
+                    "type": mcp_type,
+                    "description": mcp_desc,
+                    "star": repo_info.get('github_repo', {}).get('stargazers_count', -1),
+                })
+
+        return servers
+
+    def _get_total_pages(self, soup: BeautifulSoup) -> int:
+        """
+        Internal helper: read the total page count from the listing page.
+
+        Args:
+            soup: parsed BeautifulSoup object of the first listing page
+
+        Returns:
+            Total number of pages; 1 on failure (only the first page is crawled)
+        """
+        try:
+            # Locate the last page button in the pagination control
+            pagination = soup.select('nav[aria-label="pagination"] ul li')
+            if not pagination:
+                return 1
+
+            # Collect numeric page labels (skips the "Next" button)
+            page_numbers = []
+            for li in pagination:
+                a_tag = li.find('a')
+                if a_tag and a_tag.get_text(strip=True).isdigit():
+                    page_numbers.append(int(a_tag.get_text(strip=True)))
+
+            return max(page_numbers) if page_numbers else 1
+        except Exception as e:
+            print(f"Failed to determine the total page count: {str(e)}")
+            return 1
+
+    def _extract_github_url(self, soup, detail_url):
+        try:
+            github_link = soup.find(
+                'a',
+                href=lambda h: h and
+                'github.com' in h and      # must be a GitHub link
+                '/issues' not in h         # skip issue-tracker links
+            )
+            if github_link:
+                github_url = github_link.get('href', '').strip()
+                return urljoin(detail_url, github_url) if github_url else ""
+            return ""
+        except Exception as e:
+            print(f"Failed to extract a GitHub link from {detail_url}: {str(e)}")
+            return ""
+
+    def fetch_all_servers(self, current_page: Optional[int] = 1, csv_path: str = None) -> List[Dict[str, str]]:
+        """
+        Crawl the MCP server listing for a single page.
+
+        Args:
+            current_page: page number to crawl (default: 1)
+            csv_path: optional CSV output path (not used by this method)
+
+        Returns:
+            List of server info dicts for that page
+        """
+        all_servers = []
+
+        # Fetch the landing page first as a reachability check
+        print(f"Crawling page {current_page}...")
+        first_page_soup = self._get_page_soup(self.base_url)
+        if not first_page_soup:
+            print("Failed to fetch the landing page, aborting")
+            return all_servers
+
+        page_url = f"{self.base_url}?page={current_page}"
+        print(f"\nCrawling page {current_page}...")
+
+        page_soup = self._get_page_soup(page_url)
+        if not page_soup:
+            print(f"Failed to crawl page {current_page}, skipping")
+            return all_servers
+
+        page_servers = self._extract_page_servers(page_soup)
+
+        all_servers.extend(page_servers)
+
+        print(f"Page {current_page}: extracted {len(page_servers)} servers, {len(all_servers)} collected so far")
+
+        print(f"\nDone! Collected {len(all_servers)} unique MCP servers")
+        return all_servers
+
+    def load_progress(self, progress_path: str) -> Dict:
+        """Load crawl progress (enables resuming an interrupted crawl)."""
+        if os.path.exists(progress_path):
+            with open(progress_path, 'r', encoding='utf-8') as f:
+                return json.load(f)
+        return {'current_page': 0, 'total_servers': 0}
+
+    def save_progress(self, progress_path: str, current_page: int, total_servers: int) -> bool:
+        """Persist crawl progress."""
+        try:
+            with open(progress_path, 'w', encoding='utf-8') as f:
+                json.dump({
+                    'current_page': current_page,
+                    'total_servers': total_servers
+                }, f, ensure_ascii=False, indent=2)
+            return True
+        except Exception as e:
+            print(f"Failed to save progress: {e}")
+            return False
+
+    def export_to_csv(self, servers: List[Dict[str, str]], csv_path: str) -> bool:
+        """
+        Append results to a CSV file.
+
+        Args:
+            servers: list of server info dicts
+            csv_path: path of the CSV file to write
+
+        Returns:
+            True on success, False on failure
+        """
+        try:
+            save_dir = os.path.dirname(csv_path)
+            if save_dir and not os.path.exists(save_dir):
+                os.makedirs(save_dir)
+
+            # Only write the header when the file is new, so repeated appends stay valid CSV
+            write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
+            with open(csv_path, 'a', encoding='utf-8', newline='') as f:
+                writer = csv.DictWriter(f, fieldnames=['name', 'url', 'github_url', 'language', 'type', 'description', 'star'])
+                if write_header:
+                    writer.writeheader()
+                writer.writerows(servers)
+
+            print(f"CSV file saved to: {os.path.abspath(csv_path)}")
+            return True
+        except Exception as e:
+            print(f"Failed to export CSV: {str(e)}")
+            return False
+
+    def load_existing_servers(self, csv_path: str) -> List[Dict]:
+        """Load previously crawled server records (used for deduplication)."""
+        existing = []
+        if os.path.exists(csv_path):
+            with open(csv_path, 'r', encoding='utf-8') as f:
+                reader = csv.DictReader(f)
+                existing = [row for row in reader]
+        return existing
+
+    def parse_github_url(self, url: str) -> Optional[tuple]:
+        """
+        Parse a GitHub repository URL into its owner and repo name.
+
+        Args:
+            url: GitHub repository URL
+
+        Returns:
+            (owner, repo) tuple, or None if the URL is malformed
+        """
+        parts = url.split('/')
+        if len(parts) < 2:
+            print(f"Invalid GitHub URL format: {url}")
+            return None
+        return parts[-2], parts[-1]
+
+    def get_github_repo_info(self, owner: str, repo: str) -> Optional[Dict]:
+        """
+        Fetch repository metadata from the GitHub API.
+
+        Args:
+            owner: repository owner
+            repo: repository name
+
+        Returns:
+            Repository info dict, or None on failure
+        """
+        url = f"{self.github_base_url}/repos/{owner}/{repo}"
+        headers = {'Accept': 'application/vnd.github.v3+json'}
+
+        if self.github_token:
+            headers['Authorization'] = f'token {self.github_token}'
+        try:
+            response = requests.get(url, headers=headers, timeout=self.timeout)
+            response.raise_for_status()
+        except requests.exceptions.RequestException:
+            return None
+        return response.json()
+
+    def get_repo_contents(self, owner: str, repo: str, path: str = "") -> List[Dict]:
+        """
+        List the files of a GitHub repository.
+
+        Args:
+            owner: repository owner
+            repo: repository name
+            path: path inside the repository (defaults to the root)
+
+        Returns:
+            List of file/directory entries
+        """
+        url = f"{self.github_base_url}/repos/{owner}/{repo}/contents/{path}"
+        headers = {'Accept': 'application/vnd.github.v3+json'}
+
+        if self.github_token:
+            headers['Authorization'] = f'token {self.github_token}'
+
+        response = requests.get(url, headers=headers, timeout=self.timeout)
+        response.raise_for_status()
+        return response.json()
+
+    def github_web_to_api(self, github_url: str) -> tuple:
+        """
+        Resolve a GitHub repository web URL into its (owner, repo) pair.
+
+        Args:
+            github_url: GitHub repository web URL (e.g. https://github.com/owner/repo)
+
+        Returns:
+            (owner, repo) tuple; raises ValueError if the URL is not a GitHub repository URL
+        """
+        # Strip /tree/..., /blob/... and /commit/... suffixes
+        clean_url = re.sub(r"/tree/.*|/blob/.*|/commit/.*", "", github_url)
+
+        # Extract the owner and repository name
+        match = re.match(r"https://github\.com/([^/]+)/([^/]+)", clean_url)
+        if not match:
+            raise ValueError(f"Invalid GitHub URL: {github_url}")
+        owner, repo = match.groups()
+        return owner, repo
+
+    def get_source_from_server(self, url: str) -> Optional[Dict]:
+        """
+        Full flow: resolve a server's GitHub URL and fetch its repository metadata.
+
+        Args:
+            url: GitHub repository URL of the server
+
+        Returns:
+            Dict with owner, repo and repository metadata, or None on failure
+        """
+        return_res = self.github_web_to_api(url)
+        if not return_res:
+            print("Could not resolve the GitHub repository from the URL")
+            return None
+
+        owner, repo = return_res
+        repo_info = self.get_github_repo_info(owner, repo)
+
+        if not repo_info:
+            print("Failed to fetch repository info")
+            return None
+
+        result = {
+            'owner': owner,
+            'repo': repo,
+            'github_repo': repo_info,
+            'clone_url': repo_info.get('clone_url', ""),
+            'html_url': repo_info.get('html_url', "")
+        }
+        return result
+
+    def clone_repo(self, clone_url: str, local_path: str) -> bool:
+        """
+        Clone a repository with git clone.
+
+        Args:
+            clone_url: repository clone URL (e.g. https://github.com/owner/repo.git)
+            local_path: local destination path
+
+        Returns:
+            True if the clone succeeded, False otherwise
+        """
+        os.makedirs(os.path.dirname(local_path) or '.', exist_ok=True)
+        if os.path.exists(local_path):
+            print(f"Repository already exists at {local_path}, skipping clone")
+            return True
+
+        try:
+            if self.github_token and "github.com" in clone_url:
+                clone_url = clone_url.replace("https://", f"https://{self.github_token}@")
+
+            subprocess.run(
+                ["git", "clone", "--progress", "--depth", "1", clone_url, local_path],
+                check=True,
+                capture_output=True,
+                text=True
+            )
+            print(f"Cloned {clone_url} -> {local_path}")
+            return True
+        except subprocess.CalledProcessError as e:
+            print(f"Clone failed: {e.stderr}")
+            return False
+
+    def deduplicate_servers(self, new_servers: List[Dict], existing_servers: List[Dict]) -> List[Dict]:
+        """Deduplicate by url, keeping only entries that have not been seen before."""
+        existing_urls = {s['url'] for s in existing_servers}
+        return [s for s in new_servers if s['url'] not in existing_urls]
+
+if __name__ == "__main__":
+    load_dotenv()
+    GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
+    fetcher = McpSoFetcher(
+        timeout=15,
+        delay=2.5,
+        github_token=GITHUB_TOKEN
+    )
+    save_path = "./crawl_mcps/mcp_servers1.csv"
+    progress_path = "./crawl_mcps/progress.json"
+
+    progress = fetcher.load_progress(progress_path)
+    cnt = progress['current_page']
+    total_servers = progress['total_servers']
+    existing_servers = fetcher.load_existing_servers(save_path)
+    res = existing_servers
+    if not os.path.exists(save_path):
+        try:
+            while cnt <= 276:  # upper bound on listing pages to crawl
+                print(f"=== Crawling https://mcp.so/servers page {cnt} ===")
+                all_mcp_servers = fetcher.fetch_all_servers(current_page=cnt, csv_path=save_path)
+
+                if all_mcp_servers:
+                    new_servers = fetcher.deduplicate_servers(all_mcp_servers, existing_servers)
+                    if new_servers:
+                        res.extend(new_servers)
+                        total_servers += len(new_servers)
+                        fetcher.export_to_csv(new_servers, csv_path=save_path)
+                        fetcher.save_progress(progress_path, cnt + 1, total_servers)
+                        existing_servers = res
+                    else:
+                        print(f"Page {cnt} has no new data, skipping")
+                else:
+                    print(f"Failed to crawl page {cnt}, skipping")
+
+                print("\n=== Page done ===")
+                cnt += 1
+
+        except Exception as e:
+            print(f"Crawl interrupted: {e}")
+            fetcher.save_progress(progress_path, cnt, total_servers)
+            fetcher.export_to_csv(res, csv_path=save_path)
+
+        print(f"\n=== All done! Crawled {total_servers} records in total ===")
+
+    # Keep only developer-tools servers implemented in Python
+    server_csv = pd.read_csv(save_path)
+    server_csv = server_csv.fillna({"type": "", "language": "", "star": 0})
+    server_csv1 = server_csv[(server_csv["type"] == "['developer-tools']") & (server_csv["language"] == "Python")]
+    server_csv1 = server_csv1.sort_values(
+        by="star",
+        ascending=False,
+        na_position="last"
+    ).reset_index(drop=True)
+    for github_url in server_csv1['github_url']:
+        if pd.isna(github_url) or github_url.strip() == "":
+            print("Skipping empty GitHub URL")
+            continue
+        try:
+            owner, repo = fetcher.github_web_to_api(github_url)
+            clone_url = f"https://github.com/{owner}/{repo}.git"
+            save_dir = os.path.expanduser("~/repos/")
+
+            os.makedirs(save_dir, exist_ok=True)
+            folder_path = os.path.join(save_dir, owner)
+            os.makedirs(folder_path, exist_ok=True)
+
+            if fetcher.clone_repo(clone_url, os.path.join(folder_path, repo)):
+                print(f"Cloned successfully: {owner}/{repo}")
+        except Exception as e:
+            print(f"Failed to process URL {github_url}: {str(e)}")
+            continue
+
+
\ No newline at end of file
diff --git a/src/reporter/Reporter.py b/src/reporter/Reporter.py
index 7613b33d77b5977092f6f18f0ec752754496968e..558f71136b21767f33ecfe70574dca8e215ba598 100644
--- a/src/reporter/Reporter.py
+++ b/src/reporter/Reporter.py
@@ -323,4 +323,7 @@ class Reporter:
             f.write(f" 总用例数: {tool_stats['total_cases']}\n")
             f.write(f" 工具验证通过率: {tool_stats['tool_validation_pass_rate']:.2f}%\n")
             f.write(f" 评估验证通过率: {tool_stats['eval_validation_pass_rate']:.2f}%\n")
-        print(f"报告已成功写入文件: {logpath}")
\ No newline at end of file
+        print(f"报告已成功写入文件: {logpath}")
+
+
+
\ No newline at end of file
diff --git a/src/test_generator/TestGenerator.py b/src/test_generator/TestGenerator.py
index 366785be52690187444c4b7a2bb670f5741d0049..a05534af9bd76f209950b33adb70b03e83d6a1cb 100644
--- a/src/test_generator/TestGenerator.py
+++ b/src/test_generator/TestGenerator.py
@@ -17,7 +17,7 @@ class TestGenerator:
     Generator for test cases using Large Language Model
     """
 
-    def __init__(self, api_key: str = None, config_path: str = None):
+    def __init__(self, api_key: str = None, config_path: str = None, log_name: str = None):
         """
         Create a new test generator
 
@@ -28,10 +28,11 @@
         self.config = Config_class.load_config(config_path)
         self.llm = LLMClient(api_key)
         self.readsc = ReadSourceCode(config_path)
+        self.log_name = log_name
 
     async def run(self):
         # load config
-        servers = [MCPClient(name, srv_config) for name, srv_config in self.config["mcpServers"].items()]
+        servers = [MCPClient(name, srv_config, env_script="", use_docker=True) for name, srv_config in self.config["mcpServers"].items()]
 
         tests_per_tool = self.config.get("numTestsPerTool",2)
         for server in servers:
@@ -274,15 +275,22 @@ class TestGenerator:
             current_timestamp = datetime.datetime.utcnow().isoformat()
             safe_timestamp = current_timestamp.replace(":", "-").replace(".", "-")
 
-            folerpath = os.path.join(".logs",f'{server_name}_{safe_timestamp}')
-            if not os.path.exists(folerpath):
-                os.mkdir(folerpath)
+            folderpath = os.path.join(self.log_name, f'{server_name}_{safe_timestamp}')
+            if not os.path.exists(folderpath):
+                os.mkdir(folderpath)
 
             filename = f"testcases.json"
-            filepath = os.path.join(folerpath,filename)
+            filepath = os.path.join(folderpath, filename)
 
             with open(filepath, 'w', encoding='utf-8') as file:
                 json.dump(testcases, file, ensure_ascii=False, indent=4)
             print(f"{server_name} test cases are successfully saved into {filepath}")
+
+            txt_filename = "config_path.txt"
+            txt_filepath = os.path.join(folderpath, txt_filename)
+            with open(txt_filepath, 'w', encoding='utf-8') as txt_file:
+                txt_file.write(f"Config file path: {self.config_path}\n")
+                txt_file.write(f"Generated at (UTC): {current_timestamp}\n")
+                txt_file.write(f"Server name: {server_name}\n")
 
             return True
         except IOError as e:
diff --git a/src/utils/read_source_code.py b/src/utils/read_source_code.py
index 1be6775bd09eb4f34dde8202d7c8a085d57f1b14..30b8e3afa9ea11221466c983426bec6bb8eb3dd1 100644
--- a/src/utils/read_source_code.py
+++ b/src/utils/read_source_code.py
@@ -75,24 +75,31 @@ class ReadSourceCode:
             tree = ast.parse(source_code)
 
             for node in ast.walk(tree):
-                if isinstance(node, ast.FunctionDef):
+                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                     for decorator in node.decorator_list:
                         # 处理两种装饰器形式:@mcp.tool() 或 @mcp.tool(name="xxx")
                         is_mcp_tool = False
                         if isinstance(decorator, ast.Call):
                             if (isinstance(decorator.func, ast.Attribute)
+                                    and isinstance(decorator.func.value, ast.Name)
                                     and decorator.func.value.id == "mcp"
-                                    and decorator.func.attr == "tool"):
+                                    and decorator.func.attr == "tool"):  # only match the tool decorator
                                 is_mcp_tool = True
                         elif isinstance(decorator, ast.Attribute):
-                            if decorator.value.id == "mcp" and decorator.attr == "tool":
+                            if (isinstance(decorator.value, ast.Name)
+                                    and decorator.value.id == "mcp"
+                                    and decorator.attr == "tool"):
                                 is_mcp_tool = True
-
+
                         if is_mcp_tool:
-                            # 提取函数完整代码(包括装饰器、文档字符串和函数体)
+                            # Extract the function source (also works for async functions)
                            function_code = ast.get_source_segment(source_code, node)
-                            if function_code:
-                                tool_functions[node.name] = function_code.strip()
+                            if not function_code:
+                                # Fallback: slice the source by line numbers
+                                lines = source_code.splitlines()
+                                start_line = node.lineno - 1
+                                end_line = node.end_lineno - 1 if hasattr(node, 'end_lineno') else start_line
+                                function_code = '\n'.join(lines[start_line:end_line+1]).strip()
+                            tool_functions[node.name] = function_code.strip()
                             break
 
-        return tool_functions
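
For reference, a minimal standalone sketch of the `@mcp.tool` matching that the read_source_code.py hunk above implements. The sample module (including its FastMCP import) and the function names are illustrative only and not part of this change:

```python
import ast

# Hypothetical sample module used only to exercise the matcher.
SAMPLE = '''
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

@mcp.tool
async def fetch(url: str) -> str:
    """Fetch a URL."""
    return url
'''

def find_mcp_tools(source_code: str) -> dict:
    """Map function name -> source for functions decorated with @mcp.tool / @mcp.tool(...)."""
    tools = {}
    tree = ast.parse(source_code)
    for node in ast.walk(tree):
        # Match both sync and async functions, as the updated ReadSourceCode does.
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        for decorator in node.decorator_list:
            # @mcp.tool(...) is an ast.Call wrapping an ast.Attribute; @mcp.tool is a bare ast.Attribute.
            target = decorator.func if isinstance(decorator, ast.Call) else decorator
            if (isinstance(target, ast.Attribute)
                    and isinstance(target.value, ast.Name)
                    and target.value.id == "mcp"
                    and target.attr == "tool"):
                tools[node.name] = ast.get_source_segment(source_code, node)
                break
    return tools

if __name__ == "__main__":
    print(sorted(find_mcp_tools(SAMPLE)))  # expected: ['add', 'fetch']
```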