2 Star 9 Fork 2

TMRNic/Android Sensors System

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sensors-host.py 5.54 KB
一键复制 编辑 原始数据 按行查看 历史
TMRNic 提交于 2年前 . 提交代码
import socket
import matplotlib.pyplot as plt
import numpy as np
from threading import Thread
import time
from netifaces import interfaces, ifaddresses ,AF_INET
import winreg as wr
import queue
from PIL import Image
import io
#定义获取Windows系统网卡接口的在注册表的键值的函数
def get_key(ifname):
#获取所有网络接口卡的键值
id = interfaces()
#存放网卡键值与键值名称的字典
key_name = {}
try:
#建立链接注册表,"HKEY_LOCAL_MACHINE",None表示本地计算机
reg = wr.ConnectRegistry(None,wr.HKEY_LOCAL_MACHINE)
# 打开r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}',固定的
reg_key = wr.OpenKey(reg , r'SYSTEM\CurrentControlSet\Control\Network\{4d36e972-e325-11ce-bfc1-08002be10318}')
except :
return ('路径出错或者其他问题,请仔细检查')
for i in id:
try:
#尝试读取每一个网卡键值下对应的Name
reg_subkey = wr.OpenKey(reg_key , i + r'\Connection')
#如果存在Name,写入key_name字典
key_name[wr.QueryValueEx(reg_subkey , 'Name')[0]] = i
# print(wr.QueryValueEx(reg_subkey , 'Name')[0])
except FileNotFoundError:
pass
# print('所有接口信息字典列表: ' + str(key_name) + '\n')
return key_name[ifname]
def get_ip_address(ifname):
key = get_key(ifname)
if not key:
return
else:
#返回ipv4地址信息
return ifaddresses(key)[AF_INET][0]['addr']
def pop_data(queue, len):
buffer = bytearray()
for _ in range(len):
buffer += queue.get(1)
return buffer
def data_process():
global data_queue
sensor_data = []
head = data_queue.get(1)
if head == b'\xAA':
head = data_queue.get(1)
if head == b'\x55':
if data_queue.qsize() >= 44:
buffer = bytearray()
for _ in range(44):
# buffer.append(data_queue.get(1))
buffer += data_queue.get(1)
# buffer = bytes(buffer)
sensor_data = np.frombuffer(buffer, dtype='>f4')
return 1, sensor_data
elif head == b'\xF5':
buffer = pop_data(data_queue, 4)
imgSize = np.frombuffer(buffer, dtype='>i4')
buffer = pop_data(data_queue, imgSize[0])
return 2, buffer
return 0, sensor_data
def bytearray_to_bitmap(bytearray_data, width, height):
img = Image.new('RGB', (width, height))
pixels = img.load()
for y in range(height):
for x in range(width):
index = (y * width + x) * 3
r, g, b = bytearray_data[index:index+3]
pixels[x, y] = (r, g, b)
return img
def save_bitmap(img, file_path):
with open(file_path, 'wb') as f:
img.save(f, format='BMP')
# 生成数据
def generate_data(data, t):
global tList, attList
tList = np.roll(tList, -1, axis=0)
attList = np.roll(attList, -1, axis=0)
tList[-1] = t
attList[-1,:] = data[0:9]
# 更新图形
def update_plot():
global tList, attList, ax1, ax2, ax3
x = tList
y = attList
ax1.cla()
ax1.plot(x, y[:,0:3]) # 绘制新的图形
ax1.set_xlabel('t / s')
ax1.set_ylabel('w(t) / deg/s')
ax2.cla()
ax2.plot(x, y[:,3:6]) # 绘制新的图形
ax2.set_xlabel('t / s')
ax2.set_ylabel('a(t) / m/s2')
ax3.cla()
ax3.plot(x, y[:,6:9]) # 绘制新的图形
ax3.set_xlabel('t / s')
ax3.set_ylabel('m(t) / ?')
plt.draw() # 更新图形
plt.pause(0.1)
# 定义绘图函数
def get_data():
global data_queue
while True:
data = client_socket.recv(1)
if len(data) > 0:
data_queue.put(data)
def process_data():
global t, fp, k_img
while True:
tag, data = data_process()
if tag==1: # 处理IMU数据
t += ts
generate_data(data, t) # 生成新的数据
fp.write(f'{data[0]} {data[1]} {data[2]} {data[3]} {data[4]} {data[5]} {data[6]} {data[7]} {data[8]} {data[9]} {data[10]}\n')
fp.flush()
elif tag==2: # 处理Image数据
img = Image.open(io.BytesIO(data))
# img = bytearray_to_bitmap(data, img_width, img_height)
# img.show()
save_bitmap(img, f'images/{k_img}.bmp')
k_img += 1
#============ 主程序 ==================
k_img = 1
t = 0
ts = 0.01
tList = np.arange(-5, 0, ts)
attList = np.zeros((len(tList), 9))
ip_wlan = get_ip_address('WLAN')
# 创建socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(ip_wlan)
server_socket.bind((ip_wlan, 9050))
server_socket.listen(60)
print(f"服务器已启动,请在Android端连接{ip_wlan}\n")
# 接受客户端连接
client_socket, client_address = server_socket.accept()
print(f"客户端 {client_address} 已连接...")
img_width = 1280
img_height = 720
data_queue = queue.Queue()
fp = open('imu.txt', 'w')
# 使用线程进行绘图
thread = Thread(target=get_data)
thread.start()
print("启动数据接收线程...")
# 使用线程进行绘图
thread_plt = Thread(target=process_data)
thread_plt.start()
print("启动数据处理线程...")
plt.ion()
# 创建一个画布
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(6, 6))
while True:
update_plot()
fp.close()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/tmrnic/android-sensors-system.git
git@gitee.com:tmrnic/android-sensors-system.git
tmrnic
android-sensors-system
Android Sensors System
master

搜索帮助