import logging
import os
import re
import time
from datetime import datetime
from urllib.parse import urljoin, urlparse

import requests
import urllib3
from bs4 import BeautifulSoup
from django.conf import settings
from django.core.files.base import ContentFile
from django.utils import timezone
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .models import Website, CrawlTask, CrawledContent, CrawlLog, SearchKeyword, MediaFile

# Requests in this module are made with verify=False, so silence the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Module-level logger.
logger = logging.getLogger(__name__)

# Extracts the charset value from a Content-Type header, e.g.
# "text/html; charset=GBK" or 'text/html; charset="utf-8"'.
_CHARSET_RE = re.compile(r'charset=["\']?([\w.:-]+)', re.IGNORECASE)

# Label prefixes stripped from date text before parsing (applied in order).
_DATE_NOISE_PATTERNS = (
    r'发布(时间|日期)[::]?',
    r'时间[::]?',
    r'日期[::]?',
    r'发表于[::]?',
    r'更新[::]?',
)

# Formats tried against the raw ``datetime`` attribute of an element.
_DATETIME_ATTR_FORMATS = (
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S',
    '%Y-%m-%dT%H:%M:%S%z',
    '%Y-%m-%d %H:%M',
    '%Y-%m-%d',
    '%Y/%m/%d %H:%M:%S',
    '%Y/%m/%d %H:%M',
    '%Y/%m/%d',
    '%Y年%m月%d日 %H:%M:%S',
    '%Y年%m月%d日 %H:%M',
    '%Y年%m月%d日',
    '%m/%d/%Y %H:%M:%S',
    '%m/%d/%Y %H:%M',
    '%m/%d/%Y',
    '%d/%m/%Y %H:%M:%S',
    '%d/%m/%Y %H:%M',
    '%d/%m/%Y',
    '%d.%m.%Y %H:%M:%S',
    '%d.%m.%Y %H:%M',
    '%d.%m.%Y',
)

# Formats tried against the cleaned, human-readable date text.
_DATE_TEXT_FORMATS = (
    '%Y年%m月%d日 %H:%M:%S',
    '%Y年%m月%d日 %H:%M',
    '%Y年%m月%d日',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%d %H:%M',
    '%Y-%m-%d',
    '%Y/%m/%d %H:%M:%S',
    '%Y/%m/%d %H:%M',
    '%Y/%m/%d',
    '%m月%d日 %H:%M',
    '%m月%d日',
    '%m/%d/%Y %H:%M:%S',
    '%m/%d/%Y %H:%M',
    '%m/%d/%Y',
    '%d/%m/%Y %H:%M:%S',
    '%d/%m/%Y %H:%M',
    '%d/%m/%Y',
    '%d.%m.%Y %H:%M:%S',
    '%d.%m.%Y %H:%M',
    '%d.%m.%Y',
)

# Fallback mapping from MIME type to file extension.
_CONTENT_TYPE_EXTENSIONS = {
    'image/jpeg': '.jpg',
    'image/jpg': '.jpg',
    'image/png': '.png',
    'image/gif': '.gif',
    'image/webp': '.webp',
    'image/svg+xml': '.svg',
    'video/mp4': '.mp4',
    'video/avi': '.avi',
    'video/mov': '.mov',
    'video/wmv': '.wmv',
    'video/flv': '.flv',
    'video/webm': '.webm',
    'audio/mp3': '.mp3',
    'audio/wav': '.wav',
    'audio/ogg': '.ogg',
    'application/pdf': '.pdf',
    'application/msword': '.doc',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
}


