Support Celery
core/tasks.py (new file, 195 lines added)
@@ -0,0 +1,195 @@
import logging

from celery import shared_task
from django.core.management import call_command
# from django.conf import settings

from .models import Website, Article
from .utils import full_site_crawler

logger = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=3)
def crawl_website(self, website_id, node_id=None, batch_id=None):
    """
    Task that crawls a single website.
    """
    try:
        website = Website.objects.get(id=website_id)
        logger.info(f"Started crawling website: {website.name} (node: {node_id}, batch: {batch_id})")

        # Record that the task has started
        if node_id and batch_id:
            from .distributed_crawler import distributed_crawler
            distributed_crawler.heartbeat(node_id, 1)

        # Run the crawler
        full_site_crawler(website.base_url, website, max_pages=100)

        # Collect statistics
        article_count = website.article_set.count()
        logger.info(f"Finished crawling {website.name}: {article_count} articles in total")

        # Record that the task has finished
        if node_id and batch_id:
            distributed_crawler.heartbeat(node_id, 0)

        return {
            'website_id': website_id,
            'website_name': website.name,
            'article_count': article_count,
            'status': 'success',
            'node_id': node_id,
            'batch_id': batch_id
        }

    except Website.DoesNotExist:
        logger.error(f"Website does not exist: {website_id}")
        raise
    except Exception as exc:
        logger.error(f"Failed to crawl website {website_id}: {exc}")
        # Retry the task
        raise self.retry(exc=exc, countdown=60 * 5)  # retry after 5 minutes


@shared_task(bind=True, max_retries=3)
def crawl_all_websites(self):
    """
    Task that crawls all websites.
    """
    try:
        logger.info("Starting batch crawl of all websites")

        # Fetch all enabled websites
        websites = Website.objects.filter(enabled=True)
        total_websites = websites.count()

        results = []
        for website in websites:
            try:
                # Dispatch the single-website crawl task
                result = crawl_website.delay(website.id)
                results.append({
                    'website_id': website.id,
                    'website_name': website.name,
                    'task_id': result.id
                })
            except Exception as e:
                logger.error(f"Failed to start crawl task for website {website.name}: {e}")
                results.append({
                    'website_id': website.id,
                    'website_name': website.name,
                    'error': str(e)
                })

        logger.info(f"Batch crawl tasks dispatched for {total_websites} websites")

        return {
            'total_websites': total_websites,
            'results': results,
            'status': 'started'
        }

    except Exception as exc:
        logger.error(f"Batch crawl task failed: {exc}")
        raise self.retry(exc=exc, countdown=60 * 10)  # retry after 10 minutes


@shared_task
def crawl_specific_media(media_list):
    """
    Task that crawls the specified media sources.
    """
    try:
        logger.info(f"Started crawling specified media: {media_list}")

        # Invoke the management command
        call_command('crawl_all_media', media=','.join(media_list))

        return {
            'media_list': media_list,
            'status': 'success'
        }

    except Exception as e:
        logger.error(f"Failed to crawl specified media: {e}")
        raise


@shared_task
def cleanup_old_articles(days=30):
    """
    Task that deletes articles older than the given number of days.
    """
    try:
        from django.utils import timezone
        from datetime import timedelta

        cutoff_date = timezone.now() - timedelta(days=days)
        old_articles = Article.objects.filter(created_at__lt=cutoff_date)
        count = old_articles.count()

        old_articles.delete()

        logger.info(f"Deleted {count} old articles (older than {days} days)")

        return {
            'deleted_count': count,
            'cutoff_date': cutoff_date.isoformat(),
            'status': 'success'
        }

    except Exception as e:
        logger.error(f"Failed to clean up old articles: {e}")
        raise


@shared_task
def export_articles():
    """
    Task that exports articles.
    """
    try:
        logger.info("Started exporting articles")

        # Invoke the export management command
        call_command('export_articles')

        return {
            'status': 'success',
            'message': 'Article export completed'
        }

    except Exception as e:
        logger.error(f"Failed to export articles: {e}")
        raise


@shared_task
def health_check():
    """
    Health check task.
    """
    try:
        # Check the database connection
        website_count = Website.objects.count()
        article_count = Article.objects.count()

        # Check the Redis connection via the Django cache
        from django.core.cache import cache
        cache.set('health_check', 'ok', 60)
        cache_result = cache.get('health_check')

        return {
            'database': 'ok',
            'redis': 'ok' if cache_result == 'ok' else 'error',
            'website_count': website_count,
            'article_count': article_count,
            'status': 'healthy'
        }

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            'status': 'unhealthy',
            'error': str(e)
        }
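
These tasks only become schedulable once a project-level Celery app is wired up, which this commit does not include. Below is a minimal sketch of what that wiring might look like, assuming a standard Celery 5.x + Django setup, a Redis broker, and a hypothetical project package named "project" (adjust names, broker URL, and schedule times to the actual project):

# project/celery.py — hypothetical module; standard Celery/Django bootstrap
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')
# Read CELERY_*-prefixed options from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover @shared_task functions such as those in core/tasks.py
app.autodiscover_tasks()


# project/settings.py — assumed additions; values are placeholders
from celery.schedules import crontab

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_BEAT_SCHEDULE = {
    'crawl-all-websites-nightly': {
        'task': 'core.tasks.crawl_all_websites',
        'schedule': crontab(hour=2, minute=0),
    },
    'cleanup-old-articles-weekly': {
        'task': 'core.tasks.cleanup_old_articles',
        'schedule': crontab(day_of_week=0, hour=3, minute=0),
        'kwargs': {'days': 30},
    },
}

With something like this in place, the tasks would be served by running a worker ("celery -A project worker -l info") and the beat scheduler ("celery -A project beat -l info").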