1 Star 1 Fork 0

Kongbai / Bdown

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
util.py 11.04 KB
一键复制 编辑 原始数据 按行查看 历史
kongbai 提交于 2022-01-07 12:21 . 修复title获取不到
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project :get_blibli_movie
@File :util.py
@IDE :PyCharm
@Author :Kongbai
@Date :2021/11/2 18:06
'''
import json
import os
import re
from lxml import etree
import requests
from contextlib import closing
from requests import RequestException
from retry import retry # TODO(Kongbai):修改请求重连三次的方式
class Bilibili(object):
    """Fetch metadata for a single Bilibili video page.

    The cookie stored in the JSON file ``user_file`` is loaded once at
    construction time and sent with every request.
    """

    def __init__(self, movie_url, user_file):
        """
        :param movie_url: URL of the video page to inspect
        :param user_file: path of a JSON file that has a ``cookie`` entry
        """
        self.movie_url = movie_url
        self.movie_title = None  # filled in by get_movie_base_info()
        self.headers_common = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36 Edg/90.0.818.49",
            "referer": "https://www.bilibili.com/",
        }
        self.user_file = user_file
        self.cookie = self.__get_user_info()

    def __get_user_info(self):
        """Read the user's cookie from the JSON file at ``self.user_file``.

        :return: the value stored under the ``cookie`` key
        """
        with open(self.user_file, 'r', encoding='utf8') as user_fp:
            return json.load(user_fp)['cookie']

    @retry(RequestException, tries=3, delay=1)
    def get_movie_base_info(self):
        """Scrape the basic details of the video from its HTML page.

        The title found on the page is also remembered in ``self.movie_title``.

        :return: dict with the keys ``title``, ``played_count``, ``barrage``
            and ``up_time``
        :raises IndexError: when one of the page elements is not found
        """
        page_source = requests.get(
            url=self.movie_url,
            headers=self.headers_common,
            timeout=(5, 10),
            cookies=self.cookie,
        ).text
        tree = etree.HTML(page_source)
        # One XPath per field; each must match at least once.
        queries = (
            ("title", '//*[@id="viewbox_report"]/h1/text()'),
            ("played_count", '//*[@id="viewbox_report"]/div/span[1]/text()'),
            ("barrage", '//*[@id="viewbox_report"]/div/span[2]/text()'),
            ("up_time", '//*[@id="viewbox_report"]/div/span[3]/text()'),
        )
        info = {field: tree.xpath(expression)[0] for field, expression in queries}
        self.movie_title = info["title"]
        return info

    @retry(RequestException, tries=3, delay=1)
    def get_movie_pages_info(self):
        """Fetch the page (chapter) list of the video from the pagelist API.

        The video id is taken from the fifth ``/``-separated piece of
        ``self.movie_url`` with any query string removed.

        :return: the decoded JSON reply of the API
        """
        url_piece = self.movie_url.split('/')[4]
        bvid = url_piece.split('?')[0]
        api_url = 'https://api.bilibili.com/x/player/pagelist?bvid={}&jsonp=jsonp'.format(bvid)
        reply = requests.get(
            url=api_url,
            headers=self.headers_common,
            timeout=(5, 10),
            cookies=self.cookie,
        )
        return reply.json()
class DownMovie:
    """Download one page of a Bilibili video and merge its streams with ffmpeg.

    Constructing an instance already performs a network request: the page at
    ``page_url`` is fetched and the JSON assigned to ``window.__playinfo__`` in
    one of its scripts is parsed to obtain the first DASH video and audio URLs.
    """

    def __init__(self, movie_title, page_url, cid, page_name, cookie, result_dirname, temp_dirname):
        """
        :param movie_title: video title; its sanitised form names the result sub-directory
        :param page_url: URL of the page whose streams are downloaded
        :param cid: id of the page (stored only; no active code path reads it)
        :param page_name: name of the page; base name of the temp and result files
        :param cookie: cookies sent with every request
        :param result_dirname: directory under which ``<movie_title>/`` is created
        :param temp_dirname: directory holding the intermediate video/audio files
        """
        self.cookie = cookie
        self.headers_common = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36 Edg/90.0.818.49",
            "referer": "https://www.bilibili.com/",
        }
        self.movie_title = self.filename_filter(movie_title)
        self.page_name = page_name
        self.page_url = page_url
        self.cid = cid
        self.temp_dirname = temp_dirname
        self.result_dirname = os.path.join(result_dirname, self.movie_title)
        self.movie_data = self.get_movie_palyurl_data()  # play info parsed from the page
        self.url_video = self.movie_data['data']['dash']['video'][0]['base_url']
        self.url_audio = self.movie_data['data']['dash']['audio'][0]['base_url']

    @staticmethod
    def filename_filter(filename):
        """Return ``filename`` without the characters ``\\ / : * ? " < > |``.

        A raw string is used so the backslash is really part of the character
        class; the previous non-raw pattern only escaped the ``/`` and let
        backslashes through.
        """
        return re.sub(r'[\\/:*?"<>|]', '', filename)

    def _temp_path(self, extension):
        """Return the temp-file path of this page for the given extension."""
        # The page name is sanitised like the movie title so that a name
        # containing e.g. "/" or "?" cannot produce an unusable path.
        return '{}/{}.{}'.format(self.temp_dirname, self.filename_filter(self.page_name), extension)

    @retry(RequestException, tries=3, delay=1)
    def get_movie_palyurl_data(self):
        """Fetch ``self.page_url`` and parse its embedded play information.

        :return: the object decoded from the JSON that follows the first ``=``
            in the script containing ``window.__playinfo__``
        :raises IndexError: when the page has no such script
        """
        response = requests.get(url=self.page_url, headers=self.headers_common, timeout=(5, 10),
                                cookies=self.cookie).text
        html = etree.HTML(response)
        script = html.xpath('//script[contains(text(),"window.__playinfo__")]/text()')[0]
        return json.loads(script.split('=', maxsplit=1)[1])

    @retry(RequestException, tries=3, delay=1)
    def down_video(self, *args):
        """Download the video stream into the temp directory as ``<page_name>.mp4``.

        :param args: optionally ``(table_obj, row_obj)``. ``table_obj`` must offer
            ``item(row, "values")`` and ``item(row, values=...)`` (presumably a
            tkinter Treeview - confirm against the caller); column 4 of the row
            is overwritten with the download percentage. When fewer than two
            arguments are given, no progress is reported.
        :return: True once the stream has been written
        :raises requests.HTTPError: when the server answers with an error status
        """
        report_progress = len(args) >= 2
        table_obj = args[0] if report_progress else None
        row_obj = args[1] if report_progress else None
        item = table_obj.item(row_obj, "values") if report_progress else None
        response = requests.get(url=self.url_video, headers=self.headers_common, timeout=(6, 150), stream=True,
                                cookies=self.cookie)
        # Everything that touches the response happens inside closing() so the
        # connection is released even when a header is missing or a write fails.
        with closing(response) as res:
            # Do not save an error page as if it were media data.
            res.raise_for_status()
            total_size = int(res.headers.get('Content-Length', 0))
            temp_size = 0
            with open(self._temp_path('mp4'), 'wb') as fd:
                for chunk in res.iter_content(chunk_size=1024 * 1000 * 5):
                    if not chunk:
                        continue
                    fd.write(chunk)
                    temp_size += len(chunk)
                    # A percentage can only be computed when the size is known.
                    if report_progress and total_size > 0:
                        table_obj.item(row_obj, values=(
                            item[0], item[1], item[2], item[3], f'{round(100 * temp_size / total_size, 2)} %',
                            item[5]))
        return True

    @retry(RequestException, tries=3, delay=1)
    def down_audio(self):
        """Download the audio stream into the temp directory as ``<page_name>.mp3``.

        :return: True once the stream has been written
        :raises requests.HTTPError: when the server answers with an error status
        """
        response = requests.get(url=self.url_audio, headers=self.headers_common, timeout=(6, 150), stream=True,
                                cookies=self.cookie)
        with closing(response) as res:
            # Do not save an error page as if it were media data.
            res.raise_for_status()
            with open(self._temp_path('mp3'), 'wb') as fd:
                for chunk in res.iter_content(chunk_size=1024 * 10000):
                    if chunk:
                        fd.write(chunk)
        return True

    def compose(self):
        """Merge the downloaded video and audio files into one result file.

        ffmpeg is run with ``-c copy``; the two temp files are deleted only
        after ffmpeg reports success.

        :return: True on success, False when ffmpeg could not be started or
            exited with a non-zero status (the temp files are kept in that case)
        """
        import subprocess

        # Also creates missing parent directories and tolerates an existing one.
        os.makedirs(self.result_dirname, exist_ok=True)
        video_path = self._temp_path('mp4')
        audio_path = self._temp_path('mp3')
        result_path = os.path.join(self.result_dirname, '{}.mp4'.format(self.filename_filter(self.page_name)))
        # An argument list (no shell) keeps quotes or other shell
        # metacharacters in the paths from being interpreted.
        command = ['ffmpeg', '-i', video_path, '-i', audio_path, '-c', 'copy', result_path,
                   '-loglevel', 'quiet']
        try:
            completed = subprocess.run(command)
        except (OSError, subprocess.SubprocessError):
            return False
        if completed.returncode != 0:
            return False
        # Merge finished: the intermediate files are no longer needed.
        os.remove(video_path)
        os.remove(audio_path)
        return True
class UserCollection:
    """Query the favourites folders of the user described by ``user_file``."""

    def __init__(self, user_file):
        """
        :param user_file: path of a JSON file whose ``cookie`` entry contains
            at least ``DedeUserID``
        """
        self.user_file = user_file
        self.user_id, self.cookie = self.__get_user_info()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36 Edg/90.0.818.49",
        }

    def __get_user_info(self):
        """Read the user's id and cookie from the JSON file at ``self.user_file``.

        :return: tuple ``(user_id, cookie)`` where ``user_id`` is the cookie's
            ``DedeUserID`` entry
        """
        with open(self.user_file, 'r', encoding='utf8') as fp:
            json_data = json.load(fp)
        cookie = json_data['cookie']
        user_id = cookie['DedeUserID']
        return user_id, cookie

    @retry(RequestException, tries=3, delay=1)
    def get_user_fav_list(self):
        """Fetch all favourites folders created by the user.

        :return: the decoded JSON reply of the API
        """
        url = "https://api.bilibili.com/x/v3/fav/folder/created/list-all?up_mid={}&jsonp=jsonp".format(self.user_id)
        response = requests.get(url=url, headers=self.headers, timeout=5, cookies=self.cookie).json()
        return response

    @retry(RequestException, tries=3, delay=1)
    def get_user_fav_movie_list(self, dir_id, pn=1, ps=20):
        """Fetch one page of the videos stored in a favourites folder.

        :param dir_id: id of the favourites folder (sent as ``media_id``)
        :param pn: value of the ``pn`` query parameter (presumably the 1-based
            page number - confirm against the API); defaults to the previously
            hard-coded 1
        :param ps: value of the ``ps`` query parameter (presumably the page
            size - confirm against the API); defaults to the previously
            hard-coded 20
        :return: the decoded JSON reply of the API
        """
        url = "https://api.bilibili.com/x/v3/fav/resource/list?media_id={}&pn={}&ps={}&keyword=&order=mtime&type=0&tid=0&platform=web&jsonp=jsonp".format(
            dir_id, pn, ps)
        response = requests.get(url=url, headers=self.headers, timeout=5, cookies=self.cookie).json()
        return response
if __name__ == '__main__':
    # Manual smoke test: look up one video and download the video stream of
    # its first page.

    class _ConsoleProgress:
        """Console stand-in for the table object that down_video() updates."""

        def item(self, row, option=None, values=None):
            if values is None:
                # Asked for the row's current values: six empty columns,
                # because down_video() reads indexes 0-3 and 5.
                return ("",) * 6
            # down_video() puts the progress text into column 4.
            print(values[4])

    user_file = r"C:\Users\admin\.BilibliDown\user_info.json"
    movie_url = "https://www.bilibili.com/video/BV1By4y1y711"
    with open(user_file, 'r', encoding='utf8') as fp:
        config_data = json.load(fp)
    bili_obj = Bilibili(movie_url=movie_url, user_file=user_file)
    base_info = bili_obj.get_movie_base_info()
    page_info = bili_obj.get_movie_pages_info()
    print(base_info, page_info)
    first_page = page_info['data'][0]
    movie_title = first_page['part']
    # The page number is passed as the "p" query parameter, i.e. "?p=<n>".
    page_url = movie_url + '?p=' + str(first_page['page'])
    cid = first_page['cid']
    page_name = first_page['part']
    result_dirname = config_data['result_dirname']
    temp_dirname = config_data['temp_dirname']
    cookie = config_data['cookie']
    # Create the download object (this already fetches the play information).
    down_obj = DownMovie(movie_title=movie_title, page_url=page_url, cid=cid, page_name=page_name,
                         cookie=cookie, result_dirname=result_dirname, temp_dirname=temp_dirname)
    # down_video() expects a table object and a row id for progress reporting.
    down_obj.down_video(_ConsoleProgress(), None)
Python
1
https://gitee.com/Kongbaizdl/Bdown.git
git@gitee.com:Kongbaizdl/Bdown.git
Kongbaizdl
Bdown
Bdown
master

搜索帮助