class WebsiteCrawler:
    """Crawl the websites attached to a CrawlTask and store keyword matches."""

    def __init__(self, task_id):
        """Load the task and prepare an HTTP session with a retry policy.

        Args:
            task_id: Primary key of the CrawlTask to execute.

        Raises:
            CrawlTask.DoesNotExist: If no task with ``task_id`` exists.
        """
        self.task = CrawlTask.objects.get(id=task_id)
        # Keywords are stored as one comma-separated string; tolerate None.
        raw_keywords = self.task.keywords or ''
        self.keywords = [kw.strip() for kw in raw_keywords.split(',') if kw.strip()]

        # Session shared by every request made by this crawler.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': settings.CRAWLER_SETTINGS['USER_AGENT']
        })

        # Retry transient failures with exponential backoff.
        retry_strategy = Retry(
            total=settings.CRAWLER_SETTINGS.get('MAX_RETRIES', 3),
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Per-request timeout passed to every session.get call.
        self.timeout = settings.CRAWLER_SETTINGS['TIMEOUT']

    def log(self, level, message, website=None):
        """Record a message as a CrawlLog row and in the Python logger.

        Args:
            level: Level name such as 'info', 'error' or 'debug'.
            message: Text to record.
            website: Optional Website the message relates to.
        """
        CrawlLog.objects.create(
            task=self.task,
            website=website,
            level=level,
            message=message
        )
        # Mirror to the Python logging system; an unknown level name falls
        # back to INFO instead of raising AttributeError.
        numeric_level = getattr(logging, str(level).upper(), None)
        if not isinstance(numeric_level, int):
            numeric_level = logging.INFO
        logger.log(numeric_level, "Task %s: %s", self.task.id, message)

    def update_task_status(self, status, **kwargs):
        """Set the task status, stamp start/finish times and save.

        Args:
            status: New status string.
            **kwargs: Additional task attributes to set before saving.
        """
        self.task.status = status
        if status == 'running' and not self.task.started_at:
            self.task.started_at = timezone.now()
        elif status in ['completed', 'failed', 'cancelled']:
            self.task.completed_at = timezone.now()

        for key, value in kwargs.items():
            setattr(self.task, key, value)
        self.task.save()

    def extract_text_content(self, soup):
        """Return the text of ``soup`` with paragraph structure preserved.

        Script and style tags are removed from ``soup`` in place.

        Args:
            soup: A BeautifulSoup document or element.

        Returns:
            Paragraph texts joined by blank lines, stripped.
        """
        # NOTE(review): nested containers (a div wrapping p tags) contribute
        # their text once per level, so content can be duplicated. Left as-is
        # because the right de-duplication rule is a design decision.
        for tag in soup(["script", "style"]):
            tag.decompose()

        paragraphs = []
        block_tags = ['p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'br']
        for element in soup.find_all(block_tags):
            if element.name == 'br':
                paragraphs.append('\n')
                continue
            text = element.get_text().strip()
            if not text:
                continue
            if element.name in ('p', 'div'):
                paragraphs.append(text)
            else:
                # Headings get an extra newline before and after.
                paragraphs.append(f"\n{text}\n")

        # No block-level tags found: fall back to the raw text, line by line.
        if not paragraphs:
            lines = [line.strip() for line in soup.get_text().splitlines()]
            return '\n\n'.join(line for line in lines if line)

        content = '\n\n'.join(paragraphs)
        # Collapse a run of three newlines (with optional whitespace) to two.
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        return content.strip()

    def find_article_links(self, soup, base_url):
        """Collect candidate article links from a listing page.

        Args:
            soup: Parsed listing page.
            base_url: URL used to resolve relative hrefs.

        Returns:
            A list of ``{'url': ..., 'title': ...}`` dicts, one per distinct
            http(s) URL, in the order first encountered.
        """
        selectors = [
            'a[href*="article"]',
            'a[href*="news"]',
            'a[href*="content"]',
            'a[href*="detail"]',
            'a[href*="view"]',
            'a[href*="show"]',
            '.news-list a',
            '.article-list a',
            '.content-list a',
            'h3 a',
            'h4 a',
            '.title a',
            '.list-item a'
        ]

        links = []
        # The selectors overlap heavily; without this set the same article
        # would be returned (and later crawled) once per matching selector.
        seen_urls = set()
        for selector in selectors:
            for element in soup.select(selector):
                href = element.get('href')
                if not href:
                    continue
                full_url = urljoin(base_url, href)
                # Skip javascript:, mailto: and similar non-fetchable links.
                if urlparse(full_url).scheme not in ('http', 'https'):
                    continue
                if full_url in seen_urls:
                    continue
                title = element.get_text().strip()
                # Very short anchor texts are navigation, not article titles.
                if title and len(title) > 5:
                    seen_urls.add(full_url)
                    links.append({'url': full_url, 'title': title})
        return links

    def check_keyword_match(self, text, title):
        """Return the task keywords found in ``text`` or ``title``.

        Matching is case-insensitive. A ``*`` in a keyword matches any run of
        characters on a single line; every other character is literal.

        Args:
            text: Article body.
            title: Article title.

        Returns:
            The matching keywords, in task order.
        """
        text_lower = (text or '').lower()
        title_lower = (title or '').lower()

        matched_keywords = []
        for keyword in self.keywords:
            # Escape everything, then turn the escaped '*' back into '.*'.
            # The result always compiles, so no re.error handling is needed.
            regex_pattern = re.escape(keyword).replace(r'\*', '.*')
            pattern = re.compile(regex_pattern, re.IGNORECASE)
            if pattern.search(text_lower) or pattern.search(title_lower):
                matched_keywords.append(keyword)
        return matched_keywords

    def extract_article_content(self, url, soup):
        """Extract the main article text from a page.

        Args:
            url: Page URL (currently unused; kept for interface stability).
            soup: Parsed article page.

        Returns:
            Text of the first known content container yielding more than 100
            characters, otherwise the text of the whole page.
        """
        content_selectors = [
            '.article-content',
            '.content',
            '.article-body',
            '.news-content',
            '.main-content',
            '.post-content',
            'article',
            '.detail-content',
            '#content',
            '.text'
        ]

        content = ""
        for selector in content_selectors:
            element = soup.select_one(selector)
            if element:
                content = self.extract_text_content(element)
                if len(content) > 100:  # long enough to be the real body
                    break

        # No dedicated content region found: use the whole page.
        if not content or len(content) < 100:
            content = self.extract_text_content(soup)

        return content

    @staticmethod
    def _make_aware(value):
        """Return ``value`` as an aware datetime in the default timezone."""
        if not timezone.is_aware(value):
            value = timezone.make_aware(value)
        return value

    @staticmethod
    def _parse_datetime_attr(datetime_attr):
        """Parse a ``datetime`` attribute value, or return None."""
        # Normalise a trailing 'Z' so the '%z' format accepts it.
        if '+' not in datetime_attr and datetime_attr.endswith('Z'):
            datetime_attr = datetime_attr[:-1] + '+0000'
        for fmt in _DATETIME_ATTR_FORMATS:
            try:
                return datetime.strptime(datetime_attr, fmt)
            except ValueError:
                continue
        return None

    @staticmethod
    def _parse_date_text(date_text):
        """Parse cleaned, human-readable date text, or return None."""
        current_year = datetime.now().year
        for fmt in _DATE_TEXT_FORMATS:
            try:
                if '%Y' in fmt:
                    return datetime.strptime(date_text, fmt)
                # Year-less formats: parse with the current year rather than
                # the 1900 default, which rejects Feb 29 and is deprecated.
                return datetime.strptime(f'{current_year} {date_text}', f'%Y {fmt}')
            except ValueError:
                continue
        return None

    def extract_publish_date(self, soup):
        """Find and parse the publish date of an article page.

        Args:
            soup: Parsed article page.

        Returns:
            A timezone-aware datetime, or None when nothing parseable is found.
        """
        date_selectors = [
            '.publish-time',
            '.pub-time',
            '.date',
            '.time',
            '.publish-date',
            'time[datetime]',
            '.article-time',
            '.news-time',
            '.post-time',
            '.create-time',
            '.update-time',
            '.time span',
            '.date span',
            '.info span',  # some sites put publish info under .info
            '.meta span',
            '.meta-info',
            '.article-info span',
            '.news-info span',
            '.content-info span',
            '.a-shijian',  # publish-time class used by the Shanghai site
            '.l-time'  # publish-time class used by the Tianjin site
        ]

        for selector in date_selectors:
            for element in soup.select(selector):
                datetime_attr = element.get('datetime')
                date_text = datetime_attr or element.get_text().strip()

                # Too short to be a date.
                if not date_text or len(date_text) < 4:
                    continue

                try:
                    # Strip label prefixes and collapse whitespace.
                    for noise in _DATE_NOISE_PATTERNS:
                        date_text = re.sub(noise, '', date_text).strip()
                    date_text = re.sub(r'\s+', ' ', date_text).strip()

                    # A machine-readable attribute is the most reliable source.
                    parsed_date = None
                    if datetime_attr:
                        parsed_date = self._parse_datetime_attr(datetime_attr)
                    if parsed_date is None:
                        parsed_date = self._parse_date_text(date_text)
                    if parsed_date is not None:
                        return self._make_aware(parsed_date)

                    # Last resort: dateutil, if it is installed.
                    try:
                        from dateutil import parser
                        # Skip text that is obviously not a date.
                        if len(date_text) > 5 and not date_text.isdigit():
                            return self._make_aware(parser.parse(date_text))
                    except (ImportError, ValueError, OverflowError, TypeError):
                        pass
                except Exception as e:
                    self.log('debug', f'解析日期失败: {date_text}, 错误: {str(e)}')
                    continue

        return None

    def extract_author(self, soup):
        """Return the text of the first author-like element, or ''."""
        author_selectors = [
            '.author',
            '.writer',
            '.publisher',
            '.byline',
            '.article-author',
            '.news-author'
        ]

        for selector in author_selectors:
            element = soup.select_one(selector)
            if element:
                return element.get_text().strip()

        return ""

    def download_media_file(self, media_url, crawled_content, media_type='image', alt_text=''):
        """Download one media file and attach it to ``crawled_content``.

        Args:
            media_url: Absolute http(s) URL of the file.
            crawled_content: CrawledContent the file belongs to.
            media_type: 'image', 'video' or 'audio'.
            alt_text: Alternative text stored with the file.

        Returns:
            The created MediaFile, or None when the URL is not http(s) or the
            download or save fails (failures are logged, not raised).
        """
        try:
            # Only absolute http(s) URLs can be fetched.
            if not media_url or not media_url.startswith(('http://', 'https://')):
                return None

            response = self.session.get(
                media_url,
                timeout=self.timeout,
                verify=False,
                stream=False  # read the whole body up front
            )
            response.raise_for_status()

            content_type = response.headers.get('content-type', '')
            body = response.content
            # Use the size of what is actually stored: Content-Length may be
            # absent, malformed, or the compressed size of an encoded body.
            file_size = len(body)

            file_extension = self.get_file_extension_from_url(media_url, content_type)

            # Index the file by how many files the content already has.
            existing_count = crawled_content.media_files.count()
            filename = f"media_{crawled_content.id}_{existing_count}{file_extension}"

            media_file = MediaFile.objects.create(
                content=crawled_content,
                media_type=media_type,
                original_url=media_url,
                file_size=file_size,
                mime_type=content_type,
                alt_text=alt_text
            )

            media_file.local_file.save(
                filename,
                ContentFile(body),
                save=True
            )

            self.log('info', f'媒体文件已下载: {filename} ({media_type})', crawled_content.website)
            return media_file

        except Exception as e:
            self.log('error', f'下载媒体文件失败 {media_url}: {str(e)}', crawled_content.website)
            return None

    def get_file_extension_from_url(self, url, content_type):
        """Choose a file extension from the URL path or the content type.

        Args:
            url: Media URL.
            content_type: Content-Type header value; may carry parameters.

        Returns:
            The extension of the URL path's last component if it has one,
            otherwise the extension mapped from the MIME type, otherwise
            '.bin'.
        """
        # splitext only looks at the last path component, so a dot in a
        # directory name ("/v1.2/image") correctly yields no extension here.
        extension = os.path.splitext(urlparse(url).path)[1]
        if extension:
            return extension

        # Drop parameters such as "; charset=binary" before the lookup.
        mime_type = (content_type or '').split(';', 1)[0].strip().lower()
        return _CONTENT_TYPE_EXTENSIONS.get(mime_type, '.bin')

    def extract_and_download_media(self, soup, crawled_content, base_url):
        """Download the images, videos and audio referenced by a page.

        Args:
            soup: Parsed article page.
            crawled_content: CrawledContent the files are attached to.
            base_url: Page URL used to resolve relative sources.

        Returns:
            The list of MediaFile objects that were downloaded successfully.
        """
        website = crawled_content.website
        media_files = []
        # Each distinct URL is downloaded once even if referenced repeatedly.
        seen_urls = set()

        def resolve(src):
            """Return the absolute URL for ``src``, or None if already seen."""
            # urljoin handles absolute, root-relative, relative and
            # protocol-relative ("//host/x") sources against the page URL.
            media_url = urljoin(base_url, src.strip())
            if media_url in seen_urls:
                return None
            seen_urls.add(media_url)
            return media_url

        # Images.
        images = soup.find_all('img')
        self.log('info', f'找到 {len(images)} 个图片标签', website)
        for img in images:
            src = img.get('src')
            if not src:
                continue
            media_url = resolve(src)
            if media_url is None:
                continue
            self.log('info', f'尝试下载图片: {media_url}', website)
            media_file = self.download_media_file(
                media_url, crawled_content, 'image', img.get('alt', '')
            )
            if media_file:
                media_files.append(media_file)
                self.log('info', f'成功下载图片: {media_file.local_file.name}', website)

        # Videos, including <source> children. A <source> inside <audio> is
        # audio, not video.
        for tag in soup.find_all(['video', 'source']):
            src = tag.get('src')
            if not src:
                continue
            media_url = resolve(src)
            if media_url is None:
                continue
            parent_name = getattr(tag.parent, 'name', None)
            is_audio_source = tag.name == 'source' and parent_name == 'audio'
            media_type = 'audio' if is_audio_source else 'video'
            media_file = self.download_media_file(media_url, crawled_content, media_type)
            if media_file:
                media_files.append(media_file)

        # Audio elements that carry their own src attribute.
        for audio in soup.find_all('audio'):
            src = audio.get('src')
            if not src:
                continue
            media_url = resolve(src)
            if media_url is None:
                continue
            media_file = self.download_media_file(media_url, crawled_content, 'audio')
            if media_file:
                media_files.append(media_file)

        return media_files

    def mark_content_saved(self, crawled_content):
        """Flag ``crawled_content`` as saved and log its media file count.

        Returns:
            True on success, False if saving raised (the error is logged).
        """
        try:
            crawled_content.is_local_saved = True
            crawled_content.save()

            media_count = crawled_content.media_files.count()
            self.log('info', f'文章内容已保存到数据库 (包含 {media_count} 个媒体文件)', crawled_content.website)
            return True
        except Exception as e:
            self.log('error', f'标记内容保存状态失败: {str(e)}', crawled_content.website)
            return False

    def _fetch_soup(self, url):
        """Fetch ``url`` and return it parsed with BeautifulSoup.

        The charset declared in the Content-Type header, if any, is handed to
        BeautifulSoup as the preferred encoding; otherwise BeautifulSoup
        detects the encoding from the document itself.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
        """
        response = self.session.get(
            url,
            timeout=self.timeout,
            verify=False  # certificate verification is deliberately skipped
        )
        response.raise_for_status()

        match = _CHARSET_RE.search(response.headers.get('content-type', ''))
        declared_encoding = match.group(1) if match else None
        return BeautifulSoup(response.content, 'html.parser', from_encoding=declared_encoding)

    def _store_article(self, website, url, title, content, matched_keywords, article_soup):
        """Create or update the CrawledContent row for a matched article."""
        publish_date = self.extract_publish_date(article_soup)
        author = self.extract_author(article_soup)
        keywords_matched = ','.join(matched_keywords)

        # An article already stored for this task is updated, not duplicated.
        existing_content = CrawledContent.objects.filter(
            url=url,
            task=self.task
        ).first()

        if existing_content:
            existing_content.title = title
            existing_content.content = content
            existing_content.publish_date = publish_date
            existing_content.author = author
            existing_content.keywords_matched = keywords_matched
            existing_content.save()

            # Replace the media files: drop the old ones, download afresh.
            existing_content.media_files.all().delete()
            self.extract_and_download_media(article_soup, existing_content, url)

            self.log('info', f'更新已存在的文章: {title[:50]}...', website)
            return

        crawled_content = CrawledContent.objects.create(
            task=self.task,
            website=website,
            title=title,
            content=content,
            url=url,
            publish_date=publish_date,
            author=author,
            keywords_matched=keywords_matched,
            is_local_saved=False  # set to True by mark_content_saved below
        )

        self.extract_and_download_media(article_soup, crawled_content, url)
        self.mark_content_saved(crawled_content)

        self.log('info', f'保存新文章: {title[:50]}...', website)

    def crawl_website(self, website):
        """Crawl one website and store the articles that match the keywords.

        Args:
            website: Website to crawl.

        Returns:
            Number of matching articles created or updated. Returns 0 when the
            listing page itself cannot be fetched (the error is logged).
        """
        self.log('info', f'开始爬取网站: {website.name}', website)

        try:
            # Fetch the listing page and collect article links.
            soup = self._fetch_soup(website.url)
            article_links = self.find_article_links(soup, website.url)
            self.log('info', f'找到 {len(article_links)} 个文章链接', website)

            crawled_count = 0
            for link_info in article_links:
                url = link_info['url']
                try:
                    article_soup = self._fetch_soup(url)

                    content = self.extract_article_content(url, article_soup)
                    title = link_info['title']

                    matched_keywords = self.check_keyword_match(content, title)
                    if matched_keywords:
                        self._store_article(
                            website, url, title, content, matched_keywords, article_soup
                        )
                        crawled_count += 1

                    # Be polite: pause between article requests.
                    time.sleep(settings.CRAWLER_SETTINGS['REQUEST_DELAY'])

                except requests.exceptions.SSLError as e:
                    self.log('error', f'SSL错误,跳过文章 {url}: {str(e)}', website)
                    continue
                except requests.exceptions.ConnectionError as e:
                    self.log('error', f'连接错误,跳过文章 {url}: {str(e)}', website)
                    continue
                except requests.exceptions.Timeout as e:
                    self.log('error', f'请求超时,跳过文章 {url}: {str(e)}', website)
                    continue
                except requests.exceptions.RequestException as e:
                    self.log('error', f'网络请求错误,跳过文章 {url}: {str(e)}', website)
                    continue
                except UnicodeDecodeError as e:
                    self.log('error', f'字符编码错误,跳过文章 {url}: {str(e)}', website)
                    continue
                except Exception as e:
                    self.log('error', f'处理文章失败 {url}: {str(e)}', website)
                    continue

            self.log('info', f'网站爬取完成,共保存 {crawled_count} 篇文章', website)
            return crawled_count

        except requests.exceptions.SSLError as e:
            self.log('error', f'爬取网站SSL错误: {str(e)}', website)
            return 0
        except requests.exceptions.ConnectionError as e:
            self.log('error', f'爬取网站连接错误: {str(e)}', website)
            return 0
        except requests.exceptions.Timeout as e:
            self.log('error', f'爬取网站超时: {str(e)}', website)
            return 0
        except requests.exceptions.RequestException as e:
            self.log('error', f'爬取网站网络错误: {str(e)}', website)
            return 0
        except Exception as e:
            self.log('error', f'爬取网站失败: {str(e)}', website)
            return 0

    def run(self):
        """Crawl every active website of the task and set its final status.

        The task ends as 'completed' when at least one article matched and as
        'failed' otherwise.
        """
        self.log('info', f'开始执行爬取任务: {self.task.name}')
        self.update_task_status('running')

        total_crawled = 0
        websites = self.task.websites.filter(is_active=True)
        self.task.total_pages = websites.count()
        self.task.save()

        for website in websites:
            try:
                total_crawled += self.crawl_website(website)
                self.task.crawled_pages += 1
                self.task.save()
            except Exception as e:
                self.log('error', f'爬取网站 {website.name} 时发生错误: {str(e)}', website)
                continue

        if total_crawled > 0:
            self.update_task_status('completed')
            self.log('info', f'爬取任务完成,共爬取 {total_crawled} 篇文章')
        else:
            self.update_task_status('failed', error_message='没有找到匹配的内容')
            self.log('error', '爬取任务失败,没有找到匹配的内容')


def run_crawl_task(task_id):
    """Run the crawl task with the given id and return a status message.

    Any exception is logged and recorded on the task instead of being raised.

    Args:
        task_id: Primary key of the CrawlTask to run.

    Returns:
        A human-readable completion or failure message.
    """
    try:
        crawler = WebsiteCrawler(task_id)
        crawler.run()
        return f"任务 {task_id} 执行完成"
    except Exception as e:
        logger.error(f"执行任务 {task_id} 时发生异常: {str(e)}", exc_info=True)

        # The task may be the thing that is missing; do not raise again from
        # inside the error handler.
        task = CrawlTask.objects.filter(id=task_id).first()
        if task is not None:
            task.status = 'failed'
            task.error_message = str(e)
            task.completed_at = timezone.now()
            task.save()

            CrawlLog.objects.create(
                task=task,
                level='error',
                message=f'任务执行失败: {str(e)}'
            )
        return f"任务 {task_id} 执行失败: {str(e)}"