1 Star 3 Fork 4

gcbb / gpycan

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
main_.py 37.24 KB
一键复制 编辑 原始数据 按行查看 历史
guochao 提交于 2022-11-16 17:39 . 添加环境初始化的配置文件

# import pkg_resources
# from pkg_resources import py2_warn
# from gPylib.xlsdbc import Xls_Dbc as Xls_Dbc
# import gPylib
# from gPylib.dbc import *
# from Dbc2MFile import *
# import operator
from __future__ import print_function
import logging
def SetLogging():
    """Configure root logging at INFO level and return the root logger.

    The record format is: timestamp, source file name with line number,
    level name and message.

    Returns:
        logging.Logger: The root logger.
    """
    log_format = "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
    root_logger = logging.getLogger()
    # basicConfig installs a handler with this format on the root logger
    # (it does nothing when the root logger already has handlers).
    logging.basicConfig(format=log_format, level=logging.INFO)
    return root_logger
logger = SetLogging()
import can
# import gxnet
from ui import UI
from ui import wx
import time
import asyncio
from threading import Thread
import os
import json
from queue import Queue
# from can.interfaces.canalystii import CANalystIIBus
# from can.interfaces.vector import VectorBus
# bus = VectorBus(channel=0, bitrate=500000) # Link Layer (CAN protocol)
from can.interfaces.vector import VectorBus
from can.interfaces import virtual
# Refer to isotp documentation for full details about parameters
import sys
import copy
sys.path.append("D:/gcbb/python/gPylib")
# import gutil
from dbc import Py_Dbc
from EcuDiag import EcuDiag
class SimFromeFile:
    """Replay ECU diagnostic responses from a recorded ASC log file.

    The whole file is read at construction time. ``GetSimResponse`` searches
    the recorded lines for a request and returns the response frames that were
    logged right after it.
    """

    Ecu_rx_id = 0  # CAN id the ECU receives on (the tester's request id)
    Ecu_tx_id = 0  # CAN id the ECU transmits on (the response id)
    Line_index = 0  # line at which the next non-repeating search starts

    # Byte values treated as frame padding when comparing payloads.
    _PAD_BYTES = (0x00, 0x55)

    def __init__(self, sim_file_path):
        """Load the simulation log.

        Args:
            sim_file_path: Path of the ASC log file. A falsy value selects
                ``<cwd>/test/testlog.asc``.

        Raises:
            OSError: If the file cannot be opened.
        """
        logging.info("仿真测试文件" + str(sim_file_path))
        if not sim_file_path:
            sim_file_path = os.getcwd() + "/test/testlog.asc"
        with open(sim_file_path) as ascfile:
            print('Dbg:"main_.py" :185 ascfile=', ascfile)
            self.Diag_log = ascfile.readlines()

    def SetEcuId(self, ecu_recive_id, ecu_send_id):
        """Set the request id (ECU receives) and the response id (ECU sends)."""
        self.Ecu_rx_id = ecu_recive_id
        self.Ecu_tx_id = ecu_send_id

    @staticmethod
    def _parse_log_line(line, wanted_ids):
        """Return ``(can_id, data_bytes)`` for a diagnostic line, else ``None``.

        A line is rejected when it has fewer than five whitespace-separated
        fields, when its third field or its data fields are not hexadecimal
        (for example textual header lines), or when its id is not one of
        *wanted_ids*.
        """
        fields = line.split()
        if len(fields) < 5:
            return None
        try:
            can_id = int(fields[2], 16)
            if can_id not in wanted_ids:
                return None
            return can_id, [int(token, 16) for token in fields[6:14]]
        except ValueError:
            return None

    @classmethod
    def _strip_padding(cls, data):
        """Return *data* as a list with padding removed for comparison.

        Trailing padding bytes are dropped, then the first remaining
        occurrence of each padding value is removed (same normalisation the
        request and the logged line both go through). Safe on payloads that
        consist of padding only.
        """
        payload = list(data)
        while payload and payload[-1] in cls._PAD_BYTES:
            payload.pop()
        for pad in cls._PAD_BYTES:
            if pad in payload:
                payload.remove(pad)
        return payload

    def GetSimResponse(self, req_msg, repeat=False):
        """Find *req_msg* in the log and return the responses recorded after it.

        Args:
            req_msg: Request message; only its ``data`` attribute is read.
            repeat: When true, search from the first line every time;
                otherwise continue from where the previous search stopped.

        Returns:
            A list of ``can.Message`` built from the response-id lines that
            follow the matching request, ending at the next request line that
            is not a flow-control frame (first byte 0x30) or at the end of the
            log. When no matching request is found, ``range(8)`` is returned
            (kept for compatibility with the existing caller, which skips
            non-message items).
        """
        req_id = self.Ecu_rx_id
        rsp_id = self.Ecu_tx_id
        find_ok = False
        resp_array = []
        if repeat:  # repeating mode: always search from the top
            self.Line_index = 0
        last_line_index = self.Line_index
        # The request payload does not change while scanning, normalise once.
        wanted_payload = self._strip_padding(req_msg.data)

        for tmp_index in range(self.Line_index, len(self.Diag_log)):
            parsed = self._parse_log_line(self.Diag_log[tmp_index], (req_id, rsp_id))
            if parsed is not None:
                line_id, line_data = parsed
                if find_ok:
                    if line_id == rsp_id:
                        print('Dbg:"main_.py" :202 data_bytes=', line_data)
                        msg = can.Message(
                            arbitration_id=rsp_id, is_extended_id=False, data=line_data
                        )
                        resp_array.append(msg)
                        print('Dbg:"main_.py" :2config_file20 msg=', msg)
                    elif line_id == req_id:
                        print('Dbg:"main_.py" :208 resp_array=', resp_array)
                        if line_data and line_data[0] == 0x30:
                            # Flow-control frame from the tester: the response
                            # continues on the following lines.
                            print('Dbg:"main_.py" :223 line_data=', line_data)
                        else:
                            return resp_array
                elif line_id == req_id:
                    # Compare with trailing 0x00 / 0x55 padding removed.
                    if wanted_payload == self._strip_padding(line_data):
                        find_ok = True
                        data = req_msg.data
                        shown = data[0:min(data[0], 7) + 1] if len(data) else data
                        print(
                            'Dbg:"main_.py" :267 find_ok=',
                            find_ok,
                            "Data:",
                            shown,
                        )
            self.Line_index = tmp_index

        if find_ok and resp_array:
            # The response was the last thing in the log; do not lose it.
            return resp_array
        info = "没找到对应的回告数据"
        print('Dbg:"main_.py" :140 info=', info, ":", req_msg, "起始行", last_line_index)
        return range(8)
