Ai
1 Star 0 Fork 0

从一开始/speech python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
speech_client.py 12.69 KB
一键复制 编辑 原始数据 按行查看 历史
从一开始 提交于 2025-07-23 17:32 +08:00 . 优化
import pyaudio
import requests
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog
import threading
import time
import json
from datetime import datetime, timedelta
import textwrap
class VoiceRecognitionApp:
    """Tkinter GUI that records microphone audio and posts it to a recognition server.

    The recording runs on a background thread that appends raw chunks to
    ``audio_frames``. When recording stops, the concatenated bytes are POSTed to
    ``server_url`` on a second background thread and the JSON reply is rendered
    in the result text box, either in full or restricted to ``display_fields``.
    """

    def __init__(self, root):
        """Build the window, set the audio parameters and start the timer loop.

        Args:
            root: The ``tk.Tk`` root window that hosts the application.
        """
        self.root = root
        self.root.title("语音识别工具")
        self.root.geometry("700x600")
        self.root.resizable(True, True)
        # Install a default font so Chinese text renders properly.
        self.setup_ui_fonts()
        # Audio capture parameters.
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.CHUNK = 1024
        # Recording state.
        self.is_recording = False
        self.audio_frames = []
        self.recording_start_time = None
        self.recording_duration = 0
        self.recording_thread = None
        self.server_url = "http://117.72.50.177:5000/recognize"
        self.display_all_fields = True  # show every field by default
        self.display_fields = []  # used only when display_all_fields is False
        # NOTE: ``last_result`` is deliberately not initialised here; its
        # presence (checked with hasattr) marks that a result has been received,
        # which keeps a legitimate JSON ``null`` reply distinguishable.
        self.create_widgets()
        # Audio backend; the stream itself is opened by the recording thread.
        self.p = pyaudio.PyAudio()
        self.stream = None
        # Kick off the periodic elapsed-time refresh.
        self.update_timer()

    def setup_ui_fonts(self):
        """Set the default widget font so that Chinese text displays correctly."""
        default_font = ('SimHei', 10)
        self.root.option_add("*Font", default_font)

    def create_widgets(self):
        """Create and lay out every widget of the application window."""
        main_frame = ttk.Frame(self.root, padding="20")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Title.
        title_label = ttk.Label(main_frame, text="语音识别工具", font=('SimHei', 16, 'bold'))
        title_label.pack(pady=(0, 20))

        # Recording controls.
        control_frame = ttk.LabelFrame(main_frame, text="录音控制", padding="10")
        control_frame.pack(fill=tk.X, pady=(0, 15))
        self.time_label = ttk.Label(control_frame, text="00:00:00", font=('SimHei', 14))
        self.time_label.pack(side=tk.LEFT, padx=20)
        button_frame = ttk.Frame(control_frame)
        button_frame.pack(side=tk.RIGHT)
        self.start_btn = ttk.Button(button_frame, text="开始录音", command=self.start_recording)
        self.start_btn.pack(side=tk.LEFT, padx=5)
        self.stop_btn = ttk.Button(button_frame, text="停止录音", command=self.stop_recording,
                                   state=tk.DISABLED)
        self.stop_btn.pack(side=tk.LEFT, padx=5)

        # Server settings.
        server_frame = ttk.LabelFrame(main_frame, text="服务器设置", padding="10")
        server_frame.pack(fill=tk.X, pady=(0, 15))
        ttk.Label(server_frame, text="服务器地址:").pack(side=tk.LEFT)
        self.server_entry = ttk.Entry(server_frame)
        self.server_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.server_entry.insert(0, self.server_url)
        save_server_btn = ttk.Button(server_frame, text="保存", command=self.save_server_url)
        save_server_btn.pack(side=tk.RIGHT, padx=5)

        # Result area.
        result_frame = ttk.LabelFrame(main_frame, text="识别结果", padding="10")
        result_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 15))
        result_controls = ttk.Frame(result_frame)
        result_controls.pack(fill=tk.X, pady=(0, 5))

        # Switch between "all fields" and "custom fields".
        self.display_mode_var = tk.StringVar(value="all")
        all_radio = ttk.Radiobutton(result_controls, text="显示全部结果",
                                    variable=self.display_mode_var, value="all",
                                    command=self.toggle_display_mode)
        all_radio.pack(side=tk.LEFT, padx=5)
        custom_radio = ttk.Radiobutton(result_controls, text="自定义显示字段",
                                       variable=self.display_mode_var, value="custom",
                                       command=self.toggle_display_mode)
        custom_radio.pack(side=tk.LEFT, padx=5)
        self.configure_fields_btn = ttk.Button(result_controls, text="配置显示字段",
                                               command=self.configure_display_fields,
                                               state=tk.DISABLED)
        self.configure_fields_btn.pack(side=tk.RIGHT)

        # Text box and scrollbar live side by side in their own frame; making
        # the scrollbar a child of the Text widget would draw it over the text.
        text_frame = ttk.Frame(result_frame)
        text_frame.pack(fill=tk.BOTH, expand=True)
        scrollbar = ttk.Scrollbar(text_frame)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.result_text = tk.Text(text_frame, wrap=tk.WORD, height=12,
                                   yscrollcommand=scrollbar.set)
        self.result_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.config(command=self.result_text.yview)
        # Read-only for the user; display_result enables it temporarily.
        self.result_text.config(state=tk.DISABLED)

        # Status bar.
        self.status_var = tk.StringVar()
        self.status_var.set("就绪")
        status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN,
                               anchor=tk.W)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def toggle_display_mode(self):
        """Apply the selected display mode and re-render the last result, if any."""
        show_all = self.display_mode_var.get() == "all"
        self.display_all_fields = show_all
        self.configure_fields_btn.config(state=tk.DISABLED if show_all else tk.NORMAL)
        # Reading a Text widget does not require it to be in the NORMAL state.
        current_result = self.result_text.get("1.0", tk.END).strip()
        if current_result and hasattr(self, 'last_result'):
            self.display_result(self.last_result)

    def start_recording(self):
        """Start capturing audio on a background thread (no-op if already recording)."""
        if self.is_recording:
            return
        self.is_recording = True
        self.audio_frames = []
        self.recording_start_time = time.time()
        # Update the UI state.
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.status_var.set("正在录音...")
        # Launch the recording thread.
        self.recording_thread = threading.Thread(target=self.record_audio)
        self.recording_thread.daemon = True
        self.recording_thread.start()

    def record_audio(self):
        """Recording thread body: read chunks until ``is_recording`` is cleared.

        The thread that opens the stream is also the one that closes it, so the
        stream is never closed while a ``read`` is in progress on it.
        """
        stream = None
        try:
            stream = self.p.open(
                format=self.FORMAT,
                channels=self.CHANNELS,
                rate=self.RATE,
                input=True,
                frames_per_buffer=self.CHUNK
            )
            self.stream = stream
            while self.is_recording:
                self.audio_frames.append(stream.read(self.CHUNK))
        except Exception as e:
            # Errors that arrive after a stop/close was requested are a side
            # effect of shutting down and are not reported.
            if self.is_recording:
                # Format the message now: ``e`` is unbound as soon as this
                # handler exits, so a lambda must not reference it.
                message = f"录音失败: {str(e)}"
                self.root.after(0, lambda: messagebox.showerror("错误", message))
                self.root.after(0, self.stop_recording)
        finally:
            if stream is not None:
                self.stream = None
                self._close_stream(stream)

    @staticmethod
    def _close_stream(stream):
        """Stop and close an audio stream, tolerating one that is already closed."""
        try:
            stream.stop_stream()
            stream.close()
        except OSError:
            # NOTE(review): presumably PyAudio signals an already-closed stream
            # with OSError; in that case there is nothing left to release.
            pass

    def _join_recording_thread(self, timeout=1.0):
        """Wait (bounded) for the recording thread to finish its current chunk."""
        thread = self.recording_thread
        if thread is None or thread is threading.current_thread():
            return
        if thread.is_alive():
            # Bounded wait so a stuck audio device cannot freeze the UI forever.
            thread.join(timeout=timeout)

    def stop_recording(self):
        """Stop recording and, if any audio was captured, send it to the server."""
        if not self.is_recording:
            return
        self.is_recording = False
        self.recording_duration = time.time() - self.recording_start_time
        # Let the recorder append its last chunk and close its own stream
        # before the captured frames are read.
        self._join_recording_thread()
        # Update the UI state.
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        if not self.audio_frames:
            # Nothing was captured (e.g. the device failed to open): do not
            # post an empty payload.
            self.status_var.set("就绪")
            return
        self.status_var.set("正在处理...")
        # Upload on a background thread so the UI stays responsive.
        threading.Thread(target=self.send_audio_to_server, daemon=True).start()

    def send_audio_to_server(self):
        """Upload thread body: POST the recorded bytes and hand the reply to the UI."""
        try:
            audio_data = b''.join(self.audio_frames)
            response = requests.post(self.server_url, data=audio_data, timeout=30)
            response.raise_for_status()  # raises on 4xx/5xx responses
            result = response.json()
        except Exception as e:
            # Capture the text now; ``e`` is unbound once the handler exits.
            message = f"发送失败: {str(e)}"
            self.root.after(0, lambda: self._on_recognition_error(message))
        else:
            self.root.after(0, lambda: self._on_recognition_success(result))

    def _on_recognition_success(self, result):
        """UI-thread callback: remember, display and announce a recognition result."""
        self.last_result = result  # kept so the result can be re-rendered later
        self.display_result(result)
        self.status_var.set("识别完成")

    def _on_recognition_error(self, message):
        """UI-thread callback: show an upload error and reset the status bar."""
        messagebox.showerror("错误", message)
        self.status_var.set("就绪")

    def display_result(self, result):
        """Render ``result`` into the result text box.

        Shows every field or only the configured ones, depending on
        ``display_all_fields``. The text box is always returned to the
        disabled (read-only) state, whichever branch is taken.

        Args:
            result: The decoded server reply (dict, list or any other value).
        """
        self.result_text.config(state=tk.NORMAL)
        try:
            self.result_text.delete("1.0", tk.END)
            try:
                text = self._render_result(result, self.display_all_fields, self.display_fields)
            except Exception as e:
                text = f"显示结果时出错: {str(e)}\n\n" + f"原始结果: {str(result)}\n"
            self.result_text.insert(tk.END, text)
        finally:
            self.result_text.config(state=tk.DISABLED)

    @staticmethod
    def _render_result(result, display_all_fields, display_fields):
        """Return the text shown for ``result`` under the given display settings.

        Args:
            result: The decoded server reply.
            display_all_fields: When true, render every field of the reply.
            display_fields: Field names to render when ``display_all_fields``
                is false.

        Returns:
            The formatted text to insert into the result box.
        """
        parts = []
        if display_all_fields:
            if isinstance(result, dict):
                for key, value in result.items():
                    # Long strings are wrapped and placed on their own lines.
                    if isinstance(value, str) and len(value) > 80:
                        wrapped_text = textwrap.fill(value, width=80)
                        parts.append(f"{key}:\n{wrapped_text}\n\n")
                    else:
                        parts.append(f"{key}: {value}\n\n")
            elif isinstance(result, list):
                for index, item in enumerate(result, start=1):
                    parts.append(f"项目 {index}:\n{str(item)}\n\n")
            else:
                parts.append(f"识别结果:\n{str(result)}\n")
        elif not display_fields:
            parts.append("请先配置要显示的字段\n")
        elif isinstance(result, dict):
            for field in display_fields:
                if field in result:
                    parts.append(f"{field}: {result[field]}\n\n")
                else:
                    parts.append(f"{field}: 未找到该字段\n\n")
        else:
            parts.append(f"识别结果: {str(result)}\n")
        return "".join(parts)

    @staticmethod
    def _format_elapsed(seconds):
        """Format a duration in seconds as zero-padded ``HH:MM:SS``."""
        total = max(0, int(seconds))
        minutes, secs = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def update_timer(self):
        """Refresh the elapsed-time label while recording, then reschedule itself."""
        if self.is_recording and self.recording_start_time:
            elapsed = time.time() - self.recording_start_time
            # Same HH:MM:SS shape as the label's initial "00:00:00".
            self.time_label.config(text=self._format_elapsed(elapsed))
        # Run again in 100 ms.
        self.root.after(100, self.update_timer)

    def save_server_url(self):
        """Store the server address typed in the entry box, rejecting an empty one."""
        new_url = self.server_entry.get().strip()
        if new_url:
            self.server_url = new_url
            messagebox.showinfo("提示", "服务器地址已更新")
        else:
            messagebox.showwarning("警告", "服务器地址不能为空")

    @staticmethod
    def _parse_fields(text):
        """Split a comma-separated field list, trimming blanks and dropping empties."""
        return [name for name in (part.strip() for part in text.split(',')) if name]

    def configure_display_fields(self):
        """Ask the user which result fields to show and re-render the last result."""
        current_fields = ", ".join(self.display_fields)
        fields = simpledialog.askstring(
            "配置显示字段",
            "请输入要显示的结果字段,用逗号分隔:\n(例如: text,confidence)",
            initialvalue=current_fields
        )
        # None means the dialog was cancelled: keep the current configuration.
        if fields is not None:
            self.display_fields = self._parse_fields(fields)
            messagebox.showinfo("提示", "显示字段已更新")
            if hasattr(self, 'last_result'):
                self.display_result(self.last_result)

    def on_close(self):
        """Release the audio resources and destroy the window."""
        self.is_recording = False
        # Normally the recorder thread closes its own stream once it sees the flag.
        self._join_recording_thread()
        try:
            try:
                stream = self.stream
                if stream is not None:
                    # The recorder did not exit in time; close the stream here.
                    self.stream = None
                    self._close_stream(stream)
            finally:
                self.p.terminate()
        finally:
            # The window must go away even if audio cleanup failed.
            self.root.destroy()
def main():
    """Create the root window, attach the application and run the Tk event loop."""
    window = tk.Tk()
    application = VoiceRecognitionApp(window)
    # Route the window-manager close button through the app's cleanup routine.
    window.protocol("WM_DELETE_WINDOW", application.on_close)
    window.mainloop()


if __name__ == "__main__":
    main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/onlyMeStyle/speech-python.git
git@gitee.com:onlyMeStyle/speech-python.git
onlyMeStyle
speech-python
speech python
main

搜索帮助