1 Star 0 Fork 5

kaguva / study-china学习强国

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.py 35.63 KB
一键复制 编辑 原始数据 按行查看 历史
 小明同学 提交于 2023-02-07 01:20 . 添加小白教程
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time, copy
import math
import random
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import getpass, json, requests
from myClass.adb import App, Element, Event # 调用自己写的类
from threading import Thread, BoundedSemaphore
url = "https://pc.xuexi.cn/points/my-points.html" # 我的积分页面
class studyClass:
# app.scanQRcode(element, event) # 点击扫一扫
# #设置谷歌浏览器driver的目录所在
# # 每次新建浏览器都是全新的,不共享缓存的,例1登录了,例2还是未登录的状态
# # 为了避免这样的问题,webdriver.Chrome在外部创建好之后传入 函数内
# options = webdriver.ChromeOptions()
# profile_directory = r'--user-data-dir=C:\Users\Win10Pro\AppData\Local\Google\Chrome\User Data' # 使用已登录的信息
# options.add_experimental_option("excludeSwitches", ["enable-automation"])
# options.add_experimental_option("useAutomationExtension", False)
# # 使用headless无界面浏览器模式
# # options.add_argument('--headless')
# # options.add_argument('--disable-gpu')
# options.add_argument(profile_directory)
# service = ChromeService(executable_path="D:\python\chromedriver_102.exe")
# browser = webdriver.Chrome(service=service, options=options)
# wait = WebDriverWait(browser, 10) #等待的最大时间s-----显式等待
# http://chromedriver.storage.googleapis.com/index.html
def newBrower(self):
options = webdriver.ChromeOptions()
# options.add_experimental_option("excludeSwitches", ["enable-automation"]) # 禁用自动化栏
# options.add_experimental_option("useAutomationExtension", False)
# options.add_experimental_option("excludeSwitches", ['enable-logging']) # 关闭一些无用的输出(主要是该步骤)
# options.add_argument('--headless') # 使用headless无界面浏览器模式
# 每次新建浏览器都是全新的,不共享缓存的,例1登录了,例2还是未登录的状态
# 为了避免这样的问题,webdriver.Chrome在外部创建好之后传入 函数内
# profile_directory = r'--user-data-dir=C:/Users/'+getpass.getuser()+'/AppData/Local/Google/Chrome/User Data' # 使用已登录的信息,该页面登录后,打开其他标签页同样是登录的状态
# options.add_argument(profile_directory)
# chromedriver下载地址:https://chromedriver.storage.googleapis.com/index.html
service = ChromeService(executable_path=".\chromedriver.exe") # 设置谷歌浏览器driver的目录所在
# service = ChromeService(executable_path="./chrome-win/chromedriver.exe")
browser = webdriver.Chrome(service=service, options=options)
browser.maximize_window()
return browser
# 每日答题 答题是否正确
def isCorrect(self, browser):
try:
# 如果能找到这个xpath,证明答题错误
errXpath = '//*[@id="app"]/div/div[2]/div/div[4]/div[3]/div/div[2]' # 查看提示的xpath
browser.find_element(By.XPATH, errXpath).text
print("答题错误~刷新重来")
return False
except:
return True
# 循环关闭窗口,只剩一个
def closeWindow(self, browser):
n_h = len(browser.window_handles)
# window_before_title = browser.title
for s in range(n_h - 1, 0, -1):
# print(s)
browser.switch_to.window(browser.window_handles[s])
# if browser.title != window_before_title:
browser.close()
time.sleep(1)
browser.switch_to.window(browser.window_handles[0])
# 窗口句柄
def switch_window(self, browser, index = -1):
windows = browser.window_handles
# print("windows------------",windows)
if len(windows)>=1:
browser.switch_to.window(windows[index])
# 退出登录
def logout(self, browser, wait, confi = False):
self.openWeb(browser,"https://www.xuexi.cn/")
if confi == True:
confirm = browser.switch_to.alert # 选择 confirm
confirm.accept() # 确定
self.switch_window(browser) # 句柄切换到当前窗口
xpath = '//*[@id="root"]/div/div[1]/header/div[2]/div[2]/span/a'
self.elementToClick(browser, wait, By.XPATH, xpath)
# print("成功退出,返回登录页面")
time.sleep(1)
self.openWeb(browser)
# 判断元素是否存在
def isElementPresent(self,browser, by, value):
try:
element = browser.find_element(by=by, value=value)
return element
except NoSuchElementException as e:
# 发生了NoSuchElementException异常,说明页面中未找到该元素,返回False
return False
else:
# 没有发生异常,表示在页面中找到了该元素,返回True
return True
# 打开浏览器
def openWeb(self,browser,uri=url):
# 由于默认设置的等级是warning,所以只有warning的信息会输出到控制台。
# 利用logging.basicConfig()打印信息到控制台
# logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',level=logging.DEBUG)
#设置自动化打开的浏览器访问网址
self.switch_window(browser)
#打开浏览器,并访问设置的网址。
browser.get(uri)
# logging.warning("打开浏览器,并访问设置的网址")
time.sleep(1)
# browser.close()
return
# 今天已取得的分数
# @pointType -> 我要选读文章/视听学习/视听学习时长/每日答题
def getCurrPoint(self, browser, wait, cookie = {}, account = '', pointType = ''):
time_end = time.time() # 记录结束时间
time_end = time.localtime(int(time_end)) # 记录结束时间
Time2 = time.strftime("%Y-%m-%d %H:%M:%S", time_end)
if len(cookie) > 0:
cookieStr = self.handleCookieToHeaderStr(cookie)
headers = {
'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
'Cookie':cookieStr
}
response = json.loads(requests.get(url='https://pc-proxy-api.xuexi.cn/delegate/score/days/listScoreProgress?sence=score&deviceType=2', headers=headers).text)
if response['code'] == 200:
if pointType != '':
for val in response['data']['taskProgress']:
if val['title'] == pointType:
point = val['currentScore']
break
else:
point = response['data']['totalScore']
print(account + ":"+Time2+"已取得的分数:" , str(point))
return account + ":"+Time2+"已取得的分数:" + str(point)
else:
print(account + ":"+Time2+"获取分数失败")
return account + ":"+Time2+"获取分数失败"
else:
self.openWeb(browser, "https://pc.xuexi.cn/points/my-points.html")
point = self.elementLocated(browser,wait, By.XPATH, '//*[@id="app"]/div/div[2]/div/div[2]/div[2]/span[3]')
if point != False:
point = point.text
else:
print(account + ":"+Time2+"获取今天的分数错误" )
return account + ":"+Time2+"获取今天的分数错误"
print(account + ":"+Time2+"已取得的分数:" , str(point))
return account + ":"+Time2+"已取得的分数:" + str(point)
# 判断是否已登录
def isLogin(self, browser):
return browser.current_url == url
# 判断 试听学习 的分数是否已满1
def stStudyPoint(self, browser, wait, account = ''):
if browser.current_url != url:
self.openWeb(browser) # 打开网站
# 判断分数否已满
stPoint = self.elementLocated(browser, wait, By.XPATH, '//*[@id="app"]/div/div[2]/div/div[3]/div[2]/div[3]/div[2]/div[1]/div[2]') # 视听学习 分数
stTimePoinr = self.elementLocated(browser, wait, By.XPATH, '//*[@id="app"]/div/div[2]/div/div[3]/div[2]/div[4]/div[2]/div[1]/div[2]') # 视听学习时长 分数
if stPoint != None:
stPoint = stPoint.text
if stTimePoinr != None:
stTimePoinr = stTimePoinr.text
if stPoint == '6分/6分' and stTimePoinr == '6分/6分':
print(account + ":试听学习-分数已满")
return True
else:
print(account + ":试听学习-分数未满:" + stPoint + stTimePoinr)
return False
# 将结果输出到txt
def printResult(self, str = "测试写入结果"):
Note = open('运行结果.txt',mode='a') # 打开txt文件
Note.write(str + ' \n') # 写入数据
Note.close()
# 秒数转化为分秒
def str2sec(self, seconds):
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return str(m) + ":" + str(s)
# 点击页面元素
def elementToClick(self, browser, wait, type = By.XPATH, path=''):
try:
startVideo = wait.until(
#判断该元素是否加载完成
EC.element_to_be_clickable((type, path))
)
startVideo.click()
except NoSuchElementException as e:
# 发生了NoSuchElementException异常,说明页面中未找到该元素,返回False
return None
else:
# 没有发生异常,表示在页面中找到了该元素,返回True
return startVideo
# 页面元素是否已加载
def elementLocated(self, browser, wait, type = By.XPATH, path=''):
try:
element = wait.until(
#判断该元素是否加载完成
EC.presence_of_element_located((type, path))
)
except NoSuchElementException as e:
# 发生了NoSuchElementException异常,说明页面中未找到该元素,返回False
return None
else:
# 没有发生异常,表示在页面中找到了该元素,返回True
return element
# 试听学习/视听学习时长
# 共12分 每有效收听/观看一个积1分
def stStudy(self, browser, wait, account = ''):
stPoint = self.stStudyPoint(browser, wait, account)
if stPoint == False:
# 未满分的时候进入
video_url_list = [
"https://www.xuexi.cn/lgdata/3j2u3cttsii9.json",
"https://www.xuexi.cn/lgdata/1novbsbi47k.json",
"https://www.xuexi.cn/lgdata/31c9ca1tgfqb.json",
"https://www.xuexi.cn/lgdata/1oajo2vt47l.json",
"https://www.xuexi.cn/lgdata/18rkaul9h7l.json",
"https://www.xuexi.cn/lgdata/2qfjjjrprmdh.json",
"https://www.xuexi.cn/lgdata/3o3ufqgl8rsn.json",
"https://www.xuexi.cn/lgdata/525pi8vcj24p.json",
"https://www.xuexi.cn/lgdata/1742g60067k.json"
]
url = video_url_list[random.randint(0, len(video_url_list)-1)]
response = json.loads(requests.get(url=url).text)
if len(response) > 0:
for u in response:
browser.get(u['url'])
seeTime = 60 + random.randint(1,10)
# self.elementToClick(browser, wait, By.CLASS_NAME, 'outter')
time.sleep(10)
browser.execute_script("window.scrollTo(0,200)") # 滑动到具体位置--不动的话经常不加分
# print("向下滚动200像素")
time.sleep(10)
browser.execute_script("window.scrollTo(0,400)") # 滑动到具体位置--不动的话经常不加分
# print("向下滚动200像素")
time.sleep(seeTime-20) # 观看70s
# 每看完3个视频,就判断一下分数
stPoint = self.stStudyPoint(browser, wait, account)
if stPoint == True:
break
else:
# 未满分的时候进入
continue
else:
print("找不到试听学习视频")
# 判断 选读文章 的分数是否已满
def readPoint(self, browser, wait, account = ''):
if browser.current_url != url:
self.openWeb(browser) # 打开网站
# 判断分数否已满
readPoint = self.elementLocated(browser, wait, By.XPATH, '//*[@id="app"]/div/div[2]/div/div[3]/div[2]/div[2]/div[2]/div[1]/div[2]') # 选读文章 分数
if readPoint != None:
readPoint = readPoint.text
if readPoint == '12分/12分':
print(account + ":选读文章-分数已满")
return True
else:
print(account + ":选读文章-分数未满:"+readPoint)
return False
# 我要选读文章
# 共12分 每有效阅读一篇积1分,上限6分/有效阅读文章累计1分钟积1分,上限6分
def read(self, browser,wait, account = ''):
readpoints = self.readPoint(browser,wait,account)
if readpoints == False:
article_url_list = [
"https://www.xuexi.cn/lgdata/35il6fpn0ohq.json",
"https://www.xuexi.cn/lgdata/45a3hac2bf1j.json",
"https://www.xuexi.cn/lgdata/1ajhkle8l72.json",
"https://www.xuexi.cn/lgdata/1ahjpjgb4n3.json",
"https://www.xuexi.cn/lgdata/1je1objnh73.json",
"https://www.xuexi.cn/lgdata/1kvrj9vvv73.json",
"https://www.xuexi.cn/lgdata/17qonfb74n3.json",
"https://www.xuexi.cn/lgdata/1i30sdhg0n3.json"
]
url = article_url_list[random.randint(0, len(article_url_list)-1)]
response = json.loads(requests.get(url=url).text)
if len(response) > 0:
for u in response:
browser.get(u['url'])
seeTime = 70 + random.randint(1,10)
time.sleep(10)
browser.execute_script("window.scrollTo(0,200)") # 滑动到具体位置--不动的话经常不加分
# print("向下滚动200像素")
time.sleep(10)
browser.execute_script("window.scrollTo(0,400)") # 滑动到具体位置--不动的话经常不加分
# print("向下滚动200像素")
time.sleep(seeTime-20) # 观看70s
readpoints = self.readPoint(browser,wait,account)
if readpoints == True:
break
else:
continue
else:
print("找不到选读文章")
# 每日答题-分数是否已满
def answerPoint(self, browser, wait, account = ''):
if browser.current_url != url:
self.openWeb(browser) # 打开网站
# 判断分数否已满
answerPoint = self.elementLocated(browser, wait, By.XPATH, '//*[@id="app"]/div/div[2]/div/div[3]/div[2]/div[5]/div[2]/div[1]/div[2]')
if answerPoint != None:
answerPoint = answerPoint.text
if answerPoint == '5分/5分':
print(account + ":每日答题-分数已满")
return True
else:
print(account + ":每日答题-分数未满:"+answerPoint)
return False
# 每日答题-刷新页面 使用
def answerRefreshPage(self, browser):
browser.refresh() # 刷新页面重新答题
time.sleep(1)
confirm = browser.switch_to.alert # 选择 confirm
confirm.accept() # 确定
self.switch_window(browser) # 句柄切换到当前窗口
# 每日答题
# 共5分 每组答题每答对1道积1分
def answer(self,browser, wait, account = ''):
answerpoint = self.answerPoint(browser, wait, account)
if answerpoint == False:
xpath = '//*[@id="app"]/div/div[2]/div/div[3]/div[2]/div[5]/div[2]/div[2]/div'
# print("每日答题 -> 去答题~")
browser.find_element(By.XPATH, xpath).click()
time.sleep(2)
restart = True
while restart:
# 执行while之后变为False,此处是为了解决页面刷新后的重新循环
restart = False
for ansIndex in range(1, 6):
type = wait.until(
#判断该元素是否加载完成
EC.presence_of_element_located((By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[1]'))
)
# type = browser.find_element(By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[1]').text # 获取当前题目的类型
type = type.text[-3:] # 截取后3位字符
# print("题型:" + type)
redXpath = '//*[@id="body-body"]/div[4]/div/div/div/div[2]/div/div/div/font'
try:
tipsXpath = '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[3]/span' # 查看提示的xpath
browser.find_element(By.XPATH, tipsXpath).click() # 点击查看提示
except NoSuchElementException as e:
# 有时候tips的Xpath会变化.....
tipsXpath = '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[4]/span' # 查看提示的xpath
browser.find_element(By.XPATH, tipsXpath).click() # 点击查看提示
time.sleep(1)
tipsAns = browser.find_elements(By.XPATH, redXpath) # 查询提示里的红字有多少
if len(tipsAns) == 0:
# 发生了NoSuchElementException异常,说明页面中未找到该元素
# print("没有红字,提示 -> ",browser.find_element(By.XPATH, '//*[@id="body-body"]/div[4]/div/div/div/div[2]/div/div/div').text)
time.sleep(1)
browser.refresh() # 遇到无法识别的题目,刷新页面重新答题
time.sleep(1)
confirm = browser.switch_to.alert # 选择 confirm
confirm.accept() # 确定
self.switch_window(browser) # 句柄切换到当前窗口
restart = True
break
else:
newtipsAns = []
# 处理里面为空的font
for t in tipsAns:
if t.text != "":
newtipsAns.append(t)
tipsAns = newtipsAns
if type == '多选题' or type == '单选题':
selectArr = []
ans = browser.find_elements(By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[4]/div') # 查询出所有选择项
for xx in tipsAns:
for index, a in enumerate(ans):
# 如果提示的和答案相同,就点击该答案
# print("xx.text ? ", xx.text)
# print("a.text ? ", a.text)
if xx.text in a.text:
# print("选中答案", a.text)
selectArr.append(index + 1) # 数组里记录正确的答案
break
# print("selectArr------------", selectArr)
if len(selectArr) == 0:
# print("找不到正确答案,刷新页面重新答题")
time.sleep(1)
self.answerRefreshPage(browser) # 刷新页面
time.sleep(4)
restart = True
break
else:
# 循环点击正确答案
browser.find_element(By.XPATH, tipsXpath).click() # 再次点击查看提示,关闭提示的显示,否则提示挡住下一题的按钮会报错
# print("循环点击正确答案")
for clickAnd in selectArr:
browser.find_element(By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[4]/div['+str(clickAnd)+']').click()
time.sleep(1)
if type == '填空题':
browser.find_element(By.XPATH, tipsXpath).click() # 再次点击查看提示,关闭提示的显示,否则提示挡住下一题的按钮会报错
ansInput = browser.find_elements(By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[1]/div[2]/div/input') # 查询出所有输入框
for index, xx in enumerate(tipsAns):
# print(index, xx.text)
ansInput[index].send_keys(xx.text)
time.sleep(1)
# print("选择完毕,下一题")
time.sleep(1)
browser.find_element(By.XPATH, '//*[@id="app"]/div/div[2]/div/div[4]/div[2]/button').click() # 点击确定(下一题)
time.sleep(1)
iscorrect = self.isCorrect(browser) # 答题是否正确
if iscorrect == False:
# 答题错误进入
self.answerRefreshPage(browser) # 刷新页面
time.sleep(4) # 等待下一题的加载
restart = True
break
time.sleep(4) # 等待下一题的加载
# 判断每日答题是否出现滑块
def answerSubmit(self,browser):
try:
# tipsXpath = '//*[@id="nc_mask"]' # 查看滑块的xpath
tipsClassName = '_2Y1eU51c9CRrLai-L7vlJC' # 查看滑块的tipsClassName
display = browser.find_element(By.CLASS_NAME, tipsClassName).value_of_css_property("display")
if display == 'none':
return True
else:
return False
except NoSuchElementException as e:
return True
# 控制安卓
def androidPart(self, account):
self.app.wakeUpPhome() # 唤醒手机
self.app.runApp(self.app.appPackage, self.app.appActivity) # 启动app
time.sleep(5) # 等待5秒,等开屏动画完毕
islog = self.app.isLogin(self.element, self.event, account)
if islog == False:
# 登录失败就跳过当前账号且重启app
self.app.runApp(self.app.appPackage, self.app.appActivity) # 启动app
self.app.scanQRcode(self.element, self.event) # 点击扫一扫
# 安卓退出登录
# closePhone 退出账号后是否灭屏, 用于账号操作完毕后灭屏使用
def androidLogout(self, closePhone = False):
self.app.logout(self.element, self.event, closePhone)
return True
# 把cookie字典助理成请求头的cookie字符串
def handleCookieToHeaderStr(self, cookie:dict):
cookieStr = ''
for key,val in cookie.items():
cookieStr += str(key)+'='+str(val['value'])+"; "
return cookieStr
def __init__(self) -> None:
connectdevice = input('是否有手机连接(1有 0:无):')
self.number = int(connectdevice.strip())
if self.number == 1:
self.app = App()
self.element = Element()
self.event = Event()
pass
def AndroidRun(self):
f = open("账号密码.txt",encoding = "utf-8") # 以 utf-8 的编码格式打开指定文件
accounts = json.loads(f.read()) # 账号从txt里获取
f.close() # 关闭文件
accountsIndex = 0 # 当前学习账号的索引
runTime = time.strftime("%d", time.localtime()) # 开始时间,用于标记每天0点之后的重新操作
# 一个账号搞定了,退出后等待扫码继续操作
browser = self.newBrower()
wait = WebDriverWait(browser, 10) #等待的最大时间s-----显式等待
# browser.find_element(By.CLASS_NAME, 'outter').is_displayed
# browser.get('https://www.xuexi.cn/lgpage/detail/index.html?id=1448691652601485949&item_id=1448691652601485949')
# self.elementToClick(browser, wait, By.CLASS_NAME, 'outter')
# input("执行完毕,按回车键退出")
self.openWeb(browser, url)
while True:
while True:
self.openWeb(browser, url)
islogin = self.isLogin(browser)
if islogin == False:
# print("请扫码登录")
js = "window.scrollTo(0,document.body.scrollHeight)" # 滑动到底部,便于扫码
browser.execute_script(js)
if accountsIndex < len(accounts) and self.number == 1:
self.androidPart(accounts[accountsIndex]) # 安卓操作部分
accountsIndex += 1
else:
loginQRcode = wait.until(
#判断该元素是否加载完成
EC.element_to_be_clickable((By.XPATH, '//*[@id="app"]/div/div[2]/div/div[2]/div[1]/span'))
)
try:
loginQRcode.click() # 能找到刷新按钮就点击
time.sleep(30) # 每60s点击一次刷新
except:
browser.refresh() # 找不到刷新按钮就刷新页面
else:
# print("登录成功")
break
time_start = time.time() # 记录开始时间
self.stStudy(browser, wait) # 视听学习
self.read(browser, wait) # 选读文章
self.answer(browser, wait) # 每日答题
sub = self.answerSubmit(browser)
time_end = time.time() # 记录结束时间
time_sum = int(time_end - time_start) # 计算的时间差为程序的执行时间,单位为秒/s
time_end = time.localtime(int(time_end)) # 记录结束时间
time_start = time.localtime(time_start)
Time1 = time.strftime("%Y-%m-%d %H:%M:%S", time_start)
Time2 = time.strftime("%Y-%m-%d %H:%M:%S", time_end)
print("时间起止:", Time1 , "~" , Time2)
print("总用时(分钟):", self.str2sec(time_sum))
# print("帐号(" + str(accounts[accountsIndex-1]['user']) + ")")
CurrPoint = self.getCurrPoint(browser,wait) # 查询今天获取得分数,便于观察
self.printResult("时间起止:"+ Time1 + "~" + Time2)
self.printResult("总用时(分钟):" + self.str2sec(time_sum))
# self.printResult("帐号(" + str(accounts[accountsIndex-1]['user']) + ")")
self.printResult(CurrPoint)
self.printResult("-----------------------------------------------------")
self.printResult("-----------------------------------------------------")
if self.number == 1:
# 有手机连接时,才进行安卓操作
self.androidLogout(accountsIndex >= len(accounts))
if accountsIndex >= len(accounts):
self.app.adbExit() # 关闭adb interface使用, 便于弹出手机
browser.quit() # 账号操作完毕后,关闭浏览器
break
if sub == False:
# 如果每日答题提交的时候,出现滑块
self.logout(browser, wait, True)
else:
self.logout(browser, wait)
self.closeWindow(browser)
time.sleep(1)
input("执行完毕,按回车键退出")
# 直接学习,供多线程调用
def threadingRun(self, pool_sema:BoundedSemaphore, cookie:dict, account:str):
pool_sema.acquire()
browser = self.newBrower()
wait = WebDriverWait(browser, 10) #等待的最大时间s-----显式等待
self.openWeb(browser, url)
for key,val in cookie.items():
browser.add_cookie(val) # add_cookie需要在打开所在domain的网页后才能添加成功,否则domain error
browser.execute_script('document.title = "'+account+'"') # 更改浏览器窗口的标题
self.stStudy(browser, wait, account) # 视听学习
self.read(browser, wait, account) # 选读文章
self.answer(browser, wait, account) # 每日答题
print("帐号(" + account + ")")
CurrPoint = self.getCurrPoint(browser, wait, cookie, account) # 查询今天获取得分数,便于观察
self.printResult("帐号(" + account + ")")
self.printResult(CurrPoint)
self.printResult("-----------------------------------------------------")
self.printResult("-----------------------------------------------------")
browser.quit()
pool_sema.release()
# 创建cookie池
class cookiePool(object):
def __init__(self) -> None:
self.color = bcolors()
self.studyC = studyClass()
self.crontabSecound = 60*60*12 # 定时跑分延迟设置 单位 秒
def threadingRun(self):
max_connections = 7 # 定义最大线程数
pool_sema = BoundedSemaphore (max_connections) # 使用BoundedSemaphore方法
if self.studyC.number == 1:
self.createCookiePool() # 注释此处就不需要手机扫码登录了
cookies = json.loads(self.getCookie())
threads = []
for key, val in cookies.items():
t1 = Thread(name = key, target=self.studyC.threadingRun, args=(pool_sema, val,str(key), ))
threads.append(t1)
# 启动
for t2 in threads:
t2.start()
# t2.join() # 需要并发运行, 这里就得注释掉
# 定时跑分
def crontab(self):
while True:
time.sleep(self.crontabSecound)
print("定时检测跑分")
self.checkTokenExpiry() # 检测token有效期
self.threadingRun() # 执行多线程
# 检测token有效期---等修复setcookie方法后写此
def checkTokenExpiry(self):
nowtime = int(time.time()) # 记录结束时间
cookies = json.loads(self.getCookie())
copyCookies = copy.copy(cookies) # 此处深拷贝一个新的变量用于循环,cookies用于做del操作。 否则for cookies又del cookies 会报错
for key,val in copyCookies.items():
if val['token']['expiry'] < nowtime: # 此处用于修复settoken格式后使用
# token过期后删除token
print(self.color.WARNING + str(key) + ":token过期" + self.color.ENDC)
self.studyC.printResult(str(key) + ":token过期")
# code。。。 此处可以考虑打开新的页面供扫码登录
del cookies[key]
self.setCookie(json.dumps(cookies)) # 更新cookies文件
pass
# 获取用户账号密码
def getUserData(self):
f = open("账号密码.txt",encoding = "utf-8") # 以 utf-8 的编码格式打开指定文件
accounts = json.loads(f.read()) # 账号从txt里获取
f.close() # 关闭文件
return accounts
# 创建cookie池
def createCookiePool(self):
userData = self.getUserData() # 用户登录账号密码的json数据
cookies = json.loads(self.getCookie())
for val in userData:
if str(val['user']) in cookies:
# 判断cookie未过期的账号跳过安卓扫码登录,提高效率
print(str(val['user']) + "cookie 存在,跳过")
continue
browser = self.studyC.newBrower()
self.studyC.openWeb(browser, url)
js = "window.scrollTo(0,document.body.scrollHeight)" # 滑动到底部,便于扫码
browser.execute_script(js)
self.studyC.androidPart(val) # 安卓操作部分
islogin = self.studyC.isLogin(browser)
if islogin == False:
print(str(val['user'])+"登录失败")
else:
userCookie = self.getBrowserCookie(browser)
tempDict = {} # 临时cookie字典
for cookieVal in userCookie:
tempDict[str(cookieVal['name'])] = cookieVal
y = {str(val['user']) : tempDict} # 当前登录用户的token
x = json.loads(self.getCookie()) # 之前的所有cookie
z = {**x, **y} # 合并两个字典,如果key相同,则y覆盖x
self.setCookie(json.dumps(z))
browser.quit()
self.studyC.androidLogout(True)
self.studyC.app.adbExit() # 关闭adb interface使用, 便于弹出手机
# 获取浏览器cookie
def getBrowserCookie(self, browser):
return browser.get_cookies()
# 设置浏览器cookie
def setBrowserCookie(self, browser, cookie_dict:dict):
browser.add_cookie(cookie_dict)
# 获取cookie文件数据
def getCookie(self, mode='r'):
# 以只读方式打开文件。文件的指针将会放在文件的开头。这是默认模式。
str = ''
with open('cookies.txt',mode) as f:
str = f.read()
if str == '':
str = "{}"
return str
# 写入cookie文件数据
def setCookie(self, jsonStr, mode='w+'):
# w 打开一个文件只用于写入。如果该文件已存在则打开文件,并从开头开始编辑,即原有内容会被删除。如果该文件不存在,创建新文件。
with open('cookies.txt',mode) as f:
# 将cookies保存为json格式
f.write(jsonStr)
class bcolors(object):
# print("\033[1;30m 字体颜色:白色\033[0m")
# print("\033[1;31m 字体颜色:红色\033[0m")
# print("\033[1;32m 字体颜色:深黄色\033[0m")
# print("\033[1;33m 字体颜色:浅黄色\033[0m")
# print("\033[1;34m 字体颜色:蓝色\033[0m")
# print("\033[1;35m 字体颜色:淡紫色\033[0m")
# print("\033[1;36m 字体颜色:青色\033[0m")
# print("\033[1;37m 字体颜色:灰色\033[0m")
# print("\033[1;38m 字体颜色:浅灰色\033[0m")
WHITE = '\033[1;30m' # 白色
RED = '\033[1;31m' # 红色
WARNING = '\033[1;33m' # 浅黄色
ENDC = '\033[0m' # 结束符
BOLD = '\033[1m' # 加粗
UNDERLINE = '\033[4m' # 下划线
# 控制手机单个账号操作
def AndroidRun():
studyC = studyClass() # 此处实例class,在class里面的func调用的时候,self会自动传入,第一个参数self就不用手动传入了
studyC.AndroidRun()
# 多线程版本
def threadingRun():
threading = cookiePool()
threading.checkTokenExpiry() # 检测token有效期
threading.threadingRun() # 执行多线程
# threading.crontab() # 定时跑分
connectdevice = input('选择学习类型 (0手动单线程 1自动多线程):')
chooice = int(connectdevice.strip())
if chooice == 1:
threadingRun()
else:
AndroidRun()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/kaguva.agree/study-china.git
git@gitee.com:kaguva.agree/study-china.git
kaguva.agree
study-china
study-china学习强国
master

搜索帮助

Bbcd6f05 5694891 0cc6727d 5694891