Ai
5 Star 4 Fork 2

思必驰科技股份有限公司/dui-dds-server-api-samples

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
dds-websocket.py 3.45 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python
import json
import time
import hmac
import asyncio
#import ssl
import websockets
from hashlib import sha1
from uuid import uuid4
# Path segment interpolated into the service URL built in dds_demo().
alias = "prod"
# Audio file streamed by audioRequest(); that function announces it to the
# server as WAV, 8000 Hz, 1 channel, 2 bytes per sample.
audioFile = "8k.wav"
# Replace the following parameters with the values for your own product.
productId = "x"
apikey = "x"
async def textRequest(ws):
    """Send one text query over the websocket and print the server's reply.

    A ``nlu.input.text`` message carrying a fresh ``recordId`` and a fixed
    sample question ("Who is Yao Ming") is serialized to JSON and sent; the
    next frame received is printed as-is.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``recv``.

    If the connection closes during the exchange, the exception is printed
    and the function returns normally.
    """
    payload = {
        "aiType": "dm",
        "topic": "nlu.input.text",
        "recordId": uuid4().hex,
        "refText": "姚明是谁",
    }
    message = json.dumps(payload)
    try:
        await ws.send(message)
        reply = await ws.recv()
    except websockets.exceptions.ConnectionClosed as err:
        print(err)
    else:
        print(reply)
async def triggerIntent(ws):
    """Send a ``dm.input.intent`` message and print the server's reply.

    The message names a fixed skill id, intent, task and one slot value, plus
    a fresh ``recordId``; it is serialized to JSON, sent, and the next frame
    received is printed as-is.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``recv``.

    If the connection closes during the exchange, the exception is printed
    and the function returns normally.
    """
    slot_values = {"国内城市": "苏州"}
    payload = {
        "aiType": "dm",
        "topic": "dm.input.intent",
        "recordId": uuid4().hex,
        "skillId": "2018040200000004",
        "intent": "查询天气",
        "task": "天气",
        "slots": slot_values,
    }
    message = json.dumps(payload)
    try:
        await ws.send(message)
        reply = await ws.recv()
    except websockets.exceptions.ConnectionClosed as err:
        print(err)
    else:
        print(reply)
async def audioRequest(ws):
    """Stream the sample audio file over the websocket and print the replies.

    Steps:
      1. Send a ``recorder.stream.start`` JSON message describing the audio
         (WAV, 8000 Hz, 1 channel, 2 bytes per sample).
      2. Send the contents of ``audioFile`` as binary frames of up to 3200
         bytes each, followed by one empty binary frame.
      3. Print every message received until one whose decoded JSON contains
         a ``dm`` key, then stop reading.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``close`` and
            supporting ``async for`` over incoming messages.

    If the connection closes during the exchange, the exception is printed
    and the connection's ``close()`` coroutine is awaited. Errors opening or
    reading ``audioFile`` are not handled here and propagate to the caller.
    """
    content = {
        "aiType": "dm",
        "topic": "recorder.stream.start",
        "recordId": uuid4().hex,
        "audio": {
            "audioType": "wav",
            "sampleRate": 8000,
            "channel": 1,
            "sampleBytes": 2,
        },
    }
    try:
        await ws.send(json.dumps(content))
        with open(audioFile, "rb") as f:
            # iter(callable, sentinel) stops at the first empty read (EOF).
            for chunk in iter(lambda: f.read(3200), b""):
                await ws.send(chunk)
        # An empty binary frame follows the last chunk; presumably the
        # end-of-stream marker -- confirm against the DDS protocol docs.
        await ws.send(b"")
        async for message in ws:
            print(message)
            if "dm" in json.loads(message):
                break
    except websockets.exceptions.ConnectionClosed as exp:
        print(exp)
        # close() is a coroutine: without ``await`` it is never executed and
        # Python warns "coroutine ... was never awaited".
        await ws.close()
async def skillSetting(ws):
    """Apply a skill-level setting and print the server's reply.

    Sends a ``skill.settings`` message with option ``set`` for a fixed skill
    id, carrying a single ``city`` key/value pair, then prints the next frame
    received.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``recv``.

    If the connection closes during the exchange, the exception is printed
    and the function returns normally.
    """
    # Skill-level configuration.
    city_setting = {"key": "city", "value": "苏州"}
    payload = {
        "topic": "skill.settings",
        "skillId": "2018040200000004",
        "option": "set",
        "settings": [city_setting],
    }
    message = json.dumps(payload)
    try:
        await ws.send(message)
        reply = await ws.recv()
    except websockets.exceptions.ConnectionClosed as err:
        print(err)
    else:
        print(reply)
async def systemSetting(ws):
    """Apply a system-level setting and print the server's reply.

    Sends a ``system.settings`` message carrying a single ``location`` entry
    (longitude, latitude, address, city), then prints the next frame
    received.

    Args:
        ws: Connection object exposing awaitable ``send`` and ``recv``.

    If the connection closes during the exchange, the exception is printed
    and the function returns normally.
    """
    # System-level configuration.
    # NOTE(review): latitude "120" is outside the +/-90 range; these look
    # like placeholder coordinates -- confirm before reusing them.
    location = {
        "longitude": "80",
        "latitude": "120",
        "address": "china",
        "city": "suzhou",
    }
    payload = {
        "topic": "system.settings",
        "settings": [{"key": "location", "value": location}],
    }
    message = json.dumps(payload)
    try:
        await ws.send(message)
        reply = await ws.recv()
    except websockets.exceptions.ConnectionClosed as err:
        print(err)
    else:
        print(reply)
async def dds_demo():
    """Connect to the DDS websocket endpoint and run one sample request.

    The URL is built from the module-level ``alias``, ``productId`` and
    ``apikey`` values and printed before connecting. Only the text request
    runs by default; the other samples are listed below, commented out.
    """
    # Cloud-to-cloud access.
    # NOTE(review): the printed URL includes the apikey query parameter.
    endpoint = (
        f"wss://dds.dui.ai/dds/v2/{alias}"
        f"?serviceType=websocket&productId={productId}&apikey={apikey}"
    )
    print(endpoint)
    async with websockets.connect(endpoint) as conn:
        await textRequest(conn)
        # Other samples -- uncomment to run them on the same connection:
        # await triggerIntent(conn)
        # await audioRequest(conn)
        # await skillSetting(conn)
        # await systemSetting(conn)
if __name__ == "__main__":
    # asyncio.run() creates a new event loop, runs the demo to completion and
    # closes the loop. It replaces get_event_loop().run_until_complete(),
    # which is deprecated when no event loop is already running.
    asyncio.run(dds_demo())
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/szaispeech/dui-dds-server-api-samples.git
git@gitee.com:szaispeech/dui-dds-server-api-samples.git
szaispeech
dui-dds-server-api-samples
dui-dds-server-api-samples
master

搜索帮助