当您爬取成千上万的市场页面、发送大量API请求或自动化处理数百个账户时,通过代理进行正确的负载分配变得至关重要。没有合理的负载均衡,您将面临封锁、超时和性能下降。在本指南中,我们将探讨负载分配的架构、负载均衡算法和高负载系统的实际策略。
本材料面向从事数据爬取、API请求自动化或管理大型代理池以满足业务需求的开发人员和技术专家。
为什么需要通过代理进行负载均衡
通过代理进行负载均衡解决了高负载系统的几个关键问题。第一个也是最重要的——防止封锁。当您向一个资源(市场、社交媒体API、搜索引擎)发送成千上万的请求时,目标服务器会看到来自同一IP地址的异常高活动并将其封锁。将请求分配到几十个或数百个代理之间,使您的活动看起来更像普通用户的行为。
第二个问题是性能。一个代理服务器的带宽有限(通常为100-1000 Mbps),并且可以处理有限数量的并发连接。在每分钟爬取10000个页面或大量发送API请求时,一个代理将成为系统的瓶颈。负载均衡允许通过添加新的代理到池中来水平扩展带宽。
第三个任务是可靠性。如果其中一个代理出现故障(技术故障、被封锁、租用期到期),系统会自动将流量重定向到正常工作的代理。如果没有负载均衡和健康检查机制,一个代理的故障可能会停止整个系统。
实际示例: 在爬取Wildberries以监控竞争对手价格时,您需要每小时处理50000个商品。这大约是每秒14个请求。一个代理可以承受这样的负载,但Wildberries会在从同一地址发送100-200个请求后封锁IP。通过在20个住宅代理之间分配请求,您将每个IP的负载降低到每秒0.7个请求——这看起来就像普通用户的活动。
第四个原因是地理分布。许多服务根据用户的地区显示不同的内容或价格。在来自不同国家和城市的代理之间进行负载均衡,可以同时从所有目标地区收集数据。例如,要监控Ozon的价格,您需要来自莫斯科、圣彼得堡、叶卡捷琳堡和其他大城市的代理。
负载分配系统架构
经典的通过代理进行负载均衡的架构由多个组件组成。在最上层是负载均衡器(load balancer)——一个软件模块,它接受传入的任务(爬虫请求、API调用)并将其分配给可用的代理。负载均衡器可以作为独立服务运行,也可以嵌入到应用程序中。
第二个组件是代理池管理器(proxy pool manager)。它存储所有可用代理的列表及其特征:IP地址、端口、协议(HTTP/SOCKS5)、地理位置、类型(住宅、移动、数据中心)、当前状态(活动、被封锁、检查中)。池管理器负责添加新的代理、删除不工作的代理和定期检查可用性。
第三个元素是监控和健康检查系统。它不断检查每个代理的可用性:发送测试请求、测量响应时间、检查连接的成功率。如果代理没有响应或返回错误,系统将其标记为不可用,并暂时将其排除在轮换之外。
| 组件 | 功能 | 技术 |
|---|---|---|
| 负载均衡器 | 在代理之间分配请求 | HAProxy, Nginx, Python/Node.js库 |
| 代理池管理器 | 管理代理列表 | Redis, PostgreSQL, 内存存储 |
| 健康检查系统 | 检查代理的可用性 | 定时任务, Celery, cron |
| 速率限制器 | 请求频率控制 | 令牌桶、漏桶算法 |
| 监控与指标 | 收集性能指标 | Prometheus, Grafana, ELK Stack |
第四个组件是速率限制器(rate limiter)。它确保每个代理不会超过对目标资源的允许请求频率。例如,如果您正在爬取Instagram,并且知道该平台在每分钟60个请求后会封锁IP,则速率限制器将不允许通过一个代理发送超过每分钟50个请求,以留出安全余地。
第五个元素是指标和分析系统。它收集每个代理的性能数据:成功请求的数量、错误的数量、平均响应时间、封锁的百分比。这些数据用于优化负载均衡算法和识别问题代理。
负载均衡算法:轮询、最少连接、加权
轮询算法是最简单和最常用的负载均衡方法。它依次遍历代理列表:第一个请求通过代理#1,第二个请求通过代理#2,第三个请求通过代理#3,依此类推。当列表结束时,负载均衡器返回到开头。如果所有代理的性能相同,该算法可以均匀分配负载。
class RoundRobinBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# 使用
proxies = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080"
]
balancer = RoundRobinBalancer(proxies)
for i in range(10):
proxy = balancer.get_next_proxy()
print(f"请求 {i+1} → {proxy}")
最少连接算法选择当前活动连接数最少的代理。这在请求执行时间不同的情况下非常有用。例如,在爬取时,一个请求可能处理100毫秒,而另一个请求可能需要5秒(如果页面加载缓慢)。最少连接算法会自动将新请求定向到负载较少的代理。
class LeastConnectionsBalancer:
def __init__(self, proxies):
self.proxies = {proxy: 0 for proxy in proxies}
def get_next_proxy(self):
# 选择活动连接数最少的代理
proxy = min(self.proxies.items(), key=lambda x: x[1])[0]
self.proxies[proxy] += 1
return proxy
def release_proxy(self, proxy):
# 请求完成时减少计数
self.proxies[proxy] -= 1
# 使用上下文管理器
balancer = LeastConnectionsBalancer(proxies)
def make_request(url):
proxy = balancer.get_next_proxy()
try:
# 通过选定的代理执行请求
response = requests.get(url, proxies={"http": proxy})
return response
finally:
balancer.release_proxy(proxy)
加权轮询(Weighted Round Robin)扩展了经典的轮询算法,根据每个代理的性能或质量分配权重。权重较大的代理会接收更多请求。这在您拥有不同质量的代理时非常有用:例如,高速的优质住宅代理和有限制的廉价数据中心代理。
class WeightedRoundRobinBalancer:
def __init__(self, weighted_proxies):
# weighted_proxies = [(proxy, weight), ...]
self.proxies = []
for proxy, weight in weighted_proxies:
# 将代理添加到列表中weight次
self.proxies.extend([proxy] * weight)
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# 使用权重
weighted_proxies = [
("http://premium-proxy1.com:8080", 5), # 高质量
("http://premium-proxy2.com:8080", 5),
("http://cheap-proxy1.com:8080", 2), # 低质量
("http://cheap-proxy2.com:8080", 1) # 非常低质量
]
balancer = WeightedRoundRobinBalancer(weighted_proxies)
随机算法(Random)从可用列表中随机选择代理。它比轮询算法更简单,并且在代理数量较多(100+)时表现良好。随机分配在请求量足够大的情况下会自动平衡。缺点是可能会出现短暂的不均匀负载。
IP Hash是一种基于目标URL或其他参数的哈希选择代理的算法。这确保对同一资源的请求总是通过同一代理。对于使用会话或需要从同一IP进行请求序列的网站(例如,授权+获取数据),这非常有用。
代理池管理:轮换和健康检查
有效的代理池管理需要持续监控每个代理的状态和自动轮换。健康检查是通过发送测试请求定期检查代理的可用性。通常使用一个简单的GET请求到可靠的服务(例如httpbin.org或自己的端点),返回IP地址的信息。
import requests
import time
from datetime import datetime
class ProxyHealthChecker:
def __init__(self, test_url="http://httpbin.org/ip", timeout=10):
self.test_url = test_url
self.timeout = timeout
def check_proxy(self, proxy_url):
"""检查代理的可用性"""
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=self.timeout
)
response_time = time.time() - start_time
if response.status_code == 200:
return {
"status": "healthy",
"response_time": response_time,
"timestamp": datetime.now(),
"ip": response.json().get("origin")
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"timestamp": datetime.now()
}
except requests.exceptions.Timeout:
return {
"status": "unhealthy",
"error": "timeout",
"timestamp": datetime.now()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now()
}
# 使用
checker = ProxyHealthChecker()
proxies = ["http://proxy1.com:8080", "http://proxy2.com:8080"]
for proxy in proxies:
result = checker.check_proxy(proxy)
print(f"{proxy}: {result['status']} ({result.get('response_time', 'N/A')}s)")
池管理系统应自动将不工作的代理标记为不可用,并定期重复检查。一种常见策略是电路断路器模式:在连续三次失败检查后,代理将被排除在池外5-10分钟,然后再进行检查。如果检查成功,代理将返回到活动池中。
class ProxyPoolManager:
def __init__(self, health_checker, max_failures=3, cooldown_seconds=300):
self.health_checker = health_checker
self.max_failures = max_failures
self.cooldown_seconds = cooldown_seconds
self.proxies = {} # {proxy_url: ProxyInfo}
def add_proxy(self, proxy_url, metadata=None):
"""将代理添加到池中"""
self.proxies[proxy_url] = {
"url": proxy_url,
"status": "active",
"failures": 0,
"last_check": None,
"cooldown_until": None,
"metadata": metadata or {}
}
def get_active_proxies(self):
"""返回活动代理的列表"""
now = datetime.now()
active = []
for proxy_url, info in self.proxies.items():
# 检查代理是否在冷却期
if info["cooldown_until"] and now < info["cooldown_until"]:
continue
if info["status"] == "active":
active.append(proxy_url)
return active
def mark_failure(self, proxy_url):
"""标记代理使用失败的尝试"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] += 1
if info["failures"] >= self.max_failures:
# 转入冷却期
info["status"] = "cooldown"
info["cooldown_until"] = datetime.now() + timedelta(seconds=self.cooldown_seconds)
print(f"代理 {proxy_url} 移动到冷却期,直到 {info['cooldown_until']}")
def mark_success(self, proxy_url):
"""标记代理成功使用"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] = 0
info["status"] = "active"
info["cooldown_until"] = None
代理轮换是一种自动更换代理的策略,可以在特定时间间隔或特定请求数量后进行。有几种方法:按时间轮换(每5-10分钟更换)、按请求数量轮换(在100-500个请求后更换)、按会话轮换(一个代理用于一个爬虫会话)。选择策略取决于目标网站的要求。
建议: 对于爬取市场(Wildberries、Ozon),最优策略是按请求数量轮换:每个代理50-100个请求,然后更换。对于社交媒体API(Instagram、Facebook),最好使用会话轮换:一个代理用于整个授权→操作→退出周期。
速率限制和请求频率控制
速率限制是负载均衡系统的关键组件。它防止超过目标资源的允许请求频率,这可能导致代理被封锁。主要有两种算法:令牌桶(Token Bucket)和漏桶(Leaky Bucket)。
令牌桶的工作原理如下:每个代理都有一个虚拟的“桶”,里面装有令牌。每个令牌允许进行一次请求。桶以设定的速度(例如每秒10个令牌)逐渐填满。桶的最大容量是有限的(例如50个令牌)。当请求到达时,系统检查是否有令牌:如果有,则取出一个令牌并执行请求;如果没有,则等待新的令牌出现。
import time
from threading import Lock
class TokenBucketRateLimiter:
def __init__(self, rate, capacity):
"""
rate: 每秒令牌数量
capacity: 桶中最大令牌数量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""根据经过的时间添加令牌"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def acquire(self, tokens=1):
"""尝试获取指定数量的令牌"""
with self.lock:
self._add_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens=1):
"""等待令牌出现并获取它们"""
while not self.acquire(tokens):
# 计算等待时间
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
# 对每个代理使用
proxy_limiters = {
"http://proxy1.com:8080": TokenBucketRateLimiter(rate=10, capacity=50),
"http://proxy2.com:8080": TokenBucketRateLimiter(rate=10, capacity=50)
}
def make_request_with_limit(url, proxy_url):
limiter = proxy_limiters[proxy_url]
limiter.wait_and_acquire() # 等待令牌可用
response = requests.get(url, proxies={"http": proxy_url})
return response
漏桶的工作原理不同:请求被放入一个队列(桶),以固定速度“流出”。如果桶溢出,新的请求将被拒绝或放入等待队列。该算法在时间上提供了更均匀的负载分配,但在活动高峰时可能导致延迟。
不同的目标平台需要不同的限制。Instagram API允许每个IP每小时约200个请求(每分钟约3.3个请求)。Wildberries在每分钟从同一IP发送100-200个请求后会封锁。Google搜索允许每分钟10-20个请求。您的速率限制系统应考虑这些限制并留出安全余地(通常为20-30%)。
| 平台 | 请求限制 | 推荐设置 |
|---|---|---|
| ~200请求/小时 | 每分钟2-3个请求(留有余地) | |
| Wildberries | 每分钟100-200个请求 | 每分钟60-80个请求 |
| Google搜索 | 每分钟10-20个请求 | 每分钟8-12个请求 |
| Ozon | 每分钟50-100个请求 | 每分钟30-50个请求 |
| Facebook API | 每小时200个请求 | 每分钟2-3个请求 |
性能监控和自动扩展
有效的负载均衡系统需要持续监控关键指标。第一组指标是代理的性能:平均响应时间(response time)、成功请求的百分比(success rate)、超时的数量、4xx和5xx错误的数量。这些数据可以帮助识别问题代理并优化负载均衡算法。
class ProxyMetrics:
def __init__(self):
self.metrics = {} # {proxy_url: metrics_dict}
def record_request(self, proxy_url, response_time, status_code, success):
"""记录请求的指标"""
if proxy_url not in self.metrics:
self.metrics[proxy_url] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_response_time": 0,
"timeouts": 0,
"errors_4xx": 0,
"errors_5xx": 0
}
m = self.metrics[proxy_url]
m["total_requests"] += 1
if success:
m["successful_requests"] += 1
m["total_response_time"] += response_time
else:
m["failed_requests"] += 1
if status_code == 0: # 超时
m["timeouts"] += 1
elif 400 <= status_code < 500:
m["errors_4xx"] += 1
elif 500 <= status_code < 600:
m["errors_5xx"] += 1
def get_success_rate(self, proxy_url):
"""返回成功请求的百分比"""
m = self.metrics.get(proxy_url, {})
total = m.get("total_requests", 0)
if total == 0:
return 0
return (m.get("successful_requests", 0) / total) * 100
def get_avg_response_time(self, proxy_url):
"""返回平均响应时间"""
m = self.metrics.get(proxy_url, {})
successful = m.get("successful_requests", 0)
if successful == 0:
return 0
return m.get("total_response_time", 0) / successful
def get_report(self, proxy_url):
"""返回代理的完整报告"""
m = self.metrics.get(proxy_url, {})
return {
"proxy": proxy_url,
"total_requests": m.get("total_requests", 0),
"success_rate": self.get_success_rate(proxy_url),
"avg_response_time": self.get_avg_response_time(proxy_url),
"timeouts": m.get("timeouts", 0),
"errors_4xx": m.get("errors_4xx", 0),
"errors_5xx": m.get("errors_5xx", 0)
}
第二组指标是系统的整体性能:每秒请求总数(RPS - requests per second)、平均延迟(latency)、请求队列的大小、活动代理的数量、代理池的使用百分比。这些指标显示系统是否能够承受负载,或者是否需要扩展。
自动扩展(auto-scaling)允许根据负载动态添加或删除代理。简单的策略是:如果代理池的平均负载在5分钟内超过80%,系统将自动添加新的代理。如果负载在15分钟内降到30%以下,系统将删除多余的代理以节省资源。
监控设置示例: 要每小时爬取100000个Wildberries商品(大约每秒28个请求),您至少需要30-40个代理,每个代理的限制为每分钟60个请求。设置警报:如果成功率低于85%或平均响应时间超过3秒,系统应自动添加10-15个备用代理。
要可视化指标,请使用Grafana与Prometheus或ELK Stack。创建带有图表的仪表板:按时间划分的RPS、响应时间分布、前10个最快/最慢的代理、按类型的错误地图。这将快速识别问题并优化系统。
在Python和Node.js上的实际实现
让我们看看使用流行库在Python中实现负载均衡系统的完整示例。此示例结合了所有描述的组件:负载均衡器、池管理器、健康检查、速率限制和监控。
import requests
import time
import random
from datetime import datetime, timedelta
from threading import Lock, Thread
from collections import defaultdict
class ProxyLoadBalancer:
def __init__(self, proxies, algorithm="round_robin", rate_limit=10):
"""
proxies: 代理列表 [{"url": "...", "weight": 1}, ...]
algorithm: round_robin, least_connections, weighted, random
rate_limit: 每个代理的最大请求数量
"""
self.proxies = proxies
self.algorithm = algorithm
self.rate_limit = rate_limit
# 负载均衡器状态
self.current_index = 0
self.connections = defaultdict(int)
self.rate_limiters = {}
self.metrics = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0,
"response_times": [], "last_check": None
})
# 初始化速率限制器
for proxy in proxies:
proxy_url = proxy["url"]
self.rate_limiters[proxy_url] = TokenBucketRateLimiter(
rate=rate_limit,
capacity=rate_limit * 5
)
self.lock = Lock()
# 启动后台健康检查
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def get_next_proxy(self):
"""根据算法选择下一个代理"""
with self.lock:
if self.algorithm == "round_robin":
return self._round_robin()
elif self.algorithm == "least_connections":
return self._least_connections()
elif self.algorithm == "weighted":
return self._weighted_random()
elif self.algorithm == "random":
return random.choice(self.proxies)["url"]
def _round_robin(self):
proxy = self.proxies[self.current_index]["url"]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def _least_connections(self):
min_conn = min(self.connections.values()) if self.connections else 0
candidates = [p["url"] for p in self.proxies if self.connections[p["url"]] == min_conn]
return random.choice(candidates)
def _weighted_random(self):
weights = [p.get("weight", 1) for p in self.proxies]
return random.choices(self.proxies, weights=weights)[0]["url"]
def make_request(self, url, method="GET", **kwargs):
"""通过负载均衡器执行请求"""
proxy_url = self.get_next_proxy()
# 等待速率限制可用
self.rate_limiters[proxy_url].wait_and_acquire()
# 增加连接计数
with self.lock:
self.connections[proxy_url] += 1
try:
start_time = time.time()
response = requests.request(
method,
url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=kwargs.get("timeout", 30),
**kwargs
)
response_time = time.time() - start_time
# 记录指标
self._record_metrics(proxy_url, response_time, True, response.status_code)
return response
except Exception as e:
self._record_metrics(proxy_url, 0, False, 0)
raise
finally:
# 减少连接计数
with self.lock:
self.connections[proxy_url] -= 1
def _record_metrics(self, proxy_url, response_time, success, status_code):
"""记录请求的指标"""
with self.lock:
m = self.metrics[proxy_url]
m["total"] += 1
if success:
m["success"] += 1
m["response_times"].append(response_time)
# 仅存储最近的1000个值
if len(m["response_times"]) > 1000:
m["response_times"].pop(0)
else:
m["failed"] += 1
def _health_check_loop(self):
"""代理的后台健康检查"""
while True:
for proxy in self.proxies:
proxy_url = proxy["url"]
try:
response = requests.get(
"http://httpbin.org/ip",
proxies={"http": proxy_url},
timeout=10
)
with self.lock:
self.metrics[proxy_url]["last_check"] = datetime.now()
proxy["status"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
with self.lock:
proxy["status"] = "unhealthy"
time.sleep(60) # 每分钟检查一次
def get_stats(self):
"""返回所有代理的统计信息"""
stats = []
with self.lock:
for proxy in self.proxies:
proxy_url = proxy["url"]
m = self.metrics[proxy_url]
avg_response_time = (
sum(m["response_times"]) / len(m["response_times"])
if m["response_times"] else 0
)
success_rate = (
(m["success"] / m["total"] * 100)
if m["total"] > 0 else 0
)
stats.append({
"proxy": proxy_url,
"status": proxy.get("status", "unknown"),
"total_requests": m["total"],
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3),
"active_connections": self.connections[proxy_url]
})
return stats
使用此负载均衡器爬取市场的示例:
# 设置负载均衡器
proxies = [
{"url": "http://proxy1.example.com:8080", "weight": 5},
{"url": "http://proxy2.example.com:8080", "weight": 5},
{"url": "http://proxy3.example.com:8080", "weight": 3},
{"url": "http://proxy4.example.com:8080", "weight": 2}
]
balancer = ProxyLoadBalancer(
proxies=proxies,
algorithm="weighted",
rate_limit=60 # 每个代理每分钟60个请求
)
# 爬取商品列表
product_urls = [f"https://www.wildberries.ru/catalog/{i}/detail.aspx" for i in range(1000)]
results = []
for url in product_urls:
try:
response = balancer.make_request(url)
# 处理响应
results.append({"url": url, "status": "success", "data": response.text})
except Exception as e:
results.append({"url": url, "status": "error", "error": str(e)})
# 每100个请求输出一次统计信息
if len(results) % 100 == 0:
stats = balancer.get_stats()
for stat in stats:
print(f"{stat['proxy']}: {stat['success_rate']}% 成功, "
f"{stat['avg_response_time']}s 平均响应")
# 最终统计信息
print("\n=== 最终统计 ===")
for stat in balancer.get_stats():
print(f"{stat['proxy']}:")
print(f" 总请求数: {stat['total_requests']}")
print(f" 成功率: {stat['success_rate']}%")
print(f" 平均响应时间: {stat['avg_response_time']}s")
print(f" 状态: {stat['status']}")
对于Node.js,可以使用类似的架构,使用axios进行HTTP请求和node-rate-limiter进行频率控制:
const axios = require('axios');
const { RateLimiter } = require('limiter');
class ProxyLoadBalancer {
constructor(proxies, algorithm = 'round_robin', rateLimit = 10) {
this.proxies = proxies;
this.algorithm = algorithm;
this.currentIndex = 0;
this.connections = new Map();
this.limiters = new Map();
this.metrics = new Map();
// 初始化速率限制器
proxies.forEach(proxy => {
this.limiters.set(proxy.url, new RateLimiter({
tokensPerInterval: rateLimit,
interval: 'second'
}));
this.connections.set(proxy.url, 0);
this.metrics.set(proxy.url, {
total: 0,
success: 0,
failed: 0,
responseTimes: []
});
});
}
getNextProxy() {
if (this.algorithm === 'round_robin') {
const proxy = this.proxies[this.currentIndex].url;
this.currentIndex = (this.currentIndex + 1) % this.proxies.length;
return proxy;
} else if (this.algorithm === 'least_connections') {
let minConn = Infinity;
let selectedProxy = null;
this.connections.forEach((count, proxy) => {
if (count < minConn) {
minConn = count;
selectedProxy = proxy;
}
});
return selectedProxy;
}
}
async makeRequest(url, options = {}) {
const proxyUrl = this.getNextProxy();
const limiter = this.limiters.get(proxyUrl);
// 等待速率限制可用
await limiter.removeTokens(1);
// 增加连接计数
this.connections.set(proxyUrl, this.connections.get(proxyUrl) + 1);
try {
const startTime = Date.now();
const response = await axios({
url,
proxy: this.parseProxyUrl(proxyUrl),
timeout: options.timeout || 30000,
...options
});
const responseTime = (Date.now() - startTime) / 1000;
this.recordMetrics(proxyUrl, responseTime, true);
return response;
} catch (error) {
this.recordMetrics(proxyUrl, 0, false);
throw error;
} finally {
this.connections.set(proxyUrl, this.connections.get(proxyUrl) - 1);
}
}
parseProxyUrl(proxyUrl) {
const url = new URL(proxyUrl);
return {
host: url.hostname,
port: parseInt(url.port)
};
}
recordMetrics(proxyUrl, responseTime, success) {
const m = this.metrics.get(proxyUrl);
m.total++;
if (success) {
m.success++;
m.responseTimes.push(responseTime);
if (m.responseTimes.length > 1000) {
m.responseTimes.shift();
}
} else {
m.failed++;
}
}
getStats() {
const stats = [];
this.proxies.forEach(proxy => {
const m = this.metrics.get(proxy.url);
const avgResponseTime = m.responseTimes.length > 0
? m.responseTimes.reduce((a, b) => a + b, 0) / m.responseTimes.length
: 0;
const successRate = m.total > 0 ? (m.success / m.total * 100) : 0;
stats.push({
proxy: proxy.url,
totalRequests: m.total,
successRate: successRate.toFixed(2),
avgResponseTime: avgResponseTime.toFixed(3),
activeConnections: this.connections.get(proxy.url)
});
});
return stats;
}
}
// 使用
const proxies = [
{ url: 'http://proxy1.example.com:8080', weight: 5 },
{ url: 'http://proxy2.example.com:8080', weight: 5 }
];
const balancer = new ProxyLoadBalancer(proxies, 'round_robin', 60);
async function parseProducts() {
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://www.wildberries.ru/catalog/${i}/detail.aspx`
);
for (const url of urls) {
try {
const response = await balancer.makeRequest(url);
console.log(`成功: ${url}`);
} catch (error) {
console.error(`错误: ${url} - ${error.message}`);
}
}
console.log('\n=== 统计信息 ===');
console.log(balancer.getStats());
}
parseProducts();
针对特定任务的优化:爬虫、API、自动化
不同的任务需要不同的负载均衡策略。对于爬取市场(Wildberries、Ozon、Avito),最优策略是按请求数量和地理分布进行轮换。使用来自俄罗斯不同城市的住宅代理,每50-100个请求更换代理,在请求之间添加随机延迟(1-3秒)以模拟人类行为。
对于社交媒体API(Instagram、Facebook、VK),在一个会话中IP地址的稳定性至关重要。使用IP Hash算法或粘性会话(sticky sessions):同一账户的所有请求都应通过同一代理。这可以防止可疑的地理位置变化,可能导致账户被封锁。推荐使用移动代理,因为社交媒体对移动IP的信任度高于住宅或数据中心IP。
# Instagram的粘性会话示例
class StickySessionBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.session_map = {} # {account_id: proxy_url}
self.proxy_usage = defaultdict(int)
def get_proxy_for_account(self, account_id):
"""返回账户的固定代理"""
if account_id in self.session_map:
return self.session_map[account_id]
# 选择负载最少的代理
proxy = min(self.proxies, key=lambda p: self.proxy_usage[p])
self.session_map[account_id] = proxy
self.proxy_usage[proxy] += 1
return proxy
def release_account(self, account_id):
"""在完成账户操作后释放代理"""
if account_id in self.session_map:
proxy = self.session_map[account_id]
self.proxy_usage[proxy] -= 1
del self.session_map[account_id]
这段代码展示了如何为每个Instagram账户分配固定的代理,以确保在会话期间使用相同的代理。