139 Star 667 Fork 256

mktime/python-learn

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
downloads
images
models
qsbk
.gitignore
2.png
ExcelHelper.py
FSM.py
LICENSE
README.md
SSEncrypt.module
ai-agent.py
arch-install.md
audio-rtmp.py
auto_build.py
baidu_ocr.py
balance_config.ini
balance_query.py
balance_query_batch.txt
bencode.py
bloom_1.cpp
bloom_1.py
bloom_2.py
capture-camera.py
check_https_domain_exp_date.py
cnn-demo.py
contacts.xml
crack_test.py
crack_wifi.txt
data-structure.py
dcm2jpg.py
decrypt_remmina.py
delicious.md
delicious.xml
demo.c
dht_client.py
dou2.py
download.py
egcd.py
export_delicious.sh
face-detect.py
fetchurl.py
find_repeat.py
findbig.py
five-elements.py
flv2mp4.py
get_bookmarks.py
get_version.py
gpt35.py
howto_terminate_thread.py
http_proxy.py
i3_config
import_ess_picture.py
ipfind.py
joseph-cycle.py
kimi.py
learn-class.py
learn-docker.md
learn-thread.py
learn_thread.py
linux-tips.md
merge_excel.py
monitor.sh
mydict.py
mysql_init.sql
new.txt
ocr_words.py
orm.py
parse_excel.py
parse_json.py
payload.py
pm25.py
print_ascii.py
problem.py
python-learn-1.py
python-learn-2.py
qiubai.py
remote.sh
repeated.sql
rpc_server.py
scan.py
scan2.py
score.py
screen.sh
send_fangtang.py
sina_user.json
smz_report.py
sock5.py
sock_client.py
sock_serv.py
start-audio.sh
sync_aliyun.py
system_update.py
template.xml
test_img2pdf.py
test_pdf2images.py
torrent_client.py
totp.py
tray.py
update.ico
urls.txt
v2ex.py
v2ex.txt
wifi-keep.sh
wind_crawl.py
x_test.npy
x_train.npy
xml2vcf.py
y_test.npy
y_train.npy
克隆/下载
audio-rtmp.py 4.03 KB
一键复制 编辑 原始数据 按行查看 历史
import pyaudio
import subprocess
import sys
import noisereduce as nr
import numpy as np
from datetime import datetime, timedelta
import hashlib
# Capture / streaming configuration shared by the functions below.
FORMAT = pyaudio.paInt16  # sample format requested from PyAudio; read back as np.int16 later
CHANNELS = 1  # number of input channels; also passed to FFmpeg via -ac
RATE = 44100  # sample rate in Hz; also passed to FFmpeg via -ar
CHUNK = 1024  # frames read from the input stream per iteration
RTMP_URL = 'rtmp://127.0.0.1/live/stream' # replace with the address of your RTMP server
# NOTE(review): presumably the microphone device index that choose_device()
# is meant to select -- confirm how it is expected to reach audio.open().
DEVICE_INDEX = 0
'''
rtmp服务器搭建指南:
1.编译安装nginx和rtmp module,下载源码.
```
https://nginx.org/download/nginx-1.27.2.tar.gz
https://github.com/arut/nginx-rtmp-module.git
```
生成makefile
```
./configure --prefix=/Users/linus/nginx \
--with-cc-opt=-Wno-unused-but-set-variable \
--add-module=/Users/linus/nginx-rtmp-module;
```
2.配置nginx,在nginx.conf末尾追加
```
rtmp {
    server {
        listen 1935;  # RTMP 端口
        chunk_size 4096;
        allow publish 127.0.0.1;
        deny publish all;
        application live {
            live on;  # 开启直播
            record off;  # 关闭录制(如果不需要)
        }
    }
}
```
3.启动nginx
4.运行本代码
python audio-rtmp.py
5.打开vlc,选择网络播放,地址
rtmp://127.0.0.1/live/stream
'''
def start_rtmp_server(device_index=None):
    """Capture microphone audio, denoise it and push it to RTMP_URL via FFmpeg.

    Audio is read from a PyAudio input stream in CHUNK-frame pieces, passed
    through noisereduce, and written as raw 16-bit PCM to the standard input
    of an ``ffmpeg`` child process that encodes it to AAC and publishes it
    as FLV to RTMP_URL.  The function runs until it is interrupted with
    Ctrl+C or until FFmpeg closes its end of the pipe.

    Args:
        device_index: PyAudio input device index.  ``None`` (the default)
            lets PyAudio pick the default input device, which is what this
            function did before the parameter existed.
    """
    # Initialise PyAudio.
    audio = pyaudio.PyAudio()
    stream = None
    process = None
    try:
        # Open the audio input stream.
        stream = audio.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            input_device_index=device_index,
                            frames_per_buffer=CHUNK)
        # Start the FFmpeg process.
        ffmpeg_cmd = [
            'ffmpeg',
            '-f', 's16le',          # input format: raw signed 16-bit little-endian PCM
            '-ar', str(RATE),       # sample rate
            '-ac', str(CHANNELS),   # channel count
            '-i', '-',              # read input from standard input
            '-c:a', 'aac',          # audio encoder
            '-f', 'flv',            # output container
            RTMP_URL,
        ]
        process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
        print(f"开始录音并推送到 RTMP 服务器{RTMP_URL}")
        while True:
            # Read one chunk from the audio stream.
            data = stream.read(CHUNK)
            # Convert the raw bytes to a NumPy array.
            audio_data = np.frombuffer(data, dtype=np.int16)
            # Apply noise reduction.
            reduced_noise = nr.reduce_noise(y=audio_data, sr=RATE)
            # Convert the processed samples back to bytes.
            output_data = reduced_noise.astype(np.int16).tobytes()
            try:
                # Feed the processed audio to FFmpeg.
                process.stdin.write(output_data)
            except BrokenPipeError:
                # FFmpeg has exited; every further write would fail the same
                # way, so stop instead of looping forever on the warning.
                print('[warn] pipe write.')
                break
    except KeyboardInterrupt:
        print("停止录音.")
    finally:
        # Release resources on every exit path, not only on Ctrl+C.
        if stream is not None:
            stream.stop_stream()
            stream.close()
        audio.terminate()
        if process is not None:
            try:
                process.stdin.close()
            except BrokenPipeError:
                # Flushing buffered data into a dead pipe; nothing left to do.
                pass
            process.wait()
