"""
Django admin extensions.

Provides an enhanced admin interface for crawled websites and articles:
custom list filters, crawl-trigger actions and views, and a ZIP export of
selected articles.
"""
import logging
import os
import zipfile
from datetime import datetime, timedelta
from io import BytesIO

from django.conf import settings
from django.contrib import admin
from django.contrib.admin import SimpleListFilter
from django.contrib.admin.utils import model_format_dict
from django.contrib import messages
from django.http import HttpResponse, HttpResponseRedirect
from django.urls import path, reverse
from django.utils.html import format_html, format_html_join
from django.utils import timezone
from django.db.models import Count, Q
from django.core.cache import cache

from .models import Website, Article
from .tasks import crawl_website, crawl_all_websites, cleanup_old_articles
from .distributed_crawler import distributed_crawler

logger = logging.getLogger(__name__)


# User-facing troubleshooting text shown when the Celery broker refuses the
# connection. Kept in one place because four entry points report it.
_BROKER_UNAVAILABLE_HELP = (
    "连接被拒绝,可能是Redis或其他依赖服务未启动。请检查以下几点:\n"
    "1. Redis服务是否运行 (尝试运行: redis-server)\n"
    "2. 如果使用Docker,请确保容器正在运行\n"
    "3. 检查Django配置中的CELERY_BROKER_URL设置\n"
    "4. 在本地开发环境中,可以运行 'python manage.py runserver' 和 "
    "'celery -A myproject worker -l info' 来启动必要的服务"
)

# Characters replaced by "_" when a title is used as a file or folder name.
_UNSAFE_FILENAME_CHARS = str.maketrans({char: '_' for char in '/\\:*?"<>|'})

# Extensions used to classify media entries that carry no explicit type.
_IMAGE_EXTENSIONS = frozenset(
    {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg'}
)
_VIDEO_EXTENSIONS = frozenset({'.mp4', '.webm', '.ogg', '.mov', '.avi', '.mkv'})


def _describe_task_error(exc):
    """Return the message to show an admin user when a task fails to start.

    A refused connection (errno 61 on macOS, 111 on Linux) is replaced by
    broker troubleshooting hints; any other error is shown verbatim.
    """
    error_msg = str(exc)
    if "Connection refused" in error_msg:
        return _BROKER_UNAVAILABLE_HELP
    return error_msg


def _safe_filename(name):
    """Return *name* with characters that are invalid in file names replaced."""
    return str(name or '').translate(_UNSAFE_FILENAME_CHARS)


def _format_timestamp(value):
    """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; empty string if unset."""
    return value.strftime('%Y-%m-%d %H:%M:%S') if value else ''


def _media_kind_and_path(media):
    """Normalize one ``media_files`` entry to a ``(kind, path)`` pair.

    This module uses entries both as plain path strings (ZIP export) and as
    dicts with a ``'type'`` key (admin display), so both shapes are accepted.
    ``kind`` is ``'image'``, ``'video'`` or ``None``.
    """
    if isinstance(media, dict):
        # NOTE(review): the 'path' / 'url' key names are assumed - only the
        # 'type' key is visible in this file. Confirm against the crawler.
        media_path = media.get('path') or media.get('url') or ''
        kind = media.get('type')
    else:
        media_path = str(media)
        kind = None

    if kind is None:
        extension = os.path.splitext(media_path)[1].lower()
        if extension in _IMAGE_EXTENSIONS:
            kind = 'image'
        elif extension in _VIDEO_EXTENSIONS:
            kind = 'video'
    return kind, media_path


def _media_url(media_path):
    """Return a browser URL for a media path (relative paths use MEDIA_URL)."""
    if media_path.startswith(('http://', 'https://', '/')):
        return media_path
    return f'{settings.MEDIA_URL}{media_path}'


def _unregister_if_registered(*models):
    """Unregister each model from the default admin site, skipping absent ones."""
    for model in models:
        try:
            admin.site.unregister(model)
        except admin.sites.NotRegistered:
            logger.debug('%s is not registered with the admin site', model)


class WebsiteStatusFilter(SimpleListFilter):
    """Sidebar filter for websites: enabled state, no articles, recent crawl."""

    title = '网站状态'
    parameter_name = 'status'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices shown in the sidebar."""
        return (
            ('enabled', '已启用'),
            ('disabled', '已禁用'),
            ('no_articles', '无文章'),
            ('recent_crawl', '最近爬取'),
        )

    def queryset(self, request, queryset):
        """Narrow *queryset* according to the selected choice, if any."""
        value = self.value()
        if value == 'enabled':
            return queryset.filter(enabled=True)
        if value == 'disabled':
            return queryset.filter(enabled=False)
        if value == 'no_articles':
            return queryset.annotate(
                article_count=Count('article')
            ).filter(article_count=0)
        if value == 'recent_crawl':
            # NOTE(review): relies on a `last_crawl` field on Website; the
            # admin below reports last-crawl display as unimplemented, so
            # confirm the field exists.
            week_ago = timezone.now() - timedelta(days=7)
            return queryset.filter(last_crawl__gte=week_ago)
        return queryset


class ArticleDateFilter(SimpleListFilter):
    """Sidebar filter for articles by creation date range."""

    title = '发布时间'
    parameter_name = 'date_range'

    def lookups(self, request, model_admin):
        """Return the (value, label) choices shown in the sidebar."""
        return (
            ('today', '今天'),
            ('week', '本周'),
            ('month', '本月'),
            ('quarter', '本季度'),
        )

    def queryset(self, request, queryset):
        """Narrow *queryset* to the selected calendar period, if any."""
        now = timezone.now()
        # Date-part lookups (__date, __month, ...) are evaluated in the
        # active time zone, so compute the calendar boundaries there too.
        if timezone.is_aware(now):
            now = timezone.localtime(now)

        value = self.value()
        if value == 'today':
            return queryset.filter(created_at__date=now.date())
        if value == 'week':
            # Monday 00:00:00.000000 of the current week.
            week_start = (now - timedelta(days=now.weekday())).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            return queryset.filter(created_at__gte=week_start)
        if value == 'month':
            return queryset.filter(
                created_at__year=now.year,
                created_at__month=now.month,
            )
        if value == 'quarter':
            quarter_start_month = ((now.month - 1) // 3) * 3 + 1
            return queryset.filter(
                created_at__year=now.year,
                created_at__month__gte=quarter_start_month,
                created_at__month__lt=quarter_start_month + 3,
            )
        return queryset


class WebsiteAdmin(admin.ModelAdmin):
    """Admin for Website, with crawl-trigger actions and custom views."""

    list_display = [
        'name', 'base_url', 'enabled', 'article_count',
        'last_crawl_display', 'status_indicator', 'actions_column'
    ]
    list_filter = [WebsiteStatusFilter, 'enabled']
    search_fields = ['name', 'base_url']
    readonly_fields = ['article_count']
    actions = ['enable_websites', 'disable_websites', 'crawl_selected', 'crawl_all']

    fieldsets = (
        ('基本信息', {
            'fields': ('name', 'base_url', 'enabled')
        }),
        ('统计信息', {
            'fields': ('article_count',),
            'classes': ('collapse',)
        }),
        ('时间信息', {
            'fields': (),
            'classes': ('collapse',)
        }),
    )

    # Supports the website <select> in templates.
    def get_websites(self, request):
        """Return all enabled websites, for the selection box in templates."""
        return Website.objects.filter(enabled=True)

    def article_count(self, obj):
        """Return the number of articles belonging to this website."""
        return obj.article_set.count()
    article_count.short_description = '文章数量'

    def last_crawl_display(self, obj):
        """Return the last-crawl column text (currently a placeholder)."""
        return '未实现'
    last_crawl_display.short_description = '最后爬取'

    def status_indicator(self, obj):
        """Return a coloured enabled/disabled indicator."""
        if obj.enabled:
            color, label = 'green', '正常'
        else:
            color, label = 'red', '禁用'
        # format_html needs placeholders plus arguments; calling it with a
        # bare string is deprecated since Django 5.0.
        return format_html('<span style="color: {};">●</span> {}', color, label)
    status_indicator.short_description = '状态'

    def actions_column(self, obj):
        """Return links to crawl this website and to list its articles."""
        crawl_url = reverse('admin:crawl_website', args=[obj.id])
        articles_url = (
            reverse('admin:core_article_changelist')
            + f'?website__id__exact={obj.id}'
        )
        return format_html(
            '<a class="button" href="{}">爬取</a> '
            '<a class="button" href="{}">查看文章</a>',
            crawl_url,
            articles_url,
        )
    actions_column.short_description = '操作'

    def enable_websites(self, request, queryset):
        """Admin action: enable the selected websites."""
        updated = queryset.update(enabled=True)
        self.message_user(request, f'成功启用 {updated} 个网站')
    enable_websites.short_description = '启用选中的网站'

    def disable_websites(self, request, queryset):
        """Admin action: disable the selected websites."""
        updated = queryset.update(enabled=False)
        self.message_user(request, f'成功禁用 {updated} 个网站')
    disable_websites.short_description = '禁用选中的网站'

    def crawl_selected(self, request, queryset):
        """Admin action: queue one crawl task per selected website."""
        for website in queryset:
            try:
                task = crawl_website.delay(website.id)
            except Exception as exc:  # best effort: report and continue
                logger.exception(
                    'Failed to start crawl task for website %s', website.id
                )
                self.message_user(
                    request,
                    f'网站 {website.name} 爬取任务启动失败: {_describe_task_error(exc)}',
                    messages.ERROR
                )
            else:
                self.message_user(
                    request,
                    f'网站 {website.name} 爬取任务已启动 (任务ID: {task.id})',
                    messages.SUCCESS
                )
    crawl_selected.short_description = '爬取选中的网站'

    def _start_crawl_all(self, request):
        """Queue the crawl-all task and report the outcome to the user."""
        try:
            task = crawl_all_websites.delay()
        except Exception as exc:  # report broker/queue failures to the user
            logger.exception('Failed to start crawl-all task')
            self.message_user(
                request,
                f'批量爬取任务启动失败: {_describe_task_error(exc)}',
                messages.ERROR
            )
        else:
            self.message_user(
                request,
                f'批量爬取任务已启动 (任务ID: {task.id})',
                messages.SUCCESS
            )

    def crawl_all(self, request, queryset):
        """Admin action: queue a crawl of all websites (selection is ignored)."""
        self._start_crawl_all(request)
    # crawl_all.short_description = '爬取所有网站'

    def get_urls(self):
        """Return the admin URLs with the custom crawl views prepended."""
        urls = super().get_urls()
        custom_urls = [
            path(
                # The view takes `website_id` and callers reverse this name
                # with one argument, so the route must capture it.
                # NOTE(review): assumes an integer primary key on Website.
                '<int:website_id>/crawl/',
                self.admin_site.admin_view(self.crawl_website_view),
                name='crawl_website',
            ),
            path(
                'run-crawler/',
                self.admin_site.admin_view(self.run_crawler_view),
                name='run_crawler',
            ),
        ]
        return custom_urls + urls

    def crawl_website_view(self, request, website_id):
        """Queue a crawl for one website, then redirect to the changelist."""
        try:
            website = Website.objects.get(id=website_id)
            task = crawl_website.delay(website_id)
        except Website.DoesNotExist:
            self.message_user(request, '网站不存在', messages.ERROR)
        except Exception as exc:  # report broker/queue failures to the user
            logger.exception(
                'Failed to start crawl task for website %s', website_id
            )
            self.message_user(
                request,
                f'爬取任务启动失败: {_describe_task_error(exc)}',
                messages.ERROR
            )
        else:
            self.message_user(
                request,
                f'网站 {website.name} 爬取任务已启动 (任务ID: {task.id})',
                messages.SUCCESS
            )

        return HttpResponseRedirect(reverse('admin:core_website_changelist'))

    def run_crawler_view(self, request):
        """Queue a crawl of all websites, then redirect to the changelist."""
        self._start_crawl_all(request)
        return HttpResponseRedirect(reverse('admin:core_website_changelist'))


class ArticleAdmin(admin.ModelAdmin):
    """Admin for Article, with media preview and ZIP export."""

    list_display = [
        'title', 'website', 'created_at',
        'media_count', 'actions_column'
    ]
    list_filter = [
        ArticleDateFilter, 'website', 'created_at'
    ]
    search_fields = ['title', 'content', 'url']
    readonly_fields = ['created_at', 'media_files_display']
    date_hierarchy = 'created_at'

    fieldsets = (
        ('基本信息', {
            'fields': ('title', 'url', 'website')
        }),
        ('内容', {
            'fields': ('content',)
        }),
        ('媒体文件', {
            'fields': ('media_files_display',),
            'classes': ('collapse',)
        }),
        ('时间信息', {
            'fields': ('created_at',),
            'classes': ('collapse',)
        }),
    )

    # Action for exporting the selected articles.
    actions = ['export_selected_articles']

    def export_selected_articles(self, request, queryset):
        """Admin action: download the selected articles as a ZIP archive.

        Each article gets its own folder containing a Word document (title,
        metadata and the text extracted from the HTML content) and a
        ``media`` sub-folder with the media files found under MEDIA_ROOT.

        Returns:
            An ``HttpResponse`` carrying the archive as an attachment.
        """
        # Imported here, as in the original, so the admin module still loads
        # when these third-party packages are not installed.
        from bs4 import BeautifulSoup
        from docx import Document

        zip_buffer = BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
            for article in queryset:
                safe_title = _safe_filename(article.title)
                article_folder = f'article_{article.id}_{safe_title}'

                doc = Document()
                doc.add_heading(article.title or '', 0)

                # Article metadata.
                website_name = article.website.name if article.website else ''
                doc.add_paragraph(f"网站: {website_name}")
                doc.add_paragraph(f"URL: {article.url}")
                doc.add_paragraph(
                    f"发布时间: {_format_timestamp(article.pub_date)}"
                )
                doc.add_paragraph(
                    f"创建时间: {_format_timestamp(article.created_at)}"
                )

                # Body: plain text extracted from the stored HTML.
                doc.add_heading('内容:', level=1)
                soup = BeautifulSoup(article.content or '', 'html.parser')
                doc.add_paragraph(soup.get_text())

                doc_buffer = BytesIO()
                doc.save(doc_buffer)

                # ZIP member names always use "/" regardless of platform.
                docx_name = f"{safe_title or 'article'}.docx"
                zip_file.writestr(
                    f'{article_folder}/{docx_name}', doc_buffer.getvalue()
                )

                for media_file in article.media_files or []:
                    try:
                        _, relative_path = _media_kind_and_path(media_file)
                        if not relative_path:
                            continue
                        full_path = os.path.join(
                            settings.MEDIA_ROOT, relative_path
                        )
                        if os.path.isfile(full_path):
                            zip_file.write(
                                full_path,
                                f'{article_folder}/media/'
                                f'{os.path.basename(relative_path)}',
                            )
                    except Exception:
                        # Best effort: one bad media file must not abort the
                        # whole export, but leave a trace in the log.
                        logger.warning(
                            'Skipping media file %r of article %s',
                            media_file, article.id, exc_info=True,
                        )

        response = HttpResponse(
            zip_buffer.getvalue(), content_type='application/zip'
        )
        response['Content-Disposition'] = 'attachment; filename=selected_articles.zip'
        return response

    export_selected_articles.short_description = "导出所选的文章为ZIP"

    def content_preview(self, obj):
        """Return the first 100 characters of the content, with an ellipsis."""
        content = obj.content or ''
        if len(content) > 100:
            return content[:100] + '...'
        return content
    content_preview.short_description = '内容预览'

    def media_count(self, obj):
        """Return the number of media files attached to the article."""
        return len(obj.media_files) if obj.media_files else 0
    media_count.short_description = '媒体文件'

    def media_files_display(self, obj):
        """Return an HTML preview of the article's image and video files.

        Entries of any other kind are skipped. All values are escaped by
        ``format_html``.
        """
        if not obj.media_files:
            return '无媒体文件'

        rendered = []
        for media in obj.media_files:
            kind, media_path = _media_kind_and_path(media)
            if not media_path:
                continue
            media_url = _media_url(media_path)
            if kind == 'image':
                rendered.append(format_html(
                    '<img src="{}" style="max-width: 200px; '
                    'max-height: 150px; margin: 5px;" />',
                    media_url,
                ))
            elif kind == 'video':
                rendered.append(format_html(
                    '<video src="{}" controls style="max-width: 300px; '
                    'margin: 5px;"></video>',
                    media_url,
                ))

        return format_html(
            '<div class="media-files">{}</div>',
            format_html_join('\n', '{}', ((item,) for item in rendered)),
        )
    media_files_display.short_description = '媒体文件'

    def actions_column(self, obj):
        """Return links to the original article and to the local detail page."""
        return format_html(
            '<a href="{}" target="_blank" rel="noopener">查看原文</a> '
            '<a href="{}" target="_blank">本地查看</a>',
            obj.url,
            reverse('article_detail', args=[obj.id]),
        )
    actions_column.short_description = '操作'


#class CrawlerStatusAdmin(admin.ModelAdmin):
#    """Crawler status admin"""
#    change_list_template = 'admin/crawler_status.html'
#
#    def changelist_view(self, request, extra_context=None):
#        """Crawler status view"""
#        # Fetch distributed crawler status
#        nodes = distributed_crawler.get_available_nodes()
#        node_statuses = []
#
#        for node_id in nodes:
#            status = distributed_crawler.get_node_status(node_id)
#            node_statuses.append(status)
#
#        # Fetch the most recent batches
#        batches = distributed_crawler.get_all_batches()[:10]
#
#        # Task statistics
#        task_stats = {
#            'active_tasks': len([n for n in node_statuses if n['active_tasks'] > 0]),
#            'total_nodes': len(nodes),
#            'total_batches': len(batches),
#        }
#
#        extra_context = extra_context or {}
#        extra_context.update({
#            'nodes': node_statuses,
#            'batches': batches,
#            'task_stats': task_stats,
#        })
#
#        return super().changelist_view(request, extra_context)
#

# Register the admin classes.
admin.site.register(Website, WebsiteAdmin)
admin.site.register(Article, ArticleAdmin)


# Hide the Celery Results admin pages by undoing the automatic registration
# done by django_celery_results.
try:
    from django_celery_results.models import TaskResult, GroupResult
    # Presumably imported so the package's own registrations have run before
    # they are undone below - verify against admin autodiscovery order.
    from django_celery_results.admin import TaskResultAdmin, GroupResultAdmin  # noqa: F401
except (ImportError, RuntimeError):
    # ImportError: package not installed. RuntimeError: package installed
    # but not listed in INSTALLED_APPS.
    pass
else:
    _unregister_if_registered(TaskResult, GroupResult)


# Hide the Celery Beat periodic-task admin pages by undoing the automatic
# registration done by django_celery_beat.
try:
    from django_celery_beat.models import PeriodicTask, ClockedSchedule, CrontabSchedule, SolarSchedule, IntervalSchedule
except (ImportError, RuntimeError):
    pass
else:
    _unregister_if_registered(
        PeriodicTask,
        ClockedSchedule,
        CrontabSchedule,
        SolarSchedule,
        IntervalSchedule,
    )


# Custom admin site titles.
admin.site.site_header = 'Green Classroom 管理系统'
admin.site.site_title = 'Green Classroom'
admin.site.index_title = '欢迎使用 Green Classroom 管理系统'