green_classroom/core/utils.py
# core/utils.py
import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from collections import deque
from django.utils import timezone
from django.conf import settings
from core.models import Article


def download_media(url, save_dir):
    """Download a media file and return its local path, or None on failure."""
    try:
        resp = requests.get(url, timeout=15)
        resp.raise_for_status()
    except Exception as e:
        print(f"Download failed: {url}, error: {e}")
        return None
    # Derive a filename from the URL, dropping any query string.
    filename = url.split("/")[-1].split("?")[0]
    os.makedirs(save_dir, exist_ok=True)
    filepath = os.path.join(save_dir, filename)
    # Avoid overwriting an existing file by appending a numeric suffix.
    base, ext = os.path.splitext(filename)
    counter = 1
    while os.path.exists(filepath):
        filename = f"{base}_{counter}{ext}"
        filepath = os.path.join(save_dir, filename)
        counter += 1
    with open(filepath, "wb") as f:
        f.write(resp.content)
    return filepath


def process_article(url, website):
    """Fetch a single article page, localize its images, and save it as an Article."""
    if Article.objects.filter(url=url).exists():
        print(f"Article already exists, skipping: {url}")
        return
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
    except Exception as e:
        print(f"Request failed: {url}, error: {e}")
        return
    resp.encoding = "utf-8"
    soup = BeautifulSoup(resp.text, "html.parser")
    title_tag = soup.find("span", class_="title")
    title = title_tag.get_text(strip=True) if title_tag else "Untitled"
    content_tag = soup.find("span", id="detailContent")
    if not content_tag:
        print("No article body found, skipping:", url)
        return
    # Download every image in the article body into a per-article directory.
    imgs = content_tag.find_all("img")
    media_files = []
    safe_title = "".join(c if c.isalnum() else "_" for c in title)[:50]
    save_dir = os.path.join(settings.MEDIA_ROOT, "articles", safe_title)
    os.makedirs(save_dir, exist_ok=True)
    for img in imgs:
        src = img.get("src")
        if not src:
            continue
        if not src.startswith("http"):
            src = urljoin(url, src)
        local_path = download_media(src, save_dir)
        if local_path:
            # Rewrite the <img> src to point at the locally served copy.
            rel_path = os.path.relpath(local_path, settings.MEDIA_ROOT)
            img["src"] = settings.MEDIA_URL + rel_path.replace("\\", "/")
            media_files.append(rel_path.replace("\\", "/"))
    content_html = str(content_tag)
    Article.objects.create(
        website=website,
        title=title,
        url=url,
        content=content_html,
        pub_date=timezone.now(),
        media_files=media_files,
    )
    print(f"Saved article and images: {title}")


def is_valid_url(url, base_netloc):
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return False
        if parsed.netloc != base_netloc:
            return False
        return True
    except Exception:
        return False


def full_site_crawler(start_url, website, max_pages=1000):
    """Breadth-first crawl of a site, saving every article page it finds."""
    headers = {"User-Agent": "Mozilla/5.0"}
    visited = set()
    queue = deque([start_url])
    base_netloc = urlparse(start_url).netloc
    pages_crawled = 0
    while queue and pages_crawled < max_pages:
        url = queue.popleft()
        if url in visited:
            continue
        print(f"Crawling: {url}")
        visited.add(url)
        try:
            resp = requests.get(url, headers=headers, timeout=15)
            resp.raise_for_status()
        except Exception as e:
            print(f"Request failed: {url}, error: {e}")
            continue
        resp.encoding = "utf-8"
        soup = BeautifulSoup(resp.text, "html.parser")
        # If this looks like an article page, hand it off to the article handler.
        if soup.find("span", id="detailContent"):
            process_article(url, website)
            pages_crawled += 1
        # Expand the queue with newly discovered same-site links.
        for link in soup.find_all("a", href=True):
            href = urljoin(url, link["href"])
            if href not in visited and is_valid_url(href, base_netloc):
                queue.append(href)
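
# Example usage (a sketch, not part of this module): the crawler would typically be
# started from a Django management command. The command path and the Website lookup
# below are assumptions for illustration only.
#
#   # core/management/commands/crawl_site.py  (hypothetical)
#   from django.core.management.base import BaseCommand
#   from core.models import Website
#   from core.utils import full_site_crawler
#
#   class Command(BaseCommand):
#       help = "Crawl a configured website and store its articles"
#
#       def add_arguments(self, parser):
#           parser.add_argument("website_id", type=int)
#
#       def handle(self, *args, **options):
#           website = Website.objects.get(pk=options["website_id"])
#           full_site_crawler(website.url, website, max_pages=500)
#
#   # Then run:  python manage.py crawl_site 1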