代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
import sys
import json
import re
import os
import time
import hashlib
import queue
from PyQt5.QtWidgets import QMainWindow , QApplication
from PyQt5.QtGui import QIntValidator
from PyQt5.QtCore import QTimer, QThread, pyqtSignal
from ptool_ui import Ui_Dialog
from ptool_com import ComSetSwitch
## log thread, receive msg and update to log
class LogThread(QThread):
logReady = pyqtSignal(str)
def __init__(self, msgq):
super(LogThread, self).__init__()
self.running = True
self.msgq = msgq
def __del__(self):
self.running = False
self.wait()
def run(self):
while self.running == True:
try:
msg = self.msgq.get(block=True, timeout=1)
except:
continue
self.logReady.emit(msg)
class PtoolMainWindow(QMainWindow, Ui_Dialog):
## config item names
CFG_COM_PORT = 'comPort'
CFG_PRODUCT_CODE = 'productCode'
CFG_SHOW_MODEL = 'showModel'
CFG_HARDWARE_VERVERSION = 'hardwareVersion'
CFG_POE_POWER = 'poePower'
CFG_CLOULD_URL = 'cloudUrl'
CFG_OWNER_ID = 'ownerId'
CFG_SERIAL_NO = 'serialNo'
CFG_MAC_ADDRESS = 'macAddress'
CFG_MAC_INCREASE = 'macIncrease'
CFG_SN_INCREASE = 'snIncrease'
CFG_AUTH_MODEL = 'cloudModel'
CFG_ADMIN = 'admin'
## items tuple
CFG_ITMES = (CFG_COM_PORT, CFG_PRODUCT_CODE, CFG_SHOW_MODEL, \
CFG_HARDWARE_VERVERSION, CFG_POE_POWER, CFG_CLOULD_URL, CFG_OWNER_ID, CFG_SERIAL_NO, CFG_MAC_ADDRESS)
## status
STATE_PORT_CLOSED = 0
STATE_WAITING_DEVICE = 1
STATE_DEVICE_CONNECTING = 2
STATE_DEVICE_PROCESSING = 3
STATE_OPERATION_FAILED = 4
STATE_OPERATION_SUCCESS = 5
STATE_NAMES = {STATE_PORT_CLOSED:"串口未打开",\
STATE_WAITING_DEVICE:"等待设备连接", \
STATE_DEVICE_CONNECTING:"检测到设备", \
STATE_DEVICE_PROCESSING:"处理中...", \
STATE_OPERATION_FAILED:"操作失败!!!", \
STATE_OPERATION_SUCCESS:"操作成功!!!"
}
STATE_STYLES = {STATE_PORT_CLOSED:"color: rgb(255, 0, 0);",\
STATE_WAITING_DEVICE:"color: rgb(255, 127, 39);", \
STATE_DEVICE_CONNECTING:"color: rgb(0, 170, 0);", \
STATE_DEVICE_PROCESSING:"color: rgb(0, 170, 0);", \
STATE_OPERATION_FAILED:"color: rgb(255, 0, 0);", \
STATE_OPERATION_SUCCESS:"color: rgb(0, 170, 0);"
}
STATE_STYLES2 = {STATE_PORT_CLOSED:"color: rgb(255, 0, 0);",\
STATE_WAITING_DEVICE:"color: rgb(255, 0, 0);", \
STATE_DEVICE_CONNECTING:"color: rgb(255, 127, 39);", \
STATE_DEVICE_PROCESSING:"color: rgb(255, 127, 39);", \
STATE_OPERATION_FAILED:"color: rgb(255, 0, 0);", \
STATE_OPERATION_SUCCESS:"color: rgb(0, 170, 0);"
}
# signal
runStatusSignal = pyqtSignal(int, str)
def __init__(self, parent=None):
super(PtoolMainWindow, self).__init__(parent)
self.setupUi(self)
self.setupUiCustom()
## init control variables
self.config = {}
self.portOpened = False
self.runState = self.STATE_PORT_CLOSED
self.comSet = None
self.admin = False
# connect status update event
self.runStatusSignal.connect(self.slotRunStatus)
# create log queue and thread for loging
self.logQueue = queue.Queue()
self.logThread = LogThread(self.logQueue)
self.logThread.logReady.connect(self.logAll)
# open a log file
logFile = "log\\log_%s.log" % time.strftime("%Y%m%d_%H%M%S", time.localtime(time.time()))
if not os.path.exists("log"):
os.mkdir("log")
self.logFp = open(logFile, "w")
# timer for flash
self.uiTimerFlag = 0
self.uiTimer = QTimer(self)
self.uiTimer.timeout.connect(self.uiTimerHandler)
self.logThread.start()
def setupUiCustom(self):
# validator
intValidator = QIntValidator(self)
intValidator.setRange(1, 1000)
self.winEditMacAdd.setValidator(intValidator)
self.winEditSnAdd.setValidator(intValidator)
# poe power, range 10W - 1000W
poePowerValidator = QIntValidator(self)
poePowerValidator.setRange(10000, 1000000)
self.winEditPoePower.setValidator(poePowerValidator)
self.winEditMacAddress.setInputMask("HH:HH:HH:HH:HH:HH")
## set com ports
for i in range(0, 32):
self.winComSelect.addItem("COM%d" % i)
self.winSaveConfig.setEnabled(True)
self.winSaveConfig.clicked.connect(lambda: self.saveConfig())
self.winOpenCom.clicked.connect(lambda: self.portSwitch())
## all checkboxs, connected to checkBoxApplyAll
self.winProductCode.clicked.connect(lambda:self.checkBoxApplyAll())
self.winShowModel.clicked.connect(lambda:self.checkBoxApplyAll())
self.winHardwareVersion.clicked.connect(lambda:self.checkBoxApplyAll())
self.winPoePower.clicked.connect(lambda:self.checkBoxApplyAll())
self.winCloudUrl.clicked.connect(lambda:self.checkBoxApplyAll())
self.winOwnerId.clicked.connect(lambda:self.checkBoxApplyAll())
self.winSerialNo.clicked.connect(lambda:self.checkBoxApplyAll())
self.winMacAddress.clicked.connect(lambda:self.checkBoxApplyAll())
self.winMacAdd.clicked.connect(lambda:self.checkBoxApplyAll())
self.winSnAdd.clicked.connect(lambda:self.checkBoxApplyAll())
def checkBoxApplyAll(self):
self.winEditProductCode.setEnabled(self.winProductCode.isChecked())
self.winEditShowModel.setEnabled(self.winShowModel.isChecked())
self.winEditHardwareVersion.setEnabled(self.winHardwareVersion.isChecked())
self.winEditPoePower.setEnabled(self.winPoePower.isChecked())
self.winEditCloudUrl.setEnabled(self.winCloudUrl.isChecked())
self.winEditOwnerId.setEnabled(self.winOwnerId.isChecked())
self.winEditSerialNo.setEnabled(self.winSerialNo.isChecked())
self.winEditMacAddress.setEnabled(self.winMacAddress.isChecked())
self.winEditMacAdd.setEnabled(self.winMacAdd.isChecked())
self.winEditSnAdd.setEnabled(self.winSnAdd.isChecked())
def logAll(self, text):
#cursor = self.winEditLog.textCursor()
#cursor.movePosition(QtGui.QTextCursor.End)
self.winEditLog.append(text)
#self.winEditLog.moveCursor(QTextCursor.End)
#bar = self.winEditLog.verticalScrollBar()
#if bar:
# bar.setSliderPosition(bar.maximum())
if self.logFp:
s = time.time()
ss = (s - int(s)) * 1000
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(s))
msg = "[%s.%03d] %s\n" % (now, ss, text)
self.logFp.write(msg)
def log(self, text):
self.logQueue.put(text)
def getAuthModel(self):
if self.CFG_AUTH_MODEL in self.config:
return self.config[self.CFG_AUTH_MODEL]
else:
return 'SW001'
def getAuthCode(self):
m = hashlib.md5()
hw = self.winEditMacAddress.text().split(":")
if len(hw) != 6:
return ""
mac = "{0}{1}{2}:{3}{4}{5}".format(hw[0], hw[1], hw[2], hw[3], hw[4], hw[5])
s = self.getAuthModel()
s += mac.upper()
s += self.winEditOwnerId.text()
s += "admin123"
m.update(s.encode('utf-8'))
return m.hexdigest()
def makeItem(self, enabled, text):
return {'enabled': enabled, 'text' : text}
def getConfig(self, item):
if item in self.config:
return self.config[item]
return self.makeItem(False, '')
def loadConfig(self, file):
self.config = {}
try:
with open(file, "r", encoding='utf-8') as fp:
try:
self.config = json.load(fp)
except:
self.log("***Json file(%s) parsed failed!" % file)
except:
self.log("***Config file not found!")
if len(self.config) == 0:
return False
# check if items exist
for i in self.CFG_ITMES:
if i not in self.config:
self.log("***Missing item'%s' in %s" % (i, file))
return False
return True
def applyConfig(self):
if self.CFG_COM_PORT in self.config:
self.winComSelect.setCurrentText(self.config[self.CFG_COM_PORT])
if self.CFG_ADMIN in self.config:
self.admin = self.config[self.CFG_ADMIN]
c = self.getConfig(self.CFG_PRODUCT_CODE)
self.winProductCode.setEnabled(self.admin)
self.winProductCode.setChecked(c['enabled'])
self.winEditProductCode.setText(c['text'])
c = self.getConfig(self.CFG_SHOW_MODEL)
self.winShowModel.setEnabled(self.admin)
self.winShowModel.setChecked(c['enabled'])
self.winEditShowModel.setText(c['text'])
c = self.getConfig(self.CFG_HARDWARE_VERVERSION)
self.winHardwareVersion.setEnabled(self.admin)
self.winHardwareVersion.setChecked(c['enabled'])
self.winEditHardwareVersion.setText(c['text'])
c = self.getConfig(self.CFG_POE_POWER)
self.winPoePower.setEnabled(self.admin)
self.winPoePower.setChecked(c['enabled'])
self.winEditPoePower.setText(c['text'])
c = self.getConfig(self.CFG_CLOULD_URL)
self.winCloudUrl.setEnabled(self.admin)
self.winCloudUrl.setChecked(c['enabled'])
self.winEditCloudUrl.setText(c['text'])
c = self.getConfig(self.CFG_OWNER_ID)
self.winOwnerId.setEnabled(self.admin)
self.winOwnerId.setChecked(c['enabled'])
self.winEditOwnerId.setText(c['text'])
c = self.getConfig(self.CFG_SERIAL_NO)
self.winSerialNo.setEnabled(self.admin)
self.winSerialNo.setChecked(c['enabled'])
self.winEditSerialNo.setText(c['text'])
c = self.getConfig(self.CFG_MAC_ADDRESS)
self.winMacAddress.setEnabled(self.admin)
self.winMacAddress.setChecked(c['enabled'])
self.winEditMacAddress.setText(c['text'])
c = self.getConfig(self.CFG_MAC_INCREASE)
self.winMacAdd.setEnabled(self.admin)
self.winMacAdd.setChecked(c['enabled'])
self.winEditMacAdd.setText(c['text'])
c = self.getConfig(self.CFG_SN_INCREASE)
self.winSnAdd.setEnabled(self.admin)
self.winSnAdd.setChecked(c['enabled'])
self.winEditSnAdd.setText(c['text'])
self.winSaveConfig.setEnabled(self.admin)
def saveConfig(self):
cfg = {}
cfg[self.CFG_ADMIN] = self.admin
cfg[self.CFG_COM_PORT] = self.winComSelect.currentText()
cfg[self.CFG_AUTH_MODEL] = self.getAuthModel()
cfg[self.CFG_PRODUCT_CODE] = self.makeItem(self.winProductCode.isChecked(), self.winEditProductCode.text())
cfg[self.CFG_SHOW_MODEL] = self.makeItem(self.winShowModel.isChecked(), self.winEditShowModel.text())
cfg[self.CFG_HARDWARE_VERVERSION] = self.makeItem(self.winHardwareVersion.isChecked(), self.winEditHardwareVersion.text())
cfg[self.CFG_POE_POWER] = self.makeItem(self.winPoePower.isChecked(), self.winEditPoePower.text())
cfg[self.CFG_CLOULD_URL] = self.makeItem(self.winCloudUrl.isChecked(), self.winEditCloudUrl.text())
cfg[self.CFG_OWNER_ID] = self.makeItem(self.winOwnerId.isChecked(), self.winEditOwnerId.text())
cfg[self.CFG_SERIAL_NO] = self.makeItem(self.winSerialNo.isChecked(), self.winEditSerialNo.text())
cfg[self.CFG_MAC_ADDRESS] = self.makeItem(self.winMacAddress.isChecked(), self.winEditMacAddress.text())
cfg[self.CFG_MAC_INCREASE] = self.makeItem(self.winMacAdd.isChecked(), self.winEditMacAdd.text())
cfg[self.CFG_SN_INCREASE] = self.makeItem(self.winSnAdd.isChecked(), self.winEditSnAdd.text())
with open("config.ini", "w", encoding='utf-8') as fp:
fp.write(json.dumps(cfg, indent=4))
self.log('配置文件保存成功')
# status update
# status is controlled by self.runStatus
def updateStatus(self, state, msg=None):
# update status
oldState = self.runState
self.runState = state
if oldState != self.runState:
self.log("运行状态改变: %s -> %s" %(self.STATE_NAMES[oldState], self.STATE_NAMES[self.runState]))
# set style and msg
style = self.STATE_STYLES[self.runState]
if msg == None:
msg = self.STATE_NAMES[self.runState]
self.winRunStatus.setText(msg)
self.winRunStatus.setStyleSheet(style)
self.uiTimerFlag = 0
def slotRunStatus(self, state, msg):
if len(msg) == 0:
msg = None
self.updateStatus(state, msg)
if state == self.STATE_OPERATION_SUCCESS:
self.onOperationCompleted(True)
elif state == self.STATE_OPERATION_FAILED:
self.onOperationCompleted(False)
def uiTimerHandler(self):
if self.uiTimerFlag == 0:
self.uiTimerFlag = 1
self.winRunStatus.setStyleSheet(self.STATE_STYLES[self.runState])
else :
self.uiTimerFlag = 0
self.winRunStatus.setStyleSheet(self.STATE_STYLES2[self.runState])
def onPortChanged(self, enabled):
if enabled :
self.winOpenCom.setText("关闭串口")
self.winComSelect.setEnabled(False)
self.winPortStatus.setStyleSheet("QProgressBar::chunk { background-color: rgb(0, 170, 0) }")
self.uiTimer.start(400)
self.updateStatus(self.STATE_WAITING_DEVICE)
else:
self.uiTimer.stop()
self.winOpenCom.setText("打开串口")
self.winComSelect.setEnabled(True)
self.winPortStatus.setStyleSheet("QProgressBar::chunk { background-color: rgb(255, 0, 0) }")
self.updateStatus(self.STATE_PORT_CLOSED)
def macAddressAdd(self, mac, num):
a = mac.split(":")
if len(a) == 6:
v3 = int(a[3], 16)
v4 = int(a[4], 16)
v5 = int(a[5], 16)
v = (v3 * 256 * 256) + (v4 * 256) + v5
v += num
v3 = (v >> 16) & 255
v4 = (v >> 8) & 255
v5 = v & 255
return "{0}:{1}:{2}:{3:02x}:{4:02x}:{5:02x}".format(a[0], a[1], a[2], v3, v4, v5)
return mac
def serialNoAdd(self, sn, num):
a = re.findall(r'\d+', sn)
if a:
intstr = a[-1]
intval = int(intstr)
intlen = len(intstr)
plen = len(sn) - len(intstr)
v = sn[0:plen]
intval += num
intstr = str(intval)
while len(intstr) < intlen:
intstr = '0' + intstr
return v + intstr
# by default, return sn
return sn
def onOperationCompleted(self, result):
if result:
# operation ok
self.log("设备操作成功, 请更换下一个设备")
cfgChanged = False
# add mac or sn if checked and increased needed
if self.winMacAddress.isChecked():
# if auto add enabled
if self.winMacAdd.isChecked():
n = int(self.winEditMacAdd.text())
mac = self.macAddressAdd(self.winEditMacAddress.text(), n)
self.winEditMacAddress.setText(mac)
cfgChanged = True
else:
# not enable auto add, clear MAC
self.winEditMacAddress.setText("")
if self.winSerialNo.isChecked():
# if auto add enabled
if self.winSnAdd.isChecked():
n = int(self.winEditSnAdd.text())
sn = self.serialNoAdd(self.winEditSerialNo.text(), n)
self.winEditSerialNo.setText(sn)
cfgChanged = True
else:
# not enable auto add, clear MAC
self.winEditSerialNo.setText("")
if cfgChanged:
self.saveConfig()
else:
# operation failed
self.log("设备操作失败!!!")
# port switch
def portSwitch(self):
if self.portOpened :
self.comSet.stop()
self.comSet = None
self.portOpened = False
else :
port = self.winComSelect.currentText()
rate = 115200
self.comSet = ComSetSwitch(self)
self.portOpened = self.comSet.start(port, rate)
if self.portOpened:
self.log("串口(%s,%d)打开成功" %(port, rate))
else :
self.log("串口(%s,%d)打开失败" %(port, rate))
# callback
self.onPortChanged(self.portOpened)
def setupAll(self):
if self.loadConfig('config.ini'):
self.log("配置文件加载加载成功")
self.applyConfig()
else:
self.log("配置文件加载失败!!!")
self.checkBoxApplyAll()
## update all status
self.onPortChanged(self.portOpened)
# on window close, if thread is running, close it
def closeEvent(self, event):
if self.comSet != None:
self.comSet.stop()
self.comSet = None
if self.logFp:
self.logFp.close()
self.logFp = None
if self.logThread != None:
self.logThread = None
if __name__=="__main__":
app = QApplication(sys.argv)
win = PtoolMainWindow()
win.setupAll()
win.show()
sys.exit(app.exec())
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。