Files
green_classroom/core/utils.py
2025-08-13 00:26:39 +08:00

276 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from collections import deque
from django.utils import timezone
from django.conf import settings
from core.models import Article
import re
def download_media(url, save_dir):
    """Download one media file into ``save_dir`` and return its local path.

    The file name comes from the URL path (the query string is ignored) with
    every character outside ``[\\w\\-_.]`` replaced by ``_``.  When the URL
    path has no usable name, ``media_file`` is used and an extension is
    chosen from the response ``Content-Type``.  Existing files are never
    overwritten: ``_1``, ``_2``, ... is appended to the stem until a free
    name is found.

    Args:
        url: Absolute URL of the image or video to fetch.
        save_dir: Target directory; created when missing.

    Returns:
        Path of the written file, or ``None`` when the request or the write
        fails (the failure is printed, not raised).
    """
    # Browser-like headers: avoids 403 Forbidden answers from some hosts.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Referer": urljoin(url, "/"),
    }
    try:
        resp = requests.get(url, timeout=15, headers=headers)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"下载失败:{url},错误:{e}")
        return None

    # Content-Type fragment -> extension, used only for the default name.
    extension_by_type = (
        ("image/jpeg", ".jpg"),
        ("image/png", ".png"),
        ("image/gif", ".gif"),
        ("video/mp4", ".mp4"),
        ("video/avi", ".avi"),
        ("video/quicktime", ".mov"),
    )

    filename = os.path.basename(urlparse(url).path)
    # Fall back to a default name when the path gives nothing usable:
    # empty, no extension, or made only of dots ("." / "..").
    if not filename.strip(".") or "." not in filename:
        filename = "media_file"
    # Replace anything that is not a word character, dash, underscore or dot.
    filename = re.sub(r"[^\w\-_\.]", "_", filename)
    if "." not in filename:
        content_type = resp.headers.get("content-type", "").lower()
        filename += next(
            (ext for mime, ext in extension_by_type if mime in content_type),
            ".bin",  # generic binary extension
        )

    stem, ext = os.path.splitext(filename)
    counter = 0
    try:
        os.makedirs(save_dir, exist_ok=True)
        while True:
            candidate = filename if counter == 0 else f"{stem}_{counter}{ext}"
            filepath = os.path.join(save_dir, candidate)
            try:
                # "xb" fails if the file already exists, so checking for a
                # free name and creating the file happen in a single step.
                with open(filepath, "xb") as f:
                    f.write(resp.content)
                return filepath
            except FileExistsError:
                counter += 1
    except OSError as e:
        # Disk or permission problems are reported like a failed download
        # instead of aborting the caller.
        print(f"下载失败:{url},错误:{e}")
        return None
def _find_title_and_content(soup, site_name):
    """Return ``(title_tag, content_tag)`` for a parsed page of ``site_name``.

    Each site has its own list of selectors, tried in priority order.
    Either element is ``None`` when none of its selectors match.
    """
    if site_name == "www.news.cn":
        title_tag = soup.find("span", class_="title")
        content_tag = soup.find("span", id="detailContent")
    elif site_name == "东方烟草报":
        # Most specific title selector first, then progressively looser ones.
        title_tag = (
            soup.find("h1", id="title")
            or soup.find("h1")
            or soup.find("title")
            or soup.find("div", class_="title")
            or soup.find("h2")
        )
        # Three body layouts are supported for this site.
        content_tag = (
            soup.find("div", class_="content")
            or soup.find("div", id="gallery")
            or soup.find("div", id="ContentText")
        )
    elif site_name == "www.gov.cn":
        title_tag = soup.find("h1") or soup.find("title")
        content_tag = (
            soup.find("div", class_="pages_content")
            or soup.find("div", class_="article_con")
            or soup.find("div", class_="content")
            or soup.find("div", id="content")
            or soup.find("div", class_="mainBody")
        )
    else:
        # Generic fallback for sites without dedicated selectors.
        title_tag = soup.find("h1") or soup.find("title")
        content_tag = (
            soup.find("div", class_="content")
            or soup.find("div", id="content")
        )
    return title_tag, content_tag


def _store_media_locally(src, page_url, save_dir):
    """Download ``src`` and return its path relative to ``MEDIA_ROOT``.

    ``src`` is resolved against ``page_url`` first.  The result uses forward
    slashes; ``None`` is returned when the download fails.
    """
    local_path = download_media(urljoin(page_url, src), save_dir)
    if not local_path:
        return None
    return os.path.relpath(local_path, settings.MEDIA_ROOT).replace("\\", "/")


