# green_classroom/core/task_executor.py
"""
爬取任务执行器
负责执行爬取任务并更新任务状态
"""
import threading
import time
from django.utils import timezone
from django.db import transaction
from core.models import CrawlTask
from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_CRAWL_CONFIGS
class TaskExecutor:
    """Task executor."""

    def __init__(self):
        self.running_tasks = {}
        self.cancelled_tasks = set()  # IDs of tasks that have been cancelled
        self.lock = threading.Lock()

    def start_task(self, task_id, rerun=False):
        """Start a task."""
        with self.lock:
            if task_id in self.running_tasks:
                return False, "Task is already running"
            try:
                task = CrawlTask.objects.get(id=task_id)
                # Check the task status
                if not rerun and task.status != 'pending':
                    return False, "The task status does not allow it to be started"
                # A rerun is only allowed for finished tasks
                if rerun and task.status not in ['completed', 'failed', 'cancelled']:
                    return False, "Only completed, failed or cancelled tasks can be rerun"
                # Reset the task state when rerunning
                if rerun:
                    task.status = 'running'
                    task.started_at = timezone.now()
                    task.completed_at = None
                    task.error_message = None
                    task.progress = 0
                    task.current_website = None
                    task.current_action = None
                    task.total_articles = 0
                    task.success_count = 0
                    task.failed_count = 0
                    task.result_details = {}
                else:
                    # Update the task status
                    task.status = 'running'
                    task.started_at = timezone.now()
                task.save()
                # Make sure the task is not in the cancelled set
                self.cancelled_tasks.discard(task_id)
                # Execute the task in a background thread
                thread = threading.Thread(target=self._execute_task, args=(task_id,))
                thread.daemon = True
                thread.start()
                self.running_tasks[task_id] = thread
                return True, "Task started" + (" (rerun)" if rerun else "")
            except CrawlTask.DoesNotExist:
                return False, "Task does not exist"
            except Exception as e:
                return False, f"Failed to start task: {e}"

    def rerun_task(self, task_id):
        """Rerun a task."""
        return self.start_task(task_id, rerun=True)

    def cancel_task(self, task_id):
        """Cancel a task."""
        with self.lock:
            # Flag the task as cancelled
            self.cancelled_tasks.add(task_id)
            if task_id in self.running_tasks:
                # Mark the task record as cancelled
                try:
                    task = CrawlTask.objects.get(id=task_id)
                    task.status = 'cancelled'
                    task.completed_at = timezone.now()
                    task.save()
                    # Record the execution history
                    task.add_execution_record(
                        status='cancelled',
                        started_at=task.started_at,
                        completed_at=task.completed_at,
                        error_message='Task was cancelled'
                    )
                    # Remove the task from the running set
                    del self.running_tasks[task_id]
                    return True, "Task cancelled"
                except CrawlTask.DoesNotExist:
                    return False, "Task does not exist"
            else:
                # Even if the task is not running, mark it as cancelled
                try:
                    task = CrawlTask.objects.get(id=task_id)
                    if task.status in ['pending', 'running']:
                        task.status = 'cancelled'
                        task.completed_at = timezone.now()
                        task.save()
                        # Record the execution history
                        task.add_execution_record(
                            status='cancelled',
                            started_at=task.started_at,
                            completed_at=task.completed_at,
                            error_message='Task was cancelled'
                        )
                        return True, "Task cancelled"
                except CrawlTask.DoesNotExist:
                    pass
                return False, "Task is not running"

    def is_task_cancelled(self, task_id):
        """Check whether the task has been cancelled."""
        with self.lock:
            return task_id in self.cancelled_tasks

    def _execute_task(self, task_id):
        """Core logic for executing a task."""
        try:
            task = CrawlTask.objects.get(id=task_id)
            # Check whether the task has been cancelled
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            # Dispatch to the crawl logic that matches the task type
            if task.task_type == 'keyword':
                self._execute_keyword_task(task)
            elif task.task_type == 'historical':
                self._execute_historical_task(task)
            elif task.task_type == 'full_site':
                self._execute_full_site_task(task)
            else:
                raise ValueError(f"Unsupported task type: {task.task_type}")
            # Check whether the task has been cancelled
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            # Mark the task as completed
            with transaction.atomic():
                task = CrawlTask.objects.select_for_update().get(id=task_id)
                task.status = 'completed'
                task.completed_at = timezone.now()
                task.progress = 100
                task.save()
                # Record the execution history
                task.add_execution_record(
                    status='completed',
                    started_at=task.started_at,
                    completed_at=task.completed_at
                )
        except Exception as e:
            # Check whether the task has been cancelled
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            # Mark the task as failed
            try:
                with transaction.atomic():
                    task = CrawlTask.objects.select_for_update().get(id=task_id)
                    task.status = 'failed'
                    task.completed_at = timezone.now()
                    task.error_message = str(e)
                    task.save()
                    # Record the execution history
                    task.add_execution_record(
                        status='failed',
                        started_at=task.started_at,
                        completed_at=task.completed_at,
                        error_message=str(e)
                    )
            except Exception:
                pass
        finally:
            # Clean up the running-task bookkeeping
            with self.lock:
                if task_id in self.running_tasks:
                    del self.running_tasks[task_id]
                # Remove the task from the cancelled set
                self.cancelled_tasks.discard(task_id)

    def _mark_task_cancelled(self, task_id):
        """Mark a task as cancelled."""
        try:
            with transaction.atomic():
                task = CrawlTask.objects.select_for_update().get(id=task_id)
                task.status = 'cancelled'
                task.completed_at = timezone.now()
                task.save()
                # Record the execution history
                task.add_execution_record(
                    status='cancelled',
                    started_at=task.started_at,
                    completed_at=task.completed_at,
                    error_message='Task was cancelled'
                )
        except CrawlTask.DoesNotExist:
            pass

    def _execute_keyword_task(self, task):
        """Execute a keyword-search task."""
        task_id = task.id
        # Check whether the task has been cancelled
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return
        # Update the current action
        task.current_action = "Starting keyword search"
        task.save()
        # Prepare the parameters
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())
        start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
        end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None
        # Attach the task ID so the crawler function can check the cancellation flag
        crawl_by_keyword.task_id = task_id
        # Use the keyword crawler engine
        try:
            # Import lazily to avoid a circular dependency
            from core.keyword_crawler import KeywordCrawler
            crawler = KeywordCrawler(task_id, self)
            crawler.run()
            # Check whether the task has been cancelled
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            # Update the result statistics
            task = CrawlTask.objects.get(id=task_id)
            if task.status == 'completed':
                # Count the crawled articles
                from core.models import Article
                article_count = Article.objects.filter(website__in=task.websites.all()).count()
                task.total_articles = article_count
                task.success_count = article_count
                task.failed_count = 0
                task.result_details = {
                    'total_articles': article_count,
                    'success_count': article_count,
                    'failed_count': 0,
                    'keyword': task.keyword,
                    'websites': [w.name for w in task.websites.all()]
                }
                task.save()
                # Record the execution history
                task.add_execution_record(
                    status='completed',
                    started_at=task.started_at,
                    completed_at=task.completed_at
                )
            elif self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
        except Exception as e:
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            # Mark the task as failed
            task = CrawlTask.objects.get(id=task_id)
            task.status = 'failed'
            task.error_message = str(e)
            task.completed_at = timezone.now()
            task.save()
            # Record the execution history
            task.add_execution_record(
                status='failed',
                started_at=task.started_at,
                completed_at=task.completed_at,
                error_message=str(e)
            )
            raise e

    def _execute_historical_task(self, task):
        """Execute a historical-article crawl task."""
        task_id = task.id
        # Check whether the task has been cancelled
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return
        # Update the current action
        task.current_action = "Starting historical article crawl"
        task.save()
        # Prepare the parameters
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())
        start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
        end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None
        # Attach the task ID so the crawler function can check the cancellation flag
        crawl_historical_articles.task_id = task_id
        # Run the crawl
        try:
            results = crawl_historical_articles(
                website_names=websites,
                start_date=start_date,
                end_date=end_date,
                max_articles_per_site=task.max_articles
            )
        except Exception as e:
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            raise e
        # Check whether the task has been cancelled
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return
        # Update the results
        task.total_articles = results['total_articles']
        task.success_count = results['success_count']
        task.failed_count = results['failed_count']
        task.result_details = results['website_results']
        task.save()

    def _execute_full_site_task(self, task):
        """Execute a full-site crawl task."""
        task_id = task.id
        # Check whether the task has been cancelled
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return
        # Update the current action
        task.current_action = "Starting full-site crawl"
        task.save()
        # Prepare the parameters
        selected_websites = task.websites.all()
        if selected_websites:
            websites = [w.name for w in selected_websites]
        else:
            websites = list(WEBSITE_CRAWL_CONFIGS.keys())
        total_websites = len(websites)
        completed_websites = 0
        for website_name in websites:
            # Check whether the task has been cancelled
            if self.is_task_cancelled(task_id):
                self._mark_task_cancelled(task_id)
                return
            try:
                # Update the current website
                task.current_website = website_name
                task.current_action = f"Crawling {website_name}"
                task.save()
                # Get or create the website record
                from core.models import Website
                website, created = Website.objects.get_or_create(
                    name=website_name,
                    defaults={
                        'base_url': WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
                        'enabled': True
                    }
                )
                # Attach the task ID so the crawler function can check the cancellation flag
                full_site_crawler.task_id = task_id
                # Run the full-site crawl
                try:
                    full_site_crawler(
                        WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
                        website,
                        max_pages=task.max_pages
                    )
                except Exception as e:
                    if self.is_task_cancelled(task_id):
                        self._mark_task_cancelled(task_id)
                        return
                    raise e
                completed_websites += 1
                progress = int((completed_websites / total_websites) * 100)
                task.progress = progress
                task.save()
            except Exception as e:
                # Check whether the task has been cancelled
                if self.is_task_cancelled(task_id):
                    self._mark_task_cancelled(task_id)
                    return
                # Log the error and continue with the remaining websites
                print(f"Error while crawling website {website_name}: {e}")
                continue
        # Check whether the task has been cancelled
        if self.is_task_cancelled(task_id):
            self._mark_task_cancelled(task_id)
            return
        # Update the final results
        task.total_articles = completed_websites  # could be replaced with the actual number of crawled articles
        task.success_count = completed_websites
        task.failed_count = total_websites - completed_websites
        task.save()

    def get_task_status(self, task_id):
        """Get the current status of a task."""
        try:
            task = CrawlTask.objects.get(id=task_id)
            return {
                'status': task.status,
                'progress': task.progress,
                'current_website': task.current_website,
                'current_action': task.current_action,
                'total_articles': task.total_articles,
                'success_count': task.success_count,
                'failed_count': task.failed_count,
                'error_message': task.error_message
            }
        except CrawlTask.DoesNotExist:
            return None
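

# Illustrative sketch (not part of the original module): the executor only flags
# cancellation in self.cancelled_tasks; a crawler such as KeywordCrawler, which is
# handed (task_id, executor) above, is expected to poll is_task_cancelled() between
# units of work and stop early. The helper below shows that cooperation pattern;
# its name and parameters are hypothetical and it is not used anywhere in this project.
def _example_cooperative_loop(executor, task_id, work_items, handle_item):
    """Process work_items one by one, stopping early if the task is cancelled."""
    for item in work_items:
        if executor.is_task_cancelled(task_id):
            # Stop doing work; _execute_task / _mark_task_cancelled take care of
            # updating the database status and cleaning up bookkeeping.
            return False
        handle_item(item)
    return True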


# Global task executor instance
task_executor = TaskExecutor()
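

# Usage sketch (hedged, illustrative only): how a caller such as a Django view or a
# management command might drive the module-level executor. The task id is assumed to
# refer to an existing CrawlTask row; this helper is hypothetical and unused by the project.
def _example_start_and_poll(task_id):
    """Start a task and return a small dict describing the outcome."""
    started, message = task_executor.start_task(task_id)
    if not started:
        # e.g. "Task is already running" or "Task does not exist"
        return {'started': False, 'message': message}
    # get_task_status() returns a status dict, or None if the task row is missing
    return {'started': True, 'status': task_executor.get_task_status(task_id)}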