Ai
1 Star 1 Fork 1

青果网络/python-proxy-sdk

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
pool.py 2.79 KB
一键复制 编辑 原始数据 按行查看 历史
青果网络 提交于 2023-01-06 09:39 +08:00 . init
import logging
import time
import requests
from common import utils
DePoolCollectAddr = "https://proxy.qg.net/replace"
DstPoolCollectAddr = "https://proxy.qg.net/extract"
DsnPoolCollectAddr = "https://proxy.qg.net/allocate"
class ProxyIP:
def __init__(self, ip, serverAddr, deadline):
self.ip = ip
self.serverAddr = serverAddr
self.deadline = deadline
self.usable = True
def markUnusable(self):
self.usable = False
class IPPool:
def __init__(self, url, logger):
self.url = url
self.ips = list()
self.cursor = 0
self.logger = logger
def __delete__(self, ip):
for i, p in enumerate(self.ips):
if p != ip:
continue
self.ips.remove(ip)
def collector(self):
try:
resp = requests.get(self.url).json()
except Exception as e:
return self.logger.exception("failed to collector resource: ", str(e))
if resp["Code"] != 0:
return self.logger.warning(resp["Msg"])
ips = []
for r in resp["Data"]:
ip = ProxyIP(r['IP'], r['host'], r['deadline'])
ips.append(ip)
self.ips =ips
self.cursor = 0
def getIPs(self):
return self.ips
def next(self):
global ip
count = 0
while True:
try:
ip = self.ips[self.cursor]
except IndexError:
continue
self.cursor = (self.cursor+1)%len(self.ips)
if utils.ConvertTimestamp(ip.deadline) - time.time() > 3 and ip.usable :
break
else:
self.__delete__(ip)
if count > len(self.ips):
return
count+=1
return ip
async def autoRefreshPool(self, interval):
while True:
time.sleep(interval)
try:
self.collector()
except Exception as e :
self.logger.exception("failed to auto refresh pool:", str(e))
def NewDynamicExclusiveIPPool(auth, cap):
pool = IPPool(
url= f"{DePoolCollectAddr}?Key={auth.key}&Pwd={auth.pwd}&Num={cap}",
logger= logging.getLogger("DeIPPool")
)
pool.collector()
return pool
def NewDynamicShareByNumIPPool(auth, cap):
pool = IPPool(
url= f"{DsnPoolCollectAddr}?Key={auth.key}&Pwd={auth.pwd}&Num={cap}",
logger= logging.getLogger("DsnIPPool")
)
pool.collector()
return pool
def NewDynamicShareByTimeIPPool(auth, cap):
pool = IPPool(
url= f"{DstPoolCollectAddr}?Key={auth.key}&Pwd={auth.pwd}&Num={cap}",
logger= logging.getLogger("DeIPPool")
)
pool.collector()
return pool
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/qgnet/python-proxy-sdk.git
git@gitee.com:qgnet/python-proxy-sdk.git
qgnet
python-proxy-sdk
python-proxy-sdk
master

搜索帮助