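# NOTE: web-page residue, not part of the module. Original text translates to:
# "Code pull complete; the page will refresh automatically."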
from fastapi import WebSocket,WebSocketDisconnect
import random
import os
import websockets.exceptions
import asyncio
class Manager(object):
    """Keep track of accepted WebSocket connections and tail files to them.

    ``file_dict`` is a mapping whose values are directory paths; only the
    values are used (by :meth:`search_file`). ``active_websocket`` holds every
    socket that has been accepted via :meth:`connect` and not yet removed via
    :meth:`disconnect`.
    """

    def __init__(self, file_dict):
        """Store the directory mapping and start with no active sockets."""
        self.active_websocket = []
        self.file_dict = file_dict

    def search_file(self, file_name):
        """Return the path of *file_name* in the first directory listing it.

        Directories are tried in ``file_dict`` iteration order. The name is
        compared against ``os.listdir`` entries, so only a bare entry name can
        match (a name containing path separators never does). A directory that
        cannot be listed (missing, not a directory, no permission) is skipped
        instead of aborting the whole lookup.

        Returns:
            The joined path, or ``None`` if no directory contains the name.
        """
        for path in self.file_dict.values():
            try:
                entries = os.listdir(path)
            except OSError:
                continue
            if file_name in entries:
                return os.path.join(path, file_name)
        return None

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and register the socket as active."""
        await websocket.accept()
        self.active_websocket.append(websocket)

    async def disconnect(self, websocket: "WebSocket"):
        """Unregister the socket; a socket that is not registered is ignored."""
        # Idempotent on purpose: ws_handler calls this from a ``finally``
        # clause, where the socket may never have been registered.
        if websocket in self.active_websocket:
            self.active_websocket.remove(websocket)

    async def send_personal_message(self, content: str, websocket: "WebSocket"):
        """Send *content* as a text frame to a single socket."""
        await websocket.send_text(content)

    async def broadcast(self, content: str):
        """Send *content* as a text frame to every active socket.

        A send error propagates to the caller and stops the broadcast.
        """
        # Iterate over a snapshot: each ``await`` yields to the event loop,
        # where another task may connect/disconnect and mutate the live list.
        for websocket in list(self.active_websocket):
            await websocket.send_text(content)

    @staticmethod
    def _readlines_from(f, pos):
        """Seek *f* to *pos* and return every line from there to the end.

        If *pos* falls inside a multi-byte character, decoding fails; the
        start is then nudged forward one position at a time. Three nudges are
        enough for UTF-8 (characters are at most 4 bytes long). The error is
        re-raised if the data is still undecodable, or if *pos* is 0, where a
        decoding error cannot be caused by the seek.
        """
        max_shift = 3 if pos > 0 else 0
        for shift in range(max_shift + 1):
            try:
                f.seek(pos + shift, 0)
                return f.readlines()
            except UnicodeDecodeError:
                if shift == max_shift:
                    raise
        return []  # unreachable; keeps the return type explicit

    async def read_suff_n_lines(self, f, n=5, file_size=None, size=1024):
        """Return the last *n* lines of the seekable text file *f*, stripped.

        The file is read backwards in growing windows of ``size`` positions
        until more than *n* lines are available (so the possibly truncated
        first line of the window is never returned) or the start of the file
        is reached. The file position is left at end of file.

        Args:
            f: Seekable text file object.
            n: Number of trailing lines wanted; ``n <= 0`` yields ``[]``.
            file_size: End-of-file position as returned by ``f.seek(0, 2)``;
                determined here when ``None``.
            size: Window growth step; must be positive.

        Returns:
            Up to *n* stripped lines, oldest first. Fewer are returned when
            the file has fewer than *n* lines.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError("size must be positive")
        if file_size is None:
            file_size = f.seek(0, 2)
        if n <= 0:
            # ``lines[-0:]`` would return everything, so handle this up front.
            return []
        multy = 1
        while True:
            # Clamp at 0: a negative absolute seek raises ValueError.
            pos = max(file_size - size * multy, 0)
            lines = self._readlines_from(f, pos)
            # When pos > 0 the first line may be cut in half, so require one
            # extra line beyond the n that will be returned.
            if pos == 0 or len(lines) > n:
                break
            multy += 1
        return [line.strip() for line in lines][-n:]

    async def ws_handler(self, websocket: "WebSocket", file_name):
        """Stream a file to *websocket*: last 5 lines first, then new lines.

        The file is located with :meth:`search_file`, opened as UTF-8, and
        polled with ``readline`` roughly every 0.1 s. The loop only ends when
        sending raises. The three connection-closed exception types are
        logged and swallowed; anything else propagates. In every case the
        socket is unregistered and the file is closed.

        Raises:
            Exception: If *file_name* is not found (raised before the
                handshake is accepted).
        """
        file_path = self.search_file(file_name)
        print("file-path:", file_path)
        if not file_path:
            raise Exception("文件不存在")
        # ``with`` closes the file even if tail-reading or accept() fails.
        with open(file_path, encoding="utf-8") as f:
            file_size = f.seek(0, 2)
            suff_lines = await self.read_suff_n_lines(f, 5, file_size)
            # Start following from the current end of file.
            f.seek(0, 2)
            await self.connect(websocket)
            try:
                for content in suff_lines:
                    await self.send_personal_message(content, websocket)
                while True:
                    line = f.readline().strip()
                    if not line:
                        # Nothing new (or a blank line): wait a little longer.
                        await asyncio.sleep(0.1)
                    # NOTE(review): an empty read still sends an empty frame.
                    # This looks like a missing ``continue``, but it is kept
                    # because these sends are the only way this loop notices
                    # that the client has gone away. Confirm before changing.
                    await self.send_personal_message(line, websocket)
                    await asyncio.sleep(0.1)
            except (
                WebSocketDisconnect,
                websockets.exceptions.ConnectionClosedError,
                websockets.exceptions.ConnectionClosedOK,
            ) as web_dis:
                print("webDis", str(web_dis))
            finally:
                # Unregister on every exit path, not only on a clean close,
                # so failed sockets do not linger in the active list.
                await self.disconnect(websocket)
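# NOTE: web-page residue (hosting-site moderation notice), not part of the module.
# Original text translates to: "This location may contain content unsuitable for
# display, so the page does not show it. You can review and edit it yourself. If
# you confirm the content involves no inappropriate language, pure advertising,
# violence, vulgar or pornographic material, infringement, piracy, false or
# worthless content, or anything violating applicable laws and regulations, you
# may submit an appeal and it will be handled as soon as possible."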