1 Star 1 Fork 4

吴烜/用蓝牙共享内容

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
server.py 2.65 KB
一键复制 编辑 原始数据 按行查看 历史
吴烜 提交于 2024-08-03 03:07 +08:00 . 整理文档
import sys
import asyncio
import threading
import logging
from typing import Any, Union
from bless import BlessServer
from bless import BlessGATTCharacteristic
from bless import GATTCharacteristicProperties
from bless import GATTAttributePermissions
from the_uuids import 唯一识别码
# Configure logging
logging.basicConfig(
level = logging.DEBUG,
datefmt='%H:%M:%S',
format = '%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.FileHandler("debug.log"),logging.StreamHandler()]
)
日志 = logging.getLogger(__name__)
各识别码 = 唯一识别码()
# NOTE: Some systems require different synchronization methods.
触发器: Union[asyncio.Event, threading.Event]
if sys.platform in ["darwin", "win32"]:
触发器 = threading.Event()
else:
触发器 = asyncio.Event()
async def (loop):
触发器.clear()
日志.info("Starting server...")
# Instantiate the server
my_service_name = "Test Service"
服务器 = BlessServer(name=my_service_name, loop=loop)
def write_request(characteristic: BlessGATTCharacteristic, value: Any, **kwargs):
日志.info(f"Received data: {value.decode('utf-8')}")
# [蓝牙核心规范(V5.2)7.6-深入详解之GATT(1)](https://bbs.huaweicloud.com/blogs/detail/309098)
# “属性协议”部分描述了GATT协议包的各个部分与长度限制,其中属性值的大小限制为 512b
characteristic.value = '我吃了'.encode('utf-8')
服务器.update_value(各识别码.服务识别码, 各识别码.特征识别码)
日志.info("Updated!")
服务器.write_request_func = write_request
# Add Service
await 服务器.add_new_service(各识别码.服务识别码)
日志.info("Service added.")
# Add a Characteristic to the service
char_flags = (
GATTCharacteristicProperties.write
| GATTCharacteristicProperties.notify
)
permissions = GATTAttributePermissions.readable | GATTAttributePermissions.writeable
await 服务器.add_new_characteristic(
各识别码.服务识别码, 各识别码.特征识别码, char_flags, None, permissions
)
日志.info("Characteristic added.")
await 服务器.start()
日志.info("Server started.")
if 触发器.__module__ == "threading":
# noinspection PyAsyncCall
触发器.wait()
else:
await 触发器.wait()
await 服务器.stop()
日志.info("Server stopped.")
try:
loop = asyncio.get_event_loop()
loop.run_until_complete((loop))
except Exception as e:
日志.error(f"An error occurred: {e}")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhishi/share-content-using-bluetooth.git
git@gitee.com:zhishi/share-content-using-bluetooth.git
zhishi
share-content-using-bluetooth
用蓝牙共享内容
master

搜索帮助