support keyword crawl
@@ -8,7 +8,7 @@ import time
 from django.utils import timezone
 from django.db import transaction
 from core.models import CrawlTask
-from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_SEARCH_CONFIGS
+from core.utils import crawl_by_keyword, crawl_historical_articles, full_site_crawler, WEBSITE_CRAWL_CONFIGS


 class TaskExecutor:
@@ -16,9 +16,10 @@ class TaskExecutor:

     def __init__(self):
         self.running_tasks = {}
+        self.cancelled_tasks = set()  # set of task ids that have been cancelled
         self.lock = threading.Lock()

-    def start_task(self, task_id):
+    def start_task(self, task_id, rerun=False):
         """Start a task."""
         with self.lock:
             if task_id in self.running_tasks:
@@ -26,30 +27,61 @@ class TaskExecutor:

             try:
                 task = CrawlTask.objects.get(id=task_id)
-                if task.status != 'pending':
+
+                # Check the task status
+                if not rerun and task.status != 'pending':
                     return False, "Task status does not allow starting"

-                # Update the task status
-                task.status = 'running'
-                task.started_at = timezone.now()
+                # For a rerun, the task must already have finished or failed
+                if rerun and task.status not in ['completed', 'failed', 'cancelled']:
+                    return False, "Only completed, failed, or cancelled tasks can be rerun"
+
+                # Reset the task state (when rerunning)
+                if rerun:
+                    task.status = 'running'
+                    task.started_at = timezone.now()
+                    task.completed_at = None
+                    task.error_message = None
+                    task.progress = 0
+                    task.current_website = None
+                    task.current_action = None
+                    task.total_articles = 0
+                    task.success_count = 0
+                    task.failed_count = 0
+                    task.result_details = {}
+                else:
+                    # Update the task status
+                    task.status = 'running'
+                    task.started_at = timezone.now()

                 task.save()

+                # Make sure the task is not in the cancelled set
+                self.cancelled_tasks.discard(task_id)
+
                 # Start a background thread to run the task
                 thread = threading.Thread(target=self._execute_task, args=(task_id,))
                 thread.daemon = True
                 thread.start()

                 self.running_tasks[task_id] = thread
-                return True, "Task started"
+                return True, "Task started" + (" (rerun)" if rerun else "")

             except CrawlTask.DoesNotExist:
                 return False, "Task does not exist"
             except Exception as e:
                 return False, f"Failed to start task: {e}"

+    def rerun_task(self, task_id):
+        """Rerun a task."""
+        return self.start_task(task_id, rerun=True)
+
     def cancel_task(self, task_id):
         """Cancel a task."""
         with self.lock:
+            # Add the task to the cancelled set
+            self.cancelled_tasks.add(task_id)
+
             if task_id in self.running_tasks:
                 # Mark the task as cancelled
                 try:
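For reference, the guard `start_task` now applies is easiest to see in isolation: a fresh start requires a `pending` task, while a rerun requires a terminal state. A minimal, self-contained sketch (the `Task` stub and `can_start` helper below are illustrative, not the real `CrawlTask` model or executor code):

    class Task:
        # Illustrative stand-in for CrawlTask: only the status field matters here.
        def __init__(self, status):
            self.status = status

    def can_start(task, rerun=False):
        # A fresh start needs a pending task; a rerun needs a terminal state.
        if not rerun:
            return task.status == 'pending'
        return task.status in ('completed', 'failed', 'cancelled')

    assert can_start(Task('pending'))
    assert not can_start(Task('running'))
    assert can_start(Task('failed'), rerun=True)
    assert not can_start(Task('running'), rerun=True)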
@@ -58,19 +90,55 @@ class TaskExecutor:
                     task.completed_at = timezone.now()
                     task.save()

+                    # Record the execution history
+                    task.add_execution_record(
+                        status='cancelled',
+                        started_at=task.started_at,
+                        completed_at=task.completed_at,
+                        error_message='Task was cancelled'
+                    )
+
                     # Remove the task from the running set
                     del self.running_tasks[task_id]
                     return True, "Task cancelled"
                 except CrawlTask.DoesNotExist:
                     return False, "Task does not exist"
             else:
+                # Even if the task is not running, mark it as cancelled
+                try:
+                    task = CrawlTask.objects.get(id=task_id)
+                    if task.status in ['pending', 'running']:
+                        task.status = 'cancelled'
+                        task.completed_at = timezone.now()
+                        task.save()
+
+                        # Record the execution history
+                        task.add_execution_record(
+                            status='cancelled',
+                            started_at=task.started_at,
+                            completed_at=task.completed_at,
+                            error_message='Task was cancelled'
+                        )
+                        return True, "Task cancelled"
+                except CrawlTask.DoesNotExist:
+                    pass
                 return False, "Task is not running"

+    def is_task_cancelled(self, task_id):
+        """Check whether a task has been cancelled."""
+        with self.lock:
+            return task_id in self.cancelled_tasks
+
     def _execute_task(self, task_id):
         """Core logic for executing a task."""
         try:
             task = CrawlTask.objects.get(id=task_id)

+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
             # Dispatch to the crawl logic for this task type
             if task.task_type == 'keyword':
                 self._execute_keyword_task(task)
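The cancellation scheme here is cooperative: `cancel_task` only records the id in a set, and the worker thread polls `is_task_cancelled` at checkpoints rather than being killed. A self-contained sketch of the same pattern (class and names are illustrative, not from this commit):

    import threading
    import time

    class Executor:
        def __init__(self):
            self.cancelled = set()
            self.lock = threading.Lock()

        def cancel(self, task_id):
            with self.lock:
                self.cancelled.add(task_id)

        def is_cancelled(self, task_id):
            with self.lock:
                return task_id in self.cancelled

        def run(self, task_id, steps=50):
            for _ in range(steps):
                if self.is_cancelled(task_id):  # checkpoint between units of work
                    return 'cancelled'
                time.sleep(0.01)                # stand-in for one crawl step
            return 'completed'

    ex = Executor()
    t = threading.Thread(target=lambda: print(ex.run(1)))
    t.start()
    ex.cancel(1)  # the flag is observed at the next checkpoint
    t.join()      # prints 'cancelled'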
@@ -81,6 +149,11 @@ class TaskExecutor:
             else:
                 raise ValueError(f"Unsupported task type: {task.task_type}")

+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
             # Task completed
             with transaction.atomic():
                 task = CrawlTask.objects.select_for_update().get(id=task_id)
@@ -88,8 +161,20 @@ class TaskExecutor:
                 task.completed_at = timezone.now()
                 task.progress = 100
                 task.save()

+                # Record the execution history
+                task.add_execution_record(
+                    status='completed',
+                    started_at=task.started_at,
+                    completed_at=task.completed_at
+                )
+
         except Exception as e:
+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
             # Task failed
             try:
                 with transaction.atomic():
@@ -98,6 +183,14 @@ class TaskExecutor:
                     task.completed_at = timezone.now()
                     task.error_message = str(e)
                     task.save()

+                    # Record the execution history
+                    task.add_execution_record(
+                        status='failed',
+                        started_at=task.started_at,
+                        completed_at=task.completed_at,
+                        error_message=str(e)
+                    )
+
             except:
                 pass
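Both the completion and failure paths wrap the final status write in `transaction.atomic()` with `select_for_update()`. The shape of that pattern, as a sketch (it assumes a configured Django project and some model with these fields; this is not code from the commit):

    from django.db import transaction
    from django.utils import timezone

    def finish_task(model, task_id):
        with transaction.atomic():
            # The row lock blocks a concurrent writer (e.g. a cancel path)
            # until this block commits, so the terminal status and timestamp
            # are written atomically and cannot be clobbered mid-update.
            task = model.objects.select_for_update().get(id=task_id)
            task.status = 'completed'
            task.completed_at = timezone.now()
            task.save()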
@@ -106,9 +199,37 @@ class TaskExecutor:
         with self.lock:
             if task_id in self.running_tasks:
                 del self.running_tasks[task_id]
+            # Remove the task from the cancelled set
+            self.cancelled_tasks.discard(task_id)
+
+    def _mark_task_cancelled(self, task_id):
+        """Mark a task as cancelled."""
+        try:
+            with transaction.atomic():
+                task = CrawlTask.objects.select_for_update().get(id=task_id)
+                task.status = 'cancelled'
+                task.completed_at = timezone.now()
+                task.save()
+
+                # Record the execution history
+                task.add_execution_record(
+                    status='cancelled',
+                    started_at=task.started_at,
+                    completed_at=task.completed_at,
+                    error_message='Task was cancelled'
+                )
+        except CrawlTask.DoesNotExist:
+            pass

     def _execute_keyword_task(self, task):
         """Execute a keyword search task."""
+        task_id = task.id
+
+        # Check whether the task has been cancelled
+        if self.is_task_cancelled(task_id):
+            self._mark_task_cancelled(task_id)
+            return
+
         # Update the current action
         task.current_action = "Starting keyword search"
         task.save()
@@ -118,30 +239,90 @@ class TaskExecutor:
         if selected_websites:
             websites = [w.name for w in selected_websites]
         else:
-            websites = list(WEBSITE_SEARCH_CONFIGS.keys())
+            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

         start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
         end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None

-        # Run the crawl
-        results = crawl_by_keyword(
-            keyword=task.keyword,
-            website_names=websites,
-            max_pages=task.max_pages,
-            start_date=start_date,
-            end_date=end_date,
-            max_articles=task.max_articles
-        )
+        # Attach the task id so the crawler function can check for cancellation
+        crawl_by_keyword.task_id = task_id

-        # Update the results
-        task.total_articles = results['total_articles']
-        task.success_count = results['success_count']
-        task.failed_count = results['failed_count']
-        task.result_details = results['website_results']
-        task.save()
+        # Use the new keyword crawler engine
+        try:
+            # Import lazily to avoid a circular dependency
+            from core.keyword_crawler import KeywordCrawler
+            crawler = KeywordCrawler(task_id, self)
+            crawler.run()
+
+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
+            # Update the result statistics
+            task = CrawlTask.objects.get(id=task_id)
+            if task.status == 'completed':
+                # Count the crawled articles
+                from core.models import Article
+                article_count = Article.objects.filter(website__in=task.websites.all()).count()
+                task.total_articles = article_count
+                task.success_count = article_count
+                task.failed_count = 0
+                task.result_details = {
+                    'total_articles': article_count,
+                    'success_count': article_count,
+                    'failed_count': 0,
+                    'keyword': task.keyword,
+                    'websites': [w.name for w in task.websites.all()]
+                }
+                task.save()
+
+                # Add an execution record
+                task.add_execution_record(
+                    status='completed',
+                    started_at=task.started_at,
+                    completed_at=task.completed_at
+                )
+            elif self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
+        except Exception as e:
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
+            # Mark the task as failed
+            task = CrawlTask.objects.get(id=task_id)
+            task.status = 'failed'
+            task.error_message = str(e)
+            task.completed_at = timezone.now()
+            task.save()
+
+            # Add an execution record
+            task.add_execution_record(
+                status='failed',
+                started_at=task.started_at,
+                completed_at=task.completed_at,
+                error_message=str(e)
+            )
+
+            raise e

     def _execute_historical_task(self, task):
         """Execute a historical article crawl task."""
+        task_id = task.id
+
+        # Check whether the task has been cancelled
+        if self.is_task_cancelled(task_id):
+            self._mark_task_cancelled(task_id)
+            return
+
         # Update the current action
         task.current_action = "Starting historical article crawl"
         task.save()
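The new engine is reached only through a two-call surface: construct with `(task_id, executor)`, then call `run()`. A hypothetical stub with the same shape makes the implied contract explicit; the real `core.keyword_crawler.KeywordCrawler` is not part of this diff, so its body is a guess:

    class StubKeywordCrawler:
        # Mirrors how KeywordCrawler is driven above; the body is illustrative.
        def __init__(self, task_id, executor):
            self.task_id = task_id
            self.executor = executor  # lets the engine poll is_task_cancelled

        def run(self):
            if self.executor.is_task_cancelled(self.task_id):
                return  # bail out before doing any crawl work
            pass        # real engine: fetch search pages, save articles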
@@ -151,18 +332,32 @@ class TaskExecutor:
         if selected_websites:
             websites = [w.name for w in selected_websites]
         else:
-            websites = list(WEBSITE_SEARCH_CONFIGS.keys())
+            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

         start_date = task.start_date.strftime('%Y-%m-%d') if task.start_date else None
         end_date = task.end_date.strftime('%Y-%m-%d') if task.end_date else None

+        # Attach the task id so the crawler function can check for cancellation
+        crawl_historical_articles.task_id = task_id
+
         # Run the crawl
-        results = crawl_historical_articles(
-            website_names=websites,
-            start_date=start_date,
-            end_date=end_date,
-            max_articles_per_site=task.max_articles
-        )
+        try:
+            results = crawl_historical_articles(
+                website_names=websites,
+                start_date=start_date,
+                end_date=end_date,
+                max_articles_per_site=task.max_articles
+            )
+        except Exception as e:
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+            raise e
+
+        # Check whether the task has been cancelled
+        if self.is_task_cancelled(task_id):
+            self._mark_task_cancelled(task_id)
+            return
+
         # Update the results
         task.total_articles = results['total_articles']
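All three task types hand the task id to the crawler functions by setting an attribute on the function object (`crawl_by_keyword.task_id = task_id`, and likewise for the others) instead of adding a parameter. A self-contained sketch of that side channel, with one caveat worth noting: a plain function attribute is shared by every thread, so concurrent tasks would overwrite each other's id, and `threading.local()` would be the safer carrier.

    CANCELLED = {7}  # illustrative stand-in for the executor's cancelled set

    def crawl(pages):
        # Deeply nested helpers can read the id without it being threaded
        # through every call signature.
        task_id = getattr(crawl, 'task_id', None)
        for _ in range(pages):
            if task_id in CANCELLED:
                return 'cancelled'
        return 'done'

    crawl.task_id = 7
    print(crawl(3))  # prints 'cancelled'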
@@ -173,6 +368,13 @@ class TaskExecutor:

     def _execute_full_site_task(self, task):
         """Execute a full-site crawl task."""
+        task_id = task.id
+
+        # Check whether the task has been cancelled
+        if self.is_task_cancelled(task_id):
+            self._mark_task_cancelled(task_id)
+            return
+
         # Update the current action
         task.current_action = "Starting full-site crawl"
         task.save()
@@ -182,12 +384,17 @@ class TaskExecutor:
         if selected_websites:
             websites = [w.name for w in selected_websites]
         else:
-            websites = list(WEBSITE_SEARCH_CONFIGS.keys())
+            websites = list(WEBSITE_CRAWL_CONFIGS.keys())

         total_websites = len(websites)
         completed_websites = 0

         for website_name in websites:
+            # Check whether the task has been cancelled
+            if self.is_task_cancelled(task_id):
+                self._mark_task_cancelled(task_id)
+                return
+
             try:
                 # Update the current website
                 task.current_website = website_name
@@ -199,17 +406,26 @@ class TaskExecutor:
                 website, created = Website.objects.get_or_create(
                     name=website_name,
                     defaults={
-                        'base_url': WEBSITE_SEARCH_CONFIGS[website_name]["search_url"],
+                        'base_url': WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
                         'enabled': True
                     }
                 )

+                # Attach the task id so the crawler function can check for cancellation
+                full_site_crawler.task_id = task_id
+
                 # Run the full-site crawl
-                full_site_crawler(
-                    WEBSITE_SEARCH_CONFIGS[website_name]["search_url"],
-                    website,
-                    max_pages=task.max_pages
-                )
+                try:
+                    full_site_crawler(
+                        WEBSITE_CRAWL_CONFIGS[website_name]["base_url"],
+                        website,
+                        max_pages=task.max_pages
+                    )
+                except Exception as e:
+                    if self.is_task_cancelled(task_id):
+                        self._mark_task_cancelled(task_id)
+                        return
+                    raise e

                 completed_websites += 1
                 progress = int((completed_websites / total_websites) * 100)
@@ -217,10 +433,19 @@ class TaskExecutor:
                 task.save()

             except Exception as e:
+                # Check whether the task has been cancelled
+                if self.is_task_cancelled(task_id):
+                    self._mark_task_cancelled(task_id)
+                    return
                 # Log the error but continue with the other websites
                 print(f"Error while crawling website {website_name}: {e}")
                 continue

+        # Check whether the task has been cancelled
+        if self.is_task_cancelled(task_id):
+            self._mark_task_cancelled(task_id)
+            return
+
         # Update the final results
         task.total_articles = completed_websites  # could be replaced with the actual article count
         task.success_count = completed_websites
@@ -246,4 +471,4 @@ class TaskExecutor:


 # Global task executor instance
-task_executor = TaskExecutor()
+task_executor = TaskExecutor()
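The module ends by instantiating a single shared `task_executor`, so every importer coordinates through one set of `running_tasks` and one `cancelled_tasks` set. This works because Python caches modules: the class body below is a minimal illustrative sketch of the idiom, not the real module.

    # executor_module.py (illustrative)
    import threading

    class TaskExecutor:
        def __init__(self):
            self.running_tasks = {}
            self.lock = threading.Lock()

    # Created once, at first import. Every subsequent
    # `from executor_module import task_executor` returns this same object,
    # because Python caches the module in sys.modules.
    task_executor = TaskExecutor()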