import os
import re
import time
import pathlib
import requests
from urllib.parse import urljoin, urlparse, urlsplit, unquote
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
start_url = "http://www.weather.com.cn"
chrome_driver_path = r"D:\HuaweiMoveData\Users\33659\Desktop\python\chromedriver-win64\chromedriver.exe"
save_dir = pathlib.Path("images多线程")  # output directory
# ====== Added: caps on the number of "pages" and the number of images ======
max_pages = 19  # crawl at most 19 "pages" (simulated here by 19 scroll rounds)
max_images = 119  # download at most 119 images
scroll_max_rounds = max_pages  # scroll-round cap, equal to max_pages
scroll_pause = 1.0  # pause after each scroll (seconds)
request_timeout = 20  # timeout for a single image download (seconds)
max_workers = 16  # number of concurrent download threads
max_retries = 3  # maximum retries per image
save_dir.mkdir(parents=True, exist_ok=True)
def is_jpg_png(url: str) -> bool:
    if not url:
        return False
    url = url.strip().strip('\'"')
    if url.lower().startswith("data:image"):
        return False
    path = urlsplit(url).path.lower()
    return path.endswith(".jpg") or path.endswith(".jpeg") or path.endswith(".png")
def filename_from_url(url: str) -> str:
    parts = urlsplit(url)
    name = os.path.basename(parts.path)
    name = unquote(name)
    if not name:
        name = "image"
    return name
def ensure_unique_path(base_dir: pathlib.Path, name: str) -> pathlib.Path:
    p = base_dir / name
    if not p.exists():
        return p
    stem, ext = os.path.splitext(name)
    idx = 1
    while True:
        candidate = base_dir / f"{stem}({idx}){ext}"
        if not candidate.exists():
            return candidate
        idx += 1
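# Illustrative example (hypothetical file name): if "sun.png" already exists in
# save_dir, ensure_unique_path(save_dir, "sun.png") returns save_dir / "sun(1).png",
# then "sun(2).png" on the next call, so an existing file is never overwritten.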
def normalize_url(u: str, base: str) -> str:
    if not u:
        return ""
    u = u.strip().strip('\'"')
    if u.startswith("url(") and u.endswith(")"):
        u = u[4:-1].strip('\'"')
    if u.startswith("//"):  # protocol-relative URL
        return "http:" + u
    return urljoin(base, u)
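# Illustrative examples (hypothetical URLs, assuming base = start_url):
#   normalize_url("//i.example.cn/pic/a.png", start_url)  -> "http://i.example.cn/pic/a.png"
#   normalize_url('url("/m2/i/logo.jpg")', start_url)     -> "http://www.weather.com.cn/m2/i/logo.jpg"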
def build_headers(referer: str):
    return {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari",
        "Referer": referer,
        "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        "Connection": "keep-alive",
    }
def download_once(url: str, referer: str) -> pathlib.Path:
    headers = build_headers(referer)
    with requests.get(url, headers=headers, timeout=request_timeout, stream=True) as resp:
        resp.raise_for_status()
        name = filename_from_url(url)
        # Infer an extension from the Content-Type header when the URL path has none
        if not os.path.splitext(name)[1]:
            ct = resp.headers.get("Content-Type", "").lower()
            if "png" in ct:
                name += ".png"
            elif "jpeg" in ct or "jpg" in ct:
                name += ".jpg"
        path = ensure_unique_path(save_dir, name)
        with open(path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
    return path
def download_with_retry(url: str, referer: str) -> tuple[str, bool, str]:
    # Returns (url, success flag, local path or error message)
    for attempt in range(1, max_retries + 1):
        try:
            path = download_once(url, referer)
            return (url, True, str(path))
        except Exception as e:
            err = f"{type(e).__name__}: {e}"
            if attempt < max_retries:
                time.sleep(0.5 * attempt)
            else:
                return (url, False, err)
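# Note: with max_retries = 3, the linear backoff above sleeps 0.5 s after the first
# failure and 1.0 s after the second; the third failure is reported as a FAIL result.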
def collect_image_urls(driver, base_url: str) -> set[str]:
    img_urls = set()
    # 1) Common <img> src and lazy-loading attributes
    img_attrs = ["src", "data-src", "data-original", "data-url", "src2"]
    for attr in img_attrs:
        try:
            elems = driver.find_elements(By.CSS_SELECTOR, f"img[{attr}]")
        except Exception:
            elems = []
        for el in elems:
            try:
                val = el.get_attribute(attr)
                full = normalize_url(val, base_url)
                if is_jpg_png(full):
                    img_urls.add(full)
            except Exception:
                pass
    # 2) CSS background images
    elems = driver.find_elements(By.CSS_SELECTOR, "*")
    for el in elems:
        try:
            bg = el.value_of_css_property("background-image")
            if not bg or bg == "none":
                continue
            for m in re.findall(r'url\((.*?)\)', bg):
                full = normalize_url(m, base_url)
                if is_jpg_png(full):
                    img_urls.add(full)
        except Exception:
            pass
    # 3) Fallback: scan the raw page source
    html = driver.page_source
    for m in re.findall(
        r'<img[^>]+?(?:src|data-src|data-original|data-url|src2)\s*=\s*["\']([^"\']+)["\']',
        html,
        flags=re.IGNORECASE,
    ):
        full = normalize_url(m, base_url)
        if is_jpg_png(full):
            img_urls.add(full)
    return img_urls
def main():
    chrome_options = Options()
    # Uncomment the next line to run in headless mode
    # chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    service = Service(chrome_driver_path)
    driver = webdriver.Chrome(service=service, options=chrome_options)
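    # Optional sketch (assumes Selenium >= 4.6): Selenium Manager can resolve a matching
    # chromedriver automatically, so the explicit Service path above could be dropped:
    #   driver = webdriver.Chrome(options=chrome_options)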
    try:
        driver.get(start_url)
        # ====== Scroll at most max_pages (=19) times, i.e. "crawl at most 19 pages" ======
        last_height = driver.execute_script("return document.body.scrollHeight;")
        rounds = 0
        while rounds < scroll_max_rounds:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(scroll_pause)
            new_height = driver.execute_script("return document.body.scrollHeight;")
            rounds += 1
            if new_height == last_height:
                break
            last_height = new_height
        img_urls = collect_image_urls(driver, start_url)
        print(f"Found {len(img_urls)} candidate images (home page, JPG/PNG)")
        # ====== Download at most max_images (=119) images ======
        img_list = sorted(img_urls)
        if len(img_list) > max_images:
            print(f"max_images is set to {max_images}, so only the first {max_images} URLs will be downloaded.")
            img_list = img_list[:max_images]
        else:
            print(f"Will attempt to download all {len(img_list)} images.")
        ok = 0
        fails = 0
        futures = []
        referer = start_url
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for url in img_list:  # submit tasks only for the truncated list
                futures.append(executor.submit(download_with_retry, url, referer))
            for fut in as_completed(futures):
                url, success, info = fut.result()
                if success:
                    ok += 1
                    print(f"[OK] {url} -> {info}")
                else:
                    fails += 1
                    print(f"[FAIL] {url} - {info}")
        print(f"Download finished: {ok} succeeded / {len(img_list)} attempted / {fails} failed")
    finally:
        driver.quit()
if __name__ == "__main__":
    main()