class App(UI):
# Class-level state of App. Everything below is evaluated once, when the class
# body executes, so the SimFromeFile / Bus / EcuDiag constructors and the
# logging call run at import time rather than per instance.
# `bus` / `bus2` are not referenced by the methods visible in this file.
bus = None
bus2 = None
# Name of the selected CAN adapter; SetCan overwrites it, OpenCan reads it.
can_name = "virtual"
# Replaced by OpenCan with the arguments of the adapter being opened.
can_thread_args = {"can_name": "virtual"}
# Buses opened by OpenCan. NOTE(review): OpenCan appends in place, so this
# class-level list is shared by every App instance.
Can_array = []
# Set to True by UiClose; the CAN coroutines poll it to stop.
Ui_close = False
Ecu_select_path = ["", "", ""] # company,vehicle,ecu
# Filled by EcuInfoChanged from Ecu_diag.GetEcuConfig().
Ecu_config = {}
Diag_log = [] # only used when testing from a file: the lines of the ASC file
# Flash data
Flash_data = {
    "boot_file": "",
}
Boot_data = []
App_data = []
# Opens "test/Flash19.asc" (relative to the working directory) when the class
# body executes.
Sim_file_class = SimFromeFile("test/Flash19.asc")
Sim_mode =False
# Two endpoints on the virtual channel named "Diag".
Diag_can = can.interface.Bus( "Diag", bustype="virtual", receive_own_messages= False)# virtual CAN used by the diagnostics
Diag_can2 = can.interface.Bus( "Diag", bustype="virtual", receive_own_messages=False)# relay CAN
Ecu_diag = EcuDiag()
logging.info("启动")
Display_signal_name_list = []
Display_signal_class_list = []
Display_frame_queue = Queue(maxsize=1000)# display queue
def InitUI(self, event):
    """Put the UI into its initial state and start the display thread.

    Hands the diagnostic virtual CAN bus to ``Ecu_diag``, selects the first
    notebook page, fills the company choice (which cascades through
    ``EcuInfoChanged``), prepares the signal list and its frame queue, then
    starts the display thread and lets the event propagate.
    """
    diag = self.Ecu_diag
    # Diagnostics use their own virtual CAN bus.
    diag.SetCan(self.Diag_can)
    # Show the CAN-adapter page.
    self.m_notebook1.SetSelection(0)

    # Company list first; the dependent choices are refreshed by the cascade.
    companies = diag.GetCompany()
    self.ChoiceInit(self.公司, companies)
    self.EcuInfoChanged(self.公司)

    # The DBC tree is only built once the user selects a DBC file.
    self.Display_signal_name_list = []  # TODO: load from the UI run-time config file
    self.InitDispSignalTree()
    self.Display_frame_queue = Queue(maxsize=1000)
    self.StartDisplayThread()
    event.Skip()
def InitDispSignalTree(self):
    """Create the column headers of the signal-value list.

    Adds the three text columns (signal name, decoded value, raw value) to
    ``m_dataViewListCtrl_signal_data``. The columns are only added while the
    control has none, so calling this again does not duplicate the headers.
    """
    listctrl = self.m_dataViewListCtrl_signal_data
    if listctrl.GetColumnCount() == 0:  # never configured before
        for title in ("信号名", "信号解析值", "信号原始值"):
            listctrl.AppendTextColumn(title)
    logging.debug(listctrl.GetColumnCount())
def StartDisplayThread(self):
    """Start the background thread that refreshes the signal-value list.

    The thread runs ``UpdateDbcSignalValueView``. It is a daemon thread so a
    display loop that is still waiting for frames cannot keep the interpreter
    alive after the UI has been closed.
    """
    display_thread = Thread(target=self.UpdateDbcSignalValueView, args=[], daemon=True)
    display_thread.start()
