1 Star 0 Fork 0

罗若元/python

Create your Gitee Account
Explore and code with more than 13.5 million developers. Free private repositories! :)
Sign up
文件
This repository doesn't specify a license. Please pay attention to the specific project description and its upstream code dependencies when using it.
Clone or Download
shiyan4.py 16.54 KB
Copy Edit Raw Blame History
罗若元 authored 2025-06-08 13:52 +08:00 . 实验四
# coding: utf-8
# Project:PythonProject5
# File:shiyan4.py
# Author:罗若元
# Date :2025/6/8 13:51
# IDE:PyCharm
import os
import random
import sys
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from urllib.parse import quote

import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class JDSpider:
    """Tkinter front end around a Selenium (Edge) scraper for JD.com search results.

    The GUI collects a product name, a worker thread drives the browser, and the
    scraped rows (shop name, price, comment count) are shown in the text box and
    written to an .xlsx file next to the application.
    """

    # Column keys shared by the scraped dicts, the DataFrame and the Excel header.
    _COLUMNS = ['店铺名字', '价格', '累计评价数']

    # Characters replaced when the product name is used as part of a file name.
    _INVALID_FILENAME_CHARS = '\\/:*?"<>|'

    # Desktop Edge user agents; one is chosen at random for each headless session.
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/123.0.0.0 Safari/537.36 Edg/123.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    )

    def __init__(self):
        """Initialise state; widgets are created later by ``create_gui``."""
        self.driver = None          # active Selenium driver, or None
        self.root = None            # Tk root window
        self.progress_var = None    # IntVar bound to the progress bar
        self.result_text = None     # Text widget receiving the result table
        self.status_label = None    # Label showing the current status line
        self.entry = None           # Entry widget holding the product name
        self.spider_button = None   # "start" button, disabled while scraping
        self.product_name = ""      # keyword of the current run, used in the file name
        # Edge profile directory; this is what keeps the login between runs.
        self.cookies_path = os.path.join(os.getcwd(), "browser_cookies")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _app_dir():
        """Return the executable's directory when frozen, else the working directory."""
        if getattr(sys, 'frozen', False):
            return os.path.dirname(sys.executable)
        return os.getcwd()

    def _driver_path(self):
        """Return the expected location of ``msedgedriver.exe``."""
        return os.path.join(self._app_dir(), 'msedgedriver.exe')

    def _close_driver(self):
        """Quit the current browser, if any, and forget it."""
        driver, self.driver = self.driver, None
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            # Best effort: a browser that already died must not abort the run.
            print(f"关闭浏览器失败: {str(e)}")

    def _start_headed_driver(self):
        """Open a visible Edge window on the shared profile (used for manual login).

        Raises:
            Exception: whatever Selenium raises when the browser cannot start.
        """
        self._close_driver()
        edge_options = Options()
        edge_options.add_argument(f"--user-data-dir={self.cookies_path}")
        self.driver = webdriver.Edge(service=Service(self._driver_path()), options=edge_options)

    @classmethod
    def _safe_filename(cls, name):
        """Replace characters that are invalid in file names with underscores."""
        return "".join("_" if ch in cls._INVALID_FILENAME_CHARS else ch for ch in name).strip()

    @staticmethod
    def _build_search_url(product_name):
        """Return the JD search URL for *product_name*, sorted by sales (descending)."""
        # Percent-encode the keyword so '&', '#', spaces etc. cannot break the query string.
        keyword = quote(product_name, safe='')
        return f"https://search.jd.com/Search?keyword={keyword}&sort=sort_totalsales15_desc"

    @staticmethod
    def _format_row(shop, price, comment_num):
        """Format one result as a fixed-width line for the result text box."""
        # Keep over-long shop names inside the 40-character column.
        if len(shop) > 38:
            shop = shop[:35] + "..."
        price_text = f"{price:.2f}"
        return f"{shop.ljust(40)}{price_text.rjust(10)}{str(comment_num).rjust(15)}\n"

    @staticmethod
    def _parse_item(item):
        """Extract shop name, price and comment count from one ``.gl-item`` element.

        Raises:
            Exception: if the shop or price cannot be read; the caller skips the item.
        """
        # Shop name; JD self-operated and flagship stores are marked with '*'.
        shop = item.find_element(By.CSS_SELECTOR, '.p-shop a').text.strip()
        if "京东自营" in shop or "旗舰店" in shop:
            shop = f"{shop} *"
        price = float(item.find_element(By.CSS_SELECTOR, '.p-price strong i').text)
        # The comment count is optional: fall back to a placeholder.
        try:
            comment_num = item.find_element(By.CSS_SELECTOR, '.p-commit strong a').text.strip()
        except Exception:
            comment_num = "未知"
        return {
            '店铺名字': shop,
            '价格': price,
            '累计评价数': comment_num
        }

    # ------------------------------------------------------------------ browser

    def setup_driver(self):
        """Start a headless Edge session on the shared profile.

        Any browser that is still open is closed first, so two sessions never
        compete for the same profile directory.

        Returns:
            bool: True when the driver is ready, False otherwise.
        """
        self._close_driver()
        try:
            edge_options = Options()
            for argument in (
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--remote-debugging-port=9222",
                "--disable-extensions",
                "--headless",       # run without a visible window
                "--disable-gpu",
                "--log-level=3",
                "--disable-infobars",
                "--disable-notifications",
                "--disable-popup-blocking",
            ):
                edge_options.add_argument(argument)
            edge_options.add_argument(f"--user-data-dir={self.cookies_path}")
            edge_options.add_argument(f"--user-agent={random.choice(self._USER_AGENTS)}")
            # Fixed window size so elements are not laid out off-screen.
            edge_options.add_argument("window-size=1920,1080")
            self.driver = webdriver.Edge(service=Service(self._driver_path()), options=edge_options)
            self.driver.set_page_load_timeout(60)
            return True
        except Exception as e:
            print(f"驱动设置失败: {str(e)}")
            self._close_driver()
            return False

    def is_logged_in(self, navigate=True):
        """Report whether a visible '切换账号' (switch account) element is present.

        Args:
            navigate: when True (default) load the JD home page first; when False
                inspect whatever page the browser currently shows.

        Returns:
            bool: True if the element exists and is displayed, False otherwise
            (including when any browser error occurs).
        """
        try:
            if navigate:
                self.driver.get("https://www.jd.com")
                time.sleep(1)
            try:
                marker = self.driver.find_element(By.XPATH, "//*[contains(text(), '切换账号')]")
            except NoSuchElementException:
                return False
            return bool(marker.is_displayed())
        except Exception as e:
            print(f"未登录: {str(e)}")
            return False

    def login_jd(self):
        """Open the JD login page and wait up to five minutes for a manual login.

        Returns:
            bool: True once the login marker is detected, False on timeout or error.
        """
        self.update_status("请在打开的浏览器中手动登录京东(等待时间5分钟)...")
        try:
            self.driver.get("https://passport.jd.com/new/login.aspx")
            # Poll the page the user is on. Navigating here would reload the
            # browser on every poll and pull the user away from the login form.
            WebDriverWait(self.driver, 300).until(
                lambda driver: self.is_logged_in(navigate=False)
            )
            self.update_status("京东登录成功")
            time.sleep(2)
            return True
        except Exception as e:
            self.update_status(f"京东登录超时: {str(e)}")
            return False

    def _open_logged_in_session(self):
        """Leave ``self.driver`` pointing at a browser that is logged in to JD.

        Returns:
            bool: True when a logged-in session is available, False otherwise.
        """
        if not os.path.exists(self.cookies_path):
            # No profile yet: first login has to happen in a visible browser.
            self.update_status("未检测到用户数据目录,启动有头浏览器进行首次登录...")
            self._start_headed_driver()
            return self.login_jd()

        # A profile exists: try the stored login in headless mode first.
        self.update_status("检测到用户数据目录,尝试检测登陆状态...")
        if not self.setup_driver():
            self.update_status("无法初始化Edge浏览器驱动")
            return False
        if self.is_logged_in():
            self.update_status("已登录,开始爬取...")
            return True

        # Stored login is gone: log in with a visible browser, then go headless again.
        self.update_status("未登录,切换显示模式登录...")
        self.update_status("启动浏览器进行登录...")
        self._start_headed_driver()
        if not self.login_jd():
            return False
        if not self.setup_driver():
            self.update_status("无法初始化Edge浏览器驱动")
            return False
        if not self.is_logged_in():
            self.update_status("登录后仍然未检测到登录状态")
            return False
        return True

    def get_jd_info(self, product_name):
        """Scrape JD search results for *product_name*, sorted by sales (descending).

        Args:
            product_name: search keyword.

        Returns:
            list[dict] | None: one dict per parsed item with the keys in
            ``_COLUMNS``; None when the browser, the login or the page load fails.
        """
        self.update_status(f"正在爬取京东: {product_name}...")
        try:
            if not self._open_logged_in_session():
                return None
            self.driver.get(self._build_search_url(product_name))
            WebDriverWait(self.driver, 15).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, '.gl-item'))
            )
            time.sleep(1)
            items = self.driver.find_elements(By.CSS_SELECTOR, '.gl-item')
            if not items:
                self.update_status("未找到京东商品数据")
                return None
            results = []
            for item in items:
                try:
                    results.append(self._parse_item(item))
                except Exception:
                    # Skip entries without a readable shop or price (e.g. ad tiles).
                    continue
            if not results:
                self.update_status("未找到京东商品数据")
            return results
        except (TimeoutException, NoSuchElementException) as e:
            self.update_status(f"京东爬取出错: {str(e)}")
            return None
        except Exception as e:
            self.update_status(f"京东爬取发生未知错误: {str(e)}")
            return None

    # ------------------------------------------------------------------- output

    def save_to_excel(self, df):
        """Write the shop / price / comment-count columns of *df* to an .xlsx file.

        Args:
            df: DataFrame containing the columns in ``_COLUMNS``.

        Returns:
            str | bool | None: the saved path on success, False when *df* is None
            or empty, None when writing fails (an error dialog is shown).
        """
        if df is None or df.empty:
            return False
        try:
            wb = Workbook()
            ws = wb.active
            ws.title = "京东商品信息"
            ws.append(["店铺名字", "价格", "累计评价数"])
            for row in dataframe_to_rows(df[self._COLUMNS], index=False, header=False):
                ws.append(row)
            # Header styling.
            header_fill = PatternFill(start_color="1E88E5", end_color="1E88E5", fill_type="solid")
            header_font = Font(bold=True, color="FFFFFF")
            for cell in ws[1]:
                cell.font = header_font
                cell.fill = header_fill
                cell.alignment = Alignment(horizontal="center")
            # Size each column to its longest cell plus a little padding.
            for col in ws.columns:
                widest = max(len(str(cell.value)) for cell in col)
                ws.column_dimensions[col[0].column_letter].width = widest + 2
            file_name = f"{self._safe_filename(self.product_name)}_京东商品信息.xlsx"
            save_path = os.path.join(self._app_dir(), file_name)
            wb.save(save_path)
            return save_path
        except Exception as e:
            messagebox.showerror("保存错误", f"保存Excel文件时出错: {str(e)}")
            return None

    # ---------------------------------------------------------------------- GUI

    def start_spider(self):
        """Validate the input, reset the UI and start the scrape on a worker thread."""
        product_name = self.entry.get().strip()
        if not product_name:
            messagebox.showwarning("输入错误", "请输入商品名称")
            return
        self.progress_var.set(0)
        self.result_text.delete(1.0, tk.END)
        self.spider_button.config(state=tk.DISABLED)
        self.status_label.config(text="开始爬取...")
        threading.Thread(target=self.run_spider, args=(product_name,), daemon=True).start()

    def run_spider(self, product_name):
        """Worker-thread entry point: scrape, display the table and save it.

        NOTE(review): this runs off the main thread but updates Tk widgets
        directly; consider routing UI updates through ``root.after``.

        Args:
            product_name: search keyword.
        """
        try:
            # Remember the keyword so save_to_excel can name the output file.
            self.product_name = product_name
            # get_jd_info starts the browser itself; starting one here as well
            # would leak it and lock the shared profile directory.
            results = self.get_jd_info(product_name)
            if results:
                df = pd.DataFrame(results)
                display_df = df[self._COLUMNS]
                self.result_text.insert(tk.END, "\n===== 京东商品信息 =====\n\n")
                self.result_text.insert(tk.END, f"{'店铺名字'.ljust(40)}{'价格'.rjust(10)}{'累计评价数'.rjust(15)}\n")
                self.result_text.insert(tk.END, "-" * 65 + "\n")
                for _, row in display_df.iterrows():
                    self.result_text.insert(
                        tk.END,
                        self._format_row(row['店铺名字'], row['价格'], row['累计评价数']))
                self.result_text.insert(tk.END, "\n" + "=" * 65 + "\n")
                self.result_text.insert(tk.END, f"共找到 {len(display_df)} 条商品信息\n")
                save_path = self.save_to_excel(df)
                if save_path:
                    self.result_text.insert(tk.END, f"\n结果已保存到: {save_path}\n")
                self.progress_var.set(100)
                self.status_label.config(text="完成")
        except Exception as e:
            messagebox.showerror("错误", f"发生未预期错误: {str(e)}")
            import traceback
            traceback.print_exc()
        finally:
            self.spider_button.config(state=tk.NORMAL)
            self._close_driver()

    def update_status(self, message):
        """Show *message* in the status label; a no-op before the GUI exists."""
        if self.status_label is not None:
            self.status_label.config(text=message)
            if self.root is not None:
                self.root.update_idletasks()

    def create_gui(self):
        """Build the main window and run the Tk main loop (blocks until closed)."""
        self.root = tk.Tk()
        self.root.title("京东商品信息爬取工具")
        self.root.geometry("800x600")
        self.root.resizable(True, True)
        style = ttk.Style()
        style.theme_use('clam')

        main_frame = ttk.Frame(self.root, padding=20)
        main_frame.pack(fill=tk.BOTH, expand=True)
        title_label = ttk.Label(main_frame, text="京东商品信息爬取工具", font=("Arial", 16, "bold"))
        title_label.pack(pady=10)
        desc_label = ttk.Label(main_frame,
                               text="输入商品名称,按【销量降序】爬取京东商品信息(店铺名字、价格、累计评价数)",
                               wraplength=600)
        desc_label.pack(pady=5)

        # Keyword input row.
        input_frame = ttk.Frame(main_frame)
        input_frame.pack(fill=tk.X, pady=10)
        ttk.Label(input_frame, text="商品名称:").pack(side=tk.LEFT, padx=(0, 10))
        self.entry = ttk.Entry(input_frame, width=50)
        self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
        self.entry.focus()

        self.spider_button = ttk.Button(main_frame, text="开始爬取", command=self.start_spider)
        self.spider_button.pack(pady=10)

        self.progress_var = tk.IntVar()
        progress_bar = ttk.Progressbar(main_frame, variable=self.progress_var, maximum=100)
        progress_bar.pack(fill=tk.X, pady=10)
        self.status_label = ttk.Label(main_frame, text="准备就绪")
        self.status_label.pack()

        # Scrollable result area; a monospaced font keeps the columns aligned.
        result_frame = ttk.Frame(main_frame)
        result_frame.pack(fill=tk.BOTH, expand=True, pady=10)
        scrollbar = ttk.Scrollbar(result_frame)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.result_text = tk.Text(result_frame, wrap=tk.WORD, yscrollcommand=scrollbar.set,
                                   font=("Courier New", 10))
        self.result_text.pack(fill=tk.BOTH, expand=True)
        self.result_text.insert(tk.END, "输入商品名称后点击'开始爬取'按钮...\n")
        scrollbar.config(command=self.result_text.yview)

        bottom_label = ttk.Label(main_frame,
                                 text="提示: 首次运行需手动登录京东,后续自动缓存登录状态",
                                 foreground="gray")
        bottom_label.pack(side=tk.BOTTOM, pady=5)
        self.root.mainloop()
if __name__ == "__main__":
    # Script entry point: build the scraper and hand control to the Tk main loop.
    spider = JDSpider()
    spider.create_gui()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/luo-ruoyuan/python.git
git@gitee.com:luo-ruoyuan/python.git
luo-ruoyuan
python
python
master

Search