"""
RESTful API模块
提供完整的API接口支持爬虫管理、数据查询、任务控制
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any
import json
import csv
import io
import zipfile
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.core.paginator import Paginator
from django.db.models import Q, Count
from django.utils import timezone
# Django REST framework imports
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.authentication import SessionAuthentication, TokenAuthentication
# python-docx for exporting articles as Word documents
from docx import Document
# BeautifulSoup for stripping HTML from article content
from bs4 import BeautifulSoup
from .models import Website, Article
from .tasks import crawl_website, cleanup_old_articles
from .distributed_crawler import distributed_crawler
logger = logging.getLogger(__name__)


def api_response(data=None, message="", status=200, error=None):
    """Build a unified API response envelope."""
    response = {
        "success": status < 400,
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }
    if data is not None:
        response["data"] = data
    if error:
        response["error"] = error
    # Return a DRF Response when a DRF class-based view has requested it.
    if hasattr(api_response, '_use_drf_response') and api_response._use_drf_response:
        return Response(response, status=status)
    return JsonResponse(response, status=status)

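# A minimal sketch of the envelope produced by api_response(); the field values
# below are illustrative, not taken from a real request:
#
#     {
#         "success": true,
#         "message": "OK",
#         "timestamp": "2024-01-01T00:00:00",
#         "data": {...},      # only present when data is not None
#         "error": "..."      # only present when an error is passed
#     }
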
# Health check endpoint, implemented as a DRF class-based view.
class HealthView(APIView):
    """Health check endpoint."""
    permission_classes = []  # allow unauthenticated access
    authentication_classes = []

    def get(self, request):
        try:
            # Check database connectivity
            website_count = Website.objects.count()
            article_count = Article.objects.count()
            # Check Redis connectivity via the cache backend
            from django.core.cache import cache
            cache.set('health_check', 'ok', 60)
            cache_result = cache.get('health_check')
            health_data = {
                "status": "healthy",
                "database": "ok",
                "redis": "ok" if cache_result == 'ok' else 'error',
                "website_count": website_count,
                "article_count": article_count,
                "uptime": "running"
            }
            # Ask api_response() to return a DRF Response
            api_response._use_drf_response = True
            return api_response(data=health_data, message="Service is running normally")
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return api_response(
                data={"status": "unhealthy", "error": str(e)},
                message="Service error",
                status=500,
                error=str(e)
            )
        finally:
            api_response._use_drf_response = False

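# A minimal usage sketch, assuming the view is routed at /api/health/ (the URL
# configuration is not part of this module, so the path is an assumption):
#
#     curl http://localhost:8000/api/health/
#     # -> {"success": true, "message": "Service is running normally",
#     #     "data": {"status": "healthy", "database": "ok", "redis": "ok", ...}}
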
# Website list endpoint, implemented as a DRF class-based view.
class WebsitesView(APIView):
    """List websites."""
    permission_classes = [IsAuthenticated]
    authentication_classes = [SessionAuthentication, TokenAuthentication]

    def get(self, request):
        try:
            # Pagination and filter parameters
            page = int(request.GET.get('page', 1))
            page_size = int(request.GET.get('page_size', 20))
            search = request.GET.get('search', '')
            enabled = request.GET.get('enabled', '')
            # Build the queryset
            queryset = Website.objects.all()
            if search:
                queryset = queryset.filter(
                    Q(name__icontains=search) |
                    Q(base_url__icontains=search)
                )
            if enabled in ['true', 'false']:
                queryset = queryset.filter(enabled=enabled == 'true')
            # Order by id (the Website model has no created_at field)
            queryset = queryset.order_by('-id')
            # Paginate
            paginator = Paginator(queryset, page_size)
            websites_page = paginator.get_page(page)
            # Aggregate statistics
            stats = {
                'total_websites': Website.objects.count(),
                'enabled_websites': Website.objects.filter(enabled=True).count(),
                'disabled_websites': Website.objects.filter(enabled=False).count(),
            }
            # Serialize the page
            websites_data = []
            for website in websites_page:
                website_data = {
                    'id': website.id,
                    'name': website.name,
                    'base_url': website.base_url,
                    'enabled': website.enabled,
                    # created_at/updated_at omitted: these fields do not exist on the Website model
                    'article_count': website.article_set.count(),
                    'last_crawl': website.last_crawl.isoformat() if getattr(website, 'last_crawl', None) else None,
                }
                websites_data.append(website_data)
            response_data = {
                'websites': websites_data,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total_pages': paginator.num_pages,
                    'total_count': paginator.count,
                    'has_next': websites_page.has_next(),
                    'has_previous': websites_page.has_previous(),
                },
                'stats': stats
            }
            # Ask api_response() to return a DRF Response
            api_response._use_drf_response = True
            return api_response(data=response_data, message="Website list retrieved successfully")
        except Exception as e:
            logger.error(f"Failed to retrieve website list: {e}")
            return api_response(message="Failed to retrieve website list", status=500, error=str(e))
        finally:
            api_response._use_drf_response = False

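# A minimal query sketch, assuming the view is routed at /api/websites/ (the URL
# configuration lives outside this module, so the path and token are illustrative):
#
#     curl -H "Authorization: Token <token>" \
#          "http://localhost:8000/api/websites/?page=1&page_size=20&search=example&enabled=true"
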
@csrf_exempt
@require_http_methods(["GET"])
def api_website_detail(request, website_id):
    """Retrieve details for a single website."""
    try:
        website = Website.objects.get(id=website_id)
        # Most recent articles for this website
        recent_articles = website.article_set.order_by('-created_at')[:10]
        website_data = {
            'id': website.id,
            'name': website.name,
            'base_url': website.base_url,
            'enabled': website.enabled,
            # The Website model may lack created_at/updated_at/last_crawl (see WebsitesView),
            # so guard the lookups instead of assuming the fields exist.
            'created_at': website.created_at.isoformat() if getattr(website, 'created_at', None) else None,
            'updated_at': website.updated_at.isoformat() if getattr(website, 'updated_at', None) else None,
            'last_crawl': website.last_crawl.isoformat() if getattr(website, 'last_crawl', None) else None,
            'article_count': website.article_set.count(),
            'recent_articles': [
                {
                    'id': article.id,
                    'title': article.title,
                    'url': article.url,
                    'created_at': article.created_at.isoformat(),
                }
                for article in recent_articles
            ]
        }
        return api_response(data=website_data, message="Website details retrieved successfully")
    except Website.DoesNotExist:
        return api_response(message="Website not found", status=404, error="Website not found")
    except Exception as e:
        logger.error(f"Failed to retrieve website details: {e}")
        return api_response(message="Failed to retrieve website details", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["POST"])
def api_crawl_website(request, website_id):
    """Start a crawl for the given website."""
    try:
        website = Website.objects.get(id=website_id)
        # Queue the Celery crawl task
        task = crawl_website.delay(website_id)
        response_data = {
            'task_id': task.id,
            'website_id': website_id,
            'website_name': website.name,
            'status': 'started'
        }
        return api_response(data=response_data, message="Crawl task started")
    except Website.DoesNotExist:
        return api_response(message="Website not found", status=404, error="Website not found")
    except Exception as e:
        logger.error(f"Failed to start crawl task: {e}")
        return api_response(message="Failed to start crawl task", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["GET"])
def api_articles(request):
    """List articles."""
    try:
        # Pagination and filter parameters
        page = int(request.GET.get('page', 1))
        page_size = int(request.GET.get('page_size', 20))
        search = request.GET.get('search', '')
        website_id = request.GET.get('website_id', '')
        date_from = request.GET.get('date_from', '')
        date_to = request.GET.get('date_to', '')
        # Build the queryset
        queryset = Article.objects.select_related('website').all()
        if search:
            queryset = queryset.filter(
                Q(title__icontains=search) |
                Q(content__icontains=search)
            )
        if website_id:
            queryset = queryset.filter(website_id=website_id)
        if date_from:
            try:
                date_from_obj = datetime.fromisoformat(date_from.replace('Z', '+00:00'))
                queryset = queryset.filter(created_at__gte=date_from_obj)
            except ValueError:
                pass
        if date_to:
            try:
                date_to_obj = datetime.fromisoformat(date_to.replace('Z', '+00:00'))
                queryset = queryset.filter(created_at__lte=date_to_obj)
            except ValueError:
                pass
        # Order newest first
        queryset = queryset.order_by('-created_at')
        # Paginate
        paginator = Paginator(queryset, page_size)
        articles_page = paginator.get_page(page)
        # Aggregate statistics
        stats = {
            'total_articles': Article.objects.count(),
            'today_articles': Article.objects.filter(
                created_at__date=timezone.now().date()
            ).count(),
            'week_articles': Article.objects.filter(
                created_at__gte=timezone.now() - timedelta(days=7)
            ).count(),
        }
        # Serialize the page (content truncated to a 200-character preview)
        articles_data = []
        for article in articles_page:
            article_data = {
                'id': article.id,
                'title': article.title,
                'url': article.url,
                'content': article.content[:200] + '...' if len(article.content) > 200 else article.content,
                'created_at': article.created_at.isoformat(),
                'website': {
                    'id': article.website.id,
                    'name': article.website.name,
                },
                'media_files': article.media_files,
            }
            articles_data.append(article_data)
        response_data = {
            'articles': articles_data,
            'pagination': {
                'page': page,
                'page_size': page_size,
                'total_pages': paginator.num_pages,
                'total_count': paginator.count,
                'has_next': articles_page.has_next(),
                'has_previous': articles_page.has_previous(),
            },
            'stats': stats
        }
        return api_response(data=response_data, message="Article list retrieved successfully")
    except Exception as e:
        logger.error(f"Failed to retrieve article list: {e}")
        return api_response(message="Failed to retrieve article list", status=500, error=str(e))

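# A minimal query sketch for the article list endpoint; the path is an assumption
# since URL routing is defined outside this module. Date filters are ISO 8601 strings:
#
#     GET /api/articles/?page=1&page_size=20&website_id=3&date_from=2024-01-01T00:00:00Z
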
@csrf_exempt
@require_http_methods(["GET"])
def api_article_detail(request, article_id):
    """Retrieve details for a single article."""
    try:
        article = Article.objects.select_related('website').get(id=article_id)
        article_data = {
            'id': article.id,
            'title': article.title,
            'url': article.url,
            'content': article.content,
            'created_at': article.created_at.isoformat(),
            'website': {
                'id': article.website.id,
                'name': article.website.name,
                'base_url': article.website.base_url,
            },
            'media_files': article.media_files,
        }
        return api_response(data=article_data, message="Article details retrieved successfully")
    except Article.DoesNotExist:
        return api_response(message="Article not found", status=404, error="Article not found")
    except Exception as e:
        logger.error(f"Failed to retrieve article details: {e}")
        return api_response(message="Failed to retrieve article details", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["GET"])
def api_crawler_status(request):
    """Report the status of the distributed crawler."""
    try:
        # Status of each available crawler node
        nodes = distributed_crawler.get_available_nodes()
        node_statuses = []
        for node_id in nodes:
            status = distributed_crawler.get_node_status(node_id)
            node_statuses.append(status)
        # Most recent batches
        batches = distributed_crawler.get_all_batches()[:10]
        # Task statistics
        task_stats = {
            'active_tasks': len([n for n in node_statuses if n['active_tasks'] > 0]),
            'total_nodes': len(nodes),
            'total_batches': len(batches),
        }
        response_data = {
            'nodes': node_statuses,
            'batches': batches,
            'stats': task_stats,
        }
        return api_response(data=response_data, message="Crawler status retrieved successfully")
    except Exception as e:
        logger.error(f"Failed to retrieve crawler status: {e}")
        return api_response(message="Failed to retrieve crawler status", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["POST"])
def api_start_distributed_crawl(request):
    """Start a distributed crawl batch."""
    try:
        data = json.loads(request.body)
        website_ids = data.get('website_ids', [])
        if not website_ids:
            return api_response(message="Please select websites to crawl", status=400, error="No websites selected")
        # Distribute crawl tasks across the available nodes
        batch_id = distributed_crawler.distribute_crawl_tasks(website_ids)
        if batch_id in ['no_websites', 'no_available_nodes']:
            return api_response(message="Unable to start distributed crawl", status=400, error=batch_id)
        response_data = {
            'batch_id': batch_id,
            'website_ids': website_ids,
            'status': 'started'
        }
        return api_response(data=response_data, message="Distributed crawl started")
    except json.JSONDecodeError:
        return api_response(message="Malformed request body", status=400, error="Invalid JSON")
    except Exception as e:
        logger.error(f"Failed to start distributed crawl: {e}")
        return api_response(message="Failed to start distributed crawl", status=500, error=str(e))

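# A minimal request sketch for the distributed crawl endpoint; the path is an
# assumption since URL routing is defined outside this module:
#
#     POST /api/crawler/distributed/
#     {"website_ids": [1, 2, 3]}
#
#     # -> {"success": true, "message": "Distributed crawl started",
#     #     "data": {"batch_id": "...", "website_ids": [1, 2, 3], "status": "started"}}
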
@csrf_exempt
@require_http_methods(["GET"])
def api_batch_status(request, batch_id):
    """Retrieve the status of a crawl batch."""
    try:
        batch_status = distributed_crawler.get_batch_status(batch_id)
        if batch_status.get('status') == 'not_found':
            return api_response(message="Batch not found", status=404, error="Batch not found")
        return api_response(data=batch_status, message="Batch status retrieved successfully")
    except Exception as e:
        logger.error(f"Failed to retrieve batch status: {e}")
        return api_response(message="Failed to retrieve batch status", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["GET", "POST"])
def api_cleanup_articles(request):
    """Clean up old articles."""
    # GET returns a short description of the cleanup endpoint
    if request.method == "GET":
        response_data = {
            'description': 'Article cleanup API',
            'method': 'POST',
            'parameters': {
                'days': 'Number of days to keep (default: 30)'
            },
            'example': {
                'days': 30
            }
        }
        return api_response(data=response_data, message="API usage information")
    try:
        data = json.loads(request.body)
        days = data.get('days', 30)
        # Queue the Celery cleanup task
        task = cleanup_old_articles.delay(days)
        response_data = {
            'task_id': task.id,
            'days': days,
            'status': 'started'
        }
        return api_response(data=response_data, message="Cleanup task started")
    except json.JSONDecodeError:
        return api_response(message="Malformed request body", status=400, error="Invalid JSON")
    except Exception as e:
        logger.error(f"Failed to start cleanup task: {e}")
        return api_response(message="Failed to start cleanup task", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["GET"])
def api_stats(request):
    """Retrieve aggregate statistics."""
    try:
        # Basic counts
        total_websites = Website.objects.count()
        total_articles = Article.objects.count()
        enabled_websites = Website.objects.filter(enabled=True).count()
        # Time-window counts
        today = timezone.now().date()
        week_ago = timezone.now() - timedelta(days=7)
        month_ago = timezone.now() - timedelta(days=30)
        today_articles = Article.objects.filter(created_at__date=today).count()
        week_articles = Article.objects.filter(created_at__gte=week_ago).count()
        month_articles = Article.objects.filter(created_at__gte=month_ago).count()
        # Per-website statistics
        website_stats = []
        for website in Website.objects.all():
            website_stats.append({
                'id': website.id,
                'name': website.name,
                'article_count': website.article_set.count(),
                # last_crawl may not exist on the model; getattr returns None in that case
                'last_crawl': website.last_crawl.isoformat() if getattr(website, 'last_crawl', None) else None,
            })
        # Distributed crawler statistics
        nodes = distributed_crawler.get_available_nodes()
        batches = distributed_crawler.get_all_batches()
        response_data = {
            'overview': {
                'total_websites': total_websites,
                'enabled_websites': enabled_websites,
                'total_articles': total_articles,
                'today_articles': today_articles,
                'week_articles': week_articles,
                'month_articles': month_articles,
            },
            'websites': website_stats,
            'crawler': {
                'active_nodes': len(nodes),
                'total_batches': len(batches),
                'recent_batches': batches[:5],
            }
        }
        return api_response(data=response_data, message="Statistics retrieved successfully")
    except Exception as e:
        logger.error(f"Failed to retrieve statistics: {e}")
        return api_response(message="Failed to retrieve statistics", status=500, error=str(e))

@csrf_exempt
@require_http_methods(["POST"])
def export_articles(request):
    """Export selected articles."""
    try:
        data = json.loads(request.body)
        article_ids = data.get('article_ids', [])
        export_format = data.get('format', 'docx')  # default to Word (docx) export
        if not article_ids:
            return api_response(message="Please select articles to export", status=400, error="No articles selected")
        # Load the selected articles
        articles = Article.objects.filter(id__in=article_ids).select_related('website')
        if not articles.exists():
            return api_response(message="No matching articles found", status=404, error="Articles not found")
        import os
        import requests
        from django.conf import settings
        if export_format == 'json':
            # Export as JSON
            articles_data = []
            for article in articles:
                articles_data.append({
                    'id': article.id,
                    'title': article.title,
                    'url': article.url,
                    'content': article.content,
                    'created_at': article.created_at.isoformat(),
                    'website': {
                        'id': article.website.id,
                        'name': article.website.name,
                    },
                    'media_files': article.media_files,
                })
            response = HttpResponse(
                json.dumps(articles_data, ensure_ascii=False, indent=2),
                content_type='application/json'
            )
            response['Content-Disposition'] = 'attachment; filename="articles.json"'
            return response
        elif export_format == 'csv':
            # Export as CSV
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(['ID', 'Title', 'URL', 'Content', 'Created At', 'Website'])
            for article in articles:
                writer.writerow([
                    article.id,
                    article.title,
                    article.url,
                    article.content[:1000] + '...' if len(article.content) > 1000 else article.content,
                    article.created_at.isoformat(),
                    article.website.name
                ])
            response = HttpResponse(output.getvalue(), content_type='text/csv')
            response['Content-Disposition'] = 'attachment; filename="articles.csv"'
            return response
        elif export_format in ('docx', 'zip'):
            # Both formats produce the same ZIP archive: one folder per article,
            # each containing a Word document and any associated media files.
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                for article in articles:
                    # Build a filesystem-safe folder name for the article
                    safe_title = "".join(c for c in article.title if c.isalnum() or c in (' ', '_', '-')).rstrip()
                    folder_name = f"article_{article.id}_{safe_title}"[:50]
                    # Build the Word document
                    doc = Document()
                    doc.add_heading(article.title, 0)
                    # Article metadata
                    doc.add_paragraph(f"Website: {article.website.name}")
                    doc.add_paragraph(f"URL: {article.url}")
                    doc.add_paragraph(f"Published: {article.pub_date.isoformat() if article.pub_date else 'N/A'}")
                    doc.add_paragraph(f"Created: {article.created_at.isoformat()}")
                    # Article body
                    doc.add_heading('Content:', level=1)
                    # Strip HTML tags from the stored content
                    content_text = BeautifulSoup(article.content, 'html.parser').get_text()
                    doc.add_paragraph(content_text)
                    # Save the document to an in-memory buffer
                    doc_buffer = io.BytesIO()
                    doc.save(doc_buffer)
                    doc_buffer.seek(0)
                    # Add the document to the ZIP archive
                    zip_file.writestr(f"{folder_name}/article.docx", doc_buffer.getvalue())
                    # Add media files, if any
                    if article.media_files:
                        for media in article.media_files:
                            try:
                                if not media.startswith('http'):
                                    # Local file path under MEDIA_ROOT
                                    media_path = os.path.join(settings.MEDIA_ROOT, media.lstrip('/'))
                                    if os.path.exists(media_path):
                                        zip_file.write(media_path, f"{folder_name}/media/{os.path.basename(media_path)}")
                                else:
                                    # Remote media file referenced by URL
                                    media_response = requests.get(media, timeout=10)
                                    if media_response.status_code == 200:
                                        media_filename = f"{folder_name}/media/{os.path.basename(media)}"
                                        zip_file.writestr(media_filename, media_response.content)
                            except Exception:
                                # Skip media files that cannot be added
                                pass
            response = HttpResponse(zip_buffer.getvalue(), content_type='application/zip')
            response['Content-Disposition'] = 'attachment; filename="articles.zip"'
            return response
        else:
            return api_response(message="Unsupported export format", status=400, error="Unsupported format")
    except json.JSONDecodeError:
        return api_response(message="Malformed request body", status=400, error="Invalid JSON")
    except Exception as e:
        logger.error(f"Failed to export articles: {e}")
        return api_response(message="Failed to export articles", status=500, error=str(e))
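
# A minimal request sketch for the export endpoint; the path is an assumption
# since URL routing is defined outside this module:
#
#     POST /api/articles/export/
#     {"article_ids": [10, 11], "format": "docx"}
#
#     # The response is a ZIP attachment ("articles.zip") for the docx/zip formats,
#     # or a JSON/CSV attachment when format is "json" or "csv".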