def InitDbcTreeCtrol(self):
    """Rebuild the DBC tree view from the currently loaded DBC network.

    The tree gets the network name as root, one child per frame of every ECU
    (item data ``"frame"``) and one grandchild per signal (item data
    ``"signal"``). Existing items are removed first because this runs every
    time a DBC file is selected and a tree control holds a single root.
    """
    dbc_tree = self.m_treeCtrl_dbc
    dbc_net = self.DbcClass.Get_DbcNet()
    dbc_tree.DeleteAllItems()
    root = dbc_tree.AddRoot(dbc_net["name"], image=-1, selImage=-1, data=None)
    for ecu in dbc_net["ecu_list"]:
        # ECU nodes are intentionally not shown; frames hang off the root.
        for frame in ecu["frame_list"]:
            frame_item = dbc_tree.AppendItem(
                root, frame["name"], image=-1, selImage=-1, data="frame"
            )
            for sig in frame["sig_list"]:
                dbc_tree.AppendItem(
                    frame_item, sig["name"], image=-1, selImage=-1, data="signal"
                )
def OnDBC_signal_d_click(self,event):
    """Add the double-clicked DBC signal to the displayed-signal list.

    Only tree items whose data is ``"signal"`` are handled. The signal is
    identified as ``<parent item text>.<item text>``, the signal-class list is
    refreshed from the DBC, and a dummy frame is queued so that the display
    thread redraws the list.
    """
    tree = event.GetEventObject()
    clicked = tree.GetFocusedItem()
    if tree.GetItemData(clicked) != "signal":
        return

    frame_text = tree.GetItemText(tree.GetItemParent(clicked))
    sig_name = frame_text + '.' + tree.GetItemText(clicked)
    logging.debug('sig_name=' + json.dumps(sig_name, indent=4))
    self.Display_signal_name_list.append(sig_name)
    self.Display_signal_class_list = self.DbcClass.GetSigClassListBySigNameList(
        self.Display_signal_name_list
    )
    # Refresh the UI through the normal frame path.
    logging.debug('Display_frame_queue=' + str(self.Display_frame_queue))
    wake_up_frame = can.Message(arbitration_id=0x0, data=[0]*8)
    # A dummy frame wakes the display thread so the new row shows up.
    self.Display_frame_queue.put(wake_up_frame)
def SetDisplaySignameList(self, signal_name_list=None):
    """Set the list of signal names that should be decoded for display.

    Args:
        signal_name_list: Iterable of signal names; ``None`` means an empty
            list. (The parameter is new and optional: the previous body read
            an undefined name and could not run.)

    When the list differs from the one currently stored, the cached frame-id
    list and signal-class list are cleared and the new names are stored.
    """
    names = [] if signal_name_list is None else list(signal_name_list)
    if getattr(self, "DisplaySignalList", []) != names:
        # Rebuild the list of frame ids that need decoding.
        # TODO: look the matching signal classes up in the DBC.
        self.DisplayFrameIdList = []
        self.Display_signal_class_list = []
        self.DisplaySignalList = names
    elif not hasattr(self, "DisplaySignalList"):
        # First call with an empty list: still create the attribute.
        self.DisplaySignalList = names
# def UpdateDbcSignalValueViewUi(self)
async def UpdateDbcSignalValueViewOnMsg(self,msg):
    """Queue *msg* for the display thread when one of the shown signals uses it.

    A frame is of interest when its ``arbitration_id`` equals the ``ID`` of an
    entry in ``Display_signal_class_list``. The frame is enqueued without
    blocking: this coroutine runs on the CAN event loop, and waiting on a full
    bounded queue would stall every other listener. Frames that do not fit are
    dropped.
    """
    from queue import Full  # function-scope import: the file only imports Queue

    wanted = any(
        sig_class.ID == msg.arbitration_id
        for sig_class in self.Display_signal_class_list
    )
    if not wanted:
        return
    try:
        self.Display_frame_queue.put_nowait(msg)
    except Full:
        logging.debug("display queue full, dropping frame %s", msg)
def UpdateDbcSignalValueView(self, args=None):  # display thread
    """Display-thread loop: decode queued frames and refresh the signal list.

    Each round waits for a frame, rebuilds the rows of the signal list
    (name, decoded value, raw value), feeds the waiting frames into the DBC
    and then rewrites the value columns. The loop ends once ``Ui_close`` is
    set.

    Args:
        args: Unused; kept so existing callers keep working.
    """
    # NOTE(review): this runs on a worker thread yet touches wx controls
    # directly; wxPython expects GUI calls on the main thread (wx.CallAfter)
    # - confirm and marshal the updates if needed.
    from queue import Empty  # function-scope import: the file only imports Queue

    while not self.Ui_close:
        try:
            # A timeout lets the loop notice Ui_close instead of blocking forever.
            msg = self.Display_frame_queue.get(timeout=0.5)
        except Empty:
            continue

        listctrl = self.m_dataViewListCtrl_signal_data
        store = listctrl.GetStore()
        # Snapshot: the UI may replace the list while this round is running,
        # and both passes below must see the same rows.
        sig_classes = list(self.Display_signal_class_list)
        if sig_classes:
            store.DeleteAllItems()
            for sig_class in sig_classes:
                # row = (name, decoded value, raw value)
                val = sig_class.GetVal()
                enum_str = sig_class.GetVaueTableStrByVal(val)
                data = (sig_class.name, enum_str, str(val))
                logging.debug('data='+json.dumps(store.GetColumnCount(),indent = 4))
                logging.debug('data='+json.dumps(data,indent = 4))
                store.AppendItem(data)

        # Take everything that queued up meanwhile so the DBC ends up with the
        # newest data instead of only the oldest frame.
        pending = [msg]
        while not self.Display_frame_queue.empty():
            pending.append(self.Display_frame_queue.get())
        frames = [
            {
                "Timestamp": queued.timestamp,
                "ID": queued.arbitration_id,
                "Data": queued.data,
            }
            for queued in pending
        ]
        self.DbcClass.UpdateFrameData(frames)

        # Only the value columns change; keep the column order used by
        # AppendItem above: column 1 = decoded value, column 2 = raw value.
        for row_index, sig_class in enumerate(sig_classes):
            val = sig_class.GetVal()
            enum_str = sig_class.GetVaueTableStrByVal(val)
            store.SetValueByRow(enum_str, row=row_index, col=1)
            store.SetValueByRow(str(val), row=row_index, col=2)
        time.sleep(0.1)
