diff --git a/KEYWORD_CRAWLER_EXAMPLES.md b/KEYWORD_CRAWLER_EXAMPLES.md deleted file mode 100644 index 207704a..0000000 --- a/KEYWORD_CRAWLER_EXAMPLES.md +++ /dev/null @@ -1,152 +0,0 @@ -# 关键词爬取功能使用示例 - -## 快速开始 - -### 1. 查看支持的网站 -```bash -python manage.py crawl_by_keyword --list-websites -``` - -### 2. 基本关键词搜索 -```bash -# 搜索"人工智能"相关文章 -python manage.py crawl_by_keyword --keyword "人工智能" - -# 搜索"两会"相关文章,限制数量 -python manage.py crawl_by_keyword --keyword "两会" --max-pages 3 --max-articles 20 -``` - -### 3. 指定网站搜索 -```bash -# 只在人民日报和新华网搜索 -python manage.py crawl_by_keyword --keyword "人工智能" --websites "人民日报" "新华网" -``` - -### 4. 日期范围搜索 -```bash -# 搜索2024年1月的文章 -python manage.py crawl_by_keyword --keyword "新闻" --start-date "2024-01-01" --end-date "2024-01-31" -``` - -### 5. 历史文章爬取 -```bash -# 爬取最近30天的历史文章 -python manage.py crawl_by_keyword --keyword "新闻" --historical - -# 爬取指定日期范围的历史文章 -python manage.py crawl_by_keyword --keyword "新闻" --historical --start-date "2024-01-01" --end-date "2024-01-31" -``` - -### 6. 保存结果 -```bash -# 将爬取结果保存到JSON文件 -python manage.py crawl_by_keyword --keyword "人工智能" --output results.json -``` - -## 多网站一键爬取 - -### 1. 全站爬取 -```bash -# 爬取所有网站的最新文章 -python manage.py crawl_all_websites --mode full - -# 爬取指定网站 -python manage.py crawl_all_websites --mode full --websites "新华网" "人民日报" "央视网" -``` - -### 2. 关键词爬取 -```bash -# 在所有网站搜索"人工智能" -python manage.py crawl_all_websites --mode keyword --keyword "人工智能" - -# 限制搜索页数和文章数量 -python manage.py crawl_all_websites --mode keyword --keyword "人工智能" --max-search-pages 5 --max-articles 30 -``` - -### 3. 混合模式 -```bash -# 同时进行全站爬取和关键词搜索 -python manage.py crawl_all_websites --mode both --keyword "人工智能" -``` - -## 实际使用场景 - -### 场景1:新闻热点追踪 -```bash -# 追踪"人工智能"相关新闻 -python manage.py crawl_by_keyword --keyword "人工智能" --max-pages 5 --max-articles 50 --output ai_news.json -``` - -### 场景2:政策文件收集 -```bash -# 收集"政策"相关文章 -python manage.py crawl_by_keyword --keyword "政策" --websites "中国政府网" "新华网" "人民日报" --max-articles 30 -``` - -### 场景3:历史资料整理 -```bash -# 整理2024年1月的所有新闻 -python manage.py crawl_by_keyword --keyword "新闻" --historical --start-date "2024-01-01" --end-date "2024-01-31" --max-articles 100 -``` - -### 场景4:全面信息收集 -```bash -# 一键收集所有网站的最新信息 -python manage.py crawl_all_websites --mode both --keyword "新闻" --max-search-pages 3 --max-articles 20 -``` - -## 注意事项 - -1. **网络连接**:确保网络连接稳定 -2. **请求频率**:系统会自动控制请求频率,避免对目标网站造成压力 -3. **存储空间**:爬取的文章和媒体文件会占用存储空间 -4. **时间消耗**:大量爬取可能需要较长时间 -5. **网站限制**:某些网站可能有反爬虫机制 - -## 故障排除 - -### 常见问题 - -1. **搜索无结果** - - 检查关键词是否正确 - - 尝试使用更通用的关键词 - - 检查日期范围是否合理 - -2. **网站访问失败** - - 检查网络连接 - - 某些网站可能暂时不可用 - - 尝试减少并发请求 - -3. **编码问题** - - 系统已自动处理常见编码问题 - - 如仍有问题,请检查网站编码设置 - -### 调试技巧 - -1. **使用小范围测试** - ```bash - python manage.py crawl_by_keyword --keyword "测试" --websites "新华网" --max-pages 1 --max-articles 3 - ``` - -2. **查看详细输出** - - 命令会显示详细的爬取进度 - - 注意错误信息和警告 - -3. **保存结果分析** - ```bash - python manage.py crawl_by_keyword --keyword "测试" --output debug.json - ``` - -## 性能优化建议 - -1. **合理设置参数** - - 根据需求调整 `max-pages` 和 `max-articles` - - 避免设置过大的数值 - -2. **分批处理** - - 对于大量数据,建议分批处理 - - 可以按网站或时间段分批 - -3. **定期清理** - - 定期清理不需要的文章数据 - - 清理过期的媒体文件 diff --git a/WEBSITES_FIELD_FIX.md b/WEBSITES_FIELD_FIX.md deleted file mode 100644 index 1f3add4..0000000 --- a/WEBSITES_FIELD_FIX.md +++ /dev/null @@ -1,188 +0,0 @@ -# 网站字段修复说明 - -## 问题描述 - -您遇到了 "can only join an iterable" 错误,这是因为将 `CrawlTask` 模型中的 `websites` 字段从 `JSONField` 改为了 `ManyToManyField`,但代码中还有一些地方没有相应更新。 - -## 已修复的问题 - -### 1. 
任务执行器修复 - -在 `core/task_executor.py` 中,已经修复了以下方法: - -- `_execute_keyword_task()` - 关键词搜索任务执行 -- `_execute_historical_task()` - 历史文章任务执行 -- `_execute_full_site_task()` - 全站爬取任务执行 - -**修复前:** -```python -websites = task.websites if task.websites else list(WEBSITE_SEARCH_CONFIGS.keys()) -``` - -**修复后:** -```python -selected_websites = task.websites.all() -if selected_websites: - websites = [w.name for w in selected_websites] -else: - websites = list(WEBSITE_SEARCH_CONFIGS.keys()) -``` - -### 2. 模型方法修复 - -在 `core/models.py` 中,`get_websites_display()` 方法已经正确处理了 `ManyToManyField`: - -```python -def get_websites_display(self): - """获取网站列表的显示文本""" - try: - websites = self.websites.all() - if not websites: - return "所有网站" - # 确保网站名称是字符串并可以被join处理 - website_names = [str(w.name) for w in websites if w.name] - return ", ".join(website_names) if website_names else "所有网站" - except Exception: - # 如果出现任何异常,返回默认值 - return "所有网站" -``` - -### 3. 管理界面修复 - -在 `core/admin_extended.py` 中,已经修复了任务创建逻辑: - -- 使用 `Website.objects.filter(name__in=websites)` 获取网站对象 -- 使用 `task.websites.set(website_objects)` 设置关联关系 - -## 测试验证 - -已经通过以下测试验证修复: - -1. **网站字段功能测试** - 验证 `ManyToManyField` 的基本操作 -2. **任务执行器测试** - 验证任务执行器中的网站获取逻辑 -3. **Web界面操作测试** - 验证完整的Web界面操作流程 -4. **系统检查** - Django系统检查无错误 - -## 使用方法 - -### 1. 确保数据库迁移已应用 - -```bash -python manage.py migrate -``` - -### 2. 创建网站数据 - -如果还没有网站数据,可以通过以下方式创建: - -```python -from core.models import Website - -# 创建一些测试网站 -websites_to_create = ["新华网", "人民日报", "央视网"] -for name in websites_to_create: - website, created = Website.objects.get_or_create( - name=name, - defaults={ - 'base_url': f'http://{name}.com', - 'enabled': True - } - ) -``` - -### 3. 通过Web界面创建任务 - -1. 启动服务器:`python manage.py runserver` -2. 访问管理后台:`http://localhost:8000/admin/` -3. 在首页点击"快速创建爬取任务" -4. 选择任务类型并填写相关信息 -5. 选择目标网站 -6. 创建并启动任务 - -### 4. 通过命令行创建任务 - -```bash -# 关键词搜索 -python manage.py crawl_by_keyword --keyword "人工智能" - -# 历史文章爬取 -python manage.py crawl_by_keyword --keyword "新闻" --historical - -# 多网站一键爬取 -python manage.py crawl_all_websites --mode both --keyword "人工智能" -``` - -## 故障排除 - -### 如果仍然遇到 "can only join an iterable" 错误 - -1. **检查数据库迁移**: - ```bash - python manage.py showmigrations core - python manage.py migrate - ``` - -2. **检查网站数据**: - ```python - from core.models import Website - print(Website.objects.filter(enabled=True).count()) - ``` - -3. **检查任务数据**: - ```python - from core.models import CrawlTask - task = CrawlTask.objects.first() - if task: - print(f"任务网站: {task.get_websites_display()}") - ``` - -4. **重启服务器**: - ```bash - # 停止当前服务器 (Ctrl+C) - python manage.py runserver - ``` - -### 如果遇到其他错误 - -1. **检查日志**:查看Django日志输出 -2. **检查网络连接**:确保可以访问目标网站 -3. **检查网站配置**:确保 `WEBSITE_SEARCH_CONFIGS` 中的配置正确 - -## 技术细节 - -### ManyToManyField vs JSONField - -**之前的JSONField方式:** -```python -websites = models.JSONField(default=list, verbose_name="目标网站") -# 使用: task.websites = ["新华网", "人民日报"] -``` - -**现在的ManyToManyField方式:** -```python -websites = models.ManyToManyField(Website, blank=True, verbose_name="目标网站") -# 使用: task.websites.set(website_objects) -``` - -### 优势 - -1. **数据完整性**:通过外键关系确保数据一致性 -2. **查询效率**:可以利用数据库索引进行高效查询 -3. **关系管理**:Django自动处理多对多关系的创建和删除 -4. **数据验证**:自动验证关联的网站是否存在 - -## 总结 - -修复已经完成,现在您可以: - -1. 通过Web界面创建和管理爬取任务 -2. 选择特定的网站进行爬取 -3. 实时监控任务进度 -4. 查看详细的爬取结果 - -如果仍然遇到问题,请检查: -- 数据库迁移是否已应用 -- 网站数据是否存在 -- 服务器是否已重启 - -系统现在应该可以正常工作了! 
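For reference, the workflow that the removed WEBSITES_FIELD_FIX.md described — creating a task, attaching websites through the new `ManyToManyField`, and handing it to the executor — can be sketched as below. This is a minimal illustration built only from the models and `task_executor` API visible in this diff; it assumes the remaining `CrawlTask` fields have usable defaults, that the listed `Website` rows already exist, and that a freshly created task starts in the `pending` status that `start_task()` requires.

```python
# Minimal sketch (Django shell / management code), based on the models and
# executor shown in this diff. Assumes Website rows already exist and that a
# freshly created CrawlTask defaults to the 'pending' status.
from core.models import CrawlTask, Website
from core.task_executor import task_executor

task = CrawlTask.objects.create(
    name="人工智能 关键词搜索",
    task_type="keyword",          # dispatched to _execute_keyword_task()
    keyword="人工智能",
    max_pages=3,
    max_articles=20,
)

# ManyToManyField relations are set after the instance exists — this is the
# point of the JSONField -> ManyToManyField fix described above.
task.websites.set(
    Website.objects.filter(name__in=["新华网", "人民日报"], enabled=True)
)

ok, message = task_executor.start_task(task.id)   # returns (success, message)
print(ok, message)

# A completed/failed/cancelled task can later be re-run via the new API below:
# task_executor.rerun_task(task.id)
```

The admin views changed below drive this same flow through the web UI; `rerun_task()` is just `start_task(task_id, rerun=True)` with the state reset shown in `task_executor.py`.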
diff --git a/core/admin_extended.py b/core/admin_extended.py index feb5f4a..02daa0d 100644 --- a/core/admin_extended.py +++ b/core/admin_extended.py @@ -462,9 +462,10 @@ class CrawlTaskAdmin(admin.ModelAdmin): 'status', 'progress', 'current_website', 'current_action', 'total_articles', 'success_count', 'failed_count', 'created_at', 'started_at', 'completed_at', 'error_message', - 'result_details', 'duration_display', 'progress_display' + 'result_details', 'duration_display', 'progress_display', + 'execution_count', 'last_execution_at', 'execution_summary' ] - actions = ['start_tasks', 'cancel_tasks', 'delete_completed_tasks'] + actions = ['start_tasks', 'rerun_tasks', 'cancel_tasks', 'delete_completed_tasks'] class Media: js = ('admin/js/crawl_task_actions.js',) @@ -488,6 +489,10 @@ class CrawlTaskAdmin(admin.ModelAdmin): 'fields': ('created_at', 'started_at', 'completed_at', 'duration_display'), 'classes': ('collapse',) }), + ('执行历史', { + 'fields': ('execution_count', 'last_execution_at', 'execution_summary'), + 'classes': ('collapse',) + }), ('错误信息', { 'fields': ('error_message',), 'classes': ('collapse',) @@ -539,6 +544,11 @@ class CrawlTaskAdmin(admin.ModelAdmin): return "-" duration_display.short_description = '执行时长' + def execution_summary(self, obj): + """执行摘要显示""" + return obj.get_execution_summary() + execution_summary.short_description = '执行摘要' + def actions_column(self, obj): """操作列""" actions = [] @@ -551,6 +561,10 @@ class CrawlTaskAdmin(admin.ModelAdmin): if obj.status == 'completed': actions.append(f'查看结果') + actions.append(f'重新执行') + + if obj.status in ['failed', 'cancelled']: + actions.append(f'重新执行') return format_html(' '.join(actions)) actions_column.short_description = '操作' @@ -572,6 +586,23 @@ class CrawlTaskAdmin(admin.ModelAdmin): self.message_user(request, f'成功启动 {started_count} 个任务', messages.SUCCESS) start_tasks.short_description = '启动选中的任务' + def rerun_tasks(self, request, queryset): + """重新执行选中的任务""" + rerun_count = 0 + for task in queryset.filter(status__in=['completed', 'failed', 'cancelled']): + try: + success, message = task_executor.rerun_task(task.id) + if success: + rerun_count += 1 + else: + self.message_user(request, f'重新执行任务 {task.name} 失败: {message}', messages.ERROR) + except Exception as e: + self.message_user(request, f'重新执行任务 {task.name} 失败: {e}', messages.ERROR) + + if rerun_count > 0: + self.message_user(request, f'成功重新执行 {rerun_count} 个任务', messages.SUCCESS) + rerun_tasks.short_description = '重新执行选中的任务' + def cancel_tasks(self, request, queryset): """取消选中的任务""" cancelled_count = 0 @@ -587,6 +618,9 @@ class CrawlTaskAdmin(admin.ModelAdmin): if cancelled_count > 0: self.message_user(request, f'成功取消 {cancelled_count} 个任务', messages.SUCCESS) + elif queryset.filter(status__in=['pending', 'running']).count() > 0: + # 有任务但没有成功取消任何任务 + self.message_user(request, '没有成功取消任何任务', messages.WARNING) cancel_tasks.short_description = '取消选中的任务' def delete_completed_tasks(self, request, queryset): @@ -628,6 +662,11 @@ class CrawlTaskAdmin(admin.ModelAdmin): self.admin_site.admin_view(self.cancel_task_view), name='cancel_task', ), + path( + '/rerun/', + self.admin_site.admin_view(self.rerun_task_view), + name='rerun_task', + ), path( '/results/', self.admin_site.admin_view(self.view_results_view), @@ -640,7 +679,7 @@ class CrawlTaskAdmin(admin.ModelAdmin): """创建关键词搜索任务视图""" if request.method == 'POST': try: - from .utils import WEBSITE_SEARCH_CONFIGS + from .utils import WEBSITE_CRAWL_CONFIGS name = request.POST.get('name', '') keyword = 
request.POST.get('keyword', '') @@ -688,7 +727,7 @@ class CrawlTaskAdmin(admin.ModelAdmin): """创建历史文章任务视图""" if request.method == 'POST': try: - from .utils import WEBSITE_SEARCH_CONFIGS + from .utils import WEBSITE_CRAWL_CONFIGS name = request.POST.get('name', '') websites = request.POST.getlist('websites') @@ -733,7 +772,7 @@ class CrawlTaskAdmin(admin.ModelAdmin): """创建全站爬取任务视图""" if request.method == 'POST': try: - from .utils import WEBSITE_SEARCH_CONFIGS + from .utils import WEBSITE_CRAWL_CONFIGS name = request.POST.get('name', '') websites = request.POST.getlist('websites') @@ -783,6 +822,19 @@ class CrawlTaskAdmin(admin.ModelAdmin): return HttpResponseRedirect(reverse('admin:core_crawltask_changelist')) + def rerun_task_view(self, request, task_id): + """重新执行任务视图""" + try: + success, message = task_executor.rerun_task(task_id) + if success: + self.message_user(request, f'任务已重新执行: {message}', messages.SUCCESS) + else: + self.message_user(request, f'重新执行任务失败: {message}', messages.ERROR) + except Exception as e: + self.message_user(request, f'重新执行任务失败: {e}', messages.ERROR) + + return HttpResponseRedirect(reverse('admin:core_crawltask_changelist')) + def cancel_task_view(self, request, task_id): """取消任务视图""" try: diff --git a/core/keyword_crawler.py b/core/keyword_crawler.py new file mode 100644 index 0000000..9b47a33 --- /dev/null +++ b/core/keyword_crawler.py @@ -0,0 +1,765 @@ +""" +关键词爬虫引擎 +基于 crawler_engine.py 的关键词爬取方法改进 +""" + +import requests +import time +import re +import logging +import os +import urllib3 +from bs4 import BeautifulSoup +from urllib.parse import urljoin, urlparse +from django.conf import settings +from django.utils import timezone +from django.core.files.base import ContentFile +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from .models import Website, CrawlTask, Article +from .utils import get_page_with_selenium, get_page_with_requests, check_keyword_in_content + +# 禁用SSL警告 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# 设置日志记录器 +logger = logging.getLogger(__name__) + + +class KeywordCrawler: + """关键词爬虫引擎""" + + def __init__(self, task_id, task_executor_instance=None): + self.task = CrawlTask.objects.get(id=task_id) + self.task_id = task_id + self.task_executor = task_executor_instance + self.keywords = [kw.strip() for kw in self.task.keyword.split(',') if kw.strip()] if self.task.keyword else [] + + # 创建带重试策略的会话 + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + }) + + # 设置重试策略 + retry_strategy = Retry( + total=3, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + # 设置超时 + self.timeout = 15 + + def log(self, level, message, website=None): + """记录日志""" + print(f"[{level.upper()}] {message}") + logger.log(getattr(logging, level.upper()), f"Task {self.task.id}: {message}") + + def is_cancelled(self): + """检查任务是否已被取消""" + if self.task_executor: + return self.task_executor.is_task_cancelled(self.task_id) + return False + + def update_task_status(self, status, **kwargs): + """更新任务状态""" + self.task.status = status + if status == 'running' and not self.task.started_at: + self.task.started_at = timezone.now() + elif status in ['completed', 'failed', 'cancelled']: + self.task.completed_at = 
timezone.now() + + for key, value in kwargs.items(): + setattr(self.task, key, value) + self.task.save() + + def extract_text_content(self, soup): + """提取文本内容,保持段落结构""" + # 移除脚本和样式标签 + for script in soup(["script", "style"]): + script.decompose() + + # 处理段落标签,保持段落结构 + paragraphs = [] + + # 查找所有段落相关的标签 + for element in soup.find_all(['p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'br']): + if element.name in ['p', 'div']: + text = element.get_text().strip() + if text: + paragraphs.append(text) + elif element.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']: + text = element.get_text().strip() + if text: + paragraphs.append(f"\n{text}\n") # 标题前后加换行 + elif element.name == 'br': + paragraphs.append('\n') + + # 如果没有找到段落标签,使用原来的方法 + if not paragraphs: + text = soup.get_text() + # 清理文本但保持换行 + lines = [] + for line in text.splitlines(): + line = line.strip() + if line: + lines.append(line) + return '\n\n'.join(lines) + + # 合并段落,用双换行分隔 + content = '\n\n'.join(paragraphs) + + # 清理多余的空行 + content = re.sub(r'\n\s*\n\s*\n', '\n\n', content) + + return content.strip() + + def clean_url(self, url): + """清理和修复URL""" + try: + # 处理空值或None + if not url or url is None: + return "" + + # 修复常见的URL问题 + # 将错误的编码字符恢复 + url = str(url).replace('%C3%97', '×') # 修复 × 字符的错误编码 + url = url.replace('%E2%80%93', '–') # 修复 – 字符的错误编码 + url = url.replace('%E2%80%94', '—') # 修复 — 字符的错误编码 + + # 解析URL并重新构建 + parsed = urlparse(url) + + # 清理查询参数 + if parsed.query: + # 处理查询参数中的编码问题 + from urllib.parse import parse_qs, urlencode, unquote + query_params = parse_qs(parsed.query) + cleaned_params = {} + + for key, values in query_params.items(): + # 解码参数名 + clean_key = unquote(key) + # 解码参数值 + clean_values = [unquote(val) for val in values] + cleaned_params[clean_key] = clean_values + + # 重新构建查询字符串 + query_string = urlencode(cleaned_params, doseq=True) + else: + query_string = '' + + # 重新构建URL + clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" + if query_string: + clean_url += f"?{query_string}" + if parsed.fragment: + clean_url += f"#{parsed.fragment}" + + return clean_url + + except Exception as e: + self.log('warning', f'URL清理失败: {url}, 错误: {e}') + return url + + def is_valid_article_url(self, url): + """检查是否是有效的文章URL""" + try: + # 排除一些明显不是文章的URL + exclude_patterns = [ + 'javascript:', 'mailto:', '#', 'tel:', + '.pdf', '.doc', '.docx', '.xls', '.xlsx', + '.jpg', '.jpeg', '.png', '.gif', '.svg', + '.mp3', '.mp4', '.avi', '.mov' + ] + + url_lower = url.lower() + for pattern in exclude_patterns: + if pattern in url_lower: + return False + + # 检查URL长度 + if len(url) < 10: + return False + + # 检查是否包含文章相关的关键词 + article_keywords = ['article', 'news', 'content', 'detail', 'view', 'show', 'post'] + url_lower = url.lower() + for keyword in article_keywords: + if keyword in url_lower: + return True + + # 如果URL看起来像文章ID或路径,也认为是有效的 + if any(char.isdigit() for char in url) and len(url.split('/')) > 3: + return True + + return False + + except Exception: + return False + + def find_article_links(self, soup, base_url): + """查找文章链接""" + links = [] + seen_urls = set() # 避免重复URL + + # 常见的文章链接选择器 + selectors = [ + 'a[href*="article"]', + 'a[href*="news"]', + 'a[href*="content"]', + 'a[href*="detail"]', + 'a[href*="view"]', + 'a[href*="show"]', + '.news-list a', + '.article-list a', + '.content-list a', + 'h3 a', + 'h4 a', + '.title a', + '.list-item a' + ] + + for selector in selectors: + elements = soup.select(selector) + for element in elements: + href = element.get('href') + if href: + # 清理和修复URL + clean_href = self.clean_url(href) + 
full_url = urljoin(base_url, clean_href) + + # 再次清理完整URL + full_url = self.clean_url(full_url) + + # 检查URL是否有效且未重复 + if (full_url not in seen_urls and + self.is_valid_article_url(full_url) and + full_url.startswith(('http://', 'https://'))): + + title = element.get_text().strip() + if title and len(title) > 5: # 过滤掉太短的标题 + links.append({ + 'url': full_url, + 'title': title + }) + seen_urls.add(full_url) + + return links + + def check_keyword_match(self, text, title): + """检查关键字匹配 - 改进版本""" + matched_keywords = [] + text_lower = text.lower() + title_lower = title.lower() + + for keyword in self.keywords: + keyword_lower = keyword.lower() + # 使用改进的关键字检查函数 + if check_keyword_in_content(text, keyword) or check_keyword_in_content(title, keyword): + matched_keywords.append(keyword) + + return matched_keywords + + def extract_article_content(self, url, soup): + """提取文章内容""" + # 尝试多种内容选择器 + content_selectors = [ + '.article-content', + '.content', + '.article-body', + '.news-content', + '.main-content', + '.post-content', + 'article', + '.detail-content', + '#content', + '.text', + '.box_con', # 新华网等网站使用 + '.content_area', # 央视网等网站使用 + ] + + content = "" + for selector in content_selectors: + element = soup.select_one(selector) + if element: + content = self.extract_text_content(element) + if len(content) > 100: # 确保内容足够长 + break + + # 如果没找到特定内容区域,使用整个页面 + if not content or len(content) < 100: + content = self.extract_text_content(soup) + + return content + + def extract_publish_date(self, soup): + """提取发布时间""" + date_selectors = [ + '.publish-time', + '.pub-time', + '.date', + '.time', + '.publish-date', + 'time[datetime]', + '.article-time', + '.news-time', + '.post-time', + '.create-time', + '.update-time', + '.time span', + '.date span', + '.info span', + '.meta span', + '.meta-info', + '.article-info span', + '.news-info span', + '.content-info span', + '.a-shijian', + '.l-time' + ] + + for selector in date_selectors: + elements = soup.select(selector) + for element in elements: + date_text = element.get_text().strip() + if element.get('datetime'): + date_text = element.get('datetime') + + # 如果文本太短或为空,跳过 + if not date_text or len(date_text) < 4: + continue + + # 尝试解析日期 + try: + from datetime import datetime + + # 清理日期文本 + date_text = re.sub(r'发布(时间|日期)[::]?', '', date_text).strip() + date_text = re.sub(r'时间[::]?', '', date_text).strip() + date_text = re.sub(r'日期[::]?', '', date_text).strip() + date_text = re.sub(r'发表于[::]?', '', date_text).strip() + date_text = re.sub(r'更新[::]?', '', date_text).strip() + date_text = re.sub(r'\s+', ' ', date_text).strip() + + # 如果有 datetime 属性且是标准格式,直接使用 + if element.get('datetime'): + datetime_attr = element.get('datetime') + # 尝试解析常见的日期时间格式 + for fmt in [ + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%dT%H:%M:%S%z', + '%Y-%m-%d %H:%M', + '%Y-%m-%d', + '%Y/%m/%d %H:%M:%S', + '%Y/%m/%d %H:%M', + '%Y/%m/%d', + '%Y年%m月%d日 %H:%M:%S', + '%Y年%m月%d日 %H:%M', + '%Y年%m月%d日', + ]: + try: + if '%z' in fmt and '+' not in datetime_attr and datetime_attr.endswith('Z'): + datetime_attr = datetime_attr[:-1] + '+0000' + parsed_date = datetime.strptime(datetime_attr, fmt) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except ValueError: + continue + + # 尝试解析从文本中提取的日期 + for fmt in [ + '%Y年%m月%d日 %H:%M:%S', + '%Y年%m月%d日 %H:%M', + '%Y年%m月%d日', + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%d %H:%M', + '%Y-%m-%d', + '%Y/%m/%d %H:%M:%S', + '%Y/%m/%d %H:%M', + '%Y/%m/%d', + '%m月%d日 %H:%M', + '%m月%d日', + ]: + try: + 
parsed_date = datetime.strptime(date_text, fmt) + # 如果没有年份,使用当前年份 + if '%Y' not in fmt: + parsed_date = parsed_date.replace(year=datetime.now().year) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except ValueError: + continue + + # 如果以上格式都不匹配,尝试使用 dateutil 解析 + try: + from dateutil import parser + if len(date_text) > 5 and not date_text.isdigit(): + parsed_date = parser.parse(date_text) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except: + pass + + except Exception as e: + self.log('debug', f'解析日期失败: {date_text}, 错误: {str(e)}') + continue + + return None + + def extract_author(self, soup): + """提取作者信息""" + author_selectors = [ + '.author', + '.writer', + '.publisher', + '.byline', + '.article-author', + '.news-author', + '.source' + ] + + for selector in author_selectors: + element = soup.select_one(selector) + if element: + return element.get_text().strip() + + return "" + + def download_media_file(self, media_url, article, media_type='image', alt_text=''): + """下载媒体文件 - 适配现有模型结构""" + try: + # 检查URL是否有效 + if not media_url or not media_url.startswith(('http://', 'https://')): + return None + + # 请求媒体文件 + response = self.session.get( + media_url, + timeout=self.timeout, + verify=False, + stream=False + ) + response.raise_for_status() + + # 获取文件信息 + content_type = response.headers.get('content-type', '') + file_size = len(response.content) + + # 确定文件扩展名 + file_extension = self.get_file_extension_from_url(media_url, content_type) + + # 生成文件名 + existing_media_count = len(article.media_files) if article.media_files else 0 + filename = f"media_{article.id}_{existing_media_count}{file_extension}" + + # 创建媒体文件信息字典 + media_info = { + 'type': media_type, + 'original_url': media_url, + 'filename': filename, + 'file_size': file_size, + 'mime_type': content_type, + 'alt_text': alt_text, + 'downloaded_at': timezone.now().isoformat() + } + + # 更新文章的媒体文件列表 + if not article.media_files: + article.media_files = [media_info] + else: + article.media_files.append(media_info) + + # 保存文件到本地(这里简化处理,实际项目中可能需要更复杂的文件存储) + self.log('info', f'媒体文件已记录: {filename} ({media_type})') + return media_info + + except Exception as e: + self.log('error', f'下载媒体文件失败 {media_url}: {str(e)}') + return None + + def get_file_extension_from_url(self, url, content_type): + """从URL或内容类型获取文件扩展名""" + # 从URL获取扩展名 + parsed_url = urlparse(url) + path = parsed_url.path + if '.' 
in path: + return os.path.splitext(path)[1] + + # 从内容类型获取扩展名 + content_type_map = { + 'image/jpeg': '.jpg', + 'image/jpg': '.jpg', + 'image/png': '.png', + 'image/gif': '.gif', + 'image/webp': '.webp', + 'image/svg+xml': '.svg', + 'video/mp4': '.mp4', + 'video/avi': '.avi', + 'video/mov': '.mov', + 'video/wmv': '.wmv', + 'video/flv': '.flv', + 'video/webm': '.webm', + 'audio/mp3': '.mp3', + 'audio/wav': '.wav', + 'audio/ogg': '.ogg', + 'application/pdf': '.pdf', + 'application/msword': '.doc', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx', + } + + return content_type_map.get(content_type.lower(), '.bin') + + def extract_and_download_media(self, soup, article, base_url): + """提取并下载页面中的媒体文件""" + media_files = [] + + # 提取图片 + images = soup.find_all('img') + self.log('info', f'找到 {len(images)} 个图片标签') + + for img in images: + src = img.get('src') + if src: + # 处理相对URL + if src.startswith('//'): + src = 'https:' + src + elif src.startswith('/'): + src = urljoin(base_url, src) + elif not src.startswith(('http://', 'https://')): + src = urljoin(base_url, src) + + alt_text = img.get('alt', '') + media_file = self.download_media_file(src, article, 'image', alt_text) + if media_file: + media_files.append(media_file) + + # 提取视频 + videos = soup.find_all(['video', 'source']) + for video in videos: + src = video.get('src') + if src: + # 处理相对URL + if src.startswith('//'): + src = 'https:' + src + elif src.startswith('/'): + src = urljoin(base_url, src) + elif not src.startswith(('http://', 'https://')): + src = urljoin(base_url, src) + + media_file = self.download_media_file(src, article, 'video') + if media_file: + media_files.append(media_file) + + return media_files + + def crawl_website(self, website): + """爬取单个网站""" + self.log('info', f'开始爬取网站: {website.name}') + + try: + # 请求主页 + response = self.session.get( + website.base_url, + timeout=self.timeout, + verify=False + ) + response.raise_for_status() + + # 检查内容编码 + if response.encoding != 'utf-8': + content_type = response.headers.get('content-type', '') + if 'charset=' in content_type: + charset = content_type.split('charset=')[-1] + response.encoding = charset + else: + response.encoding = 'utf-8' + + soup = BeautifulSoup(response.content, 'html.parser') + + # 查找文章链接 + article_links = self.find_article_links(soup, website.base_url) + self.log('info', f'找到 {len(article_links)} 个文章链接') + + crawled_count = 0 + for link_info in article_links: + # 检查任务是否已被取消 + if self.is_cancelled(): + self.log('info', '任务已被取消,停止处理文章') + return crawled_count + + try: + # 清理和验证URL + clean_url = self.clean_url(link_info['url']) + + # 检查URL是否仍然有效 + if not self.is_valid_article_url(clean_url): + self.log('warning', f'跳过无效URL: {clean_url}') + continue + + self.log('info', f'正在处理文章: {clean_url}') + + # 请求文章页面 + article_response = self.session.get( + clean_url, + timeout=self.timeout, + verify=False + ) + article_response.raise_for_status() + + # 检查内容编码 + if article_response.encoding != 'utf-8': + content_type = article_response.headers.get('content-type', '') + if 'charset=' in content_type: + charset = content_type.split('charset=')[-1] + article_response.encoding = charset + else: + article_response.encoding = 'utf-8' + + article_soup = BeautifulSoup(article_response.content, 'html.parser') + + # 提取内容 + content = self.extract_article_content(clean_url, article_soup) + title = link_info['title'] + + # 检查关键字匹配 + matched_keywords = self.check_keyword_match(content, title) + + if matched_keywords: + # 提取其他信息 + publish_date = 
self.extract_publish_date(article_soup) + author = self.extract_author(article_soup) + + # 检查是否已存在相同URL的文章 + existing_article = Article.objects.filter( + url=clean_url + ).first() + + if existing_article: + # 如果已存在,更新现有记录 + existing_article.title = title + existing_article.content = content + existing_article.pub_date = publish_date + existing_article.media_files = [] # 重置媒体文件列表 + existing_article.save() + + # 更新媒体文件 + media_files = self.extract_and_download_media(article_soup, existing_article, clean_url) + + self.log('info', f'更新已存在的文章: {title[:50]}...') + else: + # 保存新内容 + article = Article.objects.create( + website=website, + title=title, + content=content, + url=clean_url, + pub_date=publish_date, + media_files=[] + ) + + # 提取并下载媒体文件 + media_files = self.extract_and_download_media(article_soup, article, clean_url) + + self.log('info', f'保存新文章: {title[:50]}...') + + crawled_count += 1 + + # 请求间隔 + time.sleep(1) + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + self.log('warning', f'文章不存在 (404): {clean_url}') + elif e.response.status_code == 403: + self.log('warning', f'访问被拒绝 (403): {clean_url}') + elif e.response.status_code == 429: + self.log('warning', f'请求过于频繁 (429): {clean_url}') + time.sleep(5) # 等待更长时间 + else: + self.log('error', f'HTTP错误 {e.response.status_code}: {clean_url}') + continue + except requests.exceptions.Timeout as e: + self.log('warning', f'请求超时: {clean_url}') + continue + except requests.exceptions.ConnectionError as e: + self.log('warning', f'连接错误: {clean_url}') + continue + except Exception as e: + self.log('error', f'处理文章失败 {clean_url}: {str(e)}') + continue + + self.log('info', f'网站爬取完成,共保存 {crawled_count} 篇文章') + return crawled_count + + except Exception as e: + self.log('error', f'爬取网站失败: {str(e)}') + return 0 + + def run(self): + """运行爬取任务""" + self.log('info', f'开始执行关键词爬取任务: {self.task.name}') + self.update_task_status('running') + + total_crawled = 0 + websites = self.task.websites.all() + self.task.total_pages = websites.count() + self.task.save() + + for website in websites: + # 检查任务是否已被取消 + if self.is_cancelled(): + self.log('info', '任务已被取消,停止爬取') + self.update_task_status('cancelled', error_message='任务被取消') + return + + try: + crawled_count = self.crawl_website(website) + total_crawled += crawled_count + self.task.crawled_pages += 1 + self.task.save() + + # 再次检查任务是否已被取消 + if self.is_cancelled(): + self.log('info', '任务已被取消,停止爬取') + self.update_task_status('cancelled', error_message='任务被取消') + return + + except Exception as e: + self.log('error', f'爬取网站 {website.name} 时发生错误: {str(e)}') + continue + + # 更新任务状态 + if total_crawled > 0: + self.update_task_status('completed') + self.log('info', f'关键词爬取任务完成,共爬取 {total_crawled} 篇文章') + else: + self.update_task_status('failed', error_message='没有找到匹配的内容') + self.log('error', '关键词爬取任务失败,没有找到匹配的内容') + + +def run_keyword_crawl_task(task_id, task_executor_instance=None): + """运行关键词爬取任务""" + try: + crawler = KeywordCrawler(task_id, task_executor_instance) + crawler.run() + return f"关键词爬取任务 {task_id} 执行完成" + except Exception as e: + # 记录异常到日志 + logger.error(f"执行关键词爬取任务 {task_id} 时发生异常: {str(e)}", exc_info=True) + + task = CrawlTask.objects.get(id=task_id) + task.status = 'failed' + task.error_message = str(e) + task.completed_at = timezone.now() + task.save() + + return f"关键词爬取任务 {task_id} 执行失败: {str(e)}" \ No newline at end of file diff --git a/core/management/commands/crawl_all_websites.py b/core/management/commands/crawl_all_websites.py index d71cb71..5b22734 100644 --- 
a/core/management/commands/crawl_all_websites.py +++ b/core/management/commands/crawl_all_websites.py @@ -1,71 +1,64 @@ +import json from django.core.management.base import BaseCommand from core.utils import full_site_crawler, crawl_by_keyword, WEBSITE_SEARCH_CONFIGS -from core.models import Website -import json class Command(BaseCommand): - help = "一键爬取所有支持的网站" + help = '一键爬取所有网站' def add_arguments(self, parser): parser.add_argument( '--mode', '-m', type=str, + default='both', choices=['full', 'keyword', 'both'], - default='full', - help='爬取模式: full(全站爬取), keyword(关键词爬取), both(两种模式)' + help='爬取模式: full(全站爬取), keyword(关键词搜索), both(两者都执行)' ) parser.add_argument( '--keyword', '-k', type=str, - help='关键词搜索模式下的搜索关键词' + help='关键词搜索的关键词' ) parser.add_argument( '--websites', '-w', type=str, nargs='*', - help='指定要爬取的网站名称列表,如果不指定则爬取所有支持的网站' + help='指定要爬取的网站列表' ) parser.add_argument( '--max-pages', '-p', type=int, default=500, - help='全站爬取最大页数 (默认: 500)' + help='全站爬取的最大页数' ) parser.add_argument( - '--max-search-pages', '-sp', + '--max-search-pages', '-P', type=int, default=10, - help='关键词搜索最大页数 (默认: 10)' + help='关键词搜索的最大页数' ) parser.add_argument( '--max-articles', '-a', type=int, default=100, - help='关键词搜索最大文章数量 (默认: 100)' + help='关键词搜索的最大文章数' ) parser.add_argument( '--start-date', '-s', type=str, - help='开始日期 (格式: YYYY-MM-DD)' + help='开始日期 (YYYY-MM-DD)' ) parser.add_argument( '--end-date', '-e', type=str, - help='结束日期 (格式: YYYY-MM-DD)' - ) - - parser.add_argument( - '--list-websites', '-l', - action='store_true', - help='列出所有支持的网站' + help='结束日期 (YYYY-MM-DD)' ) parser.add_argument( @@ -75,9 +68,15 @@ class Command(BaseCommand): ) parser.add_argument( - '--skip-existing', + '--skip-existing', '-S', action='store_true', - help='跳过已存在的网站配置' + help='跳过已存在的网站' + ) + + parser.add_argument( + '--list-websites', '-l', + action='store_true', + help='列出所有支持的网站' ) def handle(self, *args, **options): @@ -162,6 +161,7 @@ class Command(BaseCommand): self.stdout.write(f"{'='*50}") # 获取或创建网站对象 + from core.models import Website website, created = Website.objects.get_or_create( name=website_name, defaults={ @@ -263,4 +263,4 @@ class Command(BaseCommand): except Exception as e: self.stdout.write(self.style.ERROR(f"一键爬取过程中出现错误: {e}")) - raise + raise \ No newline at end of file diff --git a/core/management/commands/crawl_by_keyword.py b/core/management/commands/crawl_by_keyword.py index fb06880..7d27e4f 100644 --- a/core/management/commands/crawl_by_keyword.py +++ b/core/management/commands/crawl_by_keyword.py @@ -1,15 +1,16 @@ -from django.core.management.base import BaseCommand -from core.utils import crawl_by_keyword, crawl_historical_articles, WEBSITE_SEARCH_CONFIGS import json +from django.core.management.base import BaseCommand +from core.utils import crawl_by_keyword, WEBSITE_SEARCH_CONFIGS class Command(BaseCommand): - help = "根据关键词爬取多个网站的文章" + help = '根据关键词爬取文章' def add_arguments(self, parser): parser.add_argument( '--keyword', '-k', type=str, + required=True, help='搜索关键词' ) @@ -17,39 +18,39 @@ class Command(BaseCommand): '--websites', '-w', type=str, nargs='*', - help='指定要爬取的网站名称列表,如果不指定则爬取所有支持的网站' + help='指定要爬取的网站列表' ) parser.add_argument( '--max-pages', '-p', type=int, default=10, - help='每个网站最大搜索页数 (默认: 10)' + help='每个网站最大搜索页数' ) parser.add_argument( - '--max-articles', '-a', + '--max-articles', '-m', type=int, default=100, - help='最大文章数量 (默认: 100)' + help='最大文章数量' ) parser.add_argument( '--start-date', '-s', type=str, - help='开始日期 (格式: YYYY-MM-DD)' + help='开始日期 (YYYY-MM-DD)' ) parser.add_argument( 
'--end-date', '-e', type=str, - help='结束日期 (格式: YYYY-MM-DD)' + help='结束日期 (YYYY-MM-DD)' ) parser.add_argument( - '--historical', + '--historical', '-H', action='store_true', - help='爬取历史文章模式' + help='使用历史文章爬取模式' ) parser.add_argument( @@ -121,6 +122,7 @@ class Command(BaseCommand): if historical: # 历史文章爬取模式 self.stdout.write(self.style.WARNING("使用历史文章爬取模式")) + from core.utils import crawl_historical_articles results = crawl_historical_articles( website_names=websites, start_date=start_date, @@ -163,4 +165,4 @@ class Command(BaseCommand): except Exception as e: self.stdout.write(self.style.ERROR(f"爬取过程中出现错误: {e}")) - raise + raise \ No newline at end of file diff --git a/core/migrations/0004_crawltask_execution_count_and_more.py b/core/migrations/0004_crawltask_execution_count_and_more.py new file mode 100644 index 0000000..c506df6 --- /dev/null +++ b/core/migrations/0004_crawltask_execution_count_and_more.py @@ -0,0 +1,28 @@ +# Generated by Django 5.1 on 2025-09-25 02:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0003_remove_crawltask_websites_crawltask_websites'), + ] + + operations = [ + migrations.AddField( + model_name='crawltask', + name='execution_count', + field=models.IntegerField(default=0, verbose_name='执行次数'), + ), + migrations.AddField( + model_name='crawltask', + name='execution_history', + field=models.JSONField(blank=True, default=list, verbose_name='执行历史'), + ), + migrations.AddField( + model_name='crawltask', + name='last_execution_at', + field=models.DateTimeField(blank=True, null=True, verbose_name='最后执行时间'), + ), + ] diff --git a/core/models.py b/core/models.py index 83eb907..c969c81 100644 --- a/core/models.py +++ b/core/models.py @@ -72,6 +72,11 @@ class CrawlTask(models.Model): created_by = models.CharField(max_length=100, blank=True, null=True, verbose_name="创建者") + # 执行历史字段 + execution_count = models.IntegerField(default=0, verbose_name="执行次数") + last_execution_at = models.DateTimeField(blank=True, null=True, verbose_name="最后执行时间") + execution_history = models.JSONField(default=list, blank=True, verbose_name="执行历史") + class Meta: verbose_name = "爬取任务" verbose_name_plural = "爬取任务" @@ -122,4 +127,47 @@ class CrawlTask(models.Model): return f"失败: {self.error_message[:50]}..." 
if self.error_message else "失败" elif self.status == 'cancelled': return "已取消" - return "未知状态" \ No newline at end of file + return "未知状态" + + def add_execution_record(self, status, started_at=None, completed_at=None, error_message=None): + """添加执行记录""" + if not started_at: + started_at = timezone.now() + + execution_record = { + 'execution_id': len(self.execution_history) + 1, + 'started_at': started_at.isoformat() if started_at else None, + 'completed_at': completed_at.isoformat() if completed_at else None, + 'status': status, + 'error_message': error_message, + 'success_count': self.success_count, + 'failed_count': self.failed_count, + 'total_articles': self.total_articles + } + + # 更新执行历史 + if not self.execution_history: + self.execution_history = [] + + self.execution_history.append(execution_record) + + # 更新执行次数和最后执行时间 + self.execution_count += 1 + self.last_execution_at = started_at + + # 只保留最近10次执行记录 + if len(self.execution_history) > 10: + self.execution_history = self.execution_history[-10:] + + self.save() + + def get_execution_summary(self): + """获取执行摘要""" + if not self.execution_history: + return "暂无执行记录" + + total_executions = len(self.execution_history) + successful_executions = len([r for r in self.execution_history if r['status'] == 'completed']) + failed_executions = len([r for r in self.execution_history if r['status'] == 'failed']) + + return f"执行 {total_executions} 次,成功 {successful_executions} 次,失败 {failed_executions} 次" \ No newline at end of file diff --git a/core/static/admin/js/crawl_task_actions.js b/core/static/admin/js/crawl_task_actions.js index bf6b676..5e69651 100644 --- a/core/static/admin/js/crawl_task_actions.js +++ b/core/static/admin/js/crawl_task_actions.js @@ -36,7 +36,15 @@ function cancelTask(taskId) { }) .then(response => { if (response.ok) { - location.reload(); + // 显示取消中的提示 + const cancelButton = document.querySelector(`a[href="javascript:void(0)"][onclick="cancelTask(${taskId})"]`); + if (cancelButton) { + cancelButton.textContent = '取消中...'; + cancelButton.style.pointerEvents = 'none'; + cancelButton.style.opacity = '0.5'; + } + // 5秒后刷新页面以查看状态更新 + setTimeout(() => location.reload(), 2000); } else { alert('取消任务失败'); } @@ -48,6 +56,37 @@ function cancelTask(taskId) { } } +function rerunTask(taskId) { + if (confirm('确定要重新执行这个任务吗?这将重置任务状态并重新开始爬取。')) { + fetch(`/admin/core/crawltask/${taskId}/rerun/`, { + method: 'POST', + headers: { + 'X-CSRFToken': getCookie('csrftoken'), + 'Content-Type': 'application/x-www-form-urlencoded', + }, + }) + .then(response => { + if (response.ok) { + // 显示重新执行中的提示 + const rerunButton = document.querySelector(`a[href="javascript:void(0)"][onclick="rerunTask(${taskId})"]`); + if (rerunButton) { + rerunButton.textContent = '重新执行中...'; + rerunButton.style.pointerEvents = 'none'; + rerunButton.style.opacity = '0.5'; + } + // 2秒后刷新页面以查看状态更新 + setTimeout(() => location.reload(), 2000); + } else { + alert('重新执行任务失败'); + } + }) + .catch(error => { + console.error('Error:', error); + alert('重新执行任务失败'); + }); + } +} + function viewResults(taskId) { window.open(`/admin/core/crawltask/${taskId}/results/`, '_blank'); } @@ -81,4 +120,4 @@ function autoRefreshRunningTasks() { // 页面加载完成后执行 document.addEventListener('DOMContentLoaded', function() { autoRefreshRunningTasks(); -}); +}); \ No newline at end of file diff --git a/core/task_executor.py b/core/task_executor.py index 05c2b6f..42dbcf1 100644 --- a/core/task_executor.py +++ b/core/task_executor.py @@ -8,7 +8,7 @@ import time from django.utils import timezone from django.db 
import transaction from core.models import CrawlTask -from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_SEARCH_CONFIGS +from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_CRAWL_CONFIGS class TaskExecutor: @@ -16,9 +16,10 @@ class TaskExecutor: def __init__(self): self.running_tasks = {} + self.cancelled_tasks = set() # 添加已取消任务的集合 self.lock = threading.Lock() - def start_task(self, task_id): + def start_task(self, task_id, rerun=False): """启动任务""" with self.lock: if task_id in self.running_tasks: @@ -26,30 +27,61 @@ class TaskExecutor: try: task = CrawlTask.objects.get(id=task_id) - if task.status != 'pending': + + # 检查任务状态 + if not rerun and task.status != 'pending': return False, "任务状态不允许启动" - # 更新任务状态 - task.status = 'running' - task.started_at = timezone.now() + # 如果是重新执行,检查任务是否已完成或失败 + if rerun and task.status not in ['completed', 'failed', 'cancelled']: + return False, "只有已完成、失败或已取消的任务可以重新执行" + + # 重置任务状态(如果是重新执行) + if rerun: + task.status = 'running' + task.started_at = timezone.now() + task.completed_at = None + task.error_message = None + task.progress = 0 + task.current_website = None + task.current_action = None + task.total_articles = 0 + task.success_count = 0 + task.failed_count = 0 + task.result_details = {} + else: + # 更新任务状态 + task.status = 'running' + task.started_at = timezone.now() + task.save() + # 确保任务不在取消集合中 + self.cancelled_tasks.discard(task_id) + # 启动后台线程执行任务 thread = threading.Thread(target=self._execute_task, args=(task_id,)) thread.daemon = True thread.start() self.running_tasks[task_id] = thread - return True, "任务已启动" + return True, "任务已启动" + ("(重新执行)" if rerun else "") except CrawlTask.DoesNotExist: return False, "任务不存在" except Exception as e: return False, f"启动任务失败: {e}" + def rerun_task(self, task_id): + """重新执行任务""" + return self.start_task(task_id, rerun=True) + def cancel_task(self, task_id): """取消任务""" with self.lock: + # 将任务标记为已取消 + self.cancelled_tasks.add(task_id) + if task_id in self.running_tasks: # 标记任务为取消状态 try: @@ -58,19 +90,55 @@ class TaskExecutor: task.completed_at = timezone.now() task.save() + # 记录执行历史 + task.add_execution_record( + status='cancelled', + started_at=task.started_at, + completed_at=task.completed_at, + error_message='任务被取消' + ) + # 移除运行中的任务 del self.running_tasks[task_id] return True, "任务已取消" except CrawlTask.DoesNotExist: return False, "任务不存在" else: + # 即使任务不在运行中,也标记为已取消 + try: + task = CrawlTask.objects.get(id=task_id) + if task.status in ['pending', 'running']: + task.status = 'cancelled' + task.completed_at = timezone.now() + task.save() + + # 记录执行历史 + task.add_execution_record( + status='cancelled', + started_at=task.started_at, + completed_at=task.completed_at, + error_message='任务被取消' + ) + return True, "任务已取消" + except CrawlTask.DoesNotExist: + pass return False, "任务未在运行中" + def is_task_cancelled(self, task_id): + """检查任务是否已被取消""" + with self.lock: + return task_id in self.cancelled_tasks + def _execute_task(self, task_id): """执行任务的核心逻辑""" try: task = CrawlTask.objects.get(id=task_id) + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 根据任务类型执行不同的爬取逻辑 if task.task_type == 'keyword': self._execute_keyword_task(task) @@ -81,6 +149,11 @@ class TaskExecutor: else: raise ValueError(f"不支持的任务类型: {task.task_type}") + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 任务完成 with transaction.atomic(): task = 
CrawlTask.objects.select_for_update().get(id=task_id) @@ -88,8 +161,20 @@ class TaskExecutor: task.completed_at = timezone.now() task.progress = 100 task.save() + + # 记录执行历史 + task.add_execution_record( + status='completed', + started_at=task.started_at, + completed_at=task.completed_at + ) except Exception as e: + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 任务失败 try: with transaction.atomic(): @@ -98,6 +183,14 @@ class TaskExecutor: task.completed_at = timezone.now() task.error_message = str(e) task.save() + + # 记录执行历史 + task.add_execution_record( + status='failed', + started_at=task.started_at, + completed_at=task.completed_at, + error_message=str(e) + ) except: pass @@ -106,9 +199,37 @@ class TaskExecutor: with self.lock: if task_id in self.running_tasks: del self.running_tasks[task_id] + # 从取消集合中移除任务 + self.cancelled_tasks.discard(task_id) + + def _mark_task_cancelled(self, task_id): + """标记任务为已取消""" + try: + with transaction.atomic(): + task = CrawlTask.objects.select_for_update().get(id=task_id) + task.status = 'cancelled' + task.completed_at = timezone.now() + task.save() + + # 记录执行历史 + task.add_execution_record( + status='cancelled', + started_at=task.started_at, + completed_at=task.completed_at, + error_message='任务被取消' + ) + except CrawlTask.DoesNotExist: + pass def _execute_keyword_task(self, task): """执行关键词搜索任务""" + task_id = task.id + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 更新当前操作 task.current_action = "开始关键词搜索" task.save() @@ -118,30 +239,90 @@ class TaskExecutor: if selected_websites: websites = [w.name for w in selected_websites] else: - websites = list(WEBSITE_SEARCH_CONFIGS.keys()) + websites = list(WEBSITE_CRAWL_CONFIGS.keys()) start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None - # 执行爬取 - results = crawl_by_keyword( - keyword=task.keyword, - website_names=websites, - max_pages=task.max_pages, - start_date=start_date, - end_date=end_date, - max_articles=task.max_articles - ) + # 设置任务ID,以便在爬虫函数中检查取消状态 + crawl_by_keyword.task_id = task_id - # 更新结果 - task.total_articles = results['total_articles'] - task.success_count = results['success_count'] - task.failed_count = results['failed_count'] - task.result_details = results['website_results'] - task.save() + # 使用新的关键词爬虫引擎 + try: + # 延迟导入以避免循环依赖 + from core.keyword_crawler import KeywordCrawler + crawler = KeywordCrawler(task_id, self) + crawler.run() + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + + # 更新结果统计 + task = CrawlTask.objects.get(id=task_id) + if task.status == 'completed': + # 统计爬取的文章数量 + from core.models import Article + article_count = Article.objects.filter(website__in=task.websites.all()).count() + task.total_articles = article_count + task.success_count = article_count + task.failed_count = 0 + task.result_details = { + 'total_articles': article_count, + 'success_count': article_count, + 'failed_count': 0, + 'keyword': task.keyword, + 'websites': [w.name for w in task.websites.all()] + } + task.save() + + # 添加执行记录 + task.add_execution_record( + status='completed', + started_at=task.started_at, + completed_at=task.completed_at + ) + elif self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + + except Exception as e: + 
if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + + # 更新任务状态为失败 + task = CrawlTask.objects.get(id=task_id) + task.status = 'failed' + task.error_message = str(e) + task.completed_at = timezone.now() + task.save() + + # 添加执行记录 + task.add_execution_record( + status='failed', + started_at=task.started_at, + completed_at=task.completed_at, + error_message=str(e) + ) + + raise e def _execute_historical_task(self, task): """执行历史文章任务""" + task_id = task.id + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 更新当前操作 task.current_action = "开始历史文章爬取" task.save() @@ -151,18 +332,32 @@ class TaskExecutor: if selected_websites: websites = [w.name for w in selected_websites] else: - websites = list(WEBSITE_SEARCH_CONFIGS.keys()) + websites = list(WEBSITE_CRAWL_CONFIGS.keys()) start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None + # 设置任务ID,以便在爬虫函数中检查取消状态 + crawl_historical_articles.task_id = task_id + # 执行爬取 - results = crawl_historical_articles( - website_names=websites, - start_date=start_date, - end_date=end_date, - max_articles_per_site=task.max_articles - ) + try: + results = crawl_historical_articles( + website_names=websites, + start_date=start_date, + end_date=end_date, + max_articles_per_site=task.max_articles + ) + except Exception as e: + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + raise e + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return # 更新结果 task.total_articles = results['total_articles'] @@ -173,6 +368,13 @@ class TaskExecutor: def _execute_full_site_task(self, task): """执行全站爬取任务""" + task_id = task.id + + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 更新当前操作 task.current_action = "开始全站爬取" task.save() @@ -182,12 +384,17 @@ class TaskExecutor: if selected_websites: websites = [w.name for w in selected_websites] else: - websites = list(WEBSITE_SEARCH_CONFIGS.keys()) + websites = list(WEBSITE_CRAWL_CONFIGS.keys()) total_websites = len(websites) completed_websites = 0 for website_name in websites: + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + try: # 更新当前网站 task.current_website = website_name @@ -199,17 +406,26 @@ class TaskExecutor: website, created = Website.objects.get_or_create( name=website_name, defaults={ - 'base_url': WEBSITE_SEARCH_CONFIGS[website_name]["search_url"], + 'base_url': WEBSITE_CRAWL_CONFIGS[website_name]["base_url"], 'enabled': True } ) + # 设置任务ID,以便在爬虫函数中检查取消状态 + full_site_crawler.task_id = task_id + # 执行全站爬取 - full_site_crawler( - WEBSITE_SEARCH_CONFIGS[website_name]["search_url"], - website, - max_pages=task.max_pages - ) + try: + full_site_crawler( + WEBSITE_CRAWL_CONFIGS[website_name]["base_url"], + website, + max_pages=task.max_pages + ) + except Exception as e: + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + raise e completed_websites += 1 progress = int((completed_websites / total_websites) * 100) @@ -217,10 +433,19 @@ class TaskExecutor: task.save() except Exception as e: + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return # 记录错误但继续处理其他网站 print(f"爬取网站 {website_name} 时出错: {e}") continue + # 检查任务是否已被取消 + if self.is_task_cancelled(task_id): + self._mark_task_cancelled(task_id) + return + # 更新最终结果 
task.total_articles = completed_websites # 这里可以改为实际爬取的文章数 task.success_count = completed_websites @@ -246,4 +471,4 @@ class TaskExecutor: # 全局任务执行器实例 -task_executor = TaskExecutor() +task_executor = TaskExecutor() \ No newline at end of file diff --git a/core/utils.py b/core/utils.py index 09b488c..c65935a 100644 --- a/core/utils.py +++ b/core/utils.py @@ -2274,24 +2274,204 @@ def full_site_crawler(start_url, website, max_pages=1000): queue.append(href) + +# 网站爬取配置 +WEBSITE_CRAWL_CONFIGS = { + "新华网": { + "base_url": "http://www.xinhuanet.com", + "article_list_urls": [ + "http://www.xinhuanet.com/politics/", + "http://www.xinhuanet.com/world/", + "http://www.xinhuanet.com/finance/", + "http://www.xinhuanet.com/tech/", + "http://www.xinhuanet.com/edu/", + "http://www.xinhuanet.com/health/" + ], + "article_selector": "a[href*='/politics/'], a[href*='/world/'], a[href*='/finance/'], a[href*='/tech/'], a[href*='/edu/'], a[href*='/health/']", + "content_selector": ".box_con, .content, .article-content, .text", + "title_selector": "h1, .title, .article-title", + "pagination": True, + "pagination_selector": ".page a, .pagination a" + }, + "人民日报": { + "base_url": "http://www.people.com.cn", + "article_list_urls": [ + "http://politics.people.com.cn/", + "http://world.people.com.cn/", + "http://finance.people.com.cn/", + "http://tech.people.com.cn/", + "http://edu.people.com.cn/", + "http://health.people.com.cn/" + ], + "article_selector": "a[href*='/politics/'], a[href*='/world/'], a[href*='/finance/'], a[href*='/tech/'], a[href*='/edu/'], a[href*='/health/']", + "content_selector": ".box_con, .content, .article-content, .text", + "title_selector": "h1, .title, .article-title", + "pagination": True, + "pagination_selector": ".page a, .pagination a" + }, + "央视网": { + "base_url": "http://www.cctv.com", + "article_list_urls": [ + "http://news.cctv.com/politics/", + "http://news.cctv.com/world/", + "http://news.cctv.com/finance/", + "http://news.cctv.com/tech/", + "http://news.cctv.com/edu/", + "http://news.cctv.com/health/" + ], + "article_selector": "a[href*='/politics/'], a[href*='/world/'], a[href*='/finance/'], a[href*='/tech/'], a[href*='/edu/'], a[href*='/health/']", + "content_selector": ".content_area, .content, .article-content, .text", + "title_selector": "h1, .title, .article-title", + "pagination": True, + "pagination_selector": ".page a, .pagination a" + }, + "光明日报": { + "base_url": "http://www.gmw.cn", + "article_list_urls": ["http://www.gmw.cn/"], + "article_selector": "a[href*='gmw.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "经济日报": { + "base_url": "http://www.ce.cn", + "article_list_urls": ["http://www.ce.cn/"], + "article_selector": "a[href*='ce.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国日报": { + "base_url": "http://www.chinadaily.com.cn", + "article_list_urls": ["http://www.chinadaily.com.cn/"], + "article_selector": "a[href*='chinadaily.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "工人日报": { + "base_url": "https://www.workercn.cn", + "article_list_urls": ["https://www.workercn.cn/"], + "article_selector": "a[href*='workercn.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "科技日报": { + "base_url": "http://www.stdaily.com", + "article_list_urls": ["http://www.stdaily.com/"], + 
"article_selector": "a[href*='stdaily.com']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "人民政协网": { + "base_url": "https://www.rmzxw.com.cn", + "article_list_urls": ["https://www.rmzxw.com.cn/"], + "article_selector": "a[href*='rmzxw.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国纪检监察报": { + "base_url": "http://www.jjjcb.cn", + "article_list_urls": ["http://www.jjjcb.cn/"], + "article_selector": "a[href*='jjjcb.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国新闻社": { + "base_url": "https://www.chinanews.com.cn", + "article_list_urls": ["https://www.chinanews.com.cn/"], + "article_selector": "a[href*='chinanews.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "学习时报": { + "base_url": "http://www.studytimes.cn", + "article_list_urls": ["http://www.studytimes.cn/"], + "article_selector": "a[href*='studytimes.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国青年报": { + "base_url": "http://www.cyol.com", + "article_list_urls": ["http://www.cyol.com/"], + "article_selector": "a[href*='cyol.com']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国妇女报": { + "base_url": "http://www.cnwomen.com.cn", + "article_list_urls": ["http://www.cnwomen.com.cn/"], + "article_selector": "a[href*='cnwomen.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "法治日报": { + "base_url": "http://www.legaldaily.com.cn", + "article_list_urls": ["http://www.legaldaily.com.cn/"], + "article_selector": "a[href*='legaldaily.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "农民日报": { + "base_url": "http://www.farmer.com.cn", + "article_list_urls": ["http://www.farmer.com.cn/"], + "article_selector": "a[href*='farmer.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "学习强国": { + "base_url": "https://www.xuexi.cn", + "article_list_urls": ["https://www.xuexi.cn/"], + "article_selector": "a[href*='xuexi.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "旗帜网": { + "base_url": "http://www.qizhiwang.org.cn", + "article_list_urls": ["http://www.qizhiwang.org.cn/"], + "article_selector": "a[href*='qizhiwang.org.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国网": { + "base_url": "http://www.china.com.cn", + "article_list_urls": ["http://www.china.com.cn/"], + "article_selector": "a[href*='china.com.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "中国政府网": { + "base_url": "http://www.gov.cn", + "article_list_urls": ["http://www.gov.cn/"], + "article_selector": "a[href*='gov.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "求是网": { + "base_url": "http://www.qstheory.cn", + "article_list_urls": ["http://www.qstheory.cn/"], + "article_selector": "a[href*='qstheory.cn']", + 
"content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + }, + "解放军报": { + "base_url": "http://www.81.cn", + "article_list_urls": ["http://www.81.cn/"], + "article_selector": "a[href*='81.cn']", + "content_selector": ".content, .article-content, .text", + "title_selector": "h1, .title, .article-title" + } +} + # 网站搜索配置 WEBSITE_SEARCH_CONFIGS = { "新华网": { - "search_url": "http://so.news.cn/getNews", + "search_url": "http://www.news.cn/search", "search_params": { - "keyword": "{keyword}", - "curPage": "{page}", - "sortField": "0", - "sortType": "1" + "q": "{keyword}" }, - "method": "post", - "headers": { - "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" - } + "method": "get", + "note": "新华网有较强反爬虫机制,可能需要使用Selenium等工具处理" }, "人民日报": { - "search_url": "http://search.people.com.cn/search", + "search_url": "http://search.people.cn", "search_params": { "keyword": "{keyword}", "st": "0", @@ -2464,6 +2644,175 @@ WEBSITE_SEARCH_CONFIGS = { } } +# 新的网站爬取配置 - 直接爬取文章列表页面,然后进行关键字过滤 +# WEBSITE_CRAWL_CONFIGS 已移至 WEBSITE_SEARCH_CONFIGS + +def check_keyword_in_content(content, keyword): + """ + 检查内容是否包含关键字 + + Args: + content: 文章内容 + keyword: 关键字 + + Returns: + bool: 是否包含关键字 + """ + if not content or not keyword: + return False + + # 转换为小写进行比较 + content_lower = content.lower() + keyword_lower = keyword.lower() + + # 检查是否包含关键字 + return keyword_lower in content_lower + + +def get_page_with_requests(url, website_name=""): + """使用requests获取页面内容""" + try: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3", + "Accept-Encoding": "gzip, deflate", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1" + } + + # 为不同网站设置特殊的请求头 + if "新华网" in website_name: + headers.update({ + "Referer": "http://www.news.cn/", + }) + elif "人民网" in website_name: + headers.update({ + "Referer": "http://www.people.com.cn/", + }) + + resp = requests.get(url, headers=headers, timeout=15) + resp.raise_for_status() + resp.encoding = 'utf-8' + + soup = BeautifulSoup(resp.text, "html.parser") + return soup + except Exception as e: + print(f"使用requests获取页面失败: {url}, 错误: {e}") + return None + + +def crawl_articles_with_keyword_filter(website_name, keyword, max_pages=10, start_date=None, end_date=None): + """ + 爬取文章并进行关键字过滤 + + Args: + website_name: 网站名称 + keyword: 关键字 + max_pages: 最大页数 + start_date: 开始日期 + end_date: 结束日期 + + Returns: + list: 符合条件的文章URL列表 + """ + if website_name not in WEBSITE_SEARCH_CONFIGS: + print(f"网站 {website_name} 不支持爬取功能") + return [] + + config = WEBSITE_SEARCH_CONFIGS[website_name] + article_urls = [] + + print(f"开始爬取 {website_name} 并过滤关键字 '{keyword}'...") + + # 获取文章列表页面 + # 对于新华网等网站,使用搜索URL作为基础URL + base_urls = config.get("article_list_urls", [config.get("search_url", "")]) + for list_url in base_urls: + try: + print(f"爬取列表页面: {list_url}") + # 首先尝试使用Selenium,如果失败则使用requests + soup = get_page_with_selenium(list_url, website_name) + if not soup: + print("尝试使用requests获取页面...") + soup = get_page_with_requests(list_url, website_name) + if not soup: + continue + + # 获取文章链接 + links = soup.select(config.get("article_selector", "a")) + page_urls = [] + + for link in links: + href = link.get("href") + if not href: + continue + + # 
处理相对URL + if not href.startswith("http"): + href = urljoin(list_url, href) + + # 检查是否是文章URL + parsed_list_url = urlparse(list_url) + base_netloc = parsed_list_url.netloc + if is_valid_url(href, base_netloc) and href not in page_urls: + page_urls.append(href) + + print(f"找到 {len(page_urls)} 个文章链接") + + # 检查每个文章是否包含关键字 + for article_url in page_urls: + try: + print(f"检查文章: {article_url}") + # 首先尝试使用Selenium,如果失败则使用requests + article_soup = get_page_with_selenium(article_url, website_name) + if not article_soup: + print("尝试使用requests获取文章页面...") + article_soup = get_page_with_requests(article_url, website_name) + if not article_soup: + continue + + # 获取文章标题 + title_element = article_soup.select_one(config.get("title_selector", "h1")) + title = title_element.get_text().strip() if title_element else "" + + # 获取文章内容 + content_element = article_soup.select_one(config.get("content_selector", ".content")) + content = content_element.get_text().strip() if content_element else "" + + # 检查标题和内容是否包含关键字 + if check_keyword_in_content(title, keyword) or check_keyword_in_content(content, keyword): + print(f"✓ 文章包含关键字: {title[:50]}...") + article_urls.append(article_url) + else: + print(f"✗ 文章不包含关键字: {title[:50]}...") + + except Exception as e: + print(f"检查文章时出错: {e}") + continue + + # 如果设置了分页,处理分页 + if config.get("pagination", False) and len(article_urls) < max_pages * 10: + pagination_links = soup.select(config.get("pagination_selector", ".page a")) + for i, page_link in enumerate(pagination_links[:max_pages-1]): + page_href = page_link.get("href") + if page_href and not page_href.startswith("http"): + page_href = urljoin(list_url, page_href) + + if page_href and page_href not in [list_url]: + # 递归处理分页 + page_articles = crawl_articles_with_keyword_filter( + website_name, keyword, 1, start_date, end_date + ) + article_urls.extend(page_articles) + + except Exception as e: + print(f"爬取列表页面时出错: {e}") + continue + + print(f"{website_name} 关键字过滤完成,找到 {len(article_urls)} 篇相关文章") + return article_urls + def search_articles_by_keyword(website_name, keyword, max_pages=10, start_date=None, end_date=None): """ @@ -2479,76 +2828,8 @@ def search_articles_by_keyword(website_name, keyword, max_pages=10, start_date=N Returns: list: 搜索到的文章URL列表 """ - if website_name not in WEBSITE_SEARCH_CONFIGS: - print(f"网站 {website_name} 不支持搜索功能") - return [] - - config = WEBSITE_SEARCH_CONFIGS[website_name] - article_urls = [] - - # 设置默认日期范围 - if not start_date: - start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d") - if not end_date: - end_date = datetime.now().strftime("%Y-%m-%d") - - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - } - headers.update(config.get("headers", {})) - - for page in range(1, max_pages + 1): - try: - # 构建搜索参数 - search_params = {} - for key, value in config["search_params"].items(): - search_params[key] = value.format( - keyword=quote(keyword), - page=page, - start_date=start_date, - end_date=end_date - ) - - print(f"搜索 {website_name} 第 {page} 页: {keyword}") - - if config["method"] == "post": - response = requests.post( - config["search_url"], - data=search_params, - headers=headers, - timeout=15 - ) - else: - response = requests.get( - config["search_url"], - params=search_params, - headers=headers, - timeout=15 - ) - - response.raise_for_status() - response.encoding = 'utf-8' - - # 解析搜索结果 - soup = BeautifulSoup(response.text, "html.parser") - page_urls = extract_search_results(soup, 
website_name) - - if not page_urls: - print(f"第 {page} 页没有找到更多结果") - break - - article_urls.extend(page_urls) - print(f"第 {page} 页找到 {len(page_urls)} 篇文章") - - # 避免请求过快 - time.sleep(1) - - except Exception as e: - print(f"搜索第 {page} 页时出错: {e}") - continue - - print(f"总共找到 {len(article_urls)} 篇文章") - return article_urls + # 使用新的关键字过滤策略 + return crawl_articles_with_keyword_filter(website_name, keyword, max_pages, start_date, end_date) def extract_search_results(soup, website_name): @@ -2738,8 +3019,12 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, Returns: dict: 爬取结果统计 """ + # 导入task_executor用于检查任务取消状态 + from core.task_executor import task_executor + task_id = getattr(crawl_by_keyword, 'task_id', None) + if website_names is None: - website_names = list(WEBSITE_SEARCH_CONFIGS.keys()) + website_names = list(WEBSITE_CRAWL_CONFIGS.keys()) results = { "keyword": keyword, @@ -2753,6 +3038,11 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, print(f"目标网站: {', '.join(website_names)}") for website_name in website_names: + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止爬取") + break + print(f"\n开始爬取 {website_name}...") try: @@ -2761,7 +3051,7 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, website, created = Website.objects.get_or_create( name=website_name, defaults={ - 'base_url': WEBSITE_SEARCH_CONFIGS[website_name]["search_url"], + 'base_url': WEBSITE_SEARCH_CONFIGS.get(website_name, {}).get("search_url", ""), 'enabled': True } ) @@ -2791,6 +3081,11 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, website_failed = 0 for i, url in enumerate(article_urls, 1): + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止处理文章") + break + try: print(f"处理第 {i}/{len(article_urls)} 篇: {url}") process_article(url, website) @@ -2805,6 +3100,10 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, website_failed += 1 results["failed_count"] += 1 + # 如果任务被取消,跳出循环 + if task_id and task_executor.is_task_cancelled(task_id): + break + results["website_results"][website_name] = { "found_urls": len(article_urls), "processed": len(article_urls), @@ -2815,6 +3114,11 @@ def crawl_by_keyword(keyword, website_names=None, max_pages=10, start_date=None, print(f"{website_name} 完成: 成功 {website_success}, 失败 {website_failed}") except Exception as e: + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止爬取") + break + print(f"爬取 {website_name} 时出错: {e}") results["website_results"][website_name] = { "found_urls": 0, @@ -2849,13 +3153,17 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None Returns: dict: 爬取结果统计 """ + # 导入task_executor用于检查任务取消状态 + from core.task_executor import task_executor + task_id = getattr(crawl_historical_articles, 'task_id', None) + if not start_date: start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if website_names is None: - website_names = list(WEBSITE_SEARCH_CONFIGS.keys()) + website_names = list(WEBSITE_CRAWL_CONFIGS.keys()) results = { "start_date": start_date, @@ -2874,6 +3182,11 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None common_keywords = ["新闻", "报道", "文章", "资讯", "动态"] for website_name in website_names: + # 检查任务是否被取消 + if task_id and 
task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止爬取") + break + print(f"\n开始爬取 {website_name} 历史文章...") try: @@ -2881,7 +3194,7 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None website, created = Website.objects.get_or_create( name=website_name, defaults={ - 'base_url': WEBSITE_SEARCH_CONFIGS[website_name]["search_url"], + 'base_url': WEBSITE_SEARCH_CONFIGS.get(website_name, {}).get("search_url", ""), 'enabled': True } ) @@ -2892,6 +3205,11 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None # 使用多个关键词搜索 for keyword in common_keywords: + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止搜索") + break + try: article_urls = search_articles_by_keyword( website_name, keyword, max_pages=5, @@ -2906,6 +3224,10 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None print(f"搜索关键词 '{keyword}' 时出错: {e}") continue + # 如果任务被取消,跳出循环 + if task_id and task_executor.is_task_cancelled(task_id): + break + # 限制文章数量 article_urls = list(all_urls)[:max_articles_per_site] @@ -2922,6 +3244,11 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None print(f"{website_name} 找到 {len(article_urls)} 篇历史文章,开始处理...") for i, url in enumerate(article_urls, 1): + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止处理文章") + break + try: print(f"处理第 {i}/{len(article_urls)} 篇: {url}") process_article(url, website) @@ -2935,6 +3262,10 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None website_failed += 1 results["failed_count"] += 1 + # 如果任务被取消,跳出循环 + if task_id and task_executor.is_task_cancelled(task_id): + break + results["website_results"][website_name] = { "found_urls": len(article_urls), "processed": len(article_urls), @@ -2945,6 +3276,11 @@ def crawl_historical_articles(website_names=None, start_date=None, end_date=None print(f"{website_name} 完成: 成功 {website_success}, 失败 {website_failed}") except Exception as e: + # 检查任务是否被取消 + if task_id and task_executor.is_task_cancelled(task_id): + print(f"任务 {task_id} 已被取消,停止爬取") + break + print(f"爬取 {website_name} 历史文章时出错: {e}") results["website_results"][website_name] = { "found_urls": 0, diff --git a/crawler_engine.py b/crawler_engine.py new file mode 100644 index 0000000..8c67555 --- /dev/null +++ b/crawler_engine.py @@ -0,0 +1,710 @@ +import requests +import time +import re +import logging +import os +import urllib3 +from bs4 import BeautifulSoup +from urllib.parse import urljoin, urlparse +from django.conf import settings +from django.utils import timezone +from django.core.files.base import ContentFile +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from .models import Website, CrawlTask, CrawledContent, CrawlLog, SearchKeyword, MediaFile + +# 禁用SSL警告 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# 设置日志记录器 +logger = logging.getLogger(__name__) + + +class WebsiteCrawler: + """网站爬虫引擎""" + + def __init__(self, task_id): + self.task = CrawlTask.objects.get(id=task_id) + self.keywords = [kw.strip() for kw in self.task.keywords.split(',') if kw.strip()] + + # 创建带重试策略的会话 + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': settings.CRAWLER_SETTINGS['USER_AGENT'] + }) + + # 设置重试策略 + retry_strategy = Retry( + total=settings.CRAWLER_SETTINGS.get('MAX_RETRIES', 3), + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 
504], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + # 设置超时 + self.timeout = settings.CRAWLER_SETTINGS['TIMEOUT'] + + def log(self, level, message, website=None): + """记录日志""" + CrawlLog.objects.create( + task=self.task, + website=website, + level=level, + message=message + ) + # 同时记录到Python日志系统 + logger.log(getattr(logging, level.upper()), f"Task {self.task.id}: {message}") + + def update_task_status(self, status, **kwargs): + """更新任务状态""" + self.task.status = status + if status == 'running' and not self.task.started_at: + self.task.started_at = timezone.now() + elif status in ['completed', 'failed', 'cancelled']: + self.task.completed_at = timezone.now() + + for key, value in kwargs.items(): + setattr(self.task, key, value) + self.task.save() + + def extract_text_content(self, soup): + """提取文本内容,保持段落结构""" + # 移除脚本和样式标签 + for script in soup(["script", "style"]): + script.decompose() + + # 处理段落标签,保持段落结构 + paragraphs = [] + + # 查找所有段落相关的标签 + for element in soup.find_all(['p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'br']): + if element.name in ['p', 'div']: + text = element.get_text().strip() + if text: + paragraphs.append(text) + elif element.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']: + text = element.get_text().strip() + if text: + paragraphs.append(f"\n{text}\n") # 标题前后加换行 + elif element.name == 'br': + paragraphs.append('\n') + + # 如果没有找到段落标签,使用原来的方法 + if not paragraphs: + text = soup.get_text() + # 清理文本但保持换行 + lines = [] + for line in text.splitlines(): + line = line.strip() + if line: + lines.append(line) + return '\n\n'.join(lines) + + # 合并段落,用双换行分隔 + content = '\n\n'.join(paragraphs) + + # 清理多余的空行 + import re + content = re.sub(r'\n\s*\n\s*\n', '\n\n', content) + + return content.strip() + + def find_article_links(self, soup, base_url): + """查找文章链接""" + links = [] + + # 常见的文章链接选择器 + selectors = [ + 'a[href*="article"]', + 'a[href*="news"]', + 'a[href*="content"]', + 'a[href*="detail"]', + 'a[href*="view"]', + 'a[href*="show"]', + '.news-list a', + '.article-list a', + '.content-list a', + 'h3 a', + 'h4 a', + '.title a', + '.list-item a' + ] + + for selector in selectors: + elements = soup.select(selector) + for element in elements: + href = element.get('href') + if href: + full_url = urljoin(base_url, href) + title = element.get_text().strip() + if title and len(title) > 5: # 过滤掉太短的标题 + links.append({ + 'url': full_url, + 'title': title + }) + + return links + + def check_keyword_match(self, text, title): + """检查关键字匹配""" + matched_keywords = [] + text_lower = text.lower() + title_lower = title.lower() + + for keyword in self.keywords: + keyword_lower = keyword.lower() + if keyword_lower in text_lower or keyword_lower in title_lower: + matched_keywords.append(keyword) + + return matched_keywords + + def extract_article_content(self, url, soup): + """提取文章内容""" + # 尝试多种内容选择器 + content_selectors = [ + '.article-content', + '.content', + '.article-body', + '.news-content', + '.main-content', + '.post-content', + 'article', + '.detail-content', + '#content', + '.text' + ] + + content = "" + for selector in content_selectors: + element = soup.select_one(selector) + if element: + content = self.extract_text_content(element) + if len(content) > 100: # 确保内容足够长 + break + + # 如果没找到特定内容区域,使用整个页面 + if not content or len(content) < 100: + content = self.extract_text_content(soup) + + return content + + def extract_publish_date(self, soup): + """提取发布时间""" + date_selectors = [ + '.publish-time', + 
'.pub-time', + '.date', + '.time', + '.publish-date', + 'time[datetime]', + '.article-time', + '.news-time', + '.post-time', + '.create-time', + '.update-time', + '.time span', + '.date span', + '.info span', # 一些网站使用.info类包含发布信息 + '.meta span', + '.meta-info', + '.article-info span', + '.news-info span', + '.content-info span', + '.a-shijian', # 上海纪检监察网站的发布时间类 + '.l-time' # 天津纪检监察网站的发布时间类 + ] + + for selector in date_selectors: + elements = soup.select(selector) + for element in elements: + date_text = element.get_text().strip() + if element.get('datetime'): + date_text = element.get('datetime') + + # 如果文本太短或为空,跳过 + if not date_text or len(date_text) < 4: + continue + + # 尝试解析日期 + try: + from datetime import datetime + import re + + # 清理日期文本,移除常见的无关字符 + date_text = re.sub(r'发布(时间|日期)[::]?', '', date_text).strip() + date_text = re.sub(r'时间[::]?', '', date_text).strip() + date_text = re.sub(r'日期[::]?', '', date_text).strip() + date_text = re.sub(r'发表于[::]?', '', date_text).strip() + date_text = re.sub(r'更新[::]?', '', date_text).strip() + date_text = re.sub(r'\s+', ' ', date_text).strip() # 替换多个空白字符为单个空格 + + # 如果有 datetime 属性且是标准格式,直接使用 + if element.get('datetime'): + datetime_attr = element.get('datetime') + # 尝试解析常见的日期时间格式 + for fmt in [ + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%dT%H:%M:%S%z', + '%Y-%m-%d %H:%M', + '%Y-%m-%d', + '%Y/%m/%d %H:%M:%S', + '%Y/%m/%d %H:%M', + '%Y/%m/%d', + '%Y年%m月%d日 %H:%M:%S', + '%Y年%m月%d日 %H:%M', + '%Y年%m月%d日', + '%m/%d/%Y %H:%M:%S', + '%m/%d/%Y %H:%M', + '%m/%d/%Y', + '%d/%m/%Y %H:%M:%S', + '%d/%m/%Y %H:%M', + '%d/%m/%Y', + '%d.%m.%Y %H:%M:%S', + '%d.%m.%Y %H:%M', + '%d.%m.%Y' + ]: + try: + if '%z' in fmt and '+' not in datetime_attr and datetime_attr.endswith('Z'): + datetime_attr = datetime_attr[:-1] + '+0000' + parsed_date = datetime.strptime(datetime_attr, fmt) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except ValueError: + continue + + # 尝试解析从文本中提取的日期 + # 尝试解析各种常见的中文日期格式 + for fmt in [ + '%Y年%m月%d日 %H:%M:%S', + '%Y年%m月%d日 %H:%M', + '%Y年%m月%d日', + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%d %H:%M', + '%Y-%m-%d', + '%Y/%m/%d %H:%M:%S', + '%Y/%m/%d %H:%M', + '%Y/%m/%d', + '%m月%d日 %H:%M', + '%m月%d日', + '%m/%d/%Y %H:%M:%S', + '%m/%d/%Y %H:%M', + '%m/%d/%Y', + '%d/%m/%Y %H:%M:%S', + '%d/%m/%Y %H:%M', + '%d/%m/%Y', + '%d.%m.%Y %H:%M:%S', + '%d.%m.%Y %H:%M', + '%d.%m.%Y' + ]: + try: + parsed_date = datetime.strptime(date_text, fmt) + # 如果没有年份,使用当前年份 + if '%Y' not in fmt: + parsed_date = parsed_date.replace(year=datetime.now().year) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except ValueError: + continue + + # 如果以上格式都不匹配,尝试使用 dateutil 解析 + try: + from dateutil import parser + # 过滤掉明显不是日期的文本 + if len(date_text) > 5 and not date_text.isdigit(): + parsed_date = parser.parse(date_text) + if not timezone.is_aware(parsed_date): + parsed_date = timezone.make_aware(parsed_date) + return parsed_date + except: + pass + + except Exception as e: + self.log('debug', f'解析日期失败: {date_text}, 错误: {str(e)}') + continue + + return None + + def extract_author(self, soup): + """提取作者信息""" + author_selectors = [ + '.author', + '.writer', + '.publisher', + '.byline', + '.article-author', + '.news-author' + ] + + for selector in author_selectors: + element = soup.select_one(selector) + if element: + return element.get_text().strip() + + return "" + + def download_media_file(self, media_url, crawled_content, media_type='image', alt_text=''): + 
"""下载媒体文件""" + try: + # 检查URL是否有效 + if not media_url or not media_url.startswith(('http://', 'https://')): + return None + + # 请求媒体文件 + response = self.session.get( + media_url, + timeout=self.timeout, + verify=False, + stream=False # 改为False以确保获取完整内容 + ) + response.raise_for_status() + + # 获取文件信息 + content_type = response.headers.get('content-type', '') + content_length = response.headers.get('content-length') + file_size = int(content_length) if content_length else len(response.content) + + # 确定文件扩展名 + file_extension = self.get_file_extension_from_url(media_url, content_type) + + # 生成文件名 + filename = f"media_{crawled_content.id}_{len(crawled_content.media_files.all())}{file_extension}" + + # 创建媒体文件对象 + media_file = MediaFile.objects.create( + content=crawled_content, + media_type=media_type, + original_url=media_url, + file_size=file_size, + mime_type=content_type, + alt_text=alt_text + ) + + # 保存文件 + media_file.local_file.save( + filename, + ContentFile(response.content), + save=True + ) + + self.log('info', f'媒体文件已下载: {filename} ({media_type})', crawled_content.website) + return media_file + + except Exception as e: + self.log('error', f'下载媒体文件失败 {media_url}: {str(e)}', crawled_content.website) + return None + + def get_file_extension_from_url(self, url, content_type): + """从URL或内容类型获取文件扩展名""" + # 从URL获取扩展名 + parsed_url = urlparse(url) + path = parsed_url.path + if '.' in path: + return os.path.splitext(path)[1] + + # 从内容类型获取扩展名 + content_type_map = { + 'image/jpeg': '.jpg', + 'image/jpg': '.jpg', + 'image/png': '.png', + 'image/gif': '.gif', + 'image/webp': '.webp', + 'image/svg+xml': '.svg', + 'video/mp4': '.mp4', + 'video/avi': '.avi', + 'video/mov': '.mov', + 'video/wmv': '.wmv', + 'video/flv': '.flv', + 'video/webm': '.webm', + 'audio/mp3': '.mp3', + 'audio/wav': '.wav', + 'audio/ogg': '.ogg', + 'application/pdf': '.pdf', + 'application/msword': '.doc', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx', + } + + return content_type_map.get(content_type.lower(), '.bin') + + def extract_and_download_media(self, soup, crawled_content, base_url): + """提取并下载页面中的媒体文件""" + media_files = [] + + # 提取图片 + images = soup.find_all('img') + self.log('info', f'找到 {len(images)} 个图片标签', crawled_content.website) + + for img in images: + src = img.get('src') + if src: + # 处理相对URL + if src.startswith('//'): + src = 'https:' + src + elif src.startswith('/'): + src = urljoin(base_url, src) + elif not src.startswith(('http://', 'https://')): + src = urljoin(base_url, src) + + alt_text = img.get('alt', '') + self.log('info', f'尝试下载图片: {src}', crawled_content.website) + media_file = self.download_media_file(src, crawled_content, 'image', alt_text) + if media_file: + media_files.append(media_file) + self.log('info', f'成功下载图片: {media_file.local_file.name}', crawled_content.website) + + # 提取视频 + videos = soup.find_all(['video', 'source']) + for video in videos: + src = video.get('src') + if src: + # 处理相对URL + if src.startswith('//'): + src = 'https:' + src + elif src.startswith('/'): + src = urljoin(base_url, src) + elif not src.startswith(('http://', 'https://')): + src = urljoin(base_url, src) + + media_file = self.download_media_file(src, crawled_content, 'video') + if media_file: + media_files.append(media_file) + + # 提取音频 + audios = soup.find_all('audio') + for audio in audios: + src = audio.get('src') + if src: + # 处理相对URL + if src.startswith('//'): + src = 'https:' + src + elif src.startswith('/'): + src = urljoin(base_url, src) + elif not src.startswith(('http://', 
'https://')): + src = urljoin(base_url, src) + + media_file = self.download_media_file(src, crawled_content, 'audio') + if media_file: + media_files.append(media_file) + + return media_files + + def mark_content_saved(self, crawled_content): + """标记内容已保存(内容已存储在数据库中)""" + try: + crawled_content.is_local_saved = True + crawled_content.save() + + media_count = crawled_content.media_files.count() + self.log('info', f'文章内容已保存到数据库 (包含 {media_count} 个媒体文件)', crawled_content.website) + return True + except Exception as e: + self.log('error', f'标记内容保存状态失败: {str(e)}', crawled_content.website) + return False + + def crawl_website(self, website): + """爬取单个网站""" + self.log('info', f'开始爬取网站: {website.name}', website) + + try: + # 请求主页 + response = self.session.get( + website.url, + timeout=self.timeout, + verify=False # 忽略SSL证书验证 + ) + response.raise_for_status() + + # 检查内容编码 + if response.encoding != 'utf-8': + # 尝试从响应头获取编码 + content_type = response.headers.get('content-type', '') + if 'charset=' in content_type: + charset = content_type.split('charset=')[-1] + response.encoding = charset + else: + response.encoding = 'utf-8' + + soup = BeautifulSoup(response.content, 'html.parser') + + # 查找文章链接 + article_links = self.find_article_links(soup, website.url) + self.log('info', f'找到 {len(article_links)} 个文章链接', website) + + crawled_count = 0 + for link_info in article_links: + try: + # 请求文章页面 + article_response = self.session.get( + link_info['url'], + timeout=self.timeout, + verify=False # 忽略SSL证书验证 + ) + article_response.raise_for_status() + + # 检查内容编码 + if article_response.encoding != 'utf-8': + # 尝试从响应头获取编码 + content_type = article_response.headers.get('content-type', '') + if 'charset=' in content_type: + charset = content_type.split('charset=')[-1] + article_response.encoding = charset + else: + article_response.encoding = 'utf-8' + + article_soup = BeautifulSoup(article_response.content, 'html.parser') + + # 提取内容 + content = self.extract_article_content(link_info['url'], article_soup) + title = link_info['title'] + + # 检查关键字匹配 + matched_keywords = self.check_keyword_match(content, title) + + if matched_keywords: + # 提取其他信息 + publish_date = self.extract_publish_date(article_soup) + author = self.extract_author(article_soup) + + # 检查是否已存在相同URL的文章 + existing_content = CrawledContent.objects.filter( + url=link_info['url'], + task=self.task + ).first() + + if existing_content: + # 如果已存在,更新现有记录而不是创建新记录 + existing_content.title = title + existing_content.content = content + existing_content.publish_date = publish_date + existing_content.author = author + existing_content.keywords_matched = ','.join(matched_keywords) + existing_content.save() + + # 更新媒体文件 + # 先删除旧的媒体文件 + existing_content.media_files.all().delete() + # 然后重新下载媒体文件 + media_files = self.extract_and_download_media(article_soup, existing_content, link_info['url']) + + self.log('info', f'更新已存在的文章: {title[:50]}...', website) + else: + # 保存新内容 + crawled_content = CrawledContent.objects.create( + task=self.task, + website=website, + title=title, + content=content, + url=link_info['url'], + publish_date=publish_date, + author=author, + keywords_matched=','.join(matched_keywords), + is_local_saved=False # 初始设置为False,保存到本地后会更新为True + ) + + # 提取并下载媒体文件 + media_files = self.extract_and_download_media(article_soup, crawled_content, link_info['url']) + + # 标记内容已保存 + self.mark_content_saved(crawled_content) + + self.log('info', f'保存新文章: {title[:50]}...', website) + + crawled_count += 1 + + # 请求间隔 + time.sleep(settings.CRAWLER_SETTINGS['REQUEST_DELAY']) + + 
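# Each article link is handled in its own try/except below: per-article SSL, connection, timeout and encoding errors are logged and skipped, so one bad URL does not abort the whole site crawl. +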
except requests.exceptions.SSLError as e: + self.log('error', f'SSL错误,跳过文章 {link_info["url"]}: {str(e)}', website) + continue + except requests.exceptions.ConnectionError as e: + self.log('error', f'连接错误,跳过文章 {link_info["url"]}: {str(e)}', website) + continue + except requests.exceptions.Timeout as e: + self.log('error', f'请求超时,跳过文章 {link_info["url"]}: {str(e)}', website) + continue + except requests.exceptions.RequestException as e: + self.log('error', f'网络请求错误,跳过文章 {link_info["url"]}: {str(e)}', website) + continue + except UnicodeDecodeError as e: + self.log('error', f'字符编码错误,跳过文章 {link_info["url"]}: {str(e)}', website) + continue + except Exception as e: + self.log('error', f'处理文章失败 {link_info["url"]}: {str(e)}', website) + continue + + self.log('info', f'网站爬取完成,共保存 {crawled_count} 篇文章', website) + return crawled_count + + except requests.exceptions.SSLError as e: + self.log('error', f'爬取网站SSL错误: {str(e)}', website) + return 0 + except requests.exceptions.ConnectionError as e: + self.log('error', f'爬取网站连接错误: {str(e)}', website) + return 0 + except requests.exceptions.Timeout as e: + self.log('error', f'爬取网站超时: {str(e)}', website) + return 0 + except requests.exceptions.RequestException as e: + self.log('error', f'爬取网站网络错误: {str(e)}', website) + return 0 + except Exception as e: + self.log('error', f'爬取网站失败: {str(e)}', website) + return 0 + + def run(self): + """运行爬取任务""" + self.log('info', f'开始执行爬取任务: {self.task.name}') + self.update_task_status('running') + + total_crawled = 0 + websites = self.task.websites.filter(is_active=True) + self.task.total_pages = websites.count() + self.task.save() + + for website in websites: + try: + crawled_count = self.crawl_website(website) + total_crawled += crawled_count + self.task.crawled_pages += 1 + self.task.save() + + except Exception as e: + self.log('error', f'爬取网站 {website.name} 时发生错误: {str(e)}', website) + continue + + # 更新任务状态 + if total_crawled > 0: + self.update_task_status('completed') + self.log('info', f'爬取任务完成,共爬取 {total_crawled} 篇文章') + else: + self.update_task_status('failed', error_message='没有找到匹配的内容') + self.log('error', '爬取任务失败,没有找到匹配的内容') + + +def run_crawl_task(task_id): + """运行爬取任务(Celery任务)""" + try: + crawler = WebsiteCrawler(task_id) + crawler.run() + return f"任务 {task_id} 执行完成" + except Exception as e: + # 记录异常到日志 + logger.error(f"执行任务 {task_id} 时发生异常: {str(e)}", exc_info=True) + + task = CrawlTask.objects.get(id=task_id) + task.status = 'failed' + task.error_message = str(e) + task.completed_at = timezone.now() + task.save() + + CrawlLog.objects.create( + task=task, + level='error', + message=f'任务执行失败: {str(e)}' + ) + return f"任务 {task_id} 执行失败: {str(e)}" \ No newline at end of file
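
The new `WEBSITE_CRAWL_CONFIGS` entries together with `check_keyword_in_content()` and `crawl_articles_with_keyword_filter()` replace per-site search APIs with a simpler strategy: fetch each configured list page, resolve the article links, and keep only articles whose title or body contains the keyword. The sketch below shows that flow in isolation using only `requests` and `BeautifulSoup`; the Selenium fallback, pagination and Django models are deliberately left out, and `EXAMPLE_CONFIG`, `fetch_soup` and the example.com URLs are placeholders rather than the real entries above.

```python
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

# Hypothetical config entry in the same shape as WEBSITE_CRAWL_CONFIGS.
EXAMPLE_CONFIG = {
    "article_list_urls": ["http://www.example.com/news/"],
    "article_selector": "a[href*='/news/']",
    "content_selector": ".content, .article-content, .text",
    "title_selector": "h1, .title, .article-title",
}

HEADERS = {"User-Agent": "Mozilla/5.0 (keyword-filter sketch)"}


def fetch_soup(url):
    """Download a page and return a BeautifulSoup tree, or None on failure."""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        resp.raise_for_status()
        resp.encoding = "utf-8"
        return BeautifulSoup(resp.text, "html.parser")
    except requests.RequestException as exc:
        print(f"fetch failed: {url}: {exc}")
        return None


def filter_articles_by_keyword(config, keyword):
    """Return article URLs whose title or body contains the keyword (case-insensitive)."""
    matches = []
    for list_url in config["article_list_urls"]:
        soup = fetch_soup(list_url)
        if soup is None:
            continue
        # Resolve every candidate link against the list page it came from.
        candidates = {
            urljoin(list_url, a["href"])
            for a in soup.select(config["article_selector"])
            if a.get("href")
        }
        for article_url in candidates:
            article = fetch_soup(article_url)
            if article is None:
                continue
            title_el = article.select_one(config["title_selector"])
            body_el = article.select_one(config["content_selector"])
            text = " ".join(
                el.get_text(" ", strip=True) for el in (title_el, body_el) if el
            )
            if keyword.lower() in text.lower():
                matches.append(article_url)
    return matches
```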
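The `task_id` / `is_task_cancelled` checks added throughout `crawl_by_keyword()` and `crawl_historical_articles()` implement cooperative cancellation: the worker polls a flag at every loop boundary and exits cleanly instead of being killed mid-article. A minimal stand-in for that pattern, with a `threading.Event` playing the role of `task_executor.is_task_cancelled(task_id)` (the names `cancel_event` and `crawl_urls` are illustrative only), might look like this:

```python
import threading
import time

# Stand-in for task_executor.is_task_cancelled(task_id) in the real executor.
cancel_event = threading.Event()


def crawl_urls(urls):
    """Process URLs one by one, checking the cancellation flag between items."""
    for url in urls:
        if cancel_event.is_set():
            print("task cancelled, stopping cleanly")
            break
        time.sleep(0.1)  # simulate fetching and storing one article
        print(f"processed {url}")


worker = threading.Thread(target=crawl_urls, args=(["u1", "u2", "u3", "u4"],))
worker.start()
time.sleep(0.15)
cancel_event.set()  # in the real executor, the cancel action flips the task's flag
worker.join()
```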
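`crawler_engine.py` builds its `requests.Session` with a urllib3 `Retry` policy mounted on both schemes, so transient 429/5xx responses are retried with backoff instead of failing the task. Stripped of the `settings.CRAWLER_SETTINGS` lookup, the same construction is roughly the following (`build_session` is an illustrative helper, not part of the diff):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_session(max_retries=3):
    """Create a session that retries transient HTTP errors with backoff."""
    session = requests.Session()
    session.headers.update({"User-Agent": "Mozilla/5.0 (crawler sketch)"})

    retry = Retry(
        total=max_retries,
        backoff_factor=1,  # exponential backoff between attempts
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


session = build_session()
resp = session.get("https://www.example.com", timeout=15)  # timeout is still per request
```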
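`extract_text_content()` keeps paragraph structure instead of returning one flat string: scripts and styles are dropped, block elements become paragraphs, and runs of blank lines are collapsed. A trimmed version of that approach (the real method also walks `div`/`br` tags and pads headings), with `extract_text` as an illustrative name:

```python
import re
from bs4 import BeautifulSoup


def extract_text(html):
    """Return readable article text with paragraphs separated by blank lines."""
    soup = BeautifulSoup(html, "html.parser")

    # Drop non-content tags entirely.
    for tag in soup(["script", "style"]):
        tag.decompose()

    parts = []
    for el in soup.find_all(["h1", "h2", "h3", "p"]):
        text = el.get_text(" ", strip=True)
        if text:
            parts.append(text)

    content = "\n\n".join(parts) if parts else soup.get_text("\n", strip=True)
    # Collapse leftover runs of blank lines from nested markup.
    return re.sub(r"\n\s*\n\s*\n+", "\n\n", content).strip()


html = "<html><body><h1>标题</h1><script>x()</script><p>第一段</p><p>第二段</p></body></html>"
print(extract_text(html))
```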
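`extract_publish_date()` normalises the many date formats found on these sites by stripping label text (发布时间:, 更新: and so on), trying a list of `strptime` patterns, and only then falling back to `dateutil`. The core of that idea can be tested on its own; `parse_publish_date` and the shortened `FORMATS` list are illustrative, and the Django timezone handling and `datetime` attribute branch are omitted:

```python
import re
from datetime import datetime

from dateutil import parser as dateparser

# Label prefixes commonly found around publish dates on Chinese news pages.
LABEL_RE = re.compile(r"(发布(时间|日期)|时间|日期|发表于|更新)[::]?")

FORMATS = [
    "%Y年%m月%d日 %H:%M:%S",
    "%Y年%m月%d日 %H:%M",
    "%Y年%m月%d日",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d",
    "%Y/%m/%d",
]


def parse_publish_date(raw_text):
    """Best-effort parse of a publish-date string; returns None if nothing matches."""
    text = LABEL_RE.sub("", raw_text or "").strip()
    text = re.sub(r"\s+", " ", text)
    if len(text) < 4:
        return None

    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue

    # Fall back to dateutil for anything the explicit formats missed.
    try:
        return dateparser.parse(text)
    except (ValueError, OverflowError):
        return None


print(parse_publish_date("发布时间:2024年01月15日 09:30"))  # 2024-01-15 09:30:00
print(parse_publish_date("2024-01-15"))
```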
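`extract_and_download_media()` has to turn the `src` attributes it finds (`//cdn…`, `/images/…`, `img/…`) into absolute URLs before downloading. The branching in the diff can be reproduced with a small helper (`absolutize` is an illustrative name); note that forcing `https:` onto protocol-relative URLs is a choice made by the new code, whereas plain `urljoin` would inherit the page's scheme.

```python
from urllib.parse import urljoin


def absolutize(src, page_url):
    """Turn an <img>/<video>/<audio> src into an absolute URL, mirroring the crawler's rules."""
    if not src:
        return None
    if src.startswith("//"):  # protocol-relative: force https, as the diff does
        return "https:" + src
    if src.startswith(("http://", "https://")):
        return src
    return urljoin(page_url, src)  # handles both "/abs/path" and "rel/path"


page = "http://news.example.com/2024/01/article.html"
assert absolutize("//img.example.com/a.jpg", page) == "https://img.example.com/a.jpg"
assert absolutize("/static/b.png", page) == "http://news.example.com/static/b.png"
assert absolutize("c.gif", page) == "http://news.example.com/2024/01/c.gif"
```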