diff --git a/core/tasks.py b/core/tasks.py
new file mode 100644
index 0000000..386db66
--- /dev/null
+++ b/core/tasks.py
@@ -0,0 +1,195 @@
+import logging
+from celery import shared_task
+from django.core.management import call_command
+# from django.conf import settings
+from .models import Website, Article
+from .utils import full_site_crawler
+
+logger = logging.getLogger(__name__)
+
+
+@shared_task(bind=True, max_retries=3)
+def crawl_website(self, website_id, node_id=None, batch_id=None):
+    """
+    Crawl a single website.
+    """
+    try:
+        website = Website.objects.get(id=website_id)
+        logger.info(f"Starting crawl of website: {website.name} (node: {node_id}, batch: {batch_id})")
+
+        # Mark the task as started
+        if node_id and batch_id:
+            from .distributed_crawler import distributed_crawler
+            distributed_crawler.heartbeat(node_id, 1)
+
+        # Run the crawler
+        full_site_crawler(website.base_url, website, max_pages=100)
+
+        # Collect the results
+        article_count = website.article_set.count()
+        logger.info(f"Finished crawling {website.name}: {article_count} articles in total")
+
+        # Mark the task as finished
+        if node_id and batch_id:
+            distributed_crawler.heartbeat(node_id, 0)
+
+        return {
+            'website_id': website_id,
+            'website_name': website.name,
+            'article_count': article_count,
+            'status': 'success',
+            'node_id': node_id,
+            'batch_id': batch_id
+        }
+
+    except Website.DoesNotExist:
+        logger.error(f"Website does not exist: {website_id}")
+        raise
+    except Exception as exc:
+        logger.error(f"Failed to crawl website {website_id}: {exc}")
+        # Retry the task
+        raise self.retry(exc=exc, countdown=60 * 5)  # retry after 5 minutes
+
+
+@shared_task(bind=True, max_retries=3)
+def crawl_all_websites(self):
+    """
+    Crawl all websites.
+    """
+    try:
+        logger.info("Starting batch crawl of all websites")
+
+        # Fetch all enabled websites
+        websites = Website.objects.filter(enabled=True)
+        total_websites = websites.count()
+
+        results = []
+        for website in websites:
+            try:
+                # Dispatch a per-website crawl task
+                result = crawl_website.delay(website.id)
+                results.append({
+                    'website_id': website.id,
+                    'website_name': website.name,
+                    'task_id': result.id
+                })
+            except Exception as e:
+                logger.error(f"Failed to start crawl task for {website.name}: {e}")
+                results.append({
+                    'website_id': website.id,
+                    'website_name': website.name,
+                    'error': str(e)
+                })
+
+        logger.info(f"Batch crawl tasks dispatched for {total_websites} websites")
+
+        return {
+            'total_websites': total_websites,
+            'results': results,
+            'status': 'started'
+        }
+
+    except Exception as exc:
+        logger.error(f"Batch crawl task failed: {exc}")
+        raise self.retry(exc=exc, countdown=60 * 10)  # retry after 10 minutes
+
+
+@shared_task
+def crawl_specific_media(media_list):
+    """
+    Crawl the specified media outlets.
+    """
+    try:
+        logger.info(f"Starting crawl of specified media: {media_list}")
+
+        # Invoke the management command
+        call_command('crawl_all_media', media=','.join(media_list))
+
+        return {
+            'media_list': media_list,
+            'status': 'success'
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to crawl specified media: {e}")
+        raise
+
+
+@shared_task
+def cleanup_old_articles(days=30):
+    """
+    Delete articles older than the given number of days.
+    """
+    try:
+        from django.utils import timezone
+        from datetime import timedelta
+
+        cutoff_date = timezone.now() - timedelta(days=days)
+        old_articles = Article.objects.filter(created_at__lt=cutoff_date)
+        count = old_articles.count()
+
+        old_articles.delete()
+
+        logger.info(f"Deleted {count} old articles (older than {days} days)")
+
+        return {
+            'deleted_count': count,
+            'cutoff_date': cutoff_date.isoformat(),
+            'status': 'success'
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to clean up old articles: {e}")
+        raise
+
+
+@shared_task
+def export_articles():
+    """
+    Export articles.
+    """
+    try:
+        logger.info("Starting article export")
+
+        # Invoke the export command
+        call_command('export_articles')
+
+        return {
+            'status': 'success',
+            'message': 'Article export completed'
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to export articles: {e}")
+        raise
+
+
+@shared_task
+def health_check():
+    """
+    Health-check task.
+    """
+    try:
+        # Check the database connection
+        website_count = Website.objects.count()
+        article_count = Article.objects.count()
+
+        # Check the Redis connection
+        from django.core.cache import cache
+        cache.set('health_check', 'ok', 60)
+        cache_result = cache.get('health_check')
+
+        return {
+            'database': 'ok',
+            'redis': 'ok' if cache_result == 'ok' else 'error',
+            'website_count': website_count,
+            'article_count': article_count,
+            'status': 'healthy'
+        }
+
+    except Exception as e:
+        logger.error(f"Health check failed: {e}")
+        return {
+            'status': 'unhealthy',
+            'error': str(e)
+        }
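All of these tasks are invoked on demand (e.g. `crawl_all_websites.delay()` from a view or a shell), but they also lend themselves to periodic execution via Celery beat. Below is a minimal scheduling sketch, assuming the usual Django/Celery setup where `app.config_from_object('django.conf:settings', namespace='CELERY')` is in place; the schedule names, intervals, and retention value are illustrative assumptions and are not part of this diff.

```python
# settings.py (hypothetical addition, not part of this change)
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'crawl-all-websites-nightly': {
        # task names follow the default module path of the file added above
        'task': 'core.tasks.crawl_all_websites',
        'schedule': crontab(hour=2, minute=0),   # fan out per-site crawls at 02:00
    },
    'cleanup-old-articles-weekly': {
        'task': 'core.tasks.cleanup_old_articles',
        'schedule': crontab(day_of_week='sunday', hour=3, minute=0),
        'kwargs': {'days': 30},                  # matches the task's default retention
    },
    'health-check-every-5-min': {
        'task': 'core.tasks.health_check',
        'schedule': 300.0,                       # interval in seconds
    },
}
```

Because `crawl_all_websites` only dispatches one `crawl_website` task per enabled site and returns immediately, a schedule like this keeps the beat process cheap while the actual crawling is spread across the worker pool.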