def ChoiceInit(self, obj, value_array):
    """Load *value_array* into the choice control *obj* and select its first entry.

    Nothing happens when *value_array* is empty: the control keeps whatever
    items and selection it had.
    """
    if len(value_array) == 0:
        return
    obj.SetItems(value_array)
    obj.Selection = 0  # default to the first entry
def EcuInfoChanged(self, obj):
    """Apply a change of the company / vehicle / ECU selection.

    This is not an event handler itself; every selection change ends up here.
    Changing the company refills the vehicle choice, changing the vehicle
    refills the ECU choice, and each level then processes the next one by
    calling this method again. Once the ECU is known, the UDS configuration is
    loaded, the dependent UI is refreshed and the simulation file is told
    which diagnostic ids to use.

    Args:
        obj: The choice control whose selection changed.
    """
    company_choice = self.公司
    vehicle_choice = self.车型
    ecu_choice = self.控制器

    if obj == company_choice:
        # Company picked: (re)initialise the vehicle list, then cascade.
        company = company_choice.GetStringSelection()
        self.Ecu_select_path[0] = company
        vehicles = self.Ecu_diag.GetVehicle(company)
        self.ChoiceInit(vehicle_choice, vehicles)
        self.EcuInfoChanged(vehicle_choice)
    elif obj == vehicle_choice:
        # Vehicle picked: (re)initialise the ECU list, then cascade.
        vehicle = vehicle_choice.GetStringSelection()
        self.Ecu_select_path[1] = vehicle
        ecus = self.Ecu_diag.GetEcu([company_choice.GetStringSelection(), vehicle])
        self.ChoiceInit(ecu_choice, ecus)
        self.EcuInfoChanged(ecu_choice)
    elif obj == ecu_choice:
        self.Ecu_select_path[2] = ecu_choice.GetStringSelection()
        # ECU chosen: load the matching UDS diagnostic configuration.
        self.Ecu_diag.SetEcu(self.Ecu_select_path)
        ecu_cfg = self.Ecu_diag.GetEcuConfig()
        self.Ecu_config = ecu_cfg
        self.UiUpdateByEcuConfig(ecu_cfg)
        # Simulation helper needs the diagnostic request / response ids.
        diag_ids = self.Ecu_config["诊断ID"]
        self.Sim_file_class.SetEcuId(diag_ids["ECU收"], diag_ids["ECU发"])
    else:
        print("非预期的事件main:224", obj.name)
def UiUpdateByEcuConfig(self, ecu_cfg):
    """Refresh the diagnostic choice controls from an ECU configuration.

    Fills the session, routine and read-data choices with the keys of the
    matching sections of *ecu_cfg*, then refreshes the flash configuration
    for the current company / vehicle / ECU path.
    """
    session_names = list(ecu_cfg["session"].keys())
    self.ChoiceInit(self.session_choice, session_names)

    routine_names = list(ecu_cfg["routine"]["命令列表"].keys())
    self.ChoiceInit(self.Routine_choice, routine_names)

    did_names = list(ecu_cfg["数据DID"].keys())
    self.ChoiceInit(self.Read_info_choice, did_names)

    # TODO: also update the boot / app file paths and the flash config path.
    self.UiUpdateFlashConfig(self.Ecu_select_path)
def UiUpdateFlashConfig(self, ecu_select_path):
    """Refresh the flash configuration for a company / vehicle / ECU path.

    Currently this only logs the path. The planned lookup walks from the ECU
    folder up through its parent folders.
    """
    # TODO: locate the configuration file.
    path_text = str(ecu_select_path)
    logger.info('ecu_select_path' + path_text)
def OnSelectCompany(self, event):
    """Handle a selection change in one of the company / vehicle / ECU choices.

    The dependent lists are refreshed by ``EcuInfoChanged``, which walks down
    the levels by calling itself rather than by posting control events.
    """
    changed_choice = event.GetEventObject()
    self.EcuInfoChanged(changed_choice)
    event.Skip()
def DbcFileSelected(self,event):
    """Load the DBC file chosen in the file picker and rebuild the DBC tree."""
    picker = event.GetEventObject()  # the DBC file picker
    selected_path = picker.GetPath()
    logging.info(selected_path)
    self.DbcClass = Py_Dbc(selected_path)
    self.InitDbcTreeCtrol()
