"""
爬取任务执行器

负责执行爬取任务并更新任务状态
"""
|
|
|
|
import logging
import threading
import time

from django.db import transaction
from django.utils import timezone

from core.models import CrawlTask
from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_SEARCH_CONFIGS
|
|
|
|
|
|
class TaskExecutor:
    """Runs crawl tasks on background daemon threads and tracks their state.

    Thread-safety: ``running_tasks`` (task_id -> Thread) is only touched
    while holding ``lock``.

    Cancellation is cooperative: ``cancel_task`` marks the DB row as
    'cancelled'. The worker thread cannot be killed, but ``_execute_task``
    re-checks the row status under ``select_for_update`` before writing a
    final state, so a cancelled task is never overwritten with
    'completed' or 'failed'.
    """

    # Module-scoped logger shared by all instances.
    _logger = logging.getLogger(__name__)

    def __init__(self):
        # task_id -> worker Thread for tasks currently executing.
        self.running_tasks = {}
        # Guards every read/write of running_tasks.
        self.lock = threading.Lock()

    def start_task(self, task_id):
        """Start the pending task ``task_id`` on a daemon thread.

        Returns:
            tuple[bool, str]: success flag and a user-facing message.
        """
        with self.lock:
            if task_id in self.running_tasks:
                return False, "任务已在运行中"

            try:
                task = CrawlTask.objects.get(id=task_id)
                if task.status != 'pending':
                    return False, "任务状态不允许启动"

                # Mark as running before spawning the worker so callers
                # see the state change immediately.
                task.status = 'running'
                task.started_at = timezone.now()
                task.save()

                thread = threading.Thread(target=self._execute_task, args=(task_id,))
                thread.daemon = True  # never block interpreter shutdown
                thread.start()

                self.running_tasks[task_id] = thread
                return True, "任务已启动"

            except CrawlTask.DoesNotExist:
                return False, "任务不存在"
            except Exception as e:
                self._logger.exception("Failed to start task %s", task_id)
                return False, f"启动任务失败: {e}"

    def cancel_task(self, task_id):
        """Request cancellation of a running task (cooperative).

        The worker thread keeps running until its current crawl call
        returns, but the final status stays 'cancelled' because
        ``_execute_task`` re-checks the status before finalizing.

        Returns:
            tuple[bool, str]: success flag and a user-facing message.
        """
        with self.lock:
            if task_id not in self.running_tasks:
                return False, "任务未在运行中"

            try:
                task = CrawlTask.objects.get(id=task_id)
                task.status = 'cancelled'
                task.completed_at = timezone.now()
                task.save()

                # Drop the bookkeeping entry; the thread itself keeps
                # running until its current crawl call returns.
                del self.running_tasks[task_id]
                return True, "任务已取消"
            except CrawlTask.DoesNotExist:
                return False, "任务不存在"

    def _execute_task(self, task_id):
        """Worker-thread entry point: run the task and persist the outcome."""
        try:
            task = CrawlTask.objects.get(id=task_id)

            # Dispatch on task type.
            if task.task_type == 'keyword':
                self._execute_keyword_task(task)
            elif task.task_type == 'historical':
                self._execute_historical_task(task)
            elif task.task_type == 'full_site':
                self._execute_full_site_task(task)
            else:
                raise ValueError(f"不支持的任务类型: {task.task_type}")

            # Finalize under a row lock. Only mark completed if the task
            # is still 'running' — a concurrent cancel_task may have set
            # 'cancelled', which must not be overwritten.
            with transaction.atomic():
                task = CrawlTask.objects.select_for_update().get(id=task_id)
                if task.status == 'running':
                    task.status = 'completed'
                    task.completed_at = timezone.now()
                    task.progress = 100
                    task.save()

        except Exception as e:
            self._logger.exception("Task %s failed", task_id)
            try:
                with transaction.atomic():
                    task = CrawlTask.objects.select_for_update().get(id=task_id)
                    # Preserve an explicit cancellation; otherwise record
                    # the failure details for the UI.
                    if task.status != 'cancelled':
                        task.status = 'failed'
                        task.completed_at = timezone.now()
                        task.error_message = str(e)
                        task.save()
            except Exception:
                # Best-effort: the DB itself may be the reason we are
                # here. Log instead of silently swallowing (original
                # code used a bare `except: pass`).
                self._logger.exception(
                    "Could not record failure state for task %s", task_id
                )

        finally:
            # Always drop the bookkeeping entry for this worker, whether
            # the task succeeded, failed, or was cancelled mid-run.
            with self.lock:
                self.running_tasks.pop(task_id, None)

    @staticmethod
    def _store_results(task, results):
        """Copy the counters from a crawl ``results`` dict onto ``task`` and save."""
        task.total_articles = results['total_articles']
        task.success_count = results['success_count']
        task.failed_count = results['failed_count']
        task.result_details = results['website_results']
        task.save()

    @staticmethod
    def _date_range(task):
        """Return (start_date, end_date) as 'YYYY-MM-DD' strings or None."""
        start = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
        end = task.end_date.strftime('%Y-%m-%d') if task.end_date else None
        return start, end

    def _execute_keyword_task(self, task):
        """Run a keyword-search crawl and store its result counters."""
        task.current_action = "开始关键词搜索"
        task.save()

        # Empty/None website list means "all configured websites".
        websites = task.websites if task.websites else list(WEBSITE_SEARCH_CONFIGS.keys())
        start_date, end_date = self._date_range(task)

        results = crawl_by_keyword(
            keyword=task.keyword,
            website_names=websites,
            max_pages=task.max_pages,
            start_date=start_date,
            end_date=end_date,
            max_articles=task.max_articles
        )

        self._store_results(task, results)

    def _execute_historical_task(self, task):
        """Run a historical-articles crawl and store its result counters."""
        task.current_action = "开始历史文章爬取"
        task.save()

        # Empty/None website list means "all configured websites".
        websites = task.websites if task.websites else list(WEBSITE_SEARCH_CONFIGS.keys())
        start_date, end_date = self._date_range(task)

        results = crawl_historical_articles(
            website_names=websites,
            start_date=start_date,
            end_date=end_date,
            max_articles_per_site=task.max_articles
        )

        self._store_results(task, results)

    def _execute_full_site_task(self, task):
        """Crawl each configured website in turn, updating progress as it goes.

        A failure on one website is logged and the loop continues with
        the remaining sites.
        """
        # Hoisted out of the loop: import is loop-invariant.
        from core.models import Website

        task.current_action = "开始全站爬取"
        task.save()

        # Empty/None website list means "all configured websites".
        websites = task.websites if task.websites else list(WEBSITE_SEARCH_CONFIGS.keys())

        total_websites = len(websites)
        completed_websites = 0

        for website_name in websites:
            try:
                # Surface which site is being worked on.
                task.current_website = website_name
                task.current_action = f"正在爬取 {website_name}"
                task.save()

                # Get or create the Website row for this site.
                website, created = Website.objects.get_or_create(
                    name=website_name,
                    defaults={
                        'base_url': WEBSITE_SEARCH_CONFIGS[website_name]["search_url"],
                        'enabled': True
                    }
                )

                full_site_crawler(
                    WEBSITE_SEARCH_CONFIGS[website_name]["search_url"],
                    website,
                    max_pages=task.max_pages
                )

                completed_websites += 1
                task.progress = int((completed_websites / total_websites) * 100)
                task.save()

            except Exception:
                # Log and keep going with the remaining websites.
                self._logger.exception("Error crawling website %s", website_name)
                continue

        # Final counters. NOTE(review): total_articles counts websites,
        # not articles — kept for backward compatibility with the
        # original behavior; could be replaced by a real article count.
        task.total_articles = completed_websites
        task.success_count = completed_websites
        task.failed_count = total_websites - completed_websites
        task.save()

    def get_task_status(self, task_id):
        """Return a status snapshot dict for ``task_id``, or None if missing."""
        try:
            task = CrawlTask.objects.get(id=task_id)
            return {
                'status': task.status,
                'progress': task.progress,
                'current_website': task.current_website,
                'current_action': task.current_action,
                'total_articles': task.total_articles,
                'success_count': task.success_count,
                'failed_count': task.failed_count,
                'error_message': task.error_message
            }
        except CrawlTask.DoesNotExist:
            return None
|
|
|
|
|
|
# Global task-executor instance shared by the whole process.
task_executor = TaskExecutor()
|