# port_scanner **Repository Path**: sheng_2004/port_scanner ## Basic Information - **Project Name**: port_scanner - **Description**: 计算机网络实验 B3 简易的端口扫描器 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-05-25 - **Last Updated**: 2024-06-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: Python ## README # **简易端口扫描器** *网络编程是通过使用[套接字](http://baike.baidu.com/view/538713.htm)来达到[进程间通信](http://baike.baidu.com/view/1492468.htm)目的的编程,Socket编程是网络编程的主流工具,Socket API是实现进程间通信的一种编程设施,也是一种为进程间提供底层抽象的机制,提供了访问下层通信协议的大量系统调用和相应的数据结构。本实验利用Socket API编写网络通信程序。* ## **端口扫描器的实现** ### **PortScanner** 笔者将端口扫描器封装在port_scanner包post_scanner 模块的 PortScanner类中。在PortScanner类中,有如下函数 ```python __init__,check_ip,scan_ports,get_result ``` #### init ```python def __init__(self): # 正则表达式匹配IPv4地址 self.ipv4_regex = re.compile( r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$') # 正则表达式匹配IPv6地址 self.ipv6_regex = re.compile( r'(([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4})|(([0-9a-fA-F]{1,4}:){6}:[0-9a-fA-F]{1,4})|(([0-9a-fA-F]{1,4}:){5}(:[0-9a-fA-F]{1,4}){1,2})|(([0-9a-fA-F]{1,4}:){4}(:[0-9a-fA-F]{1,4}){1,3})|(([0-9a-fA-F]{1,4}:){3}(:[0-9a-fA-F]{1,4}){1,4})|(([0-9a-fA-F]{1,4}:){2}(:[0-9a-fA-F]{1,4}){1,5})|([0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6}))|(:((:[0-9a-fA-F]{1,4}){1,7}))') self.result_set = set() ``` 此方法为PortScanner类的初始化方法,定义了三个实例变量: 1. self.ipv4_regex:正则表达式对象,用于匹配IPv4地址。IPv4地址由四个0到255的数字组成,每两个数字之间用.分隔。 2. self.ipv6_regex:正则表达式对象,用于匹配IPv6地址。IPv6地址由八组四位十六进制数(每位可以是0-9或a-f)组成,每两组之间用:分隔。每组数可以省略前导零,且整个地址中可以有一处连续的零用::代替。 3. self.result_set:集合,用于存储扫描到的开放端口。 #### check_ip ```python def check_ip(self, ip): print(ip) if self.ipv4_regex.match(ip): print("IPv4") return "IPv4" elif self.ipv6_regex.match(ip): print("IPv6") return "IPv6" else: print("Invalid IP") return "Invalid IP" ``` 接收一个参数ip,并检查这个ip是否是有效的IPv4或IPv6地址。 #### scan_ports ```python # 参数: ip: 要扫描的IP地址 # start: 要扫描的端口范围的开始 # end: 要扫描的端口范围的结束 # protocol: 要使用的协议 # result_queue: 用于传递结果的队列 # counter: 用于计数已经扫描的端口数 进度条 def scan_ports(self, ip, start, end, protocol, counter): # 清空结果集 self.result_set.clear() ip_case = 0 match self.check_ip(ip): case "IPv4": ip_case = 1 case "IPv6": ip_case = 2 case "Invalid IP": raise ValueError("Invalid IP") threads = [] # 保存所有扫描线程的引用 print(f"扫描 {ip} 的 {protocol.upper()} 端口 {start} 到 {end} ...") for port in range(start, end + 1): t = threading.Thread(target=scan_port, args=(ip, ip_case, port, protocol, self.result_set, counter)) t.start() threads.append(t) # 等待所有扫描线程完成 for thread in threads: thread.join() ``` 1. 扫描指定IP地址的一系列端口。以下是每行代码的详细解释: 2. self.result_set.clear():清空结果集,为新的扫描准备。 3. ip_case = 0:定义一个变量ip_case,用于标记IP地址的类型(IPv4或IPv6)。 4. match self.check_ip(ip)::使用check_ip方法检查输入的IP地址的类型,并根据结果设置ip_case的值。如果IP地址是IPv4,ip_case被设置为1;如果是IPv6,ip_case被设置为2;如果IP地址无效,将抛出一个ValueError异常。 5. threads = []:定义一个列表,用于存储所有的扫描线程。 6. for port in range(start, end + 1)::对于指定范围内的每个端口,创建一个新的线程来进行扫描。线程的目标函数是scan_port,参数是IP地址、IP地址类型、端口号、协议类型、结果集和计数器。 7. t.start():启动线程。 8. threads.append(t):将线程添加到线程列表中。 9. for thread in threads::等待所有的扫描线程完成。这是通过调用每个线程的join方法实现的。 #### get_result 得到扫描的结果 ### **静态函数 scan_port** ```python def scan_port(ip, ip_case, port, protocol, result_set, counter): sock = None match ip_case: case 1: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM) case 2: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM) sock.settimeout(10) try: is_open = sock.connect_ex((ip, port)) == 0 lock.acquire() counter[0] += 1 lock.release() if is_open: result_set.add(port) except Exception as e: raise ValueError(f"错误扫描 {protocol.upper()} port {port}: {str(e)}\n") finally: sock.close() ``` 1. 函数接收六个参数:ip(要扫描的IP地址),ip_case(IP地址类型,1表示IPv4,2表示IPv6),port(要扫描的端口号),protocol(要使用的协议,TCP或UDP),result_set(用于存储开放端口的集合),counter(用于计数已经扫描的端口数的列表)。 2. 根据ip_case的值,创建一个新的套接字sock。如果ip_case为1,创建一个IPv4套接字;如果ip_case为2,创建一个IPv6套接字。套接字的类型(流套接字或数据报套接字)取决于protocol的值。 3. 设置套接字的超时时间为10秒。这意味着如果尝试连接到指定的IP地址和端口超过10秒还没有响应,那么connect_ex方法将返回一个错误。 4. 尝试连接到指定的IP地址和端口。如果连接成功,connect_ex方法将返回0,否则返回一个错误码。 5. 使用一个锁来保护计数器counter,防止在多线程环境中出现竞态条件。每当尝试连接一个新的端口时,计数器的值就会增加1。 6. 如果端口是开放的(即is_open为True),那么将端口号添加到结果集result_set中。 7. 如果在尝试连接端口的过程中发生任何异常,那么将抛出一个ValueError异常,并附带一条错误消息。 8. 最后,无论是否成功连接到端口或是否发生异常,都会关闭套接字sock。通过finally块实现。 ## **图形化界面的实现** 使用python的懒人库——streamlit,不再赘述。