def GetConfigFile(self,ecu_name):
    """Return the flash configuration file to use for *ecu_name*.

    The path from the configuration file picker wins when it exists.
    Otherwise the default
    ``./EcuConfig/<company>/<vehicle>/<ecu>/<ecu>_falsh_config.json`` is used
    when that file exists.

    Returns:
        The chosen path, or ``None`` when neither file exists.
    """
    picked_path = self.filePicker_EscConfig.GetPath()
    print('Dbg:"main_.py" :307 flash_config_file=', picked_path)
    if os.path.exists(picked_path):
        return picked_path

    # Fall back to the default file of the selected company / vehicle.
    default_path = "./EcuConfig/" + "/".join(
        (
            self.Ecu_select_path[0],
            self.Ecu_select_path[1],
            ecu_name,
            ecu_name + "_falsh_config.json",
        )
    )
    print('Dbg:"main_.py" :316 ecu_flash_config_file_name=', default_path)
    if not os.path.exists(default_path):
        info = "请选择有效的配置文件"
        print('Dbg:"main_.py" :313 info=', info)
        return None

    info = "使用默认文件"
    print('Dbg:"main_.py" :310 info=', info, default_path)
    return default_path
def EcuChanged(self, event):
    """Follow a page change of the ECU notebook in the ECU choice control.

    The text of the selected page is taken as the ECU name. When the ECU
    choice offers that name it is selected and ``EcuInfoChanged`` is run for
    it; otherwise a message is printed.
    """
    # TODO: look up the ECU configuration file to get the app and boot files.
    ecu_choice = self.控制器
    notebook = event.GetEventObject()
    page_index = notebook.GetSelection()
    ecu_name = notebook.GetPageText(page_index)
    print('Dbg:"main_.py" :267 ecu_name=', ecu_name)
    if ecu_name not in ecu_choice.GetItems():
        print("对应车型未包含", ecu_name, "配置文件")
    else:
        position = ecu_choice.GetItems().index(ecu_name)
        ecu_choice.SetSelection(position)
        self.EcuInfoChanged(self.控制器)
    event.Skip()
def UiClose(self, event):
    """Flag the UI as closing and shut the UDS client down.

    ``Ui_close`` is the flag the CAN coroutines poll to stop.
    """
    self.Ui_close = True
    # There may be several clients later on; this will then become a list.
    self.Ecu_diag.CloseUdsClient()
    print('Dbg:"main_.py" :183 Ui_close=', self.Ui_close)
    event.Skip()
def OpenCan(self, event):
    """Open the selected CAN adapter and start the CAN worker threads.

    Supported adapter names are ``"virtual"`` (two virtual buses on channel
    "SimCan", simulation mode on), ``"nican"``, ``"vector"`` and ``"Zlg-E"``.
    Nothing is done when buses are already open. When the adapter name is not
    supported no thread is started, because the workers would have no bus to
    serve.

    After a bus is open, two threads are started, each with its own asyncio
    event loop: the main CAN thread and the diagnostic relay thread.
    """
    if self.Can_array:
        info = "无需重新打开"
        print('Dbg:"main_.py" :279 info=', info, self.Can_array)
        return
    can_name = self.can_name
    self.can_thread_args = {"can_name": can_name, "can": can}
    bit_rate = 500000
    self.Sim_mode = False
    if can_name == "virtual":
        tester_bus = can.interface.Bus("SimCan", bustype="virtual", receive_own_messages=True)
        sim_ecu_bus = can.interface.Bus("SimCan", bustype="virtual", receive_own_messages=True)
        self.Can_array.append(tester_bus)
        self.Can_array.append(sim_ecu_bus)
        self.Sim_mode = True
    elif can_name == "nican":
        from xnetcan import NicanBus
        self.Can_array.append(
            NicanBus(channel="CAN1", can_filters=None, bitrate=bit_rate, log_errors=True)
        )
    elif can_name == "vector":
        self.Can_array.append(
            can.interface.Bus(bustype="vector", app_name="CANape", channel=1, bitrate=bit_rate)
        )
    elif can_name == "Zlg-E":
        from zcanbus import ZcanBus
        self.Can_array.append(
            ZcanBus(device_type=21, device_index=0, channel_index=0, bitrate=bit_rate)
        )
    else:
        # No bus was opened: starting the workers would only create threads
        # that have nothing to do.
        logging.warning("unsupported CAN adapter %r, no bus opened", can_name)
        event.Skip()
        return

    # Each worker gets its own event loop, created here and run in its thread.
    can_new_loop = asyncio.new_event_loop()
    print('Dbg:"main_.py" :399 can_new_loop=', can_new_loop)
    can_thread = Thread(target=self.can_thread_loop, args=(can_new_loop,))
    can_thread.start()
    diag_can_new_loop = asyncio.new_event_loop()  # diagnostic relay thread
    diag_thread = Thread(target=self.DiagThredLoop, args=(diag_can_new_loop,))
    diag_thread.start()
    print('Dbg:"main_.py" :401 can_new_loop=', can_new_loop)
    print("Dbg 160:can_new_loop=", can_new_loop, "\n")
    event.Skip()
def UnlockEcu(self, event):
    """Queue the diagnostic sequence that unlocks the ECU.

    The security level is the integer value of the entry selected in
    ``m_choice4``. The queued sequence first enters the extended session and
    then requests security access at that level.
    """
    level_choice = self.m_choice4
    selected = level_choice.GetSelection()
    sec_lv = int(level_choice.Items[selected])
    enter_extended = {"命令": "进扩展模式", "模式": 3, "延时": 0.01}
    enter_security = {"命令": "进安全模式", "模式": sec_lv, "延时": 0}
    self.Ecu_diag.Diag_event_quee.put([enter_extended, enter_security])
    event.Skip()
