1 Star 0 Fork 0

庄靖轩/数据采集1

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
task1单线程.py 5.60 KB
一键复制 编辑 原始数据 按行查看 历史
庄靖轩 提交于 2025-11-17 11:57 +08:00 . 作业代码
import os
import re
import time
import pathlib
import requests
from urllib.parse import urljoin, urlparse, urlsplit, unquote, urlunsplit
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
# ====== Configurable settings ======
start_url = "http://www.weather.com.cn"
chrome_driver_path = r"D:\HuaweiMoveData\Users\33659\Desktop\python\chromedriver-win64\chromedriver.exe"
save_dir = pathlib.Path("images单线程")  # output directory for downloaded images
scroll_max_rounds = 19 # scroll/load at most 19 "pages"
request_timeout = 20 # per-image download timeout (seconds)
max_images = 119 # download at most 119 images
save_dir.mkdir(parents=True, exist_ok=True)
def is_jpg_png(url: str) -> bool:
    """Return True if *url* points at a JPG/JPEG/PNG resource.

    Empty values and ``data:image`` URIs are rejected. Only the URL's
    path component is inspected, so query strings do not affect the result.
    """
    if not url:
        return False
    url = url.strip().strip('\'"')
    if url.lower().startswith("data:image"):
        return False
    path = urlsplit(url).path.lower()
    # str.endswith accepts a tuple: one call instead of an or-chain.
    return path.endswith((".jpg", ".jpeg", ".png"))
def filename_from_url(url: str) -> str:
    """Derive a file name from the path component of *url*.

    Percent-escapes are decoded; if the path yields no usable name
    (e.g. the URL ends with ``/``), the placeholder ``"image"`` is used.
    """
    raw_path = urlsplit(url).path
    candidate = unquote(os.path.basename(raw_path))
    return candidate or "image"
def ensure_unique_path(base_dir: pathlib.Path, name: str) -> pathlib.Path:
    """Return a path under *base_dir* that does not collide with an existing file.

    If ``name`` already exists, ``stem(1).ext``, ``stem(2).ext``, ... are
    tried until a free slot is found.
    """
    target = base_dir / name
    if not target.exists():
        return target
    stem, ext = os.path.splitext(name)
    counter = 1
    while (base_dir / f"{stem}({counter}){ext}").exists():
        counter += 1
    return base_dir / f"{stem}({counter}){ext}"
def normalize_url(u: str, base: str) -> str:
    """Resolve a raw attribute/CSS url value against *base*.

    Strips stray quotes, unwraps CSS ``url(...)`` syntax, defaults
    protocol-relative ``//host/...`` URLs to http, and joins relative
    paths onto *base*. Empty input yields ``""``.
    """
    if not u:
        return ""
    cleaned = u.strip().strip('\'"')
    # Unwrap CSS url(...) wrappers, including inner quotes.
    if cleaned.startswith("url(") and cleaned.endswith(")"):
        cleaned = cleaned[4:-1].strip('\'"')
    # Protocol-relative URLs get an explicit http scheme.
    return "http:" + cleaned if cleaned.startswith("//") else urljoin(base, cleaned)
def download_image(url: str, referer: str):
    """Download one image under ``save_dir``; return True on success, False on failure.

    Sends a browser-like User-Agent plus the Referer header so the image
    host does not reject the request as hotlinking. If the URL carries no
    file extension, one is inferred from the response Content-Type.
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari",
            "Referer": referer,
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        }
        # Use the Response as a context manager: with stream=True the
        # connection stays open until released, and the original code
        # never closed it (connection-pool leak).
        with requests.get(url, headers=headers, timeout=request_timeout, stream=True) as resp:
            resp.raise_for_status()
            name = filename_from_url(url)
            if not os.path.splitext(name)[1]:
                # No extension in the URL path: infer it from Content-Type.
                ct = resp.headers.get("Content-Type", "").lower()
                if "png" in ct:
                    name += ".png"
                elif "jpeg" in ct or "jpg" in ct:
                    name += ".jpg"
            path = ensure_unique_path(save_dir, name)
            # Stream to disk in chunks to avoid holding the image in memory.
            with open(path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"[OK] {url} -> {path}")
        return True
    except Exception as e:
        # Best-effort scraper: log the failure and keep going.
        print(f"[FAIL] {url} - {e}")
        return False
# Build the Chrome driver; flags below improve stability in sandboxed
# or containerized environments.
chrome_options = Options()
# chrome_options.add_argument("--headless=new")  # uncomment to run without a visible browser window
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
service = Service(chrome_driver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
    driver.get(start_url)
    # ====== Scroll at most scroll_max_rounds (19) times ======
    last_height = driver.execute_script("return document.body.scrollHeight;")
    rounds = 0
    while rounds < scroll_max_rounds:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(1.0)  # give lazy-loaded content time to appear
        new_height = driver.execute_script("return document.body.scrollHeight;")
        rounds += 1
        if new_height == last_height:
            # Page height stopped growing: nothing more to load.
            break
        last_height = new_height
    # Collect candidate image URLs from three sources:
    # 1) <img> attributes, including common lazy-load attribute names.
    img_urls = set()
    img_attrs = ["src", "data-src", "data-original", "data-url", "src2"]
    for attr in img_attrs:
        try:
            elems = driver.find_elements(By.CSS_SELECTOR, f"img[{attr}]")
        except Exception:
            elems = []
        for el in elems:
            try:
                val = el.get_attribute(attr)
                full = normalize_url(val, start_url)
                if is_jpg_png(full):
                    img_urls.add(full)
            except Exception:
                # Stale/detached elements are skipped (best effort).
                pass
    # 2) CSS background-image properties on every element.
    elems = driver.find_elements(By.CSS_SELECTOR, "*")
    for el in elems:
        try:
            bg = el.value_of_css_property("background-image")
            if not bg or bg == "none":
                continue
            for m in re.findall(r'url\((.*?)\)', bg):
                full = normalize_url(m, start_url)
                if is_jpg_png(full):
                    img_urls.add(full)
        except Exception:
            pass
    # 3) A regex sweep over the raw page source, to catch anything the
    # DOM queries above missed.
    html = driver.page_source
    for m in re.findall(
        r'<img[^>]+?(?:src|data-src|data-original|data-url|src2)\s*=\s*["\']([^"\']+)["\']',
        html,
        flags=re.IGNORECASE
    ):
        full = normalize_url(m, start_url)
        if is_jpg_png(full):
            img_urls.add(full)
    print(f"共发现候选图片 {len(img_urls)} 张(仅当前首页、JPG/PNG)")
    # ====== Download at most max_images (119) images ======
    ok = 0
    for url in sorted(img_urls):
        if ok >= max_images:  # stop once the cap is reached
            print(f"已达到最大下载数量 {max_images} 张,停止下载。")
            break
        if download_image(url, referer=start_url):
            ok += 1
    print(f"下载完成:成功 {ok} / 发现 {len(img_urls)}")
finally:
    # Always release the browser, even if scraping raised.
    driver.quit()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhuang-jingxuan/data.git
git@gitee.com:zhuang-jingxuan/data.git
zhuang-jingxuan
data
数据采集1
master

搜索帮助