green_classroom/core/tasks.py

import logging
from celery import shared_task
from django.core.management import call_command
# from django.conf import settings
from .models import Website, Article
from .utils import full_site_crawler

logger = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=3)
def crawl_website(self, website_id, node_id=None, batch_id=None):
    """
    Crawl a single website.
    """
    try:
        website = Website.objects.get(id=website_id)
        logger.info(f"Starting crawl for website: {website.name} (node: {node_id}, batch: {batch_id})")
        logger.info(f"Website URL: {website.base_url}")

        # Mark the task as started on the distributed-crawler side
        if node_id and batch_id:
            from .distributed_crawler import distributed_crawler
            distributed_crawler.heartbeat(node_id, 1)
            logger.info(f"Distributed crawler heartbeat sent - node: {node_id}, status: 1")

        # Run the crawler
        logger.info(f"Calling full_site_crawler for website: {website.name}")
        full_site_crawler(website.base_url, website, max_pages=100)
        logger.info(f"full_site_crawler finished for website: {website.name}")

        # Collect statistics
        article_count = website.article_set.count()
        logger.info(f"Finished crawling website {website.name}, {article_count} articles in total")

        # Mark the task as finished
        if node_id and batch_id:
            distributed_crawler.heartbeat(node_id, 0)
            logger.info(f"Distributed crawler heartbeat sent - node: {node_id}, status: 0")

        result = {
            'website_id': website_id,
            'website_name': website.name,
            'article_count': article_count,
            'status': 'success',
            'node_id': node_id,
            'batch_id': batch_id
        }
        logger.info(f"Task finished, result: {result}")
        return result
    except Website.DoesNotExist:
        error_msg = f"Website does not exist: {website_id}"
        logger.error(error_msg)
        raise
    except Exception as exc:
        error_msg = f"Failed to crawl website {website_id}: {exc}"
        logger.error(error_msg)
        # Schedule a retry
        logger.info("Retrying task in 5 minutes")
        raise self.retry(exc=exc, countdown=60 * 5)  # retry after 5 minutes
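
# Example invocation (a minimal sketch; the queue name "crawl" and the
# node/batch identifiers below are illustrative assumptions, not values
# defined in this module):
#
#     crawl_website.delay(website.id)
#     crawl_website.apply_async(
#         args=[website.id],
#         kwargs={"node_id": "worker-1", "batch_id": "batch-42"},
#         queue="crawl",
#     )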


@shared_task(bind=True, max_retries=3)
def crawl_all_websites(self):
    """
    Crawl all enabled websites by dispatching one sub-task per website.
    """
    try:
        logger.info("Starting batch crawl of all websites")
        # Fetch all enabled websites
        websites = Website.objects.filter(enabled=True)
        total_websites = websites.count()
        logger.info(f"Found {total_websites} enabled websites")

        results = []
        for website in websites:
            try:
                logger.info(f"Dispatching crawl task for website {website.name}")
                # Enqueue the single-website crawl task
                result = crawl_website.delay(website.id)
                logger.info(f"Crawl task for website {website.name} dispatched, task ID: {result.id}")
                results.append({
                    'website_id': website.id,
                    'website_name': website.name,
                    'task_id': result.id
                })
            except Exception as e:
                error_msg = f"Failed to dispatch crawl task for website {website.name}: {e}"
                logger.error(error_msg)
                results.append({
                    'website_id': website.id,
                    'website_name': website.name,
                    'error': str(e)
                })

        logger.info(f"Batch crawl dispatch finished, {total_websites} websites in total")
        return {
            'total_websites': total_websites,
            'results': results,
            'status': 'started'
        }
    except Exception as exc:
        error_msg = f"Batch crawl task failed: {exc}"
        logger.error(error_msg)
        raise self.retry(exc=exc, countdown=60 * 10)  # retry after 10 minutes
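
# Example usage (a sketch; reading per-website task state via AsyncResult
# assumes a result backend is configured for this Celery app):
#
#     from celery.result import AsyncResult
#
#     dispatch = crawl_all_websites.delay()
#     summary = dispatch.get(timeout=30)   # {'total_websites': ..., 'results': [...]}
#     for item in summary['results']:
#         if 'task_id' in item:
#             print(item['website_name'], AsyncResult(item['task_id']).state)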


@shared_task
def crawl_specific_media(media_list):
    """
    Crawl the specified media outlets.
    """
    try:
        logger.info(f"Starting crawl for specified media: {media_list}")
        # Delegate to the management command
        logger.info("Calling the crawl_all_media management command")
        call_command('crawl_all_media', media=','.join(media_list))
        logger.info("crawl_all_media management command finished")
        return {
            'media_list': media_list,
            'status': 'success'
        }
    except Exception as e:
        error_msg = f"Failed to crawl specified media: {e}"
        logger.error(error_msg)
        raise


@shared_task
def cleanup_old_articles(days=30):
    """
    Delete articles older than the given number of days.
    """
    try:
        from django.utils import timezone
        from datetime import timedelta

        cutoff_date = timezone.now() - timedelta(days=days)
        logger.info(f"Looking for articles older than {days} days, cutoff date: {cutoff_date}")
        old_articles = Article.objects.filter(created_at__lt=cutoff_date)
        count = old_articles.count()
        logger.info(f"Found {count} old articles")
        old_articles.delete()
        logger.info(f"Deleted {count} old articles (older than {days} days)")
        return {
            'deleted_count': count,
            'cutoff_date': cutoff_date.isoformat(),
            'status': 'success'
        }
    except Exception as e:
        error_msg = f"Failed to clean up old articles: {e}"
        logger.error(error_msg)
        raise
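
# Example one-off invocation (the 60-day retention below is an illustrative
# value, not a project default):
#
#     cleanup_old_articles.delay(days=60)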


@shared_task
def export_articles():
    """
    Export articles via the export management command.
    """
    try:
        logger.info("Starting article export")
        # Delegate to the export command
        logger.info("Calling the export_articles management command")
        call_command('export_articles')
        logger.info("export_articles management command finished")
        return {
            'status': 'success',
            'message': 'Article export finished'
        }
    except Exception as e:
        error_msg = f"Failed to export articles: {e}"
        logger.error(error_msg)
        raise


@shared_task
def health_check():
    """
    Health-check task: verifies database and Redis/cache connectivity.
    """
    try:
        logger.info("Starting health check")
        # Check the database connection
        website_count = Website.objects.count()
        article_count = Article.objects.count()
        logger.info(f"Database OK - websites: {website_count}, articles: {article_count}")

        # Check the Redis-backed cache connection
        from django.core.cache import cache
        logger.info("Checking Redis connection")
        cache.set('health_check', 'ok', 60)
        cache_result = cache.get('health_check')
        logger.info(f"Redis connection status: {'ok' if cache_result == 'ok' else 'error'}")

        result = {
            'database': 'ok',
            'redis': 'ok' if cache_result == 'ok' else 'error',
            'website_count': website_count,
            'article_count': article_count,
            'status': 'healthy'
        }
        logger.info(f"Health check finished, result: {result}")
        return result
    except Exception as e:
        error_msg = f"Health check failed: {e}"
        logger.error(error_msg)
        return {
            'status': 'unhealthy',
            'error': str(e)
        }
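
# A possible periodic schedule for these tasks (a sketch only; the real
# schedule, if any, belongs in the project's Celery/Django settings, and the
# task paths below assume this module is importable as "core.tasks"):
#
#     from celery.schedules import crontab
#
#     CELERY_BEAT_SCHEDULE = {
#         "crawl-all-websites-nightly": {
#             "task": "core.tasks.crawl_all_websites",
#             "schedule": crontab(hour=2, minute=0),
#         },
#         "cleanup-old-articles-weekly": {
#             "task": "core.tasks.cleanup_old_articles",
#             "schedule": crontab(day_of_week=0, hour=3, minute=0),
#             "kwargs": {"days": 30},
#         },
#         "health-check-every-5-minutes": {
#             "task": "core.tasks.health_check",
#             "schedule": 300.0,
#         },
#     }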