1 Star 0 Fork 0

fgfyo/itv

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
itvtest.py 8.65 KB
一键复制 编辑 原始数据 按行查看 历史
tonghaokeji 提交于 2024-02-23 09:09 . Add files via upload
import os
import re
import time
import datetime
import threading
from queue import Queue
import requests
import eventlet
eventlet.monkey_patch()
# 线程安全的队列,用于存储下载任务
task_queue = Queue()
# 线程安全的列表,用于存储结果
results = []
channels = []
error_channels = []
with open("itv.txt", 'r', encoding='utf-8') as file:
lines = file.readlines()
for line in lines:
line = line.strip()
if line:
channel_name, channel_url = line.split(',')
if '卫视' in channel_name or 'CCTV' in channel_name or '农民' in channel_name or '戏曲' in channel_name or '梨园' in channel_name:
channels.append((channel_name, channel_url))
# 定义工作线程函数
def worker():
while True:
# 从队列中获取一个任务
channel_name, channel_url = task_queue.get()
try:
channel_url_t = channel_url.rstrip(channel_url.split('/')[-1]) # m3u8链接前缀
lines = requests.get(channel_url).text.strip().split('\n') # 获取m3u8文件内容
ts_lists = [line.split('/')[-1] for line in lines if line.startswith('#') == False] # 获取m3u8文件下视频流后缀
ts_lists_0 = ts_lists[0].rstrip(ts_lists[0].split('.ts')[-1]) # m3u8链接前缀
ts_url = channel_url_t + ts_lists[0] # 拼接单个视频片段下载链接
# 多获取的视频数据进行5秒钟限制
with eventlet.Timeout(5, False):
start_time = time.time()
content = requests.get(ts_url).content
end_time = time.time()
response_time = (end_time - start_time) * 1
if content:
with open(ts_lists_0, 'ab') as f:
f.write(content) # 写入文件
file_size = len(content)
# print(f"文件大小:{file_size} 字节")
download_speed = file_size / response_time / 1024
# print(f"下载速度:{download_speed:.3f} kB/s")
normalized_speed = min(max(download_speed / 1024, 0.001), 100) # 将速率从kB/s转换为MB/s并限制在1~100之间
#print(f"标准化后的速率:{normalized_speed:.3f} MB/s")
# 删除下载的文件
os.remove(ts_lists_0)
result = channel_name, channel_url, f"{normalized_speed:.3f} MB/s"
results.append(result)
numberx = (len(results) + len(error_channels)) / len(channels) * 100
print(f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。")
except:
error_channel = channel_name, channel_url
error_channels.append(error_channel)
numberx = (len(results) + len(error_channels)) / len(channels) * 100
print(f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。")
# 标记任务完成
task_queue.task_done()
# 创建多个工作线程
num_threads = 10
for _ in range(num_threads):
t = threading.Thread(target=worker, daemon=True)
#t = threading.Thread(target=worker, args=(event,len(channels))) # 将工作线程设置为守护线程
t.start()
#event.set()
# 添加下载任务到队列
for channel in channels:
task_queue.put(channel)
# 等待所有任务完成
task_queue.join()
def channel_key(channel_name):
match = re.search(r'\d+', channel_name)
if match:
return int(match.group())
else:
return float('inf') # 返回一个无穷大的数字作为关键字
# 对频道进行排序
results.sort(key=lambda x: (x[0], -float(x[2].split()[0])))
results.sort(key=lambda x: channel_key(x[0]))
now_today = datetime.date.today()
# 将结果写入文件
with open("itv_results.txt", 'w', encoding='utf-8') as file:
for result in results:
channel_name, channel_url, speed = result
file.write(f"{channel_name},{channel_url},{speed}\n")
with open("itv_speed.txt", 'w', encoding='utf-8') as file:
for result in results:
channel_name, channel_url, speed = result
file.write(f"{channel_name},{channel_url}\n")
result_counter = 8 # 每个频道需要的个数
with open("itvlist.txt", 'w', encoding='utf-8') as file:
channel_counters = {}
file.write('央视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
file.write('卫视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if '卫视' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
file.write('其他频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' not in channel_name and '卫视' not in channel_name and '测试' not in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
file.write(f"{now_today}更新,#genre#\n")
with open("itvlist.m3u", 'w', encoding='utf-8') as file:
channel_counters = {}
file.write('#EXTM3U\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"央视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"央视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
#file.write('卫视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if '卫视' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"卫视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"卫视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
#file.write('其他频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' not in channel_name and '卫视' not in channel_name and '测试' not in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"其他频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"其他频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
file.write(f"#EXTINF:-1 group-title=\"{now_today}更新\"\n")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/fgfyo/itv.git
git@gitee.com:fgfyo/itv.git
fgfyo
itv
itv
main

搜索帮助