def choose_device():
    """List the audio input devices and let the user pick one by number.

    Every device reporting at least one input channel is printed, then a
    device number is read from standard input.  A valid choice is stored in
    the module-level DEVICE_INDEX as an int; an invalid or empty answer
    leaves DEVICE_INDEX unchanged and prints a warning.

    Returns:
        The value of DEVICE_INDEX after the prompt.
    """
    # Without this declaration the assignment below only created a local
    # variable and the user's choice was silently discarded.
    global DEVICE_INDEX
    # Initialise PyAudio.
    audio = pyaudio.PyAudio()
    input_indices = []
    try:
        # List every audio input device.
        for i in range(audio.get_device_count()):
            device_info = audio.get_device_info_by_index(i)
            if device_info['maxInputChannels'] > 0:
                input_indices.append(i)
                print(f"Device {i}: {device_info['name']}")
    finally:
        audio.terminate()
    print(40 * '*')
    print('>>请输入麦克风编号:', end='', flush=True)
    answer = sys.stdin.readline().strip()
    try:
        chosen = int(answer)
    except ValueError:
        chosen = None
    if chosen in input_indices:
        DEVICE_INDEX = chosen
    else:
        print(f'[warn] invalid device number {answer!r}, keeping {DEVICE_INDEX}.')
    return DEVICE_INDEX
def create_rtmp_url(domain='push.tlivecloud.com', app_name='live',
                    stream_name='stream', key='key123', valid_days=30):
    """Build a signed push (publish) URL for Tencent Cloud live streaming.

    The URL carries two query parameters:
    ``txTime``   -- expiry time as an upper-case hexadecimal Unix timestamp;
    ``txSecret`` -- MD5 hex digest of ``key + stream_name + txTime``.

    Args:
        domain: Push domain name.
        app_name: Application name segment of the URL path.
        stream_name: Stream name segment of the URL path.
        key: Authentication key used to sign the URL.
        valid_days: Number of days from now until the URL expires.

    Returns:
        The complete ``rtmp://`` push URL.  It is also printed to stdout.
    """
    expires_at = datetime.now() + timedelta(days=valid_days)
    # txTime is the expiry timestamp in upper-case hex without the 0x prefix.
    tx_time = format(int(expires_at.timestamp()), 'X')
    signature_input = key + stream_name + tx_time
    tx_secret = hashlib.md5(signature_input.encode('utf-8')).hexdigest()
    rtmp_url = (
        f'rtmp://{domain}/{app_name}/{stream_name}'
        f'?txSecret={tx_secret}&txTime={tx_time}'
    )
    print('rtmp_url:', rtmp_url)
    return rtmp_url
if __name__ == "__main__":
    # Script entry point. Interactive device selection is currently
    # disabled; uncomment the next line to list the input devices first.
    # choose_device()
    start_rtmp_server()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/mktime/python-learn.git
git@gitee.com:mktime/python-learn.git
mktime
python-learn
python-learn
master

搜索帮助