Support more crawlers

2025-08-17 02:20:51 +08:00
parent 46f9ff87f1
commit 1b947158a9

core/distributed_crawler.py (new file, 276 lines) Normal file

@@ -0,0 +1,276 @@
"""
分布式爬虫模块
支持多节点爬虫集群,任务分发和结果聚合
"""
import json
import logging
import time
from typing import Dict, List, Optional, Any
from celery import group, chain
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from .models import Website, Article
from .tasks import crawl_website, crawl_all_websites
from .utils import full_site_crawler
logger = logging.getLogger(__name__)


class DistributedCrawler:
    """Distributed crawler manager."""

    def __init__(self):
        self.cache_prefix = "crawler:distributed:"
        self.task_timeout = getattr(settings, 'CRAWLER_TASK_TIMEOUT', 1800)  # 30 minutes

    def get_node_status(self, node_id: str) -> Dict[str, Any]:
        """Return the status of a crawler node."""
        cache_key = f"{self.cache_prefix}node:{node_id}:status"
        status = cache.get(cache_key, {})
        return {
            'node_id': node_id,
            'status': status.get('status', 'unknown'),
            'last_heartbeat': status.get('last_heartbeat'),
            'active_tasks': status.get('active_tasks', 0),
            'completed_tasks': status.get('completed_tasks', 0),
            'failed_tasks': status.get('failed_tasks', 0),
        }

    def register_node(self, node_id: str, capacity: int = 10) -> bool:
        """Register a crawler node."""
        cache_key = f"{self.cache_prefix}node:{node_id}:status"
        status = {
            'status': 'active',
            'capacity': capacity,
            'active_tasks': 0,
            'completed_tasks': 0,
            'failed_tasks': 0,
            'last_heartbeat': time.time(),
            'registered_at': time.time(),
        }
        cache.set(cache_key, status, timeout=3600)  # expires after 1 hour
        # Add the node to the active-node list
        nodes_key = f"{self.cache_prefix}active_nodes"
        nodes = cache.get(nodes_key, [])
        if node_id not in nodes:
            nodes.append(node_id)
            cache.set(nodes_key, nodes, timeout=3600)
        logger.info(f"Registered crawler node: {node_id}, capacity: {capacity}")
        return True

    def unregister_node(self, node_id: str) -> bool:
        """Unregister a crawler node."""
        cache_key = f"{self.cache_prefix}node:{node_id}:status"
        cache.delete(cache_key)
        # Remove the node from the active-node list
        nodes_key = f"{self.cache_prefix}active_nodes"
        nodes = cache.get(nodes_key, [])
        if node_id in nodes:
            nodes.remove(node_id)
            cache.set(nodes_key, nodes, timeout=3600)
        logger.info(f"Unregistered crawler node: {node_id}")
        return True

    def heartbeat(self, node_id: str, active_tasks: int = 0) -> bool:
        """Record a heartbeat for a node."""
        cache_key = f"{self.cache_prefix}node:{node_id}:status"
        status = cache.get(cache_key, {})
        if status:
            status['last_heartbeat'] = time.time()
            status['active_tasks'] = active_tasks
            cache.set(cache_key, status, timeout=3600)
        return True

    def get_available_nodes(self) -> List[str]:
        """Return the list of currently available nodes."""
        nodes_key = f"{self.cache_prefix}active_nodes"
        nodes = cache.get(nodes_key, [])
        available_nodes = []
        for node_id in nodes:
            status = self.get_node_status(node_id)
            if status['status'] == 'active':
                # Only keep nodes whose last heartbeat is within 5 minutes
                if status['last_heartbeat'] and (time.time() - status['last_heartbeat']) < 300:
                    available_nodes.append(node_id)
        return available_nodes

    def distribute_crawl_tasks(self, websites: List[int], max_concurrent: int = 5) -> str:
        """Distribute crawl tasks across the available nodes."""
        if not websites:
            return "no_websites"
        available_nodes = self.get_available_nodes()
        if not available_nodes:
            logger.warning("No crawler nodes are available")
            return "no_available_nodes"
        # Create a task batch
        batch_id = f"batch_{int(time.time())}"
        batch_key = f"{self.cache_prefix}batch:{batch_id}"
        # Assign websites to nodes in round-robin order
        tasks = []
        for i, website_id in enumerate(websites):
            node_id = available_nodes[i % len(available_nodes)]
            task = crawl_website.apply_async(
                args=[website_id],
                kwargs={'node_id': node_id, 'batch_id': batch_id},
                countdown=i * 2  # stagger the start times
            )
            tasks.append(task)
        # Save the batch information
        batch_info = {
            'batch_id': batch_id,
            'websites': websites,
            'tasks': [task.id for task in tasks],
            'nodes': available_nodes,
            'status': 'running',
            'created_at': time.time(),
            'total_tasks': len(tasks),
            'completed_tasks': 0,
            'failed_tasks': 0,
        }
        cache.set(batch_key, batch_info, timeout=7200)  # expires after 2 hours
        # Record the batch id in an index so batches can be listed and cleaned up later
        batch_ids_key = f"{self.cache_prefix}batch_ids"
        batch_ids = cache.get(batch_ids_key, [])
        if batch_id not in batch_ids:
            batch_ids.append(batch_id)
            cache.set(batch_ids_key, batch_ids, timeout=86400)
        logger.info(f"Created distributed crawl batch: {batch_id}, tasks: {len(tasks)}, nodes: {len(available_nodes)}")
        return batch_id

    def get_batch_status(self, batch_id: str) -> Dict[str, Any]:
        """Return the status of a batch."""
        batch_key = f"{self.cache_prefix}batch:{batch_id}"
        batch_info = cache.get(batch_key, {})
        if not batch_info:
            return {'status': 'not_found'}
        # Tally task states; per-task results are expected under
        # "{cache_prefix}task:{task_id}" keys written by the workers.
        completed = 0
        failed = 0
        running = 0
        for task_id in batch_info.get('tasks', []):
            task_result = cache.get(f"{self.cache_prefix}task:{task_id}")
            if task_result:
                if task_result.get('status') == 'completed':
                    completed += 1
                elif task_result.get('status') == 'failed':
                    failed += 1
            else:
                running += 1
        batch_info.update({
            'completed_tasks': completed,
            'failed_tasks': failed,
            'running_tasks': running,
            'progress': (completed + failed) / batch_info.get('total_tasks', 1) * 100,
        })
        # Mark the batch as completed once every task has finished
        if completed + failed >= batch_info.get('total_tasks', 0):
            batch_info['status'] = 'completed'
        cache.set(batch_key, batch_info, timeout=7200)
        return batch_info

    def get_all_batches(self) -> List[Dict[str, Any]]:
        """Return all batches known to the batch-id index."""
        batch_ids_key = f"{self.cache_prefix}batch_ids"
        batches = []
        for batch_id in cache.get(batch_ids_key, []):
            batch_info = cache.get(f"{self.cache_prefix}batch:{batch_id}")
            if batch_info:
                batches.append(batch_info)
        return sorted(batches, key=lambda x: x.get('created_at', 0), reverse=True)

    def cleanup_old_batches(self, max_age_hours: int = 24) -> int:
        """Delete batch data older than max_age_hours."""
        cutoff_time = time.time() - (max_age_hours * 3600)
        cleaned = 0
        batch_ids_key = f"{self.cache_prefix}batch_ids"
        remaining = []
        for batch_id in cache.get(batch_ids_key, []):
            batch_key = f"{self.cache_prefix}batch:{batch_id}"
            batch_info = cache.get(batch_key)
            if batch_info and batch_info.get('created_at', 0) < cutoff_time:
                cache.delete(batch_key)
                cleaned += 1
            else:
                remaining.append(batch_id)
        cache.set(batch_ids_key, remaining, timeout=86400)
        logger.info(f"Cleaned up {cleaned} old batches")
        return cleaned


class CrawlerNode:
    """A single crawler node."""

    def __init__(self, node_id: str, capacity: int = 10):
        self.node_id = node_id
        self.capacity = capacity
        self.distributed_crawler = DistributedCrawler()
        self.active_tasks = 0

    def start(self):
        """Start the node."""
        self.distributed_crawler.register_node(self.node_id, self.capacity)
        logger.info(f"Crawler node {self.node_id} started")

    def stop(self):
        """Stop the node."""
        self.distributed_crawler.unregister_node(self.node_id)
        logger.info(f"Crawler node {self.node_id} stopped")

    def heartbeat(self):
        """Send a heartbeat."""
        self.distributed_crawler.heartbeat(self.node_id, self.active_tasks)

    def process_task(self, website_id: int, batch_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a single crawl task for the given website."""
        self.active_tasks += 1
        start_time = time.time()
        try:
            # Run the crawl against the website's base URL
            website = Website.objects.get(id=website_id)
            result = full_site_crawler(website.base_url, website, max_pages=100)
            # Record the task result
            task_result = {
                'status': 'completed',
                'website_id': website_id,
                'website_name': website.name,
                'result': result,
                'duration': time.time() - start_time,
                'completed_at': time.time(),
            }
            logger.info(f"Node {self.node_id} finished crawling website {website.name}")
        except Exception as e:
            task_result = {
                'status': 'failed',
                'website_id': website_id,
                'error': str(e),
                'duration': time.time() - start_time,
                'failed_at': time.time(),
            }
            logger.error(f"Node {self.node_id} failed to crawl website {website_id}: {e}")
        finally:
            self.active_tasks -= 1
        return task_result


# Global distributed crawler instance
distributed_crawler = DistributedCrawler()
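
Not part of the diff above: a minimal usage sketch of how a deployment might wire these classes together. The import path core.distributed_crawler and the node id "worker-1" are assumptions; everything else calls the APIs defined in this file and presumes Celery workers are running to consume the crawl_website task.

# Usage sketch (illustrative; assumes the "core" app package and running Celery workers)
from core.distributed_crawler import CrawlerNode, distributed_crawler
from core.models import Website

# On each worker host: register the node, then keep its heartbeat fresh
# (for example from a periodic Celery beat task).
node = CrawlerNode(node_id="worker-1", capacity=10)
node.start()
node.heartbeat()

# On the scheduler: fan a crawl batch out across the registered nodes.
website_ids = list(Website.objects.values_list("id", flat=True))
batch_id = distributed_crawler.distribute_crawl_tasks(website_ids)

# Poll the batch until every task has completed or failed.
status = distributed_crawler.get_batch_status(batch_id)
print(status.get("status"), round(status.get("progress", 0), 1), "%")

# Housekeeping: drop batch records older than 24 hours, then stop the node.
distributed_crawler.cleanup_old_batches(max_age_hours=24)
node.stop()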