Add Support dongfangyaocao

This commit is contained in:
2025-08-11 22:20:19 +08:00
parent 6d80326a4e
commit d9d2ea9d99
11 changed files with 686 additions and 58 deletions

View File

@@ -1,20 +1,21 @@
# core/management/commands/crawl_full_site.py
# core/management/commands/crawl_dongfangyancao.py
from django.core.management.base import BaseCommand
from core.models import Website
from core.utils import full_site_crawler
class Command(BaseCommand):
help = "全站递归爬取 www.news.cn"
help = "全站递归爬取 东方烟草报"
def handle(self, *args, **kwargs):
website, created = Website.objects.get_or_create(
name="www.news.cn",
name="东方烟草报",
defaults={
'article_list_url': 'https://www.news.cn/',
'article_list_url': 'https://www.eastobacco.com/',
'article_selector': 'a'
}
)
start_url = "https://www.news.cn/"
start_url = "https://www.eastobacco.com/"
self.stdout.write(f"开始全站爬取: {start_url}")
full_site_crawler(start_url, website, max_pages=500)
self.stdout.write("爬取完成")

View File

@@ -1,18 +1,21 @@
# core/management/commands/crawl_xinhua.py
from django.core.management.base import BaseCommand
from core.models import Website
from core.utils import crawl_xinhua_list
from core.utils import full_site_crawler
class Command(BaseCommand):
help = '批量爬取新华网文章'
help = "全站递归爬取 www.news.cn"
def handle(self, *args, **options):
list_url = "https://www.news.cn/legal/index.html"
try:
website = Website.objects.get(base_url="https://www.news.cn/")
except Website.DoesNotExist:
self.stdout.write(self.style.ERROR("网站 https://www.news.cn/ 不存在,请先后台添加"))
return
self.stdout.write(f"开始爬取文章列表页: {list_url}")
crawl_xinhua_list(list_url, website)
self.stdout.write(self.style.SUCCESS("批量爬取完成"))
def handle(self, *args, **kwargs):
website, created = Website.objects.get_or_create(
name="www.news.cn",
defaults={
'article_list_url': 'https://www.news.cn/',
'article_selector': 'a'
}
)
start_url = "https://www.news.cn/"
self.stdout.write(f"开始全站爬取: {start_url}")
full_site_crawler(start_url, website, max_pages=500)
self.stdout.write("爬取完成")

View File

@@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand
from core.models import Website
from core.utils import crawl_xinhua_list
class Command(BaseCommand):
help = '批量爬取新华网文章'
def handle(self, *args, **options):
# 添加使用标记,确认该命令是否被调用
self.stdout.write(self.style.WARNING("crawl_xinhua command is being used"))
list_url = "https://www.news.cn/legal/index.html"
try:
website = Website.objects.get(base_url="https://www.news.cn/")
except Website.DoesNotExist:
self.stdout.write(self.style.ERROR("网站 https://www.news.cn/ 不存在,请先后台添加"))
return
self.stdout.write(f"开始爬取文章列表页: {list_url}")
crawl_xinhua_list(list_url, website)
self.stdout.write(self.style.SUCCESS("批量爬取完成"))

View File

@@ -0,0 +1,130 @@
from django.core.management.base import BaseCommand
from core.models import Article, Website
import json
import csv
import os
from django.conf import settings
from django.core.files.storage import default_storage
import zipfile
from django.utils import timezone
class Command(BaseCommand):
help = '导出文章及相关的媒体文件(图片、视频等)'
def add_arguments(self, parser):
parser.add_argument('--format', type=str, default='json', help='导出格式: json 或 csv')
parser.add_argument('--website', type=str, help='指定网站名称导出特定网站的文章')
parser.add_argument('--output', type=str, default='', help='输出文件路径')
parser.add_argument('--include-media', action='store_true', help='包含媒体文件')
def handle(self, *args, **options):
format_type = options['format'].lower()
website_name = options['website']
output_path = options['output']
include_media = options['include_media']
# 获取文章查询集
articles = Article.objects.all()
if website_name:
try:
website = Website.objects.get(name=website_name)
articles = articles.filter(website=website)
except Website.DoesNotExist:
self.stdout.write(self.style.ERROR(f'网站 "{website_name}" 不存在'))
return
if not articles.exists():
self.stdout.write(self.style.WARNING('没有找到文章'))
return
# 准备导出数据
articles_data = []
media_files = []
for article in articles:
article_data = {
'id': article.id,
'title': article.title,
'website': article.website.name,
'url': article.url,
'pub_date': article.pub_date.isoformat() if article.pub_date else None,
'content': article.content,
'created_at': article.created_at.isoformat(),
'media_files': article.media_files
}
articles_data.append(article_data)
# 收集媒体文件路径
if include_media:
for media_path in article.media_files:
full_path = os.path.join(settings.MEDIA_ROOT, media_path)
if os.path.exists(full_path):
media_files.append(full_path)
# 确定输出路径
if not output_path:
timestamp = timezone.now().strftime('%Y%m%d_%H%M%S')
if include_media:
output_path = f'articles_export_{timestamp}.zip'
else:
output_path = f'articles_export_{timestamp}.{format_type}'
# 执行导出
if include_media:
self.export_with_media(articles_data, media_files, output_path, format_type)
else:
if format_type == 'json':
self.export_as_json(articles_data, output_path)
elif format_type == 'csv':
self.export_as_csv(articles_data, output_path)
else:
self.stdout.write(self.style.ERROR('不支持的格式,仅支持 json 或 csv'))
return
self.stdout.write(self.style.SUCCESS(f'成功导出 {len(articles_data)} 篇文章到 {output_path}'))
def export_as_json(self, articles_data, output_path):
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(articles_data, f, ensure_ascii=False, indent=2)
def export_as_csv(self, articles_data, output_path):
if not articles_data:
return
# 打开CSV文件
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['id', 'title', 'website', 'url', 'pub_date', 'content', 'created_at', 'media_files']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for article_data in articles_data:
# 将列表转换为字符串以便在CSV中存储
article_data['media_files'] = ';'.join(article_data['media_files']) if article_data['media_files'] else ''
writer.writerow(article_data)
def export_with_media(self, articles_data, media_files, output_path, format_type):
# 创建ZIP文件
with zipfile.ZipFile(output_path, 'w') as zipf:
# 添加文章数据文件
data_filename = f'articles.{format_type}'
if format_type == 'json':
json_data = json.dumps(articles_data, ensure_ascii=False, indent=2)
zipf.writestr(data_filename, json_data)
elif format_type == 'csv':
# 创建CSV内容
if articles_data:
import io
csv_buffer = io.StringIO()
fieldnames = ['id', 'title', 'website', 'url', 'pub_date', 'content', 'created_at', 'media_files']
writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames)
writer.writeheader()
for article_data in articles_data:
article_data['media_files'] = ';'.join(article_data['media_files']) if article_data['media_files'] else ''
writer.writerow(article_data)
zipf.writestr(data_filename, csv_buffer.getvalue())
# 添加媒体文件
for media_path in media_files:
arcname = os.path.join('media', os.path.relpath(media_path, settings.MEDIA_ROOT))
zipf.write(media_path, arcname)