# NOTE: stray web-page banner captured with the file (not program text): 代码拉取完成,页面将自动刷新
# -*- coding=utf-8 -*-
import json
from conf_example import *
import requests
import time
from PIL import Image
from json import loads
import getpass
import urllib
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import traceback
import logging
# Silence urllib3's InsecureRequestWarning; the requests in this file are
# sent with verify=False.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Maps the seat keys used in parsed ticket dicts (see
# LoginTic.parseTicketInfo) to the seat-type codes sent in order requests.
g_seat_code_dict = {
    "seatYZ": "1",
    "seatRZ": "2",
    "seatYW": "3",
    "seatRW": "4",
    "seatGJRW": "6",
    "seatTDZ": "P",
    "seatWZ": "WZ",
    "seatEDZ": "O",
    "seatYDZ": "M",
    "seatSWZ": "9",
}

# Logger used by the retries decorator to record failed attempts.
logger = logging.getLogger('shuapiao')

# Module-level HTTP session (LoginTic creates its own session as well).
session = requests.session()
def restart_conn(conn):
    """Close ``conn`` and issue a fresh request through the module session.

    Args:
        conn: Any object exposing ``close()``; it is closed before the
            new request is made.
    """
    print("restart connection")
    conn.close()
    # The URL needs an explicit scheme; requests rejects a bare host name
    # before any connection is attempted.
    session.get('https://kyfw.12306.cn', timeout=100, verify=False)
def retries(max_tries):
    """Build a decorator that calls the wrapped function up to ``max_tries`` times.

    A failed attempt is reported (printed for ``requests.HTTPError``, logged
    with its traceback for any other exception) and the call is repeated.
    When the last attempt fails too, the exception is re-raised so the caller
    can see the failure instead of silently receiving ``None``.

    Args:
        max_tries: Maximum number of attempts. With ``0`` the wrapped
            function is never called and the wrapper returns ``None``.

    Returns:
        A decorator ``dec(func, conn=None)``.
    """
    # Function-scope import so this block does not depend on file-level edits.
    import functools

    def dec(func, conn=None):
        # ``conn`` is accepted for backward compatibility and is unused.
        @functools.wraps(func)
        def f2(*args, **kwargs):
            # Count down so the last attempt is seen with tries_remaining == 0.
            for tries_remaining in reversed(range(max_tries)):
                try:
                    return func(*args, **kwargs)
                except requests.HTTPError:
                    print("conneciont error")
                    if tries_remaining == 0:
                        raise
                except Exception:
                    if tries_remaining > 0:
                        traceback.print_exc()
                        logger.error("errror %d" % tries_remaining)
                        logger.error(traceback.format_exc())
                    else:
                        # Bare raise keeps the original traceback.
                        raise
        return f2
    return dec
