Wenn Sie Tausende von Seiten von Marktplätzen scrapen, massenhaft API-Anfragen senden oder die Arbeit mit Hunderten von Konten automatisieren, wird die richtige Lastverteilung über Proxys entscheidend. Ohne eine angemessene Lastverteilung werden Sie mit Sperren, Zeitüberschreitungen und Leistungsabfällen konfrontiert. In diesem Leitfaden werden wir die Architektur der Lastverteilung, die Lastverteilungsalgorithmen und praktische Strategien für hochbelastete Systeme untersuchen.
Das Material richtet sich an Entwickler und technische Spezialisten, die mit Daten-Scraping, der Automatisierung von API-Anfragen oder der Verwaltung großer Proxy-Pools für geschäftliche Aufgaben arbeiten.
Warum ist Lastverteilung über Proxys notwendig?
Die Lastverteilung über Proxys löst mehrere kritische Probleme hochbelasteter Systeme. Das erste und wichtigste ist der Schutz vor Sperren. Wenn Sie Tausende von Anfragen an eine Ressource (Marktplatz, API eines sozialen Netzwerks, Suchmaschine) senden, sieht der Zielserver eine anormal hohe Aktivität von einer IP-Adresse und sperrt diese. Die Verteilung der Anfragen auf Dutzende oder Hunderte von Proxys macht Ihre Aktivität ähnlicher zu der von normalen Benutzern.
Das zweite Problem ist die Leistung. Ein Proxy-Server hat eine begrenzte Bandbreite (normalerweise 100-1000 Mbit/s) und kann eine begrenzte Anzahl gleichzeitiger Verbindungen verarbeiten. Wenn Sie 10.000 Seiten pro Minute scrapen oder massenhaft API-Anfragen senden, wird ein Proxy zum Engpass des Systems. Die Lastverteilung ermöglicht es, die Bandbreite horizontal zu skalieren, indem neue Proxys zum Pool hinzugefügt werden.
Die dritte Herausforderung ist die Zuverlässigkeit. Wenn einer der Proxys ausfällt (technischer Fehler, Sperre, Ablauf der Mietdauer), leitet das System den Verkehr automatisch auf funktionierende Proxys um. Ohne einen Mechanismus zur Lastverteilung und Health Checks kann der Ausfall eines Proxys das gesamte System stoppen.
Echtes Beispiel: Beim Scraping von Wildberries zur Überwachung der Preise von Wettbewerbern müssen Sie 50.000 Produkte jede Stunde verarbeiten. Das sind etwa 14 Anfragen pro Sekunde. Ein Proxy kann diese Last bewältigen, aber Wildberries sperrt die IP nach 100-200 Anfragen von einer Adresse. Indem Sie die Anfragen auf 20 residential Proxys verteilen, reduzieren Sie die Last auf jede IP auf 0,7 Anfragen pro Sekunde — das sieht aus wie die Aktivität eines normalen Benutzers.
Der vierte Grund ist die geografische Verteilung. Viele Dienste zeigen unterschiedliche Inhalte oder Preise je nach Region des Benutzers an. Die Lastverteilung zwischen Proxys aus verschiedenen Ländern und Städten ermöglicht es, gleichzeitig Daten aus allen Zielregionen zu sammeln. Zum Beispiel benötigen Sie für die Überwachung der Preise auf Ozon Proxys aus Moskau, Sankt Petersburg, Jekaterinburg und anderen großen Städten.
Architektur des Lastverteilungssystems
Die klassische Architektur eines Lastverteilungssystems über Proxys besteht aus mehreren Komponenten. Auf der obersten Ebene befindet sich der Lastverteiler (Load Balancer) — ein Softwaremodul, das eingehende Aufgaben (Scraping-Anfragen, API-Aufrufe) entgegennimmt und sie auf die verfügbaren Proxys verteilt. Der Lastverteiler kann als separater Dienst arbeiten oder in die Anwendung integriert sein.
Die zweite Komponente ist der Proxy-Pool-Manager (Proxy Pool Manager). Er speichert eine Liste aller verfügbaren Proxys mit ihren Eigenschaften: IP-Adresse, Port, Protokoll (HTTP/SOCKS5), geografische Lage, Typ (residential, mobil, Rechenzentrum), aktueller Status (aktiv, blockiert, in Prüfung). Der Pool-Manager ist verantwortlich für das Hinzufügen neuer Proxys, das Entfernen nicht funktionierender und die regelmäßige Überprüfung der Verfügbarkeit.
Die dritte Komponente ist das Überwachungssystem und die Health Checks. Es überprüft ständig die Funktionsfähigkeit jedes Proxys: es sendet Testanfragen, misst die Antwortzeiten und überprüft die Erfolgsquote der Verbindungen. Wenn ein Proxy nicht antwortet oder Fehler zurückgibt, wird er als nicht verfügbar markiert und vorübergehend aus der Rotation ausgeschlossen.
| Komponente | Funktion | Technologien |
|---|---|---|
| Load Balancer | Verteilung der Anfragen zwischen Proxys | HAProxy, Nginx, Python/Node.js Bibliotheken |
| Proxy Pool Manager | Verwaltung der Proxy-Liste | Redis, PostgreSQL, In-Memory-Speicher |
| Health Check System | Überprüfung der Verfügbarkeit von Proxys | Geplante Aufgaben, Celery, cron |
| Rate Limiter | Kontrolle der Anfragefrequenz | Token Bucket, Leaky Bucket Algorithmen |
| Monitoring & Metrics | Erfassung von Leistungsmetriken | Prometheus, Grafana, ELK Stack |
Die vierte Komponente ist der Rate Limiter (Anfragefrequenzbegrenzer). Er überwacht, dass jeder Proxy die zulässige Anfragefrequenz an die Zielressource nicht überschreitet. Wenn Sie beispielsweise Instagram scrapen und wissen, dass die Plattform IPs nach 60 Anfragen pro Minute blockiert, wird der Rate Limiter nicht zulassen, dass über einen Proxy mehr als 50 Anfragen pro Minute gesendet werden, um einen Sicherheitsspielraum zu lassen.
Die fünfte Komponente ist das System zur Erfassung von Metriken und Analysen. Es sammelt Daten über die Leistung jedes Proxys: Anzahl der erfolgreichen Anfragen, Anzahl der Fehler, durchschnittliche Antwortzeit, Prozentsatz der Sperren. Diese Daten werden verwendet, um die Algorithmen zur Lastverteilung zu optimieren und problematische Proxys zu identifizieren.
Lastverteilungsalgorithmen: Round Robin, Least Connections, Weighted
Der Round Robin Algorithmus ist die einfachste und am weitesten verbreitete Methode zur Lastverteilung. Er durchläuft die Proxys in der Liste der Reihe nach: die erste Anfrage geht über Proxy Nr. 1, die zweite über Proxy Nr. 2, die dritte über Proxy Nr. 3 und so weiter. Wenn die Liste endet, kehrt der Lastverteiler zum Anfang zurück. Dieser Algorithmus verteilt die Last gleichmäßig, wenn alle Proxys die gleiche Leistung haben.
class RoundRobinBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# Verwendung
proxies = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080"
]
balancer = RoundRobinBalancer(proxies)
for i in range(10):
proxy = balancer.get_next_proxy()
print(f"Anfrage {i+1} → {proxy}")
Der Least Connections Algorithmus (geringste Anzahl an Verbindungen) wählt den Proxy mit der geringsten Anzahl aktiver Verbindungen zu diesem Zeitpunkt aus. Dies ist nützlich, wenn die Anfragen unterschiedliche Ausführungszeiten haben. Zum Beispiel kann eine Anfrage beim Scraping 100 ms dauern, während eine andere 5 Sekunden dauert (wenn die Seite langsam lädt). Least Connections leitet automatisch neue Anfragen an weniger ausgelastete Proxys weiter.
class LeastConnectionsBalancer:
def __init__(self, proxies):
self.proxies = {proxy: 0 for proxy in proxies}
def get_next_proxy(self):
# Wählt den Proxy mit der geringsten Anzahl an Verbindungen
proxy = min(self.proxies.items(), key=lambda x: x[1])[0]
self.proxies[proxy] += 1
return proxy
def release_proxy(self, proxy):
# Verringert den Zähler nach Abschluss der Anfrage
self.proxies[proxy] -= 1
# Verwendung mit Kontextmanager
balancer = LeastConnectionsBalancer(proxies)
def make_request(url):
proxy = balancer.get_next_proxy()
try:
# Führt die Anfrage über den gewählten Proxy aus
response = requests.get(url, proxies={"http": proxy})
return response
finally:
balancer.release_proxy(proxy)
Weighted Round Robin (gewichtete Rundlaufverteilung) erweitert das klassische Round Robin, indem jedem Proxy ein Gewicht basierend auf seiner Leistung oder Qualität zugewiesen wird. Proxys mit höherem Gewicht erhalten mehr Anfragen. Dies ist nützlich, wenn Sie Proxys unterschiedlicher Qualität haben: beispielsweise Premium residential Proxys mit hoher Geschwindigkeit und günstige Rechenzentrums-Proxys mit Einschränkungen.
class WeightedRoundRobinBalancer:
def __init__(self, weighted_proxies):
# weighted_proxies = [(proxy, weight), ...]
self.proxies = []
for proxy, weight in weighted_proxies:
# Fügt den Proxy zur Liste weight-mal hinzu
self.proxies.extend([proxy] * weight)
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# Verwendung mit Gewichten
weighted_proxies = [
("http://premium-proxy1.com:8080", 5), # hohe Qualität
("http://premium-proxy2.com:8080", 5),
("http://cheap-proxy1.com:8080", 2), # niedrige Qualität
("http://cheap-proxy2.com:8080", 1) # sehr niedrige Qualität
]
balancer = WeightedRoundRobinBalancer(weighted_proxies)
Der Random Algorithmus (zufällige Auswahl) wählt Proxys zufällig aus der Liste der verfügbaren aus. Er ist einfacher zu implementieren als Round Robin und funktioniert gut bei einer großen Anzahl von Proxys (100+). Die zufällige Verteilung gleicht sich automatisch bei ausreichendem Anfragevolumen aus. Nachteil: Es kann zu kurzen Phasen ungleicher Lastverteilung kommen.
IP Hash ist ein Algorithmus, der Proxys basierend auf dem Hash des Ziel-URLs oder eines anderen Parameters auswählt. Dies stellt sicher, dass Anfragen an dasselbe Ziel immer über denselben Proxy gehen. Nützlich für Websites, die Sitzungen verwenden oder eine Reihenfolge der Anfragen von einer IP erfordern (z.B. Authentifizierung + Datenabruf).
Verwaltung von Proxy-Pools: Rotation und Health Checks
Eine effektive Verwaltung des Proxy-Pools erfordert eine ständige Überwachung des Status jedes Proxys und eine automatische Rotation. Ein Health Check ist eine regelmäßige Überprüfung der Verfügbarkeit des Proxys durch das Senden einer Testanfrage. Normalerweise wird eine einfache GET-Anfrage an einen zuverlässigen Dienst (z.B. httpbin.org oder einen eigenen Endpunkt) verwendet, der Informationen über die IP-Adresse zurückgibt.
import requests
import time
from datetime import datetime
class ProxyHealthChecker:
def __init__(self, test_url="http://httpbin.org/ip", timeout=10):
self.test_url = test_url
self.timeout = timeout
def check_proxy(self, proxy_url):
"""Überprüft die Funktionsfähigkeit des Proxys"""
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=self.timeout
)
response_time = time.time() - start_time
if response.status_code == 200:
return {
"status": "healthy",
"response_time": response_time,
"timestamp": datetime.now(),
"ip": response.json().get("origin")
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"timestamp": datetime.now()
}
except requests.exceptions.Timeout:
return {
"status": "unhealthy",
"error": "timeout",
"timestamp": datetime.now()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now()
}
# Verwendung
checker = ProxyHealthChecker()
proxies = ["http://proxy1.com:8080", "http://proxy2.com:8080"]
for proxy in proxies:
result = checker.check_proxy(proxy)
print(f"{proxy}: {result['status']} ({result.get('response_time', 'N/A')}s)")
Das System zur Verwaltung des Pools sollte nicht funktionierende Proxys automatisch als nicht verfügbar kennzeichnen und regelmäßig erneut überprüfen. Eine gängige Strategie ist das Circuit Breaker Pattern: Nach drei aufeinanderfolgenden fehlgeschlagenen Überprüfungen wird der Proxy für 5-10 Minuten aus dem Pool ausgeschlossen und dann erneut überprüft. Wenn die Überprüfung erfolgreich ist, wird der Proxy wieder in den aktiven Pool aufgenommen.
class ProxyPoolManager:
def __init__(self, health_checker, max_failures=3, cooldown_seconds=300):
self.health_checker = health_checker
self.max_failures = max_failures
self.cooldown_seconds = cooldown_seconds
self.proxies = {} # {proxy_url: ProxyInfo}
def add_proxy(self, proxy_url, metadata=None):
"""Fügt einen Proxy zum Pool hinzu"""
self.proxies[proxy_url] = {
"url": proxy_url,
"status": "active",
"failures": 0,
"last_check": None,
"cooldown_until": None,
"metadata": metadata or {}
}
def get_active_proxies(self):
"""Gibt eine Liste aktiver Proxys zurück"""
now = datetime.now()
active = []
for proxy_url, info in self.proxies.items():
# Überprüfen, ob der Proxy im Cooldown ist
if info["cooldown_until"] and now < info["cooldown_until"]:
continue
if info["status"] == "active":
active.append(proxy_url)
return active
def mark_failure(self, proxy_url):
"""Markiert einen fehlgeschlagenen Versuch, den Proxy zu verwenden"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] += 1
if info["failures"] >= self.max_failures:
# In Cooldown versetzen
info["status"] = "cooldown"
info["cooldown_until"] = datetime.now() + timedelta(seconds=self.cooldown_seconds)
print(f"Proxy {proxy_url} wurde bis {info['cooldown_until']} in den Cooldown versetzt")
def mark_success(self, proxy_url):
"""Markiert die erfolgreiche Verwendung des Proxys"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] = 0
info["status"] = "active"
info["cooldown_until"] = None
Die Rotation von Proxys ist eine Strategie zur automatischen Änderung von Proxys über bestimmte Intervalle oder nach einer bestimmten Anzahl von Anfragen. Es gibt mehrere Ansätze: zeitbasierte Rotation (Änderung alle 5-10 Minuten), rotationsbasierte Anzahl von Anfragen (Änderung nach 100-500 Anfragen), rotationsbasierte Sitzungen (ein Proxy für eine Scraping-Sitzung). Die Wahl der Strategie hängt von den Anforderungen der Zielwebsite ab.
Tipp: Für das Scraping von Marktplätzen (Wildberries, Ozon) ist eine Rotation basierend auf der Anzahl der Anfragen optimal: 50-100 Anfragen pro Proxy, dann Wechsel. Für die Arbeit mit APIs sozialer Netzwerke (Instagram, Facebook) sollten Sie besser eine rotationsbasierte Sitzung verwenden: ein Proxy für den gesamten Zyklus Authentifizierung → Aktionen → Abmeldung.
Rate Limiting und Kontrolle der Anfragefrequenz
Rate Limiting ist ein kritischer Bestandteil des Lastverteilungssystems. Es verhindert, dass die zulässige Anfragefrequenz an die Zielressource überschritten wird, was zu einer Sperre des Proxys führen könnte. Es gibt zwei Hauptalgorithmen: Token Bucket (Token-Eimer) und Leaky Bucket (löchriger Eimer).
Token Bucket funktioniert folgendermaßen: Jeder Proxy hat einen virtuellen "Eimer" mit Tokens. Jeder Token gibt das Recht auf eine Anfrage. Der Eimer füllt sich allmählich mit Tokens mit einer festgelegten Geschwindigkeit (z.B. 10 Tokens pro Sekunde). Die maximale Kapazität des Eimers ist begrenzt (z.B. 50 Tokens). Wenn eine Anfrage eingeht, überprüft das System die Verfügbarkeit von Tokens: Wenn vorhanden, wird ein Token abgezogen und die Anfrage ausgeführt, andernfalls wird auf das Erscheinen eines neuen Tokens gewartet.
import time
from threading import Lock
class TokenBucketRateLimiter:
def __init__(self, rate, capacity):
"""
rate: Anzahl der Tokens pro Sekunde
capacity: Maximale Anzahl der Tokens im Eimer
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""Fügt Tokens basierend auf der verstrichenen Zeit hinzu"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def acquire(self, tokens=1):
"""Versucht, die angegebene Anzahl von Tokens zu erhalten"""
with self.lock:
self._add_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens=1):
"""Wartet auf das Erscheinen von Tokens und erhält sie"""
while not self.acquire(tokens):
# Berechnet die Wartezeit
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
# Verwendung für jeden Proxy
proxy_limiters = {
"http://proxy1.com:8080": TokenBucketRateLimiter(rate=10, capacity=50),
"http://proxy2.com:8080": TokenBucketRateLimiter(rate=10, capacity=50)
}
def make_request_with_limit(url, proxy_url):
limiter = proxy_limiters[proxy_url]
limiter.wait_and_acquire() # Wartet auf die Verfügbarkeit von Tokens
response = requests.get(url, proxies={"http": proxy_url})
return response
Leaky Bucket (löchriger Eimer) funktioniert anders: Anfragen werden in eine Warteschlange (Eimer) eingefügt, die mit konstanter Geschwindigkeit "ausläuft". Wenn der Eimer überläuft, werden neue Anfragen abgelehnt oder in Wartestellung versetzt. Dieser Algorithmus sorgt für eine gleichmäßigere Lastverteilung über die Zeit, kann jedoch bei Aktivitätsspitzen zu Verzögerungen führen.
Für verschiedene Zielplattformen sind unterschiedliche Limits erforderlich. Die Instagram API erlaubt etwa 200 Anfragen pro Stunde pro IP (ca. 3,3 Anfragen pro Minute). Wildberries sperrt nach 100-200 Anfragen pro Minute von einer IP. Google Search erlaubt 10-20 Anfragen pro Minute. Ihr Rate Limiting-System sollte diese Einschränkungen berücksichtigen und einen Sicherheitsspielraum hinzufügen (normalerweise 20-30%).
| Plattform | Anfrage-Limit | Empfohlene Einstellung |
|---|---|---|
| ~200 Anfragen/Stunde | 2-3 Anfragen/Minute (mit Puffer) | |
| Wildberries | 100-200 Anfragen/Minute | 60-80 Anfragen/Minute |
| Google Search | 10-20 Anfragen/Minute | 8-12 Anfragen/Minute |
| Ozon | 50-100 Anfragen/Minute | 30-50 Anfragen/Minute |
| Facebook API | 200 Anfragen/Stunde | 2-3 Anfragen/Minute |
Überwachung der Leistung und automatisches Skalieren
Ein effektives Lastverteilungssystem erfordert eine ständige Überwachung der Schlüsselmetriken. Die erste Gruppe von Metriken sind die Leistungsmetriken der Proxys: durchschnittliche Antwortzeit (Response Time), Prozentsatz erfolgreicher Anfragen (Success Rate), Anzahl der Zeitüberschreitungen, Anzahl der Fehler 4xx und 5xx. Diese Daten helfen, problematische Proxys zu identifizieren und die Algorithmen zur Lastverteilung zu optimieren.
class ProxyMetrics:
def __init__(self):
self.metrics = {} # {proxy_url: metrics_dict}
def record_request(self, proxy_url, response_time, status_code, success):
"""Aufzeichnungsmetriken der Anfrage"""
if proxy_url not in self.metrics:
self.metrics[proxy_url] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_response_time": 0,
"timeouts": 0,
"errors_4xx": 0,
"errors_5xx": 0
}
m = self.metrics[proxy_url]
m["total_requests"] += 1
if success:
m["successful_requests"] += 1
m["total_response_time"] += response_time
else:
m["failed_requests"] += 1
if status_code == 0: # Zeitüberschreitung
m["timeouts"] += 1
elif 400 <= status_code < 500:
m["errors_4xx"] += 1
elif 500 <= status_code < 600:
m["errors_5xx"] += 1
def get_success_rate(self, proxy_url):
"""Gibt den Prozentsatz erfolgreicher Anfragen zurück"""
m = self.metrics.get(proxy_url, {})
total = m.get("total_requests", 0)
if total == 0:
return 0
return (m.get("successful_requests", 0) / total) * 100
def get_avg_response_time(self, proxy_url):
"""Gibt die durchschnittliche Antwortzeit zurück"""
m = self.metrics.get(proxy_url, {})
successful = m.get("successful_requests", 0)
if successful == 0:
return 0
return m.get("total_response_time", 0) / successful
def get_report(self, proxy_url):
"""Gibt einen vollständigen Bericht über den Proxy zurück"""
m = self.metrics.get(proxy_url, {})
return {
"proxy": proxy_url,
"total_requests": m.get("total_requests", 0),
"success_rate": self.get_success_rate(proxy_url),
"avg_response_time": self.get_avg_response_time(proxy_url),
"timeouts": m.get("timeouts", 0),
"errors_4xx": m.get("errors_4xx", 0),
"errors_5xx": m.get("errors_5xx", 0)
}
Die zweite Gruppe von Metriken sind die allgemeinen Leistungsmetriken des Systems: die Gesamtzahl der Anfragen pro Sekunde (RPS — Requests Per Second), die durchschnittliche Latenz, die Größe der Anfragewarteschlange, die Anzahl aktiver Proxys, der Prozentsatz der Nutzung des Proxy-Pools. Diese Metriken zeigen, ob das System mit der Last zurechtkommt oder ob eine Skalierung erforderlich ist.
Automatisches Skalieren ermöglicht es, Proxys dynamisch hinzuzufügen oder zu entfernen, je nach Last. Eine einfache Strategie: Wenn die durchschnittliche Auslastung des Pools 80% über einen Zeitraum von 5 Minuten überschreitet, fügt das System automatisch neue Proxys hinzu. Wenn die Auslastung in den letzten 15 Minuten unter 30% fällt, entfernt das System überflüssige Proxys zur Ressourcenschonung.
Beispiel für die Einrichtung der Überwachung: Um 100.000 Produkte von Wildberries pro Stunde zu scrapen (ca. 28 Anfragen pro Sekunde), benötigen Sie mindestens 30-40 Proxys bei einem Limit von 60 Anfragen pro Minute pro Proxy. Richten Sie Alarme ein: Wenn die Erfolgsquote unter 85% fällt oder die durchschnittliche Antwortzeit 3 Sekunden überschreitet, sollte das System automatisch 10-15 Backup-Proxys aus dem Pool hinzufügen.
Zur Visualisierung der Metriken verwenden Sie Grafana mit Prometheus oder ELK Stack. Erstellen Sie Dashboards mit Diagrammen: RPS über die Zeit, Verteilung der Antwortzeiten, Top 10 der schnellsten/langsamsten Proxys, Fehlerkarte nach Typen. Dies ermöglicht es, Probleme schnell zu identifizieren und das System zu optimieren.
Praktische Implementierung in Python und Node.js
Lassen Sie uns die vollständige Implementierung eines Lastverteilungssystems in Python mit beliebten Bibliotheken betrachten. Dieses Beispiel vereint alle beschriebenen Komponenten: Lastverteiler, Pool-Manager, Health Checks, Rate Limiting und Überwachung.
import requests
import time
import random
from datetime import datetime, timedelta
from threading import Lock, Thread
from collections import defaultdict
class ProxyLoadBalancer:
def __init__(self, proxies, algorithm="round_robin", rate_limit=10):
"""
proxies: Liste von Proxys [{"url": "...", "weight": 1}, ...]
algorithm: round_robin, least_connections, weighted, random
rate_limit: maximale Anzahl von Anfragen pro Sekunde pro Proxy
"""
self.proxies = proxies
self.algorithm = algorithm
self.rate_limit = rate_limit
# Zustand des Lastverteilers
self.current_index = 0
self.connections = defaultdict(int)
self.rate_limiters = {}
self.metrics = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0,
"response_times": [], "last_check": None
})
# Initialisierung der Rate Limiters
for proxy in proxies:
proxy_url = proxy["url"]
self.rate_limiters[proxy_url] = TokenBucketRateLimiter(
rate=rate_limit,
capacity=rate_limit * 5
)
self.lock = Lock()
# Start des Hintergrund-Health Checks
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def get_next_proxy(self):
"""Wählt den nächsten Proxy gemäß dem Algorithmus aus"""
with self.lock:
if self.algorithm == "round_robin":
return self._round_robin()
elif self.algorithm == "least_connections":
return self._least_connections()
elif self.algorithm == "weighted":
return self._weighted_random()
elif self.algorithm == "random":
return random.choice(self.proxies)["url"]
def _round_robin(self):
proxy = self.proxies[self.current_index]["url"]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def _least_connections(self):
min_conn = min(self.connections.values()) if self.connections else 0
candidates = [p["url"] for p in self.proxies if self.connections[p["url"]] == min_conn]
return random.choice(candidates)
def _weighted_random(self):
weights = [p.get("weight", 1) for p in self.proxies]
return random.choices(self.proxies, weights=weights)[0]["url"]
def make_request(self, url, method="GET", **kwargs):
"""Führt eine Anfrage über den Lastverteiler aus"""
proxy_url = self.get_next_proxy()
# Wartet auf die Verfügbarkeit des Rate Limits
self.rate_limiters[proxy_url].wait_and_acquire()
# Erhöht den Zähler für Verbindungen
with self.lock:
self.connections[proxy_url] += 1
try:
start_time = time.time()
response = requests.request(
method,
url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=kwargs.get("timeout", 30),
**kwargs
)
response_time = time.time() - start_time
# Aufzeichnung der Metriken
self._record_metrics(proxy_url, response_time, True, response.status_code)
return response
except Exception as e:
self._record_metrics(proxy_url, 0, False, 0)
raise
finally:
# Verringert den Zähler für Verbindungen
with self.lock:
self.connections[proxy_url] -= 1
def _record_metrics(self, proxy_url, response_time, success, status_code):
"""Aufzeichnungsmetriken der Anfrage"""
with self.lock:
m = self.metrics[proxy_url]
m["total"] += 1
if success:
m["success"] += 1
m["response_times"].append(response_time)
# Speichert nur die letzten 1000 Werte
if len(m["response_times"]) > 1000:
m["response_times"].pop(0)
else:
m["failed"] += 1
def _health_check_loop(self):
"""Hintergrundüberprüfung der Funktionsfähigkeit der Proxys"""
while True:
for proxy in self.proxies:
proxy_url = proxy["url"]
try:
response = requests.get(
"http://httpbin.org/ip",
proxies={"http": proxy_url},
timeout=10
)
with self.lock:
self.metrics[proxy_url]["last_check"] = datetime.now()
proxy["status"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
with self.lock:
proxy["status"] = "unhealthy"
time.sleep(60) # Überprüfung jede Minute
def get_stats(self):
"""Gibt Statistiken zu allen Proxys zurück"""
stats = []
with self.lock:
for proxy in self.proxies:
proxy_url = proxy["url"]
m = self.metrics[proxy_url]
avg_response_time = (
sum(m["response_times"]) / len(m["response_times"])
if m["response_times"] else 0
)
success_rate = (
(m["success"] / m["total"] * 100)
if m["total"] > 0 else 0
)
stats.append({
"proxy": proxy_url,
"status": proxy.get("status", "unknown"),
"total_requests": m["total"],
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3),
"active_connections": self.connections[proxy_url]
})
return stats
Verwendung dieses Lastverteilers zum Scraping von Marktplätzen:
# Einrichtung des Lastverteilers
proxies = [
{"url": "http://proxy1.example.com:8080", "weight": 5},
{"url": "http://proxy2.example.com:8080", "weight": 5},
{"url": "http://proxy3.example.com:8080", "weight": 3},
{"url": "http://proxy4.example.com:8080", "weight": 2}
]
balancer = ProxyLoadBalancer(
proxies=proxies,
algorithm="weighted",
rate_limit=60 # 60 Anfragen pro Minute pro Proxy
)
# Scraping der Produktliste
product_urls = [f"https://www.wildberries.ru/catalog/{i}/detail.aspx" for i in range(1000)]
results = []
for url in product_urls:
try:
response = balancer.make_request(url)
# Verarbeitung der Antwort
results.append({"url": url, "status": "success", "data": response.text})
except Exception as e:
results.append({"url": url, "status": "error", "error": str(e)})
# Alle 100 Anfragen Statistiken ausgeben
if len(results) % 100 == 0:
stats = balancer.get_stats()
for stat in stats:
print(f"{stat['proxy']}: {stat['success_rate']}% Erfolg, "
f"{stat['avg_response_time']}s durchschnittliche Antwort")
# Endstatistik
print("\n=== Endstatistik ===")
for stat in balancer.get_stats():
print(f"{stat['proxy']}:")
print(f" Gesamtanfragen: {stat['total_requests']}")
print(f" Erfolgsquote: {stat['success_rate']}%")
print(f" Durchschnittliche Antwortzeit: {stat['avg_response_time']}s")
print(f" Status: {stat['status']}")
Für Node.js kann eine ähnliche Architektur mit den Bibliotheken axios für HTTP-Anfragen und node-rate-limiter zur Kontrolle der Frequenz verwendet werden:
const axios = require('axios');
const { RateLimiter } = require('limiter');
class ProxyLoadBalancer {
constructor(proxies, algorithm = 'round_robin', rateLimit = 10) {
this.proxies = proxies;
this.algorithm = algorithm;
this.currentIndex = 0;
this.connections = new Map();
this.limiters = new Map();
this.metrics = new Map();
// Initialisierung der Rate Limiters
proxies.forEach(proxy => {
this.limiters.set(proxy.url, new RateLimiter({
tokensPerInterval: rateLimit,
interval: 'second'
}));
this.connections.set(proxy.url, 0);
this.metrics.set(proxy.url, {
total: 0,
success: 0,
failed: 0,
responseTimes: []
});
});
}
getNextProxy() {
if (this.algorithm === 'round_robin') {
const proxy = this.proxies[this.currentIndex].url;
this.currentIndex = (this.currentIndex + 1) % this.proxies.length;
return proxy;
} else if (this.algorithm === 'least_connections') {
let minConn = Infinity;
let selectedProxy = null;
this.connections.forEach((count, proxy) => {
if (count < minConn) {
minConn = count;
selectedProxy = proxy;
}
});
return selectedProxy;
}
}
async makeRequest(url, options = {}) {
const proxyUrl = this.getNextProxy();
const limiter = this.limiters.get(proxyUrl);
// Wartet auf die Verfügbarkeit des Rate Limits
await limiter.removeTokens(1);
// Erhöht den Zähler für Verbindungen
this.connections.set(proxyUrl, this.connections.get(proxyUrl) + 1);
try {
const startTime = Date.now();
const response = await axios({
url,
proxy: this.parseProxyUrl(proxyUrl),
timeout: options.timeout || 30000,
...options
});
const responseTime = (Date.now() - startTime) / 1000;
this.recordMetrics(proxyUrl, responseTime, true);
return response;
} catch (error) {
this.recordMetrics(proxyUrl, 0, false);
throw error;
} finally {
this.connections.set(proxyUrl, this.connections.get(proxyUrl) - 1);
}
}
parseProxyUrl(proxyUrl) {
const url = new URL(proxyUrl);
return {
host: url.hostname,
port: parseInt(url.port)
};
}
recordMetrics(proxyUrl, responseTime, success) {
const m = this.metrics.get(proxyUrl);
m.total++;
if (success) {
m.success++;
m.responseTimes.push(responseTime);
if (m.responseTimes.length > 1000) {
m.responseTimes.shift();
}
} else {
m.failed++;
}
}
getStats() {
const stats = [];
this.proxies.forEach(proxy => {
const m = this.metrics.get(proxy.url);
const avgResponseTime = m.responseTimes.length > 0
? m.responseTimes.reduce((a, b) => a + b, 0) / m.responseTimes.length
: 0;
const successRate = m.total > 0 ? (m.success / m.total * 100) : 0;
stats.push({
proxy: proxy.url,
totalRequests: m.total,
successRate: successRate.toFixed(2),
avgResponseTime: avgResponseTime.toFixed(3),
activeConnections: this.connections.get(proxy.url)
});
});
return stats;
}
}
// Verwendung
const proxies = [
{ url: 'http://proxy1.example.com:8080', weight: 5 },
{ url: 'http://proxy2.example.com:8080', weight: 5 }
];
const balancer = new ProxyLoadBalancer(proxies, 'round_robin', 60);
async function parseProducts() {
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://www.wildberries.ru/catalog/${i}/detail.aspx`
);
for (const url of urls) {
try {
const response = await balancer.makeRequest(url);
console.log(`Erfolg: ${url}`);
} catch (error) {
console.error(`Fehler: ${url} - ${error.message}`);
}
}
console.log('\n=== Statistiken ===');
console.log(balancer.getStats());
}
parseProducts();
Optimierung für spezifische Aufgaben: Scraping, API, Automatisierung
Verschiedene Aufgaben erfordern unterschiedliche Strategien zur Lastverteilung. Für das Scraping von Marktplätzen (Wildberries, Ozon, Avito) ist eine Strategie mit Rotation basierend auf der Anzahl der Anfragen und geografischer Verteilung optimal. Verwenden Sie residential Proxys aus verschiedenen Städten Russlands, wechseln Sie die Proxys alle 50-100 Anfragen und fügen Sie zufällige Verzögerungen zwischen den Anfragen (1-3 Sekunden) hinzu, um das Verhalten eines Menschen zu simulieren.
Für die Arbeit mit APIs sozialer Netzwerke (Instagram, Facebook, VK) ist die Stabilität der IP-Adresse innerhalb einer Sitzung entscheidend. Verwenden Sie den IP Hash-Algorithmus oder sticky sessions: Alle Anfragen eines Kontos sollten über denselben Proxy gehen. Dies verhindert verdächtige Standortwechsel, die zur Sperrung des Kontos führen können. Empfohlen werden mobile Proxys, da soziale Netzwerke mobilen IPs mehr vertrauen als residential oder Rechenzentrums-Proxys.
# Beispiel für sticky sessions für Instagram
class StickySessionBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.session_map = {} # {account_id: proxy_url}
self.proxy_usage = defaultdict(int)
def get_proxy_for_account(self, account_id):
"""Gibt einen festen Proxy für das Konto zurück"""
if account_id in self.session_map:
return self.session_map[account_id]
# Wählt den am wenigsten ausgelasteten Proxy aus
proxy = min(self.proxies, key=lambda p: self.proxy_usage[p])
self.session_map[account_id] = proxy
self.proxy_usage[proxy] += 1
return proxy
def release_account(self, account_id):
"""Gibt den Proxy bei Abschluss der Arbeit mit dem Konto frei"""
if account_id in self.session_map:
proxy = self.session_map[account_id]
self.proxy_usage[proxy] -= 1
del self.session_map[account_id]