كيفية إصلاح أخطاء timeout عند العمل عبر الوكيل
تعطل الطلب، انهار السكريبت مع خطأ TimeoutError، لم يتم الحصول على البيانات. موقف مألوف؟ أخطاء timeout عبر الوكيل - واحدة من أكثر المشاكل شيوعاً عند الكشط والأتمتة. دعنا نحلل الأسباب ونقدم حلولاً محددة.
لماذا تحدث أخطاء timeout
Timeout ليست مشكلة واحدة، بل هي عرض. قبل العلاج، عليك فهم السبب:
خادم وكيل بطيء. يضيف خادم مثقل أو وكيل بعيد جغرافياً تأخيراً لكل طلب. إذا كانت مهلتك الزمنية 10 ثوان والوكيل يستجيب في 12 ثانية - خطأ.
الحجب من جانب الموقع المستهدف. قد يعلق الموقع الطلبات المريبة عن قصد بدلاً من الرفض الصريح. هذه تكتيك ضد البوتات - إبقاء الاتصال مفتوحاً بلا نهاية.
مشاكل DNS. يجب على الوكيل حل اسم النطاق. إذا كان خادم DNS للوكيل بطيئاً أو غير متاح - يتعطل الطلب في مرحلة الاتصال.
إعدادات المهلات الزمنية غير الصحيحة. مهلة زمنية واحدة عامة لكل شيء - خطأ شائع. Connect timeout و read timeout - أشياء مختلفة، وتحتاج إلى إعدادها بشكل منفصل.
مشاكل الشبكة. فقدان الحزم، اتصال الوكيل غير المستقر، مشاكل التوجيه - كل هذا يؤدي إلى المهلات الزمنية.
أنواع المهلات الزمنية وإعدادها
تدعم معظم مكتبات HTTP عدة أنواع من المهلات الزمنية. فهم الفرق بينها هو مفتاح الإعداد الصحيح.
Connect timeout
الوقت المسموح لإنشاء اتصال TCP مع الوكيل والخادم المستهدف. إذا كان الوكيل غير متاح أو الخادم لا يستجيب - ستعمل هذه المهلة الزمنية. القيمة الموصى بها: 5-10 ثوان.
Read timeout
وقت الانتظار للبيانات بعد إنشاء الاتصال. اتصل الخادم لكنه صامت - ستعمل مهلة القراءة. للصفحات العادية: 15-30 ثانية. لواجهات برمجية ثقيلة: 60+ ثانية.
Total timeout
الوقت الإجمالي للطلب من البداية إلى النهاية. تأمين ضد الاتصالات المعلقة. عادة: connect + read + احتياطي.
مثال على الإعداد في Python مع مكتبة requests:
import requests
proxies = {
"http": "http://user:pass@proxy.example.com:8080",
"https": "http://user:pass@proxy.example.com:8080"
}
# كورة: (connect_timeout, read_timeout)
timeout = (10, 30)
try:
response = requests.get(
"https://target-site.com/api/data",
proxies=proxies,
timeout=timeout
)
except requests.exceptions.ConnectTimeout:
print("فشل الاتصال بالوكيل أو الخادم")
except requests.exceptions.ReadTimeout:
print("لم يرسل الخادم البيانات في الوقت المناسب")
لـ aiohttp (Python غير متزامن):
import aiohttp
import asyncio
async def fetch_with_timeout():
timeout = aiohttp.ClientTimeout(
total=60, # المهلة الزمنية الإجمالية
connect=10, # للاتصال
sock_read=30 # لقراءة البيانات
)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(
"https://target-site.com/api/data",
proxy="http://user:pass@proxy.example.com:8080"
) as response:
return await response.text()
منطق إعادة المحاولة: النهج الصحيح
Timeout ليست دائماً خطأ قاتل. غالباً ما تنجح محاولة إعادة الطلب. لكن يجب إعادة المحاولة بحكمة.
التأخير الأسي
لا تقصف الخادم بطلبات متكررة بدون توقف. استخدم exponential backoff: كل محاولة تالية - بتأخير متزايد.
import requests
import time
import random
def fetch_with_retry(url, proxies, max_retries=3):
"""طلب مع إعادة محاولة وتأخير أسي"""
for attempt in range(max_retries):
try:
response = requests.get(
url,
proxies=proxies,
timeout=(10, 30)
)
response.raise_for_status()
return response
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError) as e:
if attempt == max_retries - 1:
raise # آخر محاولة - رمي الخطأ
# تأخير أسي: 1s, 2s, 4s...
# + jitter عشوائي لتجنب موجات الطلبات
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"المحاولة {attempt + 1} فشلت: {e}")
print(f"إعادة المحاولة بعد {delay:.1f} ثانية...")
time.sleep(delay)
مكتبة tenacity
لكود الإنتاج، من الأفضل استخدام حلول جاهزة:
from tenacity import retry, stop_after_attempt, wait_exponential
from tenacity import retry_if_exception_type
import requests
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((
requests.exceptions.Timeout,
requests.exceptions.ConnectionError
))
)
def fetch_data(url, proxies):
response = requests.get(url, proxies=proxies, timeout=(10, 30))
response.raise_for_status()
return response.json()
تدوير الوكيل عند المهلات الزمنية
إذا كان وكيل واحد يعطي مهلات زمنية باستمرار - المشكلة فيه. الحل المنطقي: التبديل إلى آخر.
import requests
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class ProxyManager:
"""مدير الوكيل مع تتبع المحاولات الفاشلة"""
proxies: list
max_failures: int = 3
cooldown_seconds: int = 300
_failures: dict = field(default_factory=dict)
_cooldown_until: dict = field(default_factory=dict)
def get_proxy(self) -> Optional[str]:
"""الحصول على وكيل يعمل"""
current_time = time.time()
for proxy in self.proxies:
# تخطي الوكلاء في فترة الانتظار
if self._cooldown_until.get(proxy, 0) > current_time:
continue
return proxy
return None # جميع الوكلاء في فترة انتظار
def report_failure(self, proxy: str):
"""الإبلاغ عن طلب فاشل"""
self._failures[proxy] = self._failures.get(proxy, 0) + 1
if self._failures[proxy] >= self.max_failures:
# إرسال الوكيل إلى فترة انتظار
self._cooldown_until[proxy] = time.time() + self.cooldown_seconds
self._failures[proxy] = 0
print(f"الوكيل {proxy} أرسل إلى فترة انتظار")
def report_success(self, proxy: str):
"""إعادة تعيين عداد الأخطاء عند النجاح"""
self._failures[proxy] = 0
def fetch_with_rotation(url, proxy_manager, max_attempts=5):
"""طلب مع تبديل تلقائي للوكيل عند الأخطاء"""
for attempt in range(max_attempts):
proxy = proxy_manager.get_proxy()
if not proxy:
raise Exception("لا توجد وكلاء متاحة")
proxies = {"http": proxy, "https": proxy}
try:
response = requests.get(url, proxies=proxies, timeout=(10, 30))
response.raise_for_status()
proxy_manager.report_success(proxy)
return response
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError):
proxy_manager.report_failure(proxy)
print(f"انتهاء المهلة الزمنية عبر {proxy}، نحاول آخر...")
continue
raise Exception(f"فشل الحصول على البيانات بعد {max_attempts} محاولات")
عند استخدام وكلاء سكنية مع تدوير تلقائي، يتم تبسيط هذا المنطق - يقوم المزود بتبديل IP بنفسه عند كل طلب أو بفاصل زمني محدد.
الطلبات غير المتزامنة مع التحكم في المهلات الزمنية
عند الكشط الضخم، الطلبات المتزامنة غير فعالة. يسمح النهج غير المتزامن بمعالجة مئات عناوين URL بالتوازي، لكنه يتطلب عملاً حذراً مع المهلات الزمنية.
import aiohttp
import asyncio
from typing import List, Tuple
async def fetch_one(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore
) -> Tuple[str, str | None, str | None]:
"""تحميل عنوان URL واحد مع معالجة انتهاء المهلة الزمنية"""
async with semaphore: # تحديد التوازي
try:
async with session.get(url) as response:
content = await response.text()
return (url, content, None)
except asyncio.TimeoutError:
return (url, None, "timeout")
except aiohttp.ClientError as e:
return (url, None, str(e))
async def fetch_all(
urls: List[str],
proxy: str,
max_concurrent: int = 10
) -> List[Tuple[str, str | None, str | None]]:
"""تحميل ضخم مع التحكم في المهلات الزمنية والتوازي"""
timeout = aiohttp.ClientTimeout(total=45, connect=10, sock_read=30)
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(
limit=max_concurrent,
limit_per_host=5 # لا أكثر من 5 اتصالات لكل مضيف
)
async with aiohttp.ClientSession(
timeout=timeout,
connector=connector
) as session:
# تعيين الوكيل لجميع الطلبات
tasks = [
fetch_one(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
# الإحصائيات
success = sum(1 for _, content, _ in results if content)
timeouts = sum(1 for _, _, error in results if error == "timeout")
print(f"نجح: {success}, المهلات الزمنية: {timeouts}")
return results
# الاستخدام
async def main():
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = await fetch_all(
urls,
proxy="http://user:pass@proxy.example.com:8080",
max_concurrent=10
)
asyncio.run(main())
مهم: لا تضع توازياً عالياً جداً. 50-100 طلب متزامن عبر وكيل واحد - بالفعل الكثير. من الأفضل 10-20 مع عدة وكلاء.
التشخيص: كيفية العثور على السبب
قبل تغيير الإعدادات، حدد مصدر المشكلة.
الخطوة 1: تحقق من الوكيل مباشرة
# اختبار بسيط عبر curl مع قياس الوقت
curl -x http://user:pass@proxy:8080 \
-w "Connect: %{time_connect}s\nTotal: %{time_total}s\n" \
-o /dev/null -s \
https://httpbin.org/get
إذا كان time_connect أكثر من 5 ثوان - المشكلة في الوكيل أو الشبكة إليه.
الخطوة 2: قارن مع الطلب المباشر
import requests
import time
def measure_request(url, proxies=None):
start = time.time()
try:
r = requests.get(url, proxies=proxies, timeout=30)
elapsed = time.time() - start
return f"موافق: {elapsed:.2f}s، الحالة: {r.status_code}"
except Exception as e:
elapsed = time.time() - start
return f"فشل: {elapsed:.2f}s، خطأ: {type(e).__name__}"
url = "https://target-site.com"
proxy = {"http": "http://proxy:8080", "https": "http://proxy:8080"}
print("مباشرة:", measure_request(url))
print("عبر الوكيل:", measure_request(url, proxy))
الخطوة 3: تحقق من أنواع وكلاء مختلفة
قد تعتمد المهلات الزمنية على نوع الوكيل:
| نوع الوكيل | التأخير النموذجي | المهلة الزمنية الموصى بها |
|---|---|---|
| مركز البيانات | 50-200 مللي ثانية | الاتصال: 5s، القراءة: 15s |
| سكنية | 200-800 مللي ثانية | الاتصال: 10s، القراءة: 30s |
| الهاتف المحمول | 300-1500 مللي ثانية | الاتصال: 15s، القراءة: 45s |
الخطوة 4: سجل التفاصيل
import logging
import requests
from requests.adapters import HTTPAdapter
# تفعيل تسجيل التصحيح
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)
# الآن سترى جميع مراحل الطلب:
# - حل DNS
# - إنشاء الاتصال
# - إرسال الطلب
# - استقبال الرد
قائمة التحقق لحل أخطاء timeout
خوارزمية سريعة للإجراء عند حدوث المهلات الزمنية:
- حدد نوع المهلة الزمنية - الاتصال أم القراءة؟ هذه مشاكل مختلفة.
- تحقق من الوكيل بشكل منفصل - هل يعمل على الإطلاق؟ ما التأخير؟
- زيادة المهلات الزمنية - ربما القيم عدوانية جداً لنوع الوكيل الخاص بك.
- أضف إعادة محاولة مع backoff - المهلات الزمنية الفردية طبيعية، المهم هو الاستقرار.
- قم بإعداد التدوير - قم بالتبديل التلقائي إلى وكيل آخر عند المشاكل.
- حدد التوازي - عدد كبير جداً من الطلبات المتزامنة يثقل الوكيل.
- تحقق من الموقع المستهدف - قد يحجب أو يخنق طلباتك.
الخلاصة
أخطاء timeout عبر الوكيل - مشكلة قابلة للحل. في معظم الحالات، يكفي إعداد المهلات الزمنية بشكل صحيح حسب نوع الوكيل، وإضافة منطق إعادة محاولة، وتنفيذ التدوير عند الفشل. للمهام ذات المتطلبات العالية للاستقرار، استخدم وكلاء سكنية مع تدوير تلقائي - المزيد على proxycove.com.