def process_article(url, website):
    """Fetch one article page, mirror its media locally and save it.

    Nothing is saved when the URL is already stored, the page cannot be
    fetched, or no body element is found.  Images and videos inside the body
    are downloaded under ``MEDIA_ROOT/articles/<sanitised title>/`` and their
    ``src`` attributes are rewritten to the local ``MEDIA_URL`` paths before
    the body HTML is stored.

    Args:
        url: Address of the article page.
        website: Site record; its ``name`` selects the parsing rules and it
            is stored on the created ``Article``.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    # Skip articles that are already stored.
    if Article.objects.filter(url=url).exists():
        print(f"文章已存在,跳过: {url}")
        return

    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # The timeout keeps one unresponsive server from stalling the crawl.
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"请求失败:{url},错误:{e}")
        return
    resp.encoding = 'utf-8'
    soup = BeautifulSoup(resp.text, "html.parser")

    title_tag, content_tag = _find_title_and_content(soup, website.name)
    # A missing or blank title falls back to the placeholder.
    title = (title_tag.get_text(strip=True).strip() if title_tag else "") or "无标题"

    if not content_tag:
        print("没有找到正文,跳过:", url)
        return

    # Directory name derived from the title: non-alphanumerics become "_",
    # capped at 50 characters.
    safe_title = "".join(c if c.isalnum() else "_" for c in title)[:50]
    save_dir = os.path.join(settings.MEDIA_ROOT, "articles", safe_title)
    os.makedirs(save_dir, exist_ok=True)

    media_files = []

    for img in content_tag.find_all("img"):
        src = img.get("src")
        if not src:
            continue
        rel_path = _store_media_locally(src, url, save_dir)
        if rel_path:
            img["src"] = settings.MEDIA_URL + rel_path
            media_files.append(rel_path)

    for video in content_tag.find_all("video"):
        # The address sits on <video src> or on the first nested <source>.
        holder = video if video.get("src") else video.find("source")
        src = holder.get("src") if holder else None
        if not src:
            continue
        rel_path = _store_media_locally(src, url, save_dir)
        if rel_path:
            holder["src"] = settings.MEDIA_URL + rel_path
            media_files.append(rel_path)

    try:
        Article.objects.create(
            website=website,
            title=title,
            url=url,
            content=str(content_tag),
            pub_date=timezone.now(),
            media_files=media_files,
        )
        print(f"已保存文章及图片:{title}")
    except Exception as e:
        # Best effort: a failed save is reported and the crawl goes on.
        # NOTE(review): the duplicate check matches the error text, which
        # looks like SQLite's wording; confirm for other database backends.
        if "UNIQUE constraint failed" in str(e) and "core_article.url" in str(e):
            print(f"文章URL重复跳过保存: {url}")
        else:
            print(f"保存文章时出错: {url},错误:{e}")
def is_valid_url(url, base_netloc):
    """Return True when ``url`` is an http(s) URL on ``base_netloc``.

    Host names are case-insensitive, so the network locations are compared
    ignoring case.  Input that cannot be parsed counts as invalid.

    Args:
        url: Candidate URL.
        base_netloc: Network location (``host[:port]``) of the crawled site.

    Returns:
        bool: whether the crawler should follow ``url``.
    """
    try:
        parsed = urlparse(url)
        return (
            parsed.scheme in ("http", "https")
            and parsed.netloc.lower() == base_netloc.lower()
        )
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. a broken IPv6 literal);
        # TypeError / AttributeError: non-string arguments.
        return False
def _is_article_page(soup, site_name, path):
    """Decide whether a fetched page should be processed as an article.

    Uses the same body selectors as the article parser, plus URL-path
    patterns for the sites that have them.
    """
    def has(name, **attrs):
        return soup.find(name, **attrs) is not None

    if site_name == "www.news.cn":
        return has("span", id="detailContent")
    if site_name == "东方烟草报":
        # A long path containing /content/ also counts as an article.
        return (
            ("/content/" in path and len(path) > 20)
            or has("div", class_="content")
            or has("div", id="gallery")
            or has("div", id="ContentText")
        )
    if site_name == "www.gov.cn":
        return (
            any(part in path for part in ("/zhengce/", "/xinwen/", "/huoban/"))
            or has("div", class_="pages_content")
            or has("div", class_="article_con")
            or has("div", class_="content")
            or has("div", id="content")
            or has("div", class_="mainBody")
        )
    # Generic fallback for sites without dedicated rules.
    return has("div", class_="content") or has("div", id="content")


def full_site_crawler(start_url, website, max_pages=1000):
    """Breadth-first crawl of one site, saving every article page found.

    Starting at ``start_url``, pages on the same network location are
    fetched in discovery order.  Pages recognised as articles are handed to
    ``process_article``; links from every fetched page are queued.  The
    crawl stops when the queue is empty or ``max_pages`` article pages have
    been processed.

    Args:
        start_url: First page to fetch; its netloc bounds the crawl.
        website: Site record; its ``name`` selects the article heuristics
            and it is passed on to ``process_article``.
        max_pages: Upper bound on processed article pages.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    from urllib.parse import urldefrag  # local import keeps the block self-contained

    headers = {"User-Agent": "Mozilla/5.0"}
    base_netloc = urlparse(start_url).netloc
    queue = deque([start_url])
    # Every URL ever queued. Checking at enqueue time keeps each URL in the
    # queue at most once.
    seen = {start_url}
    pages_crawled = 0

    while queue and pages_crawled < max_pages:
        url = queue.popleft()
        print(f"正在爬取:{url}")

        try:
            resp = requests.get(url, headers=headers, timeout=15)
            resp.raise_for_status()
        except requests.RequestException as e:
            print(f"请求失败:{url},错误:{e}")
            continue

        resp.encoding = 'utf-8'
        soup = BeautifulSoup(resp.text, "html.parser")

        if _is_article_page(soup, website.name, urlparse(url).path):
            process_article(url, website)
            pages_crawled += 1

        # Queue newly discovered same-site links.
        for link in soup.find_all("a", href=True):
            try:
                # Drop "#fragment": it names a spot inside the same page.
                href = urldefrag(urljoin(url, link["href"])).url
            except ValueError:
                # Malformed href (e.g. a broken IPv6 literal); ignore it.
                continue
            if href not in seen and is_valid_url(href, base_netloc):
                seen.add(href)
                queue.append(href)