# core/utils.py import os import requests from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse from collections import deque from django.utils import timezone from django.conf import settings from core.models import Article import re def download_media(url, save_dir): try: # 添加请求头以避免403 Forbidden错误 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Referer": urljoin(url, "/") } resp = requests.get(url, timeout=15, headers=headers) resp.raise_for_status() except Exception as e: print(f"下载失败:{url},错误:{e}") return None # 更安全地处理文件名,去除查询参数并处理特殊字符 parsed_url = urlparse(url) filename = os.path.basename(parsed_url.path) if not filename or '.' not in filename: # 如果URL路径中没有有效的文件名,使用默认名称 filename = 'media_file' # 清理文件名中的特殊字符 filename = re.sub(r'[^\w\-_\.]', '_', filename) # 确保文件有扩展名 if '.' not in filename: content_type = resp.headers.get('content-type', '') if 'image/jpeg' in content_type: filename += '.jpg' elif 'image/png' in content_type: filename += '.png' elif 'image/gif' in content_type: filename += '.gif' else: filename += '.bin' # 默认二进制扩展名 os.makedirs(save_dir, exist_ok=True) filepath = os.path.join(save_dir, filename) base, ext = os.path.splitext(filename) counter = 1 while os.path.exists(filepath): filename = f"{base}_{counter}{ext}" filepath = os.path.join(save_dir, filename) counter += 1 with open(filepath, "wb") as f: f.write(resp.content) return filepath def process_article(url, website): if Article.objects.filter(url=url).exists(): print(f"文章已存在,跳过: {url}") return headers = {"User-Agent": "Mozilla/5.0"} resp = requests.get(url, headers=headers) resp.encoding = 'utf-8' soup = BeautifulSoup(resp.text, "html.parser") # 处理不同网站的文章结构 if website.name == "www.news.cn": title_tag = soup.find("span", class_="title") content_tag = soup.find("span", id="detailContent") elif website.name == "东方烟草报": # 优化东方烟草报的标题提取逻辑,按优先级尝试多种选择器 title_tag = ( soup.find("h1", id="title") or # 特别针对带id="title"的h1标签 soup.find("h1") or # 主要标题标签 soup.find("title") or # 页面title标签 soup.find("div", class_="title") or # 某些页面可能使用div.title soup.find("h2") # 备选标题标签 ) content_tag = soup.find("div", class_="content") # 东方烟草报的内容通常在div.content中 # 增加对另一种内容结构的支持 if not content_tag: content_tag = soup.find("div", id="gallery") # 再增加对新内容结构的支持 if not content_tag: content_tag = soup.find("div", id="ContentText") elif website.name == "www.gov.cn": # 中国政府网的文章结构处理 title_tag = soup.find("h1") or soup.find("title") # 查找主要内容区域,通常在.mainBody或content中 content_tag = ( soup.find("div", class_="pages_content") or soup.find("div", class_="article_con") or soup.find("div", class_="content") or soup.find("div", id="content") or soup.find("div", class_="mainBody") ) else: # 默认处理方式 title_tag = soup.find("h1") or soup.find("title") content_tag = soup.find("div", class_="content") or soup.find("div", id="content") title = title_tag.get_text(strip=True) if title_tag else "无标题" # 对标题进行额外处理,去除可能的多余空白字符 title = title.strip() if title else "无标题" if not content_tag: print("没有找到正文,跳过:", url) return imgs = content_tag.find_all("img") media_files = [] safe_title = "".join(c if c.isalnum() else "_" for c in title)[:50] save_dir = os.path.join(settings.MEDIA_ROOT, "articles", safe_title) os.makedirs(save_dir, exist_ok=True) for img in imgs: src = img.get("src") if not src: continue if not src.startswith("http"): src = urljoin(url, src) local_path = download_media(src, save_dir) if local_path: rel_path = os.path.relpath(local_path, settings.MEDIA_ROOT) img["src"] = settings.MEDIA_URL + rel_path.replace("\\", "/") media_files.append(rel_path.replace("\\", "/")) content_html = str(content_tag) article = Article.objects.create( website=website, title=title, url=url, content=content_html, pub_date=timezone.now(), media_files=media_files ) print(f"已保存文章及图片:{title}") def is_valid_url(url, base_netloc): try: parsed = urlparse(url) if parsed.scheme not in ("http", "https"): return False if parsed.netloc != base_netloc: return False return True except Exception: return False def full_site_crawler(start_url, website, max_pages=1000): headers = {"User-Agent": "Mozilla/5.0"} visited = set() queue = deque([start_url]) base_netloc = urlparse(start_url).netloc pages_crawled = 0 while queue and pages_crawled < max_pages: url = queue.popleft() if url in visited: continue print(f"正在爬取:{url}") visited.add(url) try: resp = requests.get(url, headers=headers, timeout=15) resp.raise_for_status() except Exception as e: print(f"请求失败:{url},错误:{e}") continue resp.encoding = 'utf-8' soup = BeautifulSoup(resp.text, "html.parser") # 根据不同网站判断文章页面 is_article_page = False if website.name == "www.news.cn": is_article_page = soup.find("span", id="detailContent") is not None elif website.name == "东方烟草报": # 对于东方烟草报,我们增加基于URL模式的判断 # 东方烟草报的文章URL通常包含/content/和日期格式 parsed_url = urlparse(url) path = parsed_url.path is_article_page = ( soup.find("div", class_="content") is not None or soup.find("div", id="gallery") is not None or soup.find("div", id="ContentText") is not None or ("/content/" in path and len(path) > 20) ) elif website.name == "www.gov.cn": # 中国政府网的文章页面判断逻辑 parsed_url = urlparse(url) path = parsed_url.path is_article_page = ( soup.find("div", class_="pages_content") is not None or soup.find("div", class_="article_con") is not None or soup.find("div", class_="content") is not None or soup.find("div", id="content") is not None or soup.find("div", class_="mainBody") is not None or ("/zhengce/" in path) or ("/xinwen/" in path) or ("/huoban/" in path) ) else: # 默认判断逻辑 is_article_page = ( soup.find("div", class_="content") is not None or soup.find("div", id="content") is not None ) # 如果是文章页面,则调用文章处理 if is_article_page: process_article(url, website) pages_crawled += 1 # 扩展队列,发现新链接 for link in soup.find_all("a", href=True): href = urljoin(url, link["href"]) if href not in visited and is_valid_url(href, base_netloc): queue.append(href)