class LoginTic(object):
    """Command-line client for kyfw.12306.cn.

    Handles captcha verification and login, polls the left-ticket query
    endpoint, and walks the order-submission steps for a chosen train.
    Credentials, passengers, query parameters and train lists are read from
    module-level ``g_*`` names that are not defined in this class.
    """

    def __init__(self):
        """Prepare request headers and the HTTP session, then load station data."""
        # Headers used for the captcha, login, user-check and order-request calls.
        self.headers = {
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.4",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36",
            "Host": "kyfw.12306.cn",
            "If-Modified-Since": "0",
            "Pragma": "no-cache",
            "Referer": "https://kyfw.12306.cn/otn/leftTicket/init",
            "X-Requested-With": "XMLHttpRequest"
        }
        # Headers used for the confirmPassenger/* order-confirmation calls.
        self.proxy_ext_header = {
            "Accept": "*/*",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": "https://kyfw.12306.cn/otn/login/init#",
            "Accept-Language": "zh-cn",
            "Accept-Encoding": "gzip, deflate",
            "Proxy-Connection": "Keep-Alive",
            "Pragma": "no-cache",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        }
        # One session for the whole flow so cookies persist between requests.
        self.session = requests.session()
        # Load station information.
        self.loadCityName()

    @staticmethod
    def _seat_available(value):
        """Return True when a seat column does not carry a 'no ticket' marker."""
        return value not in ("--", u"无", "", "0")

    def getImg(self):
        """Download the login captcha, show it, and ask the user for the answer.

        The image is saved as ``img.jpg`` and opened with the system viewer.
        The user answers with the positions of the matching pictures:

            ---------------------------------
            |   1   |   2   |   3   |   4   |
            ---------------------------------
            |   5   |   6   |   7   |   8   |
            ---------------------------------

        Returns:
            The raw text typed by the user, e.g. ``"2,5"``.
        """
        url = "https://kyfw.12306.cn/passport/captcha/captcha-image?login_site=E&module=login&rand=sjrand"
        response = self.session.get(url=url, headers=self.headers, verify=False)
        # Save the captcha locally so it can be opened by an image viewer.
        with open('img.jpg', 'wb') as f:
            f.write(response.content)
        try:
            im = Image.open('img.jpg')
            im.show()
            im.close()
        except Exception:
            # Showing the image is best effort; the prompt below still runs.
            # (Exception, not a bare except, so Ctrl-C is not swallowed.)
            print(u'请输入验证码')
        captcha_solution = raw_input(u'请输入验证码位置1-8,以","分割[例如2,5]:')
        return captcha_solution

    def getOrderCode(self):
        """Request the order-page captcha image.

        Returns:
            True when the server answers with HTTP 200, otherwise False.
        """
        url = "https://kyfw.12306.cn/otn/passcodeNew/getPassCodeNew?module=passenger&rand=randp&0.9064388337623583"
        response = self.session.get(url=url, headers=self.headers, verify=False)
        return response.status_code == 200

    def checkYanZheng(self, solution):
        """Submit the captcha answer for verification.

        Args:
            solution: Comma-separated picture positions (1-8), as returned by
                getImg().

        Returns:
            True when the server reports result code 4, otherwise False.
            Input that is not a list of positions 1-8 returns False without
            contacting the server.
        """
        # The server checks click coordinates, so each position is mapped to
        # the approximate centre of its picture. Index 0 is a placeholder.
        yanSol = ["", '35,35', '105,35', '175,35', '245,35', '35,105', '105,105', '175,105', '245,105']
        yanList = []
        for item in solution.split(','):
            print(item)
            try:
                position = int(item)
            except ValueError:
                return False
            if not 1 <= position <= 8:
                return False
            yanList.append(yanSol[position])
        # Coordinates are joined in selection order to form the answer string.
        yanStr = ','.join(yanList)
        checkUrl = "https://kyfw.12306.cn/passport/captcha/captcha-check"
        data = {
            'login_site': 'E',
            'rand': 'sjrand',
            'answer': yanStr
        }
        cont = self.session.post(url=checkUrl, data=data, headers=self.headers, verify=False)
        dic = loads(cont.content)
        code = dic['result_code']
        # Result codes noted by the original author: 4 ok, 5 failed, 7 expired.
        return str(code) == '4'

    def loginTo(self):
        """Log in with the configured (or prompted) user name and password.

        After a successful login the uamtk / uamauthclient token exchange is
        performed and checkUser() is called.

        NOTE(review): checkUser() calls back into loginTo() when the user
        check fails, so a server that keeps rejecting the check leads to
        repeated nested logins.

        Returns:
            True when the server reports a successful login, otherwise False.
        """
        # Fall back to interactive prompts when no credentials are configured.
        if not g_username:
            userName = raw_input('请输入您的12306用户名:')
        else:
            userName = g_username
        if not g_password:
            # getpass hides the typed characters.
            pwd = getpass.getpass('输入密码:')
        else:
            pwd = g_password
        loginUrl = "https://kyfw.12306.cn/passport/web/login"
        data = {
            'username': userName,
            'password': pwd,
            'appid': 'otn'
        }
        result = self.session.post(url=loginUrl, data=data, headers=self.headers, verify=False)
        dic = loads(result.content)
        print(result.content)
        mes = dic['result_message']
        # The decoded message is unicode, so compare against a unicode literal.
        if mes != u'登录成功':
            print(u'对不起,登录失败,请检查登录信息!')
            return False
        self.headers["Referer"] = "https://kyfw.12306.cn/otn/passport?redirect=/otn/login/userLogin"
        cont = self.session.post("https://kyfw.12306.cn/passport/web/auth/uamtk", data={"appid": "otn"}, headers=self.headers, verify=False)
        tempAuth = loads(cont.text)
        newAppTK = tempAuth["newapptk"]
        self.session.post("https://kyfw.12306.cn/otn/uamauthclient", data={"tk": newAppTK}, headers=self.headers, verify=False)
        self.checkUser()
        print(u'恭喜你,登录成功,可以购票!')
        return True

    @retries(5)
    def refreshTicket(self, start, end):
        """Poll the left-ticket query until do_ticket() reports completion.

        Args:
            start: Unused; the query parameters come from ``g_query_data``.
            end: Unused; see ``start``.

        Returns:
            True once do_ticket() returns 0.
        """
        print(u"########## 步骤3:查询余票 #########")
        self.session.headers["Referer"] = "https://kyfw.12306.cn/otn/leftTicket/init"
        query_string = urllib.urlencode(g_query_data)
        url_query = "https://kyfw.12306.cn/otn/leftTicket/queryX?" + query_string
        # Alternative endpoint tried when the first one stops answering properly.
        url_fallback = "https://kyfw.12306.cn/otn/leftTicket/query?" + query_string
        print(u"开始查询: %s" % url_query)
        want_special = len(g_buy_list) != 0
        if want_special:
            print(u"JUST For:%s" % (','.join(g_buy_list)))
        else:
            print(u"车次 出发 到达 时间 到达 历时 商务座 特等座 一等座 二等座 高级软卧 软卧 硬卧 软座 硬座 无座 其他备注")
        q_cnt = 0
        while 1:
            q_cnt = q_cnt + 1
            time.sleep(2)
            cont = self.session.get(url_query, verify=False)
            res = cont.text
            try:
                res_json = json.loads(res)
            except ValueError:
                print(u"解析票信息失败,变更地址重试")
                # The endpoint may have moved; switch to the other address.
                url_query = url_fallback
                continue
            if res_json['status'] != True:
                url_query = url_fallback
                print(u"请求余票信息失败,正在重试 %s" % res)
                continue
            result = {}
            ret = self.do_ticket(res_json, result, want_special)
            if ret == 0:
                break
            elif ret == -2:
                print(u"no ticket, refresh %d times!" % q_cnt)
                time.sleep(g_query_sleep_time)
                continue
            elif ret == -3:
                print(res_json["messages"])
            # Any other code (e.g. -1) simply triggers the next poll.
        return True

    def loadCityName(self):
        """Load station records from ``./infos/station_code.txt`` into self.cityInfo.

        The file is split on ``@`` into records and each record on ``|``.
        Records are stored under their third field; records with fewer than
        three fields (such as an empty leading chunk) are skipped.
        """
        with open("./infos/station_code.txt") as station_file:
            cityStr = station_file.read()
        temp = {}
        for city in cityStr.split("@"):
            cityDetail = city.decode("utf-8").split("|")
            if len(cityDetail) < 3:
                continue
            temp[cityDetail[2]] = cityDetail
        self.cityInfo = temp
        print(u"载入车站信息成功")

    def parseTicketInfo(self, ticketInfo):
        """Turn one split left-ticket record into a dict of named fields.

        Args:
            ticketInfo: List of fields from one ``|``-separated result row;
                it must have at least 33 entries.

        Returns:
            A dict with train, station, time and seat (``seat*``) entries.
        """
        return dict(
            trainSercet=ticketInfo[0],
            trainBtnText=ticketInfo[1],
            trainCode=ticketInfo[2],
            trainNum=ticketInfo[3],
            trainStart=ticketInfo[4],
            trainEnd=ticketInfo[5],
            trainFrom=ticketInfo[6],
            trainTo=ticketInfo[7],
            trainStartTime=ticketInfo[8],
            trainEndTime=ticketInfo[9],
            trainDuringTime=ticketInfo[10],
            enableBuy=ticketInfo[11],
            noticeText=ticketInfo[12],
            startDate=ticketInfo[13],
            trainLocationCode=ticketInfo[15],
            seatSWZ=ticketInfo[32],
            seatTDZ=ticketInfo[25],
            seatYDZ=ticketInfo[31],
            seatEDZ=ticketInfo[30],
            seatGJRW=ticketInfo[21],
            seatRW=ticketInfo[23],
            seatYW=ticketInfo[28],
            seatRZ=ticketInfo[24],
            seatYZ=ticketInfo[29],
            seatWZ=ticketInfo[26]
        )

    def do_ticket(self, json_data, result, want_special):
        """Pick a purchasable train from a query response and try to buy it.

        Args:
            json_data: Decoded left-ticket query response.
            result: Dict filled in place with purchasable trains, keyed by
                train number.
            want_special: When true, only trains in ``g_buy_list`` are
                considered and the first available one is bought without
                prompting.

        Returns:
            0 when a ticket was ordered or the user chose to quit,
            -1 when the response has no result list or buying failed,
            -2 when there is nothing to buy or the user asked for a refresh,
            -3 when the response has no ``data`` entry.
        """
        if "data" not in json_data:
            return -3
        if 'result' not in json_data['data']:
            return -1
        if len(json_data["data"]["result"]) == 0:
            return -2
        for item in json_data['data']['result']:
            ticketInfo = self.parseTicketInfo(item.split("|"))
            if ticketInfo["trainSercet"] == "" or ticketInfo["enableBuy"] == "N":
                continue
            if want_special and ticketInfo["trainNum"] not in g_buy_list:
                continue
            if ticketInfo["trainNum"] in g_ignore_list:
                continue
            if any(self._seat_available(ticketInfo[care_type]) for care_type in g_care_seat_types):
                result[ticketInfo["trainNum"]] = ticketInfo
        # Nothing purchasable in this response: let the caller poll again.
        if not result:
            return -2
        # Honour the order of g_buy_list as a priority list.
        if want_special:
            for train_code in g_buy_list:
                if train_code not in result:
                    continue
                if not self.buy(result[train_code]):
                    print(u"Err during buy")
                    return -1
                return 0
        # Show every candidate and let the user choose.
        for itemInfo in result.values():
            self.show_ticket(itemInfo)
        cmd = raw_input(u"请输入想要购买的车次号:[r|q|K101]:")
        cmd = cmd.strip()
        print(u"input:%s" % cmd)
        if cmd == "r":
            print(u"retry")
            return -2
        if cmd == "q":
            print(u"quit")
            return 0
        print(u"buy ticket:%s" % cmd)
        if cmd not in result:
            print(u"输入的车次不存在可购买列表中")
            # Ask again and hand back that outcome; falling through would
            # look up a train number that is not in result.
            return self.do_ticket(json_data, result, want_special)
        if not self.buy(result[cmd]):
            print(u"Err during buy")
            return -1
        return 0

    def show_ticket(self, it):
        """Print one train as a space-separated row.

        The columns follow the header printed by refreshTicket(), which lists
        a hard-seat column between soft seat and no-seat.

        Args:
            it: Ticket dict as produced by parseTicketInfo().
        """
        columns = [
            it['trainNum'],
            self.cityInfo[it['trainStart']][1],
            self.cityInfo[it['trainEnd']][1],
            it['trainStartTime'],
            it['trainEndTime'],
            it['trainDuringTime'],
            it['seatSWZ'],
            it['seatTDZ'],
            it['seatYDZ'],
            it['seatEDZ'],
            it['seatGJRW'],
            it['seatRW'],
            it['seatYW'],
            it['seatRZ'],
            it['seatYZ'],
            it['seatWZ'],
            it['enableBuy'],
        ]
        print(u" ".join(u"%s" % col for col in columns))

    def buy(self, item):
        """Run the order steps for one train, stopping at the first failure.

        Args:
            item: Ticket dict as produced by parseTicketInfo().

        Returns:
            True when every step succeeded, otherwise False.
        """
        steps = (
            self.checkUser,                                    # step 4
            lambda: self.submitOrderRequest(item),             # step 5
            self.confirmPassenger_get_token,                   # step 6
            self.check_order_code,                             # step 8
            self.checkOrderInfo,                               # step 9
            lambda: self.getQueueCount(item),                  # step 10
            lambda: self.confirmSingleForQueue(item["trainLocationCode"]),  # step 11
            self.queryOrderWaitTime,                           # step 12
            self.resultOrderForDcQueue,                        # step 13
        )
        for step in steps:
            if not step():
                return False
        return True

    def checkUser(self):
        """Ask the server whether the current session is logged in.

        When the check fails, a re-login is attempted (captcha first, then
        credentials) and False is still returned for this call.

        Returns:
            True when the response carries ``data.flag == True``, else False.
        """
        print(u"########### 步骤4:检查用户 #########")
        url_check_info = "https://kyfw.12306.cn/otn/login/checkUser"
        print(u"开始检查用户: ")
        cont = self.session.get(url_check_info, headers=self.headers, verify=False)
        res_json = json.loads(cont.text)
        print(u"接受到检查用户:")
        user_data = res_json['data']
        if 'flag' in user_data and user_data['flag'] == True:
            return True
        print(u"用户检查失败, %s" % res_json)
        print(u"尝试重新登录")
        # Submit the captcha answer before logging in, as the start-up flow does.
        if self.checkYanZheng(self.getImg()):
            self.loginTo()
        return False

    def submitOrderRequest(self, item):
        """Submit the order request for the selected train.

        The process exits with status 0 when the server reports that an
        unprocessed order already exists.

        Args:
            item: Ticket dict as produced by parseTicketInfo().

        Returns:
            True when the server accepts the request, otherwise False.
        """
        print(u"############# 步骤5:提交订单请求 #########")
        url_submit = "https://kyfw.12306.cn/otn/leftTicket/submitOrderRequest"
        # NOTE(review): the travel date is hard-coded; it presumably should
        # follow the date configured in g_query_data - confirm.
        travel_date = "2017-10-14"
        # NOTE(review): trainTo is sent to pair with trainFrom; the previous
        # code sent trainEnd here - confirm against the 12306 API.
        post_data = "secretStr=" + item['trainSercet'] \
            + "&train_date=" + travel_date \
            + "&back_train_date=" + travel_date \
            + "&tour_flag=dc&purpose_codes=ADULT&query_from_station_name=" \
            + item['trainFrom'] \
            + "&query_to_station_name=" + item['trainTo'] \
            + "&undefined"
        print(post_data)
        print(u"开始提交请求: ")
        cont = self.session.post(url_submit + "?" + post_data, data=post_data.encode("utf8"), headers=self.headers, verify=False)
        data = cont.text
        res_json = json.loads(data)
        if res_json['status'] == True:
            return True
        messages = res_json.get("messages") or []
        first_message = messages[0] if messages else u""
        if first_message.find(u"您还有未处理的订单") != -1:
            print(u"已经成功提交订单,抢票成功。")
            exit(0)
        print(u"提交订单失败")
        print(data)
        print(''.join(messages).encode('gb2312'))
        return False

    def confirmPassenger_get_token(self):
        """Fetch the confirm-passenger page and extract the order tokens.

        Sets ``self.globalRepeatSubmitToken``, ``self.key_check_isChange``
        and ``self.leftTicketStr`` from JavaScript variables in the page.

        Returns:
            True when both variables were found, otherwise False.
        """
        print(u"#############步骤6:确定乘客信息 #########")
        url_confirm_passenger = "https://kyfw.12306.cn/otn/confirmPassenger/initDc"
        cont = self.session.get(url_confirm_passenger, headers=self.headers, verify=False)
        data = cont.text
        key_find = False
        line_token = u''
        line_request_info = u''
        # cont.text is already unicode, so lines are compared and kept as
        # unicode; decoding them again fails on non-ASCII content.
        for line in data.split(u"\n"):
            if line.startswith(u' var globalRepeatSubmitToken = '):
                line_token = line
                continue
            elif line.startswith(u' var ticketInfoForPassengerForm'):
                line_request_info = line
                key_find = True
                break
        if not (key_find and line_token):
            print(u"globalRepeatSubmitToken not found")
            return False
        # Strip the surrounding quote and the trailing "';".
        self.globalRepeatSubmitToken = line_token.split('=', 1)[1].strip()[1:-2]
        print(u"Update globalRepeatSubmitToken=%s" % self.globalRepeatSubmitToken)
        # Drop the trailing ";" and rewrite JavaScript literals as Python ones.
        req_data = line_request_info.split('=', 1)[1].strip()[:-1]
        req_data = req_data.replace("null", "''")
        req_data = req_data.replace("true", "True")
        req_data = req_data.replace("false", "False")
        print(u"line_request_info")
        # SECURITY NOTE: eval() runs text received from the server; a
        # literal-only parser would be safer here.
        req_json = eval(req_data)
        self.key_check_isChange = req_json['key_check_isChange']
        self.leftTicketStr = req_json['leftTicketStr']
        print(u"Update key_check_isChange=%s" % self.key_check_isChange)
        return True

    def check_order_code(self):
        """Request the order-page captcha; returns getOrderCode()'s result."""
        print(u"############## 步骤8:验证订单提交处的验证码 #########")
        return self.getOrderCode()

    def construct_passengerTicketStr(self, seatType="1"):
        """Build the passenger strings sent with the order requests.

        Sets ``self.passengerTicketStr`` (entries joined with ``_``) and
        ``self.oldPassengerStr`` (each entry ends with ``_``), both UTF-8
        encoded, from the ``g_passengers`` list.

        Args:
            seatType: Seat-type code placed in every passenger entry.
        """
        print(u"###construct_passengerTicketStr###")
        ticket_parts = []
        old_parts = []
        for p in g_passengers:
            ticket_parts.append(seatType + ',0,1,' + p['name'] + ',1,' + p['id'] + ',' + p['tel'] + ',N')
            old_parts.append(p['name'] + ',1,' + p['id'] + ',1_')
        self.passengerTicketStr = '_'.join(ticket_parts).encode('utf8')
        self.oldPassengerStr = ''.join(old_parts).encode('utf8')

    def checkOrderInfo(self):
        """Send the passenger data for validation.

        The seat type is taken from the first entry of ``g_care_seat_types``.

        Returns:
            True when the server reports status and submitStatus as true,
            otherwise False.
        """
        print(u"#################步骤9:校验订单信息 #########")
        url_check_order = "https://kyfw.12306.cn/otn/confirmPassenger/checkOrderInfo"
        seatType = g_seat_code_dict[g_care_seat_types[0]]
        self.construct_passengerTicketStr(seatType)
        data = [
            ("cancel_flag", "2"),
            ("bed_level_order_num", "000000000000000000000000000000"),
            ("passengerTicketStr", self.passengerTicketStr),
            ("oldPassengerStr", self.oldPassengerStr),
            ("tour_flag", "dc"),
            ("randCode", ""),
            ("_json_att", ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.urlencode(data)
        print(post_data)
        print(u"发送订单请求=====>")
        cont = self.session.post(url_check_order, data=post_data, headers=self.proxy_ext_header, verify=False)
        res_json = json.loads(cont.text)
        print(u"获取订单返回结果:%s" % res_json)
        order_data = res_json.get('data')
        if res_json['status'] != True or order_data['submitStatus'] != True:
            # The error text is optional; fall back to the message list.
            if isinstance(order_data, dict) and 'errMsg' in order_data:
                err_msg = order_data['errMsg']
            else:
                err_msg = res_json.get('messages')
            print(u"确认订单出错:%s" % err_msg)
            return False
        return True

    def getQueueCount(self, item):
        """Query the queue for the first available preferred seat type.

        Args:
            item: Ticket dict as produced by parseTicketInfo().

        Returns:
            True when the server answers with a true status; False when no
            preferred seat is available or the request is rejected.
        """
        print(u"###########步骤十:获取队列数量 #########")
        url_queue_count = "https://kyfw.12306.cn/otn/confirmPassenger/getQueueCount"
        # Produces e.g. 'Sun Jan 5 00:00:00 UTC+0800 2014'.
        # NOTE(review): this is today's date, not the configured travel date.
        tlist = time.ctime().split()
        tlist[3] = '00:00:00'
        tlist.insert(4, 'UTC+0800')
        buy_date = ' '.join(tlist)
        s_type = ""
        for t_type in g_care_seat_types:
            # Same availability rule as do_ticket().
            if self._seat_available(item[t_type]):
                s_type = g_seat_code_dict[t_type]
                break
        if not s_type:
            print(u"座位已经被抢完了,无法提交")
            return False
        data = [
            ("train_date", buy_date),
            ("train_no", item['trainCode']),
            ("stationTrainCode", item['trainNum']),
            ("seatType", s_type),
            ("fromStationTelecode", item['trainFrom'].encode("utf-8")),
            # NOTE(review): trainTo pairs with trainFrom; the previous code
            # sent trainEnd here - confirm against the 12306 API.
            ("toStationTelecode", item['trainTo'].encode("utf-8")),
            ("leftTicket", item['noticeText']),
            ("purpose_codes", "00"),
            ("_json_att", ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.urlencode(data)
        print(u"send getQueueCount=====>")
        print(post_data)
        cont = self.session.post(url_queue_count, data=post_data, headers=self.proxy_ext_header, verify=False)
        res_json = json.loads(cont.text)
        print(u"recv getQueueCount:%s" % res_json)
        if res_json['status'] != True:
            print(u"getQueueCount error :%s" % res_json)
            return False
        return True

    def confirmSingleForQueue(self, locationCode):
        """Confirm the order so that it enters the queue.

        Args:
            locationCode: Value sent as ``train_location``.

        Returns:
            True when the response carries ``data.submitStatus == True``,
            otherwise False.
        """
        print(u"##############步骤十一:获取信号从队列中 #########")
        url_check_info = "https://kyfw.12306.cn/otn/confirmPassenger/confirmSingleForQueue"
        data = [
            ('passengerTicketStr', self.passengerTicketStr),
            ("oldPassengerStr", self.oldPassengerStr),
            ('randCode', ""),
            ('purpose_codes', "00"),
            ('key_check_isChange', self.key_check_isChange),
            ('leftTicketStr', self.leftTicketStr),
            ('train_location', locationCode),
            ('choose_seats', ""),
            ('seatDetailType', "000"),
            ('roomType', "00"),
            ('dwAll', 'N'),
            ('_json_att', ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.urlencode(data)
        print(post_data)
        print(u"send confirmSingleForQueue=====>")
        cont = self.session.post(url_check_info, data=post_data, headers=self.proxy_ext_header, verify=False)
        res_json = json.loads(cont.text)
        print(u"recv confirmSingleForQueue")
        confirm_data = res_json['data']
        if 'submitStatus' not in confirm_data or confirm_data['submitStatus'] != True:
            print(u"confirmSingleForQueue failed, %s" % res_json)
            return False
        return True

    def queryOrderWaitTime(self):
        """Poll the queue until the wait count reaches zero.

        Sets ``self.orderId`` from the response (``orderId``, or
        ``requestId`` when that is empty).

        Returns:
            True once an order id was recorded, False when the server
            reports an error.
        """
        print(u"###################步骤十二:查询订单等待时间 #########")
        url_base = "https://kyfw.12306.cn/otn/confirmPassenger/queryOrderWaitTime?"
        cnt = 0
        while 1:
            data = [
                ('random', int(time.time())),
                ("tourFlag", "dc"),
                ('_json_att', ''),
                ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
            ]
            # Rebuild from the base URL on every poll; appending to the
            # previous URL would stack query strings on top of each other.
            url_query_wait = url_base + urllib.urlencode(data)
            print(u"send queryOrderWaitTime:%d=====>" % cnt)
            cont = self.session.request('GET', url_query_wait, headers=self.proxy_ext_header, verify=False)
            res_json = json.loads(cont.text)
            print(u"recv queryOrderWaitTime:%s" % res_json)
            cnt = cnt + 1
            wait_data = res_json.get('data')
            if not wait_data or wait_data.get('queryOrderWaitTimeStatus') != True:
                print(u"queryOrderWaitTime error")
                print(res_json.get('messages'))
                # No order id was obtained, so the next step must not run.
                return False
            if wait_data['waitCount'] == 0:
                self.orderId = wait_data['orderId']
                if not self.orderId:
                    self.orderId = wait_data['requestId']
                print(u"Update orderId:%s" % self.orderId)
                return True

    def resultOrderForDcQueue(self):
        """Request the final result for the order stored in ``self.orderId``.

        Returns:
            True when the server answers with a true status, otherwise False.
        """
        print(u"###############步骤十三:resultOrderForDcQueue #########")
        url_result = "https://kyfw.12306.cn/otn/confirmPassenger/resultOrderForDcQueue"
        data = [
            ('orderSequence_no', self.orderId),
            ('_json_att', ''),
            ("REPEAT_SUBMIT_TOKEN", self.globalRepeatSubmitToken),
        ]
        post_data = urllib.urlencode(data)
        print(u"send resultOrderForDcQueue=====>")
        cont = self.session.request('POST', url_result, data=post_data, headers=self.proxy_ext_header, verify=False)
        data = cont.text
        res_json = json.loads(data)
        print(u"recv resultOrderForDcQueue")
        if res_json["status"] != True:
            print(u"submit error")
            print(data)
            return False
        print(u"#################抢票成功,请自行登录12306网站支付订单#########")
        return True
if __name__ == '__main__':
    login = LoginTic()
    # Login is attempted only after the captcha answer has been accepted.
    while True:
        answer = login.getImg()
        if login.checkYanZheng(answer):
            print(u'验证通过!')
            break
        print(u'验证失败,请重新验证!')
    login.loginTo()
    # Start polling for tickets.
    login.construct_passengerTicketStr()
    login.refreshTicket("BJ", "WH")
# NOTE: stray hosting-site moderation notice captured with the file (not program text):
# 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
# 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。