""" 爬取任务执行器 负责执行爬取任务并更新任务状态 """ import threading import time from django.utils import timezone from django.db import transaction from core.models import CrawlTask from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_CRAWL_CONFIGS class TaskExecutor: """任务执行器""" def __init__(self): self.running_tasks = {} self.cancelled_tasks = set() # 添加已取消任务的集合 self.lock = threading.Lock() def start_task(self, task_id, rerun=False): """启动任务""" with self.lock: if task_id in self.running_tasks: return False, "任务已在运行中" try: task = CrawlTask.objects.get(id=task_id) # 检查任务状态 if not rerun and task.status != 'pending': return False, "任务状态不允许启动" # 如果是重新执行,检查任务是否已完成或失败 if rerun and task.status not in ['completed', 'failed', 'cancelled']: return False, "只有已完成、失败或已取消的任务可以重新执行" # 重置任务状态(如果是重新执行) if rerun: task.status = 'running' task.started_at = timezone.now() task.completed_at = None task.error_message = None task.progress = 0 task.current_website = None task.current_action = None task.total_articles = 0 task.success_count = 0 task.failed_count = 0 task.result_details = {} else: # 更新任务状态 task.status = 'running' task.started_at = timezone.now() task.save() # 确保任务不在取消集合中 self.cancelled_tasks.discard(task_id) # 启动后台线程执行任务 thread = threading.Thread(target=self._execute_task, args=(task_id,)) thread.daemon = True thread.start() self.running_tasks[task_id] = thread return True, "任务已启动" + ("(重新执行)" if rerun else "") except CrawlTask.DoesNotExist: return False, "任务不存在" except Exception as e: return False, f"启动任务失败: {e}" def rerun_task(self, task_id): """重新执行任务""" return self.start_task(task_id, rerun=True) def cancel_task(self, task_id): """取消任务""" with self.lock: # 将任务标记为已取消 self.cancelled_tasks.add(task_id) if task_id in self.running_tasks: # 标记任务为取消状态 try: task = CrawlTask.objects.get(id=task_id) task.status = 'cancelled' task.completed_at = timezone.now() task.save() # 记录执行历史 task.add_execution_record( status='cancelled', started_at=task.started_at, completed_at=task.completed_at, error_message='任务被取消' ) # 移除运行中的任务 del self.running_tasks[task_id] return True, "任务已取消" except CrawlTask.DoesNotExist: return False, "任务不存在" else: # 即使任务不在运行中,也标记为已取消 try: task = CrawlTask.objects.get(id=task_id) if task.status in ['pending', 'running']: task.status = 'cancelled' task.completed_at = timezone.now() task.save() # 记录执行历史 task.add_execution_record( status='cancelled', started_at=task.started_at, completed_at=task.completed_at, error_message='任务被取消' ) return True, "任务已取消" except CrawlTask.DoesNotExist: pass return False, "任务未在运行中" def is_task_cancelled(self, task_id): """检查任务是否已被取消""" with self.lock: return task_id in self.cancelled_tasks def _execute_task(self, task_id): """执行任务的核心逻辑""" try: task = CrawlTask.objects.get(id=task_id) # 检查任务是否已被取消 if self.is_task_cancelled(task_id): self._mark_task_cancelled(task_id) return # 根据任务类型执行不同的爬取逻辑 if task.task_type == 'keyword': self._execute_keyword_task(task) elif task.task_type == 'historical': self._execute_historical_task(task) elif task.task_type == 'full_site': self._execute_full_site_task(task) else: raise ValueError(f"不支持的任务类型: {task.task_type}") # 检查任务是否已被取消 if self.is_task_cancelled(task_id): self._mark_task_cancelled(task_id) return # 任务完成 with transaction.atomic(): task = CrawlTask.objects.select_for_update().get(id=task_id) task.status = 'completed' task.completed_at = timezone.now() task.progress = 100 task.save() # 记录执行历史 task.add_execution_record( status='completed', started_at=task.started_at, completed_at=task.completed_at ) 

class TaskExecutor:
    """Runs crawl tasks in daemon threads and tracks their lifecycle."""

    def __init__(self):
        self.running_tasks = {}        # task_id -> worker thread
        self.cancelled_tasks = set()   # ids of tasks flagged for cancellation
        self.lock = threading.Lock()

    def start_task(self, task_id, rerun=False):
        """Start a task (or restart it when ``rerun`` is True)."""
        with self.lock:
            if task_id in self.running_tasks:
                return False, "Task is already running"

            try:
                task = CrawlTask.objects.get(id=task_id)

                # A fresh start is only allowed from the pending state.
                if not rerun and task.status != 'pending':
                    return False, "Task status does not allow starting"

                # A rerun is only allowed once the task has finished.
                if rerun and task.status not in ['completed', 'failed', 'cancelled']:
                    return False, "Only completed, failed or cancelled tasks can be rerun"

                if rerun:
                    # Reset all progress and result fields.
                    task.status = 'running'
                    task.started_at = timezone.now()
                    task.completed_at = None
                    task.error_message = None
                    task.progress = 0
                    task.current_website = None
                    task.current_action = None
                    task.total_articles = 0
                    task.success_count = 0
                    task.failed_count = 0
                    task.result_details = {}
                else:
                    # Update the task status.
                    task.status = 'running'
                    task.started_at = timezone.now()
                task.save()

                # Make sure the task is not left in the cancelled set.
                self.cancelled_tasks.discard(task_id)

                # Execute the task in a background daemon thread.
                thread = threading.Thread(target=self._execute_task, args=(task_id,))
                thread.daemon = True
                thread.start()

                self.running_tasks[task_id] = thread
                return True, "Task started" + (" (rerun)" if rerun else "")
            except CrawlTask.DoesNotExist:
                return False, "Task does not exist"
            except Exception as e:
                return False, f"Failed to start task: {e}"

    def rerun_task(self, task_id):
        """Rerun a finished task."""
        return self.start_task(task_id, rerun=True)

    def cancel_task(self, task_id):
        """Cancel a task."""
        with self.lock:
            # Flag the task so running crawlers can stop cooperatively.
            self.cancelled_tasks.add(task_id)

            if task_id in self.running_tasks:
                try:
                    task = CrawlTask.objects.get(id=task_id)
                    task.status = 'cancelled'
                    task.completed_at = timezone.now()
                    task.save()

                    # Record the execution history.
                    task.add_execution_record(
                        status='cancelled',
                        started_at=task.started_at,
                        completed_at=task.completed_at,
                        error_message='Task was cancelled',
                    )

                    # Remove the task from the running set.
                    del self.running_tasks[task_id]
                    return True, "Task cancelled"
                except CrawlTask.DoesNotExist:
                    return False, "Task does not exist"

            # Even if the task is not currently running, mark it as cancelled.
            try:
                task = CrawlTask.objects.get(id=task_id)
                if task.status in ['pending', 'running']:
                    task.status = 'cancelled'
                    task.completed_at = timezone.now()
                    task.save()

                    # Record the execution history.
                    task.add_execution_record(
                        status='cancelled',
                        started_at=task.started_at,
                        completed_at=task.completed_at,
                        error_message='Task was cancelled',
                    )
                    return True, "Task cancelled"
            except CrawlTask.DoesNotExist:
                pass
            return False, "Task is not running"

    def is_task_cancelled(self, task_id):
        """Check whether a task has been flagged for cancellation."""
        with self.lock:
            return task_id in self.cancelled_tasks

    def _execute_task(self, task_id):
        """Core execution logic; runs inside the worker thread."""
        try:
            task = CrawlTask.objects.get(id=task_id)

            # Bail out early if the task was cancelled before we started.
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            # Dispatch on the task type.
            if task.task_type == 'keyword':
                self._execute_keyword_task(task)
            elif task.task_type == 'historical':
                self._execute_historical_task(task)
            elif task.task_type == 'full_site':
                self._execute_full_site_task(task)
            else:
                raise ValueError(f"Unsupported task type: {task.task_type}")

            # Check again in case the task was cancelled during the crawl.
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            # Mark the task as completed.
            with transaction.atomic():
                task = CrawlTask.objects.select_for_update().get(id=task_id)
                task.status = 'completed'
                task.completed_at = timezone.now()
                task.progress = 100
                task.save()

                # Record the execution history.
                task.add_execution_record(
                    status='completed',
                    started_at=task.started_at,
                    completed_at=task.completed_at,
                )
        except Exception as e:
            # A cancellation may surface as an exception; don't record a failure.
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            # Mark the task as failed.
            try:
                with transaction.atomic():
                    task = CrawlTask.objects.select_for_update().get(id=task_id)
                    task.status = 'failed'
                    task.completed_at = timezone.now()
                    task.error_message = str(e)
                    task.save()

                    # Record the execution history.
                    task.add_execution_record(
                        status='failed',
                        started_at=task.started_at,
                        completed_at=task.completed_at,
                        error_message=str(e),
                    )
            except Exception:
                pass
        finally:
            # Clean up the bookkeeping for this task.
            with self.lock:
                if task_id in self.running_tasks:
                    del self.running_tasks[task_id]
                # Remove the task from the cancelled set.
                self.cancelled_tasks.discard(task_id)

    def _mark_task_cancelled(self, task_id):
        """Persist the cancelled state of a task."""
        try:
            with transaction.atomic():
                task = CrawlTask.objects.select_for_update().get(id=task_id)
                task.status = 'cancelled'
                task.completed_at = timezone.now()
                task.save()

                # Record the execution history.
                task.add_execution_record(
                    status='cancelled',
                    started_at=task.started_at,
                    completed_at=task.completed_at,
                    error_message='Task was cancelled',
                )
        except CrawlTask.DoesNotExist:
            pass

    def _execute_keyword_task(self, task):
        """Execute a keyword-search task."""
        task_id = task.id

        # Bail out early if the task was cancelled.
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return

        # Update the current action.
        task.current_action = "Starting keyword search"
        task.save()

        # Prepare parameters.
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

        start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
        end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None

        # Expose the task id so the crawler function can poll for cancellation.
        crawl_by_keyword.task_id = task_id

        # Use the keyword crawler engine.
        try:
            # Import lazily to avoid a circular dependency.
            from core.keyword_crawler import KeywordCrawler
            crawler = KeywordCrawler(task_id, self)
            crawler.run()

            # Check whether the task was cancelled during the crawl.
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            # Update the result statistics.
            task = CrawlTask.objects.get(id=task_id)
            if task.status == 'completed':
                # Count the crawled articles.
                from core.models import Article
                article_count = Article.objects.filter(
                    website__in=task.websites.all()
                ).count()

                task.total_articles = article_count
                task.success_count = article_count
                task.failed_count = 0
                task.result_details = {
                    'total_articles': article_count,
                    'success_count': article_count,
                    'failed_count': 0,
                    'keyword': task.keyword,
                    'websites': [w.name for w in task.websites.all()],
                }
                task.save()

                # Record the execution history.
                task.add_execution_record(
                    status='completed',
                    started_at=task.started_at,
                    completed_at=task.completed_at,
                )
            elif self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
        except Exception as e:
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            # Mark the task as failed.
            task = CrawlTask.objects.get(id=task_id)
            task.status = 'failed'
            task.error_message = str(e)
            task.completed_at = timezone.now()
            task.save()

            # Record the execution history.
            task.add_execution_record(
                status='failed',
                started_at=task.started_at,
                completed_at=task.completed_at,
                error_message=str(e),
            )
            raise

    def _execute_historical_task(self, task):
        """Execute a historical-article task."""
        task_id = task.id

        # Bail out early if the task was cancelled.
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return

        # Update the current action.
        task.current_action = "Starting historical article crawl"
        task.save()

        # Prepare parameters.
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

        start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
        end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None

        # Expose the task id so the crawler function can poll for cancellation.
        crawl_historical_articles.task_id = task_id

        # Run the crawl.
        try:
            results = crawl_historical_articles(
                website_names=websites,
                start_date=start_date,
                end_date=end_date,
                max_articles_per_site=task.max_articles,
            )
        except Exception:
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            raise

        # Check whether the task was cancelled during the crawl.
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return

        # Update the results.
        task.total_articles = results['total_articles']
        task.success_count = results['success_count']
        task.failed_count = results['failed_count']
        task.result_details = results['website_results']
        task.save()

    def _execute_full_site_task(self, task):
        """Execute a full-site crawl task."""
        task_id = task.id

        # Bail out early if the task was cancelled.
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return

        # Update the current action.
        task.current_action = "Starting full-site crawl"
        task.save()

        # Prepare parameters.
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

        total_websites = len(websites)
        completed_websites = 0

        for website_name in websites:
            # Check for cancellation before each website.
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return

            try:
                # Update the current website.
                task.current_website = website_name
                task.current_action = f"Crawling {website_name}"
                task.save()

                # Get or create the Website object.
                from core.models import Website
                website, _created = Website.objects.get_or_create(
                    name=website_name,
                    defaults={
                        'base_url': WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
                        'enabled': True,
                    },
                )

                # Expose the task id so the crawler function can poll for cancellation.
                full_site_crawler.task_id = task_id

                # Run the full-site crawl.
                try:
                    full_site_crawler(
                        WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
                        website,
                        max_pages=task.max_pages,
                    )
                except Exception:
                    if self.is_task_cancelled(task_id):
                        self._mark_task_cancelled(task_id)
                        return
                    raise

                completed_websites += 1
                task.progress = int((completed_websites / total_websites) * 100)
                task.save()
            except Exception as e:
                # Check whether the failure was caused by a cancellation.
                if self.is_task_cancelled(task_id):
                    self._mark_task_cancelled(task_id)
                    return

                # Log the error but continue with the remaining websites.
                print(f"Error while crawling website {website_name}: {e}")
                continue

        # Final cancellation check.
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return

        # Update the final results.
        task.total_articles = completed_websites  # Could be the actual article count instead.
        task.success_count = completed_websites
        task.failed_count = total_websites - completed_websites
        task.save()

    def get_task_status(self, task_id):
        """Return a status snapshot for a task, or None if it does not exist."""
        try:
            task = CrawlTask.objects.get(id=task_id)
            return {
                'status': task.status,
                'progress': task.progress,
                'current_website': task.current_website,
                'current_action': task.current_action,
                'total_articles': task.total_articles,
                'success_count': task.success_count,
                'failed_count': task.failed_count,
                'error_message': task.error_message,
            }
        except CrawlTask.DoesNotExist:
            return None
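
def _example_cancellable_crawl(executor, task_id, urls):
    """Illustrative sketch only; never called by this module.

    Shows the cooperative-cancellation contract the executor relies on: it
    attaches ``task_id`` to crawler functions (e.g.
    ``crawl_by_keyword.task_id = task_id``) and expects long-running loops to
    poll ``is_task_cancelled`` between units of work. ``urls`` and the fetch
    step are placeholders, not part of the real crawler API.
    """
    for url in urls:
        # Stop promptly if the user cancelled; the executor itself persists
        # the cancelled state and records the execution history.
        if executor.is_task_cancelled(task_id):
            return
        # ... fetch and persist `url` here ...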

# Global task-executor instance.
task_executor = TaskExecutor()
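
# Usage sketch (illustrative; assumes a configured Django project and an
# existing CrawlTask row -- the id ``1`` is a placeholder):
#
#     ok, msg = task_executor.start_task(1)    # spawn a background thread
#     info = task_executor.get_task_status(1)  # poll progress/status
#     task_executor.cancel_task(1)             # cooperative cancellation
#     task_executor.rerun_task(1)              # allowed once completed/failed/cancelled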