# huiyi1 **Repository Path**: xfkcode/huiyi1 ## Basic Information - **Project Name**: huiyi1 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-05-22 - **Last Updated**: 2025-05-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 重点代码题 ```python ###########################################################5.4.3异步IO方式 ##服务器server.py import socket import select s = socket.socket() s.bind(('127.0.0.1',8000)) #服务器:http://127.0.0.1:8000 s.listen(5) r_list = [s,] num = 0 while True: rl, wl, error = select.select(r_list,[],[],10) #r_list赋值给rl num+=1 print('counts is %s'%num) #输出入num的值 print("rl=%s"%(rl)) #输出rl的值 ##客户端client.py import socket flag = 1 s = socket.socket() s.connect(('127.0.0.1',8000)) while flag: input_msg = input('input>>>') if input_msg == '0': break s.sendall(input_msg.encode()) msg = s.recv(1024) print(msg.decode()) s.close() ###########################################################5.5.1单线程服务器 import socket HOST='127.0.0.1' PORT=9999 sockaddr=(HOST,PORT) sk=socket.socket() sk.bind(sockaddr) sk.listen(5) while True: conn,address=sk.accept() while True: try: ret_bytes=conn.recv(1024) except Exception as ex: print("已从",address,"断开") break else: conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8')) print(ret_bytes) sk.close() ##编写一个访问服务器的客户端 import socket HOST='127.0.0.1' PORT=9999 sockaddr=(HOST,PORT) ct=socket.socket() ct.connect(sockaddr) while True: inp=input("请输入要发送的内容: ") ct.sendall(bytes(inp,encoding='utf-8')) ret_bytes=ct.recv(1024) print(str(ret_bytes,encoding='utf-8')) ct.close() ##5.5.2多线程与多进程 ##服务器示例代码 import socket import threading import time def tcplink(sock, addr): print('Accept new connection from %s:%s...' % addr) sock.send(b'Welcome!') while True: data = sock.recv(1024) time.sleep(1) if not data or data.decode('utf-8') == 'exit': break sock.send(('Hello, %s!' % data.decode('utf-8')).encode('utf-8')) sock.close() print('Connection from %s:%s closed.' % addr) #建立一个socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #监听9999端口 s.bind(('127.0.0.1', 9999)) #紧接着,调用listen()方法开始监听端口,传入的参数指定等待连接的最大数量 s.listen(5) print('Waiting for connection...') while True: # 接受一个新连接: 返回的sock用来通信,addr是客户机的地址 sock, addr = s.accept() # 创建新线程来处理TCP连接: t = threading.Thread(target=tcplink, args=(sock, addr)) t.start() ##客户端示例代码 import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立连接: s.connect(('127.0.0.1', 9999)) # 接收欢迎消息: print(s.recv(1024).decode('utf-8')) for data in [b'Michael', b'Tracy', b'Sarah']: # 发送数据: s.send(data) print(s.recv(1024).decode('utf-8')) s.send(b'exit') s.close() ``` ``` ## Pymysql ```python import pymysql db = pymysql.connect(host='localhost', port=3306, user='root', passwd='', db='test', charset='utf8') cursor = db.cursor() try: # cursor.execute('select * from users') 查看 # for row in cursor: # print(row) sql = "update employeetest set age = age +2 where ='%c'"%('M') # 改 cursor.execute(sql) # res = cursor.fetchall() 查询多条数据 单条用fetchone() db.commit() # cursor.execute("drop table if exists [表名]) except: db.rollback() cursor.close() db.close() ``` ## FTP ```python from ftplib import FTP ftp = FTP() timeout = 30 port = 21 ftp.connect(host='#', port=port, timeout=timeout) ftp.login('ftpuser', '123456') print() ftp.getwelcome() ftp.cwd('./test') list = ftp.nlst() for i in list: print(i) path = 'd:/FTP上传/test' + i f = open(path, 'wb') filename = 'RETR' + i # 保存 ftp.retrbinary(filename, f.write) # 删除 ftp.delete(i) # 上传 ftp.storbinary('STOR ' + filename, open(path, 'rb')) ftp.quit() ``` ## Telnet ```python import telnetlib Host = '192.168.0.159' username = 'admin' password = '' finish = ':~$ ' tn = telnetlib.Telnet(Host) tn.read_until(b"login: ") tn.write(username + '\n') tn.read_until(finish) tn.write(password) tn.write('ls\n') tn.read_until(finish) tn.close() ``` ## Pop3 ```python import poplib pop = poplib.POP3('pop.mozilla.com', 25) pop.getwelcome() pop.user("admin@pop3.com") pop.pass_("admin") ``` ## Smtp ```python import smtplib s = smtplib.SMTP('smtp.gmail.com', 587) s.login("admin@pop3.com", "admin") s.noop() ``` ## Server(服务器端) ```python import socket import threading # 创建一个服务器 socket 对象 a = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # 绑定服务器地址和端口 a.bind(('10.15.50.56', 5000)) # 设置服务器最大连接数 a.listen(5) # 用于存放客户端信息的字典,键为 fileno,值为客户端名字 mydict = dict() # 用于存放客户端 socket 对象的列表 mylist = [] # 将聊天消息发送给其他客户端 def chatMsgToOthers(no, chatMsg): for c in mylist: if c.fileno() != no: print(chatMsg) c.send(chatMsg.encode()) # 子线程处理函数,用于与客户端通信 def subThreadProgress(conn, no): # 接收客户端发送的名字信息 name = conn.recv(1024).decode() mydict[conn.fileno()] = name # 将客户端信息添加到字典中 mylist.append(conn) # 将客户端 socket 对象添加到列表中 print(no, '是', name) while True: # 接收客户端发送的消息 recvmsg = conn.recv(1024).decode() if recvmsg: # 将客户端发送的消息格式化并发送给其他客户端 chatMsgToOthers(no, mydict[no] + ':' + recvmsg) # 服务器主循环 while True: # 接受客户端连接请求 conn, address = a.accept() print('有一个新连接加入:', address) buf = conn.recv(1024).decode() if buf == '1': # 发送连接成功消息给客户端 conn.send('连接成功'.encode()) print('有新连接加入') # 创建子线程处理与客户端的通信 myThread = threading.Thread(target=subThreadProgress, args=(conn, conn.fileno())) myThread.setDaemon(True) # 将子线程设置为守护线程 myThread.start() # 启动子线程 ``` ## Client(客户端) ```python import socket import threading # 创建一个客户端 socket 对象 b = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # 连接到服务器的 IP 地址和端口号 b.connect(('10.15.50.56', 5000)) # 发送一个指示连接请求的消息 b.send(b'1') # 接收并打印从服务器返回的消息 print(b.recv(1024).decode()) # 输入并发送客户端的名字 name = input('请输入你的名字') b.send(name.encode()) # 定义发送消息的函数 def sendprogress(): while True: # 输入要发送的消息并发送给服务器 msg = input('me:') b.send(msg.encode()) # 定义接收消息的函数 def recvprogress(): while True: # 接收从服务器发送的消息 revmsg = b.recv(1024) # 如果接收到消息,则打印消息内容 if revmsg: print(revmsg.decode()) # 创建发送消息的线程 sendThread = threading.Thread(target=sendprogress) # 创建接收消息的线程 recvThread = threading.Thread(target=recvprogress) # 将发送和接收消息的线程放入一个列表 threads = [sendThread, recvThread] # 启动所有线程 for t in threads: t.setDaemon(True) # 将线程设置为守护线程,以便程序退出时自动关闭 t.start() # 等待所有线程结束 t.join() ``` ## doctest ==注意缩进== > ceshi.py ```python def lx(a, b): """ >>> lx(3,4) 12 """ return a * b if __name__ == '__main__': import doctest doctest.testmod(verbose=True) ``` > ceshi.txt ``` # 创建同名的txt文件 >>> from ceshi import lx >>> lx(3,4) 12 ``` 执行 python -m doctest -v ceshi.txt 命令 完 ## unittest ==注意==断言,unittest类 记忆 ``` import unittest class Count: def __init__(self, a, b): self.a = a self.b = b def add(self): return self.a + self.b class TestCount(unittest.TestCase): def setUp(self): print("开始") def test_add(self): z = Count(2, 3) self.assertEqual(z.add(), 5) def tearDown(self): print("结束") if __name__ == '__main__': s = unittest.TestSuite() s.addTest(TestCount("test_add")) runner = unittest.TextTestRunner() runner.run(s) ``` ## 已更http请求 ```python #########################################################5.1.2 cookies操作 ##1.从网页获取cookies import http.cookiejar import urllib.request cookie = http.cookiejar.LWPCookieJar() #创建CookieJar对象 #使用HTTPCookieProcessor创建cookie处理器 handler = urllib.request.HTTPCookieProcessor(cookie) openner = urllib.request.build_opener(handler) #创建openner对象 #向http://www.baidu.com发送请求, request = urllib.request.Request('http://www.baidu.com') response = openner.open(request) #openner对象使用open方法打开请求 for item in cookie: print(item.name+'='+item.value) ##2.将cookie以文本格式保存,然后读取 #cookie保存为文本格式 import urllib.request import http.cookiejar filename = 'cookies.txt' cookie = http.cookiejar.LWPCookieJar(filename) handler = urllib.request.HTTPCookieProcessor() openner = urllib.request.build_opener(handler) response = openner.open('http://www.baidu.com') cookie.save(ignore_discard=True,ignore_expires=True) #读取cookie文件 cookie = http.cookiejar.LWPCookieJar() cookie.load('cookies.txt',ignore_expires=True,ignore_discard=True) handler = urllib.request.HTTPCookieProcessor(cookie) openner = urllib.request.build_opener(handler) response = openner.open('http://www.baidu.com') print(response.read().decode('utf-8')) ###########################################################5.1.3 主机与编码 ##1.字符串的编码和解码 text = '天气真好!' bytetext = text.encode() #对text进行编码 print(bytetext) #打印编码后的值 print(bytetext.decode()) #打印解码后的值 ##对encode()和decode()函数可以将字符串按照指定编码方式进行编码,默认为Unicode编码方式 u = '字符串' #显示指定unicode类型对象u str = u.encode('gb2312') #以gb2312编码对unicode对象进行编码 str1 = u.encode('gbk') #以gbk编码对unicode对象进行编码 str2 = u.encode('utf-8') #以utf-8编码对unicode对象进行编码 u1 = str.decode('gb2312') #以gb2312编码对字符串str进行解码,以获取unicode u2 = str.decode('utf-8') #以utf-8的编码对str进行解码,无法还原原来的unicode类型 ##2.文件的编码和解码 f = open('test.txt','r') s = f.read() #读取文件内容,如果是不识别的encoding格式将读取失败 str = u.encode('utf-8') #以文件保存格式对内容进行解码,获得utf-8编码的字符串str u = s.decode('gb2312') #转换为unicode字符串 str1 = u.encode('gbk') #转换为gbk编码的字符串str1 str1 = u.encode('utf-16') #转换为utf-16编码的字符串str1 ##python给用户提供了一个包codecs进行文件的读取,这个包中的open()函数可以指定编码的类型 import codecs f = codecs.open('text.text','r+',encoding='utf-8')#文件编码使用utf-8,如果不是讲报错 content = f.read() f.write('你想要写入的信息') f.close() ##3.struct模块 import struct print(struct.pack('>i',4253)) #”i”表示4个字节存储数据。”>”表示高字节在前(“<”表示低自己在前) print(struct.unpack('>i',b'\x00\x00\x10\x9d')) ###########################################################5.2.2 BaseHTTPRequestHandler搭建服务器 from http.server import HTTPServer, BaseHTTPRequestHandler import json data = {'testweb': 'hello world!'} #网页界面显示信息 host = ('localhost', 8080) #设置服务器地址及端口号 class Resquest(BaseHTTPRequestHandler): #建立基类Resquest def do_GET(self): self.send_response(200) #发送响应200 self.send_header('Content-type', 'application/json') #发送头文件 self.end_headers() self.wfile.write(json.dumps(data).encode()) if __name__ == '__main__': server = HTTPServer(host, Resquest) print("Starting server, listen at: %s:%s" % host) server.serve_forever() ###########################################################5.3.1 get请求 ##1.使用http.client的get请求 import http.client #导入http.client con = http.client.HTTPConnection('www.baidu.com') #与百度建立连接 con.request("GET", "/index.html",'',{}) #使用get方法 resu = con.getresponse() print("resu.status=",resu.status) #打印读取到的数据 print (resu.read()) ##2.urllib的get请求 from urllib import request req = request.Request('http://www.baidu.com/') req.add_header('User-Agent','') with request.urlopen(req) as f: print('Status:', f.status, f.reason) for k, v in f.getheaders(): print('%s: %s' % (k, v)) ###########################################################5.3.2 post请求 import urllib.request import urllib.parse url = 'http://210.34.4.28/opac/search_adv_result.php' headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/55.0.2883.103 Safari/537.36', 'Connection':'keep-alive'} values = {'q0':'python网络编程', 'sType0':'any'} #检索关键字Python网络编程 data = urllib.parse.urlencode(values) #编码工作,字典转换为字符串的格式 full_url = url + '?' + data #用字符串想加的方式得到新的url request = urllib.request.Request(url=full_url, headers=headers) response = urllib.request.urlopen(request).read() html = response.decode('utf-8') print(html) ###########################################################5.3.3 requests模块 ##1.get请求 import requests r = requests.get("http://xxxxx?name=****") print(r.text) ##2.post请求 import requests postdata = { 'name':'****' } r = requests.post("http://xxxxx?name=****",data=postdata) print(r.text) ###########################################################5.4.1 使用Fork方式 import os fpid = os.fork() if (fpid==0): print("this is child fpid = {}".format(fpid)) else: print("this is parent fpid = {}".format(fpid)) ###########################################################5.4.2使用multiprocessing模块 # -*- coding: UTF-8 -*- import multiprocessing import os def run_proc(name): print('Child process {0} {1} Running '.format(name, os.getpid())) if __name__ == '__main__': print('Parent process {0} is Running'.format(os.getpid())) for i in range(5): #循环5次 #target是执行函数, args是函数参数 p = multiprocessing.Process(target=run_proc, args=(str(i),)) print('process start') p.start() p.join() print('Process close') ``` ## 数据库补充 DAO ODBC 方法 <不像重点考查> ```python ##########################################################通过import导入win32com.client #实例化数据库引擎 import win32com.client engine = win32com.client.Dispatch("DAO.DBEngine.35") #实例化数据库对象,建立对数据库的连接 db = engine.OpenDatabase(r"c:/temp/mydb.mdb") #在数据库中已经有一个表叫做 'customers',打开这个表,对其中数据进行处理 rs = db.OpenRecordset("customers") #采用DAO的execute方法 db.Execute("delete * from customers where balancetype = 'overdue' and name = 'bill'") ##########################################################通过import导入pypyodbc import pypyodbc #建立数据库连接 cnxn = pyodbc.connect('DRIVER={ODBC  Driver };SERVER=127.0.0.1;DATABASE=customers;UID=sa;PWD=123') cursor =cnxn.cursor() ##########################################################3.1.3 对具体的数据库使用特定的Python模块 #cx_Oracle模块连接代码: conn = cx_Oracle.connect('username/password@TNSname') #PyMySQLdb模块的连接代码,下面两种方式都可以: conn = MySQLdb.connect('host','username','password','database') conn = MySQLdb.connect(host="host",user="username",passwd="password",db="database") ``` ## 大概率不考 ###########################################################5.4.4使用asyncore和asyncio模块 ```python ## 1.asyncore模块 import asyncore class HTTPClient(asyncore.dispatcher): def __init__(self, host, path): asyncore.dispatcher.__init__(self) self.create_socket() self.connect( (host, 80) ) self.buffer = bytes('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (path, host), 'ascii') def handle_connect(self): pass def handle_close(self): self.close() def handle_read(self): print(self.recv(8192)) def writable(self): return (len(self.buffer) > 0) def handle_write(self): sent = self.send(self.buffer) self.buffer = self.buffer[sent:] client = HTTPClient('www.python.org', '/') asyncore.loop() ## 2.asyncio模块 import asyncio #把一个generator标记为coroutine类型,把这个coroutine放到EventLoop中执行 @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) #yield from语法可以方便地调用另一个generator(connect) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() #多个coroutine封装成一组Task然后并发执行 tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ###########################################################5.4.5WSGI协议 from wsgiref.util import setup_testing_defaults from wsgiref.simple_server import make_server #environ:一个包含所有HTTP请求信息的dict对象; #start_response:一个发送HTTP响应的函数 def simple_app(environ, start_response): setup_testing_defaults(environ) status = '200 OK' headers = [('Content-type', 'text/plain; charset=utf-8')] #发送了HTTP响应的status和Header start_response(status, headers) ret = [("%s: %s\n" % (key, value)).encode("utf-8") for key, value in environ.items()] # 返回值ret将作为HTTP响应的Body发送给浏览器 return ret with make_server('', 8000, simple_app) as httpd: print("Serving on port 8000...") httpd.serve_forever() ##server.py代码,实现WSGI服务器的功能 # server.py # 从导入wsgiref模块 from wsgiref.simple_server import make_server # 导入hello.py文件中的application函数: from hello import application # 创建一个服务器,IP地址为空,端口是8000,处理函数是application: httpd = make_server('', 8000, application) print('Serving HTTP on port 8000...') # 开始监听HTTP请求: httpd.serve_forever() 相同的文件夹下面编写hello.py代码 #environ:一个包含所有HTTP请求信息的dict对象; #start_response:一个发送HTTP响应的函数 def application(environ, start_response): start_response('200 OK', [('Content-Type', 'text/html')]) #发送了HTTP响应的status和Header return [b'

Hello, world!

'] # 返回值将作为HTTP响应的Body发送给浏览器 ```