زمانی که شما هزاران صفحه از بازارها را پارس میکنید، درخواستهای API انبوهی ارسال میکنید یا کار با صدها حساب کاربری را خودکار میکنید، توزیع صحیح بار از طریق پروکسی به شدت مهم میشود. بدون تعادل مناسب، با مسدود شدن، زمانهای تایماوت و کاهش عملکرد مواجه خواهید شد. در این راهنما به بررسی معماری توزیع بار، الگوریتمهای تعادل و استراتژیهای عملی برای سیستمهای با بار بالا خواهیم پرداخت.
این مطلب برای توسعهدهندگان و متخصصان فنی که با پارس دادهها، خودکارسازی درخواستهای API یا مدیریت مجموعههای بزرگ پروکسی برای وظایف تجاری کار میکنند، طراحی شده است.
چرا به تعادل بار از طریق پروکسی نیاز داریم
تعادل بار از طریق پروکسی چندین مشکل بحرانی سیستمهای با بار بالا را حل میکند. اولین و مهمترین مشکل — حفاظت از مسدود شدن است. وقتی شما هزاران درخواست به یک منبع (بازار، API شبکه اجتماعی، موتور جستجو) ارسال میکنید، سرور هدف فعالیت غیرعادی بالایی را از یک آدرس IP مشاهده میکند و آن را مسدود میکند. توزیع درخواستها بین دهها یا صدها پروکسی فعالیت شما را شبیه به اقدامات کاربران عادی میکند.
مشکل دوم — عملکرد است. یک سرور پروکسی دارای ظرفیت محدود است (معمولاً 100-1000 مگابیت بر ثانیه) و میتواند تعداد محدودی از اتصالات همزمان را پردازش کند. هنگام پارس 10000 صفحه در دقیقه یا ارسال انبوه درخواستهای API، یک پروکسی به گلوگاه سیستم تبدیل میشود. تعادل بار اجازه میدهد تا ظرفیت به صورت افقی مقیاسگذاری شود و پروکسیهای جدید به مجموعه اضافه شوند.
سومین مشکل — قابلیت اطمینان است. اگر یکی از پروکسیها از کار بیفتد (خرابی فنی، مسدود شدن، پایان مدت اجاره)، سیستم به طور خودکار ترافیک را به پروکسیهای فعال هدایت میکند. بدون مکانیزم تعادل و بررسی سلامت، از کار افتادن یک پروکسی میتواند کل سیستم را متوقف کند.
مثال واقعی: هنگام پارس Wildberries به منظور نظارت بر قیمتهای رقبای خود، شما باید 50000 کالا را هر ساعت پردازش کنید. این تقریباً 14 درخواست در ثانیه است. یک پروکسی چنین باری را تحمل میکند، اما Wildberries IP را پس از 100-200 درخواست از یک آدرس مسدود میکند. با توزیع درخواستها بین 20 پروکسیهای مسکونی، بار هر IP را به 0.7 درخواست در ثانیه کاهش میدهید — این شبیه به فعالیت یک کاربر عادی به نظر میرسد.
چهارمین دلیل — توزیع جغرافیایی است. بسیاری از خدمات محتوای متفاوت یا قیمتهای مختلفی را بسته به منطقه کاربر نشان میدهند. تعادل بین پروکسیهای از کشورهای مختلف و شهرها به شما این امکان را میدهد که به طور همزمان دادهها را از تمام مناطق هدف جمعآوری کنید. به عنوان مثال، برای نظارت بر قیمتها در Ozon به پروکسیهایی از مسکو، سنپترزبورگ، یکتیرینبورگ و دیگر شهرهای بزرگ نیاز دارید.
معماری سیستم توزیع بار
معماری کلاسیک سیستم تعادل بار از طریق پروکسی شامل چندین مؤلفه است. در سطح بالاتر، یک تعادلساز (load balancer) وجود دارد — ماژول نرمافزاری که وظایف ورودی (درخواستهای پارس، فراخوانیهای API) را میپذیرد و آنها را بین پروکسیهای موجود توزیع میکند. تعادلساز میتواند به عنوان یک سرویس جداگانه کار کند یا در برنامه گنجانده شود.
مؤلفه دوم — مدیر مجموعه پروکسی (proxy pool manager) است. او لیستی از تمام پروکسیهای موجود با ویژگیهای آنها را ذخیره میکند: آدرس IP، پورت، پروتکل (HTTP/SOCKS5)، موقعیت جغرافیایی، نوع (مسکونی، موبایل، دیتاسنتر)، وضعیت فعلی (فعال، مسدود، در حال بررسی). مدیر مجموعه مسئول اضافه کردن پروکسیهای جدید، حذف پروکسیهای غیرعملی و بررسی دورهای در دسترس بودن است.
مؤلفه سوم — سیستم نظارت و بررسی سلامت است. این سیستم به طور مداوم عملکرد هر پروکسی را بررسی میکند: درخواستهای آزمایشی ارسال میکند، زمان پاسخ را اندازهگیری میکند و موفقیت اتصال را بررسی میکند. اگر پروکسی پاسخ ندهد یا خطاهایی را برگرداند، سیستم آن را به عنوان غیرقابل دسترسی علامتگذاری کرده و به طور موقت از چرخش خارج میکند.
| مؤلفه | عملکرد | تکنولوژیها |
|---|---|---|
| Load Balancer | توزیع درخواستها بین پروکسیها | HAProxy، Nginx، کتابخانههای Python/Node.js |
| Proxy Pool Manager | مدیریت لیست پروکسیها | Redis، PostgreSQL، ذخیرهسازی در حافظه |
| Health Check System | بررسی در دسترس بودن پروکسی | وظایف زمانبندی شده، Celery، cron |
| Rate Limiter | کنترل فرکانس درخواستها | الگوریتمهای Token bucket، leaky bucket |
| Monitoring & Metrics | جمعآوری معیارهای عملکرد | Prometheus، Grafana، ELK Stack |
مؤلفه چهارم — محدودکننده نرخ (rate limiter) است. او نظارت میکند که هر پروکسی از فرکانس مجاز درخواستها به منبع هدف فراتر نرود. به عنوان مثال، اگر شما Instagram را پارس میکنید و میدانید که پلتفرم IP را پس از 60 درخواست در دقیقه مسدود میکند، محدودکننده نرخ اجازه نمیدهد که بیش از 50 درخواست در دقیقه از طریق یک پروکسی ارسال شود و فضای ایمنی را حفظ میکند.
مؤلفه پنجم — سیستم معیارها و تجزیه و تحلیل است. او دادههایی درباره عملکرد هر پروکسی جمعآوری میکند: تعداد درخواستهای موفق، تعداد خطاها، زمان پاسخ متوسط، درصد مسدود شدنها. این دادهها برای بهینهسازی الگوریتمهای تعادل و شناسایی پروکسیهای مشکلدار استفاده میشوند.
الگوریتمهای تعادل: Round Robin، Least Connections، Weighted
الگوریتم Round Robin — سادهترین و رایجترین روش تعادل است. او به صورت متوالی پروکسیها را از لیست بررسی میکند: اولین درخواست از طریق پروکسی شماره 1، دومین درخواست از طریق پروکسی شماره 2، سومین درخواست از طریق پروکسی شماره 3 و به همین ترتیب. وقتی لیست تمام میشود، تعادلساز به ابتدای لیست برمیگردد. این الگوریتم بار را به طور یکنواخت توزیع میکند، اگر همه پروکسیها عملکرد یکسانی داشته باشند.
class RoundRobinBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# استفاده
proxies = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080"
]
balancer = RoundRobinBalancer(proxies)
for i in range(10):
proxy = balancer.get_next_proxy()
print(f"Request {i+1} → {proxy}")
الگوریتم Least Connections (کمترین تعداد اتصالات) پروکسی را با کمترین تعداد اتصالات فعال در حال حاضر انتخاب میکند. این برای زمانی مفید است که درخواستها مدت زمان اجرای متفاوتی دارند. به عنوان مثال، هنگام پارس کردن، یک درخواست ممکن است 100 میلیثانیه پردازش شود، در حالی که دیگری 5 ثانیه طول میکشد (اگر صفحه به آرامی بارگذاری شود). Least Connections به طور خودکار درخواستهای جدید را به پروکسیهای کمتر بارگذاری شده هدایت میکند.
class LeastConnectionsBalancer:
def __init__(self, proxies):
self.proxies = {proxy: 0 for proxy in proxies}
def get_next_proxy(self):
# انتخاب پروکسی با کمترین تعداد اتصالات
proxy = min(self.proxies.items(), key=lambda x: x[1])[0]
self.proxies[proxy] += 1
return proxy
def release_proxy(self, proxy):
# کاهش شمارنده پس از اتمام درخواست
self.proxies[proxy] -= 1
# استفاده با مدیر زمینه
balancer = LeastConnectionsBalancer(proxies)
def make_request(url):
proxy = balancer.get_next_proxy()
try:
# انجام درخواست از طریق پروکسی انتخاب شده
response = requests.get(url, proxies={"http": proxy})
return response
finally:
balancer.release_proxy(proxy)
Weighted Round Robin (چرخش وزنی) Round Robin کلاسیک را گسترش میدهد و به هر پروکسی وزنی بر اساس عملکرد یا کیفیت آن اختصاص میدهد. پروکسیهای با وزن بیشتر درخواستهای بیشتری دریافت میکنند. این برای زمانی مفید است که شما پروکسیهای با کیفیت متفاوت دارید: به عنوان مثال، پروکسیهای مسکونی پریمیوم با سرعت بالا و پروکسیهای ارزان دیتاسنتر با محدودیتها.
class WeightedRoundRobinBalancer:
def __init__(self, weighted_proxies):
# weighted_proxies = [(proxy, weight), ...]
self.proxies = []
for proxy, weight in weighted_proxies:
# اضافه کردن پروکسی به لیست به تعداد weight
self.proxies.extend([proxy] * weight)
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# استفاده با وزنها
weighted_proxies = [
("http://premium-proxy1.com:8080", 5), # کیفیت بالا
("http://premium-proxy2.com:8080", 5),
("http://cheap-proxy1.com:8080", 2), # کیفیت پایین
("http://cheap-proxy2.com:8080", 1) # کیفیت بسیار پایین
]
balancer = WeightedRoundRobinBalancer(weighted_proxies)
الگوریتم Random (انتخاب تصادفی) پروکسی را به صورت تصادفی از لیست موجود انتخاب میکند. این در پیادهسازی سادهتر از Round Robin است و در زمان وجود تعداد زیادی پروکسی (بیش از 100) به خوبی کار میکند. توزیع تصادفی به طور خودکار با حجم کافی درخواستها متعادل میشود. معایب آن — ممکن است دورههای کوتاه بار نامتعادل وجود داشته باشد.
IP Hash — الگوریتمی است که پروکسی را بر اساس هش URL هدف یا پارامتر دیگر انتخاب میکند. این تضمین میکند که درخواستها به یک منبع خاص همیشه از طریق یک پروکسی خاص انجام میشوند. این برای سایتهایی که از نشستها استفاده میکنند یا نیاز به توالی درخواستها از یک IP دارند (به عنوان مثال، احراز هویت + دریافت دادهها) مفید است.
مدیریت مجموعههای پروکسی: چرخش و بررسی سلامت
مدیریت مؤثر مجموعه پروکسی نیاز به نظارت مداوم بر وضعیت هر پروکسی و چرخش خودکار دارد. بررسی سلامت — بررسی دورهای در دسترس بودن پروکسی با ارسال یک درخواست آزمایشی است. معمولاً از یک درخواست GET ساده به یک سرویس قابل اعتماد (به عنوان مثال، httpbin.org یا نقطه پایانی خود) استفاده میشود که اطلاعاتی درباره آدرس IP برمیگرداند.
import requests
import time
from datetime import datetime
class ProxyHealthChecker:
def __init__(self, test_url="http://httpbin.org/ip", timeout=10):
self.test_url = test_url
self.timeout = timeout
def check_proxy(self, proxy_url):
"""بررسی میکند که پروکسی کار میکند"""
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=self.timeout
)
response_time = time.time() - start_time
if response.status_code == 200:
return {
"status": "healthy",
"response_time": response_time,
"timestamp": datetime.now(),
"ip": response.json().get("origin")
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"timestamp": datetime.now()
}
except requests.exceptions.Timeout:
return {
"status": "unhealthy",
"error": "timeout",
"timestamp": datetime.now()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now()
}
# استفاده
checker = ProxyHealthChecker()
proxies = ["http://proxy1.com:8080", "http://proxy2.com:8080"]
for proxy in proxies:
result = checker.check_proxy(proxy)
print(f"{proxy}: {result['status']} ({result.get('response_time', 'N/A')}s)")
سیستم مدیریت مجموعه باید به طور خودکار پروکسیهای غیرعملی را به عنوان غیرقابل دسترسی علامتگذاری کرده و به طور دورهای بررسی کند. استراتژی رایج — الگوی circuit breaker: پس از سه بررسی ناموفق متوالی، پروکسی به مدت 5-10 دقیقه از مجموعه خارج میشود و سپس دوباره بررسی میشود. اگر بررسی موفقیتآمیز بود، پروکسی به مجموعه فعال بازمیگردد.
class ProxyPoolManager:
def __init__(self, health_checker, max_failures=3, cooldown_seconds=300):
self.health_checker = health_checker
self.max_failures = max_failures
self.cooldown_seconds = cooldown_seconds
self.proxies = {} # {proxy_url: ProxyInfo}
def add_proxy(self, proxy_url, metadata=None):
"""اضافه کردن پروکسی به مجموعه"""
self.proxies[proxy_url] = {
"url": proxy_url,
"status": "active",
"failures": 0,
"last_check": None,
"cooldown_until": None,
"metadata": metadata or {}
}
def get_active_proxies(self):
"""بازگشت لیست پروکسیهای فعال"""
now = datetime.now()
active = []
for proxy_url, info in self.proxies.items():
# بررسی میکند که آیا پروکسی در حالت cooldown است
if info["cooldown_until"] and now < info["cooldown_until"]:
continue
if info["status"] == "active":
active.append(proxy_url)
return active
def mark_failure(self, proxy_url):
"""علامتگذاری تلاش ناموفق استفاده از پروکسی"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] += 1
if info["failures"] >= self.max_failures:
# انتقال به حالت cooldown
info["status"] = "cooldown"
info["cooldown_until"] = datetime.now() + timedelta(seconds=self.cooldown_seconds)
print(f"پروکسی {proxy_url} به حالت cooldown منتقل شد تا {info['cooldown_until']}")
def mark_success(self, proxy_url):
"""علامتگذاری استفاده موفق از پروکسی"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] = 0
info["status"] = "active"
info["cooldown_until"] = None
چرخش پروکسی — استراتژی تغییر خودکار پروکسی در فواصل معین یا پس از تعداد مشخصی از درخواستها است. چندین رویکرد وجود دارد: چرخش بر اساس زمان (تغییر هر 5-10 دقیقه)، چرخش بر اساس تعداد درخواستها (تغییر پس از 100-500 درخواست)، چرخش بر اساس نشستها (یک پروکسی برای یک نشست پارس). انتخاب استراتژی بستگی به الزامات سایت هدف دارد.
نکته: برای پارس کردن بازارها (Wildberries، Ozon) بهترین استراتژی چرخش بر اساس تعداد درخواستها است: 50-100 درخواست برای هر پروکسی، سپس تغییر. برای کار با API شبکههای اجتماعی (Instagram، Facebook) بهتر است از چرخش بر اساس نشستها استفاده کنید: یک پروکسی برای کل چرخه احراز هویت → اقدامات → خروج.
محدودیت نرخ و کنترل فرکانس درخواستها
محدودیت نرخ — مؤلفهای بحرانی از سیستم تعادل بار است. این از تجاوز به فرکانس مجاز درخواستها به منبع هدف جلوگیری میکند که ممکن است منجر به مسدود شدن پروکسی شود. دو الگوریتم اصلی وجود دارد: Token Bucket (سطل توکن) و Leaky Bucket (سطل نشتی).
Token Bucket به این صورت کار میکند: هر پروکسی یک "سطل" مجازی با توکنها دارد. هر توکن حق یک درخواست را میدهد. سطل به تدریج با توکنها با سرعت مشخصی پر میشود (به عنوان مثال، 10 توکن در ثانیه). حداکثر ظرفیت سطل محدود است (به عنوان مثال، 50 توکن). وقتی یک درخواست میآید، سیستم وجود توکنها را بررسی میکند: اگر وجود داشته باشد — یک توکن را برمیدارد و درخواست را انجام میدهد، اگر نه — منتظر ظهور یک توکن جدید میماند.
import time
from threading import Lock
class TokenBucketRateLimiter:
def __init__(self, rate, capacity):
"""
rate: تعداد توکنها در ثانیه
capacity: حداکثر تعداد توکنها در سطل
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""توکنها را بر اساس زمان گذشته اضافه میکند"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def acquire(self, tokens=1):
"""تلاش برای دریافت تعداد مشخصی توکن"""
with self.lock:
self._add_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens=1):
"""منتظر میماند تا توکنها ظاهر شوند و آنها را دریافت میکند"""
while not self.acquire(tokens):
# محاسبه زمان انتظار
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
# استفاده برای هر پروکسی
proxy_limiters = {
"http://proxy1.com:8080": TokenBucketRateLimiter(rate=10, capacity=50),
"http://proxy2.com:8080": TokenBucketRateLimiter(rate=10, capacity=50)
}
def make_request_with_limit(url, proxy_url):
limiter = proxy_limiters[proxy_url]
limiter.wait_and_acquire() # منتظر میمانیم تا توکن در دسترس باشد
response = requests.get(url, proxies={"http": proxy_url})
return response
Leaky Bucket (سطل نشتی) به طور متفاوتی کار میکند: درخواستها در یک صف (سطل) قرار میگیرند که با سرعت ثابت "نشت" میکند. اگر سطل پر شود، درخواستهای جدید رد میشوند یا در انتظار قرار میگیرند. این الگوریتم توزیع بار را در طول زمان به طور یکنواخت تضمین میکند، اما ممکن است در زمانهای اوج فعالیت منجر به تأخیر شود.
برای پلتفرمهای هدف مختلف نیاز به محدودیتهای متفاوتی است. API Instagram حدود 200 درخواست در ساعت برای هر IP مجاز میدارد (حدود 3.3 درخواست در دقیقه). Wildberries پس از 100-200 درخواست در دقیقه از یک IP مسدود میکند. Google Search حدود 10-20 درخواست در دقیقه مجاز میدارد. سیستم محدودیت نرخ شما باید این محدودیتها را در نظر بگیرد و فضای ایمنی (معمولاً 20-30%) اضافه کند.
| پلتفرم | محدودیت درخواستها | تنظیمات پیشنهادی |
|---|---|---|
| ~200 درخواست/ساعت | 2-3 درخواست/دقیقه (با احتساب فضای ایمنی) | |
| Wildberries | 100-200 درخواست/دقیقه | 60-80 درخواست/دقیقه |
| Google Search | 10-20 درخواست/دقیقه | 8-12 درخواست/دقیقه |
| Ozon | 50-100 درخواست/دقیقه | 30-50 درخواست/دقیقه |
| Facebook API | 200 درخواست/ساعت | 2-3 درخواست/دقیقه |
نظارت بر عملکرد و مقیاسگذاری خودکار
یک سیستم تعادل بار مؤثر نیاز به نظارت مداوم بر معیارهای کلیدی دارد. گروه اول معیارها — عملکرد پروکسی: زمان پاسخ متوسط (response time)، درصد درخواستهای موفق (success rate)، تعداد تایماوتها، تعداد خطاهای 4xx و 5xx. این دادهها به شناسایی پروکسیهای مشکلدار و بهینهسازی الگوریتمهای تعادل کمک میکند.
class ProxyMetrics:
def __init__(self):
self.metrics = {} # {proxy_url: metrics_dict}
def record_request(self, proxy_url, response_time, status_code, success):
"""ضبط معیارهای درخواست"""
if proxy_url not in self.metrics:
self.metrics[proxy_url] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_response_time": 0,
"timeouts": 0,
"errors_4xx": 0,
"errors_5xx": 0
}
m = self.metrics[proxy_url]
m["total_requests"] += 1
if success:
m["successful_requests"] += 1
m["total_response_time"] += response_time
else:
m["failed_requests"] += 1
if status_code == 0: # تایماوت
m["timeouts"] += 1
elif 400 <= status_code < 500:
m["errors_4xx"] += 1
elif 500 <= status_code < 600:
m["errors_5xx"] += 1
def get_success_rate(self, proxy_url):
"""درصد درخواستهای موفق را برمیگرداند"""
m = self.metrics.get(proxy_url, {})
total = m.get("total_requests", 0)
if total == 0:
return 0
return (m.get("successful_requests", 0) / total) * 100
def get_avg_response_time(self, proxy_url):
"""زمان پاسخ متوسط را برمیگرداند"""
m = self.metrics.get(proxy_url, {})
successful = m.get("successful_requests", 0)
if successful == 0:
return 0
return m.get("total_response_time", 0) / successful
def get_report(self, proxy_url):
"""گزارش کامل پروکسی را برمیگرداند"""
m = self.metrics.get(proxy_url, {})
return {
"proxy": proxy_url,
"total_requests": m.get("total_requests", 0),
"success_rate": self.get_success_rate(proxy_url),
"avg_response_time": self.get_avg_response_time(proxy_url),
"timeouts": m.get("timeouts", 0),
"errors_4xx": m.get("errors_4xx", 0),
"errors_5xx": m.get("errors_5xx", 0)
}
گروه دوم معیارها — عملکرد کلی سیستم: تعداد کل درخواستها در ثانیه (RPS — requests per second)، تأخیر متوسط (latency)، اندازه صف درخواستها، تعداد پروکسیهای فعال، درصد استفاده از مجموعه پروکسی. این معیارها نشان میدهند که آیا سیستم با بار مواجه است یا نیاز به مقیاسگذاری دارد.
مقیاسگذاری خودکار (auto-scaling) اجازه میدهد تا به صورت دینامیک پروکسیها را بسته به بار اضافه یا حذف کنید. استراتژی ساده: اگر بار متوسط مجموعه بیش از 80% به مدت 5 دقیقه باشد، سیستم به طور خودکار پروکسیهای جدیدی اضافه میکند. اگر بار به کمتر از 30% به مدت 15 دقیقه کاهش یابد، سیستم پروکسیهای اضافی را برای صرفهجویی در منابع حذف میکند.
مثال تنظیم نظارت: برای پارس کردن 100000 کالا از Wildberries در ساعت (تقریباً 28 درخواست در ثانیه) شما حداقل به 30-40 پروکسی با محدودیت 60 درخواست در دقیقه برای هر پروکسی نیاز دارید. هشدارها را تنظیم کنید: اگر درصد موفقیت کمتر از 85% شود یا زمان پاسخ متوسط بیش از 3 ثانیه باشد، سیستم باید به طور خودکار 10-15 پروکسی پشتیبان از مجموعه اضافه کند.
برای تجسم معیارها از Grafana با Prometheus یا ELK Stack استفاده کنید. داشبوردهایی با نمودارها ایجاد کنید: RPS بر اساس زمان، توزیع زمان پاسخ، 10 پروکسی سریعترین/کندترین، نقشه خطاها بر اساس نوع. این به شما کمک میکند تا به سرعت مشکلات را شناسایی کرده و سیستم را بهینهسازی کنید.
پیادهسازی عملی در Python و Node.js
بیایید پیادهسازی کامل سیستم تعادل بار را در Python با استفاده از کتابخانههای محبوب بررسی کنیم. این مثال تمام مؤلفههای توصیف شده را ترکیب میکند: تعادلساز، مدیر مجموعه، بررسی سلامت، محدودیت نرخ و نظارت.
import requests
import time
import random
from datetime import datetime, timedelta
from threading import Lock, Thread
from collections import defaultdict
class ProxyLoadBalancer:
def __init__(self, proxies, algorithm="round_robin", rate_limit=10):
"""
proxies: لیست پروکسیها [{"url": "...", "weight": 1}, ...]
algorithm: round_robin، least_connections، weighted، random
rate_limit: حداکثر تعداد درخواستها در ثانیه برای پروکسی
"""
self.proxies = proxies
self.algorithm = algorithm
self.rate_limit = rate_limit
# وضعیت تعادلساز
self.current_index = 0
self.connections = defaultdict(int)
self.rate_limiters = {}
self.metrics = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0,
"response_times": [], "last_check": None
})
# راهاندازی محدودکنندههای نرخ
for proxy in proxies:
proxy_url = proxy["url"]
self.rate_limiters[proxy_url] = TokenBucketRateLimiter(
rate=rate_limit,
capacity=rate_limit * 5
)
self.lock = Lock()
# راهاندازی بررسی سلامت در پسزمینه
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def get_next_proxy(self):
"""پروکسی بعدی را بر اساس الگوریتم انتخاب میکند"""
with self.lock:
if self.algorithm == "round_robin":
return self._round_robin()
elif self.algorithm == "least_connections":
return self._least_connections()
elif self.algorithm == "weighted":
return self._weighted_random()
elif self.algorithm == "random":
return random.choice(self.proxies)["url"]
def _round_robin(self):
proxy = self.proxies[self.current_index]["url"]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def _least_connections(self):
min_conn = min(self.connections.values()) if self.connections else 0
candidates = [p["url"] for p in self.proxies if self.connections[p["url"]] == min_conn]
return random.choice(candidates)
def _weighted_random(self):
weights = [p.get("weight", 1) for p in self.proxies]
return random.choices(self.proxies, weights=weights)[0]["url"]
def make_request(self, url, method="GET", **kwargs):
"""درخواست را از طریق تعادلساز انجام میدهد"""
proxy_url = self.get_next_proxy()
# منتظر میمانیم تا محدودیت نرخ در دسترس باشد
self.rate_limiters[proxy_url].wait_and_acquire()
# افزایش شمارنده اتصالات
with self.lock:
self.connections[proxy_url] += 1
try:
start_time = time.time()
response = requests.request(
method,
url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=kwargs.get("timeout", 30),
**kwargs
)
response_time = time.time() - start_time
# ضبط معیارها
self._record_metrics(proxy_url, response_time, True, response.status_code)
return response
except Exception as e:
self._record_metrics(proxy_url, 0, False, 0)
raise
finally:
# کاهش شمارنده اتصالات
with self.lock:
self.connections[proxy_url] -= 1
def _record_metrics(self, proxy_url, response_time, success, status_code):
"""ضبط معیارهای درخواست"""
with self.lock:
m = self.metrics[proxy_url]
m["total"] += 1
if success:
m["success"] += 1
m["response_times"].append(response_time)
# فقط آخرین 1000 مقدار را ذخیره میکنیم
if len(m["response_times"]) > 1000:
m["response_times"].pop(0)
else:
m["failed"] += 1
def _health_check_loop(self):
"""بررسی سلامت پروکسی در پسزمینه"""
while True:
for proxy in self.proxies:
proxy_url = proxy["url"]
try:
response = requests.get(
"http://httpbin.org/ip",
proxies={"http": proxy_url},
timeout=10
)
with self.lock:
self.metrics[proxy_url]["last_check"] = datetime.now()
proxy["status"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
with self.lock:
proxy["status"] = "unhealthy"
time.sleep(60) # بررسی هر دقیقه یک بار
def get_stats(self):
"""آمار را برای تمام پروکسیها برمیگرداند"""
stats = []
with self.lock:
for proxy in self.proxies:
proxy_url = proxy["url"]
m = self.metrics[proxy_url]
avg_response_time = (
sum(m["response_times"]) / len(m["response_times"])
if m["response_times"] else 0
)
success_rate = (
(m["success"] / m["total"] * 100)
if m["total"] > 0 else 0
)
stats.append({
"proxy": proxy_url,
"status": proxy.get("status", "unknown"),
"total_requests": m["total"],
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3),
"active_connections": self.connections[proxy_url]
})
return stats
استفاده از این تعادلساز برای پارس کردن بازار:
# تنظیم تعادلساز
proxies = [
{"url": "http://proxy1.example.com:8080", "weight": 5},
{"url": "http://proxy2.example.com:8080", "weight": 5},
{"url": "http://proxy3.example.com:8080", "weight": 3},
{"url": "http://proxy4.example.com:8080", "weight": 2}
]
balancer = ProxyLoadBalancer(
proxies=proxies,
algorithm="weighted",
rate_limit=60 # 60 درخواست در دقیقه برای هر پروکسی
)
# پارس کردن لیست کالاها
product_urls = [f"https://www.wildberries.ru/catalog/{i}/detail.aspx" for i in range(1000)]
results = []
for url in product_urls:
try:
response = balancer.make_request(url)
# پردازش پاسخ
results.append({"url": url, "status": "success", "data": response.text})
except Exception as e:
results.append({"url": url, "status": "error", "error": str(e)})
# هر 100 درخواست آمار را نمایش میدهیم
if len(results) % 100 == 0:
stats = balancer.get_stats()
for stat in stats:
print(f"{stat['proxy']}: {stat['success_rate']}% success, "
f"{stat['avg_response_time']}s avg response")
# آمار نهایی
print("\n=== آمار نهایی ===")
for stat in balancer.get_stats():
print(f"{stat['proxy']}:")
print(f" تعداد کل درخواستها: {stat['total_requests']}")
print(f" درصد موفقیت: {stat['success_rate']}%")
print(f" زمان پاسخ متوسط: {stat['avg_response_time']}s")
print(f" وضعیت: {stat['status']}")
برای Node.js میتوانید از معماری مشابهی با کتابخانههای axios برای درخواستهای HTTP و node-rate-limiter برای کنترل فرکانس استفاده کنید:
const axios = require('axios');
const { RateLimiter } = require('limiter');
class ProxyLoadBalancer {
constructor(proxies, algorithm = 'round_robin', rateLimit = 10) {
this.proxies = proxies;
this.algorithm = algorithm;
this.currentIndex = 0;
this.connections = new Map();
this.limiters = new Map();
this.metrics = new Map();
// راهاندازی محدودکنندههای نرخ
proxies.forEach(proxy => {
this.limiters.set(proxy.url, new RateLimiter({
tokensPerInterval: rateLimit,
interval: 'second'
}));
this.connections.set(proxy.url, 0);
this.metrics.set(proxy.url, {
total: 0,
success: 0,
failed: 0,
responseTimes: []
});
});
}
getNextProxy() {
if (this.algorithm === 'round_robin') {
const proxy = this.proxies[this.currentIndex].url;
this.currentIndex = (this.currentIndex + 1) % this.proxies.length;
return proxy;
} else if (this.algorithm === 'least_connections') {
let minConn = Infinity;
let selectedProxy = null;
this.connections.forEach((count, proxy) => {
if (count < minConn) {
minConn = count;
selectedProxy = proxy;
}
});
return selectedProxy;
}
}
async makeRequest(url, options = {}) {
const proxyUrl = this.getNextProxy();
const limiter = this.limiters.get(proxyUrl);
// منتظر میمانیم تا محدودیت نرخ در دسترس باشد
await limiter.removeTokens(1);
// افزایش شمارنده اتصالات
this.connections.set(proxyUrl, this.connections.get(proxyUrl) + 1);
try {
const startTime = Date.now();
const response = await axios({
url,
proxy: this.parseProxyUrl(proxyUrl),
timeout: options.timeout || 30000,
...options
});
const responseTime = (Date.now() - startTime) / 1000;
this.recordMetrics(proxyUrl, responseTime, true);
return response;
} catch (error) {
this.recordMetrics(proxyUrl, 0, false);
throw error;
} finally {
this.connections.set(proxyUrl, this.connections.get(proxyUrl) - 1);
}
}
parseProxyUrl(proxyUrl) {
const url = new URL(proxyUrl);
return {
host: url.hostname,
port: parseInt(url.port)
};
}
recordMetrics(proxyUrl, responseTime, success) {
const m = this.metrics.get(proxyUrl);
m.total++;
if (success) {
m.success++;
m.responseTimes.push(responseTime);
if (m.responseTimes.length > 1000) {
m.responseTimes.shift();
}
} else {
m.failed++;
}
}
getStats() {
const stats = [];
this.proxies.forEach(proxy => {
const m = this.metrics.get(proxy.url);
const avgResponseTime = m.responseTimes.length > 0
? m.responseTimes.reduce((a, b) => a + b, 0) / m.responseTimes.length
: 0;
const successRate = m.total > 0 ? (m.success / m.total * 100) : 0;
stats.push({
proxy: proxy.url,
totalRequests: m.total,
successRate: successRate.toFixed(2),
avgResponseTime: avgResponseTime.toFixed(3),
activeConnections: this.connections.get(proxy.url)
});
});
return stats;
}
}
// استفاده
const proxies = [
{ url: 'http://proxy1.example.com:8080', weight: 5 },
{ url: 'http://proxy2.example.com:8080', weight: 5 }
];
const balancer = new ProxyLoadBalancer(proxies, 'round_robin', 60);
async function parseProducts() {
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://www.wildberries.ru/catalog/${i}/detail.aspx`
);
for (const url of urls) {
try {
const response = await balancer.makeRequest(url);
console.log(`موفق: ${url}`);
} catch (error) {
console.error(`خطا: ${url} - ${error.message}`);
}
}
console.log('\n=== آمار ===');
console.log(balancer.getStats());
}
parseProducts();
بهینهسازی برای وظایف خاص: پارس، API، خودکارسازی
وظایف مختلف نیاز به استراتژیهای متفاوتی برای تعادل بار دارند. برای پارس کردن بازارها (Wildberries، Ozon، Avito) بهترین استراتژی چرخش بر اساس تعداد درخواستها و توزیع جغرافیایی است. از پروکسیهای مسکونی از شهرهای مختلف روسیه استفاده کنید، پروکسیها را هر 50-100 درخواست تغییر دهید و تأخیرهای تصادفی بین درخواستها (1-3 ثانیه) برای شبیهسازی رفتار انسان اضافه کنید.
برای کار با API شبکههای اجتماعی (Instagram، Facebook، VK) ثبات آدرس IP در طول یک نشست بسیار مهم است. از الگوریتم IP Hash یا نشستهای چسبنده (sticky sessions) استفاده کنید: تمام درخواستهای یک حساب باید از طریق یک پروکسی خاص انجام شوند. این از تغییر مشکوک مکان جغرافیایی جلوگیری میکند که ممکن است منجر به مسدود شدن حساب شود. پروکسیهای موبایل توصیه میشوند، زیرا شبکههای اجتماعی به IPهای موبایل بیشتر از پروکسیهای مسکونی یا دیتاسنتر اعتماد دارند.
# مثال نشستهای چسبنده برای Instagram
class StickySessionBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.session_map = {} # {account_id: proxy_url}
self.proxy_usage = defaultdict(int)
def get_proxy_for_account(self, account_id):
"""پروکسی ثابت برای حساب را برمیگرداند"""
if account_id in self.session_map:
return self.session_map[account_id]
# انتخاب پروکسی با کمترین بار
proxy = min(self.proxies, key=lambda p: self.proxy_usage[p])
self.session_map[account_id] = proxy
self.proxy_usage[proxy] += 1
return proxy
def release_account(self, account_id):
"""پروکسی را پس از اتمام کار با حساب آزاد میکند"""
if account_id in self.session_map:
proxy = self.session_map[account_id]
self.proxy_usage[proxy] -= 1
del self.session_map[account_id]