From 65d986cec38df5906e21c7002bc9359e9461f7c8 Mon Sep 17 00:00:00 2001 From: lnx Date: Wed, 2 Jul 2025 16:10:06 +0800 Subject: [PATCH] add tshark_mcp tool server --- servers/tshark_mcp/mcp_config.json | 20 ++++ servers/tshark_mcp/src/server.py | 147 +++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 servers/tshark_mcp/mcp_config.json create mode 100644 servers/tshark_mcp/src/server.py diff --git a/servers/tshark_mcp/mcp_config.json b/servers/tshark_mcp/mcp_config.json new file mode 100644 index 0000000..30284e9 --- /dev/null +++ b/servers/tshark_mcp/mcp_config.json @@ -0,0 +1,20 @@ +{ + "mcpServers": { + "tshark_mcp": { + "command": "python3", + "args": [ + "/opt/mcp-servers/servers/tshark_mcp/src/server.py" + ], + "alwaysAllow": [], + "disabled": false, + "timeout": 3600 + } + }, + "server_name": "tshark-mcp", + "log_level": "info", + "port": 8080, + "allowed_origins": [ + "http://localhost:3000", + "http://127.0.0.1:3000" + ] +} \ No newline at end of file diff --git a/servers/tshark_mcp/src/server.py b/servers/tshark_mcp/src/server.py new file mode 100644 index 0000000..2e411e6 --- /dev/null +++ b/servers/tshark_mcp/src/server.py @@ -0,0 +1,147 @@ +import subprocess +import json +import logging +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("TShark网络协议分析服务") + +@mcp.tool() +def tshark_list_interfaces() -> dict: + """列出可用网络接口""" + try: + result = subprocess.check_output(['tshark', '-D'], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "interfaces": result.splitlines()} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_capture(interface: str, timeout: int = 10) -> dict: + """从指定接口捕获流量""" + try: + result = subprocess.run(['tshark', '-i', interface], + capture_output=True, + text=True, + timeout=timeout) + if result.returncode == 0: + return {"status": "success", "output": result.stdout} + else: + return {"error": result.stderr} + except subprocess.TimeoutExpired: + return {"error": f"Capture timed out after {timeout} seconds"} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_write_file(filename: str) -> dict: + """将捕获数据写入文件""" + try: + subprocess.check_output(['tshark', '-w', filename], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "filename": filename} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_read_file(filename: str) -> dict: + """读取捕获文件""" + try: + result = subprocess.check_output(['tshark', '-r', filename], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_display_filter(filter_expr: str) -> dict: + """应用显示过滤器""" + try: + result = subprocess.check_output(['tshark', '-Y', filter_expr], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_field_extraction(field: str) -> dict: + """提取特定字段""" + try: + result = subprocess.check_output(['tshark', '-T', 'fields', '-e', field], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_packet_count(count: int, timeout: int = 30) -> dict: + """捕获指定数量的数据包""" + try: + result = subprocess.run(['tshark', '-c', str(count)], + capture_output=True, + text=True, + timeout=timeout) + if result.returncode == 0: + return {"status": "success", "output": result.stdout} + else: + return {"error": result.stderr} + except subprocess.TimeoutExpired: + return {"error": f"Packet capture timed out after {timeout} seconds"} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_statistics(stat_type: str) -> dict: + """生成统计信息""" + try: + result = subprocess.check_output(['tshark', '-z', stat_type], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_capture_filter(filter_expr: str) -> dict: + """应用捕获过滤器""" + try: + result = subprocess.check_output(['tshark', '-f', filter_expr], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def tshark_http_tree() -> dict: + """生成HTTP请求树""" + try: + result = subprocess.check_output(['tshark', '-q', '-z', 'http,tree'], + text=True, + stderr=subprocess.STDOUT) + return {"status": "success", "output": result} + except subprocess.CalledProcessError as e: + return {"error": str(e)} + except Exception as e: + return {"error": str(e)} + +if __name__ == "__main__": + mcp.run() \ No newline at end of file -- Gitee