From 1b947158a9c0e0826d08303fbc28c6a152cd1b39 Mon Sep 17 00:00:00 2001
From: yuangyaa
Date: Sun, 17 Aug 2025 02:20:51 +0800
Subject: [PATCH] Support more crawlers

---
 core/distributed_crawler.py | 280 ++++++++++++++++++++++++++++++++++++
 1 file changed, 280 insertions(+)
 create mode 100644 core/distributed_crawler.py

diff --git a/core/distributed_crawler.py b/core/distributed_crawler.py
new file mode 100644
index 0000000..1867599
--- /dev/null
+++ b/core/distributed_crawler.py
@@ -0,0 +1,280 @@
+"""
+Distributed crawler module.
+Supports multi-node crawler clusters, task distribution, and result aggregation.
+"""
+
+import logging
+import time
+from typing import Dict, List, Optional, Any
+from django.conf import settings
+from django.core.cache import cache
+from .models import Website
+from .tasks import crawl_website
+from .utils import full_site_crawler
+
+logger = logging.getLogger(__name__)
+
+
+class DistributedCrawler:
+    """Distributed crawler manager."""
+
+    def __init__(self):
+        self.cache_prefix = "crawler:distributed:"
+        self.task_timeout = getattr(settings, 'CRAWLER_TASK_TIMEOUT', 1800)  # 30 minutes
+
+    def get_node_status(self, node_id: str) -> Dict[str, Any]:
+        """Return the current status of a crawler node."""
+        cache_key = f"{self.cache_prefix}node:{node_id}:status"
+        status = cache.get(cache_key, {})
+        return {
+            'node_id': node_id,
+            'status': status.get('status', 'unknown'),
+            'last_heartbeat': status.get('last_heartbeat'),
+            'active_tasks': status.get('active_tasks', 0),
+            'completed_tasks': status.get('completed_tasks', 0),
+            'failed_tasks': status.get('failed_tasks', 0),
+        }
+
+    def register_node(self, node_id: str, capacity: int = 10) -> bool:
+        """Register a crawler node."""
+        cache_key = f"{self.cache_prefix}node:{node_id}:status"
+        status = {
+            'status': 'active',
+            'capacity': capacity,
+            'active_tasks': 0,
+            'completed_tasks': 0,
+            'failed_tasks': 0,
+            'last_heartbeat': time.time(),
+            'registered_at': time.time(),
+        }
+        cache.set(cache_key, status, timeout=3600)  # expires after 1 hour
+
+        # Add the node to the active-node list
+        nodes_key = f"{self.cache_prefix}active_nodes"
+        nodes = cache.get(nodes_key, [])
+        if node_id not in nodes:
+            nodes.append(node_id)
+            cache.set(nodes_key, nodes, timeout=3600)
+
+        logger.info(f"Registered crawler node: {node_id}, capacity: {capacity}")
+        return True
+
+    def unregister_node(self, node_id: str) -> bool:
+        """Unregister a crawler node."""
+        cache_key = f"{self.cache_prefix}node:{node_id}:status"
+        cache.delete(cache_key)
+
+        # Remove the node from the active-node list
+        nodes_key = f"{self.cache_prefix}active_nodes"
+        nodes = cache.get(nodes_key, [])
+        if node_id in nodes:
+            nodes.remove(node_id)
+            cache.set(nodes_key, nodes, timeout=3600)
+
+        logger.info(f"Unregistered crawler node: {node_id}")
+        return True
+
+    def heartbeat(self, node_id: str, active_tasks: int = 0) -> bool:
+        """Record a heartbeat for a node."""
+        cache_key = f"{self.cache_prefix}node:{node_id}:status"
+        status = cache.get(cache_key, {})
+        if status:
+            status['last_heartbeat'] = time.time()
+            status['active_tasks'] = active_tasks
+            cache.set(cache_key, status, timeout=3600)
+        return True
+
+    def get_available_nodes(self) -> List[str]:
+        """Return the list of currently available nodes."""
+        nodes_key = f"{self.cache_prefix}active_nodes"
+        nodes = cache.get(nodes_key, [])
+        available_nodes = []
+
+        for node_id in nodes:
+            status = self.get_node_status(node_id)
+            if status['status'] == 'active':
+                # Only accept nodes whose heartbeat is less than 5 minutes old
+                if status['last_heartbeat'] and (time.time() - status['last_heartbeat']) < 300:
+                    available_nodes.append(node_id)
+
+        return available_nodes
+
+    def distribute_crawl_tasks(self, websites: List[int], max_concurrent: int = 5) -> str:
+        """Distribute crawl tasks across the available nodes."""
+        if not websites:
+            return "no_websites"
+
+        available_nodes = self.get_available_nodes()
+        if not available_nodes:
+            logger.warning("No crawler nodes available")
+            return "no_available_nodes"
+
+        # Create a task batch
+        batch_id = f"batch_{int(time.time())}"
+        batch_key = f"{self.cache_prefix}batch:{batch_id}"
+
+        # Assign the websites to the available nodes round-robin
+        tasks = []
+        for i, website_id in enumerate(websites):
+            node_id = available_nodes[i % len(available_nodes)]
+            task = crawl_website.apply_async(
+                args=[website_id],
+                kwargs={'node_id': node_id, 'batch_id': batch_id},
+                countdown=i * 2  # stagger the start times
+            )
+            tasks.append(task)
+
+        # Save the batch metadata
+        batch_info = {
+            'batch_id': batch_id,
+            'websites': websites,
+            'tasks': [task.id for task in tasks],
+            'nodes': available_nodes,
+            'status': 'running',
+            'created_at': time.time(),
+            'total_tasks': len(tasks),
+            'completed_tasks': 0,
+            'failed_tasks': 0,
+        }
+        cache.set(batch_key, batch_info, timeout=7200)  # expires after 2 hours
+
+        # Record the batch id in an index so batches can be enumerated later
+        batch_ids = cache.get(f"{self.cache_prefix}batch_ids", [])
+        batch_ids.append(batch_id)
+        cache.set(f"{self.cache_prefix}batch_ids", batch_ids, timeout=7200)
+
+        logger.info(f"Created distributed crawl batch: {batch_id}, tasks: {len(tasks)}, nodes: {len(available_nodes)}")
+        return batch_id
+
+    def get_batch_status(self, batch_id: str) -> Dict[str, Any]:
+        """Return the status of a batch."""
+        batch_key = f"{self.cache_prefix}batch:{batch_id}"
+        batch_info = cache.get(batch_key, {})
+
+        if not batch_info:
+            return {'status': 'not_found'}
+
+        # Tally task states
+        completed = 0
+        failed = 0
+        running = 0
+
+        for task_id in batch_info.get('tasks', []):
+            task_result = cache.get(f"{self.cache_prefix}task:{task_id}") or {}
+            if task_result.get('status') == 'completed':
+                completed += 1
+            elif task_result.get('status') == 'failed':
+                failed += 1
+            else:
+                running += 1
+
+        batch_info.update({
+            'completed_tasks': completed,
+            'failed_tasks': failed,
+            'running_tasks': running,
+            'progress': (completed + failed) / batch_info.get('total_tasks', 1) * 100
+        })
+
+        # Mark the batch as completed once every task has finished
+        if completed + failed >= batch_info.get('total_tasks', 0):
+            batch_info['status'] = 'completed'
+
+        cache.set(batch_key, batch_info, timeout=7200)
+        return batch_info
+
+    def get_all_batches(self) -> List[Dict[str, Any]]:
+        """Return all known batches."""
+        batches = []
+
+        # Simplified implementation: walk the cached batch-id index rather than
+        # scanning the cache backend (a Redis SCAN would be more robust).
+        for batch_id in cache.get(f"{self.cache_prefix}batch_ids", []):
+            batch_key = f"{self.cache_prefix}batch:{batch_id}"
+            batch_info = cache.get(batch_key)
+            if batch_info:
+                batches.append(batch_info)
+
+        return sorted(batches, key=lambda x: x.get('created_at', 0), reverse=True)
+
+    def cleanup_old_batches(self, max_age_hours: int = 24) -> int:
+        """Clean up batch data older than max_age_hours hours."""
+        cutoff_time = time.time() - (max_age_hours * 3600)
+        cleaned = 0
+
+        batch_ids = cache.get(f"{self.cache_prefix}batch_ids", [])
+        for batch_id in list(batch_ids):
+            batch_key = f"{self.cache_prefix}batch:{batch_id}"
+            batch_info = cache.get(batch_key)
+            if batch_info and batch_info.get('created_at', 0) < cutoff_time:
+                cache.delete(batch_key)
+                batch_ids.remove(batch_id)
+                cleaned += 1
+        cache.set(f"{self.cache_prefix}batch_ids", batch_ids, timeout=7200)
+
+        logger.info(f"Cleaned up {cleaned} old batches")
+        return cleaned
+
+
+class CrawlerNode:
+    """A single crawler node."""
+
+    def __init__(self, node_id: str, capacity: int = 10):
+        self.node_id = node_id
+        self.capacity = capacity
+        self.distributed_crawler = DistributedCrawler()
+        self.active_tasks = 0
+
+    def start(self):
+        """Start the node."""
+        self.distributed_crawler.register_node(self.node_id, self.capacity)
+        logger.info(f"Crawler node {self.node_id} started")
+
+    def stop(self):
+        """Stop the node."""
+        self.distributed_crawler.unregister_node(self.node_id)
+        logger.info(f"Crawler node {self.node_id} stopped")
+
+    def heartbeat(self):
+        """Send a heartbeat."""
+        self.distributed_crawler.heartbeat(self.node_id, self.active_tasks)
+
+    def process_task(self, website_id: int, batch_id: Optional[str] = None) -> Dict[str, Any]:
+        """Run a crawl task for a single website."""
+        self.active_tasks += 1
+        start_time = time.time()
+
+        try:
+            # Run the crawl
+            website = Website.objects.get(id=website_id)
+            result = full_site_crawler(website.base_url, website, max_pages=100)
+
+            # Record the task result
+            task_result = {
+                'status': 'completed',
+                'website_id': website_id,
+                'website_name': website.name,
+                'result': result,
+                'duration': time.time() - start_time,
+                'completed_at': time.time(),
+            }
+
+            logger.info(f"Node {self.node_id} finished crawling {website.name}")
+
+        except Exception as e:
+            task_result = {
+                'status': 'failed',
+                'website_id': website_id,
+                'error': str(e),
+                'duration': time.time() - start_time,
+                'failed_at': time.time(),
+            }
+            logger.error(f"Node {self.node_id} failed to crawl website {website_id}: {e}")
+
+        finally:
+            self.active_tasks -= 1
+
+        return task_result
+
+
+# Global distributed crawler instance
+distributed_crawler = DistributedCrawler()
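
Review note: below is a minimal usage sketch for the module added above, not part of the patch. It assumes the project's existing Celery workers for the crawl_website task and a shared cache backend (e.g. Redis) so all processes see the same node and batch keys; the node id, capacity, website ids, and polling interval are illustrative values only.

# Illustrative example only; values such as "worker-1" and the website ids are assumptions.
import time

from core.distributed_crawler import CrawlerNode, distributed_crawler

# Worker side: register a node so the dispatcher can see it.
node = CrawlerNode(node_id="worker-1", capacity=10)
node.start()

# Dispatcher side: fan out crawl tasks for some Website primary keys.
batch_id = distributed_crawler.distribute_crawl_tasks([1, 2, 3])

if batch_id not in ("no_websites", "no_available_nodes"):
    # Poll until every task in the batch has completed or failed.
    while True:
        status = distributed_crawler.get_batch_status(batch_id)
        if status.get('status') in ('completed', 'not_found'):
            break
        node.heartbeat()  # keep the node's heartbeat fresh while waiting
        time.sleep(10)

node.stop()

In a real deployment the CrawlerNode calls and the dispatcher calls would run in separate processes; they are shown together here only to illustrate the API surface.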