1 Star 0 Fork 0

yuxinvalo/drone_magnestic

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
udpserverframe.py 4.07 KB
一键复制 编辑 原始数据 按行查看 历史
Tearsyu 提交于 2021-06-08 10:03 . add explaination and rm unused code
import socket
from PyQt5 import QtGui
from PyQt5.QtCore import pyqtSignal, QThread
from PyQt5.QtWidgets import QTextBrowser, QVBoxLayout, QDialog, QPushButton
from server.udpserver import UDPServer
from tools import tools
from tools.droneparser import SINGLE_PACK_SIZE, DroneOutputDataParser
class Ui_UDPServer_Console(QDialog):
def __init__(self):
super(Ui_UDPServer_Console, self).__init__()
tools.basic_log_config()
self.browser = QTextBrowser()
self.startServerBtn = QPushButton()
self.startServerBtn.setText("START")
self.stopServerBtn = QPushButton()
self.stopServerBtn.setText("STOP")
self.stopServerBtn.setEnabled(False)
self.startServerBtn.clicked.connect(self.start_server)
self.stopServerBtn.clicked.connect(self.stop_server)
layout = QVBoxLayout() # 垂直盒式布局
layout.addWidget(self.browser)
layout.addWidget(self.startServerBtn)
layout.addWidget(self.stopServerBtn)
self.setLayout(layout)
self.browser.append("================================================")
self.setWindowTitle("UDP SERVER CONSOLE")
self.index = 0
self.is_server_runnable = False
self.gen_server_thread()
def gen_server_thread(self):
self.udp_thread = UDPServerThread()
res = self.udp_thread.init()
self.udp_thread.trigger.connect(self.show_data)
if res != 0:
self.browser.append("Server initialized failed!")
self.browser.append(str(res))
else:
self.is_server_runnable = True
def start_server(self):
if self.is_server_runnable:
self.udp_thread.udp_server.is_running = True
self.udp_thread.start()
self.startServerBtn.setEnabled(False)
self.stopServerBtn.setEnabled(True)
self.browser.append("Start Server!")
else:
self.browser.append("You can not start the server!")
def stop_server(self):
self.udp_thread.stop()
self.startServerBtn.setEnabled(True)
self.stopServerBtn.setEnabled(False)
self.browser.append("Stop Server!")
def show_data(self, parseStr):
self.browser.append("INDEX===>" + str(self.index))
self.browser.append(parseStr[0])
self.index = self.index + 1
if parseStr[0:2] == -1:
self.startServerBtn.setEnabled(True)
self.stopServerBtn.setEnabled(False)
def closeEvent(self, a0: QtGui.QCloseEvent):
if self.udp_thread:
self.udp_thread.udp_server.is_running = False
self.udp_thread.udp_server.server_socket.close()
class UDPServerThread(QThread):
trigger = pyqtSignal(list)
def __int__(self):
super(UDPServerThread, self).__init__()
def init(self):
self.udp_server = UDPServer()
self.parser = DroneOutputDataParser()
res = self.udp_server.generate_server()
if res != 0:
return res
else:
self.udp_server.is_running = True
return 0
def run(self):
# self.udp_server = UDPServer()
# self.parser = DroneOutputDataParser()
res_str = ""
while self.udp_server.is_running:
try:
receive_data, client = self.udp_server.server_socket.recvfrom(SINGLE_PACK_SIZE)
self.parser.parse_single_package(receive_data)
res_str = res_str + self.parser.single_string
self.trigger.emit([res_str])
res_str = ""
except socket.timeout:
# self.trigger.emit("-1, Timeout exception, the server will be stoped!")
# self.udp_server.is_running = False
continue
except Exception as e:
self.trigger.emit([res_str, e])
self.stop()
def stop(self):
self.udp_server.is_running = False
# self.udp_server.server_socket.close()
# if __name__ == '__main__':
# app = QApplication(sys.argv)
# form = Ui_UDPServer_Console()
# form.show()
# app.exec_()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/lainyu/drone_magnestic.git
git@gitee.com:lainyu/drone_magnestic.git
lainyu
drone_magnestic
drone_magnestic
main

搜索帮助