def ReadDtc(self, event):
    """Queue a read-DTC request for the diagnostic worker."""
    # "参数" carries the arguments of the command, stacked in order.
    read_dtc = {"命令": "读故障码", "参数": [9], "延时": 0}
    self.Ecu_diag.Diag_event_quee.put([read_dtc])
    event.Skip()
async def SimEcu(self, msg):
    """Play the ECU in simulation mode: answer a diagnostic request from the log.

    Only frames carrying the ECU receive id of the current ECU configuration
    are handled, and only while ``Sim_mode`` is on. The recorded responses are
    looked up in the simulation file (always searching from its start) and
    sent on the second bus of ``Can_array``.
    """
    if self.Sim_mode:
        sim_bus = self.Can_array[1]
    ecu_rx_id = self.Ecu_config["诊断ID"]["ECU收"]
    if msg.arbitration_id not in [ecu_rx_id]:
        return
    if not self.Sim_mode:
        return

    print('Dbg:"main_.py" :453 msg=', msg)
    responses = self.Sim_file_class.GetSimResponse(msg, True)
    print('Dbg:"main_.py" :455 tmp=', responses)
    # NOTE(review): indentation was lost in this copy; the "not found" branch
    # is assumed to belong to the per-item type check - confirm.
    for candidate in responses:
        if type(candidate) == can.Message:
            print('Dbg:"main_.py" :453 send_msg=', candidate)
            sim_bus.send(candidate)
            continue
        # Anything that is not a message is the lookup's "not found" filler.
        info = "没找到对应的数据"
        print('Dbg:"main_.py" :456 info=', info, msg)
async def print_message(self, msg):
    """Notifier callback that prints frames with ids 0x735, 0x73D, 0x736 or 0x73E."""
    traced_ids = [0x735, 0x73D, 0x736, 0x73E]
    if msg.arbitration_id not in traced_ids:
        return
    print('Dbg:"main_.py" :451 msg=', msg)
async def SendEcuDataToDiagCan(self, msg):
    """Relay frames with id 0x73D or 0x73E to the diagnostic relay bus."""
    if msg.arbitration_id not in (0x73D, 0x73E):
        return
    # Hand the frame over to the diagnostic CAN.
    self.Diag_can2.send(msg)
    logging.debug(msg)
async def SendDiagDataToHwCan(self,msg):
    """Relay frames with id 0x736 or 0x735 to the first opened CAN bus."""
    is_request = msg.arbitration_id in [0x736, 0x735]
    if is_request:
        hardware_bus = self.Can_array[0]
        hardware_bus.send(msg)
async def SendDiagCanDataToEcuCanThred(self):
    """Diagnostic relay coroutine.

    Attaches a notifier to the diagnostic relay bus whose only listener is
    ``SendDiagDataToHwCan``, stores it as ``Can_notifier`` and then idles,
    polling every 0.1 s, until no bus is open or the UI is closing. The
    notifier is stopped on the way out.
    """
    relay_bus = self.Diag_can2
    print('Dbg:"main_.py" :464 诊断CAN 发给CAN diag_can=', relay_bus)
    relay_listeners = [self.SendDiagDataToHwCan]
    running_loop = asyncio.get_running_loop()
    notifier = can.Notifier(relay_bus, relay_listeners, loop=running_loop)
    self.Can_notifier = notifier

    opened_buses = self.Can_array
    while True:
        if len(opened_buses) == 0 or self.Ui_close:
            break
        await asyncio.sleep(0.1)
    notifier.stop()
async def DisplayMessageByFilter(self,msg):
    """Print a received frame.

    Meant to show frames selected by an id filter; at present every frame it
    is given is printed.
    """
    print(msg)
def StartLog(self,event):
    """Start or stop recording CAN traffic to ``log.asc``.

    The control that raised the event decides: a true value starts logging, a
    false value stops it. Logging needs the notifier created by the diagnostic
    relay coroutine, so when the CAN has not been opened yet the request is
    refused and the control is switched back off. Stopping when nothing is
    being recorded does nothing.
    """
    toggle = event.GetEventObject()
    notifier = getattr(self, "Can_notifier", None)
    if toggle.GetValue():
        if notifier is None:
            # Check before creating the logger so no file is opened in vain.
            logging.warning("CAN is not open yet, cannot start logging")
            toggle.SetValue(False)
            return
        self.Logger_asc = can.Logger("log.asc")  # log file
        notifier.add_listener(self.Logger_asc)
        print('Dbg:"main_.py" :504 obj=开始记录数据')
    else:
        asc_logger = getattr(self, "Logger_asc", None)
        if asc_logger is None:
            return
        if notifier is not None:
            notifier.remove_listener(asc_logger)
        asc_logger.stop()
        self.Logger_asc = None  # a repeated stop is now harmless
        print('Dbg:"main_.py" :504 obj=停止记录数据')
async def UpdateDbcParise(self,msg):
    """Convert *msg* to the DBC frame format and update the DBC signal values.

    The frame dictionary is built from the message's ``timestamp``,
    ``arbitration_id`` and ``data`` attributes and handed to the DBC as a
    one-element list, the same shape the display thread passes.
    """
    frame = {
        "Timestamp": msg.timestamp,
        "ID": msg.arbitration_id,
        "Data": msg.data,
    }
    self.DbcClass.UpdateFrameData([frame])  # feed the received data to the DBC
