マーケットプレイスの数千ページをスクレイピングしたり、大量のAPIリクエストを送信したり、数百のアカウントで作業を自動化したりする場合、プロキシ経由の適切な負荷分散が極めて重要になります。適切な負荷分散がなければ、ブロック、タイムアウト、パフォーマンスの低下に直面することになります。このガイドでは、負荷分散アーキテクチャ、負荷分散アルゴリズム、高負荷システムのための実践的な戦略について解説します。
本資料は、データスクレイピング、APIリクエストの自動化、またはビジネスタスクのための大規模プロキシプールを管理する開発者や技術専門家を対象としています。
プロキシ経由の負荷分散が必要な理由
プロキシ経由の負荷分散は、高負荷システムのいくつかの重要な問題を解決します。第一かつ最も重要なのは、ブロックからの保護です。1つのリソース(マーケットプレイス、ソーシャルネットワークAPI、検索エンジン)に数千のリクエストを送信すると、ターゲットサーバーは1つのIPアドレスからの異常に高いアクティビティを検出してブロックします。数十または数百のプロキシ間でリクエストを分散することで、アクティビティが通常のユーザーの行動に似たものになります。
第二の問題はパフォーマンスです。1つのプロキシサーバーには限られた帯域幅(通常100-1000 Mbps)があり、処理できる同時接続数も限られています。1分間に10000ページをスクレイピングしたり、大量のAPIリクエストを送信したりする場合、1つのプロキシがシステムのボトルネックになります。負荷分散により、プールに新しいプロキシを追加することで帯域幅を水平方向にスケーリングできます。
第三のタスクは信頼性です。プロキシの1つが故障した場合(技術的障害、ブロック、リース期限切れ)、システムは自動的にトラフィックを稼働中のプロキシにリダイレクトします。負荷分散とヘルスチェックのメカニズムがなければ、1つのプロキシの障害がシステム全体を停止させる可能性があります。
実例: 競合他社の価格監視のためにWildberriesをスクレイピングする場合、1時間ごとに50000商品を処理する必要があります。これは約14リクエスト/秒です。1つのプロキシでこの負荷に耐えられますが、Wildberriesは1つのアドレスから100-200リクエスト後にIPをブロックします。20のレジデンシャルプロキシ間でリクエストを分散することで、各IPの負荷を0.7リクエスト/秒に減らします—これは通常のユーザーのアクティビティのように見えます。
第四の理由は地理的分散です。多くのサービスは、ユーザーの地域に応じて異なるコンテンツや価格を表示します。異なる国や都市のプロキシ間で負荷分散することで、すべてのターゲット地域から同時にデータを収集できます。たとえば、Ozonの価格監視には、モスクワ、サンクトペテルブルク、エカテリンブルク、その他の主要都市のプロキシが必要です。
負荷分散システムのアーキテクチャ
プロキシ経由の負荷分散システムの古典的なアーキテクチャは、いくつかのコンポーネントで構成されています。最上位レベルには、ロードバランサー(load balancer)があります—受信タスク(スクレイピングリクエスト、API呼び出し)を受け取り、利用可能なプロキシ間で分散するソフトウェアモジュールです。ロードバランサーは、独立したサービスとして動作するか、アプリケーションに組み込むことができます。
第二のコンポーネントは、プロキシプールマネージャー(proxy pool manager)です。これは、すべての利用可能なプロキシのリストとその特性を保存します:IPアドレス、ポート、プロトコル(HTTP/SOCKS5)、地理的位置、タイプ(レジデンシャル、モバイル、データセンター)、現在の状態(アクティブ、ブロック済み、チェック中)。プールマネージャーは、新しいプロキシの追加、動作していないプロキシの削除、定期的な可用性チェックを担当します。
第三の要素は、モニタリングとヘルスチェックシステムです。これは各プロキシの動作状態を常にチェックします:テストリクエストを送信し、応答時間を測定し、接続の成功を確認します。プロキシが応答しないかエラーを返す場合、システムはそれを利用不可としてマークし、一時的にローテーションから除外します。
| コンポーネント | 機能 | 技術 |
|---|---|---|
| Load Balancer | プロキシ間のリクエスト分散 | HAProxy、Nginx、Python/Node.jsライブラリ |
| Proxy Pool Manager | プロキシリストの管理 | Redis、PostgreSQL、インメモリストレージ |
| Health Check System | プロキシの可用性チェック | スケジュールタスク、Celery、cron |
| Rate Limiter | リクエスト頻度の制御 | トークンバケット、リーキーバケットアルゴリズム |
| Monitoring & Metrics | パフォーマンスメトリクスの収集 | Prometheus、Grafana、ELK Stack |
第四のコンポーネントは、レートリミッター(rate limiter、リクエスト頻度制限)です。これは、各プロキシがターゲットリソースへの許容リクエスト頻度を超えないように監視します。たとえば、Instagramをスクレイピングしていて、プラットフォームが1分間に60リクエスト後にIPをブロックすることを知っている場合、レートリミッターは1つのプロキシを通じて1分間に50リクエスト以上を送信することを許可せず、安全マージンを残します。
第五の要素は、メトリクスと分析システムです。これは各プロキシのパフォーマンスデータを収集します:成功したリクエストの数、エラーの数、平均応答時間、ブロックの割合。これらのデータは、負荷分散アルゴリズムの最適化と問題のあるプロキシの特定に使用されます。
負荷分散アルゴリズム:ラウンドロビン、最小接続数、重み付け
ラウンドロビンアルゴリズムは、最もシンプルで一般的な負荷分散方法です。リストからプロキシを順番に選択します:最初のリクエストはプロキシ#1を経由し、2番目はプロキシ#2を経由し、3番目はプロキシ#3を経由し、というように続きます。リストが終わると、ロードバランサーは最初に戻ります。このアルゴリズムは、すべてのプロキシが同じパフォーマンスを持つ場合、負荷を均等に分散します。
class RoundRobinBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# 使用例
proxies = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080"
]
balancer = RoundRobinBalancer(proxies)
for i in range(10):
proxy = balancer.get_next_proxy()
print(f"Request {i+1} → {proxy}")
最小接続数アルゴリズム(Least Connections)は、現時点でアクティブな接続数が最も少ないプロキシを選択します。これは、リクエストの実行時間が異なる場合に便利です。たとえば、スクレイピング時に1つのリクエストは100ミリ秒で処理されますが、別のリクエストは5秒かかる場合があります(ページの読み込みが遅い場合)。最小接続数アルゴリズムは、新しいリクエストを自動的に負荷の少ないプロキシに振り向けます。
class LeastConnectionsBalancer:
def __init__(self, proxies):
self.proxies = {proxy: 0 for proxy in proxies}
def get_next_proxy(self):
# 接続数が最小のプロキシを選択
proxy = min(self.proxies.items(), key=lambda x: x[1])[0]
self.proxies[proxy] += 1
return proxy
def release_proxy(self, proxy):
# リクエスト完了時にカウンターを減らす
self.proxies[proxy] -= 1
# コンテキストマネージャーでの使用
balancer = LeastConnectionsBalancer(proxies)
def make_request(url):
proxy = balancer.get_next_proxy()
try:
# 選択したプロキシ経由でリクエストを実行
response = requests.get(url, proxies={"http": proxy})
return response
finally:
balancer.release_proxy(proxy)
重み付けラウンドロビン(Weighted Round Robin)は、古典的なラウンドロビンを拡張し、各プロキシにパフォーマンスや品質に応じた重みを割り当てます。重みの大きいプロキシはより多くのリクエストを受け取ります。これは、異なる品質のプロキシがある場合に便利です:たとえば、高速なプレミアムレジデンシャルプロキシと制限のある安価なデータセンタープロキシなどです。
class WeightedRoundRobinBalancer:
def __init__(self, weighted_proxies):
# weighted_proxies = [(proxy, weight), ...]
self.proxies = []
for proxy, weight in weighted_proxies:
# プロキシをweight回リストに追加
self.proxies.extend([proxy] * weight)
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# 重み付きでの使用
weighted_proxies = [
("http://premium-proxy1.com:8080", 5), # 高品質
("http://premium-proxy2.com:8080", 5),
("http://cheap-proxy1.com:8080", 2), # 低品質
("http://cheap-proxy2.com:8080", 1) # 非常に低品質
]
balancer = WeightedRoundRobinBalancer(weighted_proxies)
ランダムアルゴリズム(Random)は、利用可能なリストからランダムにプロキシを選択します。ラウンドロビンよりも実装が簡単で、大量のプロキシ(100以上)がある場合にうまく機能します。ランダム分散は、十分なリクエスト量があれば自動的に均等化されます。欠点は、短期間の不均等な負荷が発生する可能性があることです。
IPハッシュ—ターゲットURLまたは他のパラメータのハッシュに基づいてプロキシを選択するアルゴリズムです。これにより、同じリソースへのリクエストが常に同じプロキシを経由することが保証されます。セッションを使用するサイトや、同じIPからの一連のリクエストを必要とするサイト(たとえば、認証+データ取得)に便利です。
プロキシプールの管理:ローテーションとヘルスチェック
効果的なプロキシプール管理には、各プロキシの状態の継続的な監視と自動ローテーションが必要です。ヘルスチェックは、テストリクエストを送信することによるプロキシの可用性の定期的なチェックです。通常、信頼できるサービス(たとえば、httpbin.orgまたは独自のエンドポイント)への単純なGETリクエストを使用し、IPアドレスに関する情報を返します。
import requests
import time
from datetime import datetime
class ProxyHealthChecker:
def __init__(self, test_url="http://httpbin.org/ip", timeout=10):
self.test_url = test_url
self.timeout = timeout
def check_proxy(self, proxy_url):
"""プロキシの動作状態をチェック"""
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=self.timeout
)
response_time = time.time() - start_time
if response.status_code == 200:
return {
"status": "healthy",
"response_time": response_time,
"timestamp": datetime.now(),
"ip": response.json().get("origin")
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"timestamp": datetime.now()
}
except requests.exceptions.Timeout:
return {
"status": "unhealthy",
"error": "timeout",
"timestamp": datetime.now()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now()
}
# 使用例
checker = ProxyHealthChecker()
proxies = ["http://proxy1.com:8080", "http://proxy2.com:8080"]
for proxy in proxies:
result = checker.check_proxy(proxy)
print(f"{proxy}: {result['status']} ({result.get('response_time', 'N/A')}s)")
プール管理システムは、動作していないプロキシを自動的に利用不可としてマークし、定期的にチェックを繰り返す必要があります。一般的な戦略は、サーキットブレーカーパターンです:3回連続でチェックに失敗した後、プロキシは5-10分間プールから除外され、その後再度チェックされます。チェックが成功すれば、プロキシはアクティブプールに戻されます。
class ProxyPoolManager:
def __init__(self, health_checker, max_failures=3, cooldown_seconds=300):
self.health_checker = health_checker
self.max_failures = max_failures
self.cooldown_seconds = cooldown_seconds
self.proxies = {} # {proxy_url: ProxyInfo}
def add_proxy(self, proxy_url, metadata=None):
"""プロキシをプールに追加"""
self.proxies[proxy_url] = {
"url": proxy_url,
"status": "active",
"failures": 0,
"last_check": None,
"cooldown_until": None,
"metadata": metadata or {}
}
def get_active_proxies(self):
"""アクティブなプロキシのリストを返す"""
now = datetime.now()
active = []
for proxy_url, info in self.proxies.items():
# プロキシがクールダウン中でないかチェック
if info["cooldown_until"] and now < info["cooldown_until"]:
continue
if info["status"] == "active":
active.append(proxy_url)
return active
def mark_failure(self, proxy_url):
"""プロキシの使用失敗をマーク"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] += 1
if info["failures"] >= self.max_failures:
# クールダウンに移行
info["status"] = "cooldown"
info["cooldown_until"] = datetime.now() + timedelta(seconds=self.cooldown_seconds)
print(f"Proxy {proxy_url} moved to cooldown until {info['cooldown_until']}")
def mark_success(self, proxy_url):
"""プロキシの使用成功をマーク"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] = 0
info["status"] = "active"
info["cooldown_until"] = None
プロキシローテーションは、特定の間隔または特定のリクエスト数の後にプロキシを自動的に変更する戦略です。いくつかのアプローチがあります:時間によるローテーション(5-10分ごとに変更)、リクエスト数によるローテーション(100-500リクエスト後に変更)、セッションによるローテーション(1つのスクレイピングセッションに1つのプロキシ)。戦略の選択は、ターゲットサイトの要件によって異なります。
アドバイス: マーケットプレイス(Wildberries、Ozon)のスクレイピングには、リクエスト数によるローテーションが最適です:1つのプロキシで50-100リクエスト、その後変更。ソーシャルネットワークAPI(Instagram、Facebook)での作業には、セッションによるローテーションが適しています:認証→アクション→ログアウトの全サイクルに1つのプロキシ。
レート制限とリクエスト頻度の制御
レート制限は、負荷分散システムの極めて重要なコンポーネントです。これは、ターゲットリソースへの許容リクエスト頻度の超過を防ぎ、プロキシのブロックにつながる可能性を防ぎます。主に2つのアルゴリズムがあります:トークンバケット(Token Bucket)とリーキーバケット(Leaky Bucket)です。
トークンバケットは次のように動作します:各プロキシには仮想的な「バケット」にトークンがあります。各トークンは1つのリクエストの権利を与えます。バケットは指定された速度(たとえば、1秒あたり10トークン)で徐々にトークンで満たされます。バケットの最大容量は制限されています(たとえば、50トークン)。リクエストが来ると、システムはトークンの有無をチェックします:ある場合は1つのトークンを取ってリクエストを実行し、ない場合は新しいトークンの出現を待ちます。
import time
from threading import Lock
class TokenBucketRateLimiter:
def __init__(self, rate, capacity):
"""
rate: 1秒あたりのトークン数
capacity: バケット内の最大トークン数
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""経過時間に基づいてトークンを追加"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def acquire(self, tokens=1):
"""指定された数のトークンの取得を試みる"""
with self.lock:
self._add_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens=1):
"""トークンの出現を待って取得"""
while not self.acquire(tokens):
# 待機時間を計算
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
# 各プロキシでの使用
proxy_limiters = {
"http://proxy1.com:8080": TokenBucketRateLimiter(rate=10, capacity=50),
"http://proxy2.com:8080": TokenBucketRateLimiter(rate=10, capacity=50)
}
def make_request_with_limit(url, proxy_url):
limiter = proxy_limiters[proxy_url]
limiter.wait_and_acquire() # トークンの利用可能性を待つ
response = requests.get(url, proxies={"http": proxy_url})
return response
リーキーバケット(Leaky Bucket)は異なる動作をします:リクエストはキュー(バケット)に配置され、一定の速度で「漏れ出します」。バケットがオーバーフローすると、新しいリクエストは拒否されるか、待機状態になります。このアルゴリズムは、時間的により均等な負荷分散を保証しますが、アクティビティの急増時に遅延が発生する可能性があります。
異なるターゲットプラットフォームには異なる制限が必要です。Instagram APIは1つのIPあたり1時間に約200リクエスト(約3.3リクエスト/分)を許可します。Wildberriesは1つのIPから1分間に100-200リクエスト後にブロックします。Google Searchは1分間に10-20リクエストを許可します。レート制限システムは、これらの制限を考慮し、安全マージン(通常20-30%)を追加する必要があります。
| プラットフォーム | リクエスト制限 | 推奨設定 |
|---|---|---|
| ~200リクエスト/時間 | 2-3リクエスト/分(マージン付き) | |
| Wildberries | 100-200リクエスト/分 | 60-80リクエスト/分 |
| Google Search | 10-20リクエスト/分 | 8-12リクエスト/分 |
| Ozon | 50-100リクエスト/分 | 30-50リクエスト/分 |
| Facebook API | 200リクエスト/時間 | 2-3リクエスト/分 |
パフォーマンスモニタリングと自動スケーリング
効果的な負荷分散システムには、主要メトリクスの継続的な監視が必要です。最初のメトリクスグループはプロキシのパフォーマンスです:平均応答時間(response time)、成功したリクエストの割合(success rate)、タイムアウトの数、4xxおよび5xxエラーの数。これらのデータにより、問題のあるプロキシを特定し、負荷分散アルゴリズムを最適化できます。
class ProxyMetrics:
def __init__(self):
self.metrics = {} # {proxy_url: metrics_dict}
def record_request(self, proxy_url, response_time, status_code, success):
"""リクエストのメトリクスを記録"""
if proxy_url not in self.metrics:
self.metrics[proxy_url] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_response_time": 0,
"timeouts": 0,
"errors_4xx": 0,
"errors_5xx": 0
}
m = self.metrics[proxy_url]
m["total_requests"] += 1
if success:
m["successful_requests"] += 1
m["total_response_time"] += response_time
else:
m["failed_requests"] += 1
if status_code == 0: # timeout
m["timeouts"] += 1
elif 400 <= status_code < 500:
m["errors_4xx"] += 1
elif 500 <= status_code < 600:
m["errors_5xx"] += 1
def get_success_rate(self, proxy_url):
"""成功したリクエストの割合を返す"""
m = self.metrics.get(proxy_url, {})
total = m.get("total_requests", 0)
if total == 0:
return 0
return (m.get("successful_requests", 0) / total) * 100
def get_avg_response_time(self, proxy_url):
"""平均応答時間を返す"""
m = self.metrics.get(proxy_url, {})
successful = m.get("successful_requests", 0)
if successful == 0:
return 0
return m.get("total_response_time", 0) / successful
def get_report(self, proxy_url):
"""プロキシの完全なレポートを返す"""
m = self.metrics.get(proxy_url, {})
return {
"proxy": proxy_url,
"total_requests": m.get("total_requests", 0),
"success_rate": self.get_success_rate(proxy_url),
"avg_response_time": self.get_avg_response_time(proxy_url),
"timeouts": m.get("timeouts", 0),
"errors_4xx": m.get("errors_4xx", 0),
"errors_5xx": m.get("errors_5xx", 0)
}
第二のメトリクスグループは、システム全体のパフォーマンスです:1秒あたりの総リクエスト数(RPS — requests per second)、平均レイテンシ(latency)、リクエストキューのサイズ、アクティブなプロキシの数、プロキシプールの使用率。これらのメトリクスは、システムが負荷に対処できているか、スケーリングが必要かを示します。
自動スケーリング(auto-scaling)により、負荷に応じてプロキシを動的に追加または削除できます。シンプルな戦略:プールの平均負荷が5分間80%を超える場合、システムは自動的に新しいプロキシを追加します。負荷が15分間30%を下回る場合、システムはリソースを節約するために余剰プロキシを削除します。
モニタリング設定の例: 1時間に100000商品のWildberriesスクレイピング(約28リクエスト/秒)には、1分あたり60リクエストの制限で最低30-40のプロキシが必要です。アラートを設定:成功率が85%を下回るか、平均応答時間が3秒を超える場合、システムはプールから10-15の予備プロキシを自動的に追加する必要があります。
メトリクスの可視化には、PrometheusとGrafanaまたはELK Stackを使用します。ダッシュボードにグラフを作成:時間別のRPS、応答時間の分布、最速/最遅のプロキシトップ10、タイプ別のエラーマップ。これにより、問題を迅速に特定し、システムを最適化できます。
PythonとNode.jsでの実装例
人気のライブラリを使用したPythonでの負荷分散システムの完全な実装を見てみましょう。この例は、説明したすべてのコンポーネントを統合します:ロードバランサー、プールマネージャー、ヘルスチェック、レート制限、モニタリング。
import requests
import time
import random
from datetime import datetime, timedelta
from threading import Lock, Thread
from collections import defaultdict
class ProxyLoadBalancer:
def __init__(self, proxies, algorithm="round_robin", rate_limit=10):
"""
proxies: プロキシのリスト [{"url": "...", "weight": 1}, ...]
algorithm: round_robin, least_connections, weighted, random
rate_limit: プロキシあたりの1秒あたりの最大リクエスト数
"""
self.proxies = proxies
self.algorithm = algorithm
self.rate_limit = rate_limit
# ロードバランサーの状態
self.current_index = 0
self.connections = defaultdict(int)
self.rate_limiters = {}
self.metrics = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0,
"response_times": [], "last_check": None
})
# レートリミッターの初期化
for proxy in proxies:
proxy_url = proxy["url"]
self.rate_limiters[proxy_url] = TokenBucketRateLimiter(
rate=rate_limit,
capacity=rate_limit * 5
)
self.lock = Lock()
# バックグラウンドヘルスチェックの開始
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def get_next_proxy(self):
"""アルゴリズムに従って次のプロキシを選択"""
with self.lock:
if self.algorithm == "round_robin":
return self._round_robin()
elif self.algorithm == "least_connections":
return self._least_connections()
elif self.algorithm == "weighted":
return self._weighted_random()
elif self.algorithm == "random":
return random.choice(self.proxies)["url"]
def _round_robin(self):
proxy = self.proxies[self.current_index]["url"]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def _least_connections(self):
min_conn = min(self.connections.values()) if self.connections else 0
candidates = [p["url"] for p in self.proxies if self.connections[p["url"]] == min_conn]
return random.choice(candidates)
def _weighted_random(self):
weights = [p.get("weight", 1) for p in self.proxies]
return random.choices(self.proxies, weights=weights)[0]["url"]
def make_request(self, url, method="GET", **kwargs):
"""ロードバランサー経由でリクエストを実行"""
proxy_url = self.get_next_proxy()
# レート制限の利用可能性を待つ
self.rate_limiters[proxy_url].wait_and_acquire()
# 接続カウンターを増やす
with self.lock:
self.connections[proxy_url] += 1
try:
start_time = time.time()
response = requests.request(
method,
url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=kwargs.get("timeout", 30),
**kwargs
)
response_time = time.time() - start_time
# メトリクスを記録
self._record_metrics(proxy_url, response_time, True, response.status_code)
return response
except Exception as e:
self._record_metrics(proxy_url, 0, False, 0)
raise
finally:
# 接続カウンターを減らす
with self.lock:
self.connections[proxy_url] -= 1
def _record_metrics(self, proxy_url, response_time, success, status_code):
"""リクエストのメトリクスを記録"""
with self.lock:
m = self.metrics[proxy_url]
m["total"] += 1
if success:
m["success"] += 1
m["response_times"].append(response_time)
# 最新の1000個の値のみを保持
if len(m["response_times"]) > 1000:
m["response_times"].pop(0)
else:
m["failed"] += 1
def _health_check_loop(self):
"""プロキシの動作状態のバックグラウンドチェック"""
while True:
for proxy in self.proxies:
proxy_url = proxy["url"]
try:
response = requests.get(
"http://httpbin.org/ip",
proxies={"http": proxy_url},
timeout=10
)
with self.lock:
self.metrics[proxy_url]["last_check"] = datetime.now()
proxy["status"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
with self.lock:
proxy["status"] = "unhealthy"
time.sleep(60) # 1分ごとにチェック
def get_stats(self):
"""すべてのプロキシの統計を返す"""
stats = []
with self.lock:
for proxy in self.proxies:
proxy_url = proxy["url"]
m = self.metrics[proxy_url]
avg_response_time = (
sum(m["response_times"]) / len(m["response_times"])
if m["response_times"] else 0
)
success_rate = (
(m["success"] / m["total"] * 100)
if m["total"] > 0 else 0
)
stats.append({
"proxy": proxy_url,
"status": proxy.get("status", "unknown"),
"total_requests": m["total"],
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3),
"active_connections": self.connections[proxy_url]
})
return stats
マーケットプレイススクレイピングでのこのロードバランサーの使用:
# ロードバランサーの設定
proxies = [
{"url": "http://proxy1.example.com:8080", "weight": 5},
{"url": "http://proxy2.example.com:8080", "weight": 5},
{"url": "http://proxy3.example.com:8080", "weight": 3},
{"url": "http://proxy4.example.com:8080", "weight": 2}
]
balancer = ProxyLoadBalancer(
proxies=proxies,
algorithm="weighted",
rate_limit=60 # プロキシあたり60リクエスト/分
)
# 商品リストのスクレイピング
product_urls = [f"https://www.wildberries.ru/catalog/{i}/detail.aspx" for i in range(1000)]
results = []
for url in product_urls:
try:
response = balancer.make_request(url)
# レスポンスの処理
results.append({"url": url, "status": "success", "data": response.text})
except Exception as e:
results.append({"url": url, "status": "error", "error": str(e)})
# 100リクエストごとに統計を出力
if len(results) % 100 == 0:
stats = balancer.get_stats()
for stat in stats:
print(f"{stat['proxy']}: {stat['success_rate']}% success, "
f"{stat['avg_response_time']}s avg response")
# 最終統計
print("\n=== Final Statistics ===")
for stat in balancer.get_stats():
print(f"{stat['proxy']}:")
print(f" Total requests: {stat['total_requests']}")
print(f" Success rate: {stat['success_rate']}%")
print(f" Avg response time: {stat['avg_response_time']}s")
print(f" Status: {stat['status']}")
Node.jsでは、HTTPリクエスト用のaxiosライブラリと頻度制御用のnode-rate-limiterを使用して、同様のアーキテクチャを使用できます:
const axios = require('axios');
const { RateLimiter } = require('limiter');
class ProxyLoadBalancer {
constructor(proxies, algorithm = 'round_robin', rateLimit = 10) {
this.proxies = proxies;
this.algorithm = algorithm;
this.currentIndex = 0;
this.connections = new Map();
this.limiters = new Map();
this.metrics = new Map();
// レートリミッターの初期化
proxies.forEach(proxy => {
this.limiters.set(proxy.url, new RateLimiter({
tokensPerInterval: rateLimit,
interval: 'second'
}));
this.connections.set(proxy.url, 0);
this.metrics.set(proxy.url, {
total: 0,
success: 0,
failed: 0,
responseTimes: []
});
});
}
getNextProxy() {
if (this.algorithm === 'round_robin') {
const proxy = this.proxies[this.currentIndex].url;
this.currentIndex = (this.currentIndex + 1) % this.proxies.length;
return proxy;
} else if (this.algorithm === 'least_connections') {
let minConn = Infinity;
let selectedProxy = null;
this.connections.forEach((count, proxy) => {
if (count < minConn) {
minConn = count;
selectedProxy = proxy;
}
});
return selectedProxy;
}
}
async makeRequest(url, options = {}) {
const proxyUrl = this.getNextProxy();
const limiter = this.limiters.get(proxyUrl);
// レート制限の利用可能性を待つ
await limiter.removeTokens(1);
// 接続カウンターを増やす
this.connections.set(proxyUrl, this.connections.get(proxyUrl) + 1);
try {
const startTime = Date.now();
const response = await axios({
url,
proxy: this.parseProxyUrl(proxyUrl),
timeout: options.timeout || 30000,
...options
});
const responseTime = (Date.now() - startTime) / 1000;
this.recordMetrics(proxyUrl, responseTime, true);
return response;
} catch (error) {
this.recordMetrics(proxyUrl, 0, false);
throw error;
} finally {
this.connections.set(proxyUrl, this.connections.get(proxyUrl) - 1);
}
}
parseProxyUrl(proxyUrl) {
const url = new URL(proxyUrl);
return {
host: url.hostname,
port: parseInt(url.port)
};
}
recordMetrics(proxyUrl, responseTime, success) {
const m = this.metrics.get(proxyUrl);
m.total++;
if (success) {
m.success++;
m.responseTimes.push(responseTime);
if (m.responseTimes.length > 1000) {
m.responseTimes.shift();
}
} else {
m.failed++;
}
}
getStats() {
const stats = [];
this.proxies.forEach(proxy => {
const m = this.metrics.get(proxy.url);
const avgResponseTime = m.responseTimes.length > 0
? m.responseTimes.reduce((a, b) => a + b, 0) / m.responseTimes.length
: 0;
const successRate = m.total > 0 ? (m.success / m.total * 100) : 0;
stats.push({
proxy: proxy.url,
totalRequests: m.total,
successRate: successRate.toFixed(2),
avgResponseTime: avgResponseTime.toFixed(3),
activeConnections: this.connections.get(proxy.url)
});
});
return stats;
}
}
// 使用例
const proxies = [
{ url: 'http://proxy1.example.com:8080', weight: 5 },
{ url: 'http://proxy2.example.com:8080', weight: 5 }
];
const balancer = new ProxyLoadBalancer(proxies, 'round_robin', 60);
async function parseProducts() {
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://www.wildberries.ru/catalog/${i}/detail.aspx`
);
for (const url of urls) {
try {
const response = await balancer.makeRequest(url);
console.log(`Success: ${url}`);
} catch (error) {
console.error(`Error: ${url} - ${error.message}`);
}
}
console.log('\n=== Statistics ===');
console.log(balancer.getStats());
}
parseProducts();
特定タスクの最適化:スクレイピング、API、自動化
異なるタスクには異なる負荷分散戦略が必要です。マーケットプレイス(Wildberries、Ozon、Avito)のスクレイピングには、リクエスト数によるローテーションと地理的分散を伴う戦略が最適です。ロシアの異なる都市からのレジデンシャルプロキシを使用し、50-100リクエストごとにプロキシを変更し、人間の行動を模倣するためにリクエスト間にランダムな遅延(1-3秒)を追加します。
ソーシャルネットワークAPI(Instagram、Facebook、VK)での作業には、1つのセッション内でのIPアドレスの安定性が重要です。IPハッシュまたはスティッキーセッションアルゴリズムを使用します:1つのアカウントのすべてのリクエストは同じプロキシを経由する必要があります。これにより、アカウントのブロックを引き起こす可能性のある疑わしい地理位置の変更を防ぎます。ソーシャルネットワークはレジデンシャルやデータセンターよりもモバイルIPを信頼するため、モバイルプロキシが推奨されます。
# Instagram用のスティッキーセッションの例
class StickySessionBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.session_map = {} # {account_id: proxy_url}
self.proxy_usage = defaultdict(int)
def get_proxy_for_account(self, account_id):
"""アカウント用の固定プロキシを返す"""
if account_id in self.session_map:
return self.session_map[account_id]
# 最も負荷の少ないプロキシを選択
proxy = min(self.proxies, key=lambda p: self.proxy_usage[p])
self.session_map[account_id] = proxy
self.proxy_usage[proxy] += 1
return proxy
def release_account(self, account_id):
"""アカウントでの作業完了時にプロキシを解放"""
if account_id in self.session_map:
proxy = self.session_map[account_id]
self.proxy_usage[proxy] -= 1
del self.session_map[account_id]
# Instagramでの使用
balancer = StickySessionBalancer([
"http://mobile-proxy1.com:8080",
"http://mobile-proxy2.com:8080",
"http://mobile-proxy3.com:8080"
])
def work_with_instagram_account(account_id, actions):
proxy = balancer.get_proxy_for_account(account_id)
try:
# すべてのアクションを同じプロキシ経由で実行
for action in actions:
response = requests.post(
action["url"],
data=action["data"],
proxies={"http": proxy, "https": proxy}
)
# レスポンスの処理
finally:
balancer.release_account(account_id)
検索エンジン(Google、Yandex、Bing)のスクレイピングには、最も厳格なアプローチが必要です。検索エンジンには高度なボット検出システムがあり、わずかな異常も検出します。各プロキシで1分あたり8-12リクエストを超えないようにし、リクエスト間に10-20秒のランダムな遅延を追加し、実際のブラウザのUser-Agentとヘッダーを使用します。最良の結果を得るには、レジデンシャルプロキシとモバイルプロキシを組み合わせて使用します。
eコマースプラットフォーム(Amazon、eBay、AliExpress)での価格監視には、地理的分散が重要です。多くのプラットフォームは地域によって異なる価格を表示します。ターゲット地域(国、都市)ごとに個別のプロキシプールを作成し、各地域のプールを独立して管理します。これにより、すべての地域から正確なデータを収集できます。
実践的な推奨事項: 複数のマーケットプレイスから同時にスクレイピングする場合、各プラットフォーム用に個別のプロキシプールを作成します。たとえば、Wildberries用に20プロキシ、Ozon用に15プロキシ、Avito用に10プロキシ。これにより、1つのプラットフォームでのブロックが他のプラットフォームでの作業に影響を与えません。
API自動化(Telegram、WhatsApp、Discord)には、各アカウントのIPアドレスの一貫性が必要です。スティッキーセッション戦略を使用し、アカウントごとに個別のプロキシを割り当てます。アカウントが長期間(数日または数週間)同じIPから動作する場合、プラットフォームはそれを通常のユーザーアクティビティと見なします。頻繁なIP変更は疑いを引き起こし、ブロックにつながる可能性があります。
まとめ
プロキシ経由の負荷分散は、スクレイピング、API自動化、大量データ処理のための高負荷システムの基盤です。適切に設定された負荷分散システムは、ブロックから保護し、パフォーマンスを向上させ、信頼性を確保し、水平方向のスケーリングを可能にします。
主要な原則:タスクに適したアルゴリズム(ラウンドロビン、最小接続数、重み付け、スティッキーセッション)を選択し、レート制限を設定してターゲットプラットフォームの制限を超えないようにし、ヘルスチェックを実装してプロキシの可用性を監視し、メトリクスを収集してシステムを最適化し、負荷に応じて自動スケーリングを使用します。
プロキシの品質は成功の鍵です。信頼できるプロバイダーからの高品質なレジデンシャルプロキシとモバイルプロキシを使用してください。安価な低品質のプロキシは、頻繁なブロック、低速、不安定な動作につながり、最終的にはより高いコストとプロジェクトの失敗につながります。
小規模から始めて、徐々にスケールアップしてください。最初に10-20のプロキシでシステムをテストし、メトリクスを収集し、ボトルネックを特定し、設定を最適化します。その後、プロキシの数を増やし、新しいアルゴリズムを追加し、機能を拡張します。これにより、エラーを回避し、安定したシステムを構築できます。