green_classroom/crawler_engine.py
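"""Crawler engine for the green_classroom app.

Crawls the websites attached to a CrawlTask, extracts article text and metadata,
matches the task's keywords, and stores matching articles together with their
media files in the database.
"""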
import requests
import time
import re
import logging
import os
import urllib3
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from django.conf import settings
from django.utils import timezone
from django.core.files.base import ContentFile
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .models import Website, CrawlTask, CrawledContent, CrawlLog, SearchKeyword, MediaFile
# Disable SSL warnings (requests below are made with verify=False)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Module-level logger
logger = logging.getLogger(__name__)
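# The crawler is configured through settings.CRAWLER_SETTINGS. A minimal sketch of
# that dict, inferred from the keys this module reads (the values shown here are
# illustrative assumptions, not taken from the actual project settings):
#
#     CRAWLER_SETTINGS = {
#         'USER_AGENT': 'Mozilla/5.0 (compatible; GreenClassroomBot/1.0)',  # hypothetical UA string
#         'TIMEOUT': 30,          # per-request timeout in seconds
#         'MAX_RETRIES': 3,       # retry budget for the HTTP adapter (defaults to 3 below)
#         'REQUEST_DELAY': 1,     # pause between article requests, in seconds
#     }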
class WebsiteCrawler:
"""Website crawler engine."""
def __init__(self, task_id):
self.task = CrawlTask.objects.get(id=task_id)
self.keywords = [kw.strip() for kw in self.task.keywords.split(',') if kw.strip()]
# Create a requests session with a retry strategy
self.session = requests.Session()
self.session.headers.update({
'User-Agent': settings.CRAWLER_SETTINGS['USER_AGENT']
})
# Configure the retry strategy
retry_strategy = Retry(
total=settings.CRAWLER_SETTINGS.get('MAX_RETRIES', 3),
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Request timeout
self.timeout = settings.CRAWLER_SETTINGS['TIMEOUT']
def log(self, level, message, website=None):
"""Write a log entry to the database and to the Python logging system."""
CrawlLog.objects.create(
task=self.task,
website=website,
level=level,
message=message
)
# Also record to the Python logging system
logger.log(getattr(logging, level.upper()), f"Task {self.task.id}: {message}")
def update_task_status(self, status, **kwargs):
"""Update the task status and the related timestamps."""
self.task.status = status
if status == 'running' and not self.task.started_at:
self.task.started_at = timezone.now()
elif status in ['completed', 'failed', 'cancelled']:
self.task.completed_at = timezone.now()
for key, value in kwargs.items():
setattr(self.task, key, value)
self.task.save()
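# Example usage (illustrative call, mirroring how the method is used later in
# this module; the message text is hypothetical):
#     self.update_task_status('failed', error_message='no matching content found')
# Any extra keyword arguments are set directly on the CrawlTask before saving.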
def extract_text_content(self, soup):
"""Extract text content while preserving paragraph structure."""
# Remove script and style tags
for script in soup(["script", "style"]):
script.decompose()
# Walk paragraph-level tags so the output keeps its structure
paragraphs = []
# Find all paragraph-related tags
for element in soup.find_all(['p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'br']):
if element.name in ['p', 'div']:
text = element.get_text().strip()
if text:
paragraphs.append(text)
elif element.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
text = element.get_text().strip()
if text:
paragraphs.append(f"\n{text}\n")  # pad headings with newlines before and after
elif element.name == 'br':
paragraphs.append('\n')
# Fall back to plain text extraction if no paragraph tags were found
if not paragraphs:
text = soup.get_text()
# Clean up the text while keeping line breaks
lines = []
for line in text.splitlines():
line = line.strip()
if line:
lines.append(line)
return '\n\n'.join(lines)
# Join paragraphs, separated by blank lines
content = '\n\n'.join(paragraphs)
# Collapse redundant blank lines (re is already imported at module level)
content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
return content.strip()
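# Rough illustration of the behaviour above (hypothetical markup, assuming
# `crawler` is a WebsiteCrawler instance):
#     soup = BeautifulSoup('<h2>Title</h2><p>First</p><p>Second</p>', 'html.parser')
#     crawler.extract_text_content(soup)   # -> 'Title\n\nFirst\n\nSecond'
# Headings are padded with newlines and paragraphs are joined with blank lines.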
def find_article_links(self, soup, base_url):
"""Find candidate article links on a page."""
links = []
# Common selectors for article links
selectors = [
'a[href*="article"]',
'a[href*="news"]',
'a[href*="content"]',
'a[href*="detail"]',
'a[href*="view"]',
'a[href*="show"]',
'.news-list a',
'.article-list a',
'.content-list a',
'h3 a',
'h4 a',
'.title a',
'.list-item a'
]
for selector in selectors:
elements = soup.select(selector)
for element in elements:
href = element.get('href')
if href:
full_url = urljoin(base_url, href)
title = element.get_text().strip()
if title and len(title) > 5:  # filter out titles that are too short
links.append({
'url': full_url,
'title': title
})
return links
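# find_article_links returns a list of dicts such as (illustrative values):
#     [{'url': 'https://example.org/news/detail/123.html', 'title': 'Some headline'}]
# Note that the same anchor can appear more than once when several selectors
# match it, since the results are not de-duplicated here.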
def check_keyword_match(self, text, title):
"""Return the configured keywords that appear in the text or the title."""
matched_keywords = []
text_lower = text.lower()
title_lower = title.lower()
for keyword in self.keywords:
keyword_lower = keyword.lower()
if keyword_lower in text_lower or keyword_lower in title_lower:
matched_keywords.append(keyword)
return matched_keywords
def extract_article_content(self, url, soup):
"""Extract the main article content from a page."""
# Try a number of common content-area selectors
content_selectors = [
'.article-content',
'.content',
'.article-body',
'.news-content',
'.main-content',
'.post-content',
'article',
'.detail-content',
'#content',
'.text'
]
content = ""
for selector in content_selectors:
element = soup.select_one(selector)
if element:
content = self.extract_text_content(element)
if len(content) > 100:  # make sure the content is long enough
break
# Fall back to the whole page if no specific content area was found
if not content or len(content) < 100:
content = self.extract_text_content(soup)
return content
def extract_publish_date(self, soup):
"""Extract the publish date, trying a range of selectors and date formats."""
date_selectors = [
'.publish-time',
'.pub-time',
'.date',
'.time',
'.publish-date',
'time[datetime]',
'.article-time',
'.news-time',
'.post-time',
'.create-time',
'.update-time',
'.time span',
'.date span',
'.info span',  # some sites wrap publish info in an .info class
'.meta span',
'.meta-info',
'.article-info span',
'.news-info span',
'.content-info span',
'.a-shijian',  # publish-time class used by the Shanghai discipline inspection website
'.l-time'  # publish-time class used by the Tianjin discipline inspection website
]
for selector in date_selectors:
elements = soup.select(selector)
for element in elements:
date_text = element.get_text().strip()
if element.get('datetime'):
date_text = element.get('datetime')
# Skip the text if it is empty or too short
if not date_text or len(date_text) < 4:
continue
# Try to parse the date
try:
from datetime import datetime
# Strip common labels and noise from the date text
date_text = re.sub(r'发布(时间|日期)[:]?', '', date_text).strip()
date_text = re.sub(r'时间[:]?', '', date_text).strip()
date_text = re.sub(r'日期[:]?', '', date_text).strip()
date_text = re.sub(r'发表于[:]?', '', date_text).strip()
date_text = re.sub(r'更新[:]?', '', date_text).strip()
date_text = re.sub(r'\s+', ' ', date_text).strip()  # collapse multiple whitespace characters into single spaces
# If a datetime attribute exists and is in a standard format, use it directly
if element.get('datetime'):
datetime_attr = element.get('datetime')
# Try common date/time formats
for fmt in [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%S%z',
'%Y-%m-%d %H:%M',
'%Y-%m-%d',
'%Y/%m/%d %H:%M:%S',
'%Y/%m/%d %H:%M',
'%Y/%m/%d',
'%Y年%m月%d日 %H:%M:%S',
'%Y年%m月%d日 %H:%M',
'%Y年%m月%d日',
'%m/%d/%Y %H:%M:%S',
'%m/%d/%Y %H:%M',
'%m/%d/%Y',
'%d/%m/%Y %H:%M:%S',
'%d/%m/%Y %H:%M',
'%d/%m/%Y',
'%d.%m.%Y %H:%M:%S',
'%d.%m.%Y %H:%M',
'%d.%m.%Y'
]:
try:
if '%z' in fmt and '+' not in datetime_attr and datetime_attr.endswith('Z'):
datetime_attr = datetime_attr[:-1] + '+0000'
parsed_date = datetime.strptime(datetime_attr, fmt)
if not timezone.is_aware(parsed_date):
parsed_date = timezone.make_aware(parsed_date)
return parsed_date
except ValueError:
continue
# Try to parse the date taken from the page text,
# using common Chinese and numeric date formats
for fmt in [
'%Y年%m月%d日 %H:%M:%S',
'%Y年%m月%d日 %H:%M',
'%Y年%m月%d日',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d',
'%Y/%m/%d %H:%M:%S',
'%Y/%m/%d %H:%M',
'%Y/%m/%d',
'%m月%d日 %H:%M',
'%m月%d日',
'%m/%d/%Y %H:%M:%S',
'%m/%d/%Y %H:%M',
'%m/%d/%Y',
'%d/%m/%Y %H:%M:%S',
'%d/%m/%Y %H:%M',
'%d/%m/%Y',
'%d.%m.%Y %H:%M:%S',
'%d.%m.%Y %H:%M',
'%d.%m.%Y'
]:
try:
parsed_date = datetime.strptime(date_text, fmt)
# If the format has no year, use the current year
if '%Y' not in fmt:
parsed_date = parsed_date.replace(year=datetime.now().year)
if not timezone.is_aware(parsed_date):
parsed_date = timezone.make_aware(parsed_date)
return parsed_date
except ValueError:
continue
# If none of the formats above matched, try parsing with dateutil
try:
from dateutil import parser
# Skip text that is obviously not a date
if len(date_text) > 5 and not date_text.isdigit():
parsed_date = parser.parse(date_text)
if not timezone.is_aware(parsed_date):
parsed_date = timezone.make_aware(parsed_date)
return parsed_date
except:
pass
except Exception as e:
self.log('debug', f'Failed to parse date: {date_text}, error: {str(e)}')
continue
return None
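# Note: the dateutil fallback above relies on the optional python-dateutil package
# (`from dateutil import parser`). If it is not installed, or parsing fails, the
# bare `except` swallows the error and extract_publish_date simply returns None.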
def extract_author(self, soup):
"""Extract author information."""
author_selectors = [
'.author',
'.writer',
'.publisher',
'.byline',
'.article-author',
'.news-author'
]
for selector in author_selectors:
element = soup.select_one(selector)
if element:
return element.get_text().strip()
return ""
def download_media_file(self, media_url, crawled_content, media_type='image', alt_text=''):
"""Download a media file and attach it to the crawled content."""
try:
# Check that the URL is valid
if not media_url or not media_url.startswith(('http://', 'https://')):
return None
# Request the media file
response = self.session.get(
media_url,
timeout=self.timeout,
verify=False,
stream=False  # streaming disabled so the full body is downloaded
)
response.raise_for_status()
# Read basic file information
content_type = response.headers.get('content-type', '')
content_length = response.headers.get('content-length')
file_size = int(content_length) if content_length else len(response.content)
# Determine the file extension
file_extension = self.get_file_extension_from_url(media_url, content_type)
# Build a file name
filename = f"media_{crawled_content.id}_{len(crawled_content.media_files.all())}{file_extension}"
# Create the media file record
media_file = MediaFile.objects.create(
content=crawled_content,
media_type=media_type,
original_url=media_url,
file_size=file_size,
mime_type=content_type,
alt_text=alt_text
)
# Save the downloaded bytes
media_file.local_file.save(
filename,
ContentFile(response.content),
save=True
)
self.log('info', f'Media file downloaded: {filename} ({media_type})', crawled_content.website)
return media_file
except Exception as e:
self.log('error', f'Failed to download media file {media_url}: {str(e)}', crawled_content.website)
return None
def get_file_extension_from_url(self, url, content_type):
"""Derive a file extension from the URL or from the content type."""
# Extension from the URL path
parsed_url = urlparse(url)
path = parsed_url.path
if '.' in path:
return os.path.splitext(path)[1]
# Extension from the content type
content_type_map = {
'image/jpeg': '.jpg',
'image/jpg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'image/svg+xml': '.svg',
'video/mp4': '.mp4',
'video/avi': '.avi',
'video/mov': '.mov',
'video/wmv': '.wmv',
'video/flv': '.flv',
'video/webm': '.webm',
'audio/mp3': '.mp3',
'audio/wav': '.wav',
'audio/ogg': '.ogg',
'application/pdf': '.pdf',
'application/msword': '.doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
}
return content_type_map.get(content_type.lower(), '.bin')
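# Illustrative behaviour (hypothetical inputs):
#     self.get_file_extension_from_url('https://example.org/pic/logo.png', '')           # -> '.png'
#     self.get_file_extension_from_url('https://example.org/media/12345', 'image/jpeg')  # -> '.jpg'
#     self.get_file_extension_from_url('https://example.org/media/12345', 'text/plain')  # -> '.bin'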
def extract_and_download_media(self, soup, crawled_content, base_url):
"""Extract and download the media files referenced on a page."""
media_files = []
# Extract images
images = soup.find_all('img')
self.log('info', f'Found {len(images)} image tags', crawled_content.website)
for img in images:
src = img.get('src')
if src:
# Resolve relative URLs
if src.startswith('//'):
src = 'https:' + src
elif src.startswith('/'):
src = urljoin(base_url, src)
elif not src.startswith(('http://', 'https://')):
src = urljoin(base_url, src)
alt_text = img.get('alt', '')
self.log('info', f'Attempting to download image: {src}', crawled_content.website)
media_file = self.download_media_file(src, crawled_content, 'image', alt_text)
if media_file:
media_files.append(media_file)
self.log('info', f'Image downloaded successfully: {media_file.local_file.name}', crawled_content.website)
# Extract videos
videos = soup.find_all(['video', 'source'])
for video in videos:
src = video.get('src')
if src:
# Resolve relative URLs
if src.startswith('//'):
src = 'https:' + src
elif src.startswith('/'):
src = urljoin(base_url, src)
elif not src.startswith(('http://', 'https://')):
src = urljoin(base_url, src)
media_file = self.download_media_file(src, crawled_content, 'video')
if media_file:
media_files.append(media_file)
# Extract audio
audios = soup.find_all('audio')
for audio in audios:
src = audio.get('src')
if src:
# Resolve relative URLs
if src.startswith('//'):
src = 'https:' + src
elif src.startswith('/'):
src = urljoin(base_url, src)
elif not src.startswith(('http://', 'https://')):
src = urljoin(base_url, src)
media_file = self.download_media_file(src, crawled_content, 'audio')
if media_file:
media_files.append(media_file)
return media_files
def mark_content_saved(self, crawled_content):
"""Mark the content as saved (the content itself is stored in the database)."""
try:
crawled_content.is_local_saved = True
crawled_content.save()
media_count = crawled_content.media_files.count()
self.log('info', f'Article content saved to the database (including {media_count} media files)', crawled_content.website)
return True
except Exception as e:
self.log('error', f'Failed to mark the content as saved: {str(e)}', crawled_content.website)
return False
def crawl_website(self, website):
"""Crawl a single website."""
self.log('info', f'Start crawling website: {website.name}', website)
try:
# Request the home page
response = self.session.get(
website.url,
timeout=self.timeout,
verify=False  # skip SSL certificate verification
)
response.raise_for_status()
# Check the content encoding
if response.encoding != 'utf-8':
# Try to read the charset from the response headers
content_type = response.headers.get('content-type', '')
if 'charset=' in content_type:
charset = content_type.split('charset=')[-1]
response.encoding = charset
else:
response.encoding = 'utf-8'
soup = BeautifulSoup(response.content, 'html.parser')
# Find article links
article_links = self.find_article_links(soup, website.url)
self.log('info', f'Found {len(article_links)} article links', website)
crawled_count = 0
for link_info in article_links:
try:
# Request the article page
article_response = self.session.get(
link_info['url'],
timeout=self.timeout,
verify=False  # skip SSL certificate verification
)
article_response.raise_for_status()
# Check the content encoding
if article_response.encoding != 'utf-8':
# Try to read the charset from the response headers
content_type = article_response.headers.get('content-type', '')
if 'charset=' in content_type:
charset = content_type.split('charset=')[-1]
article_response.encoding = charset
else:
article_response.encoding = 'utf-8'
article_soup = BeautifulSoup(article_response.content, 'html.parser')
# Extract the content
content = self.extract_article_content(link_info['url'], article_soup)
title = link_info['title']
# Check for keyword matches
matched_keywords = self.check_keyword_match(content, title)
if matched_keywords:
# Extract additional information
publish_date = self.extract_publish_date(article_soup)
author = self.extract_author(article_soup)
# Check whether an article with the same URL already exists for this task
existing_content = CrawledContent.objects.filter(
url=link_info['url'],
task=self.task
).first()
if existing_content:
# If it exists, update the existing record instead of creating a new one
existing_content.title = title
existing_content.content = content
existing_content.publish_date = publish_date
existing_content.author = author
existing_content.keywords_matched = ','.join(matched_keywords)
existing_content.save()
# Refresh the media files: delete the old ones first
existing_content.media_files.all().delete()
# then re-download them from the page
media_files = self.extract_and_download_media(article_soup, existing_content, link_info['url'])
self.log('info', f'Updated existing article: {title[:50]}...', website)
else:
# Save the new content
crawled_content = CrawledContent.objects.create(
task=self.task,
website=website,
title=title,
content=content,
url=link_info['url'],
publish_date=publish_date,
author=author,
keywords_matched=','.join(matched_keywords),
is_local_saved=False  # initially False; updated to True once the content is saved
)
# Extract and download media files
media_files = self.extract_and_download_media(article_soup, crawled_content, link_info['url'])
# Mark the content as saved
self.mark_content_saved(crawled_content)
self.log('info', f'Saved new article: {title[:50]}...', website)
crawled_count += 1
# Delay between requests
time.sleep(settings.CRAWLER_SETTINGS['REQUEST_DELAY'])
except requests.exceptions.SSLError as e:
self.log('error', f'SSL error, skipping article {link_info["url"]}: {str(e)}', website)
continue
except requests.exceptions.ConnectionError as e:
self.log('error', f'Connection error, skipping article {link_info["url"]}: {str(e)}', website)
continue
except requests.exceptions.Timeout as e:
self.log('error', f'Request timed out, skipping article {link_info["url"]}: {str(e)}', website)
continue
except requests.exceptions.RequestException as e:
self.log('error', f'Network error, skipping article {link_info["url"]}: {str(e)}', website)
continue
except UnicodeDecodeError as e:
self.log('error', f'Character encoding error, skipping article {link_info["url"]}: {str(e)}', website)
continue
except Exception as e:
self.log('error', f'Failed to process article {link_info["url"]}: {str(e)}', website)
continue
self.log('info', f'Website crawl finished, {crawled_count} articles saved', website)
return crawled_count
except requests.exceptions.SSLError as e:
self.log('error', f'SSL error while crawling website: {str(e)}', website)
return 0
except requests.exceptions.ConnectionError as e:
self.log('error', f'Connection error while crawling website: {str(e)}', website)
return 0
except requests.exceptions.Timeout as e:
self.log('error', f'Timeout while crawling website: {str(e)}', website)
return 0
except requests.exceptions.RequestException as e:
self.log('error', f'Network error while crawling website: {str(e)}', website)
return 0
except Exception as e:
self.log('error', f'Failed to crawl website: {str(e)}', website)
return 0
def run(self):
"""Run the crawl task."""
self.log('info', f'Start executing crawl task: {self.task.name}')
self.update_task_status('running')
total_crawled = 0
websites = self.task.websites.filter(is_active=True)
self.task.total_pages = websites.count()
self.task.save()
for website in websites:
try:
crawled_count = self.crawl_website(website)
total_crawled += crawled_count
self.task.crawled_pages += 1
self.task.save()
except Exception as e:
self.log('error', f'Error while crawling website {website.name}: {str(e)}', website)
continue
# Update the task status
if total_crawled > 0:
self.update_task_status('completed')
self.log('info', f'Crawl task completed, {total_crawled} articles crawled in total')
else:
self.update_task_status('failed', error_message='No matching content found')
self.log('error', 'Crawl task failed: no matching content was found')
def run_crawl_task(task_id):
"""Run a crawl task (entry point intended to be called from a Celery task)."""
try:
crawler = WebsiteCrawler(task_id)
crawler.run()
return f"Task {task_id} completed"
except Exception as e:
# Log the exception
logger.error(f"Exception while executing task {task_id}: {str(e)}", exc_info=True)
task = CrawlTask.objects.get(id=task_id)
task.status = 'failed'
task.error_message = str(e)
task.completed_at = timezone.now()
task.save()
CrawlLog.objects.create(
task=task,
level='error',
message=f'Task execution failed: {str(e)}'
)
return f"Task {task_id} failed: {str(e)}"
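# --- Usage sketch (illustrative; not part of the original module) ---
# run_crawl_task() can be called synchronously, e.g. run_crawl_task(task_id=1).
# To dispatch it asynchronously it could be wrapped in a Celery task; the task
# name below is hypothetical and only shows how the wiring might look:
#
#     from celery import shared_task
#
#     @shared_task
#     def crawl_task_async(task_id):
#         return run_crawl_task(task_id)
#
#     crawl_task_async.delay(task_id=1)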