async def CanThredHw(self,can):
    """Main CAN coroutine: listen on the first opened bus until the UI closes.

    Args:
        can: Object providing ``Notifier``; the caller passes the ``can``
            module.

    A notifier with the relay and display listeners is attached to the first
    bus of ``Can_array``. The coroutine then idles, polling every 0.1 s, until
    no bus is open or the UI is closing. Finally the notifier is stopped and
    every bus in ``Can_array`` is shut down.
    """
    active_notifiers = []
    opened_buses = self.Can_array
    logging.debug(opened_buses)
    listeners = [
        self.SendEcuDataToDiagCan,
        # decode the CAN data and show it in the list
        self.UpdateDbcSignalValueViewOnMsg,
    ]
    running_loop = asyncio.get_running_loop()
    if len(opened_buses) == 0:
        msg = "没有打开的CAN"
        print('Dbg:"main_.py" :563 msg=', msg)
    else:
        # TODO: allow the listeners to be changed so the UI can pick what is logged.
        notifier = can.Notifier(opened_buses[0], listeners, loop=running_loop)
        # NOTE(review): indentation was lost in this copy; both extra
        # listeners are assumed to be simulation-only - confirm.
        if self.Sim_mode:  # used by the simulation test
            notifier.add_listener(self.SimEcu)
            notifier.add_listener(self.print_message)
        active_notifiers.append(notifier)
        while len(opened_buses) > 0 and not self.Ui_close:
            await asyncio.sleep(0.1)

    for active in active_notifiers:
        print('Dbg:"main_.py" :548 notifier close=', active)
        active.stop()
    for bus in opened_buses:
        print('Dbg:"main_.py" :551 can close=', bus)
        bus.shutdown()
    print("Done!")
def DiagThredLoop(self,loop):
    """Thread body of the diagnostic relay: run the relay coroutine on *loop*.

    The coroutine is driven by the supplied event loop and the loop is closed
    when the coroutine finishes, so the thread ends together with it.

    Args:
        loop: Event loop created for this thread.
    """
    info = "启动诊断线程"
    print('Dbg:"main_.py" :551 info=', info)
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(self.SendDiagCanDataToEcuCanThred())
    finally:
        loop.close()
        # Do not leave a closed loop registered for this thread.
        asyncio.set_event_loop(None)
def can_thread_loop(self, loop):
    """Thread body of the main CAN worker: run ``CanThredHw`` on *loop*.

    The coroutine is driven by the supplied event loop and the loop is closed
    when the coroutine finishes, so the thread ends together with it.

    Args:
        loop: Event loop created for this thread.
    """
    logging.info("启动主Can线程")
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(self.CanThredHw(can))
    finally:
        loop.close()
        # Do not leave a closed loop registered for this thread.
        asyncio.set_event_loop(None)
    msg = "can thread loop over"
    logging.info(msg)
def SetCan(self, event):
    """Remember the CAN adapter name selected in the choice control."""
    selected_name = event.GetEventObject().GetStringSelection()
    print("Dbg 19:val=", selected_name, "\n")
    self.can_name = selected_name
    event.Skip()
def OnSession(self, event):
    """Queue a session change to the session selected in ``session_choice``.

    The session mode is looked up by name in the ``"session"`` section of the
    current ECU configuration.
    """
    session_table = self.Ecu_config["session"]
    session_mode = session_table[self.session_choice.GetStringSelection()]
    # "参数"-style fields carry the arguments of a command, stacked in order.
    change_session = {"命令": "进扩展模式", "模式": session_mode, "延时": 0}
    self.Ecu_diag.Diag_event_quee.put([change_session])
    event.Skip()
def EcuReset(self,event):
    """Queue an ECU reset request with parameter ``[1]`` (hardware reset)."""
    hardware_reset = {"命令": "Ecu复位", "参数": [1], "延时": 0}
    self.Ecu_diag.Diag_event_quee.put([hardware_reset])
def OnRoutineButtonClick(self, event):
    """Queue the routine selected in ``Routine_choice``.

    The routine id is the hexadecimal ``"命令ID"`` string of the selected
    entry in the ECU configuration. The queued sequence enters the extended
    session, requests security access at level 1 and then runs the routine
    without parameters.
    """
    selected = self.Routine_choice.GetStringSelection()
    cmd = self.Ecu_config["routine"]["命令列表"][selected]["命令ID"]
    routine_id = int(cmd, 16)
    print('Dbg:"main_.py" :543 routine_id=', routine_id)
    act = self.Ecu_config["routine"]["命令列表"][selected]["动作"]
    print('Dbg:"main_.py" :636 act=', act)

    steps = []
    steps.append({"命令": "进扩展模式", "模式": 3, "延时": 0})
    steps.append({"命令": "进安全模式", "模式": 1, "延时": 0})
    # The configured action is only printed; no parameter is sent yet.
    steps.append({"命令": "routine", "ID": routine_id, "参数": None, "延时": 0})
    self.Ecu_diag.Diag_event_quee.put(steps)
    event.Skip()
def OnClearDtcButton(self, event):
    """Queue a clear-DTC request for the diagnostic worker."""
    # "参数" carries the arguments of the command, stacked in order.
    clear_dtc = {"命令": "清故障码", "参数": [], "延时": 0, 'Info': "清故障按键"}
    self.Ecu_diag.Diag_event_quee.put([clear_dtc])
    event.Skip()
def OnReadInfoButton(self, event):
    """Queue a read-data request for the entry selected in ``Read_info_choice``.

    The DID is the hexadecimal ``"命令ID"`` string of the selected entry in
    the ``"数据DID"`` section of the ECU configuration. The extended session
    is entered before the read.
    """
    selected = self.Read_info_choice.GetStringSelection()
    did = int(self.Ecu_config["数据DID"][selected]["命令ID"], 16)
    enter_extended = {"命令": "进扩展模式", "模式": 3, "延时": 0}
    read_data = {"命令": "读数据", "DID": did, "参数": None, "延时": 0}
    self.Ecu_diag.Diag_event_quee.put([enter_extended, read_data])
    event.Skip()
def OnFlashButtonClick(self, event):
    """Queue the pre-programming sequence and start flashing the ECU.

    The boot, application and configuration file paths come from the three
    file pickers; they are passed on as plain strings. When the configuration
    picker is empty, ``GetConfigFile`` is asked for the file of the selected
    ECU.
    """
    # Pre-download sequence. TODO: read it from the configuration file.
    event_list = [
        {"命令": "进扩展模式", "模式": 3, "延时": 0},
        {"命令": "进扩展模式", "模式": 2, "延时": 0},
        {"命令": "进安全模式", "模式": 0x11, "延时": 0},
    ]
    # No trailing commas here: they would wrap each path in a 1-tuple.
    boot_file = self.filePicker_EscBoot.GetPath()
    app_file = self.filePicker_EscApp.GetPath()
    flash_config_file = self.filePicker_EscConfig.GetPath()
    self.Ecu_diag.Diag_event_quee.put(event_list)
    if not flash_config_file:
        flash_config_file = self.GetConfigFile(self.Ecu_select_path[2])
    self.Ecu_diag.Flash(
        boot_file=boot_file,
        app_file=app_file,
        flash_config_file=flash_config_file,
    )
    # TODO: start the flashing thread.
    event.Skip()
def Test(self, event):
    """Run the automatic UI test in simulation mode.

    Turns simulation mode on, selects the first entry of the CAN adapter
    choice and runs test sequence 1.
    """
    print('Dbg:"main_.py" :517 event=', event)
    self.Sim_mode = True
    adapter_choice = self.CAN卡
    self.WxChoiceEvent(adapter_choice, 0)
    self.TestSeq1()
def TestSeq1(self):
    """Post the button and choice events of test sequence 1, in order.

    Open the CAN adapter, switch to the operation page, select the ECU, change
    session, run two routines, then clear DTCs, unlock the ECU, read DTCs and
    flash.
    """
    press = self.WxSendButtonEvent
    choose = self.WxChoiceEvent

    # Open the CAN adapter, then switch to the operation page.
    press(self.m_button_open_can)
    self.m_notebook1.SetSelection(1)

    # Select the ECU.
    choose(self.控制器, 1)
    press(self.Session_button)
    print("设置控制器")

    # Session change.
    choose(self.session_choice, 2)
    press(self.Session_button)

    # Routines.
    for routine_index in (2, 5):
        choose(self.Routine_choice, routine_index)
        press(self.Routine_button)

    # Clear DTCs, unlock the ECU, read DTCs, flash the software.
    for button in (
        self.Clear_dtc_button,
        self.UnlockEcuBt,
        self.读故障按键,
        self.Flash_button,
    ):
        press(button)
def TestHw(self, event):
    """Run test sequence 1 with simulation mode off.

    Selects the third entry of the CAN adapter choice before running the
    sequence.
    """
    self.Sim_mode = False
    adapter_choice = self.CAN卡
    self.WxChoiceEvent(adapter_choice, 2)
    self.TestSeq1()
def WxSendButtonEvent(self,bt):
    """Post a button-click command event to the button *bt*."""
    click = wx.CommandEvent(wx.EVT_BUTTON.typeId, bt.GetId())
    click.SetEventObject(bt)
    wx.PostEvent(bt, click)
def WxChoiceEvent(self,choice,index):
    """Select entry *index* of *choice* and post the matching choice event."""
    choice.SetSelection(index)
    selection_event = wx.CommandEvent(wx.EVT_CHOICE.typeId, choice.GetId())
    selection_event.SetEventObject(choice)
    wx.PostEvent(choice, selection_event)
# ---------------------------------------------------------------------------------------------
def SortDictList(data):
    """Return a new list with the dicts of *data* ordered by their ``"name"`` value.

    The sort is stable, so entries with equal names keep their relative order
    and every entry appears exactly once. *data* itself is not modified.

    Raises:
        KeyError: If an entry has no ``"name"`` key.
    """
    return sorted(data, key=lambda entry: entry["name"])
# # dict_data = tmp_dict_data
# data1 = [
# {'name':3,'s':'30'},
# {'name':6,'s':'60'},
# {'name':7,'s':'70'},
# {'name':2,'s':'20'},
# ]
# data2={'s':data1}
# data1 = SortDictList(data1)
# print(data2)
if __name__ == "__main__":
    # Script entry point: create the wx application, show the main window
    # and run the event loop until the window is closed.
    app = wx.App(False)
    main_window = App(None)
    main_window.Show()
    app.MainLoop()
# res = operator.eq(ecu1['name'], ecu2['name'])
# print(ecu1['name'], '--', ecu2['name'])
# # if (res != 0):
# print(res)
# print(cdbc.Get_DbcNet())
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/gcbb/gpycan.git
git@gitee.com:gcbb/gpycan.git
gcbb
gpycan
gpycan
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891