""" 分布式爬虫模块 支持多节点爬虫集群,任务分发和结果聚合 """ import json import logging import time from typing import Dict, List, Optional, Any from celery import group, chain from django.conf import settings from django.core.cache import cache from django.db import transaction from .models import Website, Article from .tasks import crawl_website, crawl_all_websites from .utils import full_site_crawler logger = logging.getLogger(__name__) class DistributedCrawler: """分布式爬虫管理器""" def __init__(self): self.cache_prefix = "crawler:distributed:" self.task_timeout = getattr(settings, 'CRAWLER_TASK_TIMEOUT', 1800) # 30分钟 def get_node_status(self, node_id: str) -> Dict[str, Any]: """获取节点状态""" cache_key = f"{self.cache_prefix}node:{node_id}:status" status = cache.get(cache_key, {}) return { 'node_id': node_id, 'status': status.get('status', 'unknown'), 'last_heartbeat': status.get('last_heartbeat'), 'active_tasks': status.get('active_tasks', 0), 'completed_tasks': status.get('completed_tasks', 0), 'failed_tasks': status.get('failed_tasks', 0), } def register_node(self, node_id: str, capacity: int = 10) -> bool: """注册爬虫节点""" cache_key = f"{self.cache_prefix}node:{node_id}:status" status = { 'status': 'active', 'capacity': capacity, 'active_tasks': 0, 'completed_tasks': 0, 'failed_tasks': 0, 'last_heartbeat': time.time(), 'registered_at': time.time(), } cache.set(cache_key, status, timeout=3600) # 1小时过期 # 添加到节点列表 nodes_key = f"{self.cache_prefix}active_nodes" nodes = cache.get(nodes_key, []) if node_id not in nodes: nodes.append(node_id) cache.set(nodes_key, nodes, timeout=3600) logger.info(f"注册爬虫节点: {node_id}, 容量: {capacity}") return True def unregister_node(self, node_id: str) -> bool: """注销爬虫节点""" cache_key = f"{self.cache_prefix}node:{node_id}:status" cache.delete(cache_key) # 从节点列表移除 nodes_key = f"{self.cache_prefix}active_nodes" nodes = cache.get(nodes_key, []) if node_id in nodes: nodes.remove(node_id) cache.set(nodes_key, nodes, timeout=3600) logger.info(f"注销爬虫节点: {node_id}") return True def heartbeat(self, node_id: str, active_tasks: int = 0) -> bool: """节点心跳""" cache_key = f"{self.cache_prefix}node:{node_id}:status" status = cache.get(cache_key, {}) if status: status['last_heartbeat'] = time.time() status['active_tasks'] = active_tasks cache.set(cache_key, status, timeout=3600) return True def get_available_nodes(self) -> List[str]: """获取可用节点列表""" nodes_key = f"{self.cache_prefix}active_nodes" nodes = cache.get(nodes_key, []) available_nodes = [] for node_id in nodes: status = self.get_node_status(node_id) if status['status'] == 'active': # 检查心跳是否在5分钟内 if status['last_heartbeat'] and (time.time() - status['last_heartbeat']) < 300: available_nodes.append(node_id) return available_nodes def distribute_crawl_tasks(self, websites: List[int], max_concurrent: int = 5) -> str: """分发爬虫任务到多个节点""" if not websites: return "no_websites" available_nodes = self.get_available_nodes() if not available_nodes: logger.warning("没有可用的爬虫节点") return "no_available_nodes" # 创建任务批次 batch_id = f"batch_{int(time.time())}" batch_key = f"{self.cache_prefix}batch:{batch_id}" # 将网站分组分配给不同节点 tasks = [] for i, website_id in enumerate(websites): node_id = available_nodes[i % len(available_nodes)] task = crawl_website.apply_async( args=[website_id], kwargs={'node_id': node_id, 'batch_id': batch_id}, countdown=i * 2 # 错开启动时间 ) tasks.append(task) # 保存批次信息 batch_info = { 'batch_id': batch_id, 'websites': websites, 'tasks': [task.id for task in tasks], 'nodes': available_nodes, 'status': 'running', 'created_at': time.time(), 'total_tasks': len(tasks), 
        cache.set(batch_key, batch_info, timeout=7200)  # expires after 2 hours

        # Keep an index of batch IDs so batches can be listed and cleaned up later
        # (the Django cache API has no key-scanning support)
        index_key = f"{self.cache_prefix}batch_index"
        batch_ids = cache.get(index_key, [])
        batch_ids.append(batch_id)
        cache.set(index_key, batch_ids, timeout=7200)

        logger.info(
            f"Created distributed crawl batch {batch_id}: "
            f"{len(tasks)} tasks across {len(available_nodes)} nodes"
        )
        return batch_id

    def get_batch_status(self, batch_id: str) -> Dict[str, Any]:
        """Return the aggregated status of a batch."""
        batch_key = f"{self.cache_prefix}batch:{batch_id}"
        batch_info = cache.get(batch_key, {})

        if not batch_info:
            return {'status': 'not_found'}

        # Tally task states
        completed = 0
        failed = 0
        running = 0

        for task_id in batch_info.get('tasks', []):
            task_result = cache.get(f"{self.cache_prefix}task:{task_id}")
            if task_result:
                if task_result.get('status') == 'completed':
                    completed += 1
                elif task_result.get('status') == 'failed':
                    failed += 1
                else:
                    running += 1

        total_tasks = batch_info.get('total_tasks', 0)
        batch_info.update({
            'completed_tasks': completed,
            'failed_tasks': failed,
            'running_tasks': running,
            'progress': (completed + failed) / total_tasks * 100 if total_tasks else 0,
        })

        # Mark the batch as completed once every task has finished
        if total_tasks and completed + failed >= total_tasks:
            batch_info['status'] = 'completed'

        cache.set(batch_key, batch_info, timeout=7200)
        return batch_info

    def get_all_batches(self) -> List[Dict[str, Any]]:
        """Return all known batches, newest first."""
        index_key = f"{self.cache_prefix}batch_index"
        batches = []

        for batch_id in cache.get(index_key, []):
            batch_info = cache.get(f"{self.cache_prefix}batch:{batch_id}")
            if batch_info:
                batches.append(batch_info)

        return sorted(batches, key=lambda x: x.get('created_at', 0), reverse=True)

    def cleanup_old_batches(self, max_age_hours: int = 24) -> int:
        """Delete batch data older than max_age_hours; returns the number removed."""
        cutoff_time = time.time() - (max_age_hours * 3600)
        index_key = f"{self.cache_prefix}batch_index"
        remaining = []
        cleaned = 0

        for batch_id in cache.get(index_key, []):
            batch_key = f"{self.cache_prefix}batch:{batch_id}"
            batch_info = cache.get(batch_key)
            if batch_info and batch_info.get('created_at', 0) < cutoff_time:
                cache.delete(batch_key)
                cleaned += 1
            elif batch_info:
                remaining.append(batch_id)

        cache.set(index_key, remaining, timeout=7200)
        logger.info(f"Cleaned up {cleaned} old batches")
        return cleaned


class CrawlerNode:
    """A single crawler node."""

    def __init__(self, node_id: str, capacity: int = 10):
        self.node_id = node_id
        self.capacity = capacity
        self.distributed_crawler = DistributedCrawler()
        self.active_tasks = 0

    def start(self):
        """Start the node by registering it with the cluster."""
        self.distributed_crawler.register_node(self.node_id, self.capacity)
        logger.info(f"Crawler node {self.node_id} started")

    def stop(self):
        """Stop the node by unregistering it from the cluster."""
        self.distributed_crawler.unregister_node(self.node_id)
        logger.info(f"Crawler node {self.node_id} stopped")

    def heartbeat(self):
        """Send a heartbeat with the current number of active tasks."""
        self.distributed_crawler.heartbeat(self.node_id, self.active_tasks)

    def process_task(self, website_id: int, batch_id: Optional[str] = None) -> Dict[str, Any]:
        """Crawl a single website and return the task result."""
        self.active_tasks += 1
        start_time = time.time()

        try:
            # Run the crawl
            website = Website.objects.get(id=website_id)
            result = full_site_crawler(website.base_url, website, max_pages=100)

            # Record the task result
            task_result = {
                'status': 'completed',
                'website_id': website_id,
                'website_name': website.name,
                'result': result,
                'duration': time.time() - start_time,
                'completed_at': time.time(),
            }
            logger.info(f"Node {self.node_id} finished crawling {website.name}")

        except Exception as e:
            task_result = {
                'status': 'failed',
                'website_id': website_id,
                'error': str(e),
                'duration': time.time() - start_time,
                'failed_at': time.time(),
            }
            logger.error(f"Node {self.node_id} failed to crawl website {website_id}: {e}")

        finally:
            self.active_tasks -= 1

        return task_result


# Global distributed crawler instance
distributed_crawler = DistributedCrawler()
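
# Usage sketch (illustrative only): one way a worker process might register a
# node and keep it alive, and how a scheduler might fan out and monitor a
# batch. The node name "node-1" and the 30-second heartbeat interval are
# assumptions, not values defined elsewhere in this project.
#
#   node = CrawlerNode("node-1", capacity=5)
#   node.start()
#   try:
#       while True:
#           node.heartbeat()   # nodes silent for more than 5 minutes are skipped
#           time.sleep(30)
#   finally:
#       node.stop()
#
#   # Scheduler side: distribute all websites and poll the batch status.
#   website_ids = list(Website.objects.values_list("id", flat=True))
#   batch_id = distributed_crawler.distribute_crawl_tasks(website_ids)
#   if batch_id not in ("no_websites", "no_available_nodes"):
#       print(distributed_crawler.get_batch_status(batch_id))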