Khi bạn parsing hàng ngàn trang của các marketplace, gửi hàng loạt API requests hoặc tự động hóa công việc với hàng trăm tài khoản, việc phân phối tải đúng cách qua proxy trở nên cực kỳ quan trọng. Nếu không có cân bằng hợp lý, bạn sẽ gặp phải các vấn đề như bị chặn, timeout và giảm hiệu suất. Trong hướng dẫn này, chúng ta sẽ phân tích kiến trúc phân phối tải, các thuật toán cân bằng và các chiến lược thực tiễn cho các hệ thống có tải cao.
Tài liệu này được thiết kế cho các nhà phát triển và chuyên gia kỹ thuật làm việc với việc parsing dữ liệu, tự động hóa API requests hoặc quản lý các nhóm proxy lớn cho các nhiệm vụ kinh doanh.
Tại sao cần cân bằng tải qua proxy
Cân bằng tải qua proxy giải quyết một số vấn đề quan trọng của các hệ thống có tải cao. Vấn đề đầu tiên và quan trọng nhất là bảo vệ khỏi bị chặn. Khi bạn gửi hàng ngàn yêu cầu đến một tài nguyên (marketplace, API của mạng xã hội, công cụ tìm kiếm), máy chủ mục tiêu sẽ thấy hoạt động bất thường cao từ một địa chỉ IP và chặn nó. Phân phối các yêu cầu giữa hàng chục hoặc hàng trăm proxy làm cho hoạt động của bạn giống như hành động của người dùng bình thường.
Vấn đề thứ hai là hiệu suất. Một proxy server có băng thông giới hạn (thường là 100-1000 Mbit/s) và có thể xử lý một số lượng kết nối đồng thời hạn chế. Khi parsing 10,000 trang mỗi phút hoặc gửi hàng loạt API requests, một proxy sẽ trở thành điểm nghẽn của hệ thống. Cân bằng tải cho phép mở rộng băng thông theo chiều ngang bằng cách thêm các proxy mới vào nhóm.
Vấn đề thứ ba là độ tin cậy. Nếu một trong các proxy gặp sự cố (sự cố kỹ thuật, bị chặn, hết hạn thuê), hệ thống sẽ tự động chuyển hướng lưu lượng đến các proxy đang hoạt động. Nếu không có cơ chế cân bằng và health checks, sự cố của một proxy có thể dừng toàn bộ hệ thống.
Ví dụ thực tế: Khi parsing Wildberries để theo dõi giá của đối thủ, bạn cần xử lý 50,000 sản phẩm mỗi giờ. Điều này tương đương với khoảng 14 yêu cầu mỗi giây. Một proxy có thể chịu được tải này, nhưng Wildberries sẽ chặn IP sau 100-200 yêu cầu từ một địa chỉ. Bằng cách phân phối các yêu cầu giữa 20 proxy cư trú, bạn giảm tải cho mỗi IP xuống còn 0.7 yêu cầu mỗi giây — điều này giống như hoạt động của một người dùng bình thường.
Lý do thứ tư là phân phối địa lý. Nhiều dịch vụ hiển thị nội dung hoặc giá khác nhau tùy thuộc vào khu vực của người dùng. Cân bằng giữa các proxy từ các quốc gia và thành phố khác nhau cho phép thu thập dữ liệu từ tất cả các khu vực mục tiêu cùng một lúc. Ví dụ, để theo dõi giá trên Ozon, bạn cần các proxy từ Moscow, Saint Petersburg, Yekaterinburg và các thành phố lớn khác.
Kiến trúc của hệ thống phân phối tải
Kiến trúc cổ điển của hệ thống cân bằng tải qua proxy bao gồm một số thành phần. Ở cấp độ cao nhất là bộ cân bằng tải (load balancer) — một mô-đun phần mềm nhận các nhiệm vụ đầu vào (yêu cầu parsing, API calls) và phân phối chúng giữa các proxy có sẵn. Bộ cân bằng tải có thể hoạt động như một dịch vụ riêng biệt hoặc được tích hợp vào ứng dụng.
Thành phần thứ hai là quản lý nhóm proxy (proxy pool manager). Nó lưu trữ danh sách tất cả các proxy có sẵn cùng với các đặc điểm của chúng: địa chỉ IP, cổng, giao thức (HTTP/SOCKS5), vị trí địa lý, loại (cư trú, di động, trung tâm dữ liệu), trạng thái hiện tại (hoạt động, bị chặn, đang kiểm tra). Quản lý nhóm chịu trách nhiệm thêm các proxy mới, xóa các proxy không hoạt động và kiểm tra định kỳ tính khả dụng.
Thành phần thứ ba là hệ thống giám sát và health checks. Nó liên tục kiểm tra tính khả dụng của từng proxy: gửi các yêu cầu thử nghiệm, đo thời gian phản hồi, kiểm tra tính thành công của kết nối. Nếu proxy không phản hồi hoặc trả về lỗi, hệ thống đánh dấu nó là không khả dụng và tạm thời loại trừ khỏi vòng quay.
| Thành phần | Chức năng | Công nghệ |
|---|---|---|
| Load Balancer | Phân phối yêu cầu giữa các proxy | HAProxy, Nginx, thư viện Python/Node.js |
| Proxy Pool Manager | Quản lý danh sách proxy | Redis, PostgreSQL, kho nhớ trong |
| Health Check System | Kiểm tra tính khả dụng của proxy | Nhiệm vụ theo lịch, Celery, cron |
| Rate Limiter | Kiểm soát tần suất yêu cầu | Thuật toán Token bucket, leaky bucket |
| Monitoring & Metrics | Thu thập các chỉ số hiệu suất | Prometheus, Grafana, ELK Stack |
Thành phần thứ tư là rate limiter (giới hạn tần suất yêu cầu). Nó theo dõi để đảm bảo rằng mỗi proxy không vượt quá tần suất cho phép khi truy cập vào tài nguyên mục tiêu. Ví dụ, nếu bạn đang parsing Instagram và biết rằng nền tảng này chặn IP sau 60 yêu cầu mỗi phút, rate limiter sẽ không cho phép gửi quá 50 yêu cầu mỗi phút qua một proxy, để lại một khoảng an toàn.
Thành phần thứ năm là hệ thống chỉ số và phân tích. Nó thu thập dữ liệu về hiệu suất của từng proxy: số lượng yêu cầu thành công, số lượng lỗi, thời gian phản hồi trung bình, tỷ lệ bị chặn. Những dữ liệu này được sử dụng để tối ưu hóa các thuật toán cân bằng và phát hiện các proxy gặp sự cố.
Các thuật toán cân bằng: Round Robin, Least Connections, Weighted
Thuật toán Round Robin là phương pháp cân bằng đơn giản và phổ biến nhất. Nó lần lượt duyệt qua các proxy trong danh sách: yêu cầu đầu tiên đi qua proxy số 1, yêu cầu thứ hai qua proxy số 2, yêu cầu thứ ba qua proxy số 3, và cứ như vậy. Khi danh sách kết thúc, bộ cân bằng tải quay lại đầu. Thuật toán này phân phối tải một cách đồng đều nếu tất cả các proxy có hiệu suất giống nhau.
class RoundRobinBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# Sử dụng
proxies = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080"
]
balancer = RoundRobinBalancer(proxies)
for i in range(10):
proxy = balancer.get_next_proxy()
print(f"Yêu cầu {i+1} → {proxy}")
Thuật toán Least Connections (số lượng kết nối ít nhất) chọn proxy có số lượng kết nối hoạt động ít nhất tại thời điểm hiện tại. Điều này hữu ích khi các yêu cầu có thời gian thực hiện khác nhau. Ví dụ, trong khi parsing, một yêu cầu có thể được xử lý trong 100 ms, trong khi yêu cầu khác có thể mất 5 giây (nếu trang tải chậm). Least Connections tự động hướng các yêu cầu mới đến các proxy ít tải hơn.
class LeastConnectionsBalancer:
def __init__(self, proxies):
self.proxies = {proxy: 0 for proxy in proxies}
def get_next_proxy(self):
# Chọn proxy có số lượng kết nối ít nhất
proxy = min(self.proxies.items(), key=lambda x: x[1])[0]
self.proxies[proxy] += 1
return proxy
def release_proxy(self, proxy):
# Giảm số đếm khi hoàn thành yêu cầu
self.proxies[proxy] -= 1
# Sử dụng với quản lý ngữ cảnh
balancer = LeastConnectionsBalancer(proxies)
def make_request(url):
proxy = balancer.get_next_proxy()
try:
# Thực hiện yêu cầu qua proxy đã chọn
response = requests.get(url, proxies={"http": proxy})
return response
finally:
balancer.release_proxy(proxy)
Weighted Round Robin (cân bằng vòng có trọng số) mở rộng Round Robin cổ điển bằng cách gán cho mỗi proxy một trọng số tùy thuộc vào hiệu suất hoặc chất lượng của nó. Các proxy có trọng số cao hơn nhận được nhiều yêu cầu hơn. Điều này hữu ích khi bạn có các proxy có chất lượng khác nhau: ví dụ, các proxy cư trú cao cấp với tốc độ cao và các proxy trung tâm dữ liệu giá rẻ với các hạn chế.
class WeightedRoundRobinBalancer:
def __init__(self, weighted_proxies):
# weighted_proxies = [(proxy, weight), ...]
self.proxies = []
for proxy, weight in weighted_proxies:
# Thêm proxy vào danh sách theo trọng số
self.proxies.extend([proxy] * weight)
self.current_index = 0
def get_next_proxy(self):
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
# Sử dụng với trọng số
weighted_proxies = [
("http://premium-proxy1.com:8080", 5), # chất lượng cao
("http://premium-proxy2.com:8080", 5),
("http://cheap-proxy1.com:8080", 2), # chất lượng thấp
("http://cheap-proxy2.com:8080", 1) # chất lượng rất thấp
]
balancer = WeightedRoundRobinBalancer(weighted_proxies)
Thuật toán Random (chọn ngẫu nhiên) chọn proxy ngẫu nhiên từ danh sách có sẵn. Nó đơn giản hơn trong việc triển khai so với Round Robin và hoạt động tốt khi có nhiều proxy (trên 100). Phân phối ngẫu nhiên tự động được cân bằng khi có đủ khối lượng yêu cầu. Nhược điểm là có thể có những khoảng thời gian ngắn không đồng đều về tải.
IP Hash là thuật toán chọn proxy dựa trên hash của URL mục tiêu hoặc một tham số khác. Điều này đảm bảo rằng các yêu cầu đến cùng một tài nguyên luôn đi qua cùng một proxy. Điều này hữu ích cho các trang web sử dụng phiên hoặc yêu cầu thứ tự của các yêu cầu từ một IP (ví dụ: xác thực + lấy dữ liệu).
Quản lý các nhóm proxy: xoay vòng và health checks
Quản lý hiệu quả một nhóm proxy yêu cầu giám sát liên tục tình trạng của từng proxy và xoay vòng tự động. Health check là việc kiểm tra định kỳ tính khả dụng của proxy bằng cách gửi một yêu cầu thử nghiệm. Thường sử dụng một yêu cầu GET đơn giản đến một dịch vụ đáng tin cậy (ví dụ: httpbin.org hoặc một endpoint của riêng bạn), trả về thông tin về địa chỉ IP.
import requests
import time
from datetime import datetime
class ProxyHealthChecker:
def __init__(self, test_url="http://httpbin.org/ip", timeout=10):
self.test_url = test_url
self.timeout = timeout
def check_proxy(self, proxy_url):
"""Kiểm tra tính khả dụng của proxy"""
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=self.timeout
)
response_time = time.time() - start_time
if response.status_code == 200:
return {
"status": "healthy",
"response_time": response_time,
"timestamp": datetime.now(),
"ip": response.json().get("origin")
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"timestamp": datetime.now()
}
except requests.exceptions.Timeout:
return {
"status": "unhealthy",
"error": "timeout",
"timestamp": datetime.now()
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now()
}
# Sử dụng
checker = ProxyHealthChecker()
proxies = ["http://proxy1.com:8080", "http://proxy2.com:8080"]
for proxy in proxies:
result = checker.check_proxy(proxy)
print(f"{proxy}: {result['status']} ({result.get('response_time', 'N/A')}s)")
Hệ thống quản lý nhóm cần tự động đánh dấu các proxy không hoạt động là không khả dụng và kiểm tra lại định kỳ. Một chiến lược phổ biến là mô hình circuit breaker: sau ba lần kiểm tra không thành công liên tiếp, proxy sẽ bị loại khỏi nhóm trong 5-10 phút, sau đó sẽ được kiểm tra lại. Nếu kiểm tra thành công, proxy sẽ được đưa trở lại nhóm hoạt động.
class ProxyPoolManager:
def __init__(self, health_checker, max_failures=3, cooldown_seconds=300):
self.health_checker = health_checker
self.max_failures = max_failures
self.cooldown_seconds = cooldown_seconds
self.proxies = {} # {proxy_url: ProxyInfo}
def add_proxy(self, proxy_url, metadata=None):
"""Thêm proxy vào nhóm"""
self.proxies[proxy_url] = {
"url": proxy_url,
"status": "active",
"failures": 0,
"last_check": None,
"cooldown_until": None,
"metadata": metadata or {}
}
def get_active_proxies(self):
"""Trả về danh sách các proxy hoạt động"""
now = datetime.now()
active = []
for proxy_url, info in self.proxies.items():
# Kiểm tra xem proxy có đang trong thời gian cooldown không
if info["cooldown_until"] and now < info["cooldown_until"]:
continue
if info["status"] == "active":
active.append(proxy_url)
return active
def mark_failure(self, proxy_url):
"""Đánh dấu một lần sử dụng proxy không thành công"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] += 1
if info["failures"] >= self.max_failures:
# Chuyển sang trạng thái cooldown
info["status"] = "cooldown"
info["cooldown_until"] = datetime.now() + timedelta(seconds=self.cooldown_seconds)
print(f"Proxy {proxy_url} chuyển sang trạng thái cooldown đến {info['cooldown_until']}")
def mark_success(self, proxy_url):
"""Đánh dấu một lần sử dụng proxy thành công"""
if proxy_url not in self.proxies:
return
info = self.proxies[proxy_url]
info["failures"] = 0
info["status"] = "active"
info["cooldown_until"] = None
Xoay vòng proxy là một chiến lược thay đổi tự động proxy qua các khoảng thời gian nhất định hoặc sau một số lượng yêu cầu nhất định. Có một số cách tiếp cận: xoay vòng theo thời gian (thay đổi mỗi 5-10 phút), xoay vòng theo số lượng yêu cầu (thay đổi sau 100-500 yêu cầu), xoay vòng theo phiên (một proxy cho một phiên parsing). Việc chọn chiến lược phụ thuộc vào yêu cầu của trang web mục tiêu.
Mẹo: Đối với việc parsing các marketplace (Wildberries, Ozon), chiến lược tối ưu là xoay vòng theo số lượng yêu cầu: 50-100 yêu cầu cho mỗi proxy, sau đó thay đổi. Đối với việc làm việc với API của các mạng xã hội (Instagram, Facebook), tốt hơn là sử dụng xoay vòng theo phiên: một proxy cho toàn bộ chu trình xác thực → hành động → thoát.
Giới hạn tốc độ và kiểm soát tần suất yêu cầu
Giới hạn tốc độ là một thành phần quan trọng của hệ thống cân bằng tải. Nó ngăn chặn việc vượt quá tần suất yêu cầu cho phép đến tài nguyên mục tiêu, điều này có thể dẫn đến việc chặn proxy. Có hai thuật toán chính: Token Bucket (thùng token) và Leaky Bucket (thùng rò rỉ).
Token Bucket hoạt động như sau: mỗi proxy có một "thùng" ảo chứa các token. Mỗi token cho phép một yêu cầu. Thùng sẽ dần được lấp đầy bằng các token với một tốc độ nhất định (ví dụ: 10 token mỗi giây). Sức chứa tối đa của thùng là có giới hạn (ví dụ: 50 token). Khi một yêu cầu đến, hệ thống kiểm tra sự có mặt của các token: nếu có — lấy một token và thực hiện yêu cầu, nếu không — chờ cho đến khi có token mới.
import time
from threading import Lock
class TokenBucketRateLimiter:
def __init__(self, rate, capacity):
"""
rate: số lượng token mỗi giây
capacity: số lượng tối đa token trong thùng
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = Lock()
def _add_tokens(self):
"""Thêm token dựa trên thời gian đã trôi qua"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def acquire(self, tokens=1):
"""Cố gắng lấy số lượng token đã chỉ định"""
with self.lock:
self._add_tokens()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_and_acquire(self, tokens=1):
"""Chờ cho đến khi có token và lấy chúng"""
while not self.acquire(tokens):
# Tính toán thời gian chờ
wait_time = (tokens - self.tokens) / self.rate
time.sleep(wait_time)
# Sử dụng cho mỗi proxy
proxy_limiters = {
"http://proxy1.com:8080": TokenBucketRateLimiter(rate=10, capacity=50),
"http://proxy2.com:8080": TokenBucketRateLimiter(rate=10, capacity=50)
}
def make_request_with_limit(url, proxy_url):
limiter = proxy_limiters[proxy_url]
limiter.wait_and_acquire() # Chờ cho đến khi có token
response = requests.get(url, proxies={"http": proxy_url})
return response
Leaky Bucket (thùng rò rỉ) hoạt động khác: các yêu cầu được đưa vào hàng đợi (thùng), hàng đợi này "rò rỉ" với một tốc độ cố định. Nếu thùng bị tràn, các yêu cầu mới sẽ bị từ chối hoặc đặt trong trạng thái chờ. Thuật toán này đảm bảo phân phối tải đồng đều theo thời gian, nhưng có thể dẫn đến độ trễ trong các đợt hoạt động cao.
Các nền tảng mục tiêu khác nhau yêu cầu các giới hạn khác nhau. API Instagram cho phép khoảng 200 yêu cầu mỗi giờ cho mỗi IP (khoảng 3.3 yêu cầu mỗi phút). Wildberries chặn sau 100-200 yêu cầu mỗi phút từ một IP. Google Search cho phép 10-20 yêu cầu mỗi phút. Hệ thống giới hạn tốc độ của bạn cần xem xét các hạn chế này và thêm một khoảng an toàn (thường là 20-30%).
| Nền tảng | Giới hạn yêu cầu | Cài đặt đề xuất |
|---|---|---|
| ~200 yêu cầu/giờ | 2-3 yêu cầu/phút (có khoảng an toàn) | |
| Wildberries | 100-200 yêu cầu/phút | 60-80 yêu cầu/phút |
| Google Search | 10-20 yêu cầu/phút | 8-12 yêu cầu/phút |
| Ozon | 50-100 yêu cầu/phút | 30-50 yêu cầu/phút |
| Facebook API | 200 yêu cầu/giờ | 2-3 yêu cầu/phút |
Giám sát hiệu suất và tự động mở rộng quy mô
Một hệ thống cân bằng tải hiệu quả yêu cầu giám sát liên tục các chỉ số chính. Nhóm chỉ số đầu tiên là hiệu suất của proxy: thời gian phản hồi trung bình (response time), tỷ lệ yêu cầu thành công (success rate), số lượng timeout, số lượng lỗi 4xx và 5xx. Những dữ liệu này cho phép phát hiện các proxy gặp sự cố và tối ưu hóa các thuật toán cân bằng.
class ProxyMetrics:
def __init__(self):
self.metrics = {} # {proxy_url: metrics_dict}
def record_request(self, proxy_url, response_time, status_code, success):
"""Ghi lại các chỉ số yêu cầu"""
if proxy_url not in self.metrics:
self.metrics[proxy_url] = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_response_time": 0,
"timeouts": 0,
"errors_4xx": 0,
"errors_5xx": 0
}
m = self.metrics[proxy_url]
m["total_requests"] += 1
if success:
m["successful_requests"] += 1
m["total_response_time"] += response_time
else:
m["failed_requests"] += 1
if status_code == 0: # timeout
m["timeouts"] += 1
elif 400 <= status_code < 500:
m["errors_4xx"] += 1
elif 500 <= status_code < 600:
m["errors_5xx"] += 1
def get_success_rate(self, proxy_url):
"""Trả về tỷ lệ yêu cầu thành công"""
m = self.metrics.get(proxy_url, {})
total = m.get("total_requests", 0)
if total == 0:
return 0
return (m.get("successful_requests", 0) / total) * 100
def get_avg_response_time(self, proxy_url):
"""Trả về thời gian phản hồi trung bình"""
m = self.metrics.get(proxy_url, {})
successful = m.get("successful_requests", 0)
if successful == 0:
return 0
return m.get("total_response_time", 0) / successful
def get_report(self, proxy_url):
"""Trả về báo cáo đầy đủ cho proxy"""
m = self.metrics.get(proxy_url, {})
return {
"proxy": proxy_url,
"total_requests": m.get("total_requests", 0),
"success_rate": self.get_success_rate(proxy_url),
"avg_response_time": self.get_avg_response_time(proxy_url),
"timeouts": m.get("timeouts", 0),
"errors_4xx": m.get("errors_4xx", 0),
"errors_5xx": m.get("errors_5xx", 0)
}
Nhóm chỉ số thứ hai là hiệu suất tổng thể của hệ thống: tổng số yêu cầu mỗi giây (RPS — requests per second), độ trễ trung bình (latency), kích thước hàng đợi yêu cầu, số lượng proxy đang hoạt động, tỷ lệ sử dụng nhóm proxy. Những chỉ số này cho thấy hệ thống có thể xử lý tải hay không hoặc cần mở rộng quy mô.
Tự động mở rộng quy mô (auto-scaling) cho phép thêm hoặc xóa proxy một cách linh hoạt tùy thuộc vào tải. Chiến lược đơn giản: nếu tải trung bình của nhóm vượt quá 80% trong 5 phút, hệ thống tự động thêm các proxy mới. Nếu tải giảm xuống dưới 30% trong 15 phút, hệ thống sẽ xóa các proxy dư thừa để tiết kiệm tài nguyên.
Ví dụ về thiết lập giám sát: Để parsing 100,000 sản phẩm trên Wildberries mỗi giờ (khoảng 28 yêu cầu mỗi giây), bạn sẽ cần ít nhất 30-40 proxy với giới hạn 60 yêu cầu mỗi phút cho mỗi proxy. Thiết lập cảnh báo: nếu tỷ lệ thành công giảm xuống dưới 85% hoặc thời gian phản hồi trung bình vượt quá 3 giây, hệ thống nên tự động thêm 10-15 proxy dự phòng từ nhóm.
Để trực quan hóa các chỉ số, hãy sử dụng Grafana với Prometheus hoặc ELK Stack. Tạo các bảng điều khiển với các biểu đồ: RPS theo thời gian, phân phối thời gian phản hồi, top 10 proxy nhanh nhất/chậm nhất, bản đồ lỗi theo loại. Điều này sẽ giúp nhanh chóng phát hiện các vấn đề và tối ưu hóa hệ thống.
Triển khai thực tiễn trên Python và Node.js
Hãy xem xét một triển khai hoàn chỉnh của hệ thống cân bằng tải trên Python với việc sử dụng các thư viện phổ biến. Ví dụ này kết hợp tất cả các thành phần đã mô tả: bộ cân bằng tải, quản lý nhóm, health checks, giới hạn tốc độ và giám sát.
import requests
import time
import random
from datetime import datetime, timedelta
from threading import Lock, Thread
from collections import defaultdict
class ProxyLoadBalancer:
def __init__(self, proxies, algorithm="round_robin", rate_limit=10):
"""
proxies: danh sách proxy [{"url": "...", "weight": 1}, ...]
algorithm: round_robin, least_connections, weighted, random
rate_limit: số lượng yêu cầu tối đa mỗi giây cho proxy
"""
self.proxies = proxies
self.algorithm = algorithm
self.rate_limit = rate_limit
# Trạng thái của bộ cân bằng tải
self.current_index = 0
self.connections = defaultdict(int)
self.rate_limiters = {}
self.metrics = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0,
"response_times": [], "last_check": None
})
# Khởi tạo các rate limiters
for proxy in proxies:
proxy_url = proxy["url"]
self.rate_limiters[proxy_url] = TokenBucketRateLimiter(
rate=rate_limit,
capacity=rate_limit * 5
)
self.lock = Lock()
# Khởi động vòng lặp kiểm tra sức khỏe nền
self.health_check_thread = Thread(target=self._health_check_loop, daemon=True)
self.health_check_thread.start()
def get_next_proxy(self):
"""Chọn proxy tiếp theo theo thuật toán"""
with self.lock:
if self.algorithm == "round_robin":
return self._round_robin()
elif self.algorithm == "least_connections":
return self._least_connections()
elif self.algorithm == "weighted":
return self._weighted_random()
elif self.algorithm == "random":
return random.choice(self.proxies)["url"]
def _round_robin(self):
proxy = self.proxies[self.current_index]["url"]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def _least_connections(self):
min_conn = min(self.connections.values()) if self.connections else 0
candidates = [p["url"] for p in self.proxies if self.connections[p["url"]] == min_conn]
return random.choice(candidates)
def _weighted_random(self):
weights = [p.get("weight", 1) for p in self.proxies]
return random.choices(self.proxies, weights=weights)[0]["url"]
def make_request(self, url, method="GET", **kwargs):
"""Thực hiện yêu cầu qua bộ cân bằng tải"""
proxy_url = self.get_next_proxy()
# Chờ cho đến khi có giới hạn tốc độ
self.rate_limiters[proxy_url].wait_and_acquire()
# Tăng số đếm kết nối
with self.lock:
self.connections[proxy_url] += 1
try:
start_time = time.time()
response = requests.request(
method,
url,
proxies={"http": proxy_url, "https": proxy_url},
timeout=kwargs.get("timeout", 30),
**kwargs
)
response_time = time.time() - start_time
# Ghi lại các chỉ số
self._record_metrics(proxy_url, response_time, True, response.status_code)
return response
except Exception as e:
self._record_metrics(proxy_url, 0, False, 0)
raise
finally:
# Giảm số đếm kết nối
with self.lock:
self.connections[proxy_url] -= 1
def _record_metrics(self, proxy_url, response_time, success, status_code):
"""Ghi lại các chỉ số yêu cầu"""
with self.lock:
m = self.metrics[proxy_url]
m["total"] += 1
if success:
m["success"] += 1
m["response_times"].append(response_time)
# Chỉ lưu trữ 1000 giá trị gần nhất
if len(m["response_times"]) > 1000:
m["response_times"].pop(0)
else:
m["failed"] += 1
def _health_check_loop(self):
"""Kiểm tra sức khỏe nền của các proxy"""
while True:
for proxy in self.proxies:
proxy_url = proxy["url"]
try:
response = requests.get(
"http://httpbin.org/ip",
proxies={"http": proxy_url},
timeout=10
)
with self.lock:
self.metrics[proxy_url]["last_check"] = datetime.now()
proxy["status"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
with self.lock:
proxy["status"] = "unhealthy"
time.sleep(60) # Kiểm tra mỗi phút
def get_stats(self):
"""Trả về thống kê cho tất cả các proxy"""
stats = []
with self.lock:
for proxy in self.proxies:
proxy_url = proxy["url"]
m = self.metrics[proxy_url]
avg_response_time = (
sum(m["response_times"]) / len(m["response_times"])
if m["response_times"] else 0
)
success_rate = (
(m["success"] / m["total"] * 100)
if m["total"] > 0 else 0
)
stats.append({
"proxy": proxy_url,
"status": proxy.get("status", "unknown"),
"total_requests": m["total"],
"success_rate": round(success_rate, 2),
"avg_response_time": round(avg_response_time, 3),
"active_connections": self.connections[proxy_url]
})
return stats
Sử dụng bộ cân bằng tải này để parsing marketplace:
# Cấu hình bộ cân bằng tải
proxies = [
{"url": "http://proxy1.example.com:8080", "weight": 5},
{"url": "http://proxy2.example.com:8080", "weight": 5},
{"url": "http://proxy3.example.com:8080", "weight": 3},
{"url": "http://proxy4.example.com:8080", "weight": 2}
]
balancer = ProxyLoadBalancer(
proxies=proxies,
algorithm="weighted",
rate_limit=60 # 60 yêu cầu mỗi phút cho mỗi proxy
)
# Parsing danh sách sản phẩm
product_urls = [f"https://www.wildberries.ru/catalog/{i}/detail.aspx" for i in range(1000)]
results = []
for url in product_urls:
try:
response = balancer.make_request(url)
# Xử lý phản hồi
results.append({"url": url, "status": "success", "data": response.text})
except Exception as e:
results.append({"url": url, "status": "error", "error": str(e)})
# Mỗi 100 yêu cầu in ra thống kê
if len(results) % 100 == 0:
stats = balancer.get_stats()
for stat in stats:
print(f"{stat['proxy']}: {stat['success_rate']}% thành công, "
f"{stat['avg_response_time']}s thời gian phản hồi trung bình")
# Thống kê cuối cùng
print("\n=== Thống kê cuối cùng ===")
for stat in balancer.get_stats():
print(f"{stat['proxy']}:")
print(f" Tổng số yêu cầu: {stat['total_requests']}")
print(f" Tỷ lệ thành công: {stat['success_rate']}%")
print(f" Thời gian phản hồi trung bình: {stat['avg_response_time']}s")
print(f" Trạng thái: {stat['status']}")
Đối với Node.js, bạn có thể sử dụng kiến trúc tương tự với các thư viện axios cho các yêu cầu HTTP và node-rate-limiter để kiểm soát tần suất:
const axios = require('axios');
const { RateLimiter } = require('limiter');
class ProxyLoadBalancer {
constructor(proxies, algorithm = 'round_robin', rateLimit = 10) {
this.proxies = proxies;
this.algorithm = algorithm;
this.currentIndex = 0;
this.connections = new Map();
this.limiters = new Map();
this.metrics = new Map();
// Khởi tạo các rate limiters
proxies.forEach(proxy => {
this.limiters.set(proxy.url, new RateLimiter({
tokensPerInterval: rateLimit,
interval: 'second'
}));
this.connections.set(proxy.url, 0);
this.metrics.set(proxy.url, {
total: 0,
success: 0,
failed: 0,
responseTimes: []
});
});
}
getNextProxy() {
if (this.algorithm === 'round_robin') {
const proxy = this.proxies[this.currentIndex].url;
this.currentIndex = (this.currentIndex + 1) % this.proxies.length;
return proxy;
} else if (this.algorithm === 'least_connections') {
let minConn = Infinity;
let selectedProxy = null;
this.connections.forEach((count, proxy) => {
if (count < minConn) {
minConn = count;
selectedProxy = proxy;
}
});
return selectedProxy;
}
}
async makeRequest(url, options = {}) {
const proxyUrl = this.getNextProxy();
const limiter = this.limiters.get(proxyUrl);
// Chờ cho đến khi có giới hạn tốc độ
await limiter.removeTokens(1);
// Tăng số đếm kết nối
this.connections.set(proxyUrl, this.connections.get(proxyUrl) + 1);
try {
const startTime = Date.now();
const response = await axios({
url,
proxy: this.parseProxyUrl(proxyUrl),
timeout: options.timeout || 30000,
...options
});
const responseTime = (Date.now() - startTime) / 1000;
this.recordMetrics(proxyUrl, responseTime, true);
return response;
} catch (error) {
this.recordMetrics(proxyUrl, 0, false);
throw error;
} finally {
this.connections.set(proxyUrl, this.connections.get(proxyUrl) - 1);
}
}
parseProxyUrl(proxyUrl) {
const url = new URL(proxyUrl);
return {
host: url.hostname,
port: parseInt(url.port)
};
}
recordMetrics(proxyUrl, responseTime, success) {
const m = this.metrics.get(proxyUrl);
m.total++;
if (success) {
m.success++;
m.responseTimes.push(responseTime);
if (m.responseTimes.length > 1000) {
m.responseTimes.shift();
}
} else {
m.failed++;
}
}
getStats() {
const stats = [];
this.proxies.forEach(proxy => {
const m = this.metrics.get(proxy.url);
const avgResponseTime = m.responseTimes.length > 0
? m.responseTimes.reduce((a, b) => a + b, 0) / m.responseTimes.length
: 0;
const successRate = m.total > 0 ? (m.success / m.total * 100) : 0;
stats.push({
proxy: proxy.url,
totalRequests: m.total,
successRate: successRate.toFixed(2),
avgResponseTime: avgResponseTime.toFixed(3),
activeConnections: this.connections.get(proxy.url)
});
});
return stats;
}
}
// Sử dụng
const proxies = [
{ url: 'http://proxy1.example.com:8080', weight: 5 },
{ url: 'http://proxy2.example.com:8080', weight: 5 }
];
const balancer = new ProxyLoadBalancer(proxies, 'round_robin', 60);
async function parseProducts() {
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://www.wildberries.ru/catalog/${i}/detail.aspx`
);
for (const url of urls) {
try {
const response = await balancer.makeRequest(url);
console.log(`Thành công: ${url}`);
} catch (error) {
console.error(`Lỗi: ${url} - ${error.message}`);
}
}
console.log('\n=== Thống kê ===');
console.log(balancer.getStats());
}
parseProducts();
Tối ưu hóa cho các nhiệm vụ cụ thể: parsing, API, tự động hóa
Các nhiệm vụ khác nhau yêu cầu các chiến lược cân bằng tải khác nhau. Đối với việc parsing các marketplace (Wildberries, Ozon, Avito), chiến lược tối ưu là xoay vòng theo số lượng yêu cầu và phân phối địa lý. Sử dụng proxy cư trú từ các thành phố khác nhau ở Nga, thay đổi proxy mỗi 50-100 yêu cầu, thêm độ trễ ngẫu nhiên giữa các yêu cầu (1-3 giây) để mô phỏng hành vi của con người.
Đối với việc làm việc với API của các mạng xã hội (Instagram, Facebook, VK), độ ổn định của địa chỉ IP trong một phiên là rất quan trọng. Sử dụng thuật toán IP Hash hoặc sticky sessions: tất cả các yêu cầu của một tài khoản phải đi qua cùng một proxy. Điều này ngăn chặn sự thay đổi địa lý đáng ngờ, có thể dẫn đến việc chặn tài khoản. Các proxy di động được khuyến nghị, vì các mạng xã hội tin tưởng các IP di động hơn là các IP cư trú hoặc trung tâm dữ liệu.
# Ví dụ về sticky sessions cho Instagram
class StickySessionBalancer:
def __init__(self, proxies):
self.proxies = proxies
self.session_map = {} # {account_id: proxy_url}
self.proxy_usage = defaultdict(int)
def get_proxy_for_account(self, account_id):
"""Trả về proxy cố định cho tài khoản"""
if account_id in self.session_map:
return self.session_map[account_id]
# Chọn proxy ít tải nhất
proxy = min(self.proxies, key=lambda p: self.proxy_usage[p])
self.session_map[account_id] = proxy
self.proxy_usage[proxy] += 1
return proxy
def release_account(self, account_id):
"""Giải phóng proxy khi hoàn thành công việc với tài khoản"""
if account_id in self.session_map:
proxy = self.session_map[account_id]
self.proxy_usage[proxy] -= 1
del self.session_map[account_id]