from django.contrib import admin
from django.contrib.admin import AdminSite
from .models import Website, Article

# Imports for admin actions
from django.contrib import messages
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse

# Imports for the export actions (requests fetches remote media)
import csv
import json
import os
import re
import requests
from django.conf import settings

# Imports for the crawler admin view
from django.shortcuts import render, redirect
from django.urls import path
from django.views.decorators.http import require_http_methods
from django.core.management import call_command
import threading
import uuid
from django.utils import timezone


# Create a custom admin site

# Instantiate the admin site


# View for running crawlers from the admin
def run_crawler_view(request):
    """
    Admin view that runs a crawler management command.
    """
    if request.method == 'POST':
        website_name = request.POST.get('website_name')
        if not website_name:
            messages.error(request, 'Please select a website to crawl')
            return redirect('admin:core_article_changelist')

        try:
            # Map the submitted name to a management command;
            # unknown names fall back to the generic crawler.
            if website_name in ('crawl_xinhua', 'crawl_dongfangyancao', 'crawl_articles'):
                crawler_name = website_name
            else:
                crawler_name = 'crawl_articles'

            # Run the crawler command; website_name is not passed as an argument
            call_command(crawler_name)

            messages.success(request, f'Crawler executed successfully: {crawler_name}')
        except Exception as e:
            messages.error(request, f'Crawler failed: {str(e)}')

    return redirect('admin:core_article_changelist')
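

# Note: call_command() above runs synchronously, so a long crawl blocks the
# admin request. Below is a minimal sketch of a non-blocking variant, assuming
# the threading/uuid imports at the top of this module were intended for this;
# run_crawler_in_background and its task_id bookkeeping are illustrative, not
# part of the original code.
def run_crawler_in_background(crawler_name):
    """Run a crawler management command on a daemon thread; return a task id."""
    task_id = str(uuid.uuid4())

    def worker():
        try:
            call_command(crawler_name)
        except Exception:
            # A real implementation would record the failure somewhere
            # (e.g. a task model); this sketch just swallows it.
            pass

    threading.Thread(target=worker, name=f'crawler-{task_id}', daemon=True).start()
    return task_id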


@admin.register(Website)
class WebsiteAdmin(admin.ModelAdmin):
    list_display = ('name', 'base_url', 'enabled')


# Custom actions for ArticleAdmin
@admin.register(Article)
class ArticleAdmin(admin.ModelAdmin):
    list_display = ('title', 'website', 'pub_date')
    search_fields = ('title', 'content')
    # Bulk export actions; Django's built-in delete_selected handles deletion
    actions = ['export_as_csv', 'export_as_json', 'export_as_word', 'export_with_media']

    # Override get_urls to add a custom admin URL for the crawler view
    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path('run-crawler/', self.admin_site.admin_view(run_crawler_view), name='run_crawler'),
        ]
        return custom_urls + urls
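
    # The route above is registered inside the admin namespace, so it can be
    # reversed as 'admin:run_crawler'. A minimal usage sketch; the template
    # form shown is an illustrative assumption, not part of this file:
    #
    #   from django.urls import reverse
    #   crawler_url = reverse('admin:run_crawler')
    #
    #   <form method="post" action="{{ crawler_url }}">
    #       {% csrf_token %}
    #       <select name="website_name">...</select>
    #       <button type="submit">Run crawler</button>
    #   </form>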

    def export_as_csv(self, request, queryset):
        """Export the selected articles as CSV."""
        meta = self.model._meta
        field_names = [field.name for field in meta.fields]

        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename={}.csv'.format(meta)
        writer = csv.writer(response)

        writer.writerow(field_names)
        for obj in queryset:
            row = [getattr(obj, field)() if callable(getattr(obj, field))
                   else getattr(obj, field) for field in field_names]
            writer.writerow(row)

        return response

    export_as_csv.short_description = "Export selected articles as CSV"

    def export_as_json(self, request, queryset):
        """Export the selected articles as JSON."""
        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename=articles.json'

        # Build the export payload
        articles_data = []
        for article in queryset:
            articles_data.append({
                'id': article.id,
                'title': article.title,
                'website': article.website.name,
                'url': article.url,
                'pub_date': article.pub_date.strftime('%Y-%m-%d %H:%M:%S') if article.pub_date else None,
                'content': article.content,
                'created_at': article.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'media_files': article.media_files
            })

        # Write the JSON data
        response.write(json.dumps(articles_data, ensure_ascii=False, indent=2))
        return response

    export_as_json.short_description = "Export selected articles as JSON"

    def export_as_word(self, request, queryset):
        """Export the selected articles as a Word document."""
        try:
            from docx import Document
            from docx.shared import Inches
            from io import BytesIO
        except ImportError:
            self.message_user(request, "python-docx is missing; install it with: pip install python-docx",
                              messages.ERROR)
            return

        from bs4 import BeautifulSoup

        # Create the Word document
        doc = Document()
        doc.add_heading('Article Export', 0)

        for article in queryset:
            # Article title
            doc.add_heading(article.title, level=1)

            # Article metadata
            doc.add_paragraph(f"Website: {article.website.name}")
            doc.add_paragraph(f"URL: {article.url}")
            doc.add_paragraph(
                f"Published: {article.pub_date.strftime('%Y-%m-%d %H:%M:%S') if article.pub_date else 'N/A'}")
            doc.add_paragraph(f"Created: {article.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

            # Article content: strip HTML tags and handle inline images
            doc.add_heading('Content', level=2)
            soup = BeautifulSoup(article.content, 'html.parser')

            # Embed images found in the content
            for img in soup.find_all('img'):
                src = img.get('src', '')
                if src:
                    try:
                        if src.startswith('http'):
                            # Remote image
                            response = requests.get(src, timeout=10)
                            image_stream = BytesIO(response.content)
                            doc.add_picture(image_stream, width=Inches(4.0))
                        else:
                            # Local image under MEDIA_ROOT
                            full_path = os.path.join(settings.MEDIA_ROOT, src.lstrip('/'))
                            if os.path.exists(full_path):
                                doc.add_picture(full_path, width=Inches(4.0))
                    except Exception:
                        # Fall back to the image URL as text
                        doc.add_paragraph(f"[Image: {src}]")

                    # Remove the original img tag
                    img.decompose()

            content_text = soup.get_text()
            doc.add_paragraph(content_text)

            # Attached media files
            if article.media_files:
                doc.add_heading('Media Files', level=2)
                for media_file in article.media_files:
                    try:
                        full_path = os.path.join(settings.MEDIA_ROOT, media_file)
                        if os.path.exists(full_path):
                            # Embed the image in the document
                            doc.add_picture(full_path, width=Inches(4.0))
                        elif media_file.startswith('http'):
                            # URL-style media file
                            response = requests.get(media_file, timeout=10)
                            image_stream = BytesIO(response.content)
                            doc.add_picture(image_stream, width=Inches(4.0))
                        else:
                            doc.add_paragraph(media_file)
                    except Exception:
                        doc.add_paragraph(media_file)

            # Page break between articles
            doc.add_page_break()

        # Save to an in-memory buffer
        buffer = BytesIO()
        doc.save(buffer)
        buffer.seek(0)

        response = HttpResponse(
            buffer.getvalue(),
            content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document')
        response['Content-Disposition'] = 'attachment; filename=articles.docx'
        return response

    export_as_word.short_description = "Export selected articles as Word"

    def export_with_media(self, request, queryset):
        """Export the selected articles and their media files as a ZIP archive."""
        try:
            from docx import Document
            from docx.shared import Inches
            from io import BytesIO
            import zipfile
        except ImportError:
            self.message_user(request, "Required library missing; install it with: pip install python-docx",
                              messages.ERROR)
            return

        from bs4 import BeautifulSoup

        # Build the ZIP archive in memory
        zip_buffer = BytesIO()

        with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
            for article in queryset:
                # One folder per article, with filesystem-unsafe characters replaced
                safe_title = re.sub(r'[\\/:*?"<>|]', '_', article.title)
                article_folder = f"article_{article.id}_{safe_title}"

                # Create the Word document
                doc = Document()
                doc.add_heading(article.title, 0)

                # Article metadata
                doc.add_paragraph(f"Website: {article.website.name}")
                doc.add_paragraph(f"URL: {article.url}")
                doc.add_paragraph(
                    f"Published: {article.pub_date.strftime('%Y-%m-%d %H:%M:%S') if article.pub_date else 'N/A'}")
                doc.add_paragraph(f"Created: {article.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

                # Article content: strip HTML tags and handle inline images
                doc.add_heading('Content', level=2)
                soup = BeautifulSoup(article.content, 'html.parser')

                # Embed images found in the content
                for img in soup.find_all('img'):
                    src = img.get('src', '')
                    if src:
                        try:
                            if src.startswith('http'):
                                # Remote image: embed it and store a copy in the ZIP
                                response = requests.get(src, timeout=10)
                                image_stream = BytesIO(response.content)
                                doc.add_picture(image_stream, width=Inches(4.0))
                                zip_file.writestr(os.path.join(article_folder, 'media', os.path.basename(src)),
                                                  response.content)
                            else:
                                # Local image under MEDIA_ROOT
                                full_path = os.path.join(settings.MEDIA_ROOT, src.lstrip('/'))
                                if os.path.exists(full_path):
                                    doc.add_picture(full_path, width=Inches(4.0))
                                    zip_file.write(full_path,
                                                   os.path.join(article_folder, 'media', src.lstrip('/')))
                        except Exception:
                            # Fall back to the image URL as text
                            doc.add_paragraph(f"[Image: {src}]")

                        # Remove the original img tag
                        img.decompose()

                content_text = soup.get_text()
                doc.add_paragraph(content_text)

                # Attach media files to the document and the ZIP
                if article.media_files:
                    doc.add_heading('Media Files', level=2)
                    for media_file in article.media_files:
                        try:
                            full_path = os.path.join(settings.MEDIA_ROOT, media_file)
                            # Dispatch on file extension
                            file_extension = os.path.splitext(media_file)[1].lower()

                            # Image files: embed in the document and add to the ZIP
                            if file_extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']:
                                if os.path.exists(full_path):
                                    doc.add_picture(full_path, width=Inches(4.0))
                                    zip_file.write(full_path,
                                                   os.path.join(article_folder, 'media', media_file))
                                elif media_file.startswith('http'):
                                    # URL-style media file
                                    response = requests.get(media_file, timeout=10)
                                    image_stream = BytesIO(response.content)
                                    doc.add_picture(image_stream, width=Inches(4.0))
                                    zip_file.writestr(
                                        os.path.join(article_folder, 'media', os.path.basename(media_file)),
                                        response.content)
                                else:
                                    doc.add_paragraph(media_file)
                            # Video files: add to the ZIP only, reference them in the document
                            elif file_extension in ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm']:
                                if os.path.exists(full_path):
                                    zip_file.write(full_path,
                                                   os.path.join(article_folder, 'media', media_file))
                                    doc.add_paragraph(f"[Video file: {media_file}]")
                                elif media_file.startswith('http'):
                                    response = requests.get(media_file, timeout=10)
                                    zip_file.writestr(
                                        os.path.join(article_folder, 'media', os.path.basename(media_file)),
                                        response.content)
                                    doc.add_paragraph(f"[Video file: {media_file}]")
                                else:
                                    doc.add_paragraph(media_file)
                            # Any other file type
                            else:
                                if os.path.exists(full_path):
                                    zip_file.write(full_path,
                                                   os.path.join(article_folder, 'media', media_file))
                                    doc.add_paragraph(f"[File: {media_file}]")
                                elif media_file.startswith('http'):
                                    response = requests.get(media_file, timeout=10)
                                    zip_file.writestr(
                                        os.path.join(article_folder, 'media', os.path.basename(media_file)),
                                        response.content)
                                    doc.add_paragraph(f"[File: {media_file}]")
                                else:
                                    doc.add_paragraph(media_file)
                        except Exception:
                            doc.add_paragraph(media_file)

                # Save this article's document into its folder in the ZIP
                doc_buffer = BytesIO()
                doc.save(doc_buffer)
                doc_buffer.seek(0)
                zip_file.writestr(os.path.join(article_folder, f'{safe_title}.docx'), doc_buffer.read())

        # Build the response
        zip_buffer.seek(0)
        response = HttpResponse(zip_buffer.getvalue(), content_type='application/zip')
        response['Content-Disposition'] = 'attachment; filename=articles_export.zip'
        return response

    export_with_media.short_description = "Export selected articles with media (ZIP)"
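
    # Resulting archive layout, one folder per selected article:
    #
    #   article_<id>_<sanitized title>/
    #       <sanitized title>.docx
    #       media/
    #           <images, videos, and other files referenced by the article>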


# Per-site article admin classes
class NewsCnArticleAdmin(admin.ModelAdmin):
    list_display = ('title', 'pub_date')
    search_fields = ('title', 'content')
    list_filter = ('pub_date',)
    actions = ['export_as_csv', 'export_as_json']

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        # Only show articles from www.news.cn
        return qs.filter(website__name='www.news.cn')

    def export_as_csv(self, request, queryset):
        """Export the selected articles as CSV."""
        meta = self.model._meta
        # Exclude the content field to keep the CSV small
        field_names = [field.name for field in meta.fields if field.name != 'content']

        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename=news_cn_articles.csv'
        writer = csv.writer(response)

        writer.writerow(field_names)
        for obj in queryset:
            row = []
            for field in field_names:
                value = getattr(obj, field)
                if callable(value):
                    value = value()
                if field == 'website':
                    value = value.name
                row.append(value)
            writer.writerow(row)

        return response

    export_as_csv.short_description = "Export selected articles as CSV"

    def export_as_json(self, request, queryset):
        """Export the selected articles as JSON."""
        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename=news_cn_articles.json'

        # Build the export payload
        articles_data = []
        for article in queryset:
            articles_data.append({
                'id': article.id,
                'title': article.title,
                'website': article.website.name,
                'url': article.url,
                'pub_date': article.pub_date.strftime('%Y-%m-%d %H:%M:%S') if article.pub_date else None,
                'content': article.content,
                'created_at': article.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'media_files': article.media_files
            })

        # Write the JSON data
        response.write(json.dumps(articles_data, ensure_ascii=False, indent=2))
        return response

    export_as_json.short_description = "Export selected articles as JSON"


class DongfangyancaoArticleAdmin(admin.ModelAdmin):
    list_display = ('title', 'pub_date')
    search_fields = ('title', 'content')
    list_filter = ('pub_date',)
    # Bulk actions; Django's built-in delete_selected handles per-selection deletion
    actions = ['delete_all_articles', 'export_as_csv', 'export_as_json']

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        # Only show 东方烟草报 articles
        return qs.filter(website__name='东方烟草报')

    def delete_all_articles(self, request, queryset):
        """Delete every article in the current filter (all 东方烟草报 articles), ignoring the selection."""
        deleted_count = self.get_queryset(request).delete()[0]
        self.message_user(request, f"Successfully deleted {deleted_count} articles", messages.SUCCESS)

    # Display name for the action
    delete_all_articles.short_description = "Delete all articles in the current filter"

    def export_as_csv(self, request, queryset):
        """Export the selected articles as CSV."""
        meta = self.model._meta
        # Exclude the content field to keep the CSV small
        field_names = [field.name for field in meta.fields if field.name != 'content']

        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment; filename=dongfangyancao_articles.csv'
        writer = csv.writer(response)

        writer.writerow(field_names)
        for obj in queryset:
            row = []
            for field in field_names:
                value = getattr(obj, field)
                if callable(value):
                    value = value()
                if field == 'website':
                    value = value.name
                row.append(value)
            writer.writerow(row)

        return response

    export_as_csv.short_description = "Export selected articles as CSV"

    def export_as_json(self, request, queryset):
        """Export the selected articles as JSON."""
        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename=dongfangyancao_articles.json'

        # Build the export payload
        articles_data = []
        for article in queryset:
            articles_data.append({
                'id': article.id,
                'title': article.title,
                'website': article.website.name,
                'url': article.url,
                'pub_date': article.pub_date.strftime('%Y-%m-%d %H:%M:%S') if article.pub_date else None,
                'content': article.content,
                'created_at': article.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'media_files': article.media_files
            })

        # Write the JSON data
        response.write(json.dumps(articles_data, ensure_ascii=False, indent=2))
        return response

    export_as_json.short_description = "Export selected articles as JSON"


# Register